package io.debezium.server;

import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.time.Duration;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperties;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

@QuarkusTest
@TestProfile(DebeziumServerConnectFormatProfile.class)
@EnabledIfSystemProperty(named = "test.apicurio", matches = "false", disabledReason = "DebeziumServerConfigProvidersIT doesn't run with apicurio profile.")
@DisabledIfSystemProperties({@DisabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf"), @DisabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf")})
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
/* loaded from: input_file:io/debezium/server/DebeziumServerConnectFormatIT.class */
public class DebeziumServerConnectFormatIT {
    private static final int MESSAGE_COUNT = 4;

    @Inject
    DebeziumServer server;

    public DebeziumServerConnectFormatIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) {
        if (TestConfigSource.isItTest()) {
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testPostgresWithSourceRecord() throws Exception {
        Testing.Print.enable();
        TestConsumer testConsumer = (TestConsumer) this.server.getConsumer();
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(testConsumer.getValues().size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
        SourceRecord sourceRecord = (SourceRecord) testConsumer.getValues().get(3);
        Assertions.assertThat(((Struct) sourceRecord.key()).getInt32("id")).isEqualTo(1004);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        Assertions.assertThat(struct.getInt32("id")).isEqualTo(1004);
        Assertions.assertThat(struct.getString("first_name")).isEqualTo("Anne");
        Assertions.assertThat(struct.getString("last_name")).isEqualTo("Kretchmar");
        Assertions.assertThat(struct.getString("email")).isEqualTo("annek@noanswer.org");
        Assertions.assertThat(sourceRecord.headers().size()).isEqualTo(0);
    }
}
