package io.openlineage.client.transports;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/client/transports/CompositeTransport.class */
public class CompositeTransport extends Transport {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) CompositeTransport.class);
    private final CompositeConfig config;
    private final List<Transport> transports = new ArrayList();
    private final Optional<ExecutorService> executorService;

    public CompositeTransport(@NonNull CompositeConfig compositeConfig) {
        if (compositeConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        this.config = compositeConfig;
        initializeTransports();
        if (compositeConfig.getWithThreadPool().booleanValue()) {
            this.executorService = Optional.of(Executors.newFixedThreadPool(this.transports.size()));
        } else {
            this.executorService = Optional.empty();
        }
    }

    private void initializeTransports() {
        Iterator<TransportConfig> it = this.config.getTransports().iterator();
        while (it.hasNext()) {
            this.transports.add(TransportResolver.resolveTransportByConfig(it.next()));
        }
    }

    public List<Transport> getTransports() {
        return this.transports;
    }

    @Override // io.openlineage.client.transports.Transport
    public void emit(@NonNull OpenLineage.RunEvent runEvent) {
        if (runEvent == null) {
            throw new NullPointerException("runEvent is marked non-null but is null");
        }
        doEmit(runEvent);
    }

    @Override // io.openlineage.client.transports.Transport
    public void emit(@NonNull OpenLineage.DatasetEvent datasetEvent) {
        if (datasetEvent == null) {
            throw new NullPointerException("datasetEvent is marked non-null but is null");
        }
        doEmit(datasetEvent);
    }

    @Override // io.openlineage.client.transports.Transport
    public void emit(@NonNull OpenLineage.JobEvent jobEvent) {
        if (jobEvent == null) {
            throw new NullPointerException("jobEvent is marked non-null but is null");
        }
        doEmit(jobEvent);
    }

    private void doEmit(OpenLineage.BaseEvent baseEvent) {
        if (!this.config.getContinueOnFailure().booleanValue()) {
            Iterator<Transport> it = this.transports.iterator();
            while (it.hasNext()) {
                emit(it.next(), baseEvent);
            }
            return;
        }
        ExecutorService orElse = this.executorService.orElse(Executors.newFixedThreadPool(this.transports.size()));
        try {
            try {
                orElse.invokeAll((Collection) this.transports.stream().map(transport -> {
                    return () -> {
                        emit(transport, baseEvent);
                        return null;
                    };
                }).collect(Collectors.toList())).forEach(future -> {
                    try {
                        future.get();
                    } catch (InterruptedException | ExecutionException e) {
                    }
                });
                if (this.config.getWithThreadPool().booleanValue()) {
                    return;
                }
                orElse.shutdown();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            if (!this.config.getWithThreadPool().booleanValue()) {
                orElse.shutdown();
            }
            throw th;
        }
    }

    private void emit(Transport transport, OpenLineage.BaseEvent baseEvent) {
        try {
            if (baseEvent instanceof OpenLineage.RunEvent) {
                transport.emit((OpenLineage.RunEvent) baseEvent);
            } else if (baseEvent instanceof OpenLineage.DatasetEvent) {
                transport.emit((OpenLineage.DatasetEvent) baseEvent);
            } else {
                if (!(baseEvent instanceof OpenLineage.JobEvent)) {
                    throw new IllegalArgumentException("Unsupported event type: " + baseEvent.getClass().getName());
                }
                transport.emit((OpenLineage.JobEvent) baseEvent);
            }
        } catch (Exception e) {
            throw new RuntimeException("Transport " + transport.getClass().getSimpleName() + " failed to emit event", e);
        }
    }

    @Override // io.openlineage.client.transports.Transport, java.lang.AutoCloseable
    public void close() throws Exception {
        this.executorService.ifPresent((v0) -> {
            v0.shutdown();
        });
        this.transports.forEach(transport -> {
            try {
                transport.close();
            } catch (Exception e) {
                log.error("Failed to close {} transport", transport.getClass().getSimpleName(), e);
                throw new OpenLineageClientException(e);
            }
        });
    }
}
