package io.synadia.flink.examples;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.synadia.flink.examples.support.ExampleUtils;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.sink.NatsSink;
import io.synadia.flink.sink.NatsSinkBuilder;
import io.synadia.flink.source.NatsSource;
import io.synadia.flink.source.NatsSourceBuilder;
import io.synadia.flink.utils.Constants;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:io/synadia/flink/examples/CoreSubjectExample.class */
public class CoreSubjectExample {
    public static final String JOB_NAME = "cse";
    public static final String SOURCE_CONFIG_FILE_JSON = "src/examples/resources/core-source-config.json";
    public static final String SOURCE_CONFIG_FILE_YAML = "src/examples/resources/core-source-config.yaml";
    public static final String SINK_CONFIG_FILE_JSON = "src/examples/resources/core-sink-config.json";
    public static final String SINK_CONFIG_FILE_YAML = "src/examples/resources/core-sink-config.yaml";

    public static void main(String[] strArr) throws Exception {
        NatsSource build = new NatsSourceBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).sourceConverterClass(Constants.UTF8_STRING_SOURCE_CONVERTER_CLASSNAME).subjects("source1", "source2").build();
        ExampleUtils.writeToFile(SOURCE_CONFIG_FILE_JSON, build.toJson());
        ExampleUtils.writeToFile(SOURCE_CONFIG_FILE_YAML, build.toYaml());
        NatsSink build2 = new NatsSinkBuilder().connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE).sinkConverterClass(Constants.UTF8_STRING_SINK_CONVERTER_CLASSNAME).subjects("sink1", "sink2").build();
        ExampleUtils.writeToFile(SINK_CONFIG_FILE_JSON, build2.toJson());
        ExampleUtils.writeToFile(SINK_CONFIG_FILE_YAML, build2.toYaml());
        Connection connect = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE);
        Publisher publisher = new Publisher(connect, build.getSubjects());
        new Thread(publisher).start();
        List<String> subjects = build2.getSubjects();
        System.out.println("Setting up core subscriptions to the following subjects: " + String.valueOf(subjects));
        Dispatcher createDispatcher = connect.createDispatcher(message -> {
            System.out.println("Sink listener received message on subject: '" + message.getSubject() + "' with data: '" + new String(message.getData()) + "'");
        });
        Iterator<String> it = subjects.iterator();
        while (it.hasNext()) {
            createDispatcher.subscribe(it.next());
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "NatsSource").sinkTo(build2);
        executionEnvironment.executeAsync(JOB_NAME);
        Thread.sleep(10000L);
        publisher.stop();
        executionEnvironment.close();
        System.exit(0);
    }
}
