package io.synadia.flink.sink.writer;

import io.nats.client.Connection;
import io.synadia.flink.message.SinkConverter;
import io.synadia.flink.message.SinkMessage;
import io.synadia.flink.utils.ConnectionContext;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.utils.MiscUtils;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

@Internal
/* loaded from: input_file:io/synadia/flink/sink/writer/NatsSinkWriter.class */
public class NatsSinkWriter<InputT> implements SinkWriter<InputT>, Serializable {
    protected final String sinkId;
    protected final List<String> subjects;
    protected final ConnectionFactory connectionFactory;
    protected final SinkConverter<InputT> sinkConverter;
    protected final WriterInitContext writerInitContext;
    protected final String id;
    protected transient ConnectionContext ctx;

    public NatsSinkWriter(String str, List<String> list, SinkConverter<InputT> sinkConverter, ConnectionFactory connectionFactory, WriterInitContext writerInitContext) throws IOException {
        this.sinkId = str;
        this.id = MiscUtils.generatePrefixedId(str);
        this.subjects = Collections.unmodifiableList(list);
        this.sinkConverter = sinkConverter;
        this.connectionFactory = connectionFactory;
        this.writerInitContext = writerInitContext;
        this.ctx = connectionFactory.getConnectionContext();
    }

    public String getId() {
        return this.id;
    }

    public void write(InputT inputt, SinkWriter.Context context) throws IOException, InterruptedException {
        SinkMessage convert = this.sinkConverter.convert(inputt);
        if (convert != null) {
            Iterator<String> it = this.subjects.iterator();
            while (it.hasNext()) {
                this.ctx.connection.publish(it.next(), (String) null, convert.headers, convert.payload);
            }
        }
    }

    public void flush(boolean z) throws IOException, InterruptedException {
        if (this.ctx.connection.getStatus() == Connection.Status.CONNECTED) {
            this.ctx.connection.flushBuffer();
        }
    }

    public void close() throws Exception {
        this.ctx.connection.close();
    }

    protected void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
        objectInputStream.defaultReadObject();
        this.ctx = this.connectionFactory.getConnectionContext();
    }

    public String toString() {
        return getClass().getSimpleName() + "{sinkId='" + this.sinkId + "', id='" + this.id + "', subjects=" + String.valueOf(this.subjects) + "}";
    }
}
