package io.synadia.flink.source.reader;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.synadia.flink.message.SourceConverter;
import io.synadia.flink.source.split.NatsSubjectSplit;
import io.synadia.flink.utils.ConnectionFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:io/synadia/flink/source/reader/NatsSourceReader.class */
public class NatsSourceReader<OutputT> implements SourceReader<OutputT, NatsSubjectSplit> {
    private final ConnectionFactory connectionFactory;
    private final SourceConverter<OutputT> sourceConverter;
    private final List<NatsSubjectSplit> subbedSplits;
    private final FutureCompletingBlockingQueue<Message> messages;
    private final ReentrantLock connectionLock;
    private Connection _connection;
    private Dispatcher dispatcher;

    public NatsSourceReader(ConnectionFactory connectionFactory, SourceConverter<OutputT> sourceConverter, SourceReaderContext sourceReaderContext) {
        this.connectionFactory = connectionFactory;
        this.sourceConverter = sourceConverter;
        Preconditions.checkNotNull(sourceReaderContext);
        this.subbedSplits = new ArrayList();
        this.messages = new FutureCompletingBlockingQueue<>();
        this.connectionLock = new ReentrantLock();
    }

    public void start() {
        getConnection();
    }

    private Connection getConnection() {
        this.connectionLock.lock();
        try {
            if (this._connection == null) {
                try {
                    this._connection = this.connectionFactory.connect();
                    this.dispatcher = this._connection.createDispatcher(message -> {
                        this.messages.put(1, message);
                    });
                } catch (IOException e) {
                    throw new FlinkRuntimeException(e);
                }
            }
            return this._connection;
        } finally {
            this.connectionLock.unlock();
        }
    }

    public InputStatus pollNext(ReaderOutput<OutputT> readerOutput) throws Exception {
        Message message = (Message) this.messages.poll();
        if (message == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        readerOutput.collect(this.sourceConverter.convert(message));
        return this.messages.isEmpty() ? InputStatus.NOTHING_AVAILABLE : InputStatus.MORE_AVAILABLE;
    }

    public List<NatsSubjectSplit> snapshotState(long j) {
        return Collections.unmodifiableList(this.subbedSplits);
    }

    public CompletableFuture<Void> isAvailable() {
        return this.messages.getAvailabilityFuture();
    }

    public void addSplits(List<NatsSubjectSplit> list) {
        Connection connection = null;
        for (NatsSubjectSplit natsSubjectSplit : list) {
            if (this.subbedSplits.indexOf(natsSubjectSplit) == -1) {
                if (connection == null) {
                    connection = getConnection();
                }
                this.dispatcher.subscribe(natsSubjectSplit.getSubject());
                this.subbedSplits.add(natsSubjectSplit);
            }
        }
    }

    public void notifyNoMoreSplits() {
    }

    public void close() throws Exception {
        this.connectionLock.lock();
        try {
            if (this._connection != null) {
                this._connection.close();
            }
        } finally {
            this._connection = null;
            this.connectionLock.unlock();
        }
    }

    public String toString() {
        return "NatsSourceReader{, subbedSplits=" + String.valueOf(this.subbedSplits) + "}";
    }
}
