package io.debezium.server.eventhubs;

import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import io.debezium.server.DebeziumServer;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.ArrayList;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@QuarkusTest
@TestProfile(EventHubsWithPartitionRouterProfile.class)
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
/* loaded from: input_file:io/debezium/server/eventhubs/EventHubsWithPartitionRouterIT.class */
public class EventHubsWithPartitionRouterIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String CONSUMER_GROUP = "$Default";

    @Inject
    DebeziumServer server;
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsWithPartitionRouterIT.class);
    protected static EventHubProducerClient producer = null;
    protected static EventHubConsumerClient consumer = null;

    public EventHubsWithPartitionRouterIT() {
        Testing.Files.delete(EventHubsTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(EventHubsTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() {
        if (producer != null) {
            producer.close();
        }
        if (consumer != null) {
            consumer.close();
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) {
        producer = new EventHubClientBuilder().connectionString(String.format("%s;EntityPath=%s", EventHubsTestConfigSource.getEventHubsConnectionString(), EventHubsTestConfigSource.getEventHubsName())).buildProducerClient();
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testEventHubsWithCustomPartitionConfiguration() throws Exception {
        Testing.Print.enable();
        consumer = new EventHubClientBuilder().connectionString(String.format("%s;EntityPath=%s", EventHubsTestConfigSource.getEventHubsConnectionString(), EventHubsTestConfigSource.getEventHubsName())).consumerGroup(CONSUMER_GROUP).buildConsumerClient();
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(EventHubsTestConfigSource.waitForSeconds())).until(() -> {
            consumer.receiveFromPartition("1", MESSAGE_COUNT, EventPosition.latest()).forEach(partitionEvent -> {
                arrayList.add(partitionEvent);
            });
            return Boolean.valueOf(arrayList.size() >= MESSAGE_COUNT);
        });
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String bodyAsString = ((PartitionEvent) arrayList.get(i)).getData().getBodyAsString();
            String str = "\"id\":100" + String.valueOf(i + 1);
            Assertions.assertTrue(bodyAsString.contains(str), str + " not found in payload");
        }
    }
}
