package io.debezium.connector.spanner.kafka.internal;

import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.kafka.internal.proto.SyncEventToProtoMapper;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/kafka/internal/TaskSyncPublisher.class */
public class TaskSyncPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskSyncPublisher.class);
    private final String topic;
    private final KafkaProducer<String, byte[]> producer;
    private volatile Instant lastTime;
    private final BufferedPublisher<TaskSyncEvent> bufferedPublisher;
    private final Consumer<RuntimeException> errorHandler;
    private final String taskUid;

    public TaskSyncPublisher(String str, String str2, int i, ProducerFactory<String, byte[]> producerFactory, TaskSyncContextHolder taskSyncContextHolder, Consumer<RuntimeException> consumer) {
        this.topic = str2;
        this.producer = producerFactory.createProducer();
        this.errorHandler = consumer;
        this.taskUid = str;
        if (i <= 0) {
            this.bufferedPublisher = null;
        } else {
            this.bufferedPublisher = new BufferedPublisher<>(this.taskUid, "Buffer-Pub", taskSyncContextHolder, i, this::publishImmediately, this::publishSyncEvent);
            this.bufferedPublisher.start();
        }
    }

    public void send(TaskSyncEvent taskSyncEvent) {
        if (this.bufferedPublisher == null) {
            publishSyncEvent(taskSyncEvent);
        } else {
            LOGGER.debug("Buffering Sync Event, type: {}, timestamp: {}", taskSyncEvent.getMessageType(), Long.valueOf(taskSyncEvent.getMessageTimestamp()));
            this.bufferedPublisher.buffer(taskSyncEvent);
        }
    }

    private void publishSyncEvent(TaskSyncEvent taskSyncEvent) {
        LoggerUtils.debug(LOGGER, "Send SyncEvent to Kafka topic, type: {}, timestamp: {}, event: {}", taskSyncEvent.getMessageType(), Long.valueOf(taskSyncEvent.getMessageTimestamp()), taskSyncEvent);
        ProducerRecord producerRecord = new ProducerRecord(this.topic, taskSyncEvent.getTaskUid(), SyncEventToProtoMapper.mapToProto(taskSyncEvent).toByteArray());
        try {
            Instant now = Instant.now();
            this.producer.send(producerRecord).get();
            this.producer.flush();
            if (Instant.now().isAfter(now.plus((TemporalAmount) Duration.ofSeconds(60L)))) {
                LOGGER.info("Task Uid {} published record {} with {} seconds latency", new Object[]{this.taskUid, producerRecord, Long.valueOf(Instant.now().getEpochSecond() - now.getEpochSecond())});
            }
            this.lastTime = Instant.now();
        } catch (InterruptedException e) {
        } catch (ExecutionException e2) {
            this.errorHandler.accept(new SpannerConnectorException("Error during publishing to the Sync Topic", e2));
        }
    }

    public void close() {
        if (this.bufferedPublisher != null) {
            this.bufferedPublisher.close();
        }
        this.producer.close();
    }

    public Instant getLastTime() {
        return this.lastTime;
    }

    private boolean publishImmediately(TaskSyncEvent taskSyncEvent) {
        return (taskSyncEvent.getMessageType() == null || taskSyncEvent.getMessageType() == MessageTypeEnum.REGULAR) ? false : true;
    }
}
