package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Generated;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.utils.SimpleCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TableView.class */
/**
 * Keeps an in-memory map of the latest value seen per message key, populated by draining
 * system-topic readers on demand.
 *
 * <p>Readers are held in a {@link SimpleCache} keyed by namespace, while values are stored per
 * message key. {@link #readLatest(String)} looks the result up by the topic name it was given,
 * so message keys are presumably topic names (confirm against the producer side).
 *
 * @param <T> type of the value carried by the messages
 */
public class TableView<T> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TableView.class);

    // Passed to SimpleCache as its timeout and check-frequency arguments; presumably a cached
    // reader is expired after one minute without use, checked every three seconds.
    private static final long CACHE_EXPIRE_TIMEOUT_MS = 60_000L;
    private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3_000L;

    /** Factory that asynchronously creates a reader for the given topic. */
    @VisibleForTesting
    protected final Function<TopicName, CompletableFuture<SystemTopicClient.Reader<T>>> readerCreator;
    /** Latest non-null value observed for each message key. */
    private final Map<String, T> snapshots = new ConcurrentHashMap<>();
    /** Upper bound, in milliseconds, for every blocking wait on a reader operation. */
    private final long clientOperationTimeoutMs;
    /** Readers cached per namespace. */
    private final SimpleCache<NamespaceName, SystemTopicClient.Reader<T>> readers;

    /**
     * Creates a table view.
     *
     * @param readerCreator factory used to create a reader when none is cached for a namespace
     * @param clientOperationTimeoutMs timeout in milliseconds applied to each reader operation
     * @param executor executor handed to the reader cache
     */
    public TableView(Function<TopicName, CompletableFuture<SystemTopicClient.Reader<T>>> readerCreator,
                     long clientOperationTimeoutMs,
                     ScheduledExecutorService executor) {
        this.readerCreator = readerCreator;
        this.clientOperationTimeoutMs = clientOperationTimeoutMs;
        this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, CACHE_EXPIRE_CHECK_FREQUENCY_MS);
    }

    /**
     * Drains every pending message from the reader of the topic's namespace into the snapshot map,
     * then returns the value stored under the given topic name.
     *
     * <p>Messages without a key are skipped. A keyed message with a {@code null} value removes
     * the entry for that key.
     *
     * @param topic topic name, used both to select the reader and as the lookup key
     * @return the stored value for {@code topic}, or {@code null} if there is none
     * @throws Exception if a reader operation fails, times out, or the thread is interrupted
     */
    public T readLatest(String topic) throws Exception {
        SystemTopicClient.Reader<T> reader = getReader(topic);
        while (wait(reader.hasMoreEventsAsync(), "has more events")) {
            Message<T> message = wait(reader.readNextAsync(), "read message");
            String key = message.getKey();
            if (key == null) {
                continue;
            }
            T value = message.getValue();
            if (value != null) {
                this.snapshots.put(key, value);
            } else {
                // A keyed message without a value deletes the entry for that key.
                this.snapshots.remove(key);
            }
        }
        return this.snapshots.get(topic);
    }

    /**
     * Returns the reader cached for the namespace of the given topic, asking the cache to create
     * one through {@link #readerCreator} when needed.
     *
     * @param topic topic name whose namespace selects the reader
     * @return the reader for the topic's namespace
     * @throws RuntimeException wrapping any failure raised while creating the reader
     */
    @VisibleForTesting
    protected SystemTopicClient.Reader<T> getReader(String topic) {
        TopicName topicName = TopicName.get(topic);
        return this.readers.get(topicName.getNamespaceObject(), () -> {
            try {
                return wait(this.readerCreator.apply(topicName), "create reader");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, reader -> {
            // Callback given to the cache (presumably run on expiry); closing is best effort.
            reader.closeAsync().exceptionally(th -> {
                // The trailing throwable makes SLF4J log the stack trace as well.
                log.warn("Failed to close reader {}", th.getMessage(), th);
                return null;
            });
        });
    }

    /**
     * Blocks until the future completes or the client operation timeout elapses.
     *
     * @param future operation to wait for
     * @param action description of the operation, appended to "Failed to " in the error message
     * @param <R> result type of the future
     * @return the result of the future
     * @throws CompletionException if the future completed exceptionally; carries the original cause
     * @throws Exception on timeout or interruption
     */
    private <R> R wait(CompletableFuture<R> future, String action) throws Exception {
        try {
            return future.get(this.clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the flag: getReader wraps this exception in a RuntimeException, which
            // would otherwise hide the interruption from the rest of the call stack.
            Thread.currentThread().interrupt();
            throw e;
        } catch (ExecutionException e) {
            throw new CompletionException("Failed to " + action, e.getCause());
        }
    }
}
