package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.openlineage.DebeziumTestTransport;
import io.debezium.openlineage.facets.DebeziumConfigFacet;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.transports.TransportBuilder;
import io.openlineage.client.transports.TransportConfig;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/OpenLineageIT.class */
public class OpenLineageIT extends AbstractAsyncEngineConnectorTest {
    private SqlServerConnection connection;

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
        this.connection = TestHelper.multiPartitionTestConnection();
        TableId tableId = new TableId(TestHelper.TEST_DATABASE_1, "dbo", "tableA");
        TableId tableId2 = new TableId(TestHelper.TEST_DATABASE_1, "dbo", "tableB");
        this.connection.execute(new String[]{"CREATE TABLE %s (id int primary key, colA varchar(32))".formatted(this.connection.quotedTableIdString(tableId)), "CREATE TABLE %s (id int primary key, colB varchar(32))".formatted(this.connection.quotedTableIdString(tableId2)), "INSERT INTO %s VALUES(1, 'a1')".formatted(this.connection.quotedTableIdString(tableId)), "INSERT INTO %s VALUES(2, 'b')".formatted(this.connection.quotedTableIdString(tableId2))});
        TestHelper.enableTableCdc(this.connection, tableId);
        TestHelper.enableTableCdc(this.connection, tableId2);
        TableId tableId3 = new TableId(TestHelper.TEST_DATABASE_2, "dbo", "tableA");
        TableId tableId4 = new TableId(TestHelper.TEST_DATABASE_2, "dbo", "tableC");
        this.connection.execute(new String[]{"CREATE TABLE %s (id int primary key, colA varchar(32))".formatted(this.connection.quotedTableIdString(tableId3)), "CREATE TABLE %s (id int primary key, colC varchar(32))".formatted(this.connection.quotedTableIdString(tableId4)), "INSERT INTO %s VALUES(3, 'a2')".formatted(this.connection.quotedTableIdString(tableId3)), "INSERT INTO %s VALUES(4, 'c')".formatted(this.connection.quotedTableIdString(tableId4))});
        TestHelper.enableTableCdc(this.connection, tableId3);
        TestHelper.enableTableCdc(this.connection, tableId4);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void shouldProduceMultipleOpenLineageJobRunningEvent() {
        DebeziumTestTransport debeziumTestTransport = getDebeziumTestTransport();
        start(SqlServerConnector.class, TestHelper.defaultConfig(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2).with("tasks.max", 2).with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL.getValue()).with("openlineage.integration.enabled", true).with("openlineage.integration.config.file.path", getClass().getClassLoader().getResource("openlineage/openlineage.yml").getPath()).with("openlineage.integration.job.description", "This connector does cdc for products").with("openlineage.integration.job.tags", "env=prod,team=cdc").with("openlineage.integration.job.owners", "Mario=maintainer,John Doe=Data scientist").build());
        assertConnectorIsRunning();
        TestHelper.waitForDatabaseSnapshotsToBeCompletedWithMultipleTasks(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
        List list = debeziumTestTransport.getRunEvents().stream().filter(runningEventsWithOutInputDatasets()).sorted(Comparator.comparing(runEvent -> {
            return runEvent.getJob().getName();
        })).toList();
        Assertions.assertThat(list).hasSize(2);
        assertEventContainsExpectedData((OpenLineage.RunEvent) list.get(0), "0", TestHelper.TEST_DATABASE_1);
        assertEventContainsExpectedData((OpenLineage.RunEvent) list.get(1), "1", TestHelper.TEST_DATABASE_2);
    }

    @NotNull
    private static Predicate<OpenLineage.RunEvent> runningEventsWithOutInputDatasets() {
        return runEvent -> {
            return runEvent.getEventType() == OpenLineage.RunEvent.EventType.RUNNING && runEvent.getInputs().isEmpty();
        };
    }

    private static DebeziumTestTransport getDebeziumTestTransport() {
        return ((TransportBuilder) StreamSupport.stream(ServiceLoader.load(TransportBuilder.class).spliterator(), false).filter(transportBuilder -> {
            return transportBuilder.getType().equals("debezium");
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("Failed to find TransportBuilder");
        })).build((TransportConfig) null);
    }

    private static void assertEventContainsExpectedData(OpenLineage.RunEvent runEvent, String str, String str2) {
        Assertions.assertThat(runEvent.getJob().getNamespace()).isEqualTo(TestHelper.TEST_SERVER_NAME);
        Assertions.assertThat(runEvent.getJob().getName()).isEqualTo("server1." + str);
        Assertions.assertThat(runEvent.getJob().getFacets().getDocumentation().getDescription()).isEqualTo("This connector does cdc for products");
        Assertions.assertThat(runEvent.getRun().getFacets().getProcessing_engine().getName()).isEqualTo("Debezium");
        Assertions.assertThat(runEvent.getRun().getFacets().getProcessing_engine().getVersion()).matches("^\\d+\\.\\d+\\.\\d+(\\.Final|-SNAPSHOT)$");
        Assertions.assertThat(runEvent.getRun().getFacets().getProcessing_engine().getOpenlineageAdapterVersion()).matches("^\\d+\\.\\d+\\.\\d+$");
        Assertions.assertThat(((DebeziumConfigFacet) runEvent.getRun().getFacets().getAdditionalProperties().get("debezium_config")).getConfigs()).contains(new String[]{"connector.class=io.debezium.connector.sqlserver.SqlServerConnector", "database.names=" + str2, "database.hostname=localhost", "database.password=Password!", "database.port=1433", "database.user=sa", "errors.max.retries=-1", "errors.retry.delay.initial.ms=300", "errors.retry.delay.max.ms=10000", "key.converter=org.apache.kafka.connect.json.JsonConverter", "name=testing-connector", "tasks.max=2", "offset.flush.interval.ms=0", "offset.flush.timeout.ms=5000", "offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore", "openlineage.integration.enabled=true", "openlineage.integration.job.description=This connector does cdc for products", "openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist", "openlineage.integration.job.tags=env=prod,team=cdc", "record.processing.order=ORDERED", "record.processing.shutdown.timeout.ms=1000", "record.processing.threads=", "record.processing.with.serial.consumer=false", "snapshot.mode=initial", "topic.prefix=server1", "value.converter=org.apache.kafka.connect.json.JsonConverter"}).anyMatch(str3 -> {
            return str3.startsWith("openlineage.integration.config.file.path=") && str3.contains("openlineage.yml");
        }).anyMatch(str4 -> {
            return str4.startsWith("offset.storage.file.filename=") && str4.contains("file-connector-offsets.txt");
        });
        Map map = (Map) runEvent.getJob().getFacets().getTags().getTags().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        Assertions.assertThat(runEvent.getProducer().toString()).startsWith("https://github.com/debezium/debezium/");
        Assertions.assertThat(map).contains(new Map.Entry[]{Assertions.entry("env", "prod"), Assertions.entry("team", "cdc")});
        Assertions.assertThat((Map) runEvent.getJob().getFacets().getOwnership().getOwners().stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, (v0) -> {
            return v0.getType();
        }))).contains(new Map.Entry[]{Assertions.entry("Mario", "maintainer"), Assertions.entry("John Doe", "Data scientist")});
    }
}
