package io.debezium.pipeline.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.pipeline.signal.channels.SourceSignalChannel;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Unit tests for {@link SourceSignalChannel}.
 *
 * <p>Each test initialises a fresh channel with {@link #config()}, feeds it a
 * change-event value through {@code process(...)} and then inspects what
 * {@code read()} returns.
 */
public class SourceSignalChannelTest {

    /**
     * Feeds the channel a create event whose record carries three string
     * columns and expects exactly one signal record whose data equals the
     * value of the third column.
     */
    @Test
    public void shouldExecuteFromEnvelope() throws Exception {
        final SourceSignalChannel channel = new SourceSignalChannel();
        channel.init(config());

        final Schema recordSchema = SchemaBuilder.struct()
                .name("signal")
                .field("col1", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col2", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col3", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Envelope envelope = envelopeFor(recordSchema);

        final Struct signalRow = new Struct(recordSchema);
        signalRow.put("col1", "log1");
        signalRow.put("col2", "custom");
        signalRow.put("col3", "{\"v\": 5}");

        // The create event is built without a source block or a timestamp.
        channel.process(envelope.create(signalRow, (Struct) null, (Instant) null));

        final List<SignalRecord> records = channel.read();
        Assertions.assertThat(records).hasSize(1);
        Assertions.assertThat(records.get(0).getData()).isEqualTo("{\"v\": 5}");
    }

    /**
     * Feeds the channel (1) a create event whose record has only two columns
     * and (2) the bare record struct that is not wrapped in an envelope, and
     * expects {@code read()} to return nothing after each call.
     */
    @Test
    public void shouldIgnoreInvalidEnvelope() throws Exception {
        final SourceSignalChannel channel = new SourceSignalChannel();
        channel.init(config());

        final Schema recordSchema = SchemaBuilder.struct()
                .name("signal")
                .field("col1", Schema.OPTIONAL_STRING_SCHEMA)
                .field("col2", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Envelope envelope = envelopeFor(recordSchema);

        final Struct incompleteRow = new Struct(recordSchema);
        incompleteRow.put("col1", "log1");
        incompleteRow.put("col2", "custom");

        // Case 1: a proper envelope, but the record has two columns instead of three.
        channel.process(envelope.create(incompleteRow, (Struct) null, (Instant) null));
        Assertions.assertThat(channel.read()).isEmpty();

        // Case 2: the record struct itself, without any envelope around it.
        channel.process(incompleteRow);
        Assertions.assertThat(channel.read()).isEmpty();
    }

    /**
     * Builds the envelope shared by the tests: named {@code someName}, using
     * the given record schema and an empty struct schema named {@code source}.
     *
     * @param recordSchema schema of the record carried by the envelope
     * @return the envelope definition
     */
    private static Envelope envelopeFor(Schema recordSchema) {
        final Schema sourceSchema = SchemaBuilder.struct().name("source").build();
        return Envelope.defineSchema()
                .withName("someName")
                .withRecord(recordSchema)
                .withSource(sourceSchema)
                .build();
    }

    /**
     * Creates the connector configuration handed to the channel under test.
     *
     * <p>The signal data collection is set to {@code debezium.signal} and the
     * topic prefix to {@code core}. The remaining members are stubbed with
     * {@code null} / an empty {@link Optional}.
     *
     * @return a stub connector configuration
     */
    protected CommonConnectorConfig config() {
        final Configuration configuration = Configuration.create()
                .with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
                .with(CommonConnectorConfig.TOPIC_PREFIX, "core")
                .build();

        return new CommonConnectorConfig(configuration, 0) {
            @Override
            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            @Override
            public String getContextName() {
                return null;
            }

            @Override
            public String getConnectorName() {
                return null;
            }

            @Override
            public EnumeratedValue getSnapshotMode() {
                return null;
            }

            @Override
            public Optional<EnumeratedValue> getSnapshotLockingMode() {
                return Optional.empty();
            }
        };
    }
}
