package io.debezium.connector.jdbc.integration.mysql;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.integration.AbstractJdbcSinkTest;
import io.debezium.connector.jdbc.junit.jupiter.MySqlSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.doc.FixFor;
import io.debezium.sink.SinkConnectorConfig;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.hibernate.PessimisticLockException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

@Tags({@Tag("all"), @Tag("it"), @Tag("it-mysql")})
@ExtendWith({MySqlSinkDatabaseContextProvider.class})
/* loaded from: input_file:io/debezium/connector/jdbc/integration/mysql/JdbcSinkRetryIT.class */
public class JdbcSinkRetryIT extends AbstractJdbcSinkTest {
    public JdbcSinkRetryIT(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @FixFor({"DBZ-7291"})
    @ParameterizedTest
    public void testRetryToFlushBufferWhenRetriableExceptionOccurred(SinkRecordFactory sinkRecordFactory) throws SQLException {
        String randomTableName = randomTableName();
        Connection connection = getSink().getConnection();
        Statement createStatement = connection.createStatement();
        try {
            createStatement.execute("CREATE TABLE `" + randomTableName + "` (\n  `id` bigint(20) NOT NULL,\n  `content` varchar(200) DEFAULT NULL,\n  `dt` datetime DEFAULT NULL,\n  PRIMARY KEY (`id`),\n  KEY `dt_idx` (`dt`))");
            createStatement.execute("INSERT INTO " + randomTableName + " (`id`, `content`, `dt`) VALUES (1, 'c1', now()), (2, 'c2', now()), (3, 'c3', now())");
            if (createStatement != null) {
                createStatement.close();
            }
            connection.setAutoCommit(false);
            Statement createStatement2 = connection.createStatement();
            createStatement2.execute("START TRANSACTION");
            createStatement2.execute("SELECT * FROM " + randomTableName + " WHERE id=1 LOCK IN SHARE MODE;");
            Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
            defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
            defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
            defaultSinkConfig.put("primary.key.fields", "id");
            defaultSinkConfig.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
            defaultSinkConfig.put("connection.url", getSink().getJdbcUrl(Map.of("sessionVariables", "innodb_lock_wait_timeout=5")));
            defaultSinkConfig.put("collection.name.format", randomTableName);
            defaultSinkConfig.put("flush.max.retries", "1");
            defaultSinkConfig.put("flush.retry.delay.ms", "1000");
            startSinkConnector(defaultSinkConfig);
            assertSinkConnectorIsRunning();
            try {
                try {
                    consume(sinkRecordFactory.updateRecordWithSchemaValue(topicName("server1", "schema", randomTableName), (byte) 1, "content", Schema.OPTIONAL_STRING_SCHEMA, "c11"));
                    consume(Collections.emptyList());
                    Assertions.fail();
                    connection.close();
                    stopSinkConnector();
                } catch (Exception e) {
                    org.fest.assertions.Assertions.assertThat(e.getCause().getMessage()).matches("Exceeded max retries [0-9]* times, failed to process sink records");
                    Assertions.assertEquals(e.getCause().getCause().getClass(), PessimisticLockException.class);
                    org.fest.assertions.Assertions.assertThat(e.getCause().getCause().getCause().getMessage()).matches("Lock wait timeout exceeded; try restarting transaction");
                    connection.close();
                    stopSinkConnector();
                }
            } catch (Throwable th) {
                connection.close();
                stopSinkConnector();
                throw th;
            }
        } catch (Throwable th2) {
            if (createStatement != null) {
                try {
                    createStatement.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }
}
