package io.debezium.connector.oracle;

import ch.qos.logback.classic.Level;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnStrategyRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenLogMiningStrategyIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Testing;
import java.io.File;
import java.io.FileReader;
import java.math.BigDecimal;
import java.sql.Clob;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Hybrid does not support CLOB")
/* loaded from: input_file:io/debezium/connector/oracle/OracleClobDataTypeIT.class */
public class OracleClobDataTypeIT extends AbstractAsyncEngineConnectorTest {
    // Text returned by Testing.Files.readResourceAsString for data/test_lob_data.json, loaded once
    // at class initialization. The tests visible in this class pass it through part(...) to build
    // CLOB column values.
    private static final String JSON_DATA = Testing.Files.readResourceAsString("data/test_lob_data.json");
    // Text loaded the same way from data/test_lob_data2.json; the tests visible in this class use it
    // to build NCLOB column values.
    private static final String JSON_DATA2 = Testing.Files.readResourceAsString("data/test_lob_data2.json");

    // NOTE(review): presumably evaluates the SkipWhenAdapterNameIs / SkipWhenAdapterNameIsNot
    // annotations imported by this file - confirm against the rule implementation.
    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    // NOTE(review): presumably the rule that honours the class-level @SkipWhenLogMiningStrategyIs
    // annotation - confirm against the rule implementation.
    @Rule
    public final TestRule skipStrategyRule = new SkipTestDependingOnStrategyRule();
    // Connection the tests use to issue DDL/DML; assigned in before() and closed in after().
    private OracleConnection connection;

