package org.axonframework.eventhandling;

import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.LegacyUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/AbstractEventBus.class */
public abstract class AbstractEventBus implements EventBus {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventBus.class);
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final String eventsKey = String.valueOf(this) + "_EVENTS";
    private final Set<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArraySet();
    private final Set<MessageDispatchInterceptor<? super EventMessage<?>>> dispatchInterceptors = new CopyOnWriteArraySet();
    private final EventBusSpanFactory spanFactory;

    /* loaded from: input_file:org/axonframework/eventhandling/AbstractEventBus$Builder.class */
    public static abstract class Builder {
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;
        private EventBusSpanFactory spanFactory = DefaultEventBusSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();

        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        public Builder spanFactory(@Nonnull EventBusSpanFactory eventBusSpanFactory) {
            BuilderUtils.assertNonNull(eventBusSpanFactory, "SpanFactory may not be null");
            this.spanFactory = eventBusSpanFactory;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void validate() throws AxonConfigurationException {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractEventBus(Builder builder) {
        builder.validate();
        this.messageMonitor = builder.messageMonitor;
        this.spanFactory = builder.spanFactory;
    }

    @Override // org.axonframework.messaging.SubscribableMessageSource
    public Registration subscribe(@Nonnull Consumer<List<? extends EventMessage<?>>> consumer) {
        if (!this.eventProcessors.add(consumer)) {
            logger.info("EventProcessor [{}] not added. It was already subscribed", consumer);
        } else if (logger.isDebugEnabled()) {
            logger.debug("EventProcessor [{}] subscribed successfully", consumer);
        }
        return () -> {
            if (!this.eventProcessors.remove(consumer)) {
                logger.info("EventListener {} not removed. It was already unsubscribed", consumer);
                return false;
            }
            if (!logger.isDebugEnabled()) {
                return true;
            }
            logger.debug("EventListener {} unsubscribed successfully", consumer);
            return true;
        };
    }

    @Override // org.axonframework.messaging.MessageDispatchInterceptorSupport
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super EventMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> {
            return this.dispatchInterceptors.remove(messageDispatchInterceptor);
        };
    }

    @Override // org.axonframework.eventhandling.EventBus
    public void publish(@Nonnull List<? extends EventMessage<?>> list) {
        List list2 = (List) list.stream().map(eventMessage -> {
            return (EventMessage) this.spanFactory.createPublishEventSpan(eventMessage).runSupplier(() -> {
                return this.spanFactory.propagateContext(eventMessage);
            });
        }).collect(Collectors.toList());
        Stream stream = list2.stream();
        MessageMonitor<? super EventMessage<?>> messageMonitor = this.messageMonitor;
        Objects.requireNonNull(messageMonitor);
        List list3 = (List) stream.map((v1) -> {
            return r1.onMessageIngested(v1);
        }).collect(Collectors.toList());
        if (!CurrentUnitOfWork.isStarted()) {
            this.spanFactory.createCommitEventsSpan().run(() -> {
                try {
                    prepareCommit(intercept(list2));
                    commit(list2);
                    afterCommit(list2);
                    list3.forEach((v0) -> {
                        v0.reportSuccess();
                    });
                } catch (Exception e) {
                    list3.forEach(monitorCallback -> {
                        monitorCallback.reportFailure(e);
                    });
                    throw e;
                }
            });
            return;
        }
        LegacyUnitOfWork<?> legacyUnitOfWork = CurrentUnitOfWork.get();
        Assert.state(!legacyUnitOfWork.phase().isAfter(LegacyUnitOfWork.Phase.PREPARE_COMMIT), () -> {
            return "It is not allowed to publish events when the current Unit of Work has already been committed. Please start a new Unit of Work before publishing events.";
        });
        Assert.state(!legacyUnitOfWork.root().phase().isAfter(LegacyUnitOfWork.Phase.PREPARE_COMMIT), () -> {
            return "It is not allowed to publish events when the root Unit of Work has already been committed.";
        });
        legacyUnitOfWork.afterCommit(legacyUnitOfWork2 -> {
            list3.forEach((v0) -> {
                v0.reportSuccess();
            });
        });
        legacyUnitOfWork.onRollback(legacyUnitOfWork3 -> {
            list3.forEach(monitorCallback -> {
                monitorCallback.reportFailure(legacyUnitOfWork3.getExecutionResult().getExceptionResult());
            });
        });
        eventsQueue(legacyUnitOfWork).addAll(list2);
    }

    private List<EventMessage<?>> eventsQueue(LegacyUnitOfWork<?> legacyUnitOfWork) {
        return (List) legacyUnitOfWork.getOrComputeResource(this.eventsKey, str -> {
            Span createCommitEventsSpan = this.spanFactory.createCommitEventsSpan();
            ArrayList arrayList = new ArrayList();
            legacyUnitOfWork.onPrepareCommit(legacyUnitOfWork2 -> {
                createCommitEventsSpan.start();
                SpanScope makeCurrent = createCommitEventsSpan.makeCurrent();
                try {
                    if (!legacyUnitOfWork2.parent().isPresent() || legacyUnitOfWork2.parent().get().phase().isAfter(LegacyUnitOfWork.Phase.PREPARE_COMMIT)) {
                        int size = arrayList.size();
                        doWithEvents(this::prepareCommit, intercept(arrayList));
                        while (size < arrayList.size()) {
                            List<? extends EventMessage<?>> intercept = intercept(arrayList.subList(size, arrayList.size()));
                            size = arrayList.size();
                            doWithEvents(this::prepareCommit, intercept);
                        }
                    } else {
                        eventsQueue(legacyUnitOfWork2.parent().get()).addAll(arrayList);
                    }
                    if (makeCurrent != null) {
                        makeCurrent.close();
                    }
                } catch (Throwable th) {
                    if (makeCurrent != null) {
                        try {
                            makeCurrent.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            legacyUnitOfWork.onCommit(legacyUnitOfWork3 -> {
                SpanScope makeCurrent = createCommitEventsSpan.makeCurrent();
                try {
                    if (!legacyUnitOfWork3.parent().isPresent() || legacyUnitOfWork3.root().phase().isAfter(LegacyUnitOfWork.Phase.COMMIT)) {
                        doWithEvents(this::commit, arrayList);
                    } else {
                        legacyUnitOfWork3.root().onCommit(legacyUnitOfWork3 -> {
                            doWithEvents(this::commit, arrayList);
                        });
                    }
                    if (makeCurrent != null) {
                        makeCurrent.close();
                    }
                } catch (Throwable th) {
                    if (makeCurrent != null) {
                        try {
                            makeCurrent.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            legacyUnitOfWork.afterCommit(legacyUnitOfWork4 -> {
                SpanScope makeCurrent = createCommitEventsSpan.makeCurrent();
                try {
                    if (!legacyUnitOfWork4.parent().isPresent() || legacyUnitOfWork4.root().phase().isAfter(LegacyUnitOfWork.Phase.AFTER_COMMIT)) {
                        doWithEvents(this::afterCommit, arrayList);
                    } else {
                        legacyUnitOfWork4.root().afterCommit(legacyUnitOfWork4 -> {
                            doWithEvents(this::afterCommit, arrayList);
                        });
                    }
                    if (makeCurrent != null) {
                        makeCurrent.close();
                    }
                } catch (Throwable th) {
                    if (makeCurrent != null) {
                        try {
                            makeCurrent.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            });
            legacyUnitOfWork.onCleanup(legacyUnitOfWork5 -> {
                legacyUnitOfWork5.resources().remove(this.eventsKey);
                createCommitEventsSpan.end();
            });
            return arrayList;
        });
    }

    protected List<EventMessage<?>> queuedMessages() {
        if (!CurrentUnitOfWork.isStarted()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        addStagedMessages(CurrentUnitOfWork.get(), arrayList);
        return arrayList;
    }

    private void addStagedMessages(LegacyUnitOfWork<?> legacyUnitOfWork, List<EventMessage<?>> list) {
        legacyUnitOfWork.parent().ifPresent(legacyUnitOfWork2 -> {
            addStagedMessages(legacyUnitOfWork2, list);
        });
        if (legacyUnitOfWork.isRolledBack()) {
            return;
        }
        for (EventMessage<?> eventMessage : (List) legacyUnitOfWork.getOrDefaultResource(this.eventsKey, Collections.emptyList())) {
            if (!list.contains(eventMessage)) {
                list.add(eventMessage);
            }
        }
    }

    protected List<? extends EventMessage<?>> intercept(List<? extends EventMessage<?>> list) {
        ArrayList arrayList = new ArrayList(list);
        Iterator<MessageDispatchInterceptor<? super EventMessage<?>>> it = this.dispatchInterceptors.iterator();
        while (it.hasNext()) {
            BiFunction<Integer, ? super EventMessage<?>, ? super EventMessage<?>> handle = it.next().handle((List<? extends Object>) arrayList);
            for (int i = 0; i < arrayList.size(); i++) {
                arrayList.set(i, handle.apply(Integer.valueOf(i), arrayList.get(i)));
            }
        }
        return arrayList;
    }

    private void doWithEvents(Consumer<List<? extends EventMessage<?>>> consumer, List<? extends EventMessage<?>> list) {
        consumer.accept(list);
    }

    protected void prepareCommit(List<? extends EventMessage<?>> list) {
        this.eventProcessors.forEach(consumer -> {
            consumer.accept(list);
        });
    }

    protected void commit(List<? extends EventMessage<?>> list) {
    }

    protected void afterCommit(List<? extends EventMessage<?>> list) {
    }
}
