package io.synadia.flink.examples;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.StorageType;
import io.synadia.flink.examples.support.ExampleUtils;
import io.synadia.flink.examples.support.Publisher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:io/synadia/flink/examples/JetStreamExampleHelper.class */
/**
 * Shared constants and setup routines for the JetStream examples.
 *
 * <p>Defines the stream names, subjects, storage types and per-subject message counts
 * used by the examples, and provides helpers that create the streams and publish the
 * example data.
 */
public class JetStreamExampleHelper {
    // ---- Source A: one stream with a single subject ----
    public static final String SOURCE_A_STREAM = "source-a";
    public static final String SOURCE_A_SUBJECT = "ua";
    public static final StorageType SOURCE_A_STORAGE_TYPE = StorageType.Memory;
    /** Number of messages published to {@link #SOURCE_A_SUBJECT}. */
    public static final int SOURCE_A_MESSAGE_COUNT = 100000;

    // ---- Source B: one stream with several subjects ----
    public static final String SOURCE_B_STREAM = "source-b";
    public static final String[] SOURCE_B_SUBJECTS = {"ub1", "ub2", "ub3", "ub4"};
    /** Number of messages published to each subject of {@link #SOURCE_B_SUBJECTS}, by index. */
    public static final int[] SOURCE_B_MESSAGE_COUNTS = {30000, 70000, 50000, 90000};
    public static final StorageType SOURCE_B_STORAGE_TYPE = StorageType.Memory;

    // ---- Sink ----
    public static final String SINK_STREAM_NAME = "sink";
    public static final String SINK_SUBJECT = "uk";
    public static final StorageType SINK_STORAGE_TYPE = StorageType.Memory;

    // ---- Values derived from the source definitions ----
    /**
     * Number of distinct source subjects: the single source A subject plus every source B subject.
     * (Previously this added the source A <em>message</em> count to the number of B subjects.)
     */
    public static final int SOURCES_TOTAL_SUBJECT_COUNT = 1 + SOURCE_B_SUBJECTS.length;
    /** Total number of messages published across all source subjects. */
    public static final int SOURCES_TOTAL_MESSAGES;
    /** Largest message count of any single source subject. */
    public static final int SOURCES_MOST_MESSAGES_ANY_SUBJECT;

    /** Number of publish rounds between waits for the outstanding publish futures. */
    private static final int PUBLISH_BATCH_SIZE = 1000;
    /** Number of publish rounds between progress printouts. */
    private static final int PROGRESS_REPORT_INTERVAL = 10000;
    /** Maximum time to wait for each individual publish future. */
    private static final long PUBLISH_ACK_TIMEOUT_MILLIS = 1000L;

    static {
        int total = SOURCE_A_MESSAGE_COUNT;
        int most = SOURCE_A_MESSAGE_COUNT;
        for (int count : SOURCE_B_MESSAGE_COUNTS) {
            total += count;
            most = Math.max(most, count);
        }
        SOURCES_TOTAL_MESSAGES = total;
        SOURCES_MOST_MESSAGES_ANY_SUBJECT = most;
    }

    /**
     * Sets up the sink stream by calling {@code ExampleUtils.createOrReplaceStream} with
     * {@link #SINK_STORAGE_TYPE}, {@link #SINK_STREAM_NAME} and {@link #SINK_SUBJECT}.
     *
     * @param connection the NATS connection to use
     * @throws IOException declared by the underlying stream setup call
     * @throws JetStreamApiException declared by the underlying stream setup call
     */
    public static void setupSinkStream(Connection connection) throws IOException, JetStreamApiException {
        System.out.println("Setting up sink stream.");
        ExampleUtils.createOrReplaceStream(connection, SINK_STORAGE_TYPE, SINK_STREAM_NAME, SINK_SUBJECT);
    }

