package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.synadia.jnats.extension.AsyncJsPublisher;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:io/synadia/examples/AsyncJsPublisherCustomThreadsExample.class */
public class AsyncJsPublisherCustomThreadsExample {
    public static final String STREAM = "exampleStream";
    public static final String SUBJECT = "exampleSubject";
    public static final int PUBLISH_COUNT = 1000000;
    public static final int MAX_IN_FLIGHT = 10000;
    public static final int RESUME_AMOUNT = 1000;
    public static final long POLL_TIME = 50;
    public static final long PUBLISH_PAUSE_TIME = 100;
    public static final long WAIT_TIMEOUT = 2500;

    public static void main(String[] strArr) {
        try {
            Connection connect = Nats.connect(Options.builder().server("nats://localhost:4222").connectionListener((connection, events) -> {
                ExampleUtils.print("Connection Event", events.getEvent());
            }).errorListener(new ErrorListenerConsoleImpl()).build());
            Throwable th = null;
            try {
                ExampleUtils.setupStream(connect, "exampleStream", "exampleSubject");
                JetStream jetStream = connect.jetStream();
                ExamplePublishListener examplePublishListener = new ExamplePublishListener();
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
                AsyncJsPublisher build = AsyncJsPublisher.builder(jetStream).maxInFlight(10000).resumeAmount(1000).pollTime(50L).publishPauseTime(100L).waitTimeout(2500L).publishListener(examplePublishListener).notificationExecutorService(newFixedThreadPool).build();
                build.getClass();
                Thread thread = new Thread(build::publishRunner);
                thread.start();
                build.getClass();
                Thread thread2 = new Thread(build::flightsRunner);
                thread2.start();
                for (int i = 1; i <= 1000000; i++) {
                    build.publishAsync("exampleSubject", ("data-" + i).getBytes());
                }
                while (examplePublishListener.publishedCount.get() < 1000000) {
                    ExampleUtils.printStatus(build, examplePublishListener, false);
                    Thread.sleep(500L);
                }
                build.stop(true);
                build.getPublishRunnerDoneFuture().get(60L, TimeUnit.MILLISECONDS);
                try {
                    System.out.println("Attempting to publish after stop should throw an exception...");
                    build.publishAsync("exampleSubject", "should fail".getBytes());
                    System.out.println("SHOULD HAVE EXCEPTIONED!");
                } catch (IllegalStateException e) {
                    System.out.println("Got exception as expected: " + e);
                }
                System.out.println("Waiting for the flight runner to complete processing publish acks...");
                build.getFlightsRunnerDoneFuture().get(11L, TimeUnit.MINUTES);
                ExampleUtils.printStatus(build, examplePublishListener, true);
                build.close();
                newFixedThreadPool.shutdown();
                try {
                    build.getPublishRunnerDoneFuture().get(build.getPollTime(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException e2) {
                    if (thread.isAlive()) {
                        thread.interrupt();
                    }
                }
                try {
                    build.getFlightsRunnerDoneFuture().get(build.getPollTime(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException e3) {
                    if (thread2.isAlive()) {
                        thread2.interrupt();
                    }
                }
                if (connect != null) {
                    if (0 != 0) {
                        try {
                            connect.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        connect.close();
                    }
                }
            } finally {
            }
        } catch (Exception e4) {
            e4.printStackTrace();
        }
    }
}
