package io.debezium.server.pulsar;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("pulsar")
@Dependent
/* loaded from: input_file:io/debezium/server/pulsar/PulsarChangeConsumer.class */
public class PulsarChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final String PROP_PREFIX = "debezium.sink.pulsar.";
    private static final String PROP_CLIENT_PREFIX = "debezium.sink.pulsar.client.";
    private static final String PROP_PRODUCER_PREFIX = "debezium.sink.pulsar.producer.";
    private final Map<String, Producer<?>> producers = new HashMap();
    private PulsarClient pulsarClient;
    private Map<String, Object> producerConfig;

    @ConfigProperty(name = "debezium.sink.pulsar.null.key", defaultValue = "default")
    String nullKey;

    @ConfigProperty(name = "debezium.sink.pulsar.tenant", defaultValue = "public")
    String pulsarTenant;

    @ConfigProperty(name = "debezium.sink.pulsar.namespace", defaultValue = "default")
    String pulsarNamespace;

    @ConfigProperty(name = "debezium.sink.pulsar.timeout", defaultValue = "0")
    Integer timeout;

    @ConfigProperty(name = "debezium.sink.pulsar.producer.batcherBuilder", defaultValue = "DEFAULT")
    String batcherBuilderConfig;
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarChangeConsumer.class);
    private static final AtomicBoolean hasFailed = new AtomicBoolean(false);

    /* loaded from: input_file:io/debezium/server/pulsar/PulsarChangeConsumer$ProducerBuilder.class */
    public interface ProducerBuilder {
        Producer<Object> get(String str, Object obj);
    }

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        try {
            this.pulsarClient = PulsarClient.builder().loadConf(getConfigSubset(config, PROP_CLIENT_PREFIX)).build();
            this.producerConfig = getConfigSubset(config, PROP_PRODUCER_PREFIX);
        } catch (PulsarClientException e) {
            throw new DebeziumException(e);
        }
    }

    @PreDestroy
    void close() {
        ArrayList arrayList = new ArrayList(this.producers.size());
        this.producers.values().forEach(producer -> {
            arrayList.add(producer.closeAsync().orTimeout(this.timeout.intValue(), TimeUnit.MILLISECONDS));
        });
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                ((CompletableFuture) it.next()).get();
            } catch (Exception e) {
                hasFailed.set(true);
                LOGGER.warn("Exception while closing producer", e);
            }
        }
        try {
            if (hasFailed.get()) {
                LOGGER.warn("Shutting down pulsar client forcefully due to previous failure");
                this.pulsarClient.shutdown();
            } else {
                this.pulsarClient.close();
            }
        } catch (Exception e2) {
            LOGGER.warn("Exception while closing client", e2);
        }
    }

    private Producer<?> createProducer(String str, Object obj) {
        String str2 = this.pulsarTenant + "/" + this.pulsarNamespace + "/" + str;
        try {
            return obj instanceof String ? this.pulsarClient.newProducer(Schema.STRING).loadConf(this.producerConfig).topic(str2).batcherBuilder(getBatcherBuilder(this.batcherBuilderConfig)).create() : this.pulsarClient.newProducer().loadConf(this.producerConfig).topic(str2).batcherBuilder(getBatcherBuilder(this.batcherBuilderConfig)).create();
        } catch (PulsarClientException e) {
            hasFailed.set(true);
            throw new DebeziumException(e);
        }
    }

    private BatcherBuilder getBatcherBuilder(String str) {
        boolean z = -1;
        switch (str.hashCode()) {
            case -2032180703:
                if (str.equals("DEFAULT")) {
                    z = true;
                    break;
                }
                break;
            case 2040886675:
                if (str.equals("KEY_BASED")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return BatcherBuilder.KEY_BASED;
            case true:
            default:
                return BatcherBuilder.DEFAULT;
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        HashMap hashMap = new HashMap();
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            String map = this.streamNameMapper.map(changeEvent.destination());
            Producer<?> computeIfAbsent = this.producers.computeIfAbsent(map, str -> {
                return createProducer(str, changeEvent.value());
            });
            hashMap.put(map, computeIfAbsent);
            String string = changeEvent.key() == null ? this.nullKey : getString(changeEvent.key());
            TypedMessageBuilder newMessage = changeEvent.value() instanceof String ? computeIfAbsent.newMessage(Schema.STRING) : computeIfAbsent.newMessage();
            newMessage.properties(convertHeaders(changeEvent)).key(string).value(changeEvent.value());
            newMessage.sendAsync().whenComplete((obj, obj2) -> {
                if (obj2 != null) {
                    LOGGER.error("Failed to send record to {} destination", changeEvent.destination(), obj2);
                    return;
                }
                LOGGER.trace("Sent message with id: {}", obj);
                try {
                    recordCommitter.markProcessed(changeEvent);
                } catch (InterruptedException e) {
                    throw new DebeziumException(e);
                }
            });
        }
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) hashMap.values().stream().map((v0) -> {
            return v0.flushAsync();
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
        try {
            if (this.timeout.intValue() > 0) {
                allOf.get(this.timeout.intValue(), TimeUnit.MILLISECONDS);
            } else {
                allOf.join();
            }
            recordCommitter.markBatchFinished();
        } catch (CompletionException | ExecutionException | TimeoutException e) {
            hasFailed.set(true);
            LOGGER.error("Failed to send batch", e);
            throw new DebeziumException(e);
        }
    }
}
