package com.binaryigor.eventsql.internal.sql;

import com.binaryigor.eventsql.Event;
import com.binaryigor.eventsql.EventPublication;
import com.binaryigor.eventsql.internal.EventInput;
import com.binaryigor.eventsql.internal.EventRepository;
import com.binaryigor.eventsql.internal.Transactions;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.InsertValuesStep5;
import org.jooq.JSON;
import org.jooq.OrderField;
import org.jooq.Table;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;

/* loaded from: input_file:com/binaryigor/eventsql/internal/sql/SQLEventRepository.class */
public class SQLEventRepository implements EventRepository {
    private static final Table<?> EVENT_BUFFER = DSL.table("event_buffer");
    private static final Table<?> EVENT_BUFFER_LOCK = DSL.table("event_buffer_lock");
    private static final Field<String> TOPIC = DSL.field("eql_topic", String.class);
    private static final Field<Long> ID = DSL.field("eql_id", Long.class);
    private static final Field<Short> PARTITION = DSL.field("eql_partition", Short.class);
    private static final Field<String> KEY = DSL.field("eql_key", String.class);
    private static final Field<byte[]> VALUE = DSL.field("eql_value", byte[].class);
    private static final Field<JSON> METADATA = DSL.field("eql_metadata", JSON.class);
    private static final Field<Timestamp> BUFFERED_AT = DSL.field("eql_buffered_at", Timestamp.class);
    private static final Field<Timestamp> CREATED_AT = DSL.field("eql_created_at", Timestamp.class);
    private final Transactions transactions;
    private final DSLContextProvider contextProvider;

    public SQLEventRepository(Transactions transactions, DSLContextProvider dSLContextProvider) {
        this.transactions = transactions;
        this.contextProvider = dSLContextProvider;
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public void createBuffer() {
        this.transactions.execute(() -> {
            this.contextProvider.get().createTableIfNotExists(EVENT_BUFFER).column(ID, ID.getDataType().identity(true)).column(TOPIC, TOPIC.getDataType().notNull().length(255)).column(PARTITION, PARTITION.getDataType().notNull()).column(KEY).column(VALUE, VALUE.getDataType().notNull()).column(CREATED_AT, CREATED_AT.getDataType().notNull().defaultValue(DSL.now())).column(METADATA, METADATA.getDataType().notNull()).constraint(DSL.primaryKey(new Field[]{ID})).execute();
            try {
                this.contextProvider.get().createIndex("event_buffer_topic_id").on(EVENT_BUFFER, new OrderField[]{TOPIC, ID}).execute();
            } catch (DataAccessException e) {
                String lowerCase = e.getMessage() == null ? "" : e.getMessage().toLowerCase();
                if (!lowerCase.contains("duplicate") && !lowerCase.contains("exists")) {
                    throw e;
                }
            }
        });
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public void createPartition(String str) {
        this.transactions.execute(() -> {
            DSLContext dSLContext = this.contextProvider.get();
            dSLContext.createTableIfNotExists(eventTable(str)).column(ID, ID.getDataType().identity(true)).column(PARTITION, PARTITION.getDataType().notNull()).column(KEY).column(VALUE, VALUE.getDataType().notNull()).column(BUFFERED_AT, BUFFERED_AT.getDataType().notNull()).column(CREATED_AT, CREATED_AT.getDataType().notNull().defaultValue(DSL.now())).column(METADATA, METADATA.getDataType().notNull()).constraint(DSL.constraint().primaryKey(new Field[]{ID})).execute();
            dSLContext.createTableIfNotExists(EVENT_BUFFER_LOCK).column(TOPIC, TOPIC.getDataType().length(255)).constraint(DSL.primaryKey(new Field[]{TOPIC})).execute();
            dSLContext.insertInto(EVENT_BUFFER_LOCK).columns(TOPIC).values(str).onConflictDoNothing().execute();
        });
    }

    private Table<?> eventTable(String str) {
        return DSL.table(str + "_event");
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public void dropPartition(String str) {
        this.transactions.execute(() -> {
            DSLContext dSLContext = this.contextProvider.get();
            dSLContext.dropTableIfExists(eventTable(str)).execute();
            dSLContext.deleteFrom(EVENT_BUFFER).where(TOPIC.eq(str)).execute();
            dSLContext.deleteFrom(EVENT_BUFFER_LOCK).where(TOPIC.eq(str)).execute();
        });
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public void create(EventInput eventInput) {
        createAll(List.of(eventInput));
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public void createAll(Collection<EventInput> collection) {
        if (collection.isEmpty()) {
            return;
        }
        InsertValuesStep5 columns = this.contextProvider.get().insertInto(EVENT_BUFFER).columns(TOPIC, PARTITION, KEY, VALUE, METADATA);
        collection.forEach(eventInput -> {
            EventPublication publication = eventInput.publication();
            columns.values(publication.topic(), Short.valueOf(eventInput.partition()), publication.key(), publication.value(), JSON.valueOf(SimpleJSONMapper.toJSON(publication.metadata())));
        });
        columns.execute();
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public int flushBuffer(Collection<String> collection, int i) {
        if (collection.isEmpty()) {
            return 0;
        }
        int i2 = i + 1;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.transactions.execute(() -> {
            DSLContext dSLContext = this.contextProvider.get();
            List fetch = dSLContext.select(TOPIC).from(EVENT_BUFFER_LOCK).where(TOPIC.in(collection)).forUpdate().skipLocked().fetch(TOPIC);
            if (fetch.isEmpty()) {
                return;
            }
            Map fetchGroups = dSLContext.select(TOPIC, ID).from(EVENT_BUFFER).where(TOPIC.in(fetch)).orderBy(TOPIC, ID).limit(Integer.valueOf(i2)).fetchGroups(TOPIC, ID);
            if (fetchGroups.isEmpty()) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            fetchGroups.forEach((str, list) -> {
                arrayList.add(dSLContext.insertInto(eventTable(str)).columns(PARTITION, KEY, VALUE, METADATA, BUFFERED_AT).select(dSLContext.select(PARTITION, KEY, VALUE, METADATA, CREATED_AT.as(BUFFERED_AT)).from(EVENT_BUFFER).where(ID.in(list))));
                atomicInteger.addAndGet(list.size());
            });
            arrayList.add(dSLContext.deleteFrom(EVENT_BUFFER).where(ID.in(fetchGroups.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).toList())));
            dSLContext.batch(arrayList).execute();
        });
        return atomicInteger.get();
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public List<Event> nextEvents(String str, Long l, int i) {
        return nextEvents(str, (Short) null, l, i);
    }

    public List<Event> nextEvents(String str, Short sh, Long l, int i) {
        Condition trueCondition = DSL.trueCondition();
        if (l != null) {
            trueCondition = trueCondition.and(ID.greaterThan(l));
        }
        if (sh != null) {
            trueCondition = trueCondition.and(PARTITION.eq(sh));
        }
        return this.contextProvider.get().select(DSL.val(str).as(TOPIC), ID, PARTITION, KEY, VALUE, METADATA).from(eventTable(str)).where(trueCondition).orderBy(ID).limit(Integer.valueOf(i)).fetchInto(Event.class);
    }

    @Override // com.binaryigor.eventsql.internal.EventRepository
    public List<Event> nextEvents(String str, int i, Long l, int i2) {
        return nextEvents(str, Short.valueOf((short) i), l, i2);
    }
}
