package io.helidon.messaging.connectors.kafka;

import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.EmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.messaging.NackHandler;
import java.lang.System;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivestreams.FlowAdapters;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaNackHandler.class */
public interface KafkaNackHandler<K, V> extends NackHandler<KafkaMessage<K, V>> {

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaNackHandler$KafkaDLQ.class */
    public static class KafkaDLQ<K, V> implements KafkaNackHandler<K, V> {
        private static final String DESERIALIZER_MASK = "([^.]*)Deserializer([^.]*$)";
        private static final String DESERIALIZER_REPLACEMENT = "$1Serializer$2";
        private final BufferedEmittingPublisher<KafkaMessage<K, V>> dlqEmitter;
        private final String dlqTopic;

        KafkaDLQ(EmittingPublisher<KafkaMessage<K, V>> emittingPublisher, Config config, Config config2) {
            KafkaConfig create;
            if (config2.isLeaf()) {
                create = KafkaConfig.create(config);
                this.dlqTopic = (String) config2.asString().get();
                create.topics().clear();
                create.topics().add(this.dlqTopic);
            } else {
                create = KafkaConfig.create(config2);
                this.dlqTopic = (String) config2.get("topic").asString().orElseThrow(() -> {
                    return new IllegalStateException("Missing dlq.topic property!");
                });
                create.putIfAbsent("bootstrap.servers", () -> {
                    return config.get("bootstrap.servers").as(String.class).get();
                });
            }
            create.putIfAbsent("key.serializer", () -> {
                return ((String) config.get("key.deserializer").asString().orElseThrow(() -> {
                    return new IllegalStateException("Missing dlq.key.serializer property!");
                })).replaceAll(DESERIALIZER_MASK, DESERIALIZER_REPLACEMENT);
            });
            create.putIfAbsent("value.serializer", () -> {
                return ((String) config.get("value.deserializer").asString().orElseThrow(() -> {
                    return new IllegalStateException("Missing dlq.value.serializer property!");
                })).replaceAll(DESERIALIZER_MASK, DESERIALIZER_REPLACEMENT);
            });
            create.remove("topic");
            this.dlqEmitter = BufferedEmittingPublisher.create();
            this.dlqEmitter.onAbort(th -> {
                if (th != null) {
                    emittingPublisher.fail(new Exception("DLQ channel failed", th));
                } else {
                    emittingPublisher.fail(new Exception("DLQ channel cancelled"));
                }
            });
            Multi.create(this.dlqEmitter).subscribe(FlowAdapters.toFlowSubscriber(KafkaSubscriber.builder().config(create).m8build()));
        }

        @Override // io.helidon.messaging.connectors.kafka.KafkaNackHandler
        public Function<Throwable, CompletionStage<Void>> getNack(KafkaMessage<K, V> kafkaMessage) {
            return th -> {
                return nack(th, kafkaMessage);
            };
        }

        private CompletionStage<Void> nack(Throwable th, KafkaMessage<K, V> kafkaMessage) {
            KafkaMessage kafkaMessage2 = (KafkaMessage) kafkaMessage.getKey().map(obj -> {
                return KafkaMessage.of(obj, kafkaMessage.getPayload(), () -> {
                    return kafkaMessage.ack();
                });
            }).orElseGet(() -> {
                return KafkaMessage.of(kafkaMessage.getPayload(), (Supplier<CompletionStage<Void>>) () -> {
                    return kafkaMessage.ack();
                });
            });
            Headers headers = kafkaMessage2.getHeaders();
            Headers headers2 = kafkaMessage.getHeaders();
            Objects.requireNonNull(headers);
            headers2.forEach(headers::add);
            headers.remove("topic");
            headers.add(new StringHeader("topic", this.dlqTopic));
            headers.add(new StringHeader("dlq-error", th.getClass().getName()));
            headers.add(new StringHeader("dlq-error-msg", th.getMessage()));
            kafkaMessage.getTopic().ifPresent(str -> {
                headers.add(new StringHeader("dlq-orig-topic", str));
            });
            kafkaMessage.getOffset().ifPresent(l -> {
                headers.add(new StringHeader("dlq-orig-offset", l));
            });
            kafkaMessage.getPartition().ifPresent(num -> {
                headers.add(new StringHeader("dlq-orig-partition", num));
            });
            if (this.dlqEmitter.isCancelled()) {
                return CompletableFuture.failedFuture(new IllegalStateException("DQL stream  to " + this.dlqTopic + " cancelled!"));
            }
            this.dlqEmitter.emit(kafkaMessage2);
            return CompletableFuture.completedFuture(null);
        }
    }

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaNackHandler$KillChannel.class */
    public static class KillChannel<K, V> implements KafkaNackHandler<K, V> {
        private static final System.Logger LOGGER = System.getLogger(KillChannel.class.getName());
        private final EmittingPublisher<KafkaMessage<K, V>> emitter;

