package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.utils.Headers;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests how the STOMP server reports {@code ACK} and {@code NACK} frames to the
 * handlers registered with {@code onAckHandler} / {@code onNackHandler}.
 *
 * <p>Every test starts a fresh server (see {@link #setUp}) whose ack and nack handlers
 * append the reported frames to {@link #acked} and {@link #nacked}. Tests then connect
 * clients that subscribe to {@value #QUEUE}, send messages to it, and poll those lists
 * (for at most {@value #TIMEOUT_SECONDS} seconds) until the expected state is reached.
 */
@RunWith(VertxUnitRunner.class)
public class AckTest {
    /** Destination used by every test. */
    private static final String QUEUE = "/queue";
    /** Value of the {@code ack} subscription header used by the "client" mode tests. */
    private static final String ACK_CLIENT = "client";
    /** Value of the {@code ack} subscription header used by the "client-individual" mode tests. */
    private static final String ACK_CLIENT_INDIVIDUAL = "client-individual";
    /** Transaction id used by the transactional tests. */
    private static final String TX_ID = "my-tx";
    /** Upper bound, in seconds, for every polling wait. */
    private static final long TIMEOUT_SECONDS = 10L;

    private Vertx vertx;
    private StompServer server;
    /** Clients created by the current test; closed in {@link #tearDown}. */
    private final List<StompClient> clients = new ArrayList<>();
    /** Frames passed to the server's ack handler; written from Vert.x threads, read from the test thread. */
    private final List<Frame> acked = new CopyOnWriteArrayList<>();
    /** Frames passed to the server's nack handler; written from Vert.x threads, read from the test thread. */
    private final List<Frame> nacked = new CopyOnWriteArrayList<>();

    /**
     * Starts a STOMP server that uses {@code QueueManagingAcknowledgmentsFactory} for its
     * destinations and records acknowledged / not-acknowledged frames, then blocks until
     * the server reports that it is listening.
     */
    @Before
    public void setUp(TestContext testContext) throws InterruptedException {
        AsyncLock listening = new AsyncLock();
        vertx = Vertx.vertx();
        StompServerHandler handler = StompServerHandler.create(vertx)
            .destinationFactory(new QueueManagingAcknowledgmentsFactory())
            .onAckHandler(acknowledgement -> acked.addAll(acknowledgement.frames()))
            .onNackHandler(acknowledgement -> nacked.addAll(acknowledgement.frames()));
        server = StompServer.create(vertx).handler(handler).listen(listening.handler());
        listening.waitForSuccess();
    }

    /** Closes every client, then the server, then the Vert.x instance, waiting for each close. */
    @After
    public void tearDown(TestContext testContext) {
        clients.forEach(StompClient::close);
        clients.clear();

        AsyncLock serverClosed = new AsyncLock();
        server.close(serverClosed.handler());
        serverClosed.waitForSuccess();

        AsyncLock vertxClosed = new AsyncLock();
        vertx.close(vertxClosed.handler());
        vertxClosed.waitForSuccess();
    }

    /** A "client" mode subscriber acks each frame; at least one frame must reach the ack handler. */
    @Test
    public void testSimpleAck() {
        subscribe(ACK_CLIENT, (connection, frame) -> connection.ack(frame.getAck()));
        awaitQueue();
        connectAndSend("Hello");
        awaitUntil(() -> !acked.isEmpty());
    }

    /** A "client" mode subscriber nacks each frame; the nack handler must fire and the ack handler must not. */
    @Test
    public void testSimpleNack() {
        subscribe(ACK_CLIENT, (connection, frame) -> connection.nack(frame.getAck()));
        awaitQueue();
        connectAndSend("Hello");
        awaitUntil(() -> !nacked.isEmpty());
        Assertions.assertThat(acked).isEmpty();
    }

    /**
     * In "client" mode only the third of four frames is acked; the test expects the ack
     * handler to have been given exactly three frames.
     */
    @Test
    public void testCumulativeAck() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT, (connection, frame) -> {
            received.add(frame);
            if (received.size() == 3) {
                connection.ack(frame.getAck());
            }
        });
        awaitQueue();
        connectAndSend("Hello", "World", "!!!", "not acknowledged");
        awaitUntil(() -> acked.size() == 3);
    }

    /**
     * In "client" mode only the third of four frames is nacked; the test expects the nack
     * handler to have been given exactly three frames.
     */
    @Test
    public void testCumulativeNack() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT, (connection, frame) -> {
            received.add(frame);
            if (received.size() == 3) {
                connection.nack(frame.getAck());
            }
        });
        awaitQueue();
        connectAndSend("Hello", "World", "!!!", "not acknowledged");
        awaitUntil(() -> nacked.size() == 3);
    }

    /** In "client-individual" mode each of the three received frames is acked separately. */
    @Test
    public void testIndividualAck() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT_INDIVIDUAL, (connection, frame) -> {
            received.add(frame);
            if (received.size() == 3) {
                for (Frame pending : received) {
                    connection.ack(pending.getAck());
                }
            }
        });
        awaitQueue();
        connectAndSend("Hello", "World", "!!!");
        awaitUntil(() -> acked.size() == 3);
    }

    /** In "client-individual" mode each of the three received frames is nacked separately. */
    @Test
    public void testIndividualNack() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT_INDIVIDUAL, (connection, frame) -> {
            received.add(frame);
            if (received.size() == 3) {
                for (Frame pending : received) {
                    connection.nack(pending.getAck());
                }
            }
        });
        awaitQueue();
        connectAndSend("Hello", "World", "!!!");
        awaitUntil(() -> nacked.size() == 3);
    }

    /**
     * Acks are sent inside transaction {@value #TX_ID}, which is only committed once the
     * third frame has arrived. Nothing may reach the ack handler before that commit.
     */
    @Test
    public void testAckInTransaction() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT, (connection, frame) -> {
            if (received.isEmpty()) {
                connection.beginTX(TX_ID);
            }
            received.add(frame);
            connection.ack(frame.getAck(), TX_ID);
            if (received.size() == 3) {
                connection.commit(TX_ID);
            }
        });
        awaitQueue();

        connectAndSend("Hello");
        // The commit needs three frames, so no ack can have been reported yet.
        Assertions.assertThat(acked).isEmpty();
        connectAndSend("Hello");
        Assertions.assertThat(acked).isEmpty();
        connectAndSend("Hello");

        awaitUntil(() -> acked.size() == 3);
    }

    /**
     * Nacks are sent inside transaction {@value #TX_ID}, which is only committed once the
     * third frame has arrived. Nothing may reach the nack handler before that commit.
     */
    @Test
    public void testNackInTransaction() {
        List<Frame> received = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT, (connection, frame) -> {
            if (received.isEmpty()) {
                connection.beginTX(TX_ID);
            }
            received.add(frame);
            connection.nack(frame.getAck(), TX_ID);
            if (received.size() == 3) {
                connection.commit(TX_ID);
            }
        });
        awaitQueue();

        connectAndSend("Hello");
        // The commit needs three frames, so no nack can have been reported yet.
        Assertions.assertThat(nacked).isEmpty();
        connectAndSend("Hello");
        Assertions.assertThat(nacked).isEmpty();
        connectAndSend("Hello");

        awaitUntil(() -> nacked.size() == 3);
    }

    /**
     * Acking or nacking an unknown message id must invoke the receipt handler and must not
     * trigger the connection's error handler.
     */
    @Test
    public void testUnknownMessageInAck(TestContext testContext) {
        Async ackReceipt = testContext.async();
        withNewConnection(connection -> {
            connection.errorHandler(frame -> testContext.fail("unexpected error"));
            connection.ack("unknown", receipt -> ackReceipt.complete());
        });

        Async nackReceipt = testContext.async();
        withNewConnection(connection -> {
            connection.errorHandler(frame -> testContext.fail("unexpected error"));
            connection.nack("unknown", receipt -> nackReceipt.complete());
        });
    }

    /**
     * Acking or nacking with a transaction id that was never begun must trigger the
     * connection's error handler; if the receipt handler is invoked, it must report a failure.
     */
    @Test
    public void testWrongTransactionIdInAckAndNack(TestContext testContext) {
        Async ackError = testContext.async();
        withNewConnection(connection -> {
            connection.errorHandler(frame -> ackError.complete());
            connection.ack("id", "unknown",
                receipt -> testContext.assertTrue(receipt.failed(), "unexpected receipt"));
        });

        // The second connection exercises NACK, as the test name promises.
        Async nackError = testContext.async();
        withNewConnection(connection -> {
            connection.errorHandler(frame -> nackError.complete());
            connection.nack("id", "unknown",
                receipt -> testContext.assertTrue(receipt.failed(), "unexpected receipt"));
        });
    }

    /**
     * Two "client" mode subscribers share the queue; the first nacks its second frame and
     * the second never acknowledges. The second subscriber must end up with four frames.
     */
    @Test
    public void testSubscriptionAndTwoReceptionsWithNackInClientMode() {
        List<Frame> nackingSubscriberFrames = new CopyOnWriteArrayList<>();
        List<Frame> passiveSubscriberFrames = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT, (connection, frame) -> {
            nackingSubscriberFrames.add(frame);
            if (nackingSubscriberFrames.size() == 2) {
                connection.nack(frame.getAck());
            }
        });
        subscribe(ACK_CLIENT, (connection, frame) -> passiveSubscriberFrames.add(frame));
        awaitSubscriptions(2);
        connectAndSend("Hello", "vert.x", "Hello", "vert.x");
        awaitUntil(() -> passiveSubscriberFrames.size() == 4);
    }

    /**
     * Two "client-individual" subscribers share the queue; the first nacks every frame and
     * the second never acknowledges. The second subscriber must end up with two frames.
     */
    @Test
    public void testSubscriptionAndTwoReceptionsWithNack() {
        List<Frame> passiveSubscriberFrames = new CopyOnWriteArrayList<>();
        subscribe(ACK_CLIENT_INDIVIDUAL, (connection, frame) -> connection.nack(frame.getAck()));
        subscribe(ACK_CLIENT_INDIVIDUAL, (connection, frame) -> passiveSubscriberFrames.add(frame));
        awaitSubscriptions(2);
        connectAndSend("Hello", "vert.x");
        awaitUntil(() -> passiveSubscriberFrames.size() == 2);
    }

    /**
     * Creates a client, registers it for cleanup and runs {@code action} with the
     * connection once the connect callback fires. The connect result is not checked for
     * failure, so a failed connection surfaces as an exception inside the callback.
     */
    private void withNewConnection(Consumer<StompClientConnection> action) {
        clients.add(StompClient.create(vertx).connect(connected -> action.accept(connected.result())));
    }

    /**
     * Connects a new client and subscribes it to {@value #QUEUE} with the given
     * {@code ack} header value; {@code onFrame} receives the connection and each delivered frame.
     */
    private void subscribe(String ackMode, BiConsumer<StompClientConnection, Frame> onFrame) {
        withNewConnection(connection -> connection.subscribe(
            QUEUE,
            Headers.create(new String[]{"ack", ackMode}),
            frame -> onFrame.accept(connection, frame)));
    }

    /** Connects a new client that sends each payload, in order, to {@value #QUEUE}. */
    private void connectAndSend(String... payloads) {
        withNewConnection(connection -> {
            for (String payload : payloads) {
                connection.send(QUEUE, Buffer.buffer(payload));
            }
        });
    }

    /** Blocks until the server lists {@value #QUEUE} among its destinations. */
    private void awaitQueue() {
        awaitUntil(() -> Helper.hasDestination(server.stompHandler().getDestinations(), QUEUE));
    }

    /** Blocks until {@value #QUEUE} exists on the server and has exactly {@code expected} subscriptions. */
    private void awaitSubscriptions(int expected) {
        awaitUntil(() -> {
            Destination destination = server.stompHandler().getDestination(QUEUE);
            return destination != null && destination.numberOfSubscriptions() == expected;
        });
    }

    /** Polls {@code condition} via Awaitility for at most {@value #TIMEOUT_SECONDS} seconds. */
    private static void awaitUntil(Callable<Boolean> condition) {
        Awaitility.waitAtMost(TIMEOUT_SECONDS, TimeUnit.SECONDS).until(condition);
    }
}
