package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PublishOptions;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;

/* loaded from: input_file:io/nats/examples/jetstream/ResilientPublisher.class */
public class ResilientPublisher implements Runnable {
    private final Connection nc;
    private final JetStreamManagement jsm;
    private final JetStream js;
    private final String stream;
    private final String subject;
    private final AtomicLong lastPub;
    private final AtomicBoolean keepGoing;
    private boolean expectationCheck;
    private long jitter;
    private long delay;
    private boolean reporting;
    private long reportFrequency;
    private Function<Long, byte[]> dataProvider;
    private BiConsumer<Connection, Long> beforePublish;
    private BiConsumer<Connection, PublishAck> afterPublish;
    private BiConsumer<Connection, Long> publishReporter;
    private BiConsumer<Connection, Exception> exceptionReporter;

    public static void main(String[] strArr) {
        try {
            Connection connect = Nats.connect(Options.builder().socketWriteTimeout(20000L).connectionListener((connection, events) -> {
                System.out.println(events);
            }).errorListener(new ErrorListenerConsoleImpl()).build());
            Throwable th = null;
            try {
                try {
                    JetStreamManagement jetStreamManagement = connect.jetStreamManagement();
                    NatsJsUtils.createOrReplaceStream(jetStreamManagement, "js-stream", StorageType.Memory, "js-subject");
                    Thread thread = new Thread(new ResilientPublisher(connect, jetStreamManagement, "js-stream", "js-subject").basicDataPrefix("data").delay(1L).reportFrequency(1000L));
                    thread.start();
                    thread.join();
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            connect.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public ResilientPublisher(Connection connection, String str) {
        this(connection, null, null, str);
    }

    public ResilientPublisher(Connection connection, JetStreamManagement jetStreamManagement, String str, String str2) {
        this.nc = connection;
        if (jetStreamManagement == null) {
            this.jsm = null;
            this.js = null;
            this.stream = null;
        } else {
            this.jsm = jetStreamManagement;
            this.js = jetStreamManagement.jetStream();
            this.stream = str;
        }
        this.subject = str2;
        this.lastPub = new AtomicLong();
        this.keepGoing = new AtomicBoolean(true);
        basicDataPrefix(null);
        beforePublish(null);
        afterPublish(null);
        publishReporter(null);
        exceptionReporter(null);
    }

    public ResilientPublisher expectationCheck(boolean z) {
        this.expectationCheck = z;
        return this;
    }

    public ResilientPublisher jitter(long j) {
        this.jitter = j;
        return this;
    }

    public ResilientPublisher delay(long j) {
        this.delay = j;
        return this;
    }

    public ResilientPublisher reportFrequency(long j) {
        this.reportFrequency = j;
        this.reporting = j > 0;
        return this;
    }

    public ResilientPublisher basicDataPrefix(String str) {
        this.dataProvider = str == null ? l -> {
            return null;
        } : l2 -> {
            return (str + "-" + l2).getBytes();
        };
        return this;
    }

    public ResilientPublisher dataProvider(Function<Long, byte[]> function) {
        this.dataProvider = function == null ? l -> {
            return null;
        } : function;
        return this;
    }

    public ResilientPublisher beforePublish(BiConsumer<Connection, Long> biConsumer) {
        this.beforePublish = biConsumer == null ? (connection, l) -> {
        } : biConsumer;
        return this;
    }

    public ResilientPublisher afterPublish(BiConsumer<Connection, PublishAck> biConsumer) {
        this.afterPublish = biConsumer == null ? (connection, publishAck) -> {
        } : biConsumer;
        return this;
    }

    public ResilientPublisher publishReporter(BiConsumer<Connection, Long> biConsumer) {
        this.publishReporter = biConsumer == null ? (connection, l) -> {
            NatsJsUtils.report("Published Id: " + l);
        } : biConsumer;
        return this;
    }

    public ResilientPublisher exceptionReporter(BiConsumer<Connection, Exception> biConsumer) {
        this.exceptionReporter = biConsumer == null ? (connection, exc) -> {
            NatsJsUtils.report("Publish Exception: " + exc);
        } : biConsumer;
        return this;
    }

    public void stop() {
        this.keepGoing.set(false);
    }

    public long getLastPub() {
        return this.lastPub.get();
    }

    @Override // java.lang.Runnable
    public void run() {
        JetStreamApiException jetStreamApiException = null;
        long j = 0;
        while (this.keepGoing.get()) {
            try {
                if (this.jitter > 0) {
                    Thread.sleep(ThreadLocalRandom.current().nextLong(this.jitter));
                }
                if (this.delay > 0) {
                    Thread.sleep(this.delay);
                }
                long j2 = this.lastPub.get();
                long incrementAndGet = this.lastPub.incrementAndGet();
                this.beforePublish.accept(this.nc, Long.valueOf(incrementAndGet));
                if (this.js == null) {
                    this.nc.publish(this.subject, this.dataProvider.apply(Long.valueOf(incrementAndGet)));
                } else {
                    this.afterPublish.accept(this.nc, this.js.publish(this.subject, this.dataProvider.apply(Long.valueOf(incrementAndGet)), this.expectationCheck ? PublishOptions.builder().expectedLastSequence(j2).build() : null));
                }
                if (this.reporting && (jetStreamApiException != null || System.currentTimeMillis() > j)) {
                    this.publishReporter.accept(this.nc, Long.valueOf(incrementAndGet));
                    j = System.currentTimeMillis() + this.reportFrequency;
                }
                jetStreamApiException = null;
            } catch (Exception e) {
                boolean z = jetStreamApiException == null;
                if (e instanceof JetStreamApiException) {
                    JetStreamApiException jetStreamApiException2 = e;
                    if (jetStreamApiException2.getApiErrorCode() == 10071) {
                        try {
                            this.lastPub.set(this.jsm.getLastMessage(this.stream, this.subject).getSeq());
                        } catch (Exception e2) {
                        }
                    }
                    if (!z && (jetStreamApiException instanceof JetStreamApiException)) {
                        z = jetStreamApiException2.getApiErrorCode() != jetStreamApiException.getApiErrorCode();
                    }
                }
                if (!z && jetStreamApiException.getClass().getSimpleName().equals(e.getClass().getSimpleName())) {
                    z = true;
                }
                if (z || System.currentTimeMillis() > j) {
                    this.exceptionReporter.accept(this.nc, e);
                    j = System.currentTimeMillis() + this.reportFrequency;
                }
                jetStreamApiException = e;
            }
        }
    }
}
