package org.infinispan.client.hotrod.impl.transport.netty;

import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.reactivex.rxjava3.core.Completable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.FailoverRequestBalancingStrategy;
import org.infinispan.client.hotrod.configuration.ClientIntelligence;
import org.infinispan.client.hotrod.configuration.ClusterConfiguration;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ServerConfiguration;
import org.infinispan.client.hotrod.event.impl.ClientEventDispatcher;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.exceptions.RemoteIllegalLifecycleStateException;
import org.infinispan.client.hotrod.exceptions.RemoteNodeSuspectException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.TopologyInfo;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.ClientListenerOperation;
import org.infinispan.client.hotrod.impl.operations.DelegatingHotRodOperation;
import org.infinispan.client.hotrod.impl.operations.HotRodBulkOperation;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.NoCachePingOperation;
import org.infinispan.client.hotrod.impl.operations.NoHotRodOperation;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.topology.CacheInfo;
import org.infinispan.client.hotrod.impl.topology.ClusterInfo;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.stat.CounterTracker;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.concurrent.CompletionStages;

/* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/OperationDispatcher.class */
public class OperationDispatcher {
    // Logger for this class; its initializer is not part of the visible declarations.
    private static final Log log;
    // Name given to the cluster built from the statically configured servers.
    public static final String DEFAULT_CLUSTER_NAME = "___DEFAULT-CLUSTER___";
    // Immutable list of the clusters a switch may target; the constructor leaves it empty
    // when the configuration declares no additional clusters.
    private final List<ClusterInfo> clusters;
    // Copied from Configuration#maxRetries() in the constructor.
    private final int maxRetries;

    // Topology state (current cluster, per-cache information, topology age).
    @GuardedBy("lock")
    private final TopologyInfo topologyInfo;

    // Non-null while an automatic cluster switch is running; reset to null by completeClusterSwitch().
    @GuardedBy("lock")
    private CompletableFuture<Void> clusterSwitchStage;

    // Servers that executeOnSingleAddress() steers operations away from. Entries expire on their
    // own when Configuration#serverFailureTimeout() is positive (see the constructor).
    @GuardedBy("lock")
    private final Set<SocketAddress> connectionFailedServers;
    // Owns the per-server channels and is the component operations are submitted to.
    private final ChannelHandler channelHandler;
    // Used to complete the operations of a closed channel off the calling thread (see closeChannel()).
    private final ExecutorService executorService;
    private final TimeService timeService;
    private final ClientListenerNotifier clientListenerNotifier;
    // Counter registered under the name "connection.pool.retries".
    private final CounterTracker totalRetriesMetric;
    // Compiler-generated flag backing the assert statements of this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Protects the fields annotated with @GuardedBy("lock"). StampedLock is not reentrant.
    private final StampedLock lock = new StampedLock();
    private final AtomicLong retryCounter = new AtomicLong();

    // Operations that were pending when a cluster switch started; null when none are tracked.
    @GuardedBy("lock")
    private Set<HotRodOperation<?>> priorAgeOperations = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/OperationDispatcher$RetryingHotRodOperation.class */
    /**
     * Decorates an operation with the bookkeeping needed to retry it: the servers it already
     * failed on and the number of retries performed so far.
     *
     * @param <T> the result type of the wrapped operation
     */
    public static class RetryingHotRodOperation<T> extends DelegatingHotRodOperation<T> {
        /** Servers on which this operation has already failed. */
        private final Set<SocketAddress> failedServers = new HashSet<>();
        private int retryCount;

        /**
         * Returns {@code hotRodOperation} itself when it is already a retrying wrapper, otherwise
         * wraps it in a new one, so the failed-server history survives repeated retries.
         */
        static <T> RetryingHotRodOperation<T> retryingOp(HotRodOperation<T> hotRodOperation) {
            if (hotRodOperation instanceof RetryingHotRodOperation) {
                return (RetryingHotRodOperation<T>) hotRodOperation;
            }
            return new RetryingHotRodOperation<>(hotRodOperation);
        }

        RetryingHotRodOperation(HotRodOperation<T> hotRodOperation) {
            super(hotRodOperation);
        }

        /** Records a server on which this operation failed. */
        void addFailedServer(SocketAddress socketAddress) {
            this.failedServers.add(socketAddress);
        }

        /**
         * Increments the retry counter.
         *
         * @return the counter value after the increment
         */
        int incrementRetry() {
            return ++this.retryCount;
        }

        /**
         * @return the live (not copied) set of servers this operation failed on
         */
        public Set<SocketAddress> getFailedServers() {
            return this.failedServers;
        }
    }

    /**
     * Builds the dispatcher from the client configuration.
     *
     * <p>The statically configured servers form the cluster named {@link #DEFAULT_CLUSTER_NAME}.
     * When additional clusters are configured, each becomes a {@link ClusterInfo} and the default
     * cluster is appended after them; otherwise the cluster list stays empty.
     *
     * @param configuration          client configuration
     * @param executorService        executor used for work that must not run on the calling thread
     * @param timeService            time source exposed through {@link #getTimeService()}
     * @param clientListenerNotifier notifier that receives the client listener dispatchers
     * @param consumer               passed through to the {@code ChannelHandler} constructor
     */
    public OperationDispatcher(Configuration configuration, ExecutorService executorService, TimeService timeService, ClientListenerNotifier clientListenerNotifier, Consumer<ChannelPipeline> consumer) {
        this.executorService = executorService;
        this.timeService = timeService;
        this.clientListenerNotifier = clientListenerNotifier;
        this.maxRetries = configuration.maxRetries();
        if (configuration.serverFailureTimeout() > 0) {
            // Failed servers are forgotten automatically once the configured timeout elapses.
            // The explicit type witness is required: without it build() yields a
            // Map<Object, Object>, which newSetFromMap() cannot accept.
            this.connectionFailedServers = Collections.newSetFromMap(Caffeine.newBuilder()
                    .expireAfterWrite(configuration.serverFailureTimeout(), TimeUnit.MILLISECONDS)
                    .<SocketAddress, Boolean>build()
                    .asMap());
        } else {
            this.connectionFailedServers = ConcurrentHashMap.newKeySet();
        }

        List<InetSocketAddress> initialServers = new ArrayList<>();
        for (ServerConfiguration server : configuration.servers()) {
            initialServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
        }
        ClusterInfo mainCluster = new ClusterInfo(DEFAULT_CLUSTER_NAME, initialServers, configuration.clientIntelligence(), configuration.security().ssl().sniHostName());
        this.topologyInfo = new TopologyInfo(configuration, mainCluster);

        List<ClusterInfo> allClusters = new ArrayList<>();
        if (log.isDebugEnabled()) {
            log.debugf("Statically configured servers: %s", initialServers);
            log.debugf("Tcp no delay = %b; client socket timeout = %d ms; connect timeout = %d ms", Boolean.valueOf(configuration.tcpNoDelay()), Integer.valueOf(configuration.socketTimeout()), Integer.valueOf(configuration.connectionTimeout()));
        }
        if (!configuration.clusters().isEmpty()) {
            for (ClusterConfiguration clusterConfiguration : configuration.clusters()) {
                List<InetSocketAddress> clusterServers = new ArrayList<>();
                for (ServerConfiguration server : clusterConfiguration.getCluster()) {
                    clusterServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
                }
                // Per-cluster settings win; the global ones are the fallback.
                ClusterInfo secondaryCluster = new ClusterInfo(clusterConfiguration.getClusterName(), clusterServers, clusterConfiguration.getClientIntelligence() != null ? clusterConfiguration.getClientIntelligence() : configuration.clientIntelligence(), clusterConfiguration.sniHostName() != null ? clusterConfiguration.sniHostName() : configuration.security().ssl().sniHostName());
                log.debugf("Add secondary cluster: %s", secondaryCluster);
                allClusters.add(secondaryCluster);
            }
            allClusters.add(mainCluster);
        }
        this.clusters = List.copyOf(allClusters);
        this.channelHandler = new ChannelHandler(configuration, this.topologyInfo.getCluster().getSniHostName(), executorService, this, consumer);
        this.topologyInfo.getOrCreateCacheInfo(HotRodConstants.DEFAULT_CACHE_NAME);
        this.totalRetriesMetric = configuration.metricRegistry().createCounter("connection.pool.retries", "The total number of retries", Map.of(), null);
    }

