package io.debezium.connector.oracle.logminer.buffered;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.buffered.ehcache.CacheCapacityExceededException;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER_BUFFERED)
/* loaded from: input_file:io/debezium/connector/oracle/logminer/buffered/EhcacheIT.class */
public class EhcacheIT extends AbstractAsyncEngineConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
    private static OracleConnection connection;

    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    @Test
    @FixFor({"DBZ-8874"})
    public void shouldNotSilentlyEvictEventsOverThreshold() throws Exception {
        TestHelper.dropTable(connection, "dbz8874");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8874 (id numeric(9,0) primary key, data varchar2(4000))"});
            TestHelper.streamTable(connection, "dbz8874");
            connection.execute(new String[]{"INSERT INTO dbz8874 (id,data) values (1,'snapshot')"});
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8874").with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "ehcache").with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, TestHelper.getEhcacheGlobalCacheConfig()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, getSmallCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, getSmallCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, getSmallCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, getSmallCacheSize()).build();
            LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class);
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8874");
            Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(1);
            Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("snapshot");
            String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(4000);
            for (int i = 0; i < 10000; i++) {
                connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz8874 (id,data) values (" + (i + 2) + ",'" + randomAlphanumeric + "')"});
            }
            connection.commit();
            Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                return Boolean.valueOf(!isStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME));
            });
            ((AbstractBooleanAssert) Assertions.assertThat(logInterceptor.containsThrowableWithCause(CacheCapacityExceededException.class)).as("Expected an Ehcache cache to throw " + CacheCapacityExceededException.class.getName(), new Object[0])).isTrue();
            TestHelper.dropTable(connection, "dbz8874");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8874");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8874"})
    public void shouldNotCauseEvictionWhenCacheSizedProperly() throws Exception {
        TestHelper.dropTable(connection, "dbz8874");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8874 (id numeric(9,0) primary key, data varchar2(4000))"});
            TestHelper.streamTable(connection, "dbz8874");
            connection.execute(new String[]{"INSERT INTO dbz8874 (id,data) values (1,'snapshot')"});
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8874").with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "ehcache").with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, TestHelper.getEhcacheGlobalCacheConfig()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, getLargeCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, getLargeCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, getLargeCacheSize()).with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, getLargeCacheSize()).build();
            LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class);
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8874");
            Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(1);
            Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("snapshot");
            String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(4000);
            for (int i = 0; i < 10000; i++) {
                connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz8874 (id,data) values (" + (i + 2) + ",'" + randomAlphanumeric + "')"});
            }
            connection.commit();
            for (int i2 = 0; i2 < 10000; i2++) {
                SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8874").get(0);
                Assertions.assertThat(getAfter(sourceRecord).get("ID")).isEqualTo(Integer.valueOf(2 + i2));
                Assertions.assertThat(getAfter(sourceRecord).get("DATA")).isEqualTo(randomAlphanumeric);
            }
            assertNoRecordsToConsume();
            ((AbstractBooleanAssert) Assertions.assertThat(logInterceptor.containsThrowableWithCause(CacheCapacityExceededException.class)).as("Expected an Ehcache cache to throw " + CacheCapacityExceededException.class.getName(), new Object[0])).isFalse();
            TestHelper.dropTable(connection, "dbz8874");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8874");
            throw th;
        }
    }

    private Struct getAfter(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("after");
    }

    private String getSmallCacheSize() {
        return TestHelper.getEhcacheBasicCacheConfig(67584);
    }

    private String getLargeCacheSize() {
        return TestHelper.getEhcacheBasicCacheConfig(1024000000);
    }
}
