package cc.chensoul.rose.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Periodically pulls binlog change batches from a Canal server through the injected
 * {@link CanalConnector}, logs their content and acknowledges them.
 *
 * <p>The connection is opened after bean construction and closed before bean destruction.
 */
@Service
public class CanalRunner {
    private static final Logger log = LoggerFactory.getLogger(CanalRunner.class);

    /** Batch id that {@link #processData()} treats as "nothing was fetched". */
    private static final long EMPTY_BATCH_ID = -1L;

    /**
     * Batch size passed to {@link CanalConnector#getWithoutAck(int)} on every poll.
     * Kept package-visible and non-final so existing code in this package that reads or
     * overrides it keeps working.
     */
    int BATCH_SIZE = 5120;

    @Resource
    private CanalConnector connector;

    /**
     * Logs every row-change entry of a fetched batch; transaction begin/end markers are skipped.
     *
     * @param list entries of one Canal message
     * @throws RuntimeException if an entry cannot be parsed or logged; the original exception is
     *     kept as the cause
     */
    private static void printEntry(List<CanalEntry.Entry> list) {
        for (CanalEntry.Entry entry : list) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                CanalEntry.Header header = entry.getHeader();
                if (rowChange.getIsDdl()) {
                    log.info("binlog: {}:{}, table: {}.{}, eventType: {}, ddlSql: {}",
                            header.getLogfileName(), header.getLogfileOffset(),
                            header.getSchemaName(), header.getTableName(),
                            eventType, rowChange.getSql());
                } else {
                    log.info("binlog: {}:{}, table: {}.{}, eventType: {}",
                            header.getLogfileName(), header.getLogfileOffset(),
                            header.getSchemaName(), header.getTableName(),
                            eventType);
                }
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    log.info("eventType:{}", eventType);
                    if (eventType == CanalEntry.EventType.DELETE) {
                        // A deleted row only exists in the "before" image; the "after" image is empty.
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        // An inserted row only exists in the "after" image.
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        // Any other event type: show both images so the change is visible.
                        printColumn(rowData.getBeforeColumnsList());
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }
        }
    }

    /**
     * Logs name, value and updated flag of every column in the given row image.
     *
     * @param list columns of one row image (before or after the change)
     */
    private static void printColumn(List<CanalEntry.Column> list) {
        for (CanalEntry.Column column : list) {
            log.info("{}: {},updated={}", column.getName(), column.getValue(), column.getUpdated());
        }
    }

    /**
     * Opens the connection, subscribes, and rolls back so that batches fetched earlier but never
     * acknowledged are delivered again.
     */
    @PostConstruct
    public void connect() {
        this.connector.connect();
        this.connector.subscribe();
        this.connector.rollback();
    }

    /**
     * Scheduled poll: fetches one batch without acknowledging it, logs its entries and then
     * acknowledges it. If the connector reports an invalid connection, reconnects instead and
     * leaves data processing to the next cycle.
     *
     * <p>Any exception is logged rather than propagated. If a batch had already been fetched when
     * the failure happened, it is rolled back so the server delivers it again.
     *
     * <p>NOTE(review): this method is both {@code @Async} and {@code @Scheduled}; if async
     * execution is enabled, runs may overlap on the shared connector - confirm the executor
     * configuration.
     */
    @Async
    @Scheduled(initialDelayString = "${canal.scheduled.initialDelay:2000}", fixedDelayString = "${canal.scheduled.fixedDelay:2000}")
    public void processData() {
        // Remembered outside the try block so the catch block knows which batch to roll back.
        long batchId = EMPTY_BATCH_ID;
        try {
            if (!this.connector.checkValid()) {
                log.warn("与Canal服务器的连接失效！！！重连，下个周期再检查数据变更");
                connect();
                return;
            }
            Message message = this.connector.getWithoutAck(this.BATCH_SIZE);
            batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == EMPTY_BATCH_ID || size == 0) {
                log.info("本次[{}]没有检测到数据更新。", batchId);
                return;
            }
            log.info("本次[{}]数据共有[{}]次更新需要处理", batchId, size);
            printEntry(message.getEntries());
            this.connector.ack(batchId);
            log.info("本次[{}]处理Canal同步数据完成", batchId);
        } catch (Exception e) {
            log.error("处理Canal同步数据失效，请检查：", e);
            rollbackQuietly(batchId);
        }
    }

    /**
     * Rolls back a fetched but unacknowledged batch; a failure of the rollback itself is only
     * logged so it cannot escape the scheduled task.
     *
     * @param batchId id of the batch to roll back; {@link #EMPTY_BATCH_ID} means nothing to do
     */
    private void rollbackQuietly(long batchId) {
        if (batchId == EMPTY_BATCH_ID) {
            return;
        }
        try {
            this.connector.rollback(batchId);
        } catch (Exception rollbackFailure) {
            log.error("Failed to roll back Canal batch [{}]", batchId, rollbackFailure);
        }
    }

    /** Closes the connection to the Canal server when the bean is destroyed. */
    @PreDestroy
    public void disConnect() {
        this.connector.disconnect();
    }
}
