package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/processor/internals/TaskExecutor.class */
/**
 * Drives record processing, offset/transaction commits and punctuation for the
 * active tasks exposed by a {@link TasksRegistry}.
 *
 * <p>The methods marked as "package-private upstream" were widened to
 * {@code public} by the decompiler; their visibility is left untouched here so
 * existing callers keep compiling.
 */
public class TaskExecutor {
    private final Logger log;
    private final TasksRegistry tasks;
    private final TaskManager taskManager;
    private final TaskExecutionMetadata executionMetadata;

    /**
     * Creates an executor over the given registry.
     *
     * @param tasksRegistry         source of the active tasks to process/punctuate
     * @param taskManager           provides producers, consumer commit and group metadata
     * @param taskExecutionMetadata per-task execution bookkeeping (eligibility, errors, processing mode)
     * @param logContext            used to create this class's logger
     */
    public TaskExecutor(TasksRegistry tasksRegistry, TaskManager taskManager, TaskExecutionMetadata taskExecutionMetadata, LogContext logContext) {
        this.tasks = tasksRegistry;
        this.taskManager = taskManager;
        this.executionMetadata = taskExecutionMetadata;
        this.log = logContext.logger(getClass());
    }

    /**
     * Processes up to {@code maxNumRecords} records for every active task that the
     * execution metadata currently allows to be processed. (Package-private upstream.)
     *
     * <p>If anything is thrown while handling a task, the error is registered against
     * that task, the last task that was picked for processing is removed from the
     * successfully-processed set, the remaining successfully processed tasks are
     * committed, and the original throwable is rethrown.
     *
     * @param maxNumRecords upper bound of records to process per task
     * @param time          clock used for timestamps and batch-time measurement
     * @return total number of records processed across all tasks
     */
    public int process(int maxNumRecords, Time time) {
        int totalProcessed = 0;
        // Stays null if a failure happens before any task was eligible for processing.
        Task lastProcessed = null;
        for (Task task : this.tasks.activeTasks()) {
            final long now = time.milliseconds();
            try {
                if (this.executionMetadata.canProcessTask(task, now)) {
                    lastProcessed = task;
                    totalProcessed += processTask(task, maxNumRecords, now, time);
                }
            } catch (final Throwable t) {
                this.executionMetadata.registerTaskError(task, t, now);
                this.executionMetadata.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed);
                commitSuccessfullyProcessedTasks();
                throw t;
            }
        }
        return totalProcessed;
    }

    /**
     * Processes records of a single task until {@code maxNumRecords} is reached or
     * {@link Task#process(long)} reports that nothing more can be processed.
     *
     * <p>The whole batch is wrapped in one try/catch/finally so that:
     * <ul>
     *   <li>a {@link TimeoutException} ends the batch for this task (as the log message
     *       states, the caller moves on to the next task) instead of re-entering the
     *       loop and retrying the same task immediately;</li>
     *   <li>the batch time is recorded exactly once per invocation;</li>
     *   <li>each exception is handled by exactly one handler, matching {@link #punctuate()}.</li>
     * </ul>
     *
     * @param task          task to process
     * @param maxNumRecords upper bound of records to process
     * @param begin         wall-clock time at which the batch started, also passed to the task
     * @param time          clock used to measure the batch duration
     * @return number of records processed
     * @throws TaskMigratedException rethrown unchanged after an info-level log
     * @throws StreamsException      for any other processing failure, tagged with the task id
     */
    private int processTask(Task task, int maxNumRecords, long begin, Time time) {
        int processed = 0;
        try {
            while (processed < maxNumRecords && task.process(begin)) {
                task.clearTaskTimeout();
                processed++;
            }
            if (processed > 0
                    && this.executionMetadata.hasNamedTopologies()
                    && this.executionMetadata.processingMode() != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
                this.log.trace("Successfully processed task {}", task.id());
                this.executionMetadata.addToSuccessfullyProcessed(task);
            }
        } catch (TaskMigratedException e) {
            // Handled first so a migrated task is only reported at info level.
            this.log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (TimeoutException e) {
            // NOTE(review): presumably throws once the task's timeout has elapsed; in that
            // case the exception propagates to the caller as-is -- confirm against Task.
            task.maybeInitTaskTimeoutOrThrow(begin, e);
            this.log.error(String.format("Could not complete processing records for %s due to the following exception; will move to next task and retry later", task.id()), (Throwable) e);
        } catch (StreamsException e) {
            this.log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), (Throwable) e);
            e.setTaskId(task.id());
            throw e;
        } catch (RuntimeException e) {
            this.log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), (Throwable) e);
            throw new StreamsException(e, task.id());
        } finally {
            // Single recording point for success, timeout and failure alike.
            task.recordProcessBatchTime(time.milliseconds() - begin);
        }
        return processed;
    }

    /**
     * Prepares a commit for every task that needs one, commits the collected offsets,
     * then runs the post-commit step on those tasks. (Package-private upstream.)
     *
     * @param tasksToCommit              tasks to inspect
     * @param consumedOffsetsAndMetadata output map that receives the non-empty offsets
     *                                   returned by each task's {@code prepareCommit()};
     *                                   entries already present are committed as well
     * @return number of tasks that still reported {@code commitNeeded()} after the commit
     *         and were post-committed
     */
    public int commitTasksAndMaybeUpdateCommittableOffsets(Collection<Task> tasksToCommit, Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
        for (Task task : tasksToCommit) {
            if (!task.commitNeeded()) {
                continue;
            }
            final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
            if (!offsets.isEmpty()) {
                consumedOffsetsAndMetadata.put(task, offsets);
            }
        }

        commitOffsetsOrTransaction(consumedOffsetsAndMetadata);

        int committed = 0;
        for (Task task : tasksToCommit) {
            if (task.commitNeeded()) {
                task.clearTaskTimeout();
                committed++;
                task.postCommit(false);
            }
        }
        return committed;
    }

    /**
     * Commits the given offsets using the mechanism that matches the configured
     * processing mode. (Package-private upstream.)
     *
     * <ul>
     *   <li>{@code EXACTLY_ONCE_ALPHA}: one transaction commit per active running task,
     *       via that task's producer;</li>
     *   <li>{@code EXACTLY_ONCE_V2}: one transaction commit of all offsets via the
     *       thread producer;</li>
     *   <li>otherwise: a synchronous consumer commit of all offsets.</li>
     * </ul>
     *
     * @param offsetsPerTask offsets to commit, keyed by task
     * @throws TaskCorruptedException if a transactional commit timed out; carries the
     *                                ids of the affected tasks
     * @throws TaskMigratedException  if the consumer commit fails with a
     *                                {@link CommitFailedException}
     * @throws TimeoutException       if the consumer commit times out
     * @throws StreamsException       for any other {@link KafkaException} from the consumer commit
     */
    public void commitOffsetsOrTransaction(Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
        // Building the id-keyed view is pure logging overhead; skip it unless it will be printed.
        if (this.log.isDebugEnabled()) {
            this.log.debug("Committing task offsets {}", offsetsPerTask.entrySet().stream()
                    .collect(Collectors.toMap(entry -> entry.getKey().id(), Map.Entry::getValue)));
        }

        // Raw type kept on purpose: the element type (the task id class) is not imported in this file.
        final HashSet corruptedTasks = new HashSet();
        final StreamsConfigUtils.ProcessingMode mode = this.executionMetadata.processingMode();

        if (mode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            for (Task task : this.taskManager.activeRunningTaskIterable()) {
                final Map<TopicPartition, OffsetAndMetadata> taskOffsets =
                        offsetsPerTask.getOrDefault(task, Collections.emptyMap());
                // Commit when there are offsets, or when an open transaction has to be closed.
                if (!taskOffsets.isEmpty() || this.taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
                    try {
                        this.taskManager.streamsProducerForTask(task.id())
                                .commitTransaction(taskOffsets, this.taskManager.consumerGroupMetadata());
                        updateTaskCommitMetadata(taskOffsets);
                    } catch (TimeoutException e) {
                        this.log.error(String.format("Committing task %s failed.", task.id()), (Throwable) e);
                        corruptedTasks.add(task.id());
                    }
                }
            }
        } else if (mode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
            if (!offsetsPerTask.isEmpty() || this.taskManager.threadProducer().transactionInFlight()) {
                final Map<TopicPartition, OffsetAndMetadata> allOffsets = flattenOffsets(offsetsPerTask);
                try {
                    this.taskManager.threadProducer().commitTransaction(allOffsets, this.taskManager.consumerGroupMetadata());
                    updateTaskCommitMetadata(allOffsets);
                } catch (TimeoutException e) {
                    this.log.error(String.format("Committing task(s) %s failed.", joinTaskIds(offsetsPerTask.keySet())), (Throwable) e);
                    for (Task task : offsetsPerTask.keySet()) {
                        corruptedTasks.add(task.id());
                    }
                }
            }
        } else if (!offsetsPerTask.isEmpty()) {
            final Map<TopicPartition, OffsetAndMetadata> allOffsets = flattenOffsets(offsetsPerTask);
            try {
                this.taskManager.consumerCommitSync(allOffsets);
                updateTaskCommitMetadata(allOffsets);
            } catch (CommitFailedException e) {
                throw new TaskMigratedException("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group", e);
            } catch (TimeoutException e) {
                this.log.error(String.format("Committing task(s) %s failed.", joinTaskIds(offsetsPerTask.keySet())), (Throwable) e);
                throw e;
            } catch (KafkaException e) {
                throw new StreamsException("Error encountered committing offsets via consumer", e);
            }
        }

        if (!corruptedTasks.isEmpty()) {
            throw new TaskCorruptedException(corruptedTasks);
        }
    }

    /** Merges the per-task offset maps into a single partition-to-offset map. */
    private static Map<TopicPartition, OffsetAndMetadata> flattenOffsets(Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
        return offsetsPerTask.values().stream()
                .flatMap(offsets -> offsets.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** Renders the ids of the given tasks as a comma-separated string for log messages. */
    private static String joinTaskIds(Collection<Task> tasksToName) {
        return tasksToName.stream()
                .map(task -> task.id().toString())
                .collect(Collectors.joining(", "));
    }

    /**
     * Pushes the just-committed offsets into every active {@link StreamTask} that owns
     * one of the committed partitions.
     *
     * @param committedOffsets offsets that were committed; nothing happens if empty
     */
    private void updateTaskCommitMetadata(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        if (committedOffsets.isEmpty()) {
            return;
        }
        for (Task task : this.tasks.activeTasks()) {
            if (!(task instanceof StreamTask)) {
                continue;
            }
            final StreamTask streamTask = (StreamTask) task;
            for (TopicPartition partition : task.inputPartitions()) {
                if (committedOffsets.containsKey(partition)) {
                    streamTask.updateCommittedOffsets(partition, Long.valueOf(committedOffsets.get(partition).offset()));
                }
            }
        }
    }

    /**
     * Commits every task currently recorded as successfully processed (if any) and then
     * clears that record. Called from the error path of {@link #process(int, Time)}.
     */
    private void commitSuccessfullyProcessedTasks() {
        final Collection<Task> processedTasks = this.executionMetadata.successfullyProcessed();
        if (!processedTasks.isEmpty()) {
            // Collect the ids eagerly: passing the Stream itself would log only the
            // stream object's identity, not the task ids.
            this.log.info("Streams encountered an error when processing tasks. Will commit all previously successfully processed tasks {}",
                    processedTasks.stream().map(Task::id).collect(Collectors.toList()));
            commitTasksAndMaybeUpdateCommittableOffsets(processedTasks, new HashMap<>());
        }
        this.executionMetadata.clearSuccessfullyProcessed();
    }

    /**
     * Runs stream-time and system-time punctuation on every active task that the
     * execution metadata currently allows to be punctuated. (Package-private upstream.)
     *
     * @return number of punctuation rounds that reported having run (a task can
     *         contribute up to two: one for stream time, one for system time)
     * @throws TaskMigratedException rethrown unchanged after an info-level log
     * @throws StreamsException      for any other {@link KafkaException}, tagged with the task id
     */
    public int punctuate() {
        int punctuated = 0;
        for (Task task : this.tasks.activeTasks()) {
            try {
                if (!this.executionMetadata.canPunctuateTask(task)) {
                    continue;
                }
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            } catch (TaskMigratedException e) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (StreamsException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                e.setTaskId(task.id());
                throw e;
            } catch (KafkaException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                throw new StreamsException(e, task.id());
            }
        }
        return punctuated;
    }
}
