package io.synadia.flink.source.reader;

import io.nats.client.BaseConsumerContext;
import io.nats.client.Connection;
import io.nats.client.ConsumeOptions;
import io.nats.client.JetStreamApiException;
import io.nats.client.StreamContext;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.nats.client.impl.AckType;
import io.nats.client.support.SerializableConsumeOptions;
import io.synadia.flink.message.SourceConverter;
import io.synadia.flink.source.reader.JetStreamSourceReaderSplit;
import io.synadia.flink.source.split.JetStreamSplit;
import io.synadia.flink.source.split.JetStreamSplitMessage;
import io.synadia.flink.utils.ConnectionFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

/**
 * Flink {@link SourceReader} that consumes NATS JetStream subjects, one consumer per
 * {@link JetStreamSplit}.
 *
 * <p>Each split's consumer pushes incoming messages into a shared hand-over queue from its
 * callback; {@link #pollNext(ReaderOutput)} drains that queue, converts each message with the
 * configured {@link SourceConverter} and emits it. For splits configured with {@code ackMode},
 * an ack is published for the snapshot recorded at a checkpoint once that checkpoint completes.
 *
 * @param <OutputT> type of the records emitted by this reader
 */
@Internal
public class JetStreamSourceReader<OutputT> implements SourceReader<OutputT, JetStreamSplit> {
    /** Body published to a message's reply subject to acknowledge it. */
    private static final byte[] ACK_BODY_BYTES = AckType.AckAck.bodyBytes(-1);

    /** Index handed to {@code FutureCompletingBlockingQueue.put}; every producer uses the same one. */
    private static final int QUEUE_THREAD_INDEX = 1;

    private final boolean bounded;
    private final ConnectionFactory connectionFactory;
    private final SourceConverter<OutputT> sourceConverter;
    /** Reader-side state of every split added to this reader, keyed by split id. */
    private final Map<String, JetStreamSourceReaderSplit> splitMap;
    /** Hand-over queue between the consumer callbacks and {@link #pollNext(ReaderOutput)}. */
    private final FutureCompletingBlockingQueue<JetStreamSplitMessage> queue;
    /** Completed only once a bounded reader has finished its last active split. */
    private final CompletableFuture<Void> availableFuture;
    /** Runs the ack publishes triggered by completed checkpoints. */
    private final ExecutorService scheduler;
    /** Number of splits added via {@link #addSplits(List)} that have not been marked done. */
    private int activeSplits;
    /** Connection opened in {@link #start()}; {@code null} until then. */
    private Connection connection;

    /**
     * Creates the reader. No connection is opened until {@link #start()} is called.
     *
     * @param boundedness whether the source is bounded; a bounded reader ends its input once
     *     every active split has emitted its configured maximum number of messages
     * @param sourceConverter converts a received message into an output record
     * @param connectionFactory supplies the connection and JetStream context
     * @param sourceReaderContext Flink reader context; must not be {@code null}
     */
    public JetStreamSourceReader(Boundedness boundedness, SourceConverter<OutputT> sourceConverter, ConnectionFactory connectionFactory, SourceReaderContext sourceReaderContext) {
        this.bounded = boundedness == Boundedness.BOUNDED;
        this.sourceConverter = sourceConverter;
        this.connectionFactory = connectionFactory;
        Preconditions.checkNotNull(sourceReaderContext);
        this.splitMap = new HashMap<>();
        this.queue = new FutureCompletingBlockingQueue<>();
        // Must start incomplete: a pre-completed future makes isAvailable() report
        // "data available" forever, so the queue's availability future would never be
        // consulted and the runtime would keep polling an empty reader.
        this.availableFuture = new CompletableFuture<>();
        this.scheduler = Executors.newCachedThreadPool();
    }

