package org.axonframework.axonserver.connector.event.axon;

import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import jakarta.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.axonframework.common.Registration;
import org.axonframework.config.LegacyConfiguration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;

/* loaded from: input_file:org/axonframework/axonserver/connector/event/axon/PersistentStreamMessageSource.class */
/**
 * A {@link SubscribableMessageSource} of {@link EventMessage} batches backed by a single
 * {@link PersistentStreamConnection}.
 * <p>
 * At most one consumer can be subscribed at any time. Subscribing opens the underlying connection with that
 * consumer; cancelling the returned {@link Registration} closes the connection again, after which a new consumer
 * may subscribe. All changes to the active consumer are made while holding this instance's monitor.
 */
public class PersistentStreamMessageSource implements SubscribableMessageSource<EventMessage<?>> {
    /** Sentinel marking the "no active subscription" state, so that {@link #consumer} is never {@code null}. */
    private static final Consumer<List<? extends EventMessage<?>>> NO_OP_CONSUMER = list -> {
    };

    private final PersistentStreamConnection persistentStreamConnection;
    private final String name;
    /** The currently subscribed consumer, or {@link #NO_OP_CONSUMER} when nobody is subscribed. */
    private Consumer<List<? extends EventMessage<?>>> consumer;

    /**
     * Creates a message source by delegating to the six-argument constructor with {@code null} as the last
     * argument.
     *
     * @param str                       the name of this source; also used in error messages
     * @param legacyConfiguration       forwarded unchanged to the {@link PersistentStreamConnection}
     * @param persistentStreamProperties forwarded unchanged to the {@link PersistentStreamConnection}
     * @param scheduledExecutorService  forwarded unchanged to the {@link PersistentStreamConnection}
     * @param i                         forwarded unchanged to the {@link PersistentStreamConnection}
     */
    public PersistentStreamMessageSource(String str, LegacyConfiguration legacyConfiguration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduledExecutorService, int i) {
        this(str, legacyConfiguration, persistentStreamProperties, scheduledExecutorService, i, null);
    }

    /**
     * Creates a message source and the {@link PersistentStreamConnection} it delegates to. The connection is only
     * constructed here; it is opened on the first call to {@link #subscribe(Consumer)}.
     *
     * @param str                       the name of this source; also used in error messages
     * @param legacyConfiguration       forwarded unchanged to the {@link PersistentStreamConnection}
     * @param persistentStreamProperties forwarded unchanged to the {@link PersistentStreamConnection}
     * @param scheduledExecutorService  forwarded unchanged to the {@link PersistentStreamConnection}
     * @param i                         forwarded unchanged to the {@link PersistentStreamConnection}
     * @param str2                      forwarded unchanged to the {@link PersistentStreamConnection}; may be
     *                                  {@code null} (the five-argument constructor passes {@code null})
     */
    public PersistentStreamMessageSource(String str, LegacyConfiguration legacyConfiguration, PersistentStreamProperties persistentStreamProperties, ScheduledExecutorService scheduledExecutorService, int i, String str2) {
        this.consumer = NO_OP_CONSUMER;
        this.name = str;
        this.persistentStreamConnection = new PersistentStreamConnection(str, legacyConfiguration, persistentStreamProperties, scheduledExecutorService, i, str2);
    }

    /**
     * Subscribes the given {@code consumer} to this source.
     * <p>
     * If there is no active subscription, the underlying connection is opened with the consumer. If the same
     * consumer (according to {@code equals}) is already subscribed, the call does not reopen the connection and
     * simply returns another {@link Registration} for the existing subscription.
     *
     * @param consumer the consumer to receive batches of events; must not be {@code null}
     * @return a {@link Registration} whose {@code cancel()} closes the connection and returns {@code true} while
     *         {@code consumer} is still the active subscriber, and returns {@code false} without side effects once
     *         the subscription has been cancelled or replaced
     * @throws NullPointerException  if {@code consumer} is {@code null}
     * @throws IllegalStateException if a different consumer is already subscribed
     */
    public Registration subscribe(@Nonnull Consumer<List<? extends EventMessage<?>>> consumer) {
        // Fail fast: a null consumer would otherwise be stored and break every later equals() check.
        Objects.requireNonNull(consumer, "consumer must not be null");
        synchronized (this) {
            if (this.consumer.equals(NO_OP_CONSUMER)) {
                // Open first so that a failing open() leaves this source in the unsubscribed state.
                this.persistentStreamConnection.open(consumer);
                this.consumer = consumer;
            } else if (!this.consumer.equals(consumer)) {
                throw new IllegalStateException(String.format("%s: Cannot subscribe to PersistentStreamMessageSource with another consumer: there is already an active subscription.", this.name));
            }
        }
        return () -> {
            synchronized (this) {
                // Only tear down the subscription this registration belongs to. Without this guard a stale
                // registration (already cancelled, or issued for a previous consumer) would close the
                // connection of whichever consumer subscribed afterwards.
                if (!this.consumer.equals(consumer)) {
                    return false;
                }
                this.persistentStreamConnection.close();
                this.consumer = NO_OP_CONSUMER;
                return true;
            }
        };
    }
}
