package com.github.tonivade.resp;

import com.github.tonivade.resp.command.CommandSuite;
import com.github.tonivade.resp.command.Request;
import com.github.tonivade.resp.command.RespCommand;
import com.github.tonivade.resp.command.ServerContext;
import com.github.tonivade.resp.command.Session;
import com.github.tonivade.resp.protocol.RedisToken;
import com.github.tonivade.resp.util.Precondition;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.h2.engine.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/tonivade/resp/RespServerContext.class */
public class RespServerContext implements ServerContext {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RespServerContext.class);
    private final StateHolder state;
    private final ConcurrentHashMap<String, Session> clients;
    private final Scheduler scheduler;
    private final String host;
    private final int port;
    private final CommandSuite commands;
    private final SessionListener sessionListener;

    public RespServerContext(String str, int i, CommandSuite commandSuite) {
        this(str, i, commandSuite, SessionListener.nullListener());
    }

    public RespServerContext(String str, int i, CommandSuite commandSuite, SessionListener sessionListener) {
        this.state = new StateHolder();
        this.clients = new ConcurrentHashMap<>();
        this.scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
        this.host = Precondition.checkNonEmpty(str);
        this.port = Precondition.checkRange(i, Constants.ENCRYPTION_KEY_HASH_ITERATIONS, 65535);
        this.commands = (CommandSuite) Precondition.checkNonNull(commandSuite);
        this.sessionListener = (SessionListener) Precondition.checkNonNull(sessionListener);
    }

    public void start() {
    }

    public void stop() {
        clear();
        this.scheduler.shutdown();
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public int getClients() {
        return this.clients.size();
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public RespCommand getCommand(String str) {
        return this.commands.getCommand(str);
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public <T> Optional<T> getValue(String str) {
        return this.state.getValue(str);
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public <T> Optional<T> removeValue(String str) {
        return this.state.removeValue(str);
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public void putValue(String str, Object obj) {
        this.state.putValue(str, obj);
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public String getHost() {
        return this.host;
    }

    @Override // com.github.tonivade.resp.command.ServerContext
    public int getPort() {
        return this.port;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Session getSession(String str, Function<String, Session> function) {
        return this.clients.computeIfAbsent(str, str2 -> {
            Session session = (Session) function.apply(str2);
            this.sessionListener.sessionCreated(session);
            return session;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processCommand(Request request) {
        LOGGER.debug("received command: {}", request);
        try {
            executeOn(execute(getCommand(request.getCommand()), request)).subscribe(redisToken -> {
                processResponse(request, redisToken);
            }, th -> {
                LOGGER.error("error executing command: " + request, th);
            });
        } catch (RuntimeException e) {
            LOGGER.error("error executing command: " + request, (Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CommandSuite getCommands() {
        return this.commands;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeSession(String str) {
        Session remove = this.clients.remove(str);
        if (remove != null) {
            this.sessionListener.sessionDeleted(remove);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Session getSession(String str) {
        return this.clients.get(str);
    }

    protected RedisToken executeCommand(RespCommand respCommand, Request request) {
        return respCommand.execute(request);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> Observable<T> executeOn(Observable<T> observable) {
        return observable.observeOn(this.scheduler);
    }

    private void processResponse(Request request, RedisToken redisToken) {
        request.getSession().publish(redisToken);
        if (request.isExit()) {
            request.getSession().close();
        }
    }

    private Observable<RedisToken> execute(RespCommand respCommand, Request request) {
        return Observable.create(observableEmitter -> {
            observableEmitter.onNext(executeCommand(respCommand, request));
            observableEmitter.onComplete();
        });
    }

    private void clear() {
        this.clients.clear();
        this.state.clear();
    }
}
