package org.apache.kafka.connect.runtime.errors;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.codehaus.plexus.util.LineOrientedInterpolatingReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link ErrorReporter} that writes failed processing contexts to this class's SLF4J logger.
 *
 * <p>A report is only logged when the connector configuration enables error logging and the
 * processing context is marked as failed. Whether the offending record is described in the log
 * line is controlled by {@code ConnectorConfig.includeRecordDetailsInErrorLog()}; how the record
 * is rendered is left to the concrete subclass via {@link #appendMessage(StringBuilder, Object)}.
 *
 * @param <T> the type of the original record carried by the processing context
 */
public abstract class LogReporter<T> implements ErrorReporter<T> {
    private static final Logger log = LoggerFactory.getLogger(LogReporter.class);

    /** Shared, already-completed future returned by every call to {@link #report}. */
    private static final Future<RecordMetadata> COMPLETED = CompletableFuture.completedFuture(null);

    private final ConnectorTaskId id;
    private final ConnectorConfig connConfig;
    private final ErrorHandlingMetrics errorHandlingMetrics;

    /** Log reporter whose original records are raw {@code ConsumerRecord<byte[], byte[]>} instances. */
    public static class Sink extends LogReporter<ConsumerRecord<byte[], byte[]>> {
        /**
         * @param connectorTaskId      the task this reporter belongs to; must not be null
         * @param connectorConfig      the connector configuration; must not be null
         * @param errorHandlingMetrics the metrics updated when an error is logged; must not be null
         */
        public Sink(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, ErrorHandlingMetrics errorHandlingMetrics) {
            super(connectorTaskId, connectorConfig, errorHandlingMetrics);
        }

        /**
         * Appends the topic, partition and offset of the consumed record, plus its timestamp and
         * timestamp type when the type is {@code CREATE_TIME} or {@code LOG_APPEND_TIME}.
         * Neither the key nor the value of the record is written.
         *
         * @param sb             the builder receiving the description
         * @param consumerRecord the record being described
         */
        @Override
        public void appendMessage(StringBuilder sb, ConsumerRecord<byte[], byte[]> consumerRecord) {
            sb.append(", where consumed record is ");
            sb.append("{topic='").append(consumerRecord.topic()).append('\'');
            sb.append(", partition=").append(consumerRecord.partition());
            sb.append(", offset=").append(consumerRecord.offset());
            TimestampType timestampType = consumerRecord.timestampType();
            if (timestampType == TimestampType.CREATE_TIME || timestampType == TimestampType.LOG_APPEND_TIME) {
                sb.append(", timestamp=").append(consumerRecord.timestamp());
                sb.append(", timestampType=").append(timestampType);
            }
            // Closes the brace opened by "{topic='" above.
            sb.append('}');
        }
    }

    /** Log reporter whose original records are {@code SourceRecord} instances. */
    public static class Source extends LogReporter<SourceRecord> {
        /**
         * @param connectorTaskId      the task this reporter belongs to; must not be null
         * @param connectorConfig      the connector configuration; must not be null
         * @param errorHandlingMetrics the metrics updated when an error is logged; must not be null
         */
        public Source(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, ErrorHandlingMetrics errorHandlingMetrics) {
            super(connectorTaskId, connectorConfig, errorHandlingMetrics);
        }

        /**
         * Appends the string form of the source record (as produced by
         * {@link StringBuilder#append(Object)}).
         *
         * @param sb           the builder receiving the description
         * @param sourceRecord the record being described
         */
        @Override
        public void appendMessage(StringBuilder sb, SourceRecord sourceRecord) {
            sb.append(", where source record is = ");
            sb.append(sourceRecord);
        }
    }

    /**
     * Private so that only the nested {@link Sink} and {@link Source} classes can extend this type.
     *
     * @throws NullPointerException if any argument is null
     */
    private LogReporter(ConnectorTaskId connectorTaskId, ConnectorConfig connectorConfig, ErrorHandlingMetrics errorHandlingMetrics) {
        this.id = Objects.requireNonNull(connectorTaskId, "connectorTaskId");
        this.connConfig = Objects.requireNonNull(connectorConfig, "connectorConfig");
        this.errorHandlingMetrics = Objects.requireNonNull(errorHandlingMetrics, "errorHandlingMetrics");
    }

    /**
     * Logs the failure held by the given context at ERROR level and records it in the error
     * handling metrics. Nothing is logged or recorded when error logging is disabled in the
     * connector configuration or when the context has not failed.
     *
     * @param processingContext the context describing the operation that was attempted
     * @return an already-completed future whose value is {@code null}, in every case
     */
    @Override
    public Future<RecordMetadata> report(ProcessingContext<T> processingContext) {
        if (connConfig.enableErrorLog() && processingContext.failed()) {
            log.error(message(processingContext), processingContext.error());
            errorHandlingMetrics.recordErrorLogged();
        }
        return COMPLETED;
    }

    /**
     * Builds the log line for a context: the task id followed by the stage description.
     * Package-private rather than private, so it is reachable from other classes in this package.
     *
     * @param processingContext the context to describe
     * @return the formatted message
     */
    String message(ProcessingContext<T> processingContext) {
        return String.format("Error encountered in task %s. %s", id,
                toString(processingContext, connConfig.includeRecordDetailsInErrorLog()));
    }

    /**
     * Describes the stage and executing class of a context, optionally followed by the record.
     *
     * @param processingContext    the context to describe
     * @param includeRecordDetails whether to append the subclass-specific record description
     * @return the description, terminated by a period
     */
    private String toString(ProcessingContext<T> processingContext, boolean includeRecordDetails) {
        StringBuilder sb = new StringBuilder();
        sb.append("Executing stage '");
        sb.append(processingContext.stage().name());
        sb.append("' with class '");
        // The executing class may be absent; print the literal "null" in that case.
        sb.append(processingContext.executingClass() == null ? "null" : processingContext.executingClass().getName());
        sb.append('\'');
        if (includeRecordDetails) {
            appendMessage(sb, processingContext.original());
        }
        sb.append('.');
        return sb.toString();
    }

    /**
     * Appends a description of the original record to the message being built.
     *
     * @param sb the builder receiving the description
     * @param t  the original record taken from the processing context
     */
    protected abstract void appendMessage(StringBuilder sb, T t);
}
