package com.networknt.mesh.kafka.service;

import com.networknt.client.ClientConfig;
import com.networknt.kafka.entity.KsqlDbPullQueryRequest;
import com.networknt.mesh.kafka.KsqldbActiveConsumerStartupHook;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.exception.KsqlException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/service/KsqlDBQueryServiceImpl.class */
public class KsqlDBQueryServiceImpl implements KsqlDBQueryService {
    private static Logger logger = LoggerFactory.getLogger((Class<?>) KsqlDBQueryService.class);

    @Override // com.networknt.mesh.kafka.service.KsqlDBQueryService
    public List<Map<String, Object>> executeQuery(KsqlDbPullQueryRequest ksqlDbPullQueryRequest) {
        BatchedQueryResult executeQuery = ksqlDbPullQueryRequest.getProperties().size() > 0 ? KsqldbActiveConsumerStartupHook.client.executeQuery(ksqlDbPullQueryRequest.getQuery(), ksqlDbPullQueryRequest.getProperties()) : KsqldbActiveConsumerStartupHook.client.executeQuery(ksqlDbPullQueryRequest.getQuery());
        ArrayList arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        if (KsqlDbPullQueryRequest.QueryType.PUSH.equals(ksqlDbPullQueryRequest.getQueryType())) {
            hashSet.add(executeQuery.queryID());
            hashSet.add(executeQuery.queryID().thenAccept(str -> {
                KsqldbActiveConsumerStartupHook.client.terminatePushQuery(str);
            }));
        }
        hashSet.add(executeQuery);
        hashSet.add(processResponse(arrayList, executeQuery));
        try {
            CompletableFuture.allOf((CompletableFuture[]) hashSet.toArray(new CompletableFuture[0])).get(ClientConfig.get().getTimeout(), TimeUnit.MILLISECONDS);
            return arrayList;
        } catch (Exception e) {
            logger.error("Error happen when the ksql processing.", (Throwable) e);
            throw new KsqlException("Ksql execution error:" + String.valueOf(e));
        }
    }

    private CompletableFuture<Void> processResponse(List<Map<String, Object>> list, BatchedQueryResult batchedQueryResult) {
        return batchedQueryResult.thenAccept(list2 -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                Row row = (Row) it.next();
                HashMap hashMap = new HashMap();
                row.columnNames().stream().forEach(str -> {
                    hashMap.put(str, row.getValue(str));
                });
                list.add(hashMap);
            }
        });
    }
}
