package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.ProducerUtils;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.class */
/**
 * {@link MessageAccumulator} that forwards every added message to a {@link DynamicBatch} and
 * publishes the lists of entities that the batch hands back through the owning
 * {@link StreamProducer}.
 *
 * <p>Two publishing modes are selected at construction time from the first constructor argument:
 * when it is {@code <= 1} the accumulated entities are published as-is; otherwise they are first
 * grouped into {@code ProducerUtils.Batch} instances holding at most that many entries each.
 *
 * <p>NOTE(review): when and on which thread {@code DynamicBatch} invokes the publishing callback
 * is not visible from this file; confirm against {@code DynamicBatch} before relying on ordering
 * or thread-safety.
 */
public final class DynamicBatchMessageAccumulator implements MessageAccumulator {
    private final DynamicBatch<Object> dynamicBatch;
    private final ObservationCollector<Object> observationCollector;
    private final StreamProducer producer;
    private final ProducerUtils.MessageAccumulatorHelper helper;

    /**
     * Creates the accumulator and its underlying {@link DynamicBatch}.
     *
     * @param i number of entries grouped into one sub-batch; values {@code <= 1} disable grouping
     * @param i2 size argument for the dynamic batch; multiplied by {@code i} when grouping is
     *     enabled
     * @param codec forwarded to the {@code MessageAccumulatorHelper}
     * @param i3 forwarded to the {@code MessageAccumulatorHelper} (meaning not visible here,
     *     TODO confirm)
     * @param toLongFunction forwarded to the {@code MessageAccumulatorHelper}
     * @param function forwarded to the {@code MessageAccumulatorHelper}
     * @param clock forwarded to the {@code MessageAccumulatorHelper}
     * @param str forwarded to the {@code MessageAccumulatorHelper}
     * @param compressionCodec codec used for sub-batches; when {@code null} the code of
     *     {@code Compression.NONE} is used
     * @param byteBufAllocator allocator handed to the helper when creating sub-batches
     * @param observationCollector collector notified of published messages unless it reports
     *     itself as a no-op
     * @param streamProducer producer used to actually send the accumulated entities
     */
    public DynamicBatchMessageAccumulator(int i, int i2, Codec codec, int i3, ToLongFunction<Message> toLongFunction, Function<Message, String> function, Clock clock, String str, CompressionCodec compressionCodec, ByteBufAllocator byteBufAllocator, ObservationCollector<?> observationCollector, StreamProducer streamProducer) {
        this.helper = new ProducerUtils.MessageAccumulatorHelper(codec, i3, toLongFunction, function, clock, str, observationCollector);
        this.producer = streamProducer;
        // The collector is only ever fed the observation contexts carried by the accumulated
        // entities, so viewing it as ObservationCollector<Object> is an unchecked but intended cast.
        @SuppressWarnings("unchecked")
        ObservationCollector<Object> collector = (ObservationCollector<Object>) observationCollector;
        this.observationCollector = collector;
        // Evaluated once so the publishing callbacks skip the notification loop for no-op collectors.
        boolean shouldObserve = !this.observationCollector.isNoop();
        if (i <= 1) {
            this.dynamicBatch = new DynamicBatch<>(entities -> publishEntities(entities, shouldObserve), i2);
        } else {
            byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
            this.dynamicBatch = new DynamicBatch<>(
                    entities -> publishSubEntries(entities, i, compressionCode, compressionCodec, byteBufAllocator, shouldObserve),
                    i2 * i);
        }
    }

    /**
     * Wraps the message into an accumulated entity and hands it to the dynamic batch.
     *
     * @param message the message to publish
     * @param confirmationHandler handler associated with the message
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public void add(Message message, ConfirmationHandler confirmationHandler) {
        this.dynamicBatch.add(this.helper.entity(message, confirmationHandler));
    }

    /**
     * Always returns {@code 0}: this accumulator keeps no pending-message count of its own.
     *
     * @return {@code 0}
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public int size() {
        return 0;
    }

    /**
     * Does nothing; this accumulator has no buffer of its own to flush.
     *
     * @param z ignored
     */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator
    public void flush(boolean z) {
    }

    /** Publishes the entities unchanged and, on success, reports them to the observation collector. */
    private boolean publishEntities(List<Object> entities, boolean shouldObserve) {
        boolean published = publish(entities);
        if (published && shouldObserve) {
            notifyPublished(entities);
        }
        return published;
    }

    /**
     * Groups the entities into sub-batches of at most {@code subEntrySize} entries, publishes the
     * sub-batches and, on success, reports every original entity to the observation collector.
     */
    private boolean publishSubEntries(List<Object> entities, int subEntrySize, byte compressionCode, CompressionCodec compressionCodec, ByteBufAllocator byteBufAllocator, boolean shouldObserve) {
        List<Object> subBatches = new ArrayList<>();
        int count = 0;
        ProducerUtils.Batch subBatch = this.helper.batch(byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
        ProducerUtils.AccumulatedEntity lastInSubBatch = null;
        for (Object item : entities) {
            ProducerUtils.AccumulatedEntity entity = (ProducerUtils.AccumulatedEntity) item;
            lastInSubBatch = entity;
            subBatch.add((Codec.EncodedMessage) entity.encodedEntity(), entity.confirmationCallback());
            count++;
            if (count == subEntrySize) {
                // Sub-batch is full: seal it and start a fresh one.
                sealSubBatch(subBatch, lastInSubBatch);
                subBatches.add(subBatch);
                lastInSubBatch = null;
                subBatch = this.helper.batch(byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
                count = 0;
            }
        }
        // Trailing, partially filled sub-batch.
        if (!subBatch.isEmpty() && count < subEntrySize) {
            sealSubBatch(subBatch, lastInSubBatch);
            subBatches.add(subBatch);
        }
        boolean published = publish(subBatches);
        if (published && shouldObserve) {
            notifyPublished(entities);
        }
        return published;
    }

    /** Stamps the sub-batch with the time and publishing ID of its last entity and closes its encoded batch. */
    private static void sealSubBatch(ProducerUtils.Batch subBatch, ProducerUtils.AccumulatedEntity lastEntity) {
        subBatch.time = lastEntity.time();
        subBatch.publishingId = lastEntity.publishingId();
        subBatch.encodedMessageBatch.close();
    }

    /** Reports each accumulated entity's message as published to the observation collector. */
    private void notifyPublished(List<Object> entities) {
        for (Object item : entities) {
            ProducerUtils.AccumulatedEntity entity = (ProducerUtils.AccumulatedEntity) item;
            this.observationCollector.published(entity.observationContext(), entity.confirmationCallback().message());
        }
    }

    /**
     * Sends the given entities through the producer if it can currently send.
     *
     * @param list entities (or sub-batches) to publish
     * @return {@code true} if the entities were handed to the producer, {@code false} if the
     *     producer reported that it cannot send
     */
    private boolean publish(List<Object> list) {
        if (!this.producer.canSend()) {
            return false;
        }
        this.producer.publishInternal(list);
        return true;
    }

    /** Closes the underlying dynamic batch. */
    @Override // com.rabbitmq.stream.impl.MessageAccumulator, java.lang.AutoCloseable
    public void close() {
        this.dynamicBatch.close();
    }
}
