package io.confluent.ksql.api.client.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.Objects;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.class */
public class StreamInsertsSubscriber extends BaseSubscriber<KsqlObject> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) StreamInsertsSubscriber.class);
    private static final int REQUEST_BATCH_SIZE = 200;
    private final HttpClientRequest httpRequest;
    private int outstandingTokens;
    private boolean drainHandlerSet;

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public StreamInsertsSubscriber(Context context, HttpClientRequest httpClientRequest) {
        super(context);
        this.httpRequest = (HttpClientRequest) Objects.requireNonNull(httpClientRequest);
    }

    @Override // io.confluent.ksql.reactive.BaseSubscriber
    protected void afterSubscribe(Subscription subscription) {
        checkRequest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.ksql.reactive.BaseSubscriber
    public void handleValue(KsqlObject ksqlObject) {
        this.httpRequest.writeCustomFrame(0, 0, Buffer.buffer().appendString(ksqlObject.toJsonString()).appendString("\n"));
        this.outstandingTokens--;
        if (this.httpRequest.writeQueueFull()) {
            if (this.drainHandlerSet) {
                checkRequest();
            } else {
                this.httpRequest.drainHandler(this::httpRequestReceptive);
                this.drainHandlerSet = true;
            }
        }
    }

    @Override // io.confluent.ksql.reactive.BaseSubscriber
    protected void handleComplete() {
        this.httpRequest.end();
    }

    @Override // io.confluent.ksql.reactive.BaseSubscriber
    protected void handleError(Throwable th) {
        log.error("Received error from streamInserts() publisher. Ending connection.", th);
        this.httpRequest.end();
    }

    private void checkRequest() {
        if (this.outstandingTokens == 0) {
            this.outstandingTokens = 200;
            makeRequest(200L);
        }
    }

    private void httpRequestReceptive(Void r4) {
        this.drainHandlerSet = false;
        checkRequest();
    }
}