    /**
     * Looks up the information tracked for a cache.
     *
     * <p>An optimistic read is attempted first; the read lock is only taken when a writer
     * invalidated the optimistic stamp in the meantime.
     *
     * @param str the cache name
     * @return whatever {@code TopologyInfo#getCacheInfo} returns for that name
     */
    public CacheInfo getCacheInfo(String str) {
        final long optimisticStamp = lock.tryOptimisticRead();
        final CacheInfo optimisticResult = topologyInfo.getCacheInfo(str);
        if (lock.validate(optimisticStamp)) {
            return optimisticResult;
        }
        final long readStamp = lock.readLock();
        try {
            return topologyInfo.getCacheInfo(str);
        } finally {
            lock.unlockRead(readStamp);
        }
    }

    /**
     * @return the cluster currently held by the topology info, read under the read lock
     */
    ClusterInfo getClusterInfo() {
        final long stamp = lock.readLock();
        try {
            return topologyInfo.getCluster();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    /**
     * @return the time service supplied at construction
     */
    public TimeService getTimeService() {
        return timeService;
    }

    /**
     * @return the listener notifier supplied at construction
     */
    public ClientListenerNotifier getClientListenerNotifier() {
        return clientListenerNotifier;
    }

    /**
     * Starts a channel to every initial server of the current cluster, one server after the
     * other, and blocks until all attempts have finished. A failed attempt is traced and
     * otherwise ignored.
     */
    public void start() {
        Iterator<InetSocketAddress> initialServers = topologyInfo.getCluster().getInitialServers().iterator();
        Util.await(CompletionStages.performSequentially(initialServers, server ->
                channelHandler.startChannelIfNeeded(server).exceptionally(failure -> {
                    if (log.isTraceEnabled()) {
                        log.tracef(failure, "Ignoring exception establishing a connection to initial server %s", server);
                    }
                    return null;
                })));
    }

    /**
     * Closes the channel handler. Shutdown is best effort: any exception is logged as a
     * warning rather than propagated.
     */
    public void stop() {
        try {
            channelHandler.close();
        } catch (Exception closeFailure) {
            log.warn("Exception while shutting down the operation dispatcher.", closeFailure);
        }
    }

    /**
     * @param socketAddress the server address
     * @return the channel the channel handler holds for that address
     */
    public OperationChannel getHandlerForAddress(SocketAddress socketAddress) {
        return channelHandler.getChannelForAddress(socketAddress);
    }

    /**
     * Dispatches an operation with no servers excluded.
     *
     * <p>With assertions enabled, passing a client listener operation fails with an
     * {@link AssertionError}.
     *
     * @param hotRodOperation the operation to run
     * @return the stage returned by the dispatch
     */
    public <E> CompletionStage<E> execute(HotRodOperation<E> hotRodOperation) {
        if (!$assertionsDisabled && hotRodOperation.isInstanceOf(ClientListenerOperation.class)) {
            throw new AssertionError();
        }
        return execute(hotRodOperation, Set.of());
    }

    /**
     * Runs a bulk operation, handing it a routing function for the given cache (which skips
     * the currently failed servers when no consistent hash is available) and
     * {@link #executeOnSingleAddress} as the way to run each part.
     *
     * @param str                 the cache name
     * @param hotRodBulkOperation the bulk operation to run
     * @return the stage produced by the bulk operation
     */
    public <E, O extends HotRodOperation<E>> CompletionStage<E> executeBulk(String str, HotRodBulkOperation<?, E, O> hotRodBulkOperation) {
        Function<Object, SocketAddress> router = identifyOperationTarget(str, connectionFailedServers);
        return hotRodBulkOperation.executeOperations(router, this::executeOnSingleAddress);
    }

    /**
     * Registers a client listener on the next server proposed by the cache's balancer, with no
     * servers excluded.
     *
     * @param clientListenerOperation the listener registration to run
     * @return the stage of the two-argument overload
     */
    public CompletionStage<Channel> executeAddListener(ClientListenerOperation clientListenerOperation) {
        FailoverRequestBalancingStrategy balancer = getBalancer(clientListenerOperation.getCacheName());
        return executeAddListener(clientListenerOperation, balancer.nextServer(Set.of()));
    }

    /**
     * Registers a client listener on a specific server.
     *
     * <p>A dispatcher for {@code socketAddress} is registered up front. When the operation
     * completes successfully, the listener id is attached to the channel it completed on and
     * the listener is started; on failure the listener is removed from the notifier again.
     *
     * @param clientListenerOperation the listener registration to run
     * @param socketAddress           the server initially targeted
     * @return the stage returned by {@link #executeOnSingleAddress}
     */
    public CompletionStage<Channel> executeAddListener(ClientListenerOperation clientListenerOperation, SocketAddress socketAddress) {
        clientListenerNotifier.addDispatcher(ClientEventDispatcher.create(clientListenerOperation, socketAddress, () -> {
        }, clientListenerOperation.getRemoteCache()));
        clientListenerOperation.whenComplete((channel, failure) -> {
            if (failure == null) {
                SocketAddress completedOn = ChannelRecord.of(channel);
                // NOTE(review): identity comparison, not equals(); an equal but distinct address
                // instance also replaces the dispatcher. Confirm this is intended.
                if (completedOn != socketAddress) {
                    clientListenerNotifier.addDispatcher(ClientEventDispatcher.create(clientListenerOperation, completedOn, () -> {
                    }, clientListenerOperation.getRemoteCache()));
                }
                addListener(completedOn, clientListenerOperation.listenerId);
                clientListenerNotifier.startClientListener(clientListenerOperation.listenerId);
            } else {
                log.errorf("Error encountered trying to add listener %s", clientListenerOperation.listener);
                clientListenerNotifier.removeClientListener(clientListenerOperation.listenerId);
            }
        });
        return executeOnSingleAddress(clientListenerOperation, socketAddress);
    }

    /**
     * Builds the function that maps a routing object to a server for the given cache.
     *
     * @param str the cache name
     * @param set servers the balancer should skip when no consistent hash is available
     * @return the consistent hash lookup when the cache has one, otherwise a function that
     *         ignores its argument and asks the balancer for the next server
     */
    private Function<Object, SocketAddress> identifyOperationTarget(String str, Set<SocketAddress> set) {
        CacheInfo info = getCacheInfo(str);
        if (info != null && info.getConsistentHash() != null) {
            ConsistentHash hash = info.getConsistentHash();
            Objects.requireNonNull(hash);
            return hash::getServer;
        }
        FailoverRequestBalancingStrategy balancer = getBalancer(str);
        return ignored -> balancer.nextServer(set);
    }

    /**
     * Picks a server for the operation and dispatches it there. The owner of the operation's
     * routing object is preferred; the balancer is the fallback when there is no routing
     * object or no usable owner.
     *
     * @param hotRodOperation the operation to run
     * @param set             servers to avoid
     * @return the stage returned by {@link #executeOnSingleAddress}
     */
    private <E> CompletionStage<E> execute(HotRodOperation<E> hotRodOperation, Set<SocketAddress> set) {
        Object routingKey = hotRodOperation.getRoutingObject();
        SocketAddress target = routingKey == null
                ? null
                : addressForObject(routingKey, hotRodOperation.getCacheName(), set);
        if (target == null) {
            target = getBalancer(hotRodOperation.getCacheName()).nextServer(set);
        }
        return executeOnSingleAddress(hotRodOperation, target);
    }

    /**
     * Resolves the server owning {@code obj} in the given cache, with no servers excluded.
     *
     * @param obj the routing object
     * @param str the cache name
     * @return the owning server, or {@code null} when none can be determined
     */
    public SocketAddress addressForObject(Object obj, String str) {
        Set<SocketAddress> noExclusions = Set.of();
        return addressForObject(obj, str, noExclusions);
    }

    /**
     * Resolves the server owning {@code obj} according to the cache's consistent hash.
     *
     * @param obj the routing object
     * @param str the cache name
     * @param set servers that must not be returned
     * @return the owning server, or {@code null} when the cache is unknown, has no consistent
     *         hash, the hash yields no server, or the server is in {@code set}
     */
    protected SocketAddress addressForObject(Object obj, String str, Set<SocketAddress> set) {
        CacheInfo info = getCacheInfo(str);
        if (info == null || info.getConsistentHash() == null) {
            return null;
        }
        SocketAddress owner = info.getConsistentHash().getServer(obj);
        if (owner == null || set.contains(owner)) {
            return null;
        }
        return owner;
    }

    /**
     * Submits an operation to one server. If that server is currently in the failed-server
     * set, the balancer is asked for a replacement that is not in the set.
     *
     * @param hotRodOperation the operation to run
     * @param socketAddress   the preferred server
     * @return the stage returned by the channel handler
     * @throws NullPointerException if the resulting target address is {@code null}
     */
    public <E> CompletionStage<E> executeOnSingleAddress(HotRodOperation<E> hotRodOperation, SocketAddress socketAddress) {
        SocketAddress target = socketAddress;
        if (!connectionFailedServers.isEmpty() && connectionFailedServers.contains(target)) {
            log.tracef("Server %s is suspected, trying another for %s", target, hotRodOperation);
            target = getBalancer(hotRodOperation.getCacheName()).nextServer(connectionFailedServers);
        }
        log.tracef("Dispatching %s to %s", hotRodOperation, target);
        return channelHandler.submitOperation(hotRodOperation, Objects.requireNonNull(target));
    }

    /**
     * @param str the cache name
     * @return the balancer of that cache's info, which is created on demand if missing
     */
    public FailoverRequestBalancingStrategy getBalancer(String str) {
        return topologyInfo.getOrCreateCacheInfo(str).getBalancer();
    }

    /**
     * @return the client intelligence of the current cluster
     */
    public ClientIntelligence getClientIntelligence() {
        ClusterInfo currentCluster = getClusterInfo();
        return currentCluster.getIntelligence();
    }

    /**
     * @param str the cache name
     * @return the topology info of that cache
     * @throws NullPointerException if no info is tracked for the cache
     */
    public CacheTopologyInfo getCacheTopologyInfo(String str) {
        CacheInfo info = getCacheInfo(str);
        return info.getCacheTopologyInfo();
    }

    /**
     * @param str the cache name
     * @return the value currently held by that cache's client topology reference
     * @throws NullPointerException if no info is tracked for the cache
     */
    public ClientTopology getClientTopologyInfo(String str) {
        CacheInfo info = getCacheInfo(str);
        return info.getClientTopologyRef().get();
    }

    /**
     * @return the channel handler created by this dispatcher
     */
    public ChannelHandler getChannelHandler() {
        return channelHandler;
    }

    /**
     * @param str the cache name
     * @return the primary segments per server for that cache, or {@code null} when no info is
     *         tracked for the cache
     */
    public Map<SocketAddress, Set<Integer>> getPrimarySegmentsByAddress(String str) {
        CacheInfo info = getCacheInfo(str);
        return info == null ? null : info.getPrimarySegments();
    }

    /**
     * @return all servers known to the topology info, read under the read lock
     */
    public Collection<InetSocketAddress> getServers() {
        final long stamp = lock.readLock();
        try {
            return topologyInfo.getAllServers();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    /**
     * @param hotRodOperation the operation to check
     * @return {@code true} when prior-age operations are being tracked and this is one of them
     */
    @GuardedBy("lock")
    private boolean fromPreviousAge(HotRodOperation<?> hotRodOperation) {
        Set<HotRodOperation<?>> tracked = this.priorAgeOperations;
        return tracked != null && tracked.contains(hotRodOperation);
    }

    /**
     * Applies a topology update received for a cache, under the write lock.
     *
     * <p>The update is applied only when no prior-age operations are being tracked and the
     * received topology id differs from the cache's current one; otherwise it is traced and
     * dropped. A non-negative hash version installs a new consistent hash, a negative one
     * only replaces the server list.
     *
     * @param str                  the cache name
     * @param hotRodOperation      the operation whose response carried the update
     * @param i                    the received topology id
     * @param inetSocketAddressArr the servers of the new topology
     * @param socketAddressArr     segment owners, one array per segment
     * @param s                    the hash function version
     */
    public void updateTopology(String str, HotRodOperation<?> hotRodOperation, int i, InetSocketAddress[] inetSocketAddressArr, SocketAddress[][] socketAddressArr, short s) {
        final long stamp = lock.writeLock();
        try {
            CacheInfo current = topologyInfo.getCacheInfo(str);
            if (!$assertionsDisabled && current == null) {
                throw new AssertionError("The cache info must exist before receiving a topology update");
            }
            boolean applicable = priorAgeOperations == null && i != current.getTopologyId();
            if (!applicable) {
                if (log.isTraceEnabled()) {
                    log.tracef("[%s] Ignoring outdated topology: topology id = %s, previous topology age = %s, servers = %s", new Object[]{current.getCacheName(), Integer.valueOf(i), Boolean.valueOf(fromPreviousAge(hotRodOperation)), Arrays.toString(inetSocketAddressArr)});
                }
                return;
            }
            List<InetSocketAddress> newServers = Arrays.asList(inetSocketAddressArr);
            Log.HOTROD.newTopology(i, -1, inetSocketAddressArr.length, newServers);
            CacheInfo updated;
            if (s >= 0) {
                SegmentConsistentHash hash = createConsistentHash(socketAddressArr, s, current.getCacheName());
                updated = current.withNewHash(i, newServers, hash, socketAddressArr.length);
            } else {
                updated = current.withNewServers(i, newServers);
            }
            updateCacheInfo(str, updated);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /**
     * Traces which kind of hash is being installed and delegates its creation to the
     * topology info.
     *
     * @param socketAddressArr segment owners, one array per segment
     * @param s                the hash function version
     * @param str              the cache name, used for tracing only
     * @return the hash created by the topology info
     */
    private SegmentConsistentHash createConsistentHash(SocketAddress[][] socketAddressArr, short s, String str) {
        final int segmentCount = socketAddressArr.length;
        if (log.isTraceEnabled()) {
            if (s != 0) {
                log.tracef("[%s] Updating client hash function with %s number of segments", str, Integer.valueOf(segmentCount));
            } else {
                log.tracef("[%s] Not using a consistent hash function (hash function version == 0).", str);
            }
        }
        return topologyInfo.createConsistentHash(segmentCount, s, socketAddressArr);
    }

    /**
     * Replaces the info tracked for a cache and reconciles channels with the new server list.
     *
     * <p>Channels are started for servers that appear only in the new list. After the info
     * has been swapped, servers that appear only in the old list are dropped from the
     * failed-server set and their channels are closed.
     *
     * @param str       the cache name
     * @param cacheInfo the new cache info
     */
    @GuardedBy("lock")
    protected void updateCacheInfo(String str, CacheInfo cacheInfo) {
        List<InetSocketAddress> newServers = cacheInfo.getServers();
        CacheInfo oldCacheInfo = this.topologyInfo.getCacheInfo(str);
        List<InetSocketAddress> oldServers = oldCacheInfo.getServers();

        // added = new \ old
        Set<SocketAddress> addedServers = new HashSet<>(newServers);
        oldServers.forEach(addedServers::remove);
        // removed = old \ new
        Set<SocketAddress> removedServers = new HashSet<>(oldServers);
        newServers.forEach(removedServers::remove);

        if (log.isTraceEnabled()) {
            String cacheName = cacheInfo.getCacheName();
            log.tracef("[%s] Current list: %s", cacheName, oldServers);
            log.tracef("[%s] New list: %s", cacheName, newServers);
            log.tracef("[%s] Added servers: %s", cacheName, addedServers);
            log.tracef("[%s] Removed servers: %s", cacheName, removedServers);
        }
        for (SocketAddress added : addedServers) {
            Log.HOTROD.newServerAdded(added);
            this.channelHandler.startChannelIfNeeded(added);
        }
        this.topologyInfo.updateCacheInfo(str, oldCacheInfo, cacheInfo);
        for (SocketAddress removed : removedServers) {
            Log.HOTROD.removingServer(removed);
            this.connectionFailedServers.remove(removed);
            closeChannel(removed);
        }
    }

    /**
     * Starts an automatic cluster switch unless one is already in progress.
     *
     * <p>Under the write lock the current topology age and cluster are captured and the
     * switch is marked as started. The lock is then released exactly once, before the
     * asynchronous part runs: the current cluster's initial servers are pinged and, only if
     * none answers, another live cluster is searched for and switched to. The in-progress
     * marker is cleared when the asynchronous chain completes, whatever its outcome.
     */
    private void trySwitchCluster() {
        final int topologyAge;
        final ClusterInfo cluster;
        final long stamp = this.lock.writeLock();
        try {
            topologyAge = this.topologyInfo.getTopologyAge();
            cluster = this.topologyInfo.getCluster();
            if (this.clusterSwitchStage != null) {
                if (log.isTraceEnabled()) {
                    log.trace("Cluster switch is already in progress");
                }
                return;
            }
            this.clusterSwitchStage = new CompletableFuture<>();
        } finally {
            // Single release point: unlocking a StampedLock twice with the same stamp fails.
            this.lock.unlockWrite(stamp);
        }

        checkServersAlive(cluster.getInitialServers()).thenCompose(alive -> {
            if (alive.booleanValue()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cluster %s is still alive, not switching", cluster);
                }
                return CompletableFuture.<ClusterInfo>completedFuture(null);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Trying to switch cluster away from '%s'", cluster.getName());
            }
            return findLiveCluster(cluster, topologyAge);
        }).thenAccept(liveCluster -> {
            if (liveCluster != null) {
                automaticSwitchToCluster(liveCluster, cluster, topologyAge);
            }
        }).whenComplete((ignored, failure) -> completeClusterSwitch());
    }

    /**
     * Pings every given server and reports whether at least one answered.
     *
     * @param collection the servers to ping
     * @return a stage completed with {@code true} as soon as one ping succeeds, or with
     *         {@code false} once every ping has failed; {@code false} immediately for an
     *         empty collection
     */
    private CompletionStage<Boolean> checkServersAlive(Collection<InetSocketAddress> collection) {
        if (collection.isEmpty()) {
            return CompletableFuture.completedFuture(false);
        }
        // Counts the pings still outstanding; the last failure completes the result.
        AtomicInteger remaining = new AtomicInteger(collection.size());
        CompletableFuture<Boolean> anyAlive = new CompletableFuture<>();
        for (InetSocketAddress server : collection) {
            executeOnSingleAddress(new NoCachePingOperation(), server).whenComplete((pingResponse, failure) -> {
                if (failure == null) {
                    log.tracef("Ping to server %s succeeded", server);
                    anyAlive.complete(true);
                    return;
                }
                if (log.isTraceEnabled()) {
                    log.tracef(failure, "Error checking whether this server is alive: %s", server);
                }
                if (remaining.decrementAndGet() == 0) {
                    anyAlive.complete(false);
                }
            });
        }
        return anyAlive;
    }

    /**
     * Searches the configured clusters, excluding the one named like {@code clusterInfo}, for
     * one with a reachable initial server.
     *
     * @param clusterInfo the cluster being abandoned
     * @param i           the topology age captured when the switch started
     * @return a stage with the first live cluster found, or {@code null} when there is none
     */
    private CompletionStage<ClusterInfo> findLiveCluster(ClusterInfo clusterInfo, int i) {
        List<ClusterInfo> candidates = new ArrayList<>();
        for (ClusterInfo candidate : this.clusters) {
            if (!candidate.getName().equals(clusterInfo.getName())) {
                candidates.add(candidate);
            }
        }
        return findLiveCluster0(false, null, candidates.iterator(), i);
    }

    /**
     * One step of the sequential search for a live cluster.
     *
     * <p>The write lock is held only while checking that the switch is still wanted, and is
     * released exactly once before anything else runs, so a failure in the later code cannot
     * trigger a second unlock.
     *
     * @param z           whether the previously probed cluster answered
     * @param clusterInfo the previously probed cluster ({@code null} on the first step)
     * @param it          the clusters still to probe
     * @param i           the topology age captured when the switch started
     * @return a stage with the live cluster, or {@code null} when the switch was superseded
     *         or no candidate answered
     */
    private CompletionStage<ClusterInfo> findLiveCluster0(boolean z, ClusterInfo clusterInfo, Iterator<ClusterInfo> it, int i) {
        final long stamp = this.lock.writeLock();
        try {
            if (this.clusterSwitchStage == null || this.topologyInfo.getTopologyAge() != i) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                return CompletableFuture.completedFuture(null);
            }
        } finally {
            this.lock.unlockWrite(stamp);
        }

        if (z) {
            return CompletableFuture.completedFuture(clusterInfo);
        }
        if (it.hasNext()) {
            ClusterInfo candidate = it.next();
            return checkServersAlive(candidate.getInitialServers())
                    .thenCompose(alive -> findLiveCluster0(alive.booleanValue(), candidate, it, i));
        }
        log.debugf("All cluster addresses viewed and none worked: %s", this.clusters);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Performs the automatic switch to {@code clusterInfo}.
     *
     * <p>Under the write lock: pending operations are marked as belonging to the prior
     * topology age, the channels to all currently known servers are closed and those servers
     * are dropped from the failed-server set, then the topology info is switched. Nothing
     * happens when no switch is in progress or prior-age operations are still tracked. The
     * lock is released exactly once, before the outcome is logged.
     *
     * @param clusterInfo  the cluster to switch to
     * @param clusterInfo2 the cluster being left (not used by this method)
     * @param i            the topology age captured when the switch started (not used by this method)
     */
    private void automaticSwitchToCluster(ClusterInfo clusterInfo, ClusterInfo clusterInfo2, int i) {
        final long stamp = this.lock.writeLock();
        try {
            if (this.clusterSwitchStage == null || this.priorAgeOperations != null) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                return;
            }
            log.debugf("Switching to cluster %s, servers: %s", clusterInfo.getName(), clusterInfo.getInitialServers());
            markPendingOperationsAsPriorAge();
            for (InetSocketAddress server : this.topologyInfo.getAllServers()) {
                closeChannel(server);
                this.connectionFailedServers.remove(server);
            }
            this.topologyInfo.switchCluster(clusterInfo);
        } finally {
            this.lock.unlockWrite(stamp);
        }

        if (clusterInfo.getName().equals(DEFAULT_CLUSTER_NAME)) {
            Log.HOTROD.switchedBackToMainCluster();
        } else {
            Log.HOTROD.switchedToCluster(clusterInfo.getName());
        }
    }

    /**
     * Closes the channel to a server and fails the operations that were still pending on it.
     *
     * <p>The pending operations are completed on the executor service, each through
     * {@code handleResponse} with a "connection closed" exception, so they go through the
     * regular error handling of that method.
     *
     * @param socketAddress the server whose channel is closed
     */
    private void closeChannel(SocketAddress socketAddress) {
        List<HotRodOperation<?>> pending = this.channelHandler.closeChannel(socketAddress);
        if (pending.isEmpty()) {
            return;
        }
        this.executorService.submit(() -> {
            TransportException connectionClosed = log.connectionClosed(socketAddress, socketAddress);
            for (HotRodOperation<?> operation : pending) {
                handleResponse(operation, -1L, socketAddress, null, connectionClosed);
            }
        });
    }

    /**
     * Switches to the configured cluster with the given name on explicit request.
     *
     * <p>An automatic switch that is in progress is overridden: its in-progress marker is
     * cleared once the manual switch is done. The write lock is released exactly once, before
     * logging and before that marker is cleared.
     *
     * @param str the name of the cluster to switch to
     * @return {@code false} when no clusters are configured or none has that name,
     *         {@code true} once the switch has been performed
     */
    public boolean manualSwitchToCluster(String str) {
        if (this.clusters.isEmpty()) {
            log.debugf("No alternative clusters configured, so can't switch cluster", new Object[0]);
            return false;
        }
        ClusterInfo target = null;
        // No early exit: if several clusters share the name, the last one wins.
        for (ClusterInfo candidate : this.clusters) {
            if (candidate.getName().equals(str)) {
                target = candidate;
            }
        }
        if (target == null) {
            log.debugf("Cluster named %s does not exist in the configuration", str);
            return false;
        }

        boolean overridingSwitch = false;
        final long stamp = this.lock.writeLock();
        try {
            if (this.clusterSwitchStage != null) {
                log.debugf("Another cluster switch is already in progress, overriding it", new Object[0]);
                overridingSwitch = true;
            }
            log.debugf("Switching to cluster %s, servers: %s", str, target.getInitialServers());
            markPendingOperationsAsPriorAge();
            this.topologyInfo.switchCluster(target);
        } finally {
            this.lock.unlockWrite(stamp);
        }

        if (str.equals(DEFAULT_CLUSTER_NAME)) {
            Log.HOTROD.manuallySwitchedBackToMainCluster();
        } else {
            Log.HOTROD.manuallySwitchedToCluster(str);
        }
        if (overridingSwitch) {
            completeClusterSwitch();
        }
        return true;
    }

    /**
     * Records every operation that is pending right now as belonging to the prior topology
     * age. Each recorded operation removes itself from the set when it completes, and the
     * field is reset to {@code null} once the set is found empty.
     */
    @GuardedBy("lock")
    private void markPendingOperationsAsPriorAge() {
        Set<HotRodOperation<?>> set;
        // Reuse the set of an earlier switch that is still draining, otherwise start a new one.
        if (this.priorAgeOperations == null) {
            set = ConcurrentHashMap.newKeySet();
            this.priorAgeOperations = set;
        } else {
            set = this.priorAgeOperations;
        }
        // Placeholder element: keeps the set non-empty until it is removed below, so the
        // completion callback cannot take its isEmpty() branch before that point.
        // NOTE(review): presumably this prevents the callback from requesting the write lock
        // while the caller still holds it (StampedLock is not reentrant) - confirm.
        NoHotRodOperation instance = NoHotRodOperation.instance();
        set.add(instance);
        Set<HotRodOperation<?>> set2 = set;
        Set<HotRodOperation<?>> set3 = set;
        this.channelHandler.pendingOperationFlowable().concatMapCompletable(hotRodOperation -> {
            // Track the operation until its future completes, successfully or not.
            set2.add(hotRodOperation);
            return Completable.fromCompletionStage(hotRodOperation.asCompletableFuture().whenComplete((obj, th) -> {
                set2.remove(hotRodOperation);
            }));
        }).subscribe(() -> {
            // Runs once every tracked operation has completed.
            if (set3.isEmpty()) {
                long writeLock = this.lock.writeLock();
                try {
                    this.priorAgeOperations = null;
                    this.lock.unlockWrite(writeLock);
                } catch (Throwable th) {
                    this.lock.unlockWrite(writeLock);
                    throw th;
                }
            }
        }, th -> {
            log.fatal("Problem occurred while configuring prior age operations for cluster failover", th);
        });
        // Drop the placeholder; if nothing else is tracked, clear the field right away
        // (the caller holds the write lock, per the @GuardedBy contract).
        set.remove(instance);
        if (set.isEmpty()) {
            this.priorAgeOperations = null;
        }
    }

    /**
     * Clears the in-progress marker of a cluster switch and completes its stage, if there
     * was one. The stage is completed after the write lock has been released, and the lock is
     * released exactly once.
     */
    private void completeClusterSwitch() {
        final CompletableFuture<Void> finishedSwitch;
        final long stamp = this.lock.writeLock();
        try {
            finishedSwitch = this.clusterSwitchStage;
            this.clusterSwitchStage = null;
        } finally {
            this.lock.unlockWrite(stamp);
        }
        if (finishedSwitch != null) {
            finishedSwitch.complete(null);
        }
    }

    /**
     * Handles the outcome of an operation that ran on the given channel by resolving the
     * channel to its server address and delegating to the address-based overload.
     *
     * <p>The response value is passed through with its own type; it must not be cast to
     * {@code SocketAddress}, which would fail for every other response type.
     *
     * @param hotRodOperation the operation that finished
     * @param j               the message id
     * @param channel         the channel the operation ran on
     * @param e               the response value, meaningful only when {@code th} is {@code null}
     * @param th              the failure, or {@code null} on success
     */
    public <E> void handleResponse(HotRodOperation<E> hotRodOperation, long j, Channel channel, E e, Throwable th) {
        SocketAddress server = ChannelRecord.of(channel);
        handleResponse(hotRodOperation, j, server, e, th);
    }

    /**
     * Handles the outcome of an operation that ran against the given server.
     *
     * <p>On success the operation's future is completed with the value. On failure
     * {@code checkException} decides whether the operation is failed right away or handed to
     * the retry logic; listener registrations use the dedicated retry overload.
     *
     * @param hotRodOperation the operation that finished
     * @param j               the message id
     * @param socketAddress   the server the operation ran against
     * @param e               the response value, meaningful only when {@code th} is {@code null}
     * @param th              the failure, or {@code null} on success
     */
    public <E> void handleResponse(HotRodOperation<E> hotRodOperation, long j, SocketAddress socketAddress, E e, Throwable th) {
        if (log.isTraceEnabled()) {
            log.tracef("Completing message %d op %s with value %s or exception %s", new Object[]{Long.valueOf(j), hotRodOperation, org.infinispan.commons.util.Util.toStr(e), th});
        }
        if (th == null) {
            hotRodOperation.asCompletableFuture().complete(e);
            return;
        }
        RetryingHotRodOperation<?> retryCandidate = checkException(th, socketAddress, hotRodOperation);
        if (retryCandidate == null) {
            // checkException already completed the operation exceptionally.
            return;
        }
        if (hotRodOperation instanceof AddClientListenerOperation) {
            logAndRetryOrFail(th, retryCandidate, (AddClientListenerOperation) hotRodOperation);
        } else {
            logAndRetryOrFail(th, retryCandidate);
        }
    }

    /**
     * Registers a listener id with the header decoder of the channel to the given server.
     *
     * @param socketAddress the server the listener was registered on
     * @param bArr          the listener id
     * @throws IllegalStateException if there is no channel for the address
     */
    private void addListener(SocketAddress socketAddress, byte[] bArr) {
        OperationChannel operationChannel = this.channelHandler.getChannelForAddress(socketAddress);
        if (operationChannel == null) {
            throw new IllegalStateException("Channel is not running for address " + String.valueOf(socketAddress));
        }
        // A pipeline lookup by name returns the generic handler type, so the decoder must be
        // cast before its listener methods can be called.
        HeaderDecoder decoder = (HeaderDecoder) operationChannel.getChannel().pipeline().get(HeaderDecoder.NAME);
        decoder.addListener(bArr);
    }

    /**
     * Removes a listener id from the header decoder of the channel to the given server.
     * Does nothing when there is no channel for the address.
     *
     * @param socketAddress the server the listener was registered on
     * @param bArr          the listener id
     */
    public void removeListener(SocketAddress socketAddress, byte[] bArr) {
        OperationChannel operationChannel = this.channelHandler.getChannelForAddress(socketAddress);
        if (operationChannel == null) {
            return;
        }
        // A pipeline lookup by name returns the generic handler type, so the decoder must be
        // cast before its listener methods can be called.
        HeaderDecoder decoder = (HeaderDecoder) operationChannel.getChannel().pipeline().get(HeaderDecoder.NAME);
        decoder.removeListener(bArr);
    }

    /**
     * @param channel the channel to look up
     * @return the address {@code ChannelRecord} associates with the channel
     */
    public SocketAddress unresolvedAddressForChannel(Channel channel) {
        SocketAddress recorded = ChannelRecord.of(channel);
        return recorded;
    }

    /**
     * Decides whether a failed operation may be retried.
     *
     * <p>Decoder exceptions are unwrapped to their cause first. The operation is failed
     * immediately when it does not support retries, or when the failure is a server error
     * other than an illegal-lifecycle or node-suspect one. It is also failed, with an
     * {@link InterruptedException} carrying the original failure as suppressed, when the
     * current thread is interrupted; this check clears the thread's interrupt flag.
     *
     * @param th              the failure
     * @param socketAddress   the server the operation failed on, may be {@code null}
     * @param hotRodOperation the failed operation
     * @return the retrying wrapper with the failed server recorded, or {@code null} when the
     *         operation has been completed exceptionally
     */
    private RetryingHotRodOperation<?> checkException(Throwable th, SocketAddress socketAddress, HotRodOperation<?> hotRodOperation) {
        Throwable cause = th;
        while (cause instanceof DecoderException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        boolean retryable = hotRodOperation.supportRetry()
                && (!isServerError(cause)
                    || cause instanceof RemoteIllegalLifecycleStateException
                    || cause instanceof RemoteNodeSuspectException);
        if (!retryable) {
            hotRodOperation.asCompletableFuture().completeExceptionally(cause);
            return null;
        }
        if (Thread.interrupted()) {
            InterruptedException interruption = new InterruptedException();
            interruption.addSuppressed(cause);
            hotRodOperation.asCompletableFuture().completeExceptionally(interruption);
            return null;
        }
        RetryingHotRodOperation<?> retrying = RetryingHotRodOperation.retryingOp(hotRodOperation);
        if (socketAddress != null) {
            retrying.addFailedServer(socketAddress);
        }
        return retrying;
    }

    /**
     * Tells whether a throwable is a {@code HotRodClientException} that flags
     * itself as a server error.
     *
     * @param th throwable to inspect; {@code null} yields {@code false}
     * @return {@code true} only for a {@code HotRodClientException} whose
     *         {@code isServerError()} returns {@code true}
     */
    protected final boolean isServerError(Throwable th) {
        if (!(th instanceof HotRodClientException)) {
            return false;
        }
        return ((HotRodClientException) th).isServerError();
    }

    /**
     * Re-executes the operation, passing along its failed servers, if
     * {@code canRetry} allows another attempt. Otherwise nothing further happens
     * here, since {@code canRetry} has already failed the operation.
     *
     * @param th                      failure that triggered the retry
     * @param retryingHotRodOperation operation to retry
     */
    protected void logAndRetryOrFail(Throwable th, RetryingHotRodOperation<?> retryingHotRodOperation) {
        if (!canRetry(th, retryingHotRodOperation)) {
            return;
        }
        execute(retryingHotRodOperation, retryingHotRodOperation.getFailedServers());
    }

    /**
     * Retry path for add-client-listener operations.
     *
     * <p>When {@code canRetry} allows another attempt, a target server is taken
     * from the cache's balancer, a new event dispatcher for that server is added
     * to the listener notifier, and the operation is executed on that single address.
     *
     * @param th                         failure that triggered the retry
     * @param retryingHotRodOperation    operation to retry
     * @param addClientListenerOperation listener operation used to build the dispatcher
     */
    protected void logAndRetryOrFail(Throwable th, RetryingHotRodOperation<?> retryingHotRodOperation, AddClientListenerOperation addClientListenerOperation) {
        if (!canRetry(th, retryingHotRodOperation)) {
            return;
        }
        FailoverRequestBalancingStrategy strategy = getBalancer(retryingHotRodOperation.getCacheName());
        SocketAddress target = strategy.nextServer(retryingHotRodOperation.getFailedServers());
        // One extra pick if the first candidate is known to have a failed connection.
        if (connectionFailedServers.contains(target)) {
            target = strategy.nextServer(retryingHotRodOperation.getFailedServers());
        }
        clientListenerNotifier.addDispatcher(
                ClientEventDispatcher.create(addClientListenerOperation, target, () -> {
                }, addClientListenerOperation.getRemoteCache()));
        executeOnSingleAddress(retryingHotRodOperation, target);
    }

    /**
     * Counts a retry attempt against the operation and reports whether it may proceed.
     *
     * <p>If the incremented attempt number exceeds {@code maxRetries}, the failure
     * is logged and the operation is completed exceptionally with {@code th}.
     * Otherwise the retry metric and counter are bumped and the operation is reset.
     *
     * @param th                      failure that triggered the retry
     * @param retryingHotRodOperation operation whose retry count is incremented
     * @return {@code true} if the operation was reset for another attempt,
     *         {@code false} if it was failed
     */
    protected boolean canRetry(Throwable th, RetryingHotRodOperation<?> retryingHotRodOperation) {
        final int attempt = retryingHotRodOperation.incrementRetry();
        final boolean withinBudget = attempt <= maxRetries;
        if (withinBudget) {
            if (log.isTraceEnabled()) {
                log.tracef(th, "Exception encountered in %s. Retry %d out of %d", this, attempt, maxRetries);
            }
            totalRetriesMetric.increment();
            retryCounter.incrementAndGet();
            retryingHotRodOperation.reset();
        } else {
            Log.HOTROD.exceptionAndNoRetriesLeft(attempt, maxRetries, th);
            retryingHotRodOperation.asCompletableFuture().completeExceptionally(th);
        }
        return withinBudget;
    }

    /**
     * Handles the outcome of a connection attempt on an operation channel.
     *
     * <p>With a {@code null} throwable the attempt succeeded: the channel's
     * address is recorded on the channel and removed from the failed-server set.
     * Otherwise, under the write lock, the address is added to the failed-server
     * set, then caches are reset or a cluster switch is attempted, and the
     * channel failure is propagated to the operations returned by the reconnect.
     *
     * @param operationChannel channel whose connection attempt finished
     * @param th               connection failure, or {@code null} on success
     */
    public void handleConnectionFailure(OperationChannel operationChannel, Throwable th) {
        if (th != null) {
            final long stamp = lock.writeLock();
            try {
                connectionFailedServers.add(operationChannel.getAddress());
                boolean allInitialServersFailed =
                        connectionFailedServers.containsAll(topologyInfo.getCluster().getInitialServers());
                if (log.isTraceEnabled()) {
                    log.tracef("Connection attempt failed, we now have %d servers with no established connections: %s", this.connectionFailedServers.size(), this.connectionFailedServers);
                }
                // Switch only when every initial server failed and alternative
                // clusters are configured; in every other case reset the caches.
                if (allInitialServersFailed && !clusters.isEmpty()) {
                    trySwitchCluster();
                } else {
                    resetCachesWithFailedServers();
                }
                handleChannelFailure(operationChannel, th);
            } finally {
                lock.unlockWrite(stamp);
            }
            return;
        }

        // Successful connection: no lock is taken on this path.
        SocketAddress connected = operationChannel.getAddress();
        ChannelRecord.set(operationChannel.getChannel(), connected);
        connectionFailedServers.remove(connected);
        log.tracef("OperationChannel connected: %s", operationChannel);
    }

    /**
     * Resets every cache whose current servers are all in the failed-server set,
     * unless its server list already equals the cluster's initial server list.
     * The names of the caches being reset are logged once before the resets.
     */
    @GuardedBy("lock")
    private void resetCachesWithFailedServers() {
        List<String> cachesToReset = new ArrayList<>();
        this.topologyInfo.forEachCache((str, cacheInfo) -> {
            List<InetSocketAddress> servers = cacheInfo.getServers();
            boolean allServersFailed = this.connectionFailedServers.containsAll(servers);
            // A cache already on the initial server list has nothing to revert to.
            boolean differsFromInitial = !servers.equals(this.topologyInfo.getCluster().getInitialServers());
            if (allServersFailed && differsFromInitial) {
                cachesToReset.add(cacheInfo.getCacheName());
            }
        });
        if (cachesToReset.isEmpty()) {
            return;
        }
        Log.HOTROD.revertCacheToInitialServerList(cachesToReset);
        for (String cacheName : cachesToReset) {
            this.topologyInfo.reset(cacheName);
        }
    }

    /**
     * Propagates a failure on a raw channel to the operation channel registered
     * for that channel's recorded address, if there is one.
     *
     * <p>Unless {@code $assertionsDisabled} is set, an {@code AssertionError} is
     * thrown when the caller is not running on the channel's event loop.
     *
     * @param channel channel that failed
     * @param th      failure to propagate
     */
    public void handleChannelFailure(Channel channel, Throwable th) {
        boolean callerAllowed = $assertionsDisabled || channel.eventLoop().inEventLoop();
        if (!callerAllowed) {
            throw new AssertionError();
        }
        OperationChannel owner = channelHandler.getChannelForAddress(ChannelRecord.of(channel));
        if (owner == null) {
            return;
        }
        handleChannelFailure(owner, th);
    }

    /**
     * Asks the operation channel to reconnect and passes the failure to every
     * operation that the reconnect returns, using message id {@code -1} and a
     * {@code null} response value.
     *
     * @param operationChannel channel that failed
     * @param th               failure handed to each returned operation
     */
    private void handleChannelFailure(OperationChannel operationChannel, Throwable th) {
        for (HotRodOperation<?> pending : operationChannel.reconnect(th)) {
            @SuppressWarnings("unchecked")
            HotRodOperation<SocketAddress> typed = (HotRodOperation<SocketAddress>) pending;
            handleResponse(typed, -1L, operationChannel.getAddress(), (SocketAddress) null, th);
        }
    }

    /**
     * @return the name of the cluster info returned by {@code getClusterInfo()}
     */
    public String getCurrentClusterName() {
        String clusterName = getClusterInfo().getName();
        return clusterName;
    }

    /**
     * @return the current value of the retry counter, which {@code canRetry}
     *         increments for every retry it allows
     */
    public long getRetries() {
        return retryCounter.get();
    }

    /**
     * @param str key passed to {@code getCacheInfo} (presumably the cache name)
     * @return the consistent hash held by that cache info
     */
    public ConsistentHash getConsistentHash(String str) {
        ConsistentHash hash = getCacheInfo(str).getConsistentHash();
        return hash;
    }

    /**
     * @param str key passed to {@code getCacheInfo} (presumably the cache name)
     * @return the topology id held by that cache info
     */
    public int getTopologyId(String str) {
        int topologyId = getCacheInfo(str).getTopologyId();
        return topologyId;
    }

    /**
     * Reads the server list for the given key from the topology info while
     * holding the read lock. The lock is released whether or not the lookup throws.
     *
     * @param str key passed to {@code topologyInfo.getServers} (presumably the cache name)
     * @return the servers reported by the topology info
     */
    public Collection<InetSocketAddress> getServers(String str) {
        final long stamp = lock.readLock();
        try {
            return topologyInfo.getServers(str);
        } finally {
            lock.unlockRead(stamp);
        }
    }

    /**
     * Makes sure the topology info holds an entry for the given key by calling
     * {@code getOrCreateCacheInfo}; the returned cache info is discarded.
     *
     * @param str key passed to {@code getOrCreateCacheInfo} (presumably the cache name)
     */
    public void addCacheTopologyInfoIfAbsent(String str) {
        topologyInfo.getOrCreateCacheInfo(str);
    }

    /**
     * @return the internal set of servers whose connection attempts failed; this
     *         is the same instance the dispatcher updates, not a copy
     */
    public Set<SocketAddress> getConnectionFailedServers() {
        Set<SocketAddress> failedServers = connectionFailedServers;
        return failedServers;
    }

    // Class initialisation: sets up the assertion flag and the class logger.
    static {
        // True when assertions are NOT enabled for this class; read by the
        // event-loop guard in handleChannelFailure(Channel, Throwable).
        $assertionsDisabled = !OperationDispatcher.class.desiredAssertionStatus();
        // Logger obtained from LogFactory for this class, typed as the Log interface.
        log = (Log) LogFactory.getLog(OperationDispatcher.class, Log.class);
    }
}
