package io.debezium.server.infinispan;

import io.debezium.server.DebeziumServer;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.time.Duration;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for the Infinispan sink: connects to the Infinispan server over Hot Rod and
 * waits until the target cache contains {@link #MESSAGE_COUNT} entries.
 *
 * <p>The PostgreSQL and Infinispan test resources are registered through
 * {@link QuarkusTestResource} and the {@link DebeziumServer} instance is injected by Quarkus.
 */
@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(InfinispanTestResourceLifecycleManager.class)})
public class InfinispanSinkConsumerIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumerIT.class);

    /** Number of cache entries the test waits for. */
    private static final int MESSAGE_COUNT = 4;

    static {
        // Recreate the offset store file when the class is loaded.
        Testing.Files.delete(InfinispanTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(InfinispanTestConfigSource.OFFSET_STORE_PATH);
    }

    @Inject
    DebeziumServer server;

    /**
     * Asserts that the remote cache is empty at first and reaches {@link #MESSAGE_COUNT} entries
     * within 60 seconds.
     *
     * @throws Exception if building the client, querying the cache, or closing the client fails
     */
    @Test
    public void testStreaming() throws Exception {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        // NOTE(review): the URI embeds the user name and password and is logged verbatim below.
        String uri = String.format("hotrod://%s:%s@%s:%d", InfinispanTestConfigSource.USER_NAME, InfinispanTestConfigSource.PASSWORD, InfinispanTestResourceLifecycleManager.getHost(), Integer.valueOf(InfinispanTestResourceLifecycleManager.getPort()));
        LOGGER.info("Connected to Infinispan server using URI '{}'", uri);
        configurationBuilder.uri(uri);

        // try-with-resources releases the Hot Rod client even when an assertion fails or the
        // wait times out; previously the manager was never closed.
        try (RemoteCacheManager remoteCacheManager = new RemoteCacheManager(configurationBuilder.build())) {
            // Only size() is used, so the key/value types are left as Object.
            RemoteCache<Object, Object> remoteCache = remoteCacheManager.getCache(InfinispanTestConfigSource.CACHE_NAME);

            Assertions.assertThat(remoteCache.size()).isEqualTo(0);
            Awaitility.await()
                    .atMost(Duration.ofSeconds(60L))
                    .until(() -> remoteCache.size() == MESSAGE_COUNT);
            Assertions.assertThat(remoteCache.size()).isEqualTo(MESSAGE_COUNT);
        }
    }
}
