package io.debezium.performance.engine;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.IoUtil;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.awaitility.Awaitility;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

/* loaded from: input_file:io/debezium/performance/engine/PostgresEndToEndPerf.class */
public class PostgresEndToEndPerf {
    // Connection settings for the benchmark database. HOST and PORT are applied through
    // withDefault(...) in defaultJdbcConfig().
    private static final String HOST = "localhost";
    private static final int PORT = 5432;
    // Same values as the user / password / database defaults set in defaultJdbcConfig().
    private static final String USER = "postgres";
    private static final String PASSWORD = "postgres";
    private static final String DATABASE = "postgres";
    // Passed as CommonConnectorConfig.TOPIC_PREFIX in defaultConnectorConfig(); the same value
    // appears as the "server" key of the MBean name built by getMbeanName().
    private static final String SERVER_NAME = "server1";
    // NOTE(review): not referenced by the code visible in this file — the state class builds its
    // own "inventory.test_async_..." table name; confirm whether this constant is still needed.
    private static final String BASE_TABLE_NAME = "inventory.test";
    // Event format handed to the engine builder in createEngine(); Json is used for both arguments.
    private static final KeyValueChangeEventFormat KV_EVENT_FORMAT = KeyValueChangeEventFormat.of(Json.class, Json.class);

    /**
     * Benchmark state that builds the asynchronous engine, parameterized by the number of
     * record-processing threads and by the record-processing order.
     */
    @State(Scope.Thread)
    public static class AsyncEngineEndToEndPerfTest extends DebeziumEndToEndPerfTest {

        /** Value passed to {@code AsyncEngineConfig.RECORD_PROCESSING_THREADS}. */
        @Param({"1", "2", "4", "8", "16"})
        public int threadCount;

        /** Value passed to {@code AsyncEngineConfig.RECORD_PROCESSING_ORDER}. */
        @Param({"ORDERED", "UNORDERED"})
        public String processingOrder;

        /**
         * @return the table-name prefix for this parameter combination:
         *         {@code inventory.test_async_<threadCount>_<processingOrder>}
         */
        @Override
        public String getBaseTableName() {
            final StringBuilder tableName = new StringBuilder("inventory.test_async_");
            tableName.append(this.threadCount);
            tableName.append("_");
            tableName.append(this.processingOrder);
            return tableName.toString();
        }

        /**
         * Builds the engine from the default connector configuration plus the async-engine
         * settings of this state, with the SMT properties added on top.
         *
         * @return an engine that pushes every change event into {@code consumedLines}
         */
        @Override
        public DebeziumEngine createEngine() {
            // Connector + async-engine settings for this parameter combination.
            final Configuration connectorConfig = PostgresEndToEndPerf.defaultConnectorConfig()
                    .with(PostgresConnectorConfig.SLOT_NAME, "async_" + this.eventCount)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, 100)
                    .with(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, 5000)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_THREADS, this.threadCount)
                    .with(AsyncEngineConfig.RECORD_PROCESSING_ORDER, this.processingOrder)
                    .build();
            final Properties engineProperties = PostgresEndToEndPerf.addSmtConfig(connectorConfig);

