package io.synadia.flink.examples;

import io.nats.client.Connection;
import io.nats.client.IterableConsumer;
import io.nats.client.Message;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.synadia.flink.examples.support.ExampleUtils;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.sink.JetStreamSink;
import io.synadia.flink.sink.JetStreamSinkBuilder;
import io.synadia.flink.source.JetStreamSource;
import io.synadia.flink.source.JetStreamSourceBuilder;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:io/synadia/flink/examples/JetStreamExampleFromConfigFiles.class */
public class JetStreamExampleFromConfigFiles {
    public static final String JOB_NAME = "jsefcf";
    public static final int REPORT_FREQUENCY = 20000;
    public static final int QUIET_PERIOD = 3000;
    public static final boolean USE_JSON_NOT_YAML = true;
    public static final int PARALLELISM = 3;
    public static final int CHECKPOINTING_INTERVAL = 5000;

    public static void main(String[] strArr) throws Exception {
        long j;
        Connection connect = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE);
        JetStreamExampleHelper.setupSinkStream(connect);
        JetStreamExampleHelper.setupDataStreams(connect);
        JetStreamSource build = new JetStreamSourceBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).jsonConfigFile(JetStreamExample.SOURCE_CONFIG_FILE_JSON).build();
        System.out.println("Source as configured via JSON\n" + build.toJson());
        JetStreamSink build2 = new JetStreamSinkBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).jsonConfigFile(JetStreamExample.SINK_CONFIG_FILE_JSON).build();
        System.out.println("Sink as configured via JSON\n" + build2.toJson());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(5000L);
        executionEnvironment.setParallelism(3);
        executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), JOB_NAME).sinkTo(build2);
        executionEnvironment.executeAsync(JOB_NAME);
        IterableConsumer iterate = connect.getStreamContext(JetStreamExampleHelper.SINK_STREAM_NAME).createOrderedConsumer(new OrderedConsumerConfiguration().filterSubjects(new String[]{JetStreamExampleHelper.SINK_SUBJECT})).iterate();
        try {
            long currentTimeMillis = System.currentTimeMillis() + 5000;
            int i = 0;
            HashMap hashMap = new HashMap();
            do {
                Message nextMessage = iterate.nextMessage(1000L);
                if (nextMessage == null) {
                    j = System.currentTimeMillis() - currentTimeMillis;
                } else {
                    ((AtomicInteger) hashMap.computeIfAbsent(Publisher.extractSubject(new String(nextMessage.getData())), str -> {
                        return new AtomicInteger();
                    })).incrementAndGet();
                    currentTimeMillis = System.currentTimeMillis();
                    j = 0;
                    i++;
                    if (i % 20000 == 0) {
                        ExampleUtils.reportSinkListener(hashMap, i);
                    }
                }
                if (i >= JetStreamExampleHelper.SOURCES_TOTAL_MESSAGES) {
                    break;
                }
            } while (j < 3000);
            ExampleUtils.reportSinkListener(hashMap, i);
            if (iterate != null) {
                iterate.close();
            }
            System.exit(0);
        } catch (Throwable th) {
            if (iterate != null) {
                try {
                    iterate.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
