package org.somda.sdc.dpws.soap.wseventing;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.name.Named;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.somda.sdc.common.logging.InstanceLogger;
import org.somda.sdc.common.util.ExecutorWrapperService;
import org.somda.sdc.dpws.CommunicationLogContext;
import org.somda.sdc.dpws.client.ClientEventObserver;
import org.somda.sdc.dpws.client.SubscriptionEvent;
import org.somda.sdc.dpws.factory.TransportBindingFactory;
import org.somda.sdc.dpws.guice.NetworkJobThreadPool;
import org.somda.sdc.dpws.http.HttpException;
import org.somda.sdc.dpws.http.HttpServerRegistry;
import org.somda.sdc.dpws.http.HttpUriBuilder;
import org.somda.sdc.dpws.soap.CommunicationContext;
import org.somda.sdc.dpws.soap.NotificationSink;
import org.somda.sdc.dpws.soap.RequestResponseClient;
import org.somda.sdc.dpws.soap.SoapConstants;
import org.somda.sdc.dpws.soap.SoapFaultHttpStatusCodeMapping;
import org.somda.sdc.dpws.soap.SoapMarshalling;
import org.somda.sdc.dpws.soap.SoapMessage;
import org.somda.sdc.dpws.soap.SoapUtil;
import org.somda.sdc.dpws.soap.exception.MalformedSoapMessageException;
import org.somda.sdc.dpws.soap.exception.SoapFaultException;
import org.somda.sdc.dpws.soap.factory.RequestResponseClientFactory;
import org.somda.sdc.dpws.soap.factory.SoapFaultFactory;
import org.somda.sdc.dpws.soap.model.Fault;
import org.somda.sdc.dpws.soap.wsaddressing.WsAddressingConstants;
import org.somda.sdc.dpws.soap.wsaddressing.WsAddressingUtil;
import org.somda.sdc.dpws.soap.wsaddressing.model.EndpointReferenceType;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionRequestResponseClientNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.factory.SubscriptionManagerFactory;
import org.somda.sdc.dpws.soap.wseventing.model.DeliveryType;
import org.somda.sdc.dpws.soap.wseventing.model.FilterType;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatus;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatusResponse;
import org.somda.sdc.dpws.soap.wseventing.model.ObjectFactory;
import org.somda.sdc.dpws.soap.wseventing.model.Renew;
import org.somda.sdc.dpws.soap.wseventing.model.RenewResponse;
import org.somda.sdc.dpws.soap.wseventing.model.Subscribe;
import org.somda.sdc.dpws.soap.wseventing.model.SubscribeResponse;
import org.somda.sdc.dpws.soap.wseventing.model.SubscriptionEnd;
import org.somda.sdc.dpws.soap.wseventing.model.Unsubscribe;
import org.somda.sdc.dpws.soap.wseventing.model.WsEventingStatus;

/* loaded from: input_file:org/somda/sdc/dpws/soap/wseventing/EventSinkImpl.class */
public class EventSinkImpl implements EventSink {
    private static final Logger LOG = LogManager.getLogger(EventSinkImpl.class);
    private static final String EVENT_SINK_CONTEXT_PREFIX = "/EventSink/";
    private static final String EVENT_SINK_NOTIFY_TO_CONTEXT_PREFIX = "/EventSink/NotifyTo/";
    private static final String EVENT_SINK_END_TO_CONTEXT_PREFIX = "/EventSink/EndTo/";
    private final RequestResponseClient requestResponseClient;
    private final TransportBindingFactory transportBindingFactory;
    private final RequestResponseClientFactory requestResponseClientFactory;
    private final String hostAddress;
    private final HttpServerRegistry httpServerRegistry;
    private final ObjectFactory wseFactory;
    private final WsAddressingUtil wsaUtil;
    private final SoapMarshalling marshalling;
    private final SoapUtil soapUtil;
    private final ExecutorWrapperService<ListeningExecutorService> executorService;
    private final SubscriptionManagerFactory subscriptionManagerFactory;
    private final Duration maxWaitForFutures;
    private final CommunicationLogContext communicationLogContext;
    private final Logger instanceLogger;
    private final SoapFaultFactory soapFaultFactory;
    private final ClientEventObserver clientEventObserver;
    private final Map<String, SubscriptionData> subscriptionData = new ConcurrentHashMap();
    private final Lock subscriptionsLock = new ReentrantLock();
    private final HttpUriBuilder httpUriBuilder = new HttpUriBuilder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData.class */
    public static final class SubscriptionData extends Record {
        private final SinkSubscriptionManager subscriptionManager;
        private final RequestResponseClient requestResponseClient;

