package io.vertx.proton.streams.example;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.streams.ProtonStreams;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/proton/streams/example/Receiver.class */
public class Receiver {
    private static final int COUNT = 100;
    private static final int REQ_WINDOW = 10;

    public static void main(String[] strArr) throws Exception {
        try {
            Vertx vertx = Vertx.vertx();
            ProtonClient.create(vertx).connect("localhost", 5672, asyncResult -> {
                final Context currentContext = Vertx.currentContext();
                if (!asyncResult.succeeded()) {
                    System.out.println("Failed to connect, exiting: " + asyncResult.cause());
                    System.exit(1);
                } else {
                    System.out.println("We're connected");
                    final ProtonConnection protonConnection = (ProtonConnection) asyncResult.result();
                    protonConnection.open();
                    ProtonStreams.createConsumer(protonConnection, "queue").subscribe(new Subscriber<Message>() { // from class: io.vertx.proton.streams.example.Receiver.1
                        AtomicInteger receievedCount = new AtomicInteger();
                        Subscription subscription;

                        public void onSubscribe(Subscription subscription) {
                            this.subscription = subscription;
                            this.subscription.request(10L);
                        }

                        public void onNext(Message message) {
                            int incrementAndGet = this.receievedCount.incrementAndGet();
                            System.out.println("Received message: " + ((String) message.getBody().getValue()));
                            if (incrementAndGet % Receiver.REQ_WINDOW == 0 && incrementAndGet < Receiver.COUNT) {
                                this.subscription.request(10L);
                            } else if (incrementAndGet == Receiver.COUNT) {
                                this.subscription.cancel();
                                closeConnection(vertx, protonConnection);
                            }
                        }

                        public void onError(Throwable th) {
                            System.out.println("onError(): " + th);
                            closeConnection(vertx, protonConnection);
                        }

                        public void onComplete() {
                            System.out.println("onComplete()");
                            closeConnection(vertx, protonConnection);
                        }

                        private void closeConnection(Vertx vertx2, ProtonConnection protonConnection2) {
                            currentContext.runOnContext(r6 -> {
                                System.out.println("Subscriber finished, closing connection.");
                                protonConnection2.closeHandler(asyncResult -> {
                                    System.out.println("Connection closed.");
                                    protonConnection2.disconnect();
                                    vertx2.close();
                                }).close();
                            });
                        }
                    });
                }
            });
        } catch (Exception e) {
            System.out.println("Caught exception, exiting.");
            e.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
