package io.debezium.server.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.StreamNameMapper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumerTest.class */
class RabbitMqStreamChangeConsumerTest {
    private static final int DELIVERY_MODE = 2;
    private static final int ACK_TIMEOUT = 1000;
    private Channel channelMock;
    private StreamNameMapper streamNameMapperMock;
    private ChangeEvent<Object, Object> eventMock;
    private DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committerMock;
    private RabbitMqStreamChangeConsumer rabbitMqStreamChangeConsumer;

    RabbitMqStreamChangeConsumerTest() {
    }

    public static Stream<Arguments> testHandleBatch_StaticRoutingKeySourceParameters() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"static-routing-key", "static-routing-key"}), Arguments.of(new Object[]{null, ""})});
    }

    public static Stream<Arguments> testHandleBatch_KeyRoutingKeySourceParameters() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"test-routing-key", "test-routing-key"}), Arguments.of(new Object[]{null, ""})});
    }

    @BeforeEach
    void setUp() {
        this.eventMock = (ChangeEvent) Mockito.mock(ChangeEvent.class);
        this.channelMock = (Channel) Mockito.mock(Channel.class);
        this.committerMock = (DebeziumEngine.RecordCommitter) Mockito.mock(DebeziumEngine.RecordCommitter.class);
        this.streamNameMapperMock = (StreamNameMapper) Mockito.mock(StreamNameMapper.class);
        this.rabbitMqStreamChangeConsumer = new RabbitMqStreamChangeConsumer();
        this.rabbitMqStreamChangeConsumer.channel = this.channelMock;
        this.rabbitMqStreamChangeConsumer.setStreamNameMapper(this.streamNameMapperMock);
        this.rabbitMqStreamChangeConsumer.deliveryMode = DELIVERY_MODE;
        this.rabbitMqStreamChangeConsumer.ackTimeout = ACK_TIMEOUT;
    }

    @Test
    void testHandleBatch_TopicRoutingKeySource() throws InterruptedException, IOException, TimeoutException {
        List of = List.of(this.eventMock);
        Mockito.when(this.eventMock.destination()).thenReturn("test-topic");
        Mockito.when(this.eventMock.value()).thenReturn("test content");
        Mockito.when(this.eventMock.headers()).thenReturn(List.of());
        Mockito.when(this.streamNameMapperMock.map("test-topic")).thenReturn("test-topic");
        this.rabbitMqStreamChangeConsumer.exchange = Optional.of("test-topic");
        this.rabbitMqStreamChangeConsumer.routingKeySource = "topic";
        this.rabbitMqStreamChangeConsumer.autoCreateRoutingKey = true;
        this.rabbitMqStreamChangeConsumer.routingKeyDurable = true;
        this.rabbitMqStreamChangeConsumer.routingKey = Optional.of("ignored");
        this.rabbitMqStreamChangeConsumer.handleBatch(of, this.committerMock);
        ((Channel) Mockito.verify(this.channelMock)).queueDeclare("test-topic", true, false, false, (Map) null);
        ((Channel) Mockito.verify(this.channelMock)).basicPublish("test-topic", "test-topic", new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(DELIVERY_MODE)).headers(Map.of()).build(), "test content".getBytes());
        ((Channel) Mockito.verify(this.channelMock)).waitForConfirmsOrDie(1000L);
    }

    @MethodSource({"testHandleBatch_StaticRoutingKeySourceParameters"})
    @ParameterizedTest
    void testHandleBatch_StaticRoutingKeySource(String str, String str2) throws InterruptedException, IOException, TimeoutException {
        List of = List.of(this.eventMock);
        Mockito.when(this.eventMock.destination()).thenReturn("test-topic");
        Mockito.when(this.eventMock.value()).thenReturn("test content");
        Mockito.when(this.eventMock.headers()).thenReturn(List.of());
        Mockito.when(this.streamNameMapperMock.map("test-topic")).thenReturn("test-topic");
        this.rabbitMqStreamChangeConsumer.exchange = Optional.of("test-topic");
        this.rabbitMqStreamChangeConsumer.routingKeySource = "static";
        this.rabbitMqStreamChangeConsumer.routingKey = Optional.ofNullable(str);
        this.rabbitMqStreamChangeConsumer.handleBatch(of, this.committerMock);
        ((Channel) Mockito.verify(this.channelMock, Mockito.never())).queueDeclare((String) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), (Map) ArgumentMatchers.any());
        ((Channel) Mockito.verify(this.channelMock)).basicPublish("test-topic", str2, new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(DELIVERY_MODE)).headers(Map.of()).build(), "test content".getBytes());
        ((Channel) Mockito.verify(this.channelMock)).waitForConfirmsOrDie(1000L);
    }

    @MethodSource({"testHandleBatch_KeyRoutingKeySourceParameters"})
    @ParameterizedTest
    void testHandleBatch_KeyRoutingKeySource(String str, String str2) throws InterruptedException, IOException, TimeoutException {
        List of = List.of(this.eventMock);
        Mockito.when(this.eventMock.destination()).thenReturn("test-topic");
        Mockito.when(this.eventMock.value()).thenReturn("test content");
        Mockito.when(this.eventMock.key()).thenReturn(str);
        Mockito.when(this.eventMock.headers()).thenReturn(List.of());
        Mockito.when(this.streamNameMapperMock.map("test-topic")).thenReturn("test-topic");
        Mockito.when(this.streamNameMapperMock.map(str)).thenReturn(str);
        this.rabbitMqStreamChangeConsumer.exchange = Optional.of("test-topic");
        this.rabbitMqStreamChangeConsumer.routingKeySource = "key";
        this.rabbitMqStreamChangeConsumer.routingKey = Optional.of("ignored");
        this.rabbitMqStreamChangeConsumer.handleBatch(of, this.committerMock);
        ((Channel) Mockito.verify(this.channelMock, Mockito.never())).queueDeclare((String) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), (Map) ArgumentMatchers.any());
        ((Channel) Mockito.verify(this.channelMock)).basicPublish("test-topic", str2, new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(DELIVERY_MODE)).headers(Map.of()).build(), "test content".getBytes());
        ((Channel) Mockito.verify(this.channelMock)).waitForConfirmsOrDie(1000L);
    }
}