        private SubscriptionData(SinkSubscriptionManager sinkSubscriptionManager, RequestResponseClient requestResponseClient) {
            this.subscriptionManager = sinkSubscriptionManager;
            this.requestResponseClient = requestResponseClient;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, SubscriptionData.class), SubscriptionData.class, "subscriptionManager;requestResponseClient", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->subscriptionManager:Lorg/somda/sdc/dpws/soap/wseventing/SinkSubscriptionManager;", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->requestResponseClient:Lorg/somda/sdc/dpws/soap/RequestResponseClient;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, SubscriptionData.class), SubscriptionData.class, "subscriptionManager;requestResponseClient", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->subscriptionManager:Lorg/somda/sdc/dpws/soap/wseventing/SinkSubscriptionManager;", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->requestResponseClient:Lorg/somda/sdc/dpws/soap/RequestResponseClient;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, SubscriptionData.class, Object.class), SubscriptionData.class, "subscriptionManager;requestResponseClient", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->subscriptionManager:Lorg/somda/sdc/dpws/soap/wseventing/SinkSubscriptionManager;", "FIELD:Lorg/somda/sdc/dpws/soap/wseventing/EventSinkImpl$SubscriptionData;->requestResponseClient:Lorg/somda/sdc/dpws/soap/RequestResponseClient;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public SinkSubscriptionManager subscriptionManager() {
            return this.subscriptionManager;
        }