            return new ConvertingAsyncEngineBuilderFactory()
                    .builder(PostgresEndToEndPerf.KV_EVENT_FORMAT)
                    .using(engineProperties)
                    .notifying(PostgresEndToEndPerf.getRecordConsumer(this.consumedLines))
                    .using(getClass().getClassLoader())
                    .build();
        }
    }

    /**
     * Base JMH state shared by the end-to-end benchmarks. Before every iteration it prepares a
     * fresh table, starts an engine on a dedicated thread and inserts {@code eventCount} rows;
     * after every iteration it stops the engine and its executor.
     */
    @State(Scope.Thread)
    public static abstract class DebeziumEndToEndPerfTest {
        private DebeziumEngine engine;
        private ExecutorService executors;
        /** Queue the engine's record consumer writes into; drained by the benchmark method. */
        protected BlockingQueue<EmbeddedEngineChangeEvent> consumedLines;
        protected AtomicInteger count = new AtomicInteger(0);

        /** Number of rows inserted per iteration; also the capacity of {@link #consumedLines}. */
        @Param({"100000", "1000000"})
        public int eventCount;

        /** @return the table-name prefix; {@code "_" + eventCount} is appended in {@link #doSetup()} */
        public abstract String getBaseTableName();

        /** @return a newly built engine that has not been started yet */
        public abstract DebeziumEngine createEngine();

        /**
         * Removes the offsets file, recreates the table, starts the engine on a single-thread
         * executor, waits until streaming is reported as connected and then inserts the rows.
         */
        @Setup(Level.Iteration)
        public void doSetup() {
            String str = getBaseTableName() + "_" + this.eventCount;
            PostgresEndToEndPerf.delete("offsets.txt");
            PostgresEndToEndPerf.recreateTable(str);
            this.consumedLines = new ArrayBlockingQueue<>(this.eventCount);
            this.engine = createEngine();
            this.executors = Executors.newFixedThreadPool(1);
            this.executors.execute(this.engine);
            PostgresEndToEndPerf.waitForStreamingToStart();
            PostgresEndToEndPerf.createDmlEvents(str, this.eventCount);
        }

        /**
         * Closes the engine and shuts the executor down, waiting up to four seconds for the
         * engine thread to finish before forcing termination.
         *
         * @throws Exception if closing the engine fails
         */
        @TearDown(Level.Iteration)
        public void doCleanup() throws Exception {
            try {
                if (this.engine != null) {
                    this.engine.close();
                }
                if (this.executors != null) {
                    this.executors.shutdown();
                    try {
                        this.executors.awaitTermination(4L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            } finally {
                // Setup may have failed before the executor was created; guard against null so
                // the cleanup itself cannot fail with a NullPointerException.
                if (this.executors != null) {
                    this.executors.shutdownNow();
                }
                this.engine = null;
                this.executors = null;
            }
        }
    }

    /**
     * Builds the JDBC configuration from the {@code database.}-prefixed system properties,
     * registering this class's connection constants through {@code withDefault(...)}.
     *
     * @return the JDBC configuration used for both the connector and the test connections
     */
    private static JdbcConfiguration defaultJdbcConfig() {
        return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
                .withDefault(JdbcConfiguration.HOSTNAME, HOST)
                .withDefault(JdbcConfiguration.PORT, PORT)
                // Use the named constants rather than repeating the "postgres" literal three times.
                .withDefault(JdbcConfiguration.USER, USER)
                .withDefault(JdbcConfiguration.PASSWORD, PASSWORD)
                .withDefault(JdbcConfiguration.DATABASE, DATABASE)
                .build();
    }

    /**
     * Creates the connector configuration builder shared by the benchmarks: every JDBC setting
     * is copied under the {@code database.} prefix, followed by the connector and embedded-engine
     * settings listed below.
     *
     * @return a builder that callers can extend before calling {@code build()}
     */
    private static Configuration.Builder defaultConnectorConfig() {
        final Configuration.Builder builder = Configuration.create();
        defaultJdbcConfig().forEach((key, value) -> builder.with("database." + key, value));
        return builder
                .with(CommonConnectorConfig.TOPIC_PREFIX, SERVER_NAME)
                .with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA)
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
                .with(EmbeddedEngineConfig.ENGINE_NAME, "benchmark")
                .with(EmbeddedEngineConfig.CONNECTOR_CLASS, PostgresConnector.class)
                .with("offset.storage.file.filename", getPath("offsets.txt").toAbsolutePath())
                .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0);
    }

    /**
     * Converts the configuration to {@link Properties} and adds a single transform named
     * {@code replace} of type {@code ReplaceField$Value}.
     *
     * @param configuration the connector/engine configuration to extend
     * @return the configuration as properties, including the transform settings
     */
    private static Properties addSmtConfig(Configuration configuration) {
        final String[][] smtSettings = {
                {"transforms", "replace"},
                {"transforms.replace.type", "org.apache.kafka.connect.transforms.ReplaceField$Value"},
                {"transforms.replace.renames", "name:transformed_name"},
                {"transforms.replace.exclude", "id"},
        };
        final Properties engineProperties = configuration.asProperties();
        for (String[] setting : smtSettings) {
            engineProperties.setProperty(setting[0], setting[1]);
        }
        return engineProperties;
    }

    /**
     * Creates the consumer handed to the engine: every change event is appended to the given
     * queue, waiting for free space if the queue is full.
     *
     * <p>If the calling thread is interrupted (before or while waiting) the event is dropped and
     * the thread's interrupt flag is left set, so the engine can shut down promptly.
     *
     * @param blockingQueue the queue that receives the events
     * @return a consumer that enqueues each event it is given
     */
    private static Consumer<ChangeEvent<String, String>> getRecordConsumer(BlockingQueue<EmbeddedEngineChangeEvent> blockingQueue) {
        return changeEvent -> {
            try {
                // put() parks the thread while the queue is full instead of spinning on offer().
                blockingQueue.put((EmbeddedEngineChangeEvent) changeEvent);
            } catch (InterruptedException e) {
                // Restore the flag that was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * Drops the table if it exists and creates it again with an {@code id} primary key and a
     * {@code name} column. The connection opened for this is closed before returning.
     *
     * @param str the (schema-qualified) name of the table to recreate
     * @throws RuntimeException if the table cannot be created
     */
    private static void recreateTable(String str) {
        // try-with-resources: the connection was previously never closed and leaked per iteration.
        try (PostgresConnection testConnection = getTestConnection()) {
            try {
                testConnection.execute(new String[]{"DROP TABLE IF EXISTS " + str});
            } catch (SQLException e) {
                // Best effort: a failed drop is reported, and the create below decides the outcome.
                e.printStackTrace();
            }
            testConnection.execute(new String[]{"CREATE TABLE " + str + " (id numeric(9,0) primary key, name varchar(50))"});
        } catch (SQLException e) {
            throw new RuntimeException("Failed to create table", e);
        }
    }

    /**
     * Inserts {@code i} rows with ids {@code 0..i-1} and names {@code Test<id>} into the table,
     * committing once after the last insert. The connection is closed before returning.
     *
     * @param str the (schema-qualified) name of the target table
     * @param i the number of rows to insert
     * @throws RuntimeException if an insert or the final commit fails
     */
    private static void createDmlEvents(String str, int i) {
        // try-with-resources: the connection was previously never closed and leaked per iteration.
        try (PostgresConnection testConnection = getTestConnection()) {
            for (int row = 0; row < i; row++) {
                String insert = "INSERT INTO " + str + " (id, name) values (" + row + ",'Test" + row + "')";
                testConnection.executeWithoutCommitting(new String[]{insert});
            }
            testConnection.commit();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to insert data set", e);
        }
    }

    /**
     * Opens a connection named {@code test_connection} using the default JDBC configuration and
     * switches auto-commit off.
     *
     * @return the connection; the caller owns it
     * @throws RuntimeException wrapping the {@link SQLException} if auto-commit cannot be disabled
     */
    private static PostgresConnection getTestConnection() {
        final PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), "test_connection");
        try {
            connection.setAutoCommit(false);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return connection;
    }

    /**
     * Polls the streaming-metrics MBean every 100 ms, for at most 30 seconds, until its
     * {@code Connected} attribute is {@code true}. Any {@link JMException} raised while reading
     * the attribute counts as "not connected yet".
     */
    private static void waitForStreamingToStart() {
        Awaitility.await()
                .alias("Streaming was not started on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost(30L, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> {
                    try {
                        final Object attribute = ManagementFactory.getPlatformMBeanServer()
                                .getAttribute(getMbeanName(), "Connected");
                        final boolean connected = (Boolean) attribute;
                        return connected;
                    } catch (JMException e) {
                        return false;
                    }
                });
    }

    /**
     * Builds the JMX name of the streaming-metrics MBean polled by
     * {@code waitForStreamingToStart()}.
     *
     * @return the object name with {@code type=connector-metrics}, {@code context=streaming} and
     *         {@code server} set to {@code SERVER_NAME}
     * @throws MalformedObjectNameException if the assembled name is not a valid object name
     */
    private static ObjectName getMbeanName() throws MalformedObjectNameException {
        // Derive the server key from SERVER_NAME (also used as the topic prefix) so the two
        // cannot drift apart.
        return new ObjectName("debezium.postgres:type=connector-metrics,context=streaming,server=" + SERVER_NAME);
    }

    /**
     * Resolves a relative file name against the benchmark data directory.
     *
     * @param str the file name relative to the data directory
     * @return the absolute path of that file
     */
    private static Path getPath(String str) {
        final String dataDir = resolveDataDir();
        final Path location = Paths.get(dataDir, str);
        return location.toAbsolutePath();
    }

    /**
     * Deletes the given file from the benchmark data directory. Paths that do not lie inside the
     * data directory are left untouched. Deletion is best effort: a failure is reported on
     * standard error and does not abort the caller.
     *
     * @param str the file name relative to the data directory
     */
    private static void delete(String str) {
        // getPath() already returns an absolute, non-null path.
        final Path target = getPath(str);
        if (!inTestDataDir(target)) {
            return;
        }
        try {
            IoUtil.delete(target);
        } catch (IOException e) {
            // Previously swallowed silently; surface it so a stale offsets file can be noticed.
            System.err.println("Unable to remove '" + target + "': " + e);
        }
    }

    /**
     * Checks whether a path lies inside the benchmark data directory.
     *
     * @param path the path to check
     * @return {@code true} if the absolute form of {@code path} starts with the absolute data directory
     */
    private static boolean inTestDataDir(Path path) {
        final Path dataDir = Paths.get(resolveDataDir()).toAbsolutePath();
        final Path candidate = path.toAbsolutePath();
        return candidate.startsWith(dataDir);
    }

    /**
     * Determines the benchmark data directory. The {@code dbz.test.data.dir} system property is
     * consulted first, then the {@code DBZ_TEST_DATA_DIR} environment variable; a value that is
     * missing or empty after trimming is skipped.
     *
     * @return the trimmed directory from the first usable source, or {@code "/tmp"} if none is set
     */
    private static String resolveDataDir() {
        final String fromProperty = System.getProperty("dbz.test.data.dir");
        if (fromProperty != null && !fromProperty.trim().isEmpty()) {
            return fromProperty.trim();
        }
        // Only look at the environment when the system property gave nothing usable.
        final String fromEnv = System.getenv("DBZ_TEST_DATA_DIR");
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return "/tmp";
    }

    /**
     * Single-shot benchmark: repeatedly drains the state's queue until {@code eventCount} change
     * events have been collected.
     *
     * @param asyncEngineEndToEndPerfTest the per-iteration state holding the queue and the expected event count
     */
    @Warmup(iterations = 1)
    @Measurement(iterations = 1, time = 1)
    @Benchmark
    @OutputTimeUnit(TimeUnit.SECONDS)
    @Fork(1)
    @BenchmarkMode({Mode.SingleShotTime})
    public void processRecordsAsyncEngine(AsyncEngineEndToEndPerfTest asyncEngineEndToEndPerfTest) {
        final ArrayList<EmbeddedEngineChangeEvent> received = new ArrayList<>();
        while (received.size() < asyncEngineEndToEndPerfTest.eventCount) {
            // Take whatever is currently queued, then fold it into the running total.
            final ArrayList<EmbeddedEngineChangeEvent> batch = new ArrayList<>();
            asyncEngineEndToEndPerfTest.consumedLines.drainTo(batch);
            received.addAll(batch);
        }
    }
}
