package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.ProducerUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/SimpleMessageAccumulator.class */
/**
 * {@link MessageAccumulator} backed by a bounded {@link LinkedBlockingQueue}.
 *
 * <p>Outbound messages are wrapped into {@link ProducerUtils.AccumulatedEntity} instances and
 * queued. As soon as the queue is observed to be full, or when {@link #flush(boolean)} is called,
 * up to {@code capacity} entries are drained and handed to the owning {@link StreamProducer}.
 */
public class SimpleMessageAccumulator implements MessageAccumulator {
    /** Pending entities waiting to be published; bounded by {@link #capacity}. */
    protected final BlockingQueue<ProducerUtils.AccumulatedEntity> messages;

    /** Maximum number of queued entities, and maximum size of one published batch. */
    private final int capacity;

    /** Notified each time an entity is taken out of the queue by {@link #get()}. */
    final ObservationCollector<Object> observationCollector;

    /** Producer that provides the lock, the send-state check and the actual publishing. */
    private final StreamProducer producer;

    /** Builds the {@link ProducerUtils.AccumulatedEntity} for each added message. */
    final ProducerUtils.MessageAccumulatorHelper helper;

    /**
     * Creates an accumulator.
     *
     * @param capacity queue capacity; also the maximum number of entities published per batch
     * @param codec forwarded to the {@link ProducerUtils.MessageAccumulatorHelper}
     * @param i2 forwarded to the helper (second helper argument)
     * @param toLongFunction forwarded to the helper
     * @param function forwarded to the helper
     * @param clock forwarded to the helper
     * @param str forwarded to the helper (sixth helper argument)
     * @param observationCollector forwarded to the helper and notified when entities are dequeued
     * @param streamProducer producer used to lock, check the send state and publish batches
     */
    @SuppressWarnings("unchecked")
    public SimpleMessageAccumulator(int capacity, Codec codec, int i2, ToLongFunction<Message> toLongFunction, Function<Message, String> function, Clock clock, String str, ObservationCollector<?> observationCollector, StreamProducer streamProducer) {
        this.helper = new ProducerUtils.MessageAccumulatorHelper(codec, i2, toLongFunction, function, clock, str, observationCollector);
        this.capacity = capacity;
        this.messages = new LinkedBlockingQueue<>(this.capacity);
        // Unchecked: the wildcard collector is stored as ObservationCollector<Object> so that
        // published(...) can be called with the opaque observation context of each entity.
        this.observationCollector = (ObservationCollector<Object>) observationCollector;
        this.producer = streamProducer;
    }

    /**
     * Queues a message, waiting up to 60 seconds for space, and publishes a batch if the queue is
     * then full.
     *
     * @param message the outbound message
     * @param confirmationHandler handler associated with the message
     * @throws StreamException if no space became available within the timeout, or if the calling
     *     thread was interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public void add(Message message, ConfirmationHandler confirmationHandler) {
        ProducerUtils.AccumulatedEntity entity = this.helper.entity(message, confirmationHandler);
        try {
            boolean offered = this.messages.offer(entity, 60L, TimeUnit.SECONDS);
            if (!offered) {
                throw new StreamException("Could not accumulate outbound message");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new StreamException("Error while accumulating outbound message", e);
        }
        if (this.messages.size() == this.capacity) {
            publishBatch(true);
        }
    }

    /**
     * Removes the head of the queue without blocking and reports it to the observation collector.
     *
     * @return the dequeued entity, or {@code null} if the queue is empty
     */
    ProducerUtils.AccumulatedEntity get() {
        ProducerUtils.AccumulatedEntity entity = this.messages.poll();
        if (entity != null) {
            this.observationCollector.published(entity.observationContext(), entity.confirmationCallback().message());
        }
        return entity;
    }

    /**
     * Returns the number of entities currently queued.
     *
     * @return the current queue size
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public int size() {
        return this.messages.size();
    }

    /**
     * Publishes one batch of queued entities.
     *
     * @param force when {@code true}, the batch is published without consulting
     *     {@code producer.canSend()}; when {@code false}, nothing is published unless
     *     {@code producer.canSend()} returns {@code true}
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public void flush(boolean force) {
        publishBatch(!force);
    }

    /**
     * Drains up to {@code capacity} entities and passes them to the producer, all while holding
     * the producer lock. The lock is always released, including when publishing throws.
     *
     * @param stateCheck when {@code true}, the batch is skipped unless {@code producer.canSend()}
     *     returns {@code true}
     */
    // The batch list is intentionally raw: the parameter type of StreamProducer.publishInternal is
    // declared outside this file, and a raw list is accepted whatever its element type is.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void publishBatch(boolean stateCheck) {
        this.producer.lock();
        try {
            if (stateCheck && !this.producer.canSend()) {
                return;
            }
            if (this.messages.isEmpty()) {
                return;
            }
            List entities = new ArrayList(this.capacity);
            int batchCount = 0;
            while (batchCount != this.capacity) {
                ProducerUtils.AccumulatedEntity entity = get();
                if (entity == null) {
                    // Queue drained before reaching a full batch.
                    break;
                }
                entities.add(entity);
                batchCount++;
            }
            this.producer.publishInternal(entities);
        } finally {
            this.producer.unlock();
        }
    }

    /** Does nothing; this accumulator holds no resources that require explicit release. */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator, java.lang.AutoCloseable
    public void close() {
    }
}
