package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.ProducerUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer.class */
public class StreamProducer implements Producer {
    private final long id;
    private final MessageAccumulator accumulator;
    private final ConcurrentMap<Long, ProducerUtils.AccumulatedEntity> unconfirmedMessages;
    private final int batchSize;
    private final String name;
    private final String stream;
    private final Client.OutboundEntityWriteCallback writeCallback;
    private final Semaphore unconfirmedMessagesSemaphore;
    private final Runnable closingCallback;
    private final StreamEnvironment environment;
    private final int maxUnconfirmedMessages;
    private final Codec codec;
    private final long enqueueTimeoutMs;
    private final boolean blockOnMaxUnconfirmed;
    private final boolean retryOnRecovery;
    private volatile Client client;
    private volatile byte publisherId;
    private volatile Status status;
    private volatile ScheduledFuture<?> confirmTimeoutFuture;
    private final short publishVersion;
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class);
    private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {
    };
    private static final Client.OutboundEntityWriteCallback OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK = new OutboundMessageFilterValueWriterCallback();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ToLongFunction<Object> publishSequenceFunction = obj -> {
        return ((ProducerUtils.AccumulatedEntity) obj).publishingId();
    };
    private final Lock lock = new ReentrantLock();

    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer$OutboundMessageFilterValueWriterCallback.class */
    private static final class OutboundMessageFilterValueWriterCallback implements Client.OutboundEntityWriteCallback {
        private OutboundMessageFilterValueWriterCallback() {
        }

        @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
        public int write(ByteBuf byteBuf, Object obj, long j) {
            ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity) obj;
            String filterValue = accumulatedEntity.filterValue();
            if (filterValue == null) {
                byteBuf.writeShort(-1);
            } else {
                byteBuf.writeShort(filterValue.length());
                byteBuf.writeBytes(filterValue.getBytes(StandardCharsets.UTF_8));
            }
            Codec.EncodedMessage encodedMessage = (Codec.EncodedMessage) accumulatedEntity.encodedEntity();
            byteBuf.writeInt(encodedMessage.getSize());
            byteBuf.writeBytes(encodedMessage.getData(), 0, encodedMessage.getSize());
            return 1;
        }

        @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
        public int fragmentLength(Object obj) {
            ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity) obj;
            Codec.EncodedMessage encodedMessage = (Codec.EncodedMessage) accumulatedEntity.encodedEntity();
            return accumulatedEntity.filterValue() == null ? 14 + encodedMessage.getSize() : 10 + accumulatedEntity.filterValue().length() + 4 + encodedMessage.getSize();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer$Status.class */
    public enum Status {
        RUNNING,
        NOT_AVAILABLE,
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @SuppressFBWarnings({"CT_CONSTRUCTOR_THROW"})
    public StreamProducer(String str, String str2, int i, int i2, boolean z, Compression compression, Duration duration, int i3, Duration duration2, Duration duration3, boolean z2, Function<Message, String> function, StreamEnvironment streamEnvironment) {
        if (function != null && !streamEnvironment.filteringSupported()) {
            throw new IllegalArgumentException("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated");
        }
        this.id = ID_SEQUENCE.getAndIncrement();
        this.environment = streamEnvironment;
        this.name = str;
        this.stream = str2;
        this.enqueueTimeoutMs = duration3.toMillis();
        this.retryOnRecovery = z2;
        this.blockOnMaxUnconfirmed = duration3.isZero();
        this.closingCallback = streamEnvironment.registerProducer(this, str, this.stream);
        AtomicLong atomicLong = new AtomicLong(computeFirstValueOfPublishingSequence());
        ToLongFunction toLongFunction = message -> {
            return message.hasPublishingId() ? message.getPublishingId() : atomicLong.getAndIncrement();
        };
        Client.OutboundEntityWriteCallback outboundEntityWriteCallback = i <= 1 ? function == null ? Client.OUTBOUND_MESSAGE_WRITE_CALLBACK : OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK : Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK;
        this.maxUnconfirmedMessages = i3;
        this.unconfirmedMessagesSemaphore = new Semaphore(i3, true);
        this.unconfirmedMessages = new ConcurrentHashMap(this.maxUnconfirmedMessages, 0.75f, 2);
        if (function == null) {
            this.publishVersion = (short) 1;
            final Client.OutboundEntityWriteCallback outboundEntityWriteCallback2 = outboundEntityWriteCallback;
            this.writeCallback = new Client.OutboundEntityWriteCallback() { // from class: com.rabbitmq.stream.impl.StreamProducer.1
                @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
                public int write(ByteBuf byteBuf, Object obj, long j) {
                    ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity) obj;
                    StreamProducer.this.unconfirmedMessages.put(Long.valueOf(j), accumulatedEntity);
                    return outboundEntityWriteCallback2.write(byteBuf, accumulatedEntity.encodedEntity(), j);
                }

                @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
                public int fragmentLength(Object obj) {
                    return outboundEntityWriteCallback2.fragmentLength(((ProducerUtils.AccumulatedEntity) obj).encodedEntity());
                }
            };
        } else {
            this.publishVersion = (short) 2;
            final Client.OutboundEntityWriteCallback outboundEntityWriteCallback3 = outboundEntityWriteCallback;
            this.writeCallback = new Client.OutboundEntityWriteCallback() { // from class: com.rabbitmq.stream.impl.StreamProducer.2
                @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
                public int write(ByteBuf byteBuf, Object obj, long j) {
                    ProducerUtils.AccumulatedEntity accumulatedEntity = (ProducerUtils.AccumulatedEntity) obj;
                    StreamProducer.this.unconfirmedMessages.put(Long.valueOf(j), accumulatedEntity);
                    return outboundEntityWriteCallback3.write(byteBuf, accumulatedEntity, j);
                }

                @Override // com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback
                public int fragmentLength(Object obj) {
                    return outboundEntityWriteCallback3.fragmentLength(obj);
                }
            };
        }
        this.accumulator = ProducerUtils.createMessageAccumulator(z, i, i2, i3, compression != null ? streamEnvironment.compressionCodecFactory().get(compression) : null, streamEnvironment.codec(), streamEnvironment.byteBufAllocator(), this.client.maxFrameSize(), toLongFunction, function, streamEnvironment.clock(), str2, streamEnvironment.observationCollector(), this);
        boolean z3 = !z && duration.toMillis() > 0;
        LOGGER.debug("Background batch publishing task required? {}", Boolean.valueOf(z3));
        if (z3) {
            AtomicReference atomicReference = new AtomicReference();
            Runnable runnable = () -> {
                if (canSend()) {
                    this.accumulator.flush(false);
                }
                if (this.status != Status.CLOSED) {
                    streamEnvironment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable) atomicReference.get(), "Background batch publishing task for publisher %d on stream '%s'", Long.valueOf(this.id), this.stream), duration.toMillis(), TimeUnit.MILLISECONDS);
                }
            };
            atomicReference.set(runnable);
            streamEnvironment.scheduledExecutorService().schedule(Utils.namedRunnable(runnable, "Background batch publishing task for publisher %d on stream '%s'", Long.valueOf(this.id), this.stream), duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        this.batchSize = i2;
        this.codec = streamEnvironment.codec();
        if (!duration2.isZero()) {
            AtomicReference atomicReference2 = new AtomicReference();
            Runnable confirmTimeoutTask = confirmTimeoutTask(duration2);
            atomicReference2.set(() -> {
                try {
                    confirmTimeoutTask.run();
                } catch (Exception e) {
                    LOGGER.info("Error while executing confirm timeout check task: {}", e.getCause());
                }
                if (this.status != Status.CLOSED) {
                    this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable) atomicReference2.get(), "Background confirm timeout task for producer %d on stream %s", Long.valueOf(this.id), this.stream), duration2.toMillis(), TimeUnit.MILLISECONDS);
                }
            });
            this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(Utils.namedRunnable((Runnable) atomicReference2.get(), "Background confirm timeout task for producer %d on stream %s", Long.valueOf(this.id), this.stream), duration2.toMillis(), TimeUnit.MILLISECONDS);
        }
        this.status = Status.RUNNING;
    }

    private Runnable confirmTimeoutTask(Duration duration) {
        return () -> {
            long time = this.environment.clock().time() - duration.toNanos();
            int i = 0;
            for (Map.Entry entry : new TreeMap(this.unconfirmedMessages).entrySet()) {
                if (((ProducerUtils.AccumulatedEntity) entry.getValue()).time() >= time) {
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                error(((Long) entry.getKey()).longValue(), (short) 10004);
                i++;
            }
            if (i > 0) {
                LOGGER.debug("{} outbound message(s) had reached the confirm timeout (limit {}) for producer {} on stream '{}', application notified with callback", new Object[]{Integer.valueOf(i), Long.valueOf(time), Long.valueOf(this.id), this.stream});
            }
        };
    }

    private long computeFirstValueOfPublishingSequence() {
        if (this.name == null || this.name.isEmpty()) {
            return 0L;
        }
        long queryPublisherSequence = this.client.queryPublisherSequence(this.name, this.stream);
        if (queryPublisherSequence == 0) {
            return 0L;
        }
        return queryPublisherSequence + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void confirm(long j) {
        ProducerUtils.AccumulatedEntity remove = this.unconfirmedMessages.remove(Long.valueOf(j));
        if (remove == null) {
            this.unconfirmedMessagesSemaphore.release();
        } else {
            this.unconfirmedMessagesSemaphore.release(remove.confirmationCallback().handle(true, (short) 1));
        }
    }

    int unconfirmedCount() {
        return this.unconfirmedMessages.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void error(long j, short s) {
        ProducerUtils.AccumulatedEntity remove = this.unconfirmedMessages.remove(Long.valueOf(j));
        if (remove == null) {
            this.unconfirmedMessagesSemaphore.release();
        } else {
            this.unconfirmedMessagesSemaphore.release(remove.confirmationCallback().handle(false, s));
        }
    }

    @Override // com.rabbitmq.stream.Producer
    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    @Override // com.rabbitmq.stream.Producer
    public long getLastPublishingId() {
        checkNotClosed();
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        if (!canSend()) {
            throw new IllegalStateException("The producer has no connection");
        }
        try {
            return this.client.queryPublisherSequence(this.name, this.stream);
        } catch (Exception e) {
            throw new IllegalStateException("Error while trying to query last publishing ID for producer " + this.name + " on stream " + this.stream);
        }
    }

    @Override // com.rabbitmq.stream.Producer
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        if (confirmationHandler == null) {
            confirmationHandler = NO_OP_CONFIRMATION_HANDLER;
        }
        try {
            if (!canSend()) {
                failPublishing(message, confirmationHandler);
            } else if (this.blockOnMaxUnconfirmed) {
                this.unconfirmedMessagesSemaphore.acquire();
                doSend(message, confirmationHandler);
            } else if (this.unconfirmedMessagesSemaphore.tryAcquire(this.enqueueTimeoutMs, TimeUnit.MILLISECONDS)) {
                doSend(message, confirmationHandler);
            } else {
                confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10001));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamException("Interrupted while waiting to accumulate outbound message", e);
        }
    }

    private void doSend(Message message, ConfirmationHandler confirmationHandler) {
        if (canSend()) {
            this.accumulator.add(message, confirmationHandler);
        } else {
            failPublishing(message, confirmationHandler);
        }
    }

    private void failPublishing(Message message, ConfirmationHandler confirmationHandler) {
        if (this.status == Status.NOT_AVAILABLE) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10002));
        } else if (this.status == Status.CLOSED) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10003));
        } else {
            confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10002));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean canSend() {
        return this.status == Status.RUNNING;
    }

    @Override // com.rabbitmq.stream.Producer, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            if (this.status != Status.RUNNING || this.client == null) {
                LOGGER.debug("No need to delete producer {}, it is currently unavailable", Byte.valueOf(this.publisherId));
            } else {
                LOGGER.debug("Deleting producer {}", Byte.valueOf(this.publisherId));
                Client.Response deletePublisher = this.client.deletePublisher(this.publisherId);
                if (!deletePublisher.isOk()) {
                    LOGGER.info("Could not delete publisher {} on producer closing: {}", Byte.valueOf(this.publisherId), Utils.formatConstant(deletePublisher.getResponseCode()));
                }
            }
            this.environment.removeProducer(this);
            closeFromEnvironment();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeFromEnvironment() {
        this.accumulator.close();
        this.closingCallback.run();
        cancelConfirmTimeoutTask();
        this.closed.set(true);
        this.status = Status.CLOSED;
        LOGGER.debug("Closed publisher {} successfully", Byte.valueOf(this.publisherId));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeAfterStreamDeletion(short s) {
        if (this.closed.compareAndSet(false, true)) {
            if (!this.unconfirmedMessages.isEmpty()) {
                Iterator<Map.Entry<Long, ProducerUtils.AccumulatedEntity>> it = this.unconfirmedMessages.entrySet().iterator();
                while (it.hasNext()) {
                    this.unconfirmedMessagesSemaphore.release(it.next().getValue().confirmationCallback().handle(false, s));
                    it.remove();
                }
            }
            cancelConfirmTimeoutTask();
            this.environment.removeProducer(this);
            this.status = Status.CLOSED;
        }
    }

    private void cancelConfirmTimeoutTask() {
        if (this.confirmTimeoutFuture != null) {
            this.confirmTimeoutFuture.cancel(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publishInternal(List<Object> list) {
        this.client.publishInternal(this.publishVersion, this.publisherId, list, this.writeCallback, this.publishSequenceFunction);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isOpen() {
        return !this.closed.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unavailable() {
        this.status = Status.NOT_AVAILABLE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void running() {
        executeInLock(() -> {
            LOGGER.debug("Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", Integer.valueOf(this.unconfirmedMessages.size()), Integer.valueOf(this.accumulator.size()));
            if (this.retryOnRecovery) {
                LOGGER.debug("Re-publishing {} unconfirmed message(s)", Integer.valueOf(this.unconfirmedMessages.size()));
                if (!this.unconfirmedMessages.isEmpty()) {
                    TreeMap treeMap = new TreeMap(this.unconfirmedMessages);
                    this.unconfirmedMessages.clear();
                    Iterator it = treeMap.entrySet().iterator();
                    while (it.hasNext()) {
                        ArrayList arrayList = new ArrayList(this.batchSize);
                        for (int i = 0; i != this.batchSize; i++) {
                            Object value = it.hasNext() ? ((Map.Entry) it.next()).getValue() : null;
                            if (value == null) {
                                break;
                            }
                            arrayList.add(value);
                        }
                        this.client.publishInternal(this.publishVersion, this.publisherId, arrayList, this.writeCallback, this.publishSequenceFunction);
                    }
                }
            } else {
                LOGGER.debug("Skipping republishing of {} unconfirmed messages", Integer.valueOf(this.unconfirmedMessages.size()));
                TreeMap treeMap2 = new TreeMap(this.unconfirmedMessages);
                this.unconfirmedMessages.clear();
                Iterator it2 = treeMap2.values().iterator();
                while (it2.hasNext()) {
                    try {
                        this.unconfirmedMessagesSemaphore.release(((ProducerUtils.AccumulatedEntity) it2.next()).confirmationCallback().handle(false, (short) 10004));
                    } catch (Exception e) {
                        LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage());
                        this.unconfirmedMessagesSemaphore.release(1);
                    }
                }
            }
            this.accumulator.flush(true);
            int availablePermits = this.maxUnconfirmedMessages - this.unconfirmedMessagesSemaphore.availablePermits();
            if (availablePermits > 0) {
                this.unconfirmedMessagesSemaphore.release(availablePermits);
                if (this.unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
                    return;
                }
                LOGGER.debug("Could not acquire {} permit(s) for message republishing", Integer.valueOf(this.unconfirmedMessages.size()));
            }
        });
        this.status = Status.RUNNING;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setClient(Client client) {
        executeInLock(() -> {
            this.client = client;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setPublisherId(byte b) {
        executeInLock(() -> {
            this.publisherId = b;
        });
    }

    Status status() {
        return this.status;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        StreamProducer streamProducer = (StreamProducer) obj;
        return this.id == streamProducer.id && this.stream.equals(streamProducer.stream);
    }

    public int hashCode() {
        return Objects.hash(Long.valueOf(this.id), this.stream);
    }

    public String toString() {
        Client client = this.client;
        long j = this.id;
        String str = this.stream;
        if (client != null) {
            String str2 = "\"" + client.connectionName() + "\"";
        }
        return "{ \"id\" : " + j + ",\"stream\" : \"" + j + "\",\"publishing_client\" : " + str + "}";
    }

    private void checkNotClosed() {
        if (this.closed.get()) {
            throw new IllegalStateException("This producer instance has been closed");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void lock() {
        this.lock.lock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unlock() {
        this.lock.unlock();
    }

    private void executeInLock(Runnable runnable) {
        lock();
        try {
            runnable.run();
        } finally {
            unlock();
        }
    }
}