    /**
     * Per-test setup: opens the test connection, removes any existing {@code CLOB_TEST} table,
     * configures the record consume timeout, initializes the connector test framework and deletes
     * the schema history file.
     */
    @Before
    public void before() {
        // The connection has to exist before anything else: the drop below already needs it.
        connection = TestHelper.testConnection();
        TestHelper.dropTable(connection, "CLOB_TEST");

        // The timeout value supplied by TestHelper is interpreted in seconds.
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();

        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Per-test teardown: drops the {@code CLOB_TEST} table and closes the test connection.
     *
     * <p>Does nothing when no connection was opened (for example when {@code before()} failed
     * before assigning it).
     *
     * @throws Exception if dropping the table or closing the connection fails
     */
    @After
    public void after() throws Exception {
        if (connection == null) {
            return;
        }
        try {
            TestHelper.dropTable(connection, "CLOB_TEST");
        }
        finally {
            // Close even when the drop fails, otherwise the connection would leak for the rest of
            // the test run.
            connection.close();
        }
    }

    /**
     * Inserts one row holding two CLOB and two NCLOB values before the connector is started, then
     * checks that the single record produced after the snapshot completes is a valid read event
     * whose {@code after} state carries the text of all four LOB columns.
     *
     * @throws Exception if any database or connector interaction fails
     */
    @Test
    @FixFor("DBZ-2948")
    public void shouldSnapshotClobDataTypeValues() throws Exception {
        connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB_SHORT clob, VAL_CLOB_LONG clob, VAL_NCLOB_SHORT nclob, VAL_NCLOB_LONG nclob, primary key(id))");

        // Seed the row up front; the connector is not running yet at this point.
        final Clob shortClob = createClob("Hello World");
        final Clob longClob = createClob(part(JSON_DATA, 0, 5000));
        final NClob shortNClob = createNClob("Hello World");
        final NClob longNClob = createNClob(part(JSON_DATA2, 0, 5000));
        connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (1, ?, ?, ?, ?)", ps -> {
            ps.setClob(1, shortClob);
            ps.setClob(2, longClob);
            ps.setNClob(3, shortNClob);
            ps.setNClob(4, longNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();
        TestHelper.streamTable(connection, "debezium.clob_test");

        // Start the connector with LOB support enabled and wait for the snapshot to finish.
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Exactly one read event is expected on the table's topic.
        final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(records).hasSize(1);

        final SourceRecord snapshotRecord = records.get(0);
        VerifyRecord.isValidRead(snapshotRecord, "ID", 1);

        final Struct row = after(snapshotRecord);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB_SHORT")).isEqualTo(getClobString(shortClob));
        Assertions.assertThat(row.get("VAL_CLOB_LONG")).isEqualTo(getClobString(longClob));
        Assertions.assertThat(row.get("VAL_NCLOB_SHORT")).isEqualTo(getClobString(shortNClob));
        Assertions.assertThat(row.get("VAL_NCLOB_LONG")).isEqualTo(getClobString(longNClob));
    }

    /**
     * Streams inserts, updates and deletes against a table with one CLOB and one NCLOB column and
     * verifies the emitted change events.
     *
     * <p>Inserts and updates must carry the written LOB text in the {@code after} state. Deletes
     * must have a {@code null} {@code after} state and a {@code before} state whose LOB columns
     * equal the configured unavailable-value placeholder.
     *
     * @throws Exception if any database or connector interaction fails
     */
    @Test
    @FixFor("DBZ-2948")
    public void shouldStreamInlineClobDataTypeValues() throws Exception {
        connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, primary key(id))");
        TestHelper.streamTable(connection, "debezium.clob_test");

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final String topic = topicName("CLOB_TEST");

        // ---- Insert: one row in its own transaction ----
        final Clob insertClob1 = createClob(part(JSON_DATA, 0, 1000));
        final NClob insertNClob1 = createNClob(part(JSON_DATA2, 0, 1000));
        connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (1, ?, ?)", ps -> {
            ps.setClob(1, insertClob1);
            ps.setNClob(2, insertNClob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(1);
        SourceRecord event = records.get(0);
        VerifyRecord.isValidInsert(event, "ID", 1);
        Struct row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob1));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob1));

        // ---- Insert: two rows committed together ----
        final Clob insertClob2 = createClob(part(JSON_DATA, 1, 1000));
        final NClob insertNClob2 = createNClob(part(JSON_DATA2, 1, 1000));
        connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (2, ?, ?)", ps -> {
            ps.setClob(1, insertClob2);
            ps.setNClob(2, insertNClob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob insertClob3 = createClob(part(JSON_DATA, 2, 1000));
        final NClob insertNClob3 = createNClob(part(JSON_DATA2, 2, 1000));
        connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (3, ?, ?)", ps -> {
            ps.setClob(1, insertClob3);
            ps.setNClob(2, insertNClob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidInsert(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob2));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob2));
        event = records.get(1);
        VerifyRecord.isValidInsert(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob3));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob3));

        // ---- Update: one row in its own transaction ----
        final Clob updateClob1 = createClob(part(JSON_DATA, 1, 1000));
        final NClob updateNClob1 = createNClob(part(JSON_DATA2, 1, 1000));
        connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 1", ps -> {
            ps.setClob(1, updateClob1);
            ps.setNClob(2, updateNClob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(1);
        event = records.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 1);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob1));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob1));

        // ---- Update: two rows committed together ----
        final Clob updateClob2 = createClob(part(JSON_DATA, 0, 1024));
        final NClob updateNClob2 = createNClob(part(JSON_DATA2, 0, 1024));
        connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 2", ps -> {
            ps.setClob(1, updateClob2);
            ps.setNClob(2, updateNClob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob updateClob3 = createClob(part(JSON_DATA, 1, 1025));
        final NClob updateNClob3 = createNClob(part(JSON_DATA2, 1, 1025));
        connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 3", ps -> {
            ps.setClob(1, updateClob3);
            ps.setNClob(2, updateNClob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob2));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob2));
        event = records.get(1);
        VerifyRecord.isValidUpdate(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob3));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob3));

        // ---- Delete: one row ----
        // Two records are consumed for the single delete but only the first is inspected
        // (presumably the second is the tombstone - confirm against the connector defaults).
        connection.execute("DELETE FROM debezium.clob_test WHERE id = 1");

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidDelete(event, "ID", 1);
        Struct oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(after(event)).isNull();

        // ---- Delete: two rows committed together ----
        // Four records are consumed; the delete events are read from positions 0 and 2.
        connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 2");
        connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 3");
        connection.execute("COMMIT");

        records = consumeRecordsByTopic(4).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(4);
        event = records.get(0);
        VerifyRecord.isValidDelete(event, "ID", 2);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(2);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(after(event)).isNull();
        event = records.get(2);
        VerifyRecord.isValidDelete(event, "ID", 3);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(3);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(after(event)).isNull();
    }

    /**
     * Same scenario as the inline CLOB streaming test, but against a table that also has a
     * {@code VAL_DATA varchar2(50)} column.
     *
     * <p>Inserts and updates must carry the LOB text and the {@code VAL_DATA} value in the
     * {@code after} state. Deletes must have a {@code null} {@code after} state and a
     * {@code before} state in which the LOB columns equal the configured unavailable-value
     * placeholder while {@code VAL_DATA} still holds its last written value.
     *
     * @throws Exception if any database or connector interaction fails
     */
    @Test
    @FixFor("DBZ-2948")
    public void shouldStreamInlineClobDataTypeValuesWithNonClobDataTypeField() throws Exception {
        connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_DATA varchar2(50), primary key(id))");
        TestHelper.streamTable(connection, "debezium.clob_test");

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final String topic = topicName("CLOB_TEST");

        // ---- Insert: one row in its own transaction ----
        final Clob insertClob1 = createClob(part(JSON_DATA, 0, 1000));
        final NClob insertNClob1 = createNClob(part(JSON_DATA2, 0, 1000));
        connection.prepareQuery("INSERT INTO clob_test VALUES (1, ?, ?, 'Test1')", ps -> {
            ps.setClob(1, insertClob1);
            ps.setNClob(2, insertNClob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(1);
        SourceRecord event = records.get(0);
        VerifyRecord.isValidInsert(event, "ID", 1);
        Struct row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob1));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob1));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test1");

        // ---- Insert: two rows committed together ----
        final Clob insertClob2 = createClob(part(JSON_DATA, 1, 1000));
        final NClob insertNClob2 = createNClob(part(JSON_DATA2, 1, 1000));
        connection.prepareQuery("INSERT INTO clob_test VALUES (2, ?, ?, 'Test2')", ps -> {
            ps.setClob(1, insertClob2);
            ps.setNClob(2, insertNClob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob insertClob3 = createClob(part(JSON_DATA, 2, 1000));
        final NClob insertNClob3 = createNClob(part(JSON_DATA2, 2, 1000));
        connection.prepareQuery("INSERT INTO clob_test VALUES (3, ?, ?, 'Test3')", ps -> {
            ps.setClob(1, insertClob3);
            ps.setNClob(2, insertNClob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidInsert(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob2));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob2));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test2");
        event = records.get(1);
        VerifyRecord.isValidInsert(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(insertClob3));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(insertNClob3));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test3");

        // ---- Update: one row in its own transaction ----
        final Clob updateClob1 = createClob(part(JSON_DATA, 1, 1000));
        final NClob updateNClob1 = createNClob(part(JSON_DATA2, 1, 1000));
        connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test1U' WHERE id = 1", ps -> {
            ps.setClob(1, updateClob1);
            ps.setNClob(2, updateNClob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(1);
        event = records.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 1);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob1));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob1));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test1U");

        // ---- Update: two rows committed together ----
        final Clob updateClob2 = createClob(part(JSON_DATA, 0, 1024));
        final NClob updateNClob2 = createNClob(part(JSON_DATA2, 0, 1024));
        connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test2U' WHERE id = 2", ps -> {
            ps.setClob(1, updateClob2);
            ps.setNClob(2, updateNClob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob updateClob3 = createClob(part(JSON_DATA, 1, 1025));
        final NClob updateNClob3 = createNClob(part(JSON_DATA2, 1, 1025));
        connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test3U' WHERE id = 3", ps -> {
            ps.setClob(1, updateClob3);
            ps.setNClob(2, updateNClob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        connection.commit();

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob2));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob2));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test2U");
        event = records.get(1);
        VerifyRecord.isValidUpdate(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(updateClob3));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(updateNClob3));
        Assertions.assertThat(row.get("VAL_DATA")).isEqualTo("Test3U");

        // ---- Delete: one row ----
        // Two records are consumed for the single delete but only the first is inspected
        // (presumably the second is the tombstone - confirm against the connector defaults).
        connection.execute("DELETE FROM debezium.clob_test WHERE id = 1");

        records = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(2);
        event = records.get(0);
        VerifyRecord.isValidDelete(event, "ID", 1);
        Struct oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_DATA")).isEqualTo("Test1U");
        Assertions.assertThat(after(event)).isNull();

        // ---- Delete: two rows committed together ----
        // Four records are consumed; the delete events are read from positions 0 and 2.
        connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 2");
        connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 3");
        connection.execute("COMMIT");

        records = consumeRecordsByTopic(4).recordsForTopic(topic);
        Assertions.assertThat(records).hasSize(4);
        event = records.get(0);
        VerifyRecord.isValidDelete(event, "ID", 2);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(2);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_DATA")).isEqualTo("Test2U");
        Assertions.assertThat(after(event)).isNull();
        event = records.get(2);
        VerifyRecord.isValidDelete(event, "ID", 3);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(3);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_DATA")).isEqualTo("Test3U");
        Assertions.assertThat(after(event)).isNull();
    }

    @Test
    @FixFor({"DBZ-2948"})
    public void shouldStreamLargeClobDataTypeValues() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, primary key(id))"});
        TestHelper.streamTable(this.connection, "debezium.clob_test");
        Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST").with(OracleConnectorConfig.LOB_ENABLED, true).build();
        start(OracleConnector.class, build);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        Clob createClob = createClob(part(JSON_DATA, 0, 25000));
        NClob createNClob = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (1, ?, ?)", preparedStatement -> {
            preparedStatement.setClob(1, createClob);
            preparedStatement.setNClob(2, createNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName("CLOB_TEST"))).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
        Struct after = after(sourceRecord);
        Assertions.assertThat(after.get("ID")).isEqualTo(1);
        Assertions.assertThat(after.get("VAL_CLOB")).isEqualTo(getClobString(createClob));
        Assertions.assertThat(after.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob));
        Clob createClob2 = createClob(part(JSON_DATA, 1, 24450));
        NClob createNClob2 = createNClob(part(JSON_DATA2, 1, 24450));
        this.connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (2, ?, ?)", preparedStatement2 -> {
            preparedStatement2.setClob(1, createClob2);
            preparedStatement2.setNClob(2, createNClob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob createClob3 = createClob(part(JSON_DATA, 3, 24450));
        NClob createNClob3 = createNClob(part(JSON_DATA2, 3, 24450));
        this.connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (3, ?, ?)", preparedStatement3 -> {
            preparedStatement3.setClob(1, createClob3);
            preparedStatement3.setNClob(2, createNClob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("CLOB_TEST"))).hasSize(2);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 2);
        Struct after2 = after(sourceRecord2);
        Assertions.assertThat(after2.get("ID")).isEqualTo(2);
        Assertions.assertThat(after2.get("VAL_CLOB")).isEqualTo(getClobString(createClob2));
        Assertions.assertThat(after2.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob2));
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("CLOB_TEST")).get(1);
        VerifyRecord.isValidInsert(sourceRecord3, "ID", 3);
        Struct after3 = after(sourceRecord3);
        Assertions.assertThat(after3.get("ID")).isEqualTo(3);
        Assertions.assertThat(after3.get("VAL_CLOB")).isEqualTo(getClobString(createClob3));
        Assertions.assertThat(after3.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob3));
        Clob createClob4 = createClob(part(JSON_DATA, 1, 24500));
        NClob createNClob4 = createNClob(part(JSON_DATA2, 1, 24500));
        this.connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 1", preparedStatement4 -> {
            preparedStatement4.setClob(1, createClob4);
            preparedStatement4.setNClob(2, createNClob4);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("CLOB_TEST"))).hasSize(1);
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidUpdate(sourceRecord4, "ID", 1);
        Struct after4 = after(sourceRecord4);
        Assertions.assertThat(after4.get("ID")).isEqualTo(1);
        Assertions.assertThat(after4.get("VAL_CLOB")).isEqualTo(getClobString(createClob4));
        Assertions.assertThat(after4.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob4));
        Clob createClob5 = createClob(part(JSON_DATA, 2, 25000));
        NClob createNClob5 = createNClob(part(JSON_DATA2, 2, 25000));
        this.connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 2", preparedStatement5 -> {
            preparedStatement5.setClob(1, createClob5);
            preparedStatement5.setNClob(2, createNClob5);
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob createClob6 = createClob(part(JSON_DATA, 3, 25000));
        NClob createNClob6 = createNClob(part(JSON_DATA2, 3, 25000));
        this.connection.prepareQuery("UPDATE CLOB_TEST SET val_clob=?, val_nclob=? WHERE id = 3", preparedStatement6 -> {
            preparedStatement6.setClob(1, createClob6);
            preparedStatement6.setNClob(2, createNClob6);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic4.recordsForTopic(topicName("CLOB_TEST"))).hasSize(2);
        SourceRecord sourceRecord5 = (SourceRecord) consumeRecordsByTopic4.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidUpdate(sourceRecord5, "ID", 2);
        Struct after5 = after(sourceRecord5);
        Assertions.assertThat(after5.get("ID")).isEqualTo(2);
        Assertions.assertThat(after5.get("VAL_CLOB")).isEqualTo(getClobString(createClob5));
        Assertions.assertThat(after5.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob5));
        SourceRecord sourceRecord6 = (SourceRecord) consumeRecordsByTopic4.recordsForTopic(topicName("CLOB_TEST")).get(1);
        VerifyRecord.isValidUpdate(sourceRecord6, "ID", 3);
        Struct after6 = after(sourceRecord6);
        Assertions.assertThat(after6.get("ID")).isEqualTo(3);
        Assertions.assertThat(after6.get("VAL_CLOB")).isEqualTo(getClobString(createClob6));
        Assertions.assertThat(after6.get("VAL_NCLOB")).isEqualTo(getClobString(createNClob6));
        this.connection.execute(new String[]{"DELETE FROM debezium.clob_test WHERE id = 1"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic(topicName("CLOB_TEST"))).hasSize(2);
        SourceRecord sourceRecord7 = (SourceRecord) consumeRecordsByTopic5.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidDelete(sourceRecord7, "ID", 1);
        Struct before = before(sourceRecord7);
        Assertions.assertThat(before.get("ID")).isEqualTo(1);
        Assertions.assertThat(before.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(before.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(after(sourceRecord7)).isNull();
        this.connection.executeWithoutCommitting(new String[]{"DELETE FROM debezium.clob_test WHERE id = 2"});
        this.connection.executeWithoutCommitting(new String[]{"DELETE FROM debezium.clob_test WHERE id = 3"});
        this.connection.execute(new String[]{"COMMIT"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(4);
        Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic(topicName("CLOB_TEST"))).hasSize(4);
        SourceRecord sourceRecord8 = (SourceRecord) consumeRecordsByTopic6.recordsForTopic(topicName("CLOB_TEST")).get(0);
        VerifyRecord.isValidDelete(sourceRecord8, "ID", 2);
        Struct before2 = before(sourceRecord8);
        Assertions.assertThat(before2.get("ID")).isEqualTo(2);
        Assertions.assertThat(before2.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(before2.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(after(sourceRecord8)).isNull();
        SourceRecord sourceRecord9 = (SourceRecord) consumeRecordsByTopic6.recordsForTopic(topicName("CLOB_TEST")).get(2);
        VerifyRecord.isValidDelete(sourceRecord9, "ID", 3);
        Struct before3 = before(sourceRecord9);
        Assertions.assertThat(before3.get("ID")).isEqualTo(3);
        Assertions.assertThat(before3.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(before3.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(build));
        Assertions.assertThat(after(sourceRecord9)).isNull();
    }

    /**
     * Streams inserts, updates and deletes against a table that combines large
     * CLOB/NCLOB columns with a plain VARCHAR2 column and checks each change event.
     *
     * <p>The connector is started with LOB support enabled. Inserts and updates are
     * expected to carry the full LOB contents in the {@code after} state, while the
     * {@code before} state of a delete is expected to hold the unavailable-value
     * placeholder for the LOB columns and the real value for {@code VAL_DATA}.
     *
     * <p>Every delete is expected to produce two records on the topic; only the delete
     * event itself is inspected (the second record is presumably its tombstone — confirm).
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-2948"})
    public void shouldStreamLargeClobDataTypeValuesWithNonClobDataTypeField() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_DATA varchar2(50), primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Single insert committed on its own.
        Clob clob1 = createClob(part(JSON_DATA, 0, 25000));
        NClob nclob1 = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (1, ?, ?, 'Test1')", ps -> {
            ps.setClob(1, clob1);
            ps.setNClob(2, nclob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> singleInsert = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleInsert).hasSize(1);
        SourceRecord insertOfRow1 = singleInsert.get(0);
        VerifyRecord.isValidInsert(insertOfRow1, "ID", 1);
        Struct row1Inserted = after(insertOfRow1);
        Assertions.assertThat(row1Inserted.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob1));
        Assertions.assertThat(row1Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob1));
        Assertions.assertThat(row1Inserted.get("VAL_DATA")).isEqualTo("Test1");

        // Two inserts committed in the same transaction.
        Clob clob2 = createClob(part(JSON_DATA, 1, 24450));
        NClob nclob2 = createNClob(part(JSON_DATA2, 2, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (2, ?, ?, 'Test2')", ps -> {
            ps.setClob(1, clob2);
            ps.setNClob(2, nclob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob clob3 = createClob(part(JSON_DATA, 3, 24450));
        NClob nclob3 = createNClob(part(JSON_DATA2, 4, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (3, ?, ?, 'Test3')", ps -> {
            ps.setClob(1, clob3);
            ps.setNClob(2, nclob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> batchedInserts = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedInserts).hasSize(2);
        SourceRecord insertOfRow2 = batchedInserts.get(0);
        VerifyRecord.isValidInsert(insertOfRow2, "ID", 2);
        Struct row2Inserted = after(insertOfRow2);
        Assertions.assertThat(row2Inserted.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob2));
        Assertions.assertThat(row2Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob2));
        Assertions.assertThat(row2Inserted.get("VAL_DATA")).isEqualTo("Test2");
        SourceRecord insertOfRow3 = batchedInserts.get(1);
        VerifyRecord.isValidInsert(insertOfRow3, "ID", 3);
        Struct row3Inserted = after(insertOfRow3);
        Assertions.assertThat(row3Inserted.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob3));
        Assertions.assertThat(row3Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob3));
        Assertions.assertThat(row3Inserted.get("VAL_DATA")).isEqualTo("Test3");

        // Single update committed on its own.
        Clob clob1Updated = createClob(part(JSON_DATA, 1, 24500));
        NClob nclob1Updated = createNClob(part(JSON_DATA2, 1, 24500));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test1U' WHERE id = 1", ps -> {
            ps.setClob(1, clob1Updated);
            ps.setNClob(2, nclob1Updated);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> singleUpdate = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleUpdate).hasSize(1);
        SourceRecord updateOfRow1 = singleUpdate.get(0);
        VerifyRecord.isValidUpdate(updateOfRow1, "ID", 1);
        Struct row1Updated = after(updateOfRow1);
        Assertions.assertThat(row1Updated.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob1Updated));
        Assertions.assertThat(row1Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob1Updated));
        Assertions.assertThat(row1Updated.get("VAL_DATA")).isEqualTo("Test1U");

        // Two updates committed in the same transaction.
        Clob clob2Updated = createClob(part(JSON_DATA, 2, 25000));
        NClob nclob2Updated = createNClob(part(JSON_DATA2, 2, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test2U' WHERE id = 2", ps -> {
            ps.setClob(1, clob2Updated);
            ps.setNClob(2, nclob2Updated);
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob clob3Updated = createClob(part(JSON_DATA, 3, 25000));
        NClob nclob3Updated = createNClob(part(JSON_DATA2, 3, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_data='Test3U' WHERE id = 3", ps -> {
            ps.setClob(1, clob3Updated);
            ps.setNClob(2, nclob3Updated);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> batchedUpdates = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedUpdates).hasSize(2);
        SourceRecord updateOfRow2 = batchedUpdates.get(0);
        VerifyRecord.isValidUpdate(updateOfRow2, "ID", 2);
        Struct row2Updated = after(updateOfRow2);
        Assertions.assertThat(row2Updated.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob2Updated));
        Assertions.assertThat(row2Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob2Updated));
        Assertions.assertThat(row2Updated.get("VAL_DATA")).isEqualTo("Test2U");
        SourceRecord updateOfRow3 = batchedUpdates.get(1);
        VerifyRecord.isValidUpdate(updateOfRow3, "ID", 3);
        Struct row3Updated = after(updateOfRow3);
        Assertions.assertThat(row3Updated.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob3Updated));
        Assertions.assertThat(row3Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob3Updated));
        Assertions.assertThat(row3Updated.get("VAL_DATA")).isEqualTo("Test3U");

        // Single delete; two records are expected on the topic, only the first is inspected.
        this.connection.execute("DELETE FROM debezium.clob_test WHERE id = 1");

        List<SourceRecord> singleDelete = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleDelete).hasSize(2);
        SourceRecord deleteOfRow1 = singleDelete.get(0);
        VerifyRecord.isValidDelete(deleteOfRow1, "ID", 1);
        Struct row1Deleted = before(deleteOfRow1);
        Assertions.assertThat(row1Deleted.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row1Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row1Deleted.get("VAL_DATA")).isEqualTo("Test1U");
        Assertions.assertThat(after(deleteOfRow1)).isNull();

        // Two deletes in one transaction; the delete events are read from positions 0 and 2.
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 2");
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 3");
        this.connection.execute("COMMIT");

        List<SourceRecord> batchedDeletes = consumeRecordsByTopic(4).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedDeletes).hasSize(4);
        SourceRecord deleteOfRow2 = batchedDeletes.get(0);
        VerifyRecord.isValidDelete(deleteOfRow2, "ID", 2);
        Struct row2Deleted = before(deleteOfRow2);
        Assertions.assertThat(row2Deleted.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row2Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row2Deleted.get("VAL_DATA")).isEqualTo("Test2U");
        Assertions.assertThat(after(deleteOfRow2)).isNull();
        SourceRecord deleteOfRow3 = batchedDeletes.get(2);
        VerifyRecord.isValidDelete(deleteOfRow3, "ID", 3);
        Struct row3Deleted = before(deleteOfRow3);
        Assertions.assertThat(row3Deleted.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row3Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row3Deleted.get("VAL_DATA")).isEqualTo("Test3U");
        Assertions.assertThat(after(deleteOfRow3)).isNull();
    }

    /**
     * Streams inserts, updates and deletes against a table holding large CLOB/NCLOB
     * values, short CLOB/NCLOB values and a VARCHAR2 column side by side, and checks
     * each change event.
     *
     * <p>The large LOB values are bound through {@code setClob}/{@code setNClob}; the
     * short ones ({@code VAL_CLOBS}, {@code VAL_NCLOBS}) are supplied as SQL literals on
     * insert and through {@code setString} on update. For deletes, the {@code before}
     * state is expected to carry the unavailable-value placeholder in {@code VAL_CLOB}
     * and {@code VAL_NCLOB}; the short LOB columns are not asserted there.
     *
     * <p>Every delete is expected to produce two records on the topic; only the delete
     * event itself is inspected (the second record is presumably its tombstone — confirm).
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-2948"})
    public void shouldStreamMixedClobDataTypeValuesWithNonClobFieldsSameTable() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_CLOBS clob, VAL_NCLOBS nclob, VAL_VARCHAR2 varchar2(50),primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Single insert committed on its own.
        Clob clob1 = createClob(part(JSON_DATA, 0, 25000));
        NClob nclob1 = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (1, ?, ?, 'ClobTest', 'NClobTest', 'Test1')", ps -> {
            ps.setClob(1, clob1);
            ps.setNClob(2, nclob1);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> singleInsert = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleInsert).hasSize(1);
        SourceRecord insertOfRow1 = singleInsert.get(0);
        VerifyRecord.isValidInsert(insertOfRow1, "ID", 1);
        Struct row1Inserted = after(insertOfRow1);
        Assertions.assertThat(row1Inserted.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob1));
        Assertions.assertThat(row1Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob1));
        Assertions.assertThat(row1Inserted.get("VAL_CLOBS")).isEqualTo("ClobTest");
        Assertions.assertThat(row1Inserted.get("VAL_NCLOBS")).isEqualTo("NClobTest");
        Assertions.assertThat(row1Inserted.get("VAL_VARCHAR2")).isEqualTo("Test1");

        // Two inserts committed in the same transaction.
        Clob clob2 = createClob(part(JSON_DATA, 1, 24450));
        NClob nclob2 = createNClob(part(JSON_DATA2, 2, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (2, ?, ?, 'ClobTest2', 'NClobTest2', 'Test2')", ps -> {
            ps.setClob(1, clob2);
            ps.setNClob(2, nclob2);
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob clob3 = createClob(part(JSON_DATA, 3, 24450));
        NClob nclob3 = createNClob(part(JSON_DATA2, 4, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (3, ?, ?, 'ClobTest3', 'NClobTest3', 'Test3')", ps -> {
            ps.setClob(1, clob3);
            ps.setNClob(2, nclob3);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> batchedInserts = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedInserts).hasSize(2);
        SourceRecord insertOfRow2 = batchedInserts.get(0);
        VerifyRecord.isValidInsert(insertOfRow2, "ID", 2);
        Struct row2Inserted = after(insertOfRow2);
        Assertions.assertThat(row2Inserted.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob2));
        Assertions.assertThat(row2Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob2));
        Assertions.assertThat(row2Inserted.get("VAL_CLOBS")).isEqualTo("ClobTest2");
        Assertions.assertThat(row2Inserted.get("VAL_NCLOBS")).isEqualTo("NClobTest2");
        Assertions.assertThat(row2Inserted.get("VAL_VARCHAR2")).isEqualTo("Test2");
        SourceRecord insertOfRow3 = batchedInserts.get(1);
        VerifyRecord.isValidInsert(insertOfRow3, "ID", 3);
        Struct row3Inserted = after(insertOfRow3);
        Assertions.assertThat(row3Inserted.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Inserted.get("VAL_CLOB")).isEqualTo(getClobString(clob3));
        Assertions.assertThat(row3Inserted.get("VAL_NCLOB")).isEqualTo(getClobString(nclob3));
        Assertions.assertThat(row3Inserted.get("VAL_CLOBS")).isEqualTo("ClobTest3");
        Assertions.assertThat(row3Inserted.get("VAL_NCLOBS")).isEqualTo("NClobTest3");
        Assertions.assertThat(row3Inserted.get("VAL_VARCHAR2")).isEqualTo("Test3");

        // Single update committed on its own.
        Clob clob1Updated = createClob(part(JSON_DATA, 1, 24500));
        NClob nclob1Updated = createNClob(part(JSON_DATA2, 1, 24500));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test1U' WHERE id = 1", ps -> {
            ps.setClob(1, clob1Updated);
            ps.setNClob(2, nclob1Updated);
            ps.setString(3, "ClobTest1Updated");
            ps.setString(4, "NClobTest1Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> singleUpdate = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleUpdate).hasSize(1);
        SourceRecord updateOfRow1 = singleUpdate.get(0);
        VerifyRecord.isValidUpdate(updateOfRow1, "ID", 1);
        Struct row1Updated = after(updateOfRow1);
        Assertions.assertThat(row1Updated.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob1Updated));
        Assertions.assertThat(row1Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob1Updated));
        Assertions.assertThat(row1Updated.get("VAL_CLOBS")).isEqualTo("ClobTest1Updated");
        Assertions.assertThat(row1Updated.get("VAL_NCLOBS")).isEqualTo("NClobTest1Updated");
        Assertions.assertThat(row1Updated.get("VAL_VARCHAR2")).isEqualTo("Test1U");

        // Two updates committed in the same transaction.
        Clob clob2Updated = createClob(part(JSON_DATA, 2, 25000));
        NClob nclob2Updated = createNClob(part(JSON_DATA2, 2, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test2U' WHERE id = 2", ps -> {
            ps.setClob(1, clob2Updated);
            ps.setNClob(2, nclob2Updated);
            ps.setString(3, "ClobTest2Updated");
            ps.setString(4, "NClobTest2Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        Clob clob3Updated = createClob(part(JSON_DATA, 3, 25000));
        NClob nclob3Updated = createNClob(part(JSON_DATA2, 3, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test3U' WHERE id = 3", ps -> {
            ps.setClob(1, clob3Updated);
            ps.setNClob(2, nclob3Updated);
            ps.setString(3, "ClobTest3Updated");
            ps.setString(4, "NClobTest3Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> batchedUpdates = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedUpdates).hasSize(2);
        SourceRecord updateOfRow2 = batchedUpdates.get(0);
        VerifyRecord.isValidUpdate(updateOfRow2, "ID", 2);
        Struct row2Updated = after(updateOfRow2);
        Assertions.assertThat(row2Updated.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob2Updated));
        Assertions.assertThat(row2Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob2Updated));
        Assertions.assertThat(row2Updated.get("VAL_CLOBS")).isEqualTo("ClobTest2Updated");
        Assertions.assertThat(row2Updated.get("VAL_NCLOBS")).isEqualTo("NClobTest2Updated");
        Assertions.assertThat(row2Updated.get("VAL_VARCHAR2")).isEqualTo("Test2U");
        SourceRecord updateOfRow3 = batchedUpdates.get(1);
        VerifyRecord.isValidUpdate(updateOfRow3, "ID", 3);
        Struct row3Updated = after(updateOfRow3);
        Assertions.assertThat(row3Updated.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Updated.get("VAL_CLOB")).isEqualTo(getClobString(clob3Updated));
        Assertions.assertThat(row3Updated.get("VAL_NCLOB")).isEqualTo(getClobString(nclob3Updated));
        Assertions.assertThat(row3Updated.get("VAL_CLOBS")).isEqualTo("ClobTest3Updated");
        Assertions.assertThat(row3Updated.get("VAL_NCLOBS")).isEqualTo("NClobTest3Updated");
        Assertions.assertThat(row3Updated.get("VAL_VARCHAR2")).isEqualTo("Test3U");

        // Single delete; two records are expected on the topic, only the first is inspected.
        this.connection.execute("DELETE FROM debezium.clob_test WHERE id = 1");

        List<SourceRecord> singleDelete = consumeRecordsByTopic(2).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(singleDelete).hasSize(2);
        SourceRecord deleteOfRow1 = singleDelete.get(0);
        VerifyRecord.isValidDelete(deleteOfRow1, "ID", 1);
        Struct row1Deleted = before(deleteOfRow1);
        Assertions.assertThat(row1Deleted.get("ID")).isEqualTo(1);
        Assertions.assertThat(row1Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row1Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row1Deleted.get("VAL_VARCHAR2")).isEqualTo("Test1U");
        Assertions.assertThat(after(deleteOfRow1)).isNull();

        // Two deletes in one transaction; the delete events are read from positions 0 and 2.
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 2");
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 3");
        this.connection.execute("COMMIT");

        List<SourceRecord> batchedDeletes = consumeRecordsByTopic(4).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(batchedDeletes).hasSize(4);
        SourceRecord deleteOfRow2 = batchedDeletes.get(0);
        VerifyRecord.isValidDelete(deleteOfRow2, "ID", 2);
        Struct row2Deleted = before(deleteOfRow2);
        Assertions.assertThat(row2Deleted.get("ID")).isEqualTo(2);
        Assertions.assertThat(row2Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row2Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row2Deleted.get("VAL_VARCHAR2")).isEqualTo("Test2U");
        Assertions.assertThat(after(deleteOfRow2)).isNull();
        SourceRecord deleteOfRow3 = batchedDeletes.get(2);
        VerifyRecord.isValidDelete(deleteOfRow3, "ID", 3);
        Struct row3Deleted = before(deleteOfRow3);
        Assertions.assertThat(row3Deleted.get("ID")).isEqualTo(3);
        Assertions.assertThat(row3Deleted.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row3Deleted.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(row3Deleted.get("VAL_VARCHAR2")).isEqualTo("Test3U");
        Assertions.assertThat(after(deleteOfRow3)).isNull();
    }

    /**
     * Checks that a {@code DBMS_LOB.ERASE} call against a streamed CLOB column yields no
     * change event and only a warning in the log.
     *
     * <p>After the initial insert has been verified, the first 10 characters of the CLOB
     * are erased through a PL/SQL block. The test then waits up to one minute for a
     * warning containing {@code "LOB_ERASE for table"} and asserts that nothing is left
     * to consume. Skipped for the OLR adapter.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-2948", "DBZ-5773"})
    @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "OpenLogReplicator does not differentiate between LOB operations")
    public void shouldNotStreamAnyChangesWhenLobEraseIsDetected() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        // The interceptor is obtained before the connector starts, as in the original ordering.
        LogInterceptor commitHandlerLog = TestHelper.getEventCommitHandler();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        Clob clob = createClob(part(JSON_DATA, 0, 25000));
        this.connection.prepareQuery("INSERT INTO CLOB_TEST VALUES (1, ?)",
                ps -> ps.setClob(1, clob),
                (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> inserted = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(inserted).hasSize(1);
        SourceRecord insertEvent = inserted.get(0);
        VerifyRecord.isValidInsert(insertEvent, "ID", 1);
        Struct insertedRow = after(insertEvent);
        Assertions.assertThat(insertedRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(insertedRow.get("VAL_CLOB")).isEqualTo(getClobString(clob));

        // Erase part of the stored CLOB in place.
        this.connection.execute("DECLARE loc_c CLOB; amount integer; BEGIN SELECT \"VAL_CLOB\" INTO loc_c FROM CLOB_TEST WHERE ID = 1 for update; amount := 10;dbms_lob.erase(loc_c, amount, 1); end;");

        Awaitility.await()
                .atMost(Duration.ofMinutes(1))
                .until(() -> commitHandlerLog.containsWarnMessage("LOB_ERASE for table"));
        assertNoRecordsToConsume();
    }

    /**
     * Checks that a {@code DBMS_LOB.ERASE} call against a streamed CLOB column is emitted
     * as an update event when the test runs with the OLR adapter (it is skipped for every
     * other adapter).
     *
     * <p>After the initial insert has been verified, the first 10 characters of the CLOB
     * are erased through a PL/SQL block. Exactly one update is expected, with the
     * unavailable-value placeholder in {@code VAL_CLOB} on both the {@code before} and
     * {@code after} side, and no further records afterwards.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-2948", "DBZ-5773"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.OLR, reason = "OpenLogReplicator does not differentiate between LOB operations")
    public void shouldStreamChangesWhenLobEraseIsDetected() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        Clob clob = createClob(part(JSON_DATA, 0, 24000));
        this.connection.prepareQuery("INSERT INTO debezium.clob_test values (1, ?)",
                ps -> ps.setClob(1, clob),
                (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> inserted = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(inserted).hasSize(1);
        SourceRecord insertEvent = inserted.get(0);
        VerifyRecord.isValidInsert(insertEvent, "ID", 1);
        Struct insertedRow = after(insertEvent);
        Assertions.assertThat(insertedRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(insertedRow.get("VAL_CLOB")).isEqualTo(getClobString(clob));

        // Erase part of the stored CLOB in place.
        this.connection.execute("DECLARE loc_c CLOB; amount integer; BEGIN SELECT \"VAL_CLOB\" INTO loc_c FROM CLOB_TEST WHERE ID = 1 for update; amount := 10;dbms_lob.erase(loc_c, amount, 1); end;");

        List<SourceRecord> erased = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(erased).hasSize(1);
        SourceRecord eraseEvent = erased.get(0);
        VerifyRecord.isValidUpdate(eraseEvent, "ID", 1);
        Struct rowBeforeErase = before(eraseEvent);
        Assertions.assertThat(rowBeforeErase.get("ID")).isEqualTo(1);
        Assertions.assertThat(rowBeforeErase.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Struct rowAfterErase = after(eraseEvent);
        Assertions.assertThat(rowAfterErase.get("ID")).isEqualTo(1);
        Assertions.assertThat(rowAfterErase.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));

        assertNoRecordsToConsume();
    }

    /**
     * Checks the events produced when an update changes the primary key of a row that
     * holds CLOB/NCLOB values.
     *
     * <p>A row with id 1 is inserted and verified, then updated so that its id becomes 2
     * together with new values for every other column. Three records are expected for
     * that update; only the third is inspected and must be a valid insert for id 2 that
     * carries the new column values. The first two records are presumably the delete and
     * tombstone for the old key — confirm.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-2948"})
    public void shouldStreamClobDataTypeValuesWithPrimaryKeyChange() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_CLOBS clob, VAL_NCLOBS nclob, VAL_VARCHAR2 varchar2(50), primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Initial row with primary key 1.
        Clob originalClob = createClob(part(JSON_DATA, 0, 25000));
        NClob originalNClob = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (1, ?, ?, 'ClobTest', 'NClobTest', 'Test1')", ps -> {
            ps.setClob(1, originalClob);
            ps.setNClob(2, originalNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> inserted = consumeRecordsByTopic(1).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(inserted).hasSize(1);
        SourceRecord insertEvent = inserted.get(0);
        VerifyRecord.isValidInsert(insertEvent, "ID", 1);
        Struct insertedRow = after(insertEvent);
        Assertions.assertThat(insertedRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(insertedRow.get("VAL_CLOB")).isEqualTo(getClobString(originalClob));
        Assertions.assertThat(insertedRow.get("VAL_NCLOB")).isEqualTo(getClobString(originalNClob));
        Assertions.assertThat(insertedRow.get("VAL_CLOBS")).isEqualTo("ClobTest");
        Assertions.assertThat(insertedRow.get("VAL_NCLOBS")).isEqualTo("NClobTest");
        Assertions.assertThat(insertedRow.get("VAL_VARCHAR2")).isEqualTo("Test1");

        // Update that moves the row from primary key 1 to primary key 2.
        Clob replacementClob = createClob(part(JSON_DATA, 1, 24500));
        NClob replacementNClob = createNClob(part(JSON_DATA2, 1, 24500));
        this.connection.prepareQuery("UPDATE clob_test SET id=2, val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test1U' WHERE id = 1", ps -> {
            ps.setClob(1, replacementClob);
            ps.setNClob(2, replacementNClob);
            ps.setString(3, "ClobTest1Updated");
            ps.setString(4, "NClobTest1Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> keyChange = consumeRecordsByTopic(3).recordsForTopic(topicName("CLOB_TEST"));
        Assertions.assertThat(keyChange).hasSize(3);
        // Only the last of the three records is checked: the insert under the new key.
        SourceRecord reinsertEvent = keyChange.get(2);
        VerifyRecord.isValidInsert(reinsertEvent, "ID", 2);
        Struct reinsertedRow = after(reinsertEvent);
        Assertions.assertThat(reinsertedRow.get("ID")).isEqualTo(2);
        Assertions.assertThat(reinsertedRow.get("VAL_CLOB")).isEqualTo(getClobString(replacementClob));
        Assertions.assertThat(reinsertedRow.get("VAL_NCLOB")).isEqualTo(getClobString(replacementNClob));
        Assertions.assertThat(reinsertedRow.get("VAL_CLOBS")).isEqualTo("ClobTest1Updated");
        Assertions.assertThat(reinsertedRow.get("VAL_NCLOBS")).isEqualTo("NClobTest1Updated");
        Assertions.assertThat(reinsertedRow.get("VAL_VARCHAR2")).isEqualTo("Test1U");
    }

    /**
     * Checks that CLOB/NCLOB changes are streamed when every LOB column of the table is
     * declared {@code STORE AS BASICFILE} and LOB support is enabled on the connector.
     *
     * <p>The scenario covers, in order: one insert, two inserts in one transaction, one
     * update, two updates in one transaction, one delete, and two deletes in one
     * transaction. After each step the records emitted on the CLOB_TEST topic are verified.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-2948")
    public void shouldStreamClobDataTypeValuesUsingBasicFileStorage() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_CLOBS clob, VAL_NCLOBS nclob, VAL_VARCHAR2 varchar2(50), primary key(id)) LOB(VAL_CLOB) STORE AS BASICFILE LOB(VAL_NCLOB) STORE AS BASICFILE LOB(VAL_CLOBS) STORE AS BASICFILE LOB(VAL_NCLOBS) STORE AS BASICFILE");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final String topic = topicName("CLOB_TEST");

        // ---- one insert, committed on its own ----
        final Clob firstClob = createClob(part(JSON_DATA, 0, 25000));
        final NClob firstNClob = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (1, ?, ?, 'ClobTest', 'NClobTest', 'Test1')", ps -> {
            ps.setClob(1, firstClob);
            ps.setNClob(2, firstNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        List<SourceRecord> events = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(1);
        SourceRecord event = events.get(0);
        VerifyRecord.isValidInsert(event, "ID", 1);
        Struct row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(firstClob));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(firstNClob));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test1");

        // ---- two inserts sharing one transaction ----
        final Clob secondClob = createClob(part(JSON_DATA, 1, 24450));
        final NClob secondNClob = createNClob(part(JSON_DATA2, 2, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (2, ?, ?, 'ClobTest2', 'NClobTest2', 'Test2')", ps -> {
            ps.setClob(1, secondClob);
            ps.setNClob(2, secondNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob thirdClob = createClob(part(JSON_DATA, 3, 24450));
        final NClob thirdNClob = createNClob(part(JSON_DATA2, 4, 24450));
        this.connection.prepareQuery("INSERT INTO clob_test VALUES (3, ?, ?, 'ClobTest3', 'NClobTest3', 'Test3')", ps -> {
            ps.setClob(1, thirdClob);
            ps.setNClob(2, thirdNClob);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        events = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(2);

        event = events.get(0);
        VerifyRecord.isValidInsert(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(secondClob));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(secondNClob));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest2");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest2");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test2");

        event = events.get(1);
        VerifyRecord.isValidInsert(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(thirdClob));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(thirdNClob));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest3");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest3");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test3");

        // ---- one update, committed on its own ----
        final Clob firstClobUpdated = createClob(part(JSON_DATA, 1, 24500));
        final NClob firstNClobUpdated = createNClob(part(JSON_DATA2, 1, 24500));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test1U' WHERE id = 1", ps -> {
            ps.setClob(1, firstClobUpdated);
            ps.setNClob(2, firstNClobUpdated);
            ps.setString(3, "ClobTest1Updated");
            ps.setString(4, "NClobTest1Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        events = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(1);
        event = events.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 1);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(1);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(firstClobUpdated));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(firstNClobUpdated));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest1Updated");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest1Updated");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test1U");

        // ---- two updates sharing one transaction ----
        final Clob secondClobUpdated = createClob(part(JSON_DATA, 2, 25000));
        final NClob secondNClobUpdated = createNClob(part(JSON_DATA2, 2, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test2U' WHERE id = 2", ps -> {
            ps.setClob(1, secondClobUpdated);
            ps.setNClob(2, secondNClobUpdated);
            ps.setString(3, "ClobTest2Updated");
            ps.setString(4, "NClobTest2Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        final Clob thirdClobUpdated = createClob(part(JSON_DATA, 3, 25000));
        final NClob thirdNClobUpdated = createNClob(part(JSON_DATA2, 3, 25000));
        this.connection.prepareQuery("UPDATE clob_test SET val_clob=?, val_nclob=?, val_clobs=?, val_nclobs=?, val_varchar2='Test3U' WHERE id = 3", ps -> {
            ps.setClob(1, thirdClobUpdated);
            ps.setNClob(2, thirdNClobUpdated);
            ps.setString(3, "ClobTest3Updated");
            ps.setString(4, "NClobTest3Updated");
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        events = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(2);

        event = events.get(0);
        VerifyRecord.isValidUpdate(event, "ID", 2);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(secondClobUpdated));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(secondNClobUpdated));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest2Updated");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest2Updated");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test2U");

        event = events.get(1);
        VerifyRecord.isValidUpdate(event, "ID", 3);
        row = after(event);
        Assertions.assertThat(row.get("ID")).isEqualTo(3);
        Assertions.assertThat(row.get("VAL_CLOB")).isEqualTo(getClobString(thirdClobUpdated));
        Assertions.assertThat(row.get("VAL_NCLOB")).isEqualTo(getClobString(thirdNClobUpdated));
        Assertions.assertThat(row.get("VAL_CLOBS")).isEqualTo("ClobTest3Updated");
        Assertions.assertThat(row.get("VAL_NCLOBS")).isEqualTo("NClobTest3Updated");
        Assertions.assertThat(row.get("VAL_VARCHAR2")).isEqualTo("Test3U");

        // ---- one delete ----
        // Two records are consumed but only the first is inspected
        // (presumably the second is the tombstone for the deleted key).
        this.connection.execute("DELETE FROM debezium.clob_test WHERE id = 1");

        events = consumeRecordsByTopic(2).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(2);
        event = events.get(0);
        VerifyRecord.isValidDelete(event, "ID", 1);
        Struct oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_VARCHAR2")).isEqualTo("Test1U");
        Assertions.assertThat(after(event)).isNull();

        // ---- two deletes sharing one transaction ----
        // Four records are consumed; the delete events are expected at indexes 0 and 2.
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 2");
        this.connection.executeWithoutCommitting("DELETE FROM debezium.clob_test WHERE id = 3");
        this.connection.execute("COMMIT");

        events = consumeRecordsByTopic(4).recordsForTopic(topic);
        Assertions.assertThat(events).hasSize(4);

        event = events.get(0);
        VerifyRecord.isValidDelete(event, "ID", 2);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(2);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_VARCHAR2")).isEqualTo("Test2U");
        Assertions.assertThat(after(event)).isNull();

        event = events.get(2);
        VerifyRecord.isValidDelete(event, "ID", 3);
        oldRow = before(event);
        Assertions.assertThat(oldRow.get("ID")).isEqualTo(3);
        Assertions.assertThat(oldRow.get("VAL_CLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_NCLOB")).isEqualTo(getUnavailableValuePlaceholder(config));
        Assertions.assertThat(oldRow.get("VAL_VARCHAR2")).isEqualTo("Test3U");
        Assertions.assertThat(after(event)).isNull();
    }

    /**
     * Checks that a row inserted without values for any of its CLOB columns is emitted as
     * a single create event in which every CLOB field is {@code null}.
     *
     * <p>The test table is dropped in a {@code finally} block so it is removed exactly once,
     * whether or not the assertions pass.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-3631")
    public void shouldReconcileTransactionWhenAllBlobClobAreInitializedAsNull() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3631");
        try {
            this.connection.execute("CREATE TABLE dbz3631 (ID NUMBER(38) NOT NULL,ENTITY_ID NUMBER(38) NOT NULL,DOCX CLOB,DOCX_SIGNATURE CLOB,XML_OOS CLOB,XML_OOS_SIGNATURE CLOB,PRIMARY KEY(ID))");
            TestHelper.streamTable(this.connection, "dbz3631");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ3631")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Only the two NOT NULL numeric columns are populated; the four CLOBs stay unset.
            this.connection.executeWithoutCommitting("INSERT INTO dbz3631 (ID,ENTITY_ID) VALUES (13268281,13340568)");
            this.connection.commit();

            final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3631");
            Assertions.assertThat(records).hasSize(1);

            final Struct envelope = (Struct) records.get(0).value();
            final Struct after = envelope.getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(13268281L));
            Assertions.assertThat(after.get("ENTITY_ID")).isEqualTo(BigDecimal.valueOf(13340568L));
            Assertions.assertThat(after.get("DOCX")).isNull();
            Assertions.assertThat(after.get("DOCX_SIGNATURE")).isNull();
            Assertions.assertThat(after.get("XML_OOS")).isNull();
            Assertions.assertThat(after.get("XML_OOS_SIGNATURE")).isNull();
            Assertions.assertThat(envelope.get("op")).isEqualTo(Envelope.Operation.CREATE.code());
        } finally {
            TestHelper.dropTable(this.connection, "dbz3631");
        }
    }

    /**
     * Checks which CLOB values appear in change events when LOB support is disabled on the
     * connector, for both the LogMiner-based adapters and the other adapter(s).
     *
     * <p>Two rows are written before the connector starts and are expected with their full
     * CLOB values. Afterwards the test performs inserts, plain updates, primary-key changes,
     * combined data/primary-key changes and (LogMiner only) deletes, asserting after each
     * step the expected number of records and whether the {@code DATA} field carries the
     * real value, {@code null}, or the unavailable-value placeholder.
     *
     * <p>The test table is dropped once, in the {@code finally} block.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-3645")
    public void shouldNotEmitClobFieldValuesWhenLobSupportIsNotEnabled() throws Exception {
        final boolean logMiner = TestHelper.isAnyLogMiner();
        TestHelper.dropTable(this.connection, "dbz3645");
        try {
            this.connection.execute("CREATE TABLE dbz3645 (id numeric(9,0), data clob, primary key(id))");
            TestHelper.streamTable(this.connection, "dbz3645");

            // Rows 1 and 2 are written before the connector is started.
            this.connection.execute("INSERT INTO dbz3645 (id,data) values (1,'Test1')");
            final Clob initialClob = createClob(part(JSON_DATA, 0, 25000));
            this.connection.prepareQuery("INSERT INTO dbz3645 (id,data) values (2,?)", ps -> {
                ps.setClob(1, initialClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3645")
                    .with(OracleConnectorConfig.LOB_ENABLED, false)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final String topic = topicName("DBZ3645");

            // The pre-existing rows are expected with their complete CLOB values.
            List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic(topic);
            Assertions.assertThat(records).hasSize(2);
            Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            after = ((Struct) records.get(1).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("DATA")).isEqualTo(getClobString(initialClob));

            // Inserts while the connector is running: rows 3 and 4 in one transaction.
            this.connection.executeWithoutCommitting("INSERT INTO dbz3645 (id,data) values (3,'Test3')");
            this.connection.prepareQuery("INSERT INTO dbz3645 (id,data) values (4,?)", ps -> {
                ps.setClob(1, initialClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            // LogMiner is expected to produce one extra record: an update for row 3.
            final int insertBatchSize = logMiner ? 3 : 2;
            records = consumeRecordsByTopic(insertBatchSize).recordsForTopic(topic);
            Assertions.assertThat(records).hasSize(insertBatchSize);

            final SourceRecord insertOf3 = records.get(0);
            after = ((Struct) insertOf3.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            if (logMiner) {
                Assertions.assertThat(after.get("DATA")).isNull();
            } else {
                Assertions.assertThat(after.get("DATA")).isEqualTo("Test3");
            }
            Assertions.assertThat(((Struct) insertOf3.value()).get("op")).isEqualTo("c");

            if (logMiner) {
                final SourceRecord updateOf3 = records.get(1);
                after = ((Struct) updateOf3.value()).getStruct("after");
                Assertions.assertThat(after.get("ID")).isEqualTo(3);
                Assertions.assertThat(after.get("DATA")).isEqualTo("Test3");
                Assertions.assertThat(((Struct) updateOf3.value()).get("op")).isEqualTo("u");
            }

            final SourceRecord insertOf4 = records.get(logMiner ? 2 : 1);
            after = ((Struct) insertOf4.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(4);
            if (logMiner) {
                Assertions.assertThat(after.get("DATA")).isNull();
            } else {
                Assertions.assertThat(after.get("DATA")).isEqualTo(getClobString(initialClob));
            }
            Assertions.assertThat(((Struct) insertOf4.value()).get("op")).isEqualTo("c");

            // Update row 3 with a string literal.
            this.connection.executeWithoutCommitting("UPDATE dbz3645 set data='Test3U' WHERE id = 3");
            this.connection.commit();
            records = consumeRecordsByTopic(1).recordsForTopic(topic);
            VerifyRecord.isValidUpdate(records.get(0), "ID", 3);
            Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(getAfterField(records.get(0), "DATA")).isEqualTo("Test3U");
            assertNoRecordsToConsume();

            // Update row 4 with a CLOB bound through the prepared statement.
            final Clob updatedClob = createClob(part(JSON_DATA, 1000, 21500));
            this.connection.prepareQuery("UPDATE dbz3645 set data=? WHERE id=4", ps -> {
                ps.setClob(1, updatedClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            if (logMiner) {
                // No event is expected here: wait, then fall through to the empty-queue check.
                waitForAvailableRecords(10L, TimeUnit.SECONDS);
            } else {
                records = consumeRecordsByTopic(1).recordsForTopic(topic);
                VerifyRecord.isValidUpdate(records.get(0), "ID", 4);
                Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
                Assertions.assertThat(getAfterField(records.get(0), "DATA")).isEqualTo(getClobString(updatedClob));
            }
            assertNoRecordsToConsume();

            // Primary-key change 3 -> 5: expected as delete, tombstone, insert.
            this.connection.executeWithoutCommitting("UPDATE dbz3645 set id=5 where id=3");
            this.connection.commit();
            records = consumeRecordsByTopic(3).recordsForTopic(topic);
            VerifyRecord.isValidDelete(records.get(0), "ID", 3);
            VerifyRecord.isValidTombstone(records.get(1), "ID", 3);
            VerifyRecord.isValidInsert(records.get(2), "ID", 5);
            Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            assertNoRecordsToConsume();

            // Primary-key change 4 -> 6: expected as delete, tombstone, insert.
            this.connection.executeWithoutCommitting("UPDATE dbz3645 SET ID=6 WHERE ID=4");
            this.connection.commit();
            records = consumeRecordsByTopic(3).recordsForTopic(topic);
            VerifyRecord.isValidDelete(records.get(0), "ID", 4);
            VerifyRecord.isValidTombstone(records.get(1), "ID", 4);
            VerifyRecord.isValidInsert(records.get(2), "ID", 6);
            Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            assertNoRecordsToConsume();

            // CLOB and primary key changed together, 5 -> 7.
            // LogMiner is expected to add a trailing update that carries the CLOB value.
            final Clob clobFor7 = createClob(part(JSON_DATA, 10, 260));
            this.connection.prepareQuery("UPDATE dbz3645 SET data=?, id=7 WHERE id=5", ps -> {
                ps.setClob(1, clobFor7);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            records = consumeRecordsByTopic(logMiner ? 4 : 3).recordsForTopic(topic);
            VerifyRecord.isValidDelete(records.get(0), "ID", 5);
            VerifyRecord.isValidTombstone(records.get(1), "ID", 5);
            VerifyRecord.isValidInsert(records.get(2), "ID", 7);
            if (logMiner) {
                VerifyRecord.isValidUpdate(records.get(3), "ID", 7);
            }
            Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            if (logMiner) {
                Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
                Assertions.assertThat(getBeforeField(records.get(3), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
                Assertions.assertThat(getAfterField(records.get(3), "DATA")).isEqualTo(getClobString(clobFor7));
            } else {
                Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getClobString(clobFor7));
            }
            assertNoRecordsToConsume();

            // CLOB and primary key changed together, 6 -> 8; no extra update is expected here.
            final Clob clobFor8 = createClob(part(JSON_DATA, 10, 12500));
            this.connection.prepareQuery("UPDATE dbz3645 SET data=?, id=8 WHERE id=6", ps -> {
                ps.setClob(1, clobFor8);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            records = consumeRecordsByTopic(3).recordsForTopic(topic);
            VerifyRecord.isValidDelete(records.get(0), "ID", 6);
            VerifyRecord.isValidTombstone(records.get(1), "ID", 6);
            VerifyRecord.isValidInsert(records.get(2), "ID", 8);
            Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            if (logMiner) {
                Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            } else {
                Assertions.assertThat(getAfterField(records.get(2), "DATA")).isEqualTo(getClobString(clobFor8));
            }
            assertNoRecordsToConsume();

            if (logMiner) {
                // Deletes are only exercised for the LogMiner adapters.
                this.connection.execute("DELETE FROM dbz3645 WHERE id=7");
                records = consumeRecordsByTopic(2).recordsForTopic(topic);
                VerifyRecord.isValidDelete(records.get(0), "ID", 7);
                VerifyRecord.isValidTombstone(records.get(1), "ID", 7);
                Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
                assertNoRecordsToConsume();

                this.connection.execute("DELETE FROM dbz3645 WHERE id=8");
                records = consumeRecordsByTopic(2).recordsForTopic(topic);
                VerifyRecord.isValidDelete(records.get(0), "ID", 8);
                VerifyRecord.isValidTombstone(records.get(1), "ID", 8);
                Assertions.assertThat(getBeforeField(records.get(0), "DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
                assertNoRecordsToConsume();
            }
        } finally {
            TestHelper.dropTable(this.connection, "dbz3645");
        }
    }

    /**
     * Checks that inserting {@code EMPTY_CLOB()} into a {@code NOT NULL} CLOB column is
     * streamed as a single event whose {@code DATA} field is the empty string.
     *
     * <p>The test table is dropped in a {@code finally} block so it is removed exactly once,
     * whether or not the assertions pass.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-3893")
    public void shouldStreamNotNullClobUsingEmptyClobFunction() throws Exception {
        // NOTE(review): the table is named dbz3898 while the issue is DBZ-3893;
        // presumably a typo, left unchanged here.
        TestHelper.dropTable(this.connection, "dbz3898");
        try {
            this.connection.execute("CREATE TABLE dbz3898 (id numeric(9,0), data clob not null, primary key(id))");
            TestHelper.streamTable(this.connection, "dbz3898");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3898")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            this.connection.execute("INSERT INTO dbz3898 (id,data) values (1,EMPTY_CLOB())");

            final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3898"));
            Assertions.assertThat(records).hasSize(1);

            final Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("");
            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(this.connection, "dbz3898");
        }
    }

    /**
     * Checks that CLOB/NCLOB columns untouched by an update or delete are reported with the
     * configured unavailable-value placeholder.
     *
     * <p>A row with empty CLOB and NCLOB values is written before the connector starts and
     * is expected with empty strings in both LOB fields. A later update of only the
     * {@code DATA3} column, and the delete that follows, are expected to carry the
     * placeholder in the {@code DATA} and {@code DATA2} fields.
     *
     * <p>The test table is dropped in a {@code finally} block so it is removed exactly once,
     * whether or not the assertions pass.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-4276")
    public void shouldStreamClobWithUnavailableColumnValuePlaceholder() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4276");
        try {
            this.connection.execute("CREATE TABLE dbz4276 (id numeric(9,0), data clob not null, data2 nclob not null, data3 varchar2(50), primary key(id))");
            TestHelper.streamTable(this.connection, "dbz4276");
            this.connection.execute("INSERT INTO dbz4276 (id,data,data2,data3) values (1,EMPTY_CLOB(),EMPTY_CLOB(),'Test')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4276")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            // Read once; every placeholder assertion below compares against this value.
            final String placeholder = config.getString(OracleConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER);
            final String topic = topicName("DBZ4276");

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The row written before start-up is expected with empty LOB values.
            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(records).hasSize(1);
            Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("");
            Assertions.assertThat(after.get("DATA2")).isEqualTo("");
            Assertions.assertThat(after.get("DATA3")).isEqualTo("Test");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Update that changes only the non-LOB column.
            this.connection.execute("UPDATE dbz4276 set data3 = '123' WHERE id = 1");
            records = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(records).hasSize(1);
            final SourceRecord update = records.get(0);

            Struct before = ((Struct) update.value()).getStruct("before");
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            Assertions.assertThat(before.get("DATA")).isEqualTo(placeholder);
            Assertions.assertThat(before.get("DATA2")).isEqualTo(placeholder);
            Assertions.assertThat(before.get("DATA3")).isEqualTo("Test");

            after = ((Struct) update.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo(placeholder);
            Assertions.assertThat(after.get("DATA2")).isEqualTo(placeholder);
            Assertions.assertThat(after.get("DATA3")).isEqualTo("123");

            // Delete: two records are consumed, only the first (the delete event) is inspected.
            this.connection.execute("DELETE FROM dbz4276 WHERE id = 1");
            records = consumeRecordsByTopic(2).recordsForTopic(topic);
            Assertions.assertThat(records).hasSize(2);
            final SourceRecord delete = records.get(0);

            before = ((Struct) delete.value()).getStruct("before");
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            Assertions.assertThat(before.get("DATA")).isEqualTo(placeholder);
            Assertions.assertThat(before.get("DATA2")).isEqualTo(placeholder);
            Assertions.assertThat(before.get("DATA3")).isEqualTo("123");
            Assertions.assertThat(((Struct) delete.value()).getStruct("after")).isNull();

            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(this.connection, "dbz4276");
        }
    }

    /**
     * Verifies that CLOB and NCLOB values built up through repeated {@code DBMS_LOB.WRITEAPPEND}
     * calls inside one transaction arrive as a single insert event carrying the full values.
     *
     * <p>The PL/SQL block appends a 1024-character string 1024 times to each LOB, so both columns
     * are expected to hold 1024 * 1024 = 1048576 characters.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-4366"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER, reason = "Xstream marks chunks as end of rows")
    public void shouldStreamClobsWrittenInChunkedMode() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4366");
        try {
            this.connection.execute("CREATE TABLE dbz4366 (id numeric(9,0), val_clob clob not null, val_nclob nclob not null, primary key(id))");
            TestHelper.streamTable(this.connection, "dbz4366");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4366")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The row is inserted with empty LOBs and left uncommitted; the chunked writes below
            // are issued before the explicit COMMIT statement.
            this.connection.executeWithoutCommitting("INSERT INTO dbz4366 (id,val_clob,val_nclob) values (1,EMPTY_CLOB(),EMPTY_CLOB())");

            // Compile-time constant concatenation: yields exactly the original single-line script.
            final String chunkedWriteScript = "DECLARE\n"
                    + "  loc CLOB;\n"
                    + "  nloc NCLOB;\n"
                    + "  i PLS_INTEGER;\n"
                    + "  str VARCHAR2(1024);\n"
                    + "BEGIN\n"
                    + "  str := ?;\n"
                    + "  SELECT val_clob into loc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n"
                    + "  SELECT val_nclob into nloc FROM dbz4366 WHERE id = 1 FOR UPDATE;\n"
                    + "  DBMS_LOB.OPEN(loc, DBMS_LOB.LOB_READWRITE);\n"
                    + "  DBMS_LOB.OPEN(nloc, DBMS_LOB.LOB_READWRITE);\n"
                    + "  FOR i IN 1..1024 LOOP\n"
                    + "    DBMS_LOB.WRITEAPPEND(loc, 1024, str);\n"
                    + "    DBMS_LOB.WRITEAPPEND(nloc, 1024, str);\n"
                    + "  END LOOP;\n"
                    + "  DBMS_LOB.CLOSE(loc);\n"
                    + "  DBMS_LOB.CLOSE(nloc);\n"
                    + "END;";
            // The cast on the null consumer is kept from the original call, presumably to select
            // the intended prepareQuery overload.
            this.connection.prepareQuery(chunkedWriteScript,
                    ps -> ps.setString(1, part(JSON_DATA, 0, 1024)),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.execute("COMMIT");

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ4366"));
            Assertions.assertThat(records).hasSize(1);

            Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(((String) after.get("VAL_CLOB")).length()).isEqualTo(1048576);
            Assertions.assertThat(((String) after.get("VAL_NCLOB")).length()).isEqualTo(1048576);
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz4366");
        }
    }

    /**
     * Inserts a row whose VARCHAR2 columns contain escaped single quotes and double quotes, then
     * updates both LOB columns, and checks that the update event carries the LOB contents together
     * with the unescaped VARCHAR2 values.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-4891", "DBZ-4862", "DBZ-4994"})
    public void shouldStreamClobValueWithEscapedSingleQuoteValue() throws Exception {
        this.connection.execute("CREATE TABLE CLOB_TEST (ID numeric(9,0), VAL_CLOB clob, VAL_NCLOB nclob, VAL_USERNAME varchar2(100),VAL_DATA varchar2(100), primary key(id))");
        TestHelper.streamTable(this.connection, "debezium.clob_test");

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                .with(OracleConnectorConfig.LOB_ENABLED, true)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Step 1: insert only the non-LOB columns; the quotes are doubled for SQL escaping.
        this.connection.execute("INSERT INTO clob_test (id,val_username,val_data) values (1,'This will be fixed soon so please don''t worry, she wrote.','2\"''\" sd f\"\"\" '''''''' ''''')");

        final String topic = topicName("CLOB_TEST");
        final List<SourceRecord> insertEvents = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(insertEvents).hasSize(1);
        VerifyRecord.isValidInsert(insertEvents.get(0), "ID", 1);

        // Step 2: populate both LOB columns of the same row.
        final Clob clobValue = createClob(part(JSON_DATA, 0, 25000));
        final NClob nclobValue = createNClob(part(JSON_DATA2, 0, 25000));
        this.connection.prepareQuery("update clob_test set val_clob=?, val_nclob=? where id=1", statement -> {
            statement.setClob(1, clobValue);
            statement.setClob(2, nclobValue);
        }, (JdbcConnection.ResultSetConsumer) null);
        this.connection.commit();

        final List<SourceRecord> updateEvents = consumeRecordsByTopic(1).recordsForTopic(topic);
        Assertions.assertThat(updateEvents).hasSize(1);
        final SourceRecord updateEvent = updateEvents.get(0);
        VerifyRecord.isValidUpdate(updateEvent, "ID", 1);

        final Struct afterState = after(updateEvent);
        Assertions.assertThat(afterState.get("ID")).isEqualTo(1);
        Assertions.assertThat(afterState.get("VAL_CLOB")).isEqualTo(getClobString(clobValue));
        Assertions.assertThat(afterState.get("VAL_NCLOB")).isEqualTo(getClobString(nclobValue));
        Assertions.assertThat(afterState.get("VAL_USERNAME")).isEqualTo("This will be fixed soon so please don't worry, she wrote.");
        Assertions.assertThat(afterState.get("VAL_DATA")).isEqualTo("2\"'\" sd f\"\"\" '''' ''");
    }

    /**
     * Inserts a CLOB value through a batched {@link PreparedStatement} fed from a character stream
     * and checks that the resulting event carries both {@code scn} and {@code commit_scn} in its
     * source block, with the commit SCN not lower than the event SCN.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5266"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER, reason = "Commit SCN is only applicable to LogMiner")
    public void shouldUpdateCommitScnOnLobTransaction() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5266");
        try {
            this.connection.execute("create table dbz5266 (data clob)");
            TestHelper.streamTable(this.connection, "dbz5266");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5266")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Both the statement and the reader are closed by try-with-resources; previously the
            // FileReader handed to setCharacterStream was never closed.
            try (PreparedStatement insert = this.connection.connection().prepareStatement("INSERT INTO dbz5266 values (?)")) {
                File file = new File(getClass().getClassLoader().getResource("data/test_lob_data.json").toURI());
                try (FileReader reader = new FileReader(file)) {
                    // NOTE(review): file.length() is a byte count, while setCharacterStream expects
                    // a character count; they only agree for single-byte encoded data - confirm.
                    insert.setCharacterStream(1, reader, file.length());
                    insert.addBatch();
                    insert.executeBatch();
                    this.connection.commit();
                }
            } catch (Exception e) {
                // Any failure while preparing or executing the insert is reported as a test failure.
                Assertions.fail("Insert of clob data failed to happen", e);
            }

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5266");
            Assertions.assertThat(records).hasSize(1);

            Struct value = (Struct) records.get(0).value();
            Assertions.assertThat(value.getStruct("after").get("DATA")).isNotNull();

            Struct source = value.getStruct("source");
            Assertions.assertThat(source.get("scn")).isNotNull();
            Assertions.assertThat(source.get("commit_scn")).isNotNull();
            Assertions.assertThat(Scn.valueOf(source.getString("commit_scn")).longValue())
                    .isGreaterThanOrEqualTo(Scn.valueOf(source.getString("scn")).longValue());
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5266");
        }
    }

    /**
     * Checks that an insert into a table without LOB columns carries both {@code scn} and
     * {@code commit_scn} in its source block when LOB support is enabled, with the commit SCN
     * not lower than the event SCN.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5266"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER, reason = "Commit SCN is only applicable to LogMiner")
    public void shouldUpdateCommitScnOnNonLobTransactionWithLobEnabled() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5266");
        try {
            this.connection.execute("create table dbz5266 (data varchar2(50))");
            TestHelper.streamTable(this.connection, "dbz5266");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5266")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            this.connection.execute("INSERT INTO dbz5266 values ('test')");

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5266");
            Assertions.assertThat(records).hasSize(1);

            Struct value = (Struct) records.get(0).value();
            Assertions.assertThat(value.getStruct("after").get("DATA")).isNotNull();

            Struct source = value.getStruct("source");
            Assertions.assertThat(source.get("scn")).isNotNull();
            Assertions.assertThat(source.get("commit_scn")).isNotNull();
            Assertions.assertThat(Scn.valueOf(source.getString("commit_scn")).longValue())
                    .isGreaterThanOrEqualTo(Scn.valueOf(source.getString("scn")).longValue());
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5266");
        }
    }

    /**
     * Checks that an insert into a table without LOB columns carries both {@code scn} and
     * {@code commit_scn} in its source block when LOB support is disabled, with the commit SCN
     * not lower than the event SCN.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5266"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER, reason = "Commit SCN is only applicable to LogMiner")
    public void shouldUpdateCommitScnOnNonLobTransactionWithLobDisabled() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5266");
        try {
            this.connection.execute("create table dbz5266 (data varchar2(50))");
            TestHelper.streamTable(this.connection, "dbz5266");

            // Same scenario as the LOB-enabled variant, but with LOB support switched off.
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5266")
                    .with(OracleConnectorConfig.LOB_ENABLED, false)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            this.connection.execute("INSERT INTO dbz5266 values ('test')");

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5266");
            Assertions.assertThat(records).hasSize(1);

            Struct value = (Struct) records.get(0).value();
            Assertions.assertThat(value.getStruct("after").get("DATA")).isNotNull();

            Struct source = value.getStruct("source");
            Assertions.assertThat(source.get("scn")).isNotNull();
            Assertions.assertThat(source.get("commit_scn")).isNotNull();
            Assertions.assertThat(Scn.valueOf(source.getString("commit_scn")).longValue())
                    .isGreaterThanOrEqualTo(Scn.valueOf(source.getString("scn")).longValue());
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5266");
        }
    }

    /**
     * Changes the primary key of a row that has two CLOB columns and checks that the change is
     * emitted as delete, tombstone and insert, that the insert carries the original CLOB values,
     * and that the re-selection message was logged by {@code BaseChangeRecordEmitter}.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5295"})
    public void shouldReselectClobAfterPrimaryKeyChange() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5295");
        try {
            LogInterceptor logInterceptor = new LogInterceptor(BaseChangeRecordEmitter.class);
            logInterceptor.setLoggerLevel(BaseChangeRecordEmitter.class, Level.INFO);

            this.connection.execute("create table dbz5295 (id numeric(9,0) primary key, data clob, data2 clob)");
            TestHelper.streamTable(this.connection, "dbz5295");
            // Inserted before the connector starts, so the first event below is for this
            // pre-existing row.
            this.connection.execute("INSERT INTO dbz5295 (id,data,data2) values (1,'Small clob data','Data2')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ5295"));
            Assertions.assertThat(initialRecords).hasSize(1);
            Struct initialAfter = ((Struct) initialRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(initialAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(initialAfter.get("DATA")).isEqualTo("Small clob data");
            Assertions.assertThat(initialAfter.get("DATA2")).isEqualTo("Data2");

            this.connection.execute("UPDATE dbz5295 set id = 2 where id = 1");

            // Expected sequence: delete of the old key, tombstone, insert of the new key.
            List<SourceRecord> keyChangeRecords = consumeRecordsByTopic(3).recordsForTopic(topicName("DBZ5295"));
            Assertions.assertThat(keyChangeRecords).hasSize(3);

            SourceRecord deleteRecord = keyChangeRecords.get(0);
            VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
            Assertions.assertThat(((Struct) deleteRecord.value()).getStruct("after")).isNull();

            VerifyRecord.isValidTombstone(keyChangeRecords.get(1));

            SourceRecord insertRecord = keyChangeRecords.get(2);
            VerifyRecord.isValidInsert(insertRecord, "ID", 2);
            Struct insertAfter = ((Struct) insertRecord.value()).getStruct("after");
            Assertions.assertThat(insertAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(insertAfter.get("DATA")).isEqualTo("Small clob data");
            Assertions.assertThat(insertAfter.get("DATA2")).isEqualTo("Data2");

            Assertions.assertThat(logInterceptor.containsMessage("re-selecting LOB columns [DATA, DATA2] out of bands")).isTrue();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5295");
        }
    }

    /**
     * Changes the primary key of a row with two CLOB columns and then deletes the row, and checks
     * the four resulting events: delete of the old key, tombstone, insert of the new key and
     * delete of the new key. The CLOB columns of the last two events are expected to hold the
     * unavailable-value placeholder, and the re-selection message is expected in the log.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5295"})
    public void shouldReselectClobAfterPrimaryKeyChangeWithRowDeletion() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5295");
        try {
            LogInterceptor logInterceptor = new LogInterceptor(BaseChangeRecordEmitter.class);
            logInterceptor.setLoggerLevel(BaseChangeRecordEmitter.class, Level.INFO);

            this.connection.execute("create table dbz5295 (id numeric(9,0) primary key, data clob, data2 clob)");
            TestHelper.streamTable(this.connection, "dbz5295");
            this.connection.execute("INSERT INTO dbz5295 (id,data,data2) values (1,'Small clob data','Data2')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5295")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ5295"));
            Assertions.assertThat(initialRecords).hasSize(1);
            Struct initialAfter = ((Struct) initialRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(initialAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(initialAfter.get("DATA")).isEqualTo("Small clob data");
            Assertions.assertThat(initialAfter.get("DATA2")).isEqualTo("Data2");

            // The key change is issued without committing and is followed directly by the delete,
            // presumably so that both statements end up in the same transaction.
            this.connection.executeWithoutCommitting("UPDATE dbz5295 set id = 2 where id = 1");
            this.connection.execute("DELETE FROM dbz5295 where id = 2");

            List<SourceRecord> changeRecords = consumeRecordsByTopic(4).recordsForTopic(topicName("DBZ5295"));
            Assertions.assertThat(changeRecords).hasSize(4);

            SourceRecord oldKeyDelete = changeRecords.get(0);
            VerifyRecord.isValidDelete(oldKeyDelete, "ID", 1);
            Assertions.assertThat(((Struct) oldKeyDelete.value()).getStruct("after")).isNull();

            VerifyRecord.isValidTombstone(changeRecords.get(1));

            SourceRecord newKeyInsert = changeRecords.get(2);
            VerifyRecord.isValidInsert(newKeyInsert, "ID", 2);
            Struct insertAfter = ((Struct) newKeyInsert.value()).getStruct("after");
            Assertions.assertThat(insertAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(insertAfter.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(insertAfter.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config));

            SourceRecord newKeyDelete = changeRecords.get(3);
            VerifyRecord.isValidDelete(newKeyDelete, "ID", 2);
            Struct deleteBefore = ((Struct) newKeyDelete.value()).getStruct("before");
            Assertions.assertThat(deleteBefore.get("ID")).isEqualTo(2);
            Assertions.assertThat(deleteBefore.get("DATA")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(deleteBefore.get("DATA2")).isEqualTo(getUnavailableValuePlaceholder(config));

            Assertions.assertThat(logInterceptor.containsMessage("re-selecting LOB columns [DATA, DATA2] out of bands")).isTrue();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5295");
        }
    }

    /**
     * Changes the primary key of a row whose two CLOB columns are listed in the column exclude
     * list, and checks that the emitted delete, tombstone and insert events have no
     * {@code DATA}/{@code DATA2} fields in their schema and that the re-selection message was
     * not logged.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-7456"})
    public void shouldNotReselectClobAfterPrimaryKeyChangeColumnExcluded() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7456");
        try {
            LogInterceptor logInterceptor = new LogInterceptor(BaseChangeRecordEmitter.class);
            logInterceptor.setLoggerLevel(BaseChangeRecordEmitter.class, Level.INFO);

            this.connection.execute("create table dbz7456 (id numeric(9,0) primary key, data clob, data2 clob)");
            TestHelper.streamTable(this.connection, "dbz7456");
            this.connection.execute("INSERT INTO dbz7456 (id,data,data2) values (1,'Small clob data','Data2')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7456")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .with(OracleConnectorConfig.COLUMN_EXCLUDE_LIST, "DEBEZIUM.DBZ7456.DATA,DEBEZIUM.DBZ7456.DATA2")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ7456"));
            Assertions.assertThat(initialRecords).hasSize(1);
            Struct initialAfter = ((Struct) initialRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(initialAfter.get("ID")).isEqualTo(1);
            // Excluded columns must be absent from the schema, not merely null in the value.
            Assertions.assertThat(initialAfter.schema().field("DATA")).isNull();
            Assertions.assertThat(initialAfter.schema().field("DATA2")).isNull();

            this.connection.execute("UPDATE dbz7456 set id = 2 where id = 1");

            // Expected sequence: delete of the old key, tombstone, insert of the new key.
            List<SourceRecord> keyChangeRecords = consumeRecordsByTopic(3).recordsForTopic(topicName("DBZ7456"));
            Assertions.assertThat(keyChangeRecords).hasSize(3);

            SourceRecord deleteRecord = keyChangeRecords.get(0);
            VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
            Assertions.assertThat(((Struct) deleteRecord.value()).getStruct("after")).isNull();

            VerifyRecord.isValidTombstone(keyChangeRecords.get(1));

            SourceRecord insertRecord = keyChangeRecords.get(2);
            VerifyRecord.isValidInsert(insertRecord, "ID", 2);
            Struct insertAfter = ((Struct) insertRecord.value()).getStruct("after");
            Assertions.assertThat(insertAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(insertAfter.schema().field("DATA")).isNull();
            Assertions.assertThat(insertAfter.schema().field("DATA2")).isNull();

            Assertions.assertThat(logInterceptor.containsMessage("re-selecting LOB columns [DATA, DATA2] out of bands")).isFalse();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz7456");
        }
    }

    /**
     * Changes the primary key of a row whose two CLOB columns are listed in the column exclude
     * list and then deletes the row. Checks the four resulting events (delete, tombstone, insert,
     * delete), that none of the inspected structs has {@code DATA}/{@code DATA2} fields in its
     * schema, and that the re-selection message was not logged.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-7456"})
    public void shouldNotReselectClobAfterPrimaryKeyChangeWithRowDeletionColumnExcluded() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7456");
        try {
            LogInterceptor logInterceptor = new LogInterceptor(BaseChangeRecordEmitter.class);
            logInterceptor.setLoggerLevel(BaseChangeRecordEmitter.class, Level.INFO);

            this.connection.execute("create table dbz7456 (id numeric(9,0) primary key, data clob, data2 clob)");
            TestHelper.streamTable(this.connection, "dbz7456");
            this.connection.execute("INSERT INTO dbz7456 (id,data,data2) values (1,'Small clob data','Data2')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7456")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .with(OracleConnectorConfig.COLUMN_EXCLUDE_LIST, "DEBEZIUM.DBZ7456.DATA,DEBEZIUM.DBZ7456.DATA2")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ7456"));
            Assertions.assertThat(initialRecords).hasSize(1);
            Struct initialAfter = ((Struct) initialRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(initialAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(initialAfter.schema().field("DATA")).isNull();
            Assertions.assertThat(initialAfter.schema().field("DATA2")).isNull();

            // The key change is issued without committing and is followed directly by the delete,
            // presumably so that both statements end up in the same transaction.
            this.connection.executeWithoutCommitting("UPDATE dbz7456 set id = 2 where id = 1");
            this.connection.execute("DELETE FROM dbz7456 where id = 2");

            List<SourceRecord> changeRecords = consumeRecordsByTopic(4).recordsForTopic(topicName("DBZ7456"));
            Assertions.assertThat(changeRecords).hasSize(4);

            SourceRecord oldKeyDelete = changeRecords.get(0);
            VerifyRecord.isValidDelete(oldKeyDelete, "ID", 1);
            Assertions.assertThat(((Struct) oldKeyDelete.value()).getStruct("after")).isNull();

            VerifyRecord.isValidTombstone(changeRecords.get(1));

            SourceRecord newKeyInsert = changeRecords.get(2);
            VerifyRecord.isValidInsert(newKeyInsert, "ID", 2);
            Struct insertAfter = ((Struct) newKeyInsert.value()).getStruct("after");
            Assertions.assertThat(insertAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(insertAfter.schema().field("DATA")).isNull();
            Assertions.assertThat(insertAfter.schema().field("DATA2")).isNull();

            SourceRecord newKeyDelete = changeRecords.get(3);
            VerifyRecord.isValidDelete(newKeyDelete, "ID", 2);
            Struct deleteBefore = ((Struct) newKeyDelete.value()).getStruct("before");
            Assertions.assertThat(deleteBefore.get("ID")).isEqualTo(2);
            Assertions.assertThat(deleteBefore.schema().field("DATA")).isNull();
            Assertions.assertThat(deleteBefore.schema().field("DATA2")).isNull();

            Assertions.assertThat(logInterceptor.containsMessage("re-selecting LOB columns [DATA, DATA2] out of bands")).isFalse();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz7456");
        }
    }

    /**
     * Inserts a row with a CLOB ({@code A2}) and an NCLOB ({@code A3}), then updates only
     * {@code A2} twice (first to a new value, then to NULL). Checks that the insert event carries
     * both LOB values, and that in both update events the untouched {@code A3} column holds the
     * unavailable-value placeholder while {@code A2} reflects the update.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-5581"})
    public void testClobUnavailableValuePlaceholderUpdateOnlyOneClobColumn() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5581");
        try {
            this.connection.execute("create table dbz5581 (id numeric(9,0) primary key, a1 varchar2(200), a2 clob, a3 nclob, a4 varchar2(100))");
            TestHelper.streamTable(this.connection, "dbz5581");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5581")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Change 1: insert with both LOB columns populated.
            Clob insertedClob = createClob(part(JSON_DATA, 0, 4100));
            NClob insertedNClob = createNClob(part(JSON_DATA2, 0, 4100));
            this.connection.prepareQuery("INSERT into dbz5581 (id,a1,a2,a3,a4) values (1, 'lwmzVQd6r7', ?, ?, 'cuTVQV0OpK')", ps -> {
                ps.setClob(1, insertedClob);
                ps.setNClob(2, insertedNClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            // Change 2: replace only A2.
            Clob updatedClob = createClob(part(JSON_DATA, 1, 4101));
            this.connection.prepareQuery("UPDATE dbz5581 set A2=? WHERE ID=1",
                    ps -> ps.setClob(1, updatedClob),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            // Change 3: null out only A2.
            this.connection.execute("UPDATE dbz5581 set A2=NULL WHERE ID=1");

            List<SourceRecord> records = consumeRecordsByTopic(3).recordsForTopic("server1.DEBEZIUM.DBZ5581");
            Assertions.assertThat(records).hasSize(3);

            Struct insertAfter = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(insertAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(insertAfter.get("A1")).isEqualTo("lwmzVQd6r7");
            Assertions.assertThat(insertAfter.get("A2")).isEqualTo(getClobString(insertedClob));
            Assertions.assertThat(insertAfter.get("A3")).isEqualTo(getClobString(insertedNClob));
            Assertions.assertThat(insertAfter.get("A4")).isEqualTo("cuTVQV0OpK");

            Struct firstUpdateAfter = ((Struct) records.get(1).value()).getStruct("after");
            Assertions.assertThat(firstUpdateAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstUpdateAfter.get("A1")).isEqualTo("lwmzVQd6r7");
            Assertions.assertThat(firstUpdateAfter.get("A2")).isEqualTo(getClobString(updatedClob));
            Assertions.assertThat(firstUpdateAfter.get("A3")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(firstUpdateAfter.get("A4")).isEqualTo("cuTVQV0OpK");

            Struct secondUpdateAfter = ((Struct) records.get(2).value()).getStruct("after");
            Assertions.assertThat(secondUpdateAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(secondUpdateAfter.get("A1")).isEqualTo("lwmzVQd6r7");
            Assertions.assertThat(secondUpdateAfter.get("A2")).isNull();
            Assertions.assertThat(secondUpdateAfter.get("A3")).isEqualTo(getUnavailableValuePlaceholder(config));
            Assertions.assertThat(secondUpdateAfter.get("A4")).isEqualTo("cuTVQV0OpK");
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz5581");
        }
    }

    /**
     * Checks that CLOB values containing a single quote survive the snapshot, a streamed insert
     * and a streamed update unchanged.
     *
     * <p>Two 2000-character random strings are generated and passed through
     * {@code replaceCharAt(..., 999, '\'')}, which presumably puts a single quote at index 999.
     *
     * @throws Exception if database setup, connector startup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-7006"})
    public void shouldStreamClobDataDataThatContainsSingleQuotesAtSpecificBoundaries() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7006");
        try {
            String firstPayload = replaceCharAt(createRandomStringWithAlphaNumeric(2000), 999, '\'');
            String secondPayload = replaceCharAt(createRandomStringWithAlphaNumeric(2000), 999, '\'');

            this.connection.execute("CREATE TABLE dbz7006 (id numeric(9,0) primary key, data clob)");
            TestHelper.streamTable(this.connection, "dbz7006");

            // Row 1 is committed before the connector starts, so it is reported as a read event.
            Clob snapshotClob = createClob(firstPayload);
            this.connection.prepareQuery("INSERT INTO dbz7006 values (1,?)",
                    ps -> ps.setClob(1, snapshotClob),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7006")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row 2 is inserted and then updated while the connector is streaming.
            Clob streamedInsertClob = createClob(firstPayload);
            this.connection.prepareQuery("INSERT INTO dbz7006 values (2,?)",
                    ps -> ps.setClob(1, streamedInsertClob),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            Clob streamedUpdateClob = createClob(secondPayload);
            this.connection.prepareQuery("UPDATE dbz7006 set data = ? WHERE id = 2",
                    ps -> ps.setClob(1, streamedUpdateClob),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            List<SourceRecord> records = consumeRecordsByTopic(3).recordsForTopic(topicName("DBZ7006"));
            // Fail with a clear assertion rather than an index/null error if records are missing.
            Assertions.assertThat(records).hasSize(3);

            SourceRecord snapshotRecord = records.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            Assertions.assertThat(getAfterField(snapshotRecord, "DATA")).isEqualTo(firstPayload);

            SourceRecord insertRecord = records.get(1);
            VerifyRecord.isValidInsert(insertRecord, "ID", 2);
            Assertions.assertThat(getAfterField(insertRecord, "DATA")).isEqualTo(firstPayload);

            SourceRecord updateRecord = records.get(2);
            VerifyRecord.isValidUpdate(updateRecord, "ID", 2);
            Assertions.assertThat(getAfterField(updateRecord, "DATA")).isEqualTo(secondPayload);

            // As in the original, the connector is stopped only on the success path.
            stopConnector();
        } finally {
            // Single cleanup point: runs exactly once whether the test passes or fails.
            TestHelper.dropTable(this.connection, "dbz7006");
        }
    }

    /**
     * Regression test for DBZ-7790: with LOB support enabled and a table that has no primary key,
     * the CLOB values of different rows must be reported per row and never merged together.
     * <p>
     * Two rows are committed before the connector is started and verified first; both rows are
     * then updated in a single transaction and the two resulting change events are verified.
     *
     * @throws Exception if database access or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-7790"})
    public void shouldNotMergeClobDataWhenNoPrimaryKey() throws Exception {
        TestHelper.dropTable(this.connection, "DBZ7790");
        try {
            // Deliberately no primary key on the table.
            this.connection.execute(new String[]{"CREATE TABLE DBZ7790(id numeric(9,0), DATA CLOB)"});
            TestHelper.streamTable(this.connection, "DBZ7790");
            this.connection.execute(new String[]{"INSERT INTO DBZ7790 (id,data) values (1,'aaa')"});
            this.connection.execute(new String[]{"INSERT INTO DBZ7790 (id,data) values (2,'bbb')"});
            this.connection.commit();

            start(OracleConnector.class,
                    TestHelper.defaultConfig()
                            .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7790")
                            .with(OracleConnectorConfig.LOB_ENABLED, "true")
                            .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The two rows that existed before the connector was started.
            final List<SourceRecord> initialRecords = consumeRecordsByTopic(2).recordsForTopic(topicName("DBZ7790"));
            Assertions.assertThat(initialRecords).hasSize(2);
            final SourceRecord firstInitial = initialRecords.get(0);
            Assertions.assertThat(getAfterField(firstInitial, "ID")).isEqualTo(1);
            Assertions.assertThat(getAfterField(firstInitial, "DATA")).isEqualTo("aaa");
            final SourceRecord secondInitial = initialRecords.get(1);
            Assertions.assertThat(getAfterField(secondInitial, "ID")).isEqualTo(2);
            Assertions.assertThat(getAfterField(secondInitial, "DATA")).isEqualTo("bbb");

            // Update both rows within one transaction; each event must carry its own CLOB value.
            this.connection.execute(new String[]{"UPDATE DBZ7790 set data = 'ccc' WHERE id = 1"});
            this.connection.execute(new String[]{"UPDATE DBZ7790 set data = 'ddd' WHERE id = 2"});
            this.connection.commit();

            final List<SourceRecord> updateRecords = consumeRecordsByTopic(2).recordsForTopic(topicName("DBZ7790"));
            Assertions.assertThat(updateRecords).hasSize(2);
            final SourceRecord firstUpdate = updateRecords.get(0);
            Assertions.assertThat(getAfterField(firstUpdate, "ID")).isEqualTo(1);
            Assertions.assertThat(getAfterField(firstUpdate, "DATA")).isEqualTo("ccc");
            final SourceRecord secondUpdate = updateRecords.get(1);
            Assertions.assertThat(getAfterField(secondUpdate, "ID")).isEqualTo(2);
            Assertions.assertThat(getAfterField(secondUpdate, "DATA")).isEqualTo("ddd");

            stopConnector();
        } finally {
            // Single cleanup point: runs whether the assertions above pass or fail.
            TestHelper.dropTable(this.connection, "DBZ7790");
        }
    }

    /**
     * Regression test for DBZ-7790 using larger CLOB payloads: with LOB support enabled and a table
     * that has no primary key, each row's CLOB value must be reported for that row only.
     * <p>
     * Two rows are committed before the connector is started and verified first; both rows are
     * then updated in a single transaction with distinct random 5000-character CLOB values, and
     * the two resulting change events are checked against the values that were written.
     *
     * @throws Exception if database access or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-7790"})
    public void shouldNotMergeLargeClobDataWhenNoPrimaryKey() throws Exception {
        TestHelper.dropTable(this.connection, "DBZ7790");
        try {
            // Deliberately no primary key on the table.
            this.connection.execute(new String[]{"CREATE TABLE DBZ7790(id numeric(9,0), DATA CLOB)"});
            TestHelper.streamTable(this.connection, "DBZ7790");
            this.connection.execute(new String[]{"INSERT INTO DBZ7790 (id,data) values (1,'aaa')"});
            this.connection.execute(new String[]{"INSERT INTO DBZ7790 (id,data) values (2,'bbb')"});
            this.connection.commit();

            start(OracleConnector.class,
                    TestHelper.defaultConfig()
                            .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7790")
                            .with(OracleConnectorConfig.LOB_ENABLED, "true")
                            .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The two rows that existed before the connector was started.
            final List<SourceRecord> initialRecords = consumeRecordsByTopic(2).recordsForTopic(topicName("DBZ7790"));
            Assertions.assertThat(initialRecords).hasSize(2);
            final SourceRecord firstInitial = initialRecords.get(0);
            Assertions.assertThat(getAfterField(firstInitial, "ID")).isEqualTo(1);
            Assertions.assertThat(getAfterField(firstInitial, "DATA")).isEqualTo("aaa");
            final SourceRecord secondInitial = initialRecords.get(1);
            Assertions.assertThat(getAfterField(secondInitial, "ID")).isEqualTo(2);
            Assertions.assertThat(getAfterField(secondInitial, "DATA")).isEqualTo("bbb");

            // Update both rows within one transaction, each with its own random payload.
            final String firstPayload = createRandomStringWithAlphaNumeric(5000);
            final Clob firstClob = createClob(firstPayload);
            this.connection.prepareQuery("UPDATE DBZ7790 SET data = ? WHERE id = 1",
                    ps -> ps.setClob(1, firstClob),
                    (JdbcConnection.ResultSetConsumer) null);

            final String secondPayload = createRandomStringWithAlphaNumeric(5000);
            final Clob secondClob = createClob(secondPayload);
            this.connection.prepareQuery("UPDATE DBZ7790 SET data = ? WHERE id = 2",
                    ps -> ps.setClob(1, secondClob),
                    (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();

            final List<SourceRecord> updateRecords = consumeRecordsByTopic(2).recordsForTopic(topicName("DBZ7790"));
            Assertions.assertThat(updateRecords).hasSize(2);
            final SourceRecord firstUpdate = updateRecords.get(0);
            Assertions.assertThat(getAfterField(firstUpdate, "ID")).isEqualTo(1);
            Assertions.assertThat(getAfterField(firstUpdate, "DATA")).isEqualTo(firstPayload);
            final SourceRecord secondUpdate = updateRecords.get(1);
            Assertions.assertThat(getAfterField(secondUpdate, "ID")).isEqualTo(2);
            Assertions.assertThat(getAfterField(secondUpdate, "DATA")).isEqualTo(secondPayload);

            stopConnector();
        } finally {
            // Single cleanup point: runs whether the assertions above pass or fail.
            TestHelper.dropTable(this.connection, "DBZ7790");
        }
    }

    /**
     * Generates a random string of the requested length.
     * <p>
     * NOTE(review): despite the method name, this delegates to
     * {@code RandomStringUtils.randomAlphabetic}, which per the commons-lang3 documentation
     * produces letters only; confirm whether digits were intended.
     *
     * @param i the number of characters to generate
     * @return the generated random string
     */
    private String createRandomStringWithAlphaNumeric(int i) {
        final String generated = RandomStringUtils.randomAlphabetic(i);
        return generated;
    }

    /**
     * Returns a copy of the given string in which the character at one position is replaced.
     *
     * @param str the source string
     * @param i the zero-based index of the character to replace; must be a valid index of {@code str}
     * @param c the replacement character
     * @return a new string identical to {@code str} except for the character at index {@code i}
     */
    private String replaceCharAt(String str, int i, char c) {
        final String head = str.substring(0, i);
        final String tail = str.substring(i + 1);
        return head + c + tail;
    }

    /**
     * Creates a {@link Clob} on the test's database connection and fills it with the given text.
     *
     * @param str the text to store in the CLOB
     * @return the populated CLOB
     * @throws SQLException if the CLOB cannot be created or written
     */
    private Clob createClob(String str) throws SQLException {
        final Clob clob = this.connection.connection().createClob();
        // Write the text starting at position 1 of the CLOB.
        clob.setString(1L, str);
        return clob;
    }

    /**
     * Creates an {@link NClob} on the test's database connection and fills it with the given text.
     *
     * @param str the text to store in the NCLOB
     * @return the populated NCLOB
     * @throws SQLException if the NCLOB cannot be created or written
     */
    private NClob createNClob(String str) throws SQLException {
        final NClob nclob = this.connection.connection().createNClob();
        // Write the text starting at position 1 of the NCLOB.
        nclob.setString(1L, str);
        return nclob;
    }

    /**
     * Extracts a slice of a string, clamping the end index to the string's length.
     *
     * @param str the source string; may be {@code null}
     * @param i the inclusive start index
     * @param i2 the exclusive end index; values beyond the string's length are clamped to it
     * @return the requested slice, or an empty string when {@code str} is {@code null}
     */
    private static String part(String str, int i, int i2) {
        if (str == null) {
            return "";
        }
        final int end = Math.min(i2, str.length());
        return str.substring(i, end);
    }

    /**
     * Reads the {@code "before"} struct from a record's value.
     *
     * @param sourceRecord the record whose value is a {@link Struct}
     * @return the struct stored under the {@code "before"} field
     */
    private static Struct before(SourceRecord sourceRecord) {
        final Struct value = (Struct) sourceRecord.value();
        return value.getStruct("before");
    }

    /**
     * Reads the {@code "after"} struct from a record's value.
     *
     * @param sourceRecord the record whose value is a {@link Struct}
     * @return the struct stored under the {@code "after"} field
     */
    private static Struct after(SourceRecord sourceRecord) {
        final Struct value = (Struct) sourceRecord.value();
        return value.getStruct("after");
    }

    /**
     * Builds the topic name used by these tests for a table.
     *
     * @param str the table name to append
     * @return {@code "server1.DEBEZIUM."} followed by the given table name
     */
    private static String topicName(String str) {
        final StringBuilder topic = new StringBuilder("server1.DEBEZIUM.");
        topic.append(str);
        return topic.toString();
    }

    /**
     * Reads the full character content of a {@link Clob} into a string.
     *
     * @param clob the CLOB to read
     * @return the CLOB's content from position 1 through its reported length
     * @throws SQLException if the CLOB's length or content cannot be read
     */
    private static String getClobString(Clob clob) throws SQLException {
        // The length is narrowed to int because getSubString takes an int count.
        final int length = (int) clob.length();
        return clob.getSubString(1L, length);
    }

    /**
     * Looks up the unavailable-value placeholder setting in a connector configuration.
     *
     * @param configuration the configuration to read from
     * @return the string value of {@code OracleConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER}
     */
    private static String getUnavailableValuePlaceholder(Configuration configuration) {
        final String placeholder = configuration.getString(OracleConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER);
        return placeholder;
    }

    /**
     * Reads a single field from the {@code "before"} struct of a record.
     *
     * @param sourceRecord the record to inspect
     * @param str the name of the field to read
     * @return the field's value in the record's {@code "before"} struct
     */
    private static Object getBeforeField(SourceRecord sourceRecord, String str) {
        final Struct beforeState = before(sourceRecord);
        return beforeState.get(str);
    }

    /**
     * Reads a single field from the {@code "after"} struct of a record.
     *
     * @param sourceRecord the record to inspect
     * @param str the name of the field to read
     * @return the field's value in the record's {@code "after"} struct
     */
    private static Object getAfterField(SourceRecord sourceRecord, String str) {
        final Struct afterState = after(sourceRecord);
        return afterState.get(str);
    }
}
