package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

/**
 * A {@link StreamableResponse} that is backed by a {@link Flux} of {@link QueryResponseMessage}s.
 * <p>
 * On construction the flux is subscribed to with a {@code SendingSubscriber}. Every message the flux emits is
 * serialized through the supplied {@link QuerySerializer} and handed to the {@link ReplyChannel}. The subscriber
 * does not request anything by itself; demand is only signalled when {@link #request(long)} is invoked.
 */
class StreamableFluxResponse implements StreamableResponse {

    /** The subscriber created in the constructor, held as a {@link Subscription} to forward request/cancel calls. */
    private final Subscription subscription;

    /**
     * Subscribes to the given {@code flux} and forwards its serialized items to the given {@code replyChannel}.
     *
     * @param flux            the stream of query response messages to forward
     * @param replyChannel    the channel on which serialized responses, completion and errors are sent
     * @param querySerializer the serializer used to turn each message into a {@link QueryResponse}
     * @param requestId       the request identifier; passed to the serializer and set on an error response
     * @param clientId        the client identifier; passed to {@link ExceptionSerializer} when an error is reported
     * @param <T>             the type argument of the {@link QueryResponseMessage}s emitted by the flux
     */
    public <T> StreamableFluxResponse(Flux<QueryResponseMessage<T>> flux,
                                      ReplyChannel<QueryResponse> replyChannel,
                                      QuerySerializer querySerializer,
                                      String requestId,
                                      String clientId) {
        // Note the argument order: the subscriber takes the client identifier before the request identifier.
        SendingSubscriber subscriber = new SendingSubscriber(replyChannel, clientId, requestId);
        this.subscription = subscriber;
        flux.map(message -> querySerializer.serializeResponse(message, requestId))
            .subscribeWith(subscriber);
    }

    /**
     * Forwards the demand for {@code j} items to the underlying subscription.
     *
     * @param j the number of items requested
     */
    public void request(long j) {
        this.subscription.request(j);
    }

    /**
     * Cancels the underlying subscription.
     */
    public void cancel() {
        this.subscription.cancel();
    }

    /**
     * A {@link BaseSubscriber} that relays the signals it receives to a {@link ReplyChannel}.
     */
    private static class SendingSubscriber extends BaseSubscriber<QueryResponse> {

        private final ReplyChannel<QueryResponse> responseHandler;
        private final String clientId;
        private final String requestId;

        /**
         * @param responseHandler the channel to relay signals to
         * @param clientId        the client identifier used when serializing an error
         * @param requestId       the request identifier set on an error response
         */
        public SendingSubscriber(ReplyChannel<QueryResponse> responseHandler, String clientId, String requestId) {
            this.responseHandler = responseHandler;
            this.clientId = clientId;
            this.requestId = requestId;
        }

        /**
         * Intentionally empty: {@link BaseSubscriber}'s default implementation requests an unbounded amount on
         * subscription, whereas here nothing must be requested until demand arrives through
         * {@link StreamableFluxResponse#request(long)}.
         */
        protected void hookOnSubscribe(Subscription subscription) {
            // deliberately no initial request
        }

        /** Sends the received response on the reply channel. */
        protected void hookOnNext(QueryResponse queryResponse) {
            this.responseHandler.send(queryResponse);
        }

        /** Completes the reply channel once the source has completed. */
        protected void hookOnComplete() {
            this.responseHandler.complete();
        }

        /**
         * Reports the failure as a final {@link QueryResponse} carrying the error code derived from the
         * exception, the serialized exception and the request identifier.
         */
        protected void hookOnError(Throwable th) {
            this.responseHandler.sendLast(
                    QueryResponse.newBuilder()
                                 .setErrorCode(ErrorCode.getQueryExecutionErrorCode(th).errorCode())
                                 .setErrorMessage(ExceptionSerializer.serialize(this.clientId, th))
                                 .setRequestIdentifier(this.requestId)
                                 .build()
            );
        }

        /** Completes the reply channel when this subscriber is cancelled. */
        protected void hookOnCancel() {
            this.responseHandler.complete();
        }
    }
}
