package io.synadia.flink.examples;

import io.nats.client.Connection;
import io.nats.client.IterableConsumer;
import io.nats.client.Message;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.synadia.flink.examples.support.ExampleUtils;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.message.Utf8StringSinkConverter;
import io.synadia.flink.message.Utf8StringSourceConverter;
import io.synadia.flink.sink.JetStreamSink;
import io.synadia.flink.sink.JetStreamSinkBuilder;
import io.synadia.flink.source.JetStreamSource;
import io.synadia.flink.source.JetStreamSourceBuilder;
import io.synadia.flink.source.JetStreamSubjectConfiguration;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:io/synadia/flink/examples/JetStreamExample.class */
public class JetStreamExample {
    public static final String JOB_NAME = "jse";
    public static final int REPORT_FREQUENCY = 20000;
    public static final int QUIET_PERIOD = 3000;
    public static final String SOURCE_CONFIG_FILE_JSON = "src/examples/resources/js-source-config.json";
    public static final String SOURCE_CONFIG_FILE_YAML = "src/examples/resources/js-source-config.yaml";
    public static final String SINK_CONFIG_FILE_JSON = "src/examples/resources/js-sink-config.json";
    public static final String SINK_CONFIG_FILE_YAML = "src/examples/resources/js-sink-config.yaml";
    public static final boolean ACK_MODE = false;
    public static final int MAX_MESSAGES_TO_READ = -1;
    public static final int PARALLELISM = 3;
    public static final int CHECKPOINTING_INTERVAL = 5000;

    public static void main(String[] strArr) throws Exception {
        long j;
        Connection connect = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE);
        JetStreamExampleHelper.setupSinkStream(connect);
        JetStreamExampleHelper.setupDataStreams(connect);
        JetStreamSubjectConfiguration build = JetStreamSubjectConfiguration.builder().streamName(JetStreamExampleHelper.SOURCE_A_STREAM).subject(JetStreamExampleHelper.SOURCE_A_SUBJECT).maxMessagesToRead(-1L).ackMode(false).build();
        System.out.println("JetStreamSubjectConfiguration" + build.toJson());
        ArrayList arrayList = new ArrayList();
        arrayList.add(JetStreamSubjectConfiguration.builder().streamName(JetStreamExampleHelper.SOURCE_B_STREAM).subject(JetStreamExampleHelper.SOURCE_B_SUBJECTS[0]).maxMessagesToRead(-1L).ackMode(false).build());
        for (int i = 1; i < JetStreamExampleHelper.SOURCE_B_SUBJECTS.length; i++) {
            arrayList.add(JetStreamSubjectConfiguration.builder().copy((JetStreamSubjectConfiguration) arrayList.get(0)).subject(JetStreamExampleHelper.SOURCE_B_SUBJECTS[i]).build());
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            System.out.println("JetStreamSubjectConfiguration" + ((JetStreamSubjectConfiguration) it.next()).toJson());
        }
        JetStreamSource build2 = new JetStreamSourceBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).sourceConverter(new Utf8StringSourceConverter()).addSubjectConfigurations(build).addSubjectConfigurations(arrayList).build();
        ExampleUtils.writeToFile(SOURCE_CONFIG_FILE_JSON, build2.toJson());
        ExampleUtils.writeToFile(SOURCE_CONFIG_FILE_YAML, build2.toYaml());
        JetStreamSink build3 = new JetStreamSinkBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).sinkConverter(new Utf8StringSinkConverter()).subjects(JetStreamExampleHelper.SINK_SUBJECT).build();
        ExampleUtils.writeToFile(SINK_CONFIG_FILE_JSON, build3.toJson());
        ExampleUtils.writeToFile(SINK_CONFIG_FILE_YAML, build3.toYaml());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(5000L);
        executionEnvironment.setParallelism(3);
        executionEnvironment.fromSource(build2, WatermarkStrategy.noWatermarks(), JOB_NAME).sinkTo(build3);
        executionEnvironment.executeAsync(JOB_NAME);
        IterableConsumer iterate = connect.getStreamContext(JetStreamExampleHelper.SINK_STREAM_NAME).createOrderedConsumer(new OrderedConsumerConfiguration().filterSubjects(new String[]{JetStreamExampleHelper.SINK_SUBJECT})).iterate();
        try {
            long currentTimeMillis = System.currentTimeMillis() + 5000;
            int i2 = 0;
            HashMap hashMap = new HashMap();
            do {
                Message nextMessage = iterate.nextMessage(1000L);
                if (nextMessage == null) {
                    j = System.currentTimeMillis() - currentTimeMillis;
                } else {
                    ((AtomicInteger) hashMap.computeIfAbsent(Publisher.extractSubject(new String(nextMessage.getData())), str -> {
                        return new AtomicInteger();
                    })).incrementAndGet();
                    currentTimeMillis = System.currentTimeMillis();
                    j = 0;
                    i2++;
                    if (i2 % 20000 == 0) {
                        ExampleUtils.reportSinkListener(hashMap, i2);
                    }
                }
                if (i2 >= JetStreamExampleHelper.SOURCES_TOTAL_MESSAGES) {
                    break;
                }
            } while (j < 3000);
            ExampleUtils.reportSinkListener(hashMap, i2);
            if (iterate != null) {
                iterate.close();
            }
            System.exit(0);
        } catch (Throwable th) {
            if (iterate != null) {
                try {
                    iterate.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
