package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Tuples;
import com.rabbitmq.stream.impl.Utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator.class */
public final class ProducersCoordinator implements AutoCloseable {
    // Package-visible limits; they are not read anywhere in the visible portion of this class.
    static final int MAX_PRODUCERS_PER_CLIENT = 256;
    static final int MAX_TRACKING_CONSUMERS_PER_CLIENT = 50;
    // Compile-time constant, always false.
    // NOTE(review): likely a decompilation leftover rather than a real debug switch - confirm.
    private static final boolean DEBUG = false;
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducersCoordinator.class);
    /** Retry condition used for the declare-publisher request: retry only on timeouts. */
    private static final Predicate<Exception> RETRY_ON_TIMEOUT = exc -> exc instanceof TimeoutStreamException;

    private final StreamEnvironment environment;
    private final Utils.ClientFactory clientFactory;
    /** Capacity of a single manager (connection) for producers. */
    private final int maxProducersByClient;
    /** Capacity of a single manager (connection) for tracking consumers. */
    private final int maxTrackingConsumersByClient;
    /** Produces the "connection_name" client property of each new producer connection. */
    private final Function<Utils.ClientConnectionType, String> connectionNamingStrategy;
    /** Source of manager IDs; managers are ordered and compared by this ID. */
    private final AtomicLong managerIdSequence = new AtomicLong(0);
    /** Live managers, sorted by ID; closed managers remove themselves from this set. */
    private final NavigableSet<ClientProducersManager> managers = new ConcurrentSkipListSet<>();
    /** Source of unique IDs for producer and tracking consumer trackers. */
    private final AtomicLong trackerIdSequence = new AtomicLong(0);
    private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();
    /** Executor factory handed to every client created by this coordinator. */
    private final ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory(Utils.AVAILABLE_PROCESSORS, 10, "rabbitmq-stream-producer-connection-");
    /** Serializes registrations of producers and tracking consumers. */
    private final Lock coordinatorLock = new ReentrantLock();
    /** Passed to the candidate node lookup for a stream. */
    private final boolean forceLeader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator$AgentTracker.class */
    /**
     * Coordinator-side handle on an agent (a producer or a tracking consumer) that is
     * attached to a {@link ClientProducersManager} and must follow it across re-assignments.
     */
    public interface AgentTracker {

        // ---- identity ----

        /** @return the coordinator-wide unique ID of this tracker */
        long uniqueId();

        /** @return a human-readable agent kind, used in log messages */
        String type();

        /** @return the stream this agent works on */
        String stream();

        /** @return the publisher reference; not supported by agents that are not identifiable */
        String reference();

        /** @return {@code true} if the agent owns a publisher ID on its connection */
        boolean identifiable();

        /** @return the publisher ID; not supported by agents that are not identifiable */
        byte id();

        /** @return whether the underlying agent is still open */
        boolean isOpen();

        // ---- lifecycle ----

        /**
         * Binds the agent to a connection.
         *
         * @param publisherId the publisher ID picked for the agent (0 is passed for non-identifiable agents)
         * @param client the client the agent must use from now on
         * @param manager the manager that owns {@code client}
         */
        void assign(byte publisherId, Client client, ClientProducersManager manager);

        /** Signals that the current connection or stream is gone; the agent is detached from its manager. */
        void unavailable();

        /**
         * Tries to take ownership of the recovery of this agent.
         *
         * @return {@code true} if the caller may proceed, {@code false} if a recovery is already running
         */
        boolean markRecoveryInProgress();

        /** Signals that the agent has been re-assigned and can resume; ends the recovery in progress. */
        void running();

        /** Unregisters the agent from its current manager. */
        void cancel();

        /**
         * Invoked when re-assignment failed for good.
         *
         * @param code the response code describing why the agent is closed
         */
        void closeAfterStreamDeletion(short code);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator$ClientClosedException.class */
    /** Thrown when an operation needs a manager's client but that client is no longer open. */
    public static class ClientClosedException extends StreamException {
        private static final String MESSAGE = "Client already closed";

        public ClientClosedException() {
            super(MESSAGE);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator$ClientProducersManager.class */
    public class ClientProducersManager implements Comparable<ClientProducersManager> {
        /** Taken from the coordinator's manager sequence; basis for ordering, equality and hashing. */
        private final long id;
        /** Key of the node the client is actually connected to. */
        private final String name;
        /** Node the client is actually connected to, as reported by the client itself. */
        private final Client.Broker node;
        /** Producers hosted on this connection, keyed by publisher ID. */
        private final ConcurrentMap<Byte, ProducerTracker> producers;
        /** Tracking consumers hosted on this connection. */
        private final Set<AgentTracker> trackingConsumerTrackers;
        private final Client client;
        // Starts at 0: the decompiled source passed the boolean DEBUG constant here, which is not a valid int argument.
        private final AtomicInteger producerIndexSequence = new AtomicInteger(0);
        /** All hosted agents (producers and tracking consumers) grouped by stream. */
        private final Map<String, Set<AgentTracker>> streamToTrackers = new ConcurrentHashMap<>();
        private final AtomicBoolean closed = new AtomicBoolean(false);
        /** Guards registration, un-registration and metadata-driven removal of agents. */
        private final Lock managerLock = new ReentrantLock();

        /**
         * Opens a connection and wires the listeners that route confirms and errors to the
         * hosted producers and trigger re-assignment when the connection or a stream goes away.
         *
         * @param broker the node to connect to; its key is handed to the client factory
         * @param list the candidate nodes, handed to the client factory
         * @param clientFactory creates the client
         * @param clientParameters connection parameters; listeners and the connection name are set on them
         */
        private ClientProducersManager(Client.Broker broker, List<Utils.BrokerWrapper> list, Utils.ClientFactory clientFactory, Client.ClientParameters clientParameters) {
            this.producers = new ConcurrentHashMap<>(ProducersCoordinator.this.maxProducersByClient);
            this.trackingConsumerTrackers = ConcurrentHashMap.newKeySet(ProducersCoordinator.this.maxTrackingConsumersByClient);
            this.id = ProducersCoordinator.this.managerIdSequence.getAndIncrement();
            // The listeners are created before the name is known, so they read it through this holder.
            AtomicReference<String> nodeName = new AtomicReference<>();
            // Set once construction is complete; the shutdown listener only touches the manager set after that.
            AtomicBoolean initialized = new AtomicBoolean(false);

            Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
                ProducerTracker tracker = this.producers.get(Byte.valueOf(publisherId));
                if (tracker == null) {
                    LOGGER.info("Received publish confirm for unknown producer: {}", Byte.valueOf(publisherId));
                } else {
                    tracker.producer.confirm(publishingId);
                }
            };

            Client.PublishErrorListener publishErrorListener = (publisherId, publishingId, errorCode) -> {
                ProducerTracker tracker = this.producers.get(Byte.valueOf(publisherId));
                if (tracker == null) {
                    LOGGER.info("Received publish error for unknown producer: {}, error code {}", Byte.valueOf(publisherId), Utils.formatConstant(errorCode));
                } else {
                    tracker.producer.error(publishingId, errorCode);
                }
            };

            Client.ShutdownListener shutdownListener = shutdownContext -> {
                if (initialized.get()) {
                    this.closed.set(true);
                    ProducersCoordinator.this.managers.remove(this);
                }
                if (shutdownContext.isShutdownUnexpected()) {
                    LOGGER.debug("Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination", Integer.valueOf(this.producers.size()), Integer.valueOf(this.trackingConsumerTrackers.size()));
                    this.producers.forEach((publisherId, tracker) -> tracker.unavailable());
                    this.trackingConsumerTrackers.forEach(AgentTracker::unavailable);
                    // Re-assignment is done off the listener thread, stream by stream.
                    ProducersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        this.streamToTrackers.forEach((stream, trackers) -> {
                            if (Thread.currentThread().isInterrupted()) {
                                return;
                            }
                            assignProducersToNewManagers(trackers, stream, ProducersCoordinator.this.environment.recoveryBackOffDelayPolicy());
                        });
                    }, "Producer recovery after disconnection from %s", nodeName.get()));
                }
            };

            Client.MetadataListener metadataListener = (stream, code) -> {
                LOGGER.debug("Received metadata notification for '{}', stream is likely to have become unavailable", stream);
                this.managerLock.lock();
                try {
                    Set<AgentTracker> affected = this.streamToTrackers.remove(stream);
                    LOGGER.debug("Affected publishers and consumer trackers after metadata update: {}", Integer.valueOf(affected == null ? 0 : affected.size()));
                    if (affected == null || affected.isEmpty()) {
                        return;
                    }
                    // Detach every agent of the stream from this manager before moving it elsewhere.
                    affected.forEach(tracker -> {
                        tracker.unavailable();
                        if (tracker.identifiable()) {
                            this.producers.remove(Byte.valueOf(tracker.id()));
                        } else {
                            this.trackingConsumerTrackers.remove(tracker);
                        }
                    });
                    ProducersCoordinator.this.environment.scheduledExecutorService().execute(Utils.namedRunnable(() -> {
                        if (Thread.currentThread().isInterrupted()) {
                            return;
                        }
                        closeIfEmpty();
                        assignProducersToNewManagers(affected, stream, ProducersCoordinator.this.environment.topologyUpdateBackOffDelayPolicy());
                    }, "Producer re-assignment after metadata update on stream '%s'", stream));
                } finally {
                    this.managerLock.unlock();
                }
            };

            String connectionName = ProducersCoordinator.this.connectionNamingStrategy.apply(Utils.ClientConnectionType.PRODUCER);
            this.client = clientFactory.client(new Utils.ClientFactoryContext(
                    clientParameters
                            .publishConfirmListener(publishConfirmListener)
                            .publishErrorListener(publishErrorListener)
                            .shutdownListener(shutdownListener)
                            .metadataListener(metadataListener)
                            .clientProperty("connection_name", connectionName),
                    Utils.keyForNode(broker),
                    (List) list.stream().map(Utils.BrokerWrapper::broker).collect(Collectors.toList())));
            // The client reports the node it ended up on; that node, not the requested one, identifies the manager.
            this.node = Utils.brokerFromClient(this.client);
            this.name = Utils.keyForNode(this.node);
            nodeName.set(this.name);
            LOGGER.debug("Created producer connection '{}'", connectionName);
            initialized.set(true);
        }

        /**
         * Asynchronously looks up a node for the stream and moves the given agents to a manager on it.
         * The lookup is retried with the given back-off unless the stream does not exist; if it
         * ultimately fails, every agent is closed.
         *
         * @param collection the agents to re-assign
         * @param str the stream the agents work on
         * @param backOffDelayPolicy delay policy between lookup attempts
         */
        private void assignProducersToNewManagers(Collection<AgentTracker> collection, String str, BackOffDelayPolicy backOffDelayPolicy) {
            AsyncRetry.asyncRetry(() -> {
                List<Utils.BrokerWrapper> candidates = ProducersCoordinator.this.findCandidateNodes(str, ProducersCoordinator.this.forceLeader);
                return Tuples.pair(ProducersCoordinator.pickBroker(candidates), candidates);
            }).description("Candidate lookup to publish to " + str, new Object[0])
                    .scheduler(ProducersCoordinator.this.environment.scheduledExecutorService())
                    .retry(ex -> !(ex instanceof StreamDoesNotExistException))
                    .delayPolicy(backOffDelayPolicy)
                    .build()
                    .thenAccept(pair -> {
                        Client.Broker broker = (Client.Broker) pair.v1();
                        @SuppressWarnings("unchecked")
                        List<Utils.BrokerWrapper> candidates = (List<Utils.BrokerWrapper>) pair.v2();
                        LOGGER.debug("Assigning {} producer(s) and consumer tracker(s) to {} (stream '{}')", Integer.valueOf(collection.size()), Utils.keyForNode(broker), str);
                        collection.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker));
                    })
                    .exceptionally(error -> {
                        LOGGER.info("Error while re-assigning producers and consumer trackers, closing them: {}", error.getMessage());
                        // 2 when the stream does not exist (directly or as the cause), 6 otherwise.
                        // NOTE(review): presumably the protocol's "stream does not exist" / "stream not available" codes - confirm.
                        boolean streamGone = error instanceof StreamDoesNotExistException || error.getCause() instanceof StreamDoesNotExistException;
                        short code = streamGone ? (short) 2 : (short) 6;
                        for (AgentTracker tracker : collection) {
                            try {
                                tracker.closeAfterStreamDeletion(code);
                            } catch (Exception e) {
                                // Best effort: one failing agent must not prevent the others from being closed.
                                LOGGER.debug("Error while closing producer: {}", e.getMessage());
                            }
                        }
                        return null;
                    });
        }

        /**
         * Recovers the agent unless another recovery already owns it. Failures are logged,
         * never propagated, so that one agent cannot interrupt the recovery of the others.
         */
        private void maybeRecoverAgent(Client.Broker broker, List<Utils.BrokerWrapper> list, AgentTracker agentTracker) {
            boolean recoveryAcquired = agentTracker.markRecoveryInProgress();
            if (recoveryAcquired) {
                try {
                    recoverAgent(broker, list, agentTracker);
                } catch (Exception e) {
                    LOGGER.warn(
                            "Error while recovering {} tracker {} (stream '{}'). Reason: {}",
                            agentTracker.type(),
                            Long.valueOf(agentTracker.uniqueId()),
                            agentTracker.stream(),
                            Utils.exceptionMessage(e));
                }
            } else {
                LOGGER.debug("Not recovering {} (stream '{}'), recovery is already is progress", agentTracker.type(), agentTracker.stream());
            }
        }

        /**
         * Re-attaches an open agent to a manager on the given node. When the attempt fails because
         * the stream is not available or the connection/client is closed, the candidate nodes are
         * looked up again and the attempt is repeated; any other failure is logged and ends the recovery.
         *
         * @param broker the node to try first
         * @param list the candidate nodes that go with {@code broker}
         * @param agentTracker the agent to recover
         */
        private void recoverAgent(Client.Broker broker, List<Utils.BrokerWrapper> list, AgentTracker agentTracker) {
            Client.Broker currentBroker = broker;
            List<Utils.BrokerWrapper> currentCandidates = list;
            boolean done = false;
            while (!done) {
                try {
                    if (agentTracker.isOpen()) {
                        LOGGER.debug("Using {} to resume {} to {}", currentBroker.label(), agentTracker.type(), agentTracker.stream());
                        ProducersCoordinator.this.addToManager(currentBroker, currentCandidates, agentTracker);
                        agentTracker.running();
                    } else {
                        LOGGER.debug("Not recovering {} (stream '{}') because it has been closed", agentTracker.type(), agentTracker.stream());
                    }
                    done = true;
                } catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                    // Arguments follow the placeholder order: type, stream, then publisher ID.
                    LOGGER.debug(
                            "{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, refreshing candidate leader and retrying",
                            agentTracker.type(),
                            agentTracker.stream(),
                            agentTracker.identifiable() ? Byte.valueOf(agentTracker.id()) : "N/A");
                    Tuples.Pair refreshed = (Tuples.Pair) Utils.callAndMaybeRetry(() -> {
                        List<Utils.BrokerWrapper> candidates = ProducersCoordinator.this.findCandidateNodes(agentTracker.stream(), ProducersCoordinator.this.forceLeader);
                        return Tuples.pair(ProducersCoordinator.pickBroker(candidates), candidates);
                    }, exc -> {
                        return !(exc instanceof StreamDoesNotExistException);
                    }, ProducersCoordinator.this.environment.recoveryBackOffDelayPolicy(), "Candidate lookup for %s on stream '%s'", agentTracker.type(), agentTracker.stream());
                    @SuppressWarnings("unchecked")
                    List<Utils.BrokerWrapper> refreshedCandidates = (List<Utils.BrokerWrapper>) refreshed.v2();
                    currentBroker = (Client.Broker) refreshed.v1();
                    currentCandidates = refreshedCandidates;
                } catch (Exception e) {
                    // Not a retryable condition: give up on this agent. The exception goes last so the logger prints its stack trace.
                    LOGGER.warn("Error while re-assigning {} (stream '{}')", agentTracker.type(), agentTracker.stream(), e);
                    done = true;
                }
            }
        }

        /**
         * Attaches an agent to this manager, under the manager lock.
         *
         * @throws IllegalStateException if the manager is full for this kind of agent, or closed
         * @throws ClientClosedException if the client is no longer open
         * @throws StreamException if the broker refuses the publisher declaration
         */
        private void register(AgentTracker agentTracker) {
            Utils.lock(this.managerLock, () -> {
                if (isFullFor(agentTracker)) {
                    throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
                }
                if (isClosed()) {
                    throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
                }
                checkNotClosed();
                if (agentTracker.identifiable()) {
                    declareAndTrackProducer((ProducerTracker) agentTracker);
                } else {
                    // Tracking consumers have no publisher ID; 0 is a placeholder.
                    agentTracker.assign((byte) 0, this.client, this);
                    this.trackingConsumerTrackers.add(agentTracker);
                }
                Set<AgentTracker> streamTrackers = this.streamToTrackers.computeIfAbsent(agentTracker.stream(), key -> {
                    return ConcurrentHashMap.newKeySet();
                });
                streamTrackers.add(agentTracker);
            });
        }

        /** Picks a publisher ID, declares the publisher on the broker and records the producer under that ID. */
        private void declareAndTrackProducer(ProducerTracker producerTracker) {
            int slot = ProducersCoordinator.pickSlot(this.producers, producerTracker, this.producerIndexSequence);
            checkNotClosed();
            Client.Response response = (Client.Response) Utils.callAndMaybeRetry(() -> {
                return this.client.declarePublisher((byte) slot, producerTracker.reference(), producerTracker.stream());
            }, ProducersCoordinator.RETRY_ON_TIMEOUT, "Declare publisher request for publisher %d on stream '%s'", Long.valueOf(producerTracker.uniqueId()), producerTracker.stream());
            if (!response.isOk()) {
                String message = "Error while declaring publisher: " + Utils.formatConstant(response.getResponseCode()) + ". Could not assign producer to client.";
                LOGGER.info(message);
                throw new StreamException(message, response.getResponseCode());
            }
            producerTracker.assign((byte) slot, this.client, this);
            this.producers.put(Byte.valueOf(producerTracker.id()), producerTracker);
        }

        /**
         * Detaches an agent from this manager, under the manager lock, and closes the manager
         * if nothing is left on it.
         */
        private void unregister(AgentTracker agentTracker) {
            Utils.lock(this.managerLock, () -> {
                LOGGER.debug("Unregistering {} {} from manager on {}", agentTracker.type(), Long.valueOf(agentTracker.uniqueId()), this.name);
                boolean hasPublisherId = agentTracker.identifiable();
                if (hasPublisherId) {
                    this.producers.remove(Byte.valueOf(agentTracker.id()));
                } else {
                    this.trackingConsumerTrackers.remove(agentTracker);
                }
                // Drop the agent from its stream's set; returning null removes the entry once the set is empty.
                this.streamToTrackers.computeIfPresent(agentTracker.stream(), (stream, trackers) -> {
                    trackers.remove(agentTracker);
                    return trackers.isEmpty() ? null : trackers;
                });
                closeIfEmpty();
            });
        }

        /**
         * @return whether this manager has reached its capacity for the kind of agent given
         *     (producers and tracking consumers have separate limits)
         */
        boolean isFullFor(AgentTracker agentTracker) {
            return ((Boolean) Utils.lock(this.managerLock, () -> {
                boolean full;
                if (agentTracker.identifiable()) {
                    full = this.producers.size() == ProducersCoordinator.this.maxProducersByClient;
                } else {
                    full = this.trackingConsumerTrackers.size() == ProducersCoordinator.this.maxTrackingConsumersByClient;
                }
                return Boolean.valueOf(full);
            })).booleanValue();
        }

        /** @return whether this manager hosts neither producers nor tracking consumers */
        boolean isEmpty() {
            return ((Boolean) Utils.lock(this.managerLock, () -> {
                if (!this.producers.isEmpty()) {
                    return Boolean.FALSE;
                }
                return Boolean.valueOf(this.trackingConsumerTrackers.isEmpty());
            })).booleanValue();
        }

        /** @throws ClientClosedException if the underlying client is no longer open */
        private void checkNotClosed() {
            boolean clientOpen = this.client.isOpen();
            if (clientOpen) {
                return;
            }
            throw new ClientClosedException();
        }

        /**
         * Reports the closed flag, first closing this manager if its client turns out to be
         * no longer open.
         */
        boolean isClosed() {
            boolean clientOpen = this.client.isOpen();
            if (!clientOpen) {
                // The connection is already gone: bring the manager's own state in line.
                close();
            }
            return this.closed.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Closes this manager if it is not closed yet and no agent is attached to it. */
        public void closeIfEmpty() {
            if (!this.closed.get()) {
                Utils.lock(this.managerLock, () -> {
                    if (!isEmpty()) {
                        LOGGER.debug("Not closing producer manager {} because it is not empty", Long.valueOf(this.id));
                        return;
                    }
                    close();
                });
            }
        }

        /**
         * Marks this manager closed, removes it from the coordinator and closes its client if
         * still open. Only the first call has an effect; errors while closing the client are logged.
         */
        private void close() {
            if (!this.closed.compareAndSet(false, true)) {
                return;
            }
            ProducersCoordinator.this.managers.remove(this);
            try {
                if (this.client.isOpen()) {
                    this.client.close();
                }
            } catch (Exception e) {
                // The placeholder was missing, so the exception message never reached the log.
                LOGGER.debug("Error while closing client producer connection: {}", e.getMessage());
            }
        }

        /** Orders managers by ascending ID. */
        @Override
        public int compareTo(ClientProducersManager clientProducersManager) {
            long mine = this.id;
            long theirs = clientProducersManager.id;
            if (mine < theirs) {
                return -1;
            }
            return mine == theirs ? 0 : 1;
        }

        /** Two managers are equal when they are of the same class and share the same ID. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            ClientProducersManager other = (ClientProducersManager) obj;
            return other.id == this.id;
        }

        /** Hash derived from the ID only, consistent with {@code equals}. */
        @Override
        public int hashCode() {
            // Same value as Objects.hash(id): 31 * 1 + hash of the single element.
            return 31 + Long.hashCode(this.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator$ProducerTracker.class */
    /** {@link AgentTracker} for a {@link StreamProducer}: an identifiable agent that owns a publisher ID. */
    public static class ProducerTracker implements AgentTracker {
        private final long uniqueId;
        private final String reference;
        private final String stream;
        private final StreamProducer producer;
        /** Guards updates of the publisher ID and of the current manager. */
        private final Lock trackerLock = new ReentrantLock();
        /** True while a recovery of this producer is in progress. */
        private final AtomicBoolean recovering = new AtomicBoolean(false);
        private volatile byte publisherId;
        /** Manager currently hosting the producer; null while the producer is unavailable. */
        private volatile ClientProducersManager clientProducersManager;

        private ProducerTracker(long uniqueId, String reference, String stream, StreamProducer producer) {
            this.uniqueId = uniqueId;
            this.reference = reference;
            this.stream = stream;
            this.producer = producer;
        }

        // ---- identity ----

        @Override
        public long uniqueId() {
            return this.uniqueId;
        }

        @Override
        public String type() {
            return "producer";
        }

        @Override
        public String stream() {
            return this.stream;
        }

        @Override
        public String reference() {
            return this.reference;
        }

        @Override
        public boolean identifiable() {
            return true;
        }

        @Override
        public byte id() {
            return this.publisherId;
        }

        @Override
        public boolean isOpen() {
            return this.producer.isOpen();
        }

        // ---- lifecycle ----

        /** Records the new publisher ID and manager, then hands the ID and the client to the producer. */
        @Override
        public void assign(byte publisherId, Client client, ClientProducersManager manager) {
            Utils.lock(this.trackerLock, () -> {
                this.publisherId = publisherId;
                this.clientProducersManager = manager;
            });
            this.producer.setPublisherId(publisherId);
            this.producer.setClient(client);
        }

        /** Forgets the current manager and tells the producer it is unavailable. */
        @Override
        public void unavailable() {
            Utils.lock(this.trackerLock, () -> {
                this.clientProducersManager = null;
            });
            this.producer.unavailable();
        }

        /** @return {@code true} if the caller is the one that switched the recovery flag on */
        @Override
        public boolean markRecoveryInProgress() {
            return this.recovering.compareAndSet(false, true);
        }

        /** Tells the producer it can resume, then clears the recovery flag. */
        @Override
        public void running() {
            this.producer.running();
            this.recovering.set(false);
        }

        /** Unregisters from the current manager, if there is one. */
        @Override
        public void cancel() {
            Utils.lock(this.trackerLock, () -> {
                ClientProducersManager manager = this.clientProducersManager;
                if (manager == null) {
                    return;
                }
                manager.unregister(this);
            });
        }

        @Override
        public void closeAfterStreamDeletion(short s) {
            this.producer.closeAfterStreamDeletion(s);
        }
    }

    /* loaded from: input_file:com/rabbitmq/stream/impl/ProducersCoordinator$TrackingConsumerTracker.class */
    /**
     * {@link AgentTracker} for a {@link StreamConsumer} that needs a connection for tracking.
     * It is not identifiable: it has no publisher ID and no reference.
     */
    private static class TrackingConsumerTracker implements AgentTracker {
        private final long uniqueId;
        private final String stream;
        private final StreamConsumer consumer;
        /** Manager currently hosting the consumer; null while the consumer is unavailable. */
        private volatile ClientProducersManager clientProducersManager;
        /** True while a recovery of this consumer is in progress. */
        private final AtomicBoolean recovering = new AtomicBoolean(false);
        /** Guards updates of the current manager. */
        private final Lock trackerLock = new ReentrantLock();

        private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer consumer) {
            this.uniqueId = uniqueId;
            this.stream = stream;
            this.consumer = consumer;
        }

        /** Records the new manager and hands the client to the consumer; the publisher ID is ignored. */
        @Override
        public void assign(byte b, Client client, ClientProducersManager clientProducersManager) {
            Utils.lock(this.trackerLock, () -> {
                this.clientProducersManager = clientProducersManager;
            });
            this.consumer.setTrackingClient(client);
        }

        @Override
        public boolean identifiable() {
            return false;
        }

        /** @throws UnsupportedOperationException always, a tracking consumer has no publisher ID */
        @Override
        public byte id() {
            throw new UnsupportedOperationException();
        }

        /** @throws UnsupportedOperationException always, a tracking consumer has no reference */
        @Override
        public String reference() {
            throw new UnsupportedOperationException();
        }

        @Override
        public String stream() {
            return this.stream;
        }

        /** Forgets the current manager and tells the consumer it is unavailable. */
        @Override
        public void unavailable() {
            Utils.lock(this.trackerLock, () -> {
                this.clientProducersManager = null;
            });
            this.consumer.unavailable();
        }

        /** Tells the consumer it can resume, then clears the recovery flag. */
        @Override
        public void running() {
            this.consumer.running();
            this.recovering.set(false);
        }

        /** Unregisters from the current manager, if there is one. */
        @Override
        public void cancel() {
            Utils.lock(this.trackerLock, () -> {
                // unavailable() resets the manager to null, so cancelling while the consumer is
                // detached must be a no-op rather than a NullPointerException.
                ClientProducersManager manager = this.clientProducersManager;
                if (manager != null) {
                    manager.unregister(this);
                }
            });
        }

        /**
         * Intentionally does nothing.
         * NOTE(review): presumably the consumer is closed through another path when its stream is deleted - confirm.
         */
        @Override
        public void closeAfterStreamDeletion(short s) {
        }

        @Override
        public boolean isOpen() {
            return this.consumer.isOpen();
        }

        @Override
        public long uniqueId() {
            return this.uniqueId;
        }

        @Override
        public String type() {
            return "tracking consumer";
        }

        /** @return {@code true} if the caller is the one that switched the recovery flag on */
        @Override
        public boolean markRecoveryInProgress() {
            return this.recovering.compareAndSet(false, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a coordinator.
     *
     * @param environment the owning environment (supplies client parameters, executors and back-off policies)
     * @param maxProducersByClient maximum number of producers hosted on one connection
     * @param maxTrackingConsumersByClient maximum number of tracking consumers hosted on one connection
     * @param connectionNamingStrategy supplies the name of each new connection
     * @param clientFactory creates the clients
     * @param forceLeader passed to the candidate node lookup
     */
    public ProducersCoordinator(
            StreamEnvironment environment,
            int maxProducersByClient,
            int maxTrackingConsumersByClient,
            Function<Utils.ClientConnectionType, String> connectionNamingStrategy,
            Utils.ClientFactory clientFactory,
            boolean forceLeader) {
        this.environment = environment;
        this.clientFactory = clientFactory;
        this.connectionNamingStrategy = connectionNamingStrategy;
        this.maxProducersByClient = maxProducersByClient;
        this.maxTrackingConsumersByClient = maxTrackingConsumersByClient;
        this.forceLeader = forceLeader;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attaches a producer to a connection, under the coordinator lock.
     *
     * @param producer the producer to attach
     * @param reference the publisher reference
     * @param stream the stream to publish to
     * @return an action that unregisters the producer
     */
    public Runnable registerProducer(StreamProducer producer, String reference, String stream) {
        return (Runnable) Utils.lock(this.coordinatorLock, () -> {
            long trackerId = this.trackerIdSequence.getAndIncrement();
            ProducerTracker tracker = new ProducerTracker(trackerId, reference, stream, producer);
            return registerAgentTracker(tracker, stream);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attaches a tracking consumer to a connection, under the coordinator lock.
     *
     * @param streamConsumer the consumer to attach; its stream determines the target node
     * @return an action that unregisters the consumer
     */
    public Runnable registerTrackingConsumer(StreamConsumer streamConsumer) {
        return (Runnable) Utils.lock(this.coordinatorLock, () -> {
            long trackerId = this.trackerIdSequence.getAndIncrement();
            TrackingConsumerTracker tracker = new TrackingConsumerTracker(trackerId, streamConsumer.stream(), streamConsumer);
            return registerAgentTracker(tracker, streamConsumer.stream());
        });
    }

    /**
     * Looks up the candidate nodes for the stream, attaches the agent to a manager on the
     * picked node and returns the agent's cancellation action.
     */
    private Runnable registerAgentTracker(AgentTracker agentTracker, String str) {
        List<Utils.BrokerWrapper> candidates = findCandidateNodes(str, this.forceLeader);
        Client.Broker target = pickBroker(candidates);
        addToManager(target, candidates, agentTracker);
        // A bound method reference null-checks its receiver when evaluated, so no explicit check is needed.
        return agentTracker::cancel;
    }

    /**
     * Attaches the agent to a manager connected to the given node: an existing one with room
     * left, or a new one. Closed managers met on the way are dropped from the manager set. The
     * search starts over when the chosen manager turns out to be full or closed at registration time.
     *
     * @param broker the node the agent must be attached to
     * @param list the candidate nodes, used when a new connection has to be created
     * @param agentTracker the agent to attach
     */
    private void addToManager(Client.Broker broker, List<Utils.BrokerWrapper> list, AgentTracker agentTracker) {
        Client.ClientParameters parameters = this.environment.clientParametersCopy()
                .host(broker.getHost())
                .port(broker.getPort())
                .executorServiceFactory(this.executorServiceFactory)
                .dispatchingExecutorServiceFactory(Utils.NO_OP_EXECUTOR_SERVICE_FACTORY);
        ClientProducersManager manager = null;
        while (manager == null) {
            Iterator<ClientProducersManager> it = this.managers.iterator();
            while (it.hasNext()) {
                ClientProducersManager candidate = it.next();
                if (candidate.isClosed()) {
                    it.remove();
                } else if (broker.equals(candidate.node) && !candidate.isFullFor(agentTracker)) {
                    manager = candidate;
                    break;
                }
            }
            if (manager == null) {
                String nodeKey = Utils.keyForNode(broker);
                LOGGER.debug("Trying to create producer manager on {}", nodeKey);
                manager = new ClientProducersManager(broker, list, this.clientFactory, parameters);
                LOGGER.debug("Created producer manager on {}, id {}", nodeKey, Long.valueOf(manager.id));
            }
            try {
                manager.register(agentTracker);
                LOGGER.debug(
                        "Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}",
                        agentTracker.type(),
                        Long.valueOf(agentTracker.uniqueId()),
                        agentTracker.stream(),
                        Long.valueOf(manager.id),
                        manager.name,
                        agentTracker.identifiable() ? Byte.valueOf(agentTracker.id()) : "N/A");
                // The manager joins the set only once it hosts at least one agent.
                this.managers.add(manager);
            } catch (StreamNotAvailableException | ConnectionStreamException | ClientClosedException e) {
                if (manager.isEmpty()) {
                    // Closing is delegated to the environment instead of being done on the calling thread.
                    ClientProducersManager toClose = manager;
                    this.environment.execute(() -> {
                        toClose.closeIfEmpty();
                    }, "Producer manager closing after timeout, producer %d on stream '%s'", Long.valueOf(agentTracker.uniqueId()), agentTracker.stream());
                }
                throw e;
            } catch (IllegalStateException fullOrClosed) {
                // The manager filled up or closed in the meantime: look for another one.
                manager = null;
            } catch (RuntimeException e) {
                manager.closeIfEmpty();
                throw e;
            }
        }
    }

    /**
     * Queries the stream metadata through the environment's locator operation and builds the list
     * of nodes a publisher could connect to.
     *
     * <p>The leader, when present, is always the first candidate. Replicas are appended only when
     * {@code z} is {@code false} and the metadata reports replicas.
     *
     * @param str the name of the stream
     * @param z when {@code true}, only the leader is acceptable and its absence is an error
     * @return an unmodifiable, non-empty list of candidate nodes
     * @throws StreamDoesNotExistException if the metadata has no entry for the stream or the
     *     response code is 2
     * @throws IllegalStateException if the metadata response is not OK for another reason, if
     *     the leader is required but missing, or if no candidate is available at all
     */
    List<Utils.BrokerWrapper> findCandidateNodes(String str, boolean z) {
        // The cast is kept from the original code; its key/value types match how the map is read below.
        @SuppressWarnings("unchecked")
        Map<String, Client.StreamMetadata> metadata = (Map<String, Client.StreamMetadata>) this.environment.locatorOperation(
                Utils.namedFunction(client -> {
                    return client.metadata(str);
                }, "Candidate lookup to publish to '%s'", str));
        // A single lookup covers both the empty map and the missing entry.
        Client.StreamMetadata streamMetadata = metadata.get(str);
        if (streamMetadata == null) {
            throw new StreamDoesNotExistException(str);
        }
        if (!streamMetadata.isResponseOk()) {
            if (streamMetadata.getResponseCode() == 2) {
                throw new StreamDoesNotExistException(str);
            }
            throw new IllegalStateException("Could not get stream metadata, response code: " + streamMetadata.getResponseCode());
        }
        List<Utils.BrokerWrapper> candidates = new ArrayList<>();
        Client.Broker leader = streamMetadata.getLeader();
        if (leader != null) {
            candidates.add(new Utils.BrokerWrapper(leader, true));
        } else if (z) {
            throw new IllegalStateException("Not leader available for stream " + str);
        }
        if (!z && streamMetadata.hasReplicas()) {
            candidates.addAll(streamMetadata.getReplicas().stream()
                    .map(replica -> new Utils.BrokerWrapper(replica, false))
                    .collect(Collectors.toList()));
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No stream member available to publish for stream " + str);
        }
        LOGGER.debug("Candidates to publish to {}: {}", str, candidates);
        return List.copyOf(candidates);
    }

    /**
     * Returns the broker of the first candidate flagged as leader.
     *
     * @param list the candidate nodes to inspect, in order
     * @return the broker wrapped by the first leader candidate
     * @throws IllegalStateException if no candidate is a leader, or if the first leader candidate
     *     wraps a {@code null} broker
     */
    static Client.Broker pickBroker(List<Utils.BrokerWrapper> list) {
        for (Utils.BrokerWrapper candidate : list) {
            if (!candidate.isLeader()) {
                continue;
            }
            Client.Broker leader = candidate.broker();
            if (leader != null) {
                return leader;
            }
            // Only the first leader candidate is considered.
            break;
        }
        throw new IllegalStateException("Not leader available");
    }

    /**
     * Removes and closes every manager, then closes the executor service factory.
     *
     * <p>Failures are logged at info level and never propagate, so one failing manager does not
     * prevent the remaining ones from being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        for (Iterator<ClientProducersManager> cursor = this.managers.iterator(); cursor.hasNext(); ) {
            ClientProducersManager manager = cursor.next();
            try {
                cursor.remove();
                manager.close();
            } catch (Exception closeFailure) {
                LOGGER.info(
                        "Error while closing manager {} connected to node {}: {}",
                        Long.valueOf(manager.id),
                        manager.name,
                        closeFailure.getMessage());
            }
        }
        try {
            this.executorServiceFactory.close();
        } catch (Exception factoryFailure) {
            LOGGER.info("Error while closing executor service factory: {}", factoryFailure.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports how many producer managers this coordinator currently holds.
     *
     * @return the size of the managers set
     */
    public int clientCount() {
        final int managerCount = this.managers.size();
        return managerCount;
    }

    /**
     * Counts the distinct {@code name} values among the managers held by this coordinator.
     *
     * @return the number of distinct manager names
     */
    int nodesConnected() {
        long distinctNames = this.managers.stream()
                .map(manager -> manager.name)
                .distinct()
                .count();
        return (int) distinctNames;
    }

    /**
     * Renders the coordinator state as a JSON-like string: global counters followed by one entry
     * per manager, each listing its producers and tracking consumers.
     *
     * @return the textual representation of this coordinator
     */
    @Override
    public String toString() {
        Integer clientCount = this.managers.size();
        String clientCountField = Utils.jsonField("client_count", clientCount);
        Integer producerCount = this.managers.stream()
                .mapToInt(manager -> manager.producers.size())
                .sum();
        String producerCountField = Utils.jsonField("producer_count", producerCount);
        Integer trackingConsumerCount = this.managers.stream()
                .mapToInt(manager -> manager.trackingConsumerTrackers.size())
                .sum();
        String trackingConsumerCountField = Utils.jsonField("tracking_consumer_count", trackingConsumerCount);
        String clientsKey = Utils.quote("clients");
        String clients = this.managers.stream().map(manager -> {
            // Counters are read before the collections are listed, as in the header of each entry.
            String header = "{"
                    + Utils.jsonField("id", Long.valueOf(manager.id)) + ","
                    + Utils.jsonField("node", manager.name) + ","
                    + Utils.jsonField("producer_count", Integer.valueOf(manager.producers.size())) + ","
                    + Utils.jsonField("tracking_consumer_count", Integer.valueOf(manager.trackingConsumerTrackers.size())) + ",";
            String producers = manager.producers.values().stream()
                    .map(producer -> "{"
                            + Utils.jsonField("stream", producer.stream()) + ","
                            + Utils.jsonField("producer_id", Byte.valueOf(producer.publisherId))
                            + "}")
                    .collect(Collectors.joining(","));
            String trackingConsumers = manager.trackingConsumerTrackers.stream()
                    .map(consumer -> "{" + Utils.jsonField("stream", consumer.stream()) + "}")
                    .collect(Collectors.joining(","));
            return header
                    + "\"producers\" : [" + producers + "],"
                    + "\"tracking_consumers\" : [" + trackingConsumers + "]"
                    + "}";
        }).collect(Collectors.joining(","));
        return "{"
                + clientCountField + ","
                + producerCountField + ","
                + trackingConsumerCountField + ","
                + clientsKey + " : [" + clients + "]"
                + "}";
    }

    /**
     * Reserves a free slot in the map for the given value.
     *
     * <p>Slot numbers are drawn from the sequence, reduced modulo
     * {@code MAX_PRODUCERS_PER_CLIENT}, and tried in turn until {@code putIfAbsent} reports that
     * the key (the slot narrowed to a {@code byte}) was free. The loop does not terminate while
     * every slot is taken.
     *
     * @param concurrentMap the slot table, keyed by the slot narrowed to a byte
     * @param t the value to store; when {@code null}, nothing is stored
     * @param atomicInteger the sequence slot numbers are drawn from
     * @param <T> the type of the stored values
     * @return the reserved slot, or {@code -1} when {@code t} is {@code null}
     */
    static <T> int pickSlot(ConcurrentMap<Byte, T> concurrentMap, T t, AtomicInteger atomicInteger) {
        if (t == null) {
            return -1;
        }
        int slot;
        T occupant;
        do {
            slot = Integer.remainderUnsigned(atomicInteger.getAndIncrement(), MAX_PRODUCERS_PER_CLIENT);
            occupant = concurrentMap.putIfAbsent(Byte.valueOf((byte) slot), t);
        } while (occupant != null);
        return slot;
    }

    /**
     * Renders a producer tracker as a JSON-like object holding its stream and the
     * {@code host:port} of the client of its manager.
     *
     * <p>The node is rendered as an unquoted {@code null} when the tracker has no manager or the
     * manager has no client.
     *
     * @param producerTracker the tracker to render
     * @return the textual representation of the tracker
     */
    private static /* synthetic */ String lambda$toString$13(ProducerTracker producerTracker) {
        StringBuilder sb = new StringBuilder("{");
        sb.append(Utils.quote("stream")).append(":").append(Utils.quote(producerTracker.stream)).append(",");
        sb.append(Utils.quote("node")).append(":");
        ClientProducersManager manager = producerTracker.clientProducersManager;
        Client client = manager != null ? manager.client : null;
        if (client == null) {
            sb.append("null");
        } else {
            sb.append(Utils.quote(client.getHost() + ":" + client.getPort()));
        }
        return sb.append("}").toString();
    }

    /**
     * Cancels a tracker; a {@code ProducerTracker} is first removed from the producer tracker
     * list on a best-effort basis.
     *
     * @param agentTracker the tracker to remove (when it is a producer tracker) and cancel
     */
    private /* synthetic */ void lambda$registerAgentTracker$2(AgentTracker agentTracker) {
        final boolean producer = agentTracker instanceof ProducerTracker;
        if (producer) {
            try {
                this.producerTrackers.remove(agentTracker);
            } catch (Exception ignored) {
                // Removal failure must not prevent the cancellation below.
                LOGGER.debug("Error while removing producer tracker from list");
            }
        }
        agentTracker.cancel();
    }
}