        public RequestResponseClient requestResponseClient() {
            return this.requestResponseClient;
        }
    }

    @AssistedInject
    EventSinkImpl(@Assisted RequestResponseClient requestResponseClient, @Assisted String str, @Assisted CommunicationLogContext communicationLogContext, @Assisted ClientEventObserver clientEventObserver, @Named("Dpws.MaxWaitForFutures") Duration duration, HttpServerRegistry httpServerRegistry, ObjectFactory objectFactory, WsAddressingUtil wsAddressingUtil, SoapMarshalling soapMarshalling, SoapUtil soapUtil, @NetworkJobThreadPool ExecutorWrapperService<ListeningExecutorService> executorWrapperService, SubscriptionManagerFactory subscriptionManagerFactory, @Named("Common.InstanceIdentifier") String str2, TransportBindingFactory transportBindingFactory, RequestResponseClientFactory requestResponseClientFactory, SoapFaultFactory soapFaultFactory) {
        this.instanceLogger = InstanceLogger.wrapLogger(LOG, str2);
        this.requestResponseClient = requestResponseClient;
        this.transportBindingFactory = transportBindingFactory;
        this.requestResponseClientFactory = requestResponseClientFactory;
        this.hostAddress = str;
        this.communicationLogContext = communicationLogContext;
        this.maxWaitForFutures = duration;
        this.httpServerRegistry = httpServerRegistry;
        this.wseFactory = objectFactory;
        this.wsaUtil = wsAddressingUtil;
        this.marshalling = soapMarshalling;
        this.soapUtil = soapUtil;
        this.executorService = executorWrapperService;
        this.subscriptionManagerFactory = subscriptionManagerFactory;
        this.soapFaultFactory = soapFaultFactory;
        this.clientEventObserver = clientEventObserver;
    }

    @Override // org.somda.sdc.dpws.soap.wseventing.EventSink
    public ListenableFuture<SubscribeResult> subscribe(String str, List<Object> list, Duration duration, NotificationSink notificationSink) {
        return this.executorService.get().submit(() -> {
            String uuid = UUID.randomUUID().toString();
            String registerContext = this.httpServerRegistry.registerContext(this.hostAddress, true, "/EventSink/EndTo/" + uuid, null, this.communicationLogContext, (inputStream, outputStream, communicationContext) -> {
                processIncomingEndTo(inputStream, outputStream, uuid);
            });
            String registerContext2 = this.httpServerRegistry.registerContext(this.hostAddress, true, "/EventSink/NotifyTo/" + uuid, null, this.communicationLogContext, (inputStream2, outputStream2, communicationContext2) -> {
                processIncomingNotification(notificationSink, inputStream2, outputStream2, communicationContext2, uuid);
            });
            Subscribe createSubscribe = this.wseFactory.createSubscribe();
            DeliveryType createDeliveryType = this.wseFactory.createDeliveryType();
            createDeliveryType.setMode(WsEventingConstants.SUPPORTED_DELIVERY_MODE);
            EndpointReferenceType createEprWithAddress = this.wsaUtil.createEprWithAddress(registerContext2);
            createDeliveryType.setContent(List.of(this.wseFactory.createNotifyTo(createEprWithAddress)));
            createSubscribe.setDelivery(createDeliveryType);
            EndpointReferenceType createEprWithAddress2 = this.wsaUtil.createEprWithAddress(registerContext);
            createSubscribe.setEndTo(createEprWithAddress2);
            FilterType createFilterType = this.wseFactory.createFilterType();
            createFilterType.setDialect(str);
            createFilterType.setContent(list);
            createSubscribe.setExpires(duration);
            createSubscribe.setFilter(createFilterType);
            SubscribeResponse subscribeResponse = (SubscribeResponse) this.soapUtil.getBody(this.requestResponseClient.sendRequestResponse(this.soapUtil.createMessage(WsEventingConstants.WSA_ACTION_SUBSCRIBE, createSubscribe)), SubscribeResponse.class).orElseThrow(() -> {
                return new MalformedSoapMessageException("Cannot read WS-Eventing Subscribe response");
            });
            SinkSubscriptionManager createSinkSubscriptionManager = this.subscriptionManagerFactory.createSinkSubscriptionManager(subscribeResponse.getSubscriptionManager(), subscribeResponse.getExpires(), createEprWithAddress, createEprWithAddress2, list, str, uuid);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Start(uuid));
            RequestResponseClient createRequestResponseClient = this.requestResponseClientFactory.createRequestResponseClient(this.transportBindingFactory.createTransportBinding(subscribeResponse.getSubscriptionManager().getAddress().getValue(), null));
            this.subscriptionsLock.lock();
            try {
                this.subscriptionData.put(uuid, new SubscriptionData(createSinkSubscriptionManager, createRequestResponseClient));
                this.subscriptionsLock.unlock();
                return new SubscribeResult(uuid, createSinkSubscriptionManager.getExpires());
            } catch (Throwable th) {
                this.subscriptionsLock.unlock();
                throw th;
            }
        });
    }

    @Override // org.somda.sdc.dpws.soap.wseventing.EventSink
    public ListenableFuture<Duration> renew(String str, Duration duration) {
        return this.executorService.get().submit(() -> {
            SinkSubscriptionManager subscriptionManagerProxy = getSubscriptionManagerProxy(str);
            RequestResponseClient subscriptionRequestResponseClient = getSubscriptionRequestResponseClient(str);
            Renew createRenew = this.wseFactory.createRenew();
            createRenew.setExpires(duration);
            Duration expires = ((RenewResponse) this.soapUtil.getBody(subscriptionRequestResponseClient.sendRequestResponse(this.soapUtil.createMessage(WsEventingConstants.WSA_ACTION_RENEW, this.wsaUtil.getAddressUri(subscriptionManagerProxy.getSubscriptionManagerEpr()).orElseThrow(() -> {
                return new RuntimeException("No subscription manager EPR found");
            }), createRenew, subscriptionManagerProxy.getSubscriptionManagerEpr().getReferenceParameters())), RenewResponse.class).orElseThrow(() -> {
                return new MalformedSoapMessageException("WS-Eventing RenewResponse message is malformed");
            })).getExpires();
            subscriptionManagerProxy.renew(expires);
            return expires;
        });
    }

    @Override // org.somda.sdc.dpws.soap.wseventing.EventSink
    public ListenableFuture<Duration> getStatus(String str) {
        return this.executorService.get().submit(() -> {
            SinkSubscriptionManager subscriptionManagerProxy = getSubscriptionManagerProxy(str);
            RequestResponseClient subscriptionRequestResponseClient = getSubscriptionRequestResponseClient(str);
            GetStatus createGetStatus = this.wseFactory.createGetStatus();
            return ((GetStatusResponse) this.soapUtil.getBody(subscriptionRequestResponseClient.sendRequestResponse(this.soapUtil.createMessage(WsEventingConstants.WSA_ACTION_GET_STATUS, this.wsaUtil.getAddressUri(subscriptionManagerProxy.getSubscriptionManagerEpr()).orElseThrow(() -> {
                return new RuntimeException("No subscription manager EPR found");
            }), createGetStatus, subscriptionManagerProxy.getSubscriptionManagerEpr().getReferenceParameters())), GetStatusResponse.class).orElseThrow(() -> {
                return new MalformedSoapMessageException("WS-Eventing GetStatusResponse message is malformed");
            })).getExpires();
        });
    }

    private SubscriptionData removeSubscriptionData(String str) {
        SubscriptionData removeSubscription = removeSubscription(str);
        if (removeSubscription == null) {
            throw new SubscriptionNotFoundException("Subscription with id " + str + " not found");
        }
        return removeSubscription;
    }

    @Override // org.somda.sdc.dpws.soap.wseventing.EventSink
    public ListenableFuture<?> unsubscribe(final String str) {
        LOG.debug("Unsubscribing for subscription {}", str);
        SubscriptionData removeSubscriptionData = removeSubscriptionData(str);
        final SinkSubscriptionManager sinkSubscriptionManager = removeSubscriptionData.subscriptionManager;
        RequestResponseClient requestResponseClient = removeSubscriptionData.requestResponseClient;
        ListenableFuture<?> submit = this.executorService.get().submit(() -> {
            Unsubscribe createUnsubscribe = this.wseFactory.createUnsubscribe();
            requestResponseClient.sendRequestResponse(this.soapUtil.createMessage(WsEventingConstants.WSA_ACTION_UNSUBSCRIBE, this.wsaUtil.getAddressUri(sinkSubscriptionManager.getSubscriptionManagerEpr()).orElseThrow(() -> {
                return new RuntimeException("No subscription manager EPR found");
            }), createUnsubscribe, sinkSubscriptionManager.getSubscriptionManagerEpr().getReferenceParameters()));
            unregisterContextPaths(sinkSubscriptionManager);
            return new Object();
        });
        Futures.addCallback(submit, new FutureCallback<Object>() { // from class: org.somda.sdc.dpws.soap.wseventing.EventSinkImpl.1
            public void onSuccess(Object obj) {
                EventSinkImpl.this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.End(str, null));
            }

            public void onFailure(Throwable th) {
                EventSinkImpl.this.unregisterContextPaths(sinkSubscriptionManager);
                EventSinkImpl.this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(str, "Error unsubscribing", th));
            }
        }, MoreExecutors.directExecutor());
        return submit;
    }

    @Override // org.somda.sdc.dpws.soap.wseventing.EventSink
    public void unsubscribeAll() {
        Iterator it = new ArrayList(this.subscriptionData.values()).iterator();
        while (it.hasNext()) {
            String subscriptionId = ((SubscriptionData) it.next()).subscriptionManager.getSubscriptionId();
            ListenableFuture<?> unsubscribe = unsubscribe(subscriptionId);
            try {
                unsubscribe.get(this.maxWaitForFutures.toSeconds(), TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException e) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed. Ignore.", subscriptionId);
                this.instanceLogger.trace("Subscription {} could not be unsubscribed", subscriptionId, e);
            } catch (TimeoutException e2) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed, timeout after {}s. Ignore.", subscriptionId, Long.valueOf(this.maxWaitForFutures.toSeconds()));
                this.instanceLogger.trace("Subscription {} could not be unsubscribed, timeout after {}s", subscriptionId, Long.valueOf(this.maxWaitForFutures.toSeconds()), e2);
                unsubscribe.cancel(true);
            }
        }
    }

    private void unregisterUri(URI uri) {
        this.httpServerRegistry.unregisterContext(this.httpUriBuilder.buildUri(uri.getScheme(), uri.getHost(), uri.getPort()), uri.getPath());
    }

    private void processIncomingEndTo(InputStream inputStream, OutputStream outputStream, String str) throws HttpException {
        try {
            SoapMessage createMessage = this.soapUtil.createMessage(this.marshalling.unmarshal(inputStream));
            this.instanceLogger.debug("Received message to EndTo address: {}", createMessage);
            try {
                processSubscriptionEnd((SubscriptionEnd) this.soapUtil.getBody(createMessage, SubscriptionEnd.class).orElseThrow(), str, outputStream);
            } catch (NoSuchElementException e) {
                handleInboundException(e, outputStream, "Inbound message to EndTo endpoint did not represent a SubscriptionEnd message", str);
                throw new HttpException(400, "Received message to EndTo address that is not a SubscriptionEnd");
            }
        } catch (JAXBException e2) {
            handleInboundException(e2, outputStream, "An error occurred during the processing of the inbound message", str);
        }
    }

    private void processSubscriptionEnd(SubscriptionEnd subscriptionEnd, String str, OutputStream outputStream) throws HttpException {
        try {
            WsEventingStatus fromString = WsEventingStatus.fromString(subscriptionEnd.getStatus());
            try {
                unregisterContextPaths(removeSubscriptionData(str).subscriptionManager);
                this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.End(str, fromString));
                closeOutputStream(outputStream, str);
            } catch (SubscriptionNotFoundException e) {
                this.instanceLogger.error("Error processing incoming message", e);
                this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(str, "Processing incoming message on EndTo led to unknown subscription", e));
            }
        } catch (IllegalArgumentException e2) {
            this.instanceLogger.error("Incoming SubscriptionEnd presented an unknown status: {}", subscriptionEnd.getStatus(), e2);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(str, "Processing incoming message on EndTo found an unknown status", e2));
        }
    }

    private void unregisterContextPaths(SinkSubscriptionManager sinkSubscriptionManager) {
        Optional map = sinkSubscriptionManager.getEndTo().map(endpointReferenceType -> {
            return endpointReferenceType.getAddress().getValue();
        }).map(URI::create);
        URI create = URI.create(sinkSubscriptionManager.getNotifyTo().getAddress().getValue());
        map.ifPresent(this::unregisterUri);
        unregisterUri(create);
    }

    private void closeOutputStream(OutputStream outputStream, String str) throws HttpException {
        try {
            outputStream.close();
        } catch (IOException e) {
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(str, "Closing the output stream for the response triggered an IOException", e));
            throw new HttpException(500, "Error closing output stream");
        }
    }

    private void processIncomingNotification(NotificationSink notificationSink, InputStream inputStream, OutputStream outputStream, CommunicationContext communicationContext, String str) throws HttpException {
        try {
            SoapMessage createMessage = this.soapUtil.createMessage(this.marshalling.unmarshal(inputStream));
            this.instanceLogger.debug("Received incoming notification {}", createMessage);
            try {
                notificationSink.receiveNotification(createMessage, communicationContext);
                closeOutputStream(outputStream, str);
            } catch (SoapFaultException e) {
                this.instanceLogger.error("Error processing incoming message", e);
                sendFaultResponse(e.getFaultMessage(), outputStream);
                this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(str, "Processing incoming message triggered SOAP fault", e));
                throw new HttpException(SoapFaultHttpStatusCodeMapping.get(e.getFault()));
            } catch (Exception e2) {
                this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(str, "Processing incoming message triggered exception", e2));
                throw new HttpException(500, extractExceptionMessage(e2));
            }
        } catch (JAXBException e3) {
            handleInboundException(e3, outputStream, "An error occurred during the processing of the inbound message", str);
        }
    }

    private String extractExceptionMessage(Exception exc) {
        return exc.getMessage() != null ? exc.getMessage() : exc.toString();
    }

    private void handleInboundException(Exception exc, OutputStream outputStream, String str, String str2) throws HttpException {
        LOG.error("Error processing incoming message for subscription {}", str2, exc);
        this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(str2, str, exc));
        SoapMessage createFault = this.soapFaultFactory.createFault(WsAddressingConstants.FAULT_ACTION, SoapConstants.SENDER, SoapConstants.DEFAULT_SUBCODE, str);
        sendFaultResponse(createFault, outputStream);
        throw new HttpException(SoapFaultHttpStatusCodeMapping.get((Fault) this.soapUtil.getBody(createFault, Fault.class).orElseThrow()));
    }

    private void sendFaultResponse(SoapMessage soapMessage, OutputStream outputStream) throws HttpException {
        try {
            try {
                this.marshalling.marshal(soapMessage.getEnvelopeWithMappedHeaders(), outputStream);
            } finally {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    this.instanceLogger.error("Error closing output stream while sending SOAP fault", e);
                }
            }
        } catch (JAXBException e2) {
            throw new HttpException(500, extractExceptionMessage(e2));
        }
    }

    private SinkSubscriptionManager getSubscriptionManagerProxy(String str) {
        this.subscriptionsLock.lock();
        try {
            return ((SubscriptionData) Optional.ofNullable(this.subscriptionData.get(str)).orElseThrow(SubscriptionNotFoundException::new)).subscriptionManager;
        } finally {
            this.subscriptionsLock.unlock();
        }
    }

    private RequestResponseClient getSubscriptionRequestResponseClient(String str) {
        this.subscriptionsLock.lock();
        try {
            return ((SubscriptionData) Optional.ofNullable(this.subscriptionData.get(str)).orElseThrow(SubscriptionRequestResponseClientNotFoundException::new)).requestResponseClient;
        } finally {
            this.subscriptionsLock.unlock();
        }
    }

    private SubscriptionData removeSubscription(String str) {
        this.subscriptionsLock.lock();
        try {
            return this.subscriptionData.remove(str);
        } finally {
            this.subscriptionsLock.unlock();
        }
    }
}
