package io.debezium.pipeline.signal;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.SourceSignalChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Threads;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/pipeline/signal/SignalProcessor.class */
public class SignalProcessor<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SignalProcessor.class);
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90);
    public static final int SEMAPHORE_WAIT_TIME = 10;
    private final CommonConnectorConfig connectorConfig;
    private final List<SignalChannelReader> signalChannelReaders;
    private final ScheduledExecutorService signalProcessorExecutor;
    private final DocumentReader documentReader;
    private Offsets<P, O> previousOffsets;
    private final Map<String, SignalAction<P>> signalActions = new HashMap();
    private final Semaphore semaphore = new Semaphore(1);
    private final List<SignalChannelReader> enabledChannelReaders = getEnabledChannelReaders();

    public SignalProcessor(Class<? extends SourceConnector> cls, CommonConnectorConfig commonConnectorConfig, Map<String, SignalAction<P>> map, List<SignalChannelReader> list, DocumentReader documentReader, Offsets<P, O> offsets) {
        this.connectorConfig = commonConnectorConfig;
        this.signalChannelReaders = list;
        this.documentReader = documentReader;
        this.previousOffsets = offsets;
        this.signalProcessorExecutor = Threads.newSingleThreadScheduledExecutor(cls, commonConnectorConfig.getLogicalName(), SignalProcessor.class.getSimpleName(), false);
        this.enabledChannelReaders.forEach(signalChannelReader -> {
            signalChannelReader.init(this.connectorConfig);
        });
        this.signalActions.putAll(map);
    }

    private Predicate<SignalChannelReader> isEnabled() {
        return signalChannelReader -> {
            return this.connectorConfig.getEnabledChannels().contains(signalChannelReader.name());
        };
    }

    private List<SignalChannelReader> getEnabledChannelReaders() {
        return (List) this.signalChannelReaders.stream().filter(isEnabled()).collect(Collectors.toList());
    }

    public void setContext(O o) {
        this.previousOffsets = Offsets.of(Collections.singletonMap(this.previousOffsets.getTheOnlyPartition(), o));
    }

    public void start() {
        LOGGER.info("SignalProcessor started. Scheduling it every {}ms", Long.valueOf(this.connectorConfig.getSignalPollInterval().toMillis()));
        this.signalProcessorExecutor.scheduleAtFixedRate(this::process, 0L, this.connectorConfig.getSignalPollInterval().toMillis(), TimeUnit.MILLISECONDS);
    }

    public void stop() throws InterruptedException {
        this.signalProcessorExecutor.submit(() -> {
            this.enabledChannelReaders.forEach((v0) -> {
                v0.close();
            });
        });
        this.signalProcessorExecutor.shutdown();
        if (!this.signalProcessorExecutor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
            LOGGER.warn("SignalProcessor didn't stop in the expected time, shutting down executor now");
            Thread.interrupted();
            this.signalProcessorExecutor.shutdownNow();
            this.signalProcessorExecutor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        }
        LOGGER.info("SignalProcessor stopped");
    }

    public void registerSignalAction(String str, SignalAction<P> signalAction) {
        LOGGER.debug("Registering signal '{}' using class '{}'", str, signalAction.getClass().getName());
        this.signalActions.put(str, signalAction);
    }

    public void process() {
        executeWithSemaphore(() -> {
            LOGGER.trace("SignalProcessor processing");
            this.enabledChannelReaders.stream().map((v0) -> {
                return v0.read();
            }).flatMap((v0) -> {
                return v0.stream();
            }).forEach(this::processSignal);
        });
    }

    public void processSourceSignal() {
        executeWithSemaphore(() -> {
            LOGGER.trace("Processing source signals");
            this.enabledChannelReaders.stream().filter(isSignal(SourceSignalChannel.class)).map((v0) -> {
                return v0.read();
            }).flatMap((v0) -> {
                return v0.stream();
            }).forEach(this::processSignal);
        });
    }

    private void executeWithSemaphore(Runnable runnable) {
        boolean z = false;
        try {
            try {
                z = this.semaphore.tryAcquire(10L, TimeUnit.SECONDS);
                runnable.run();
                if (z) {
                    this.semaphore.release();
                }
            } catch (InterruptedException e) {
                LOGGER.error("Not able to acquire semaphore after {}s", (Object) 10);
                throw new DebeziumException("Not able to acquire semaphore during signaling processing", e);
            }
        } catch (Throwable th) {
            if (z) {
                this.semaphore.release();
            }
            throw th;
        }
    }

    private void processSignal(SignalRecord signalRecord) {
        LOGGER.debug("Signal Processor offset context {}", this.previousOffsets.getOffsets());
        LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData());
        SignalAction<P> signalAction = this.signalActions.get(signalRecord.getType());
        if (signalAction == null) {
            LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType());
            return;
        }
        try {
            signalAction.arrived(new SignalPayload<>(this.previousOffsets.getTheOnlyPartition(), signalRecord.getId(), signalRecord.getType(), (signalRecord.getData() == null || signalRecord.getData().isEmpty()) ? Document.create() : this.documentReader.read(signalRecord.getData()), this.previousOffsets.getTheOnlyOffset(), signalRecord.getAdditionalData()));
        } catch (IOException e) {
            LOGGER.warn("Signal '{}' has been received but the data '{}' cannot be parsed", signalRecord.getId(), signalRecord.getData(), e);
        } catch (InterruptedException e2) {
            LOGGER.warn("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord);
            Thread.currentThread().interrupt();
        } catch (Exception e3) {
            LOGGER.warn("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord, e3);
        }
    }

    public <T extends SignalChannelReader> T getSignalChannel(Class<T> cls) {
        return cls.cast(this.signalChannelReaders.stream().filter(isSignal(cls)).findFirst().get());
    }

    private static <T extends SignalChannelReader> Predicate<SignalChannelReader> isSignal(Class<T> cls) {
        return signalChannelReader -> {
            return signalChannelReader.getClass().equals(cls);
        };
    }
}
