/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.amqp.implementation.handler.LinkHandler;
import com.azure.core.amqp.implementation.handler.ReceiverDeliveryHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public final class ReceiverUnsettledDeliveries
implements AutoCloseable {
    // Key under which a delivery tag is attached to structured log entries.
    private static final String DELIVERY_TAG_KEY = "lockToken";
    // Flipped to true by terminateAndAwaitForDispositionsInProgressToComplete() and close();
    // once set, onDelivery() and sendDisposition() refuse new work.
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    // hostname and entityPath are only used to build the AmqpErrorContext in getErrorContext().
    private final String hostname;
    private final String entityPath;
    // Only used as a log key/value when a retried disposition fails to dispatch.
    private final String receiveLinkName;
    // Used to schedule delivery.disposition(...) / settle() calls via invoke(Runnable).
    private final ReactorDispatcher dispatcher;
    // Decides whether (and after what delay) a Rejected outcome should be retried.
    private final AmqpRetryPolicy retryPolicy;
    // Per-try timeout; used both as the timer period and as each DispositionWork's expiration window.
    private final Duration timeout;
    // Sentinel tag that containsDelivery() always reports as "not contained".
    private final UUID deliveryEmptyTag;
    private final ClientLogger logger;
    // Periodic timer (period == timeout) that completes timed-out disposition works.
    // NOTE(review): the name is a typo of "timeoutTimer"; left unchanged here.
    private final Disposable timoutTimer;
    // When true, close() locally applies a Modified disposition to, and settles, every tracked delivery.
    private final boolean settleOnClose;
    // Tracked deliveries, keyed by the string form of the delivery tag.
    private final ConcurrentHashMap<String, Delivery> deliveries = new ConcurrentHashMap();
    // Disposition requests that were dispatched and are not yet completed, keyed by delivery tag string.
    private final ConcurrentHashMap<String, DispositionWork> pendingDispositions = new ConcurrentHashMap();

    public ReceiverUnsettledDeliveries(String hostname, String entityPath, String receiveLinkName, ReactorDispatcher dispatcher, AmqpRetryOptions retryOptions, UUID deliveryEmptyTag, ClientLogger logger) {
        this.hostname = hostname;
        this.entityPath = entityPath;
        this.receiveLinkName = receiveLinkName;
        this.dispatcher = dispatcher;
        this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
        this.timeout = retryOptions.getTryTimeout();
        this.deliveryEmptyTag = deliveryEmptyTag;
        this.logger = logger;
        this.timoutTimer = Flux.interval((Duration)this.timeout).subscribe(__ -> this.completeDispositionWorksOnTimeout("timer"));
        this.settleOnClose = false;
    }

    ReceiverUnsettledDeliveries(String hostname, String entityPath, String receiveLinkName, ReactorDispatcher dispatcher, AmqpRetryOptions retryOptions, ClientLogger logger) {
        this.hostname = hostname;
        this.entityPath = entityPath;
        this.receiveLinkName = receiveLinkName;
        this.dispatcher = dispatcher;
        this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
        this.timeout = retryOptions.getTryTimeout();
        this.deliveryEmptyTag = ReceiverDeliveryHandler.DELIVERY_EMPTY_TAG;
        this.logger = logger;
        this.timoutTimer = Flux.interval((Duration)this.timeout).subscribe(__ -> this.completeDispositionWorksOnTimeout("timer"));
        this.settleOnClose = true;
    }

    /**
     * Starts tracking a delivery under the string form of its tag.
     *
     * <p>If a delivery is already tracked under the same tag, the existing entry is kept and
     * {@code true} is still returned.</p>
     *
     * @param deliveryTag the tag identifying the delivery.
     * @param delivery the delivery to track.
     * @return {@code false} if this tracker has been terminated (nothing is stored), {@code true} otherwise.
     */
    public boolean onDelivery(UUID deliveryTag, Delivery delivery) {
        final boolean accepting = !isTerminated.get();
        if (accepting) {
            final String key = deliveryTag.toString();
            deliveries.putIfAbsent(key, delivery);
        }
        return accepting;
    }

    /**
     * Checks whether a delivery with the given tag is currently tracked.
     *
     * <p>The configured "empty" delivery tag is never reported as contained. There is deliberately
     * no termination check here: the lookup works the same before and after termination.</p>
     *
     * @param deliveryTag the tag to look up; must not be {@code null}.
     * @return {@code true} if the tag is not the empty tag and a delivery is tracked under it.
     */
    public boolean containsDelivery(UUID deliveryTag) {
        // Compare by value, not by reference: a UUID equal to the empty tag but produced as a
        // distinct instance (e.g. freshly decoded) must still be treated as the empty tag.
        return !deliveryTag.equals(this.deliveryEmptyTag) && this.deliveries.containsKey(deliveryTag.toString());
    }

    /**
     * Requests that the tracked delivery identified by {@code deliveryTag} be updated to
     * {@code desiredState}.
     *
     * @param deliveryTag the string form of the delivery tag.
     * @param desiredState the state to apply to the delivery.
     * @return a Mono that errors with a "link closed" {@code DeliveryNotOnLinkException} if this
     *     tracker has been terminated; otherwise the Mono produced by the internal disposition routine.
     */
    public Mono<Void> sendDisposition(String deliveryTag, DeliveryState desiredState) {
        return isTerminated.get()
            ? FluxUtil.monoError(logger, DeliveryNotOnLinkException.linkClosed(deliveryTag, desiredState))
            : sendDispositionImpl(deliveryTag, desiredState);
    }

    /**
     * Handles the remote peer's acknowledgment (remote state update) for a delivery.
     *
     * <p>Flow: resolve the remote outcome (directly, or unwrapped from a transactional state);
     * look up the pending disposition work for the tag; if the remote state type matches the
     * desired state type, complete the work successfully; otherwise route to the rejected or the
     * released/unknown handler.</p>
     *
     * @param deliveryTag the tag of the acknowledged delivery.
     * @param delivery the delivery whose remote state changed.
     */
    public void onDispositionAck(UUID deliveryTag, Delivery delivery) {
        final DeliveryState remoteState = delivery.getRemoteState();
        logger.atVerbose()
            .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
            .addKeyValue("deliveryState", remoteState)
            .log("onDispositionAck");

        // The outcome is either the state itself or nested inside a transactional state.
        final Outcome remoteOutcome;
        if (remoteState instanceof Outcome) {
            remoteOutcome = (Outcome) remoteState;
        } else if (remoteState instanceof TransactionalState) {
            remoteOutcome = ((TransactionalState) remoteState).getOutcome();
        } else {
            remoteOutcome = null;
        }
        if (remoteOutcome == null) {
            logger.atWarning()
                .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
                .addKeyValue("delivery", delivery)
                .log("No outcome associated with delivery.");
            return;
        }

        final DispositionWork work = pendingDispositions.get(deliveryTag.toString());
        if (work == null) {
            logger.atWarning()
                .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
                .addKeyValue("delivery", delivery)
                .log("No pending update for delivery.");
            return;
        }

        final DeliveryState.DeliveryStateType expectedType = work.getDesiredState().getType();
        final DeliveryState.DeliveryStateType receivedType = remoteState.getType();
        if (expectedType == receivedType) {
            // The broker applied exactly what was asked for.
            completeDispositionWorkWithSettle(work, delivery, null);
            return;
        }

        logger.atInfo()
            .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
            .addKeyValue("receivedDeliveryState", remoteState)
            .addKeyValue("deliveryState", work.getDesiredState())
            .log("Received delivery state doesn't match expected state.");
        if (receivedType == DeliveryState.DeliveryStateType.Rejected) {
            handleRetriableRejectedRemoteOutcome(work, delivery, (Rejected) remoteOutcome);
        } else {
            handleReleasedOrUnknownRemoteOutcome(work, delivery, remoteOutcome);
        }
    }

    /**
     * Marks this tracker as terminated, completes any disposition works that have already timed
     * out, and returns a Mono that waits for the remaining in-progress works.
     *
     * <p>For a pending work whose desired state is transactional, a new disposition with the
     * {@code Released} state is requested and awaited instead of the work's own Mono. Errors from
     * the awaited works are logged and swallowed. The periodic timeout timer is disposed when the
     * returned Mono terminates or is cancelled.</p>
     *
     * @return a Mono that completes once all awaited disposition works have finished.
     */
    public Mono<Void> terminateAndAwaitForDispositionsInProgressToComplete() {
        isTerminated.set(true);
        completeDispositionWorksOnTimeout("terminateAndAwaitForDispositionsInProgressToComplete");

        final ArrayList<Mono<Void>> awaited = new ArrayList<>();
        final StringJoiner awaitedTags = new StringJoiner(", ");
        for (DispositionWork pending : pendingDispositions.values()) {
            if (pending != null && !pending.hasTimedout()) {
                final Mono<Void> completion = pending.getDesiredState() instanceof TransactionalState
                    ? sendDispositionImpl(pending.getDeliveryTag(), Released.getInstance())
                    : pending.getMono();
                awaited.add(completion);
                awaitedTags.add(pending.getDeliveryTag());
            }
        }

        final Mono<Void> allCompleted;
        if (awaited.isEmpty()) {
            allCompleted = Mono.empty();
        } else {
            logger.info("Waiting for pending updates to complete. Locks: {}", awaitedTags.toString());
            allCompleted = Mono.whenDelayError(awaited).onErrorResume(failure -> {
                logger.info("There was exception(s) while disposing of all disposition work.", failure);
                return Mono.empty();
            });
        }
        return allCompleted.doFinally(signal -> timoutTimer.dispose());
    }

    /**
     * Terminates this tracker.
     *
     * <p>When local settlement on close is enabled and deliveries are tracked, each tracked
     * delivery gets a {@code Modified} disposition and is settled. That work is scheduled through
     * the dispatcher; if scheduling fails it is run directly on the calling thread. Afterwards the
     * timeout timer is disposed and every not-yet-completed disposition work is completed with an
     * error.</p>
     */
    @Override
    public void close() {
        isTerminated.set(true);

        final boolean shouldSettleLocally = settleOnClose && !deliveries.isEmpty();
        if (shouldSettleLocally) {
            final Runnable settleAll = () -> deliveries.values().forEach(tracked -> {
                tracked.disposition(new Modified());
                tracked.settle();
            });
            try {
                dispatcher.invoke(settleAll);
            } catch (IOException ioError) {
                logger.info("IO sink was closed when scheduling local settlement. Manually settling.", ioError);
                settleAll.run();
            } catch (RejectedExecutionException rejected) {
                logger.info("RejectedExecutionException when scheduling local settlement. Manually settling.", rejected);
                settleAll.run();
            }
        }

        timoutTimer.dispose();
        completeDispositionWorksOnClose();
    }

    /**
     * Builds the (cached) Mono that applies {@code desiredState} to the tracked delivery and
     * completes when the matching disposition work is completed.
     *
     * <p>On subscription the work is started and a task is dispatched that first applies the
     * disposition to the delivery and then registers the work as pending. If another work is
     * already pending for the same tag, the new work is completed with an error. A failure to
     * dispatch also completes the work with an error.</p>
     *
     * @param deliveryTag the string form of the delivery tag.
     * @param desiredState the state to apply.
     * @return an erroring Mono if no delivery is tracked under the tag; otherwise the work's Mono.
     */
    private Mono<Void> sendDispositionImpl(String deliveryTag, DeliveryState desiredState) {
        final Delivery delivery = deliveries.get(deliveryTag);
        if (delivery == null) {
            return FluxUtil.monoError(logger, DeliveryNotOnLinkException.noMatchingDelivery(deliveryTag, desiredState));
        }

        final DispositionWork work = new DispositionWork(deliveryTag, desiredState, timeout);

        // Runs on the dispatcher: apply the state, then register the work as awaiting the ack.
        final Runnable applyAndRegister = () -> {
            delivery.disposition(desiredState);
            final DispositionWork alreadyPending = pendingDispositions.putIfAbsent(deliveryTag, work);
            if (alreadyPending != null) {
                work.onComplete(new AmqpException(false, "A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.", null));
            }
        };

        final Mono<Void> dispositionMono = Mono.create(sink -> {
            work.onStart(sink);
            try {
                dispatcher.invoke(applyAndRegister);
            } catch (IOException | RejectedExecutionException dispatchError) {
                work.onComplete(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", dispatchError, getErrorContext(delivery)));
            }
        });
        work.setMono(dispositionMono);
        return work.getMono();
    }

    /**
     * Handles a {@code Rejected} remote outcome that did not match the desired state.
     *
     * <p>The rejection is converted to an exception and offered to the retry policy. If the policy
     * yields a delay (non-null), the work is marked for another try and the desired disposition is
     * dispatched again; a dispatch failure completes the work with an error. If the policy yields
     * {@code null}, the work is completed with the rejection error.</p>
     *
     * <p>NOTE(review): assumes {@code remoteOutcome.getError()} and its condition are non-null —
     * confirm against the broker's behavior.</p>
     *
     * @param work the pending disposition work.
     * @param delivery the delivery the outcome belongs to.
     * @param remoteOutcome the rejected outcome reported by the remote peer.
     */
    private void handleRetriableRejectedRemoteOutcome(DispositionWork work, Delivery delivery, Rejected remoteOutcome) {
        final AmqpErrorContext errorContext = getErrorContext(delivery);
        final ErrorCondition rejection = remoteOutcome.getError();
        final Exception error = ExceptionUtil.toException(rejection.getCondition().toString(), rejection.getDescription(), errorContext);

        final Duration retryDelay = retryPolicy.calculateRetryDelay(error, work.getTryCount());
        if (retryDelay == null) {
            logger.atInfo()
                .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
                .addKeyValue("deliveryState", delivery.getRemoteState())
                .log("Retry attempts exhausted.", new Object[] {error});
            completeDispositionWorkWithSettle(work, delivery, error);
            return;
        }

        work.onRetriableRejectedOutcome(error);
        try {
            dispatcher.invoke(() -> delivery.disposition(work.getDesiredState()));
        } catch (IOException | RejectedExecutionException dispatchError) {
            final RuntimeException dispatchFailure = logger.atError()
                .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
                .addKeyValue("linkName", receiveLinkName)
                .log(new AmqpException(false, "Retrying updateDisposition failed to dispatch to Reactor.", dispatchError, getErrorContext(delivery)));
            completeDispositionWorkWithSettle(work, delivery, dispatchFailure);
        }
    }

    /**
     * Completes the work with an error for a remote outcome that is neither the desired one nor
     * {@code Rejected}.
     *
     * <p>A {@code Released} remote state maps to an OPERATION_CANCELLED error; any other state maps
     * to an error whose message is the outcome's string form.</p>
     *
     * @param work the pending disposition work.
     * @param delivery the delivery the outcome belongs to.
     * @param remoteOutcome the outcome reported by the remote peer.
     */
    private void handleReleasedOrUnknownRemoteOutcome(DispositionWork work, Delivery delivery, Outcome remoteOutcome) {
        final AmqpErrorContext errorContext = getErrorContext(delivery);
        final boolean wasReleased = delivery.getRemoteState().getType() == DeliveryState.DeliveryStateType.Released;

        final AmqpException completionError;
        if (wasReleased) {
            completionError = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED, "AMQP layer unexpectedly aborted or disconnected.", errorContext);
        } else {
            completionError = new AmqpException(false, remoteOutcome.toString(), errorContext);
        }

        logger.atInfo()
            .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
            .addKeyValue("deliveryState", delivery.getRemoteState())
            .log("Completing pending updateState operation with exception.", new Object[] {completionError});
        completeDispositionWorkWithSettle(work, delivery, completionError);
    }

    /**
     * Completes, with an error, every pending disposition work whose expiration time has passed.
     *
     * <p>The error is the last retriable rejection recorded on the work if there is one, otherwise
     * a transient TIMEOUT_ERROR exception. Each completed work is removed from the pending map.</p>
     *
     * @param callSite a label identifying the caller, attached to the log entries.
     */
    private void completeDispositionWorksOnTimeout(String callSite) {
        if (pendingDispositions.isEmpty()) {
            return;
        }

        int completed = 0;
        final StringJoiner completedTags = new StringJoiner(", ");
        for (DispositionWork work : pendingDispositions.values()) {
            if (work == null || !work.hasTimedout()) {
                continue;
            }
            if (completed == 0) {
                // Logged once, right before the first timed-out work is completed.
                logger.atInfo().addKeyValue("callSite", callSite).log("Starting completion of timed out disposition works.");
            }

            Throwable completionError = work.getRejectedOutcomeError();
            if (completionError == null) {
                final Delivery delivery = deliveries.get(work.getDeliveryTag());
                completionError = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Update disposition request timed out.", getErrorContext(delivery));
            }
            completedTags.add(work.getDeliveryTag());
            completeDispositionWork(work, completionError);
            completed++;
        }

        if (completed > 0) {
            logger.atInfo()
                .addKeyValue("callSite", callSite)
                .addKeyValue("locks", completedTags.toString())
                .log("Completed {} timed-out disposition works.", new Object[] {completed});
        }
    }

    /**
     * Completes, with a shared non-transient error, every pending disposition work that has not
     * completed yet. Each such work is removed from the pending map.
     */
    private void completeDispositionWorksOnClose() {
        if (pendingDispositions.isEmpty()) {
            return;
        }

        final AmqpException completionError = new AmqpException(false, "The receiver didn't receive the disposition acknowledgment due to receive link closure.", null);
        final StringJoiner completedTags = new StringJoiner(", ");
        int completed = 0;
        for (DispositionWork work : pendingDispositions.values()) {
            if (work == null || work.isCompleted()) {
                continue;
            }
            if (completed == 0) {
                // Logged once, right before the first work is completed.
                logger.info("Starting completion of disposition works as part of receive link closure.");
            }
            completedTags.add(work.getDeliveryTag());
            completeDispositionWork(work, completionError);
            completed++;
        }

        if (completed > 0) {
            logger.info("Completed {} disposition works as part of receive link closure. Locks {}", completed, completedTags.toString());
        }
    }

    /**
     * Completes a disposition work and, when the remote peer has settled the delivery, settles it
     * locally and stops tracking it.
     *
     * <p>The order is deliberate and must be kept: settle the delivery, then complete the work,
     * then remove the work and the delivery from the maps.</p>
     *
     * @param work the work to complete.
     * @param delivery the delivery the work refers to.
     * @param completionError the error to complete with, or {@code null} for successful completion.
     */
    private void completeDispositionWorkWithSettle(DispositionWork work, Delivery delivery, Throwable completionError) {
        final boolean settledByRemote = delivery.remotelySettled();

        // Step 1: local settlement (only if the remote side already settled).
        if (settledByRemote) {
            delivery.settle();
        }

        // Step 2: signal the work's subscriber.
        if (completionError == null) {
            work.onComplete();
        } else if (completionError instanceof RuntimeException) {
            work.onComplete(logger.logExceptionAsError((RuntimeException) completionError));
        } else {
            work.onComplete(completionError);
        }

        // Step 3: forget the work and the delivery.
        if (settledByRemote) {
            final String tag = work.getDeliveryTag();
            pendingDispositions.remove(tag);
            deliveries.remove(tag);
        }
    }

    /**
     * Removes the work from the pending map and completes it with the given error. Runtime
     * exceptions are routed through the logger (as errors) before being handed to the work.
     *
     * @param work the work to complete.
     * @param completionError the error to complete the work with.
     */
    private void completeDispositionWork(DispositionWork work, Throwable completionError) {
        pendingDispositions.remove(work.getDeliveryTag());
        if (completionError instanceof RuntimeException) {
            work.onComplete(logger.logExceptionAsError((RuntimeException) completionError));
        } else {
            work.onComplete(completionError);
        }
    }

    /**
     * Builds the error context for a delivery from this tracker's host name and entity path and
     * the delivery's link.
     *
     * @param delivery the delivery; may be {@code null}.
     * @return the error context, or {@code null} when the delivery or its link is {@code null}.
     */
    private AmqpErrorContext getErrorContext(Delivery delivery) {
        final boolean hasLink = delivery != null && delivery.getLink() != null;
        return hasLink
            ? LinkHandler.getErrorContext(hostname, entityPath, delivery.getLink())
            : null;
    }

    /**
     * State of a single disposition request: the tag and desired state, the try counter, the
     * expiration time of the current try, and the sink used to signal the requester.
     *
     * <p>The inherited {@link AtomicBoolean} value is the "completed" flag; it is set to
     * {@code true} by both {@code onComplete} overloads.</p>
     */
    private static final class DispositionWork extends AtomicBoolean {
        private final AtomicInteger tryCount = new AtomicInteger(1);
        private final String deliveryTag;
        private final DeliveryState desiredState;
        private final Duration timeout;
        private Mono<Void> mono;
        private MonoSink<Void> monoSink;
        private Instant expirationTime;
        private Throwable rejectedOutcomeError;

        /**
         * @param deliveryTag the string form of the delivery tag.
         * @param desiredState the state requested for the delivery.
         * @param timeout how long each try may take before the work counts as timed out.
         */
        DispositionWork(String deliveryTag, DeliveryState desiredState, Duration timeout) {
            this.deliveryTag = deliveryTag;
            this.desiredState = desiredState;
            this.timeout = timeout;
            // The sink stays null until onStart(...) is called.
        }

        /** @return the delivery tag this work refers to. */
        String getDeliveryTag() {
            return deliveryTag;
        }

        /** @return the state requested for the delivery. */
        DeliveryState getDesiredState() {
            return desiredState;
        }

        /** @return the current try number; starts at 1. */
        int getTryCount() {
            return tryCount.get();
        }

        /** @return the error recorded by the last retriable rejection, or {@code null} if none. */
        Throwable getRejectedOutcomeError() {
            return rejectedOutcomeError;
        }

        /** @return {@code true} if the work has been started and its expiration time is in the past. */
        boolean hasTimedout() {
            final Instant deadline = expirationTime;
            return deadline != null && Instant.now().isAfter(deadline);
        }

        /** @return the cached Mono stored by {@link #setMono(Mono)}. */
        Mono<Void> getMono() {
            return mono;
        }

        /** Stores a cached view of the given Mono. */
        void setMono(Mono<Void> mono) {
            this.mono = mono.cache();
        }

        /** @return {@code true} once either {@code onComplete} overload has been called. */
        boolean isCompleted() {
            return get();
        }

        /** Records the sink to signal on completion and starts the timeout window for the first try. */
        void onStart(MonoSink<Void> monoSink) {
            this.monoSink = monoSink;
            expirationTime = Instant.now().plus(timeout);
        }

        /** Records the rejection error, restarts the timeout window and increments the try counter. */
        void onRetriableRejectedOutcome(Throwable error) {
            rejectedOutcomeError = error;
            expirationTime = Instant.now().plus(timeout);
            tryCount.incrementAndGet();
        }

        /** Marks the work completed and signals success; throws if the work was never started. */
        void onComplete() {
            set(true);
            Objects.requireNonNull(monoSink).success();
        }

        /** Marks the work completed and signals the given error; throws if the work was never started. */
        void onComplete(Throwable error) {
            set(true);
            Objects.requireNonNull(monoSink).error(error);
        }
    }
}