        KillChannel(EmittingPublisher<KafkaMessage<K, V>> emittingPublisher) {
            this.emitter = emittingPublisher;
        }

        @Override // io.helidon.messaging.connectors.kafka.KafkaNackHandler
        public Function<Throwable, CompletionStage<Void>> getNack(KafkaMessage<K, V> kafkaMessage) {
            return th -> {
                return nack(th, kafkaMessage);
            };
        }

        private CompletionStage<Void> nack(Throwable th, KafkaMessage<K, V> kafkaMessage) {
            LOGGER.log(System.Logger.Level.ERROR, KafkaNackHandler.messageToString("NACKED message - killing the channel", kafkaMessage), th);
            this.emitter.fail(th);
            return CompletableFuture.failedStage(th);
        }
    }

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaNackHandler$Log.class */
    public static class Log<K, V> implements KafkaNackHandler<K, V> {
        private static final System.Logger LOGGER = System.getLogger(Log.class.getName());

        Log(Config config, Config config2) {
        }

        @Override // io.helidon.messaging.connectors.kafka.KafkaNackHandler
        public Function<Throwable, CompletionStage<Void>> getNack(KafkaMessage<K, V> kafkaMessage) {
            return th -> {
                return nack(th, kafkaMessage);
            };
        }

        private CompletionStage<Void> nack(Throwable th, KafkaMessage<K, V> kafkaMessage) {
            LOGGER.log(System.Logger.Level.WARNING, KafkaNackHandler.messageToString("NACKED Message - ignored", kafkaMessage));
            return CompletableFuture.completedFuture(null);
        }
    }

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaNackHandler$StringHeader.class */
    public static class StringHeader extends RecordHeader {
        StringHeader(String str, Object obj) {
            super(str, obj.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override // 
    Function<Throwable, CompletionStage<Void>> getNack(KafkaMessage<K, V> kafkaMessage);

    static <K, V> KafkaNackHandler<K, V> create(EmittingPublisher<KafkaMessage<K, V>> emittingPublisher, Config config) {
        Config config2 = config.get("nack-dlq");
        Config config3 = config.get("nack-log-only");
        return config2.exists() ? new KafkaDLQ(emittingPublisher, config, config2.detach()) : (config3.exists() && ((Boolean) config3.asBoolean().orElse(true)).booleanValue()) ? new Log(config, config3.detach()) : new KillChannel(emittingPublisher);
    }

    static <K, V> String messageToString(String str, KafkaMessage<K, V> kafkaMessage) {
        StringBuilder sb = new StringBuilder(str);
        kafkaMessage.getKey().ifPresent(obj -> {
            sb.append(" key: ").append(obj);
        });
        kafkaMessage.getTopic().ifPresent(str2 -> {
            sb.append(" topic: ").append(str2);
        });
        kafkaMessage.getOffset().ifPresent(l -> {
            sb.append(" offset: ").append(l);
        });
        kafkaMessage.getPartition().ifPresent(num -> {
            sb.append(" partition: ").append(num);
        });
        return sb.toString();
    }
}
