package io.vertx.amqp.impl;

import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpMessage;
import io.vertx.amqp.AmqpSender;
import io.vertx.core.AsyncResult;
import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.proton.ProtonSender;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;

/* loaded from: input_file:io/vertx/amqp/impl/AmqpSenderImpl.class */
/**
 * {@link AmqpSender} implementation that wraps a {@link ProtonSender} link owned by an
 * {@link AmqpConnectionImpl}.
 *
 * <p>The mutable fields ({@code closed}, {@code exceptionHandler}, {@code drainHandler} and
 * {@code remoteCredit}) are read and written while holding this instance's monitor. User supplied
 * handlers and completables are invoked outside the monitor.
 */
public class AmqpSenderImpl implements AmqpSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSender.class);

    private final ProtonSender sender;
    private final AmqpConnectionImpl connection;

    /** Set once the link has been closed, either locally or by the remote peer. */
    private boolean closed;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> drainHandler;

    /** Cached copy of the link credit; refreshed from the Proton sender after sends and drain events. */
    private long remoteCredit = 0;

    /**
     * Wires the close, detach, drain and open handlers on the Proton sender and then opens it.
     *
     * @param protonSender the underlying link
     * @param amqpConnectionImpl the owning connection; this sender registers itself with it once
     *     the link has opened successfully
     * @param completable completed with this sender when the link opens, or with the failure cause
     */
    private AmqpSenderImpl(ProtonSender protonSender, AmqpConnectionImpl amqpConnectionImpl, Completable<AmqpSender> completable) {
        this.sender = protonSender;
        this.connection = amqpConnectionImpl;

        protonSender
            .closeHandler(result -> onClose(protonSender, result, false))
            .detachHandler(result -> onClose(protonSender, result, true));

        protonSender.sendQueueDrainHandler(ignored -> {
            Handler<Void> handler;
            synchronized (this) {
                this.remoteCredit = protonSender.getRemoteCredit();
                handler = this.drainHandler;
            }
            // Invoke the user handler outside the monitor.
            if (handler != null) {
                handler.handle(null);
            }
        });

        protonSender.openHandler(result -> {
            if (result.failed()) {
                completable.complete(null, result.cause());
            } else {
                amqpConnectionImpl.register(this);
                completable.succeed(this);
            }
        });

        protonSender.open();
    }

    /**
     * Creates a sender on top of the given link and opens it.
     *
     * @param protonSender the underlying link
     * @param amqpConnectionImpl the owning connection
     * @param completable notified with the sender once the link is open, or with the failure
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static void create(ProtonSender protonSender, AmqpConnectionImpl amqpConnectionImpl, Completable<AmqpSender> completable) {
        new AmqpSenderImpl(protonSender, amqpConnectionImpl, completable);
    }

    /**
     * Handles a close or detach reported through the Proton close/detach handlers.
     *
     * <p>Only the first close has any effect: the sender is marked closed, the exception handler
     * (if one is set) is notified, and the close or detach is echoed on the link.
     *
     * @param protonSender the link to close or detach in response
     * @param asyncResult the event outcome; a failure is attached as the cause of the reported exception
     * @param detach {@code true} to answer with {@code detach()}, {@code false} to answer with {@code close()}
     */
    private void onClose(ProtonSender protonSender, AsyncResult<ProtonSender> asyncResult, boolean detach) {
        // NOTE(review): unlike close(Completable), this path does not call
        // connection.unregister(this) - confirm whether that is intended.
        Handler<Throwable> handler = null;
        boolean firstClose;
        synchronized (this) {
            firstClose = !this.closed;
            if (firstClose) {
                handler = this.exceptionHandler;
                this.closed = true;
            }
        }

        if (handler != null) {
            if (asyncResult.succeeded()) {
                handler.handle(new Exception("Sender closed remotely"));
            } else {
                handler.handle(new Exception("Sender closed remotely with error", asyncResult.cause()));
            }
        }

        if (firstClose) {
            if (detach) {
                protonSender.detach();
            } else {
                protonSender.close();
            }
        }
    }

    /**
     * @return {@code true} when the cached link credit is zero or negative
     */
    public synchronized boolean writeQueueFull() {
        return this.remoteCredit <= 0;
    }

    @Override // io.vertx.amqp.AmqpSender
    public AmqpConnection connection() {
        return this.connection;
    }

    /**
     * Sends a message without reporting the outcome to the caller; a non-successful outcome is
     * only logged.
     */
    @Override // io.vertx.amqp.AmqpSender
    public AmqpSender send(AmqpMessage amqpMessage) {
        return doSend(amqpMessage, null);
    }

    /**
     * Sends a message through the connection's trampoline and reports the delivery outcome.
     *
     * @param amqpMessage the message; when it carries no address, a copy addressed to
     *     {@link #address()} is sent instead
     * @param completable receives the delivery outcome; when {@code null}, failures are logged
     * @return this sender
     */
    private AmqpSender doSend(AmqpMessage amqpMessage, Completable<Void> completable) {
        Completable<Void> ack = completable != null ? completable : (ignored, failure) -> {
            if (failure != null) {
                LOGGER.warn("Message rejected by remote peer", failure);
            }
        };

        // Account for the message right away; the real credit is re-read after the send.
        synchronized (this) {
            this.remoteCredit--;
        }

        this.connection.runWithTrampoline(ignored -> {
            AmqpMessage outgoing = amqpMessage.address() == null
                ? AmqpMessage.create(amqpMessage).address(address()).build()
                : amqpMessage;
            this.sender.send(outgoing.unwrap(), delivery -> reportOutcome(delivery.getRemoteState(), ack));
            synchronized (this) {
                this.remoteCredit = this.sender.getRemoteCredit();
            }
        });
        return this;
    }

    /**
     * Translates the remote delivery state of a sent message into a success or failure.
     *
     * @param remoteState the state reported for the delivery, possibly {@code null}
     * @param ack the completable to succeed (Accepted) or fail (anything else)
     */
    private static void reportOutcome(DeliveryState remoteState, Completable<Void> ack) {
        if (remoteState == null) {
            ack.fail("Unknown message state");
            return;
        }
        switch (remoteState.getType()) {
            case Rejected:
                // Only a Rejected state exposes the error, so the cast is confined to this branch.
                ack.fail("message rejected (REJECTED): " + ((Rejected) remoteState).getError());
                break;
            case Modified:
                ack.fail("message rejected (MODIFIED)");
                break;
            case Released:
                ack.fail("message rejected (RELEASED)");
                break;
            case Accepted:
                ack.succeed();
                break;
            default:
                ack.fail("Unsupported delivery type: " + remoteState.getType());
                break;
        }
    }

    @Override // io.vertx.amqp.AmqpSender
    public synchronized AmqpSender exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Sends the message and returns a future completed with the delivery outcome.
     */
    public Future<Void> write(AmqpMessage amqpMessage) {
        Promise<Void> promise = Promise.promise();
        doSend(amqpMessage, promise);
        return promise.future();
    }

    /**
     * No-op: the argument is ignored and this sender is returned unchanged.
     */
    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: setWriteQueueMaxSize */
    public AmqpSender mo129setWriteQueueMaxSize(int i) {
        return this;
    }

    /**
     * Ends the stream by closing the sender.
     *
     * @return a future completed once {@link #close(Completable)} has completed
     */
    public Future<Void> end() {
        Promise<Void> promise = Promise.promise();
        close(promise);
        return promise.future();
    }

    /**
     * Sets the handler invoked from the Proton send-queue drain callback.
     */
    public synchronized AmqpSender drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    /**
     * Sends the message and completes {@code promise} with the delivery outcome.
     */
    public AmqpSender sendWithAck(AmqpMessage amqpMessage, Promise<Void> promise) {
        return doSend(amqpMessage, promise);
    }

    @Override // io.vertx.amqp.AmqpSender
    public Future<Void> sendWithAck(AmqpMessage amqpMessage) {
        Promise<Void> promise = Promise.promise();
        sendWithAck(amqpMessage, promise);
        return promise.future();
    }

    /**
     * Closes the sender.
     *
     * <p>When the sender is already marked closed the completable succeeds immediately. Otherwise
     * the sender is marked closed, unregistered from its connection, and the link is closed through
     * the connection's trampoline.
     *
     * @param completable notified of the outcome; may be {@code null}
     */
    public void close(Completable<Void> completable) {
        Completable<Void> done = completable != null ? completable : (ignored, failure) -> {
        };

        // Hold the monitor only for the flag check-and-set so that neither the callback nor the
        // connection calls below run while the lock is held.
        boolean alreadyClosed;
        synchronized (this) {
            alreadyClosed = this.closed;
            this.closed = true;
        }
        if (alreadyClosed) {
            done.succeed();
            return;
        }

        this.connection.unregister(this);
        this.connection.runWithTrampoline(ignored -> {
            if (!this.sender.isOpen()) {
                done.succeed();
                return;
            }
            try {
                // A null cause completes the callback successfully, so complete(...) is used
                // rather than fail(...).
                this.sender
                    .closeHandler(result -> done.complete(null, result.cause()))
                    .close();
            } catch (Exception e) {
                done.fail(e);
            }
        });
    }

    @Override // io.vertx.amqp.AmqpSender
    public Future<Void> close() {
        Promise<Void> promise = Promise.promise();
        close(promise);
        return promise.future();
    }

    /**
     * @return the remote address reported by the underlying Proton sender
     */
    @Override // io.vertx.amqp.AmqpSender
    public String address() {
        return this.sender.getRemoteAddress();
    }

    /**
     * @return the credit currently reported by the underlying Proton sender (not the cached value)
     */
    @Override // io.vertx.amqp.AmqpSender
    public long remainingCredits() {
        return this.sender.getRemoteCredit();
    }

    @Override // io.vertx.amqp.AmqpSender
    public ProtonSender unwrap() {
        return this.sender;
    }

    // The three methods below are compiler bridge methods as emitted by the decompiler; their
    // names and signatures are left untouched.

    /* renamed from: drainHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ WriteStream m135drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ WriteStream mo130exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo131exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