    /**
     * Sets up both source streams and publishes the example data to them.
     *
     * <p>Publishing proceeds in rounds numbered from 1 to
     * {@link #SOURCES_MOST_MESSAGES_ANY_SUBJECT}. In each round, one message is published to
     * every subject that has not yet reached its configured message count, with a payload
     * built by {@code Publisher.makePayload(subject, round)}. Outstanding publish futures are
     * awaited every {@value #PUBLISH_BATCH_SIZE} rounds and progress is printed every
     * {@value #PROGRESS_REPORT_INTERVAL} rounds.
     *
     * @param connection the NATS connection to use
     * @throws Exception if stream setup fails, or a publish future fails, times out,
     *                   or the wait for it is interrupted
     */
    public static void setupDataStreams(Connection connection) throws Exception {
        JetStreamManagement management = connection.jetStreamManagement();
        JetStream jetStream = connection.jetStream();

        System.out.println("Setting up data streams");
        ExampleUtils.createOrReplaceStream(management, SOURCE_A_STORAGE_TYPE, SOURCE_A_STREAM, SOURCE_A_SUBJECT);
        ExampleUtils.createOrReplaceStream(management, SOURCE_B_STORAGE_TYPE, SOURCE_B_STREAM, SOURCE_B_SUBJECTS);

        System.out.println("Publishing...");
        int publishedA = 0;
        int[] publishedB = new int[SOURCE_B_MESSAGE_COUNTS.length];
        List<CompletableFuture<?>> pendingAcks = new ArrayList<>();

        for (int round = 1; round <= SOURCES_MOST_MESSAGES_ANY_SUBJECT; round++) {
            if (publishedA < SOURCE_A_MESSAGE_COUNT) {
                publishedA++;
                pendingAcks.add(publish(jetStream, SOURCE_A_SUBJECT, round));
            }
            for (int b = 0; b < SOURCE_B_MESSAGE_COUNTS.length; b++) {
                if (publishedB[b] < SOURCE_B_MESSAGE_COUNTS[b]) {
                    pendingAcks.add(publish(jetStream, SOURCE_B_SUBJECTS[b], round));
                    publishedB[b]++;
                }
            }

            // Periodically drain the outstanding futures so the pending list stays bounded.
            if (round % PUBLISH_BATCH_SIZE == 0) {
                awaitAcks(pendingAcks);
            }
            if (round % PROGRESS_REPORT_INTERVAL == 0) {
                printSetupCounts(publishedA, publishedB);
            }
        }

        // Wait for whatever is left over from the last partial batch.
        awaitAcks(pendingAcks);
        printSetupCounts(publishedA, publishedB);
        System.out.println("Publishing complete.");
    }

    /** Starts an asynchronous publish of the payload for {@code subject} and {@code round}. */
    private static CompletableFuture<?> publish(JetStream jetStream, String subject, int round) {
        // NOTE(review): getBytes() uses the platform default charset; if makePayload returns
        // a String, consider passing an explicit charset - confirm against Publisher.
        return jetStream.publishAsync(subject, Publisher.makePayload(subject, round).getBytes());
    }

    /** Waits for every pending publish future (bounded per future), then empties the list. */
    private static void awaitAcks(List<CompletableFuture<?>> pendingAcks)
            throws InterruptedException, ExecutionException, TimeoutException {
        for (CompletableFuture<?> ack : pendingAcks) {
            ack.get(PUBLISH_ACK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }
        pendingAcks.clear();
    }

    /** Prints one line of {@code subject/count} pairs, comma separated, for all source subjects. */
    private static void printSetupCounts(int sourceACount, int[] sourceBCounts) {
        StringBuilder line = new StringBuilder();
        line.append(SOURCE_A_SUBJECT).append('/').append(sourceACount);
        for (int b = 0; b < SOURCE_B_MESSAGE_COUNTS.length; b++) {
            line.append(", ").append(SOURCE_B_SUBJECTS[b]).append('/').append(sourceBCounts[b]);
        }
        System.out.println(line);
    }
}
