package io.debezium.server.rocketmq;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("rocketmq")
@Dependent
/* loaded from: input_file:io/debezium/server/rocketmq/RocketMqChangeConsumer.class */
public class RocketMqChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.rocketmq.";
    private static final String PROP_PRODUCER_PREFIX = "debezium.sink.rocketmq.producer.";
    private static final String PROP_PRODUCER_ACL_ENABLE = "debezium.sink.rocketmq.producer.acl.enabled";
    private static final String PROP_PRODUCER_ACCESS_KEY = "debezium.sink.rocketmq.producer.access.key";
    private static final String PROP_PRODUCER_SECRET_KEY = "debezium.sink.rocketmq.producer.secret.key";
    private static final String PROP_PRODUCER_NAME_SRV_ADDR = "debezium.sink.rocketmq.producer.name.srv.addr";
    private static final String PROP_PRODUCER_GROUP = "debezium.sink.rocketmq.producer.group";
    private static final String PROP_PRODUCER_MAX_MESSAGE_SIZE = "debezium.sink.rocketmq.producer.max.message.size";
    private static final String PROP_PRODUCER_SEND_MSG_TIMEOUT = "debezium.sink.rocketmq.producer.send.msg.timeout";

    @Inject
    @CustomConsumerBuilder
    Instance<DefaultMQProducer> customRocketMqProducer;
    private DefaultMQProducer mqProducer;

    @PostConstruct
    void connect() {
        if (this.customRocketMqProducer.isResolvable()) {
            this.mqProducer = (DefaultMQProducer) this.customRocketMqProducer.get();
            startProducer();
            LOGGER.info("Obtained custom configured RocketMqProducer '{}'", this.mqProducer);
            return;
        }
        Config config = ConfigProvider.getConfig();
        AclClientRPCHook aclClientRPCHook = null;
        Optional optionalValue = config.getOptionalValue(PROP_PRODUCER_ACL_ENABLE, Boolean.class);
        if (optionalValue.isPresent() && ((Boolean) optionalValue.get()).booleanValue()) {
            if (config.getOptionalValue(PROP_PRODUCER_ACCESS_KEY, String.class).isEmpty() || config.getOptionalValue(PROP_PRODUCER_SECRET_KEY, String.class).isEmpty()) {
                throw new DebeziumException("When acl.enabled is true, access key and secret key cannot be empty");
            }
            aclClientRPCHook = new AclClientRPCHook(new SessionCredentials((String) config.getValue(PROP_PRODUCER_ACCESS_KEY, String.class), (String) config.getValue(PROP_PRODUCER_SECRET_KEY, String.class)));
        }
        this.mqProducer = new DefaultMQProducer(aclClientRPCHook);
        this.mqProducer.setNamesrvAddr((String) config.getValue(PROP_PRODUCER_NAME_SRV_ADDR, String.class));
        this.mqProducer.setInstanceName(createUniqInstance((String) config.getValue(PROP_PRODUCER_NAME_SRV_ADDR, String.class)));
        this.mqProducer.setProducerGroup((String) config.getValue(PROP_PRODUCER_GROUP, String.class));
        if (config.getOptionalValue(PROP_PRODUCER_SEND_MSG_TIMEOUT, Integer.class).isPresent()) {
            this.mqProducer.setSendMsgTimeout(((Integer) config.getValue(PROP_PRODUCER_SEND_MSG_TIMEOUT, Integer.class)).intValue());
        }
        if (config.getOptionalValue(PROP_PRODUCER_MAX_MESSAGE_SIZE, Integer.class).isPresent()) {
            this.mqProducer.setMaxMessageSize(((Integer) config.getValue(PROP_PRODUCER_MAX_MESSAGE_SIZE, Integer.class)).intValue());
        }
        this.mqProducer.setLanguage(LanguageCode.JAVA);
        startProducer();
    }

    private void startProducer() {
        try {
            this.mqProducer.start();
            LOGGER.info("Consumer started...");
        } catch (MQClientException e) {
            throw new DebeziumException(e);
        }
    }

    private String createUniqInstance(String str) {
        return str.concat("-").concat(UUID.randomUUID().toString());
    }

    @PreDestroy
    void close() {
        LOGGER.info("Consumer destroy...");
        if (this.mqProducer != null) {
            this.mqProducer.shutdown();
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (final ChangeEvent<Object, Object> changeEvent : list) {
            try {
                String map = this.streamNameMapper.map(changeEvent.destination());
                String string = getString(changeEvent.key());
                Message message = new Message(map, (String) null, string, getBytes(changeEvent.value()));
                for (Map.Entry entry : convertHeaders(changeEvent).entrySet()) {
                    message.putUserProperty((String) entry.getKey(), (String) entry.getValue());
                }
                this.mqProducer.send(message, new SelectMessageQueueByHash(), string, new SendCallback(this) { // from class: io.debezium.server.rocketmq.RocketMqChangeConsumer.1
                    public void onSuccess(SendResult sendResult) {
                        RocketMqChangeConsumer.LOGGER.debug("Sent message with offset: {}", Long.valueOf(sendResult.getQueueOffset()));
                        countDownLatch.countDown();
                    }

                    public void onException(Throwable th) {
                        RocketMqChangeConsumer.LOGGER.error("Failed to send record to {}:", changeEvent.destination(), th);
                        throw new DebeziumException(th);
                    }
                });
            } catch (Exception e) {
                throw new DebeziumException(e);
            }
        }
        countDownLatch.await();
        Iterator<ChangeEvent<Object, Object>> it = list.iterator();
        while (it.hasNext()) {
            recordCommitter.markProcessed(it.next());
        }
        recordCommitter.markBatchFinished();
    }
}