    /**
     * Opens the connection used for publishing acks.
     *
     * @throws FlinkRuntimeException if connecting fails with an {@link IOException}
     */
    @Override
    public void start() {
        try {
            this.connection = this.connectionFactory.connect();
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Emits at most one queued message.
     *
     * @param readerOutput collector for the converted record
     * @return {@code END_OF_INPUT} when a bounded reader has just finished its last active
     *     split, {@code MORE_AVAILABLE} if the queue still holds messages, otherwise
     *     {@code NOTHING_AVAILABLE}
     */
    @Override
    public InputStatus pollNext(ReaderOutput<OutputT> readerOutput) throws Exception {
        JetStreamSplitMessage next = this.queue.poll();
        if (next == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        JetStreamSourceReaderSplit readerSplit = this.splitMap.get(next.splitId);
        // Messages that arrive for an already finished split are dropped.
        if (!readerSplit.isFinished()) {
            readerOutput.collect(this.sourceConverter.convert(next.message));
            // NOTE(review): markEmitted presumably returns the running emitted count for the split.
            long emitted = readerSplit.markEmitted(next.message);
            if (this.bounded && emitted >= readerSplit.split.subjectConfig.maxMessagesToRead) {
                readerSplit.done();
                if (--this.activeSplits < 1) {
                    this.availableFuture.complete(null);
                    return InputStatus.END_OF_INPUT;
                }
            }
        }
        return this.queue.isEmpty() ? InputStatus.NOTHING_AVAILABLE : InputStatus.MORE_AVAILABLE;
    }

    /**
     * Records a snapshot on every split for the given checkpoint and returns the splits.
     *
     * @param j the checkpoint id
     * @return unmodifiable list of all splits known to this reader
     */
    @Override
    public List<JetStreamSplit> snapshotState(long j) {
        List<JetStreamSplit> splits = new ArrayList<>(this.splitMap.size());
        for (JetStreamSourceReaderSplit readerSplit : this.splitMap.values()) {
            readerSplit.takeSnapshot(j);
            splits.add(readerSplit.split);
        }
        return Collections.unmodifiableList(splits);
    }

    /**
     * Returns the reader's own future once it has been completed (bounded input exhausted),
     * otherwise the availability future of the hand-over queue.
     */
    @Override
    public CompletableFuture<Void> isAvailable() {
        return this.availableFuture.isDone() ? this.availableFuture : this.queue.getAvailabilityFuture();
    }

    /**
     * Creates a consumer for every split that is neither already registered nor finished and
     * starts consuming; received messages are enqueued for {@link #pollNext(ReaderOutput)}.
     *
     * @param list the splits assigned to this reader
     * @throws FlinkRuntimeException wrapping any failure while creating or starting a consumer
     */
    @Override
    public void addSplits(List<JetStreamSplit> list) {
        for (JetStreamSplit split : list) {
            if (this.splitMap.containsKey(split.splitId()) || split.finished.get()) {
                continue;
            }
            try {
                StreamContext streamContext = this.connectionFactory.getConnectionContext().js.getStreamContext(split.subjectConfig.streamName);
                BaseConsumerContext consumerContext = split.subjectConfig.ackMode
                    ? createConsumer(split, streamContext)
                    : createOrderedConsumer(split, streamContext);
                SerializableConsumeOptions serializableOptions = split.subjectConfig.serializableConsumeOptions;
                ConsumeOptions consumeOptions = serializableOptions == null
                    ? ConsumeOptions.DEFAULT_CONSUME_OPTIONS
                    : serializableOptions.getConsumeOptions();
                this.splitMap.put(split.splitId(), new JetStreamSourceReaderSplit(split, consumerContext, consumerContext.consume(consumeOptions, message -> {
                    this.queue.put(QUEUE_THREAD_INDEX, new JetStreamSplitMessage(split.splitId(), message));
                })));
                this.activeSplits++;
            } catch (Exception e) {
                throw new FlinkRuntimeException(e);
            }
        }
    }

    /**
     * Creates (or updates) an {@code AckPolicy.All} consumer filtered on the split's subject.
     * If the split has a last emitted stream sequence, consumption resumes right after it;
     * otherwise the configured deliver policy (with its start sequence or start time) is used.
     */
    private BaseConsumerContext createConsumer(JetStreamSplit jetStreamSplit, StreamContext streamContext) throws JetStreamApiException, IOException {
        ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder().ackPolicy(AckPolicy.All).filterSubject(jetStreamSplit.subjectConfig.subject);
        long lastEmitted = jetStreamSplit.lastEmittedStreamSequence.get();
        if (lastEmitted > 0) {
            builder.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(lastEmitted + 1);
        } else {
            DeliverPolicy deliverPolicy = jetStreamSplit.subjectConfig.deliverPolicy;
            builder.deliverPolicy(deliverPolicy);
            if (deliverPolicy == DeliverPolicy.ByStartSequence) {
                builder.startSequence(jetStreamSplit.subjectConfig.startSequence);
            } else if (deliverPolicy == DeliverPolicy.ByStartTime) {
                builder.startTime(jetStreamSplit.subjectConfig.startTime);
            }
        }
        return streamContext.createOrUpdateConsumer(builder.build());
    }

    /**
     * Creates an ordered consumer filtered on the split's subject, using the same start
     * position rules as {@link #createConsumer(JetStreamSplit, StreamContext)}.
     */
    private BaseConsumerContext createOrderedConsumer(JetStreamSplit jetStreamSplit, StreamContext streamContext) throws JetStreamApiException, IOException {
        OrderedConsumerConfiguration config = new OrderedConsumerConfiguration().filterSubject(jetStreamSplit.subjectConfig.subject);
        long lastEmitted = jetStreamSplit.lastEmittedStreamSequence.get();
        if (lastEmitted > 0) {
            config.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(lastEmitted + 1);
        } else {
            DeliverPolicy deliverPolicy = jetStreamSplit.subjectConfig.deliverPolicy;
            config.deliverPolicy(deliverPolicy);
            if (deliverPolicy == DeliverPolicy.ByStartSequence) {
                config.startSequence(jetStreamSplit.subjectConfig.startSequence);
            } else if (deliverPolicy == DeliverPolicy.ByStartTime) {
                config.startTime(jetStreamSplit.subjectConfig.startTime);
            }
        }
        return streamContext.createOrderedConsumer(config);
    }

    /** No-op: this reader takes no action when told that no more splits will arrive. */
    @Override
    public void notifyNoMoreSplits() {
    }

    /**
     * For every split in ack mode that holds a snapshot for the completed checkpoint, publishes
     * the ack body to that snapshot's reply subject on the background executor.
     *
     * @param j the id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        for (JetStreamSourceReaderSplit readerSplit : this.splitMap.values()) {
            JetStreamSourceReaderSplit.Snapshot snapshot = readerSplit.removeSnapshot(j);
            if (snapshot != null && readerSplit.split.subjectConfig.ackMode) {
                this.scheduler.execute(() -> {
                    this.connection.publish(snapshot.replyTo, ACK_BODY_BYTES);
                });
            }
        }
    }

    /** No-op: this reader does not react to source events. */
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
    }

    /**
     * Stops every split's consumer, shuts down the ack executor and closes the connection.
     * The executor and the connection are released even if stopping a consumer fails.
     */
    @Override
    public void close() throws Exception {
        try {
            for (JetStreamSourceReaderSplit readerSplit : this.splitMap.values()) {
                readerSplit.consumer.stop();
            }
        } finally {
            // Without this the cached pool's worker threads outlive the reader.
            this.scheduler.shutdown();
            // start() may never have run, or may have failed before assigning the connection.
            if (this.connection != null) {
                this.connection.close();
            }
        }
    }
}
