package com.networknt.mesh.kafka;

import com.networknt.client.Http2Client;
import com.networknt.client.simplepool.SimpleConnectionHolder;
import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaKsqldbConfig;
import io.confluent.ksql.api.client.Row;
import io.undertow.UndertowOptions;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:com/networknt/mesh/kafka/RowSubscriber.class */
/**
 * Reactive-streams {@link Subscriber} that receives ksqlDB {@link Row}s one at a time and
 * sends each one to the backend configured in {@link KafkaKsqldbConfig} with an HTTP POST.
 *
 * <p>Rows are pulled with a demand of one: the next row is requested only after the
 * current one has been handled, so at most one backend call is in flight per subscriber.
 * All callbacks are {@code synchronized} on this instance.
 */
public class RowSubscriber implements Subscriber<Row> {
    private static final Logger logger = LoggerFactory.getLogger(RowSubscriber.class);
    static Http2Client client = Http2Client.getInstance();
    private Subscription subscription;
    private final KafkaKsqldbConfig config = (KafkaKsqldbConfig) Config.getInstance().getJsonObjectConfig(KafkaKsqldbConfig.CONFIG_NAME, KafkaKsqldbConfig.class);

    /**
     * Stores the subscription and requests the first row.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public synchronized void onSubscribe(Subscription subscription) {
        logger.info("Subscriber is subscribed.");
        this.subscription = subscription;
        subscription.request(1L);
    }

    /**
     * Posts the row to the backend, then requests the next row.
     *
     * <p>A failed backend call is logged and does not stop the stream. If the calling thread
     * is interrupted while waiting for the response, the subscription is cancelled instead of
     * requesting more rows.
     *
     * @param row the row delivered by the publisher
     */
    @Override
    public synchronized void onNext(Row row) {
        logger.info("Received a row!");
        logger.info("Row: {}", row.values());
        if (forward(row)) {
            this.subscription.request(1L);
        } else {
            // The thread was asked to stop; pulling another row would only block again.
            logger.warn("Cancelling the subscription because the subscriber thread was interrupted.");
            this.subscription.cancel();
        }
    }

    /**
     * Borrows a connection, sends the row and waits for the response callback.
     *
     * @param row the row whose {@code values().toString()} is handed to the client callback
     * @return {@code false} only when the wait was interrupted; {@code true} otherwise,
     *         including when the call failed and was logged
     */
    private boolean forward(Row row) {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<ClientResponse> reference = new AtomicReference<>();
        SimpleConnectionHolder.ConnectionToken token = null;
        try {
            final String backendUrl = this.config.getBackendUrl();
            final URI backendUri = new URI(backendUrl);
            if (backendUrl.startsWith("https")) {
                token = client.borrow(backendUri, Http2Client.WORKER, client.getDefaultXnioSsl(), Http2Client.BUFFER_POOL, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true));
            } else {
                token = client.borrow(backendUri, Http2Client.WORKER, Http2Client.BUFFER_POOL, OptionMap.EMPTY);
            }
            ClientConnection clientConnection = (ClientConnection) token.getRawConnection();

            ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(this.config.getBackendPath());
            request.getRequestHeaders().put(Headers.CONTENT_TYPE, "application/json");
            request.getRequestHeaders().put(Headers.TRANSFER_ENCODING, "chunked");
            clientConnection.sendRequest(request, client.createClientCallback(reference, latch, row.values().toString()));

            // NOTE(review): this wait has no timeout; if the callback never counts the latch
            // down, the subscriber stays blocked here. Consider a bounded await.
            latch.await();

            ClientResponse response = reference.get();
            if (response == null) {
                // The latch was released without a response being stored.
                logger.error("no response was received from the backend in onNext send request");
            } else {
                logger.debug("statusCode = {}", response.getResponseCode());
                logger.debug("body = {}", response.getAttachment(Http2Client.RESPONSE_BODY));
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            logger.error("interrupted in onNext while waiting for the backend response", e);
            return false;
        } catch (Exception e) {
            logger.error("exception thrown in onNext send request", e);
            return true;
        } finally {
            // Single release point; token is still null when borrow itself failed.
            if (token != null) {
                client.restore(token);
            }
        }
    }

    /**
     * Logs the failure reported by the publisher.
     *
     * @param th the error that terminated the stream
     */
    @Override
    public synchronized void onError(Throwable th) {
        logger.error("Received an error: ", th);
    }

    /** Logs that the publisher has completed the stream. */
    @Override
    public synchronized void onComplete() {
        logger.info("Query has ended.");
    }
}
