package top.turboweb.http.response.sync;

import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import top.turboweb.commons.exception.TurboSseException;
import top.turboweb.http.connect.ConnectSession;

/* loaded from: input_file:top/turboweb/http/response/sync/SseEmitter.class */
/**
 * Base HTTP response for a Server-Sent Events (SSE) stream.
 *
 * <p>The response is created as {@code HTTP/1.1 200 OK} with the headers of a
 * chunked {@code text/event-stream}. Messages passed to {@link #send(String)}
 * are forwarded to the {@link ConnectSession} once {@link #isInit} is
 * {@code true}; before that they are kept in a bounded in-memory cache.
 *
 * <p>NOTE(review): nothing in this class sets {@link #isInit} to {@code true}
 * or drains {@link #messageCache}; presumably subclasses do so while holding
 * the write lock of {@link #sseLock} - confirm against the implementations.
 */
public abstract class SseEmitter extends DefaultHttpResponse {
    /** Connection used to push messages and to close the stream. */
    protected final ConnectSession session;

    /** While {@code false}, {@link #send(String)} caches messages instead of sending them. */
    protected volatile boolean isInit;

    /** Messages accepted by {@link #send(String)} while {@link #isInit} was {@code false}. */
    protected LinkedList<String> messageCache;

    /** Upper bound on the number of messages that may be cached. */
    private final int maxMessageCache;

    /** Number of messages added to {@link #messageCache} by this class. */
    protected int messageCacheSize;

    /** Lock whose read side guards {@link #send(String)}. */
    protected final ReentrantReadWriteLock sseLock;

    /**
     * Creates the emitter and prepares the SSE response headers.
     *
     * @param connectSession session that messages are sent through
     * @param i maximum number of messages that may be cached before the
     *     emitter is initialized
     */
    public SseEmitter(ConnectSession connectSession, int i) {
        super(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        this.isInit = false;
        this.messageCache = new LinkedList<>();
        this.messageCacheSize = 0;
        this.sseLock = new ReentrantReadWriteLock();
        this.session = connectSession;
        this.maxMessageCache = i;

        headers().set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
        headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
        // HttpUtil writes "Transfer-Encoding: chunked" itself, so the header
        // is not set by hand as well.
        HttpUtil.setTransferEncodingChunked(this, true);
    }

    /**
     * Sends a message to the client, or caches it if the emitter is not yet
     * initialized.
     *
     * @param str message to send
     * @throws TurboSseException if the message has to be cached and the cache
     *     already holds the maximum number of messages
     */
    public void send(String str) {
        final ReentrantReadWriteLock.ReadLock readLock = this.sseLock.readLock();
        // Acquire the lock BEFORE entering the try block: if lock() were to
        // fail inside it, the finally clause would call unlock() on a lock
        // this thread does not hold and mask the real failure with an
        // IllegalMonitorStateException.
        readLock.lock();
        try {
            if (this.isInit) {
                this.session.send(str);
            } else {
                saveMessageToCache(str);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Appends a message to the cache, enforcing the configured limit.
     *
     * <p>The method is {@code synchronized} because the read lock taken by
     * {@link #send(String)} can be held by several threads at once.
     *
     * @param str message to cache
     * @throws TurboSseException if the cache is full (the exception message
     *     is Chinese for "message cache is full")
     */
    private synchronized void saveMessageToCache(String str) {
        if (this.messageCacheSize >= this.maxMessageCache) {
            throw new TurboSseException("消息缓存已满");
        }
        this.messageCache.add(str);
        this.messageCacheSize++;
    }

    /** Closes the underlying session. */
    public void close() {
        this.session.close();
    }

    /**
     * Registers a callback with the session's close listener.
     *
     * @param consumer callback that receives this emitter when the listener
     *     is invoked
     */
    public void onClose(Consumer<SseEmitter> consumer) {
        this.session.closeListener(() -> consumer.accept(this));
    }
}
