package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.Log;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/SignalsIT.class */
public class SignalsIT extends AbstractAsyncEngineConnectorTest {
    private SqlServerConnection connection;

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
        this.connection = TestHelper.multiPartitionTestConnection();
        this.connection.execute(new String[]{"USE testDB1", "CREATE TABLE tableA (id int primary key, colA varchar(32))", "CREATE TABLE tableB (id int primary key, colB varchar(32))", "INSERT INTO tableA VALUES(1, 'a1')", "INSERT INTO tableB VALUES(2, 'b')"});
        TestHelper.enableTableCdc(this.connection, "tableA");
        TestHelper.enableTableCdc(this.connection, "tableB");
        this.connection.execute(new String[]{"USE testDB2", "CREATE TABLE tableA (id int primary key, colA varchar(32))", "CREATE TABLE tableC (id int primary key, colC varchar(32))", "INSERT INTO tableA VALUES(3, 'a2')", "INSERT INTO tableC VALUES(4, 'c')"});
        TestHelper.enableTableCdc(this.connection, "tableA");
        TestHelper.enableTableCdc(this.connection, "tableC");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void jmxSignals() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(Log.class);
        start(SqlServerConnector.class, TestHelper.defaultConfig(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2).with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx").with("tasks.max", 2).build());
        assertConnectorIsRunning();
        sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "0");
        sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", "1");
        Awaitility.await("Waiting for metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(logInterceptor.countOccurrences("Signal message at offset") == 2);
        });
        Assertions.assertThat(logInterceptor.countOccurrences("Signal message at offset")).isEqualTo(2L);
    }

    private void sendLogSignalWithJmx(String str, String str2, String str3, String str4) throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
        ObjectName objectName = new ObjectName(String.format("debezium.sql_server:type=management,context=signals,server=server1,task=%s", str4));
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        Awaitility.await("Waiting for metrics to appear").atMost(waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            try {
                platformMBeanServer.getObjectInstance(objectName);
                return true;
            } catch (InstanceNotFoundException e) {
                return false;
            }
        });
        platformMBeanServer.invoke(objectName, "signal", new Object[]{str, str2, str3}, new String[]{String.class.getName(), String.class.getName(), String.class.getName()});
    }
}
