/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConsumerFactory;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ProtonSession;
import com.azure.core.amqp.implementation.ProtonSessionWrapper;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSender;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TransactionCoordinator;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ReactorSession
implements AmqpSession {
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private static final String ACTIVE_WAIT_TIMED_OUT = "connectionId[%s] sessionName[%s] Timeout waiting for session to be active.";
    private static final String COMPLETED_WITHOUT_ACTIVE = "connectionId[%s] sessionName[%s] Session completed without being active.";
    private final ConcurrentMap<String, LinkSubscription<AmqpSendLink>> openSendLinks = new ConcurrentHashMap<String, LinkSubscription<AmqpSendLink>>();
    private final ConcurrentMap<String, LinkSubscription<AmqpReceiveLink>> openReceiveLinks = new ConcurrentHashMap<String, LinkSubscription<AmqpReceiveLink>>();
    private final Scheduler timeoutScheduler = Schedulers.parallel();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Object closeLock = new Object();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final ClientLogger logger;
    private final Flux<AmqpEndpointState> endpointStates;
    private final AmqpConnection amqpConnection;
    private final ProtonSessionWrapper protonSession;
    private final Mono<Void> activeAwaiter;
    private final String id;
    private final String sessionName;
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final String activeTimeoutMessage;
    private final AmqpRetryOptions retryOptions;
    private final ReactorHandlerProvider handlerProvider;
    private final AmqpLinkProvider linkProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final Disposable.Composite subscriptions = Disposables.composite();
    private final AtomicReference<TransactionCoordinator> transactionCoordinator = new AtomicReference();
    private final Flux<AmqpShutdownSignal> shutdownSignals;

    public ReactorSession(AmqpConnection amqpConnection, ProtonSessionWrapper protonSession, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) {
        boolean isV1OrV2WithoutSessionCache;
        this.amqpConnection = amqpConnection;
        this.protonSession = protonSession;
        this.handlerProvider = handlerProvider;
        this.id = protonSession.getId();
        this.sessionName = protonSession.getName();
        this.provider = protonSession.getReactorProvider();
        this.linkProvider = linkProvider;
        this.cbsNodeSupplier = cbsNodeSupplier;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.retryOptions = retryOptions;
        this.activeTimeoutMessage = String.format("ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.", protonSession.getConnectionId(), this.sessionName);
        HashMap<String, String> loggingContext = new HashMap<String, String>(3);
        loggingContext.put("connectionId", protonSession.getConnectionId());
        loggingContext.put("sessionName", this.sessionName);
        loggingContext.put("sessionId", this.id);
        this.logger = new ClientLogger(ReactorSession.class, loggingContext);
        this.endpointStates = protonSession.getEndpointStates().map(state -> {
            this.logger.atVerbose().addKeyValue("sessionName", this.sessionName).addKeyValue("state", state).log("Got endpoint state.");
            return AmqpEndpointStateUtil.getConnectionState(state);
        }).doOnError(error -> this.handleError((Throwable)error)).doOnComplete(() -> this.handleClose()).cache(1);
        this.shutdownSignals = amqpConnection.getShutdownSignals();
        this.subscriptions.add(this.endpointStates.subscribe(null, e -> this.logger.warning("Session endpoint state signaled error.", new Object[]{e})));
        this.subscriptions.add(this.shutdownSignals.flatMap(signal -> this.closeAsync("Shutdown signal received (" + signal.toString() + ")", null, false)).subscribe());
        boolean bl = isV1OrV2WithoutSessionCache = !protonSession.isV2ClientOnSessionCache();
        if (isV1OrV2WithoutSessionCache) {
            protonSession.openUnsafe(this.logger);
        }
        this.activeAwaiter = ReactorSession.activeAwaiter(protonSession, retryOptions.getTryTimeout(), this.endpointStates);
    }

    final String getId() {
        return this.id;
    }

    final Mono<ReactorSession> open() {
        return Mono.when((Publisher[])new Publisher[]{this.protonSession.open(), this.activeAwaiter}).thenReturn((Object)this);
    }

    final Mono<ProtonSessionWrapper.ProtonChannelWrapper> channel(String name) {
        return this.protonSession.channel(name, this.retryOptions.getTryTimeout());
    }

    final ProtonSessionWrapper session() {
        return this.protonSession;
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    public void dispose() {
        this.closeAsync().block(this.retryOptions.getTryTimeout());
    }

    @Override
    public String getSessionName() {
        return this.sessionName;
    }

    @Override
    public Duration getOperationTimeout() {
        return this.retryOptions.getTryTimeout();
    }

    @Override
    public Mono<AmqpTransaction> createTransaction() {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.declare());
    }

    @Override
    public Mono<Void> commitTransaction(AmqpTransaction transaction) {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.discharge(transaction, true));
    }

    @Override
    public Mono<Void> rollbackTransaction(AmqpTransaction transaction) {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.discharge(transaction, false));
    }

    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return this.createProducer(linkName, entityPath, timeout, retry, null).or(this.onClosedError("Connection closed while waiting for new producer link.", entityPath, linkName));
    }

    @Override
    public Mono<AmqpLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return this.createConsumer(linkName, entityPath, timeout, retry, null, null, null, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND, new ConsumerFactory()).or(this.onClosedError("Connection closed while waiting for new receive link.", entityPath, linkName)).cast(AmqpLink.class);
    }

    @Override
    public boolean removeLink(String linkName) {
        return this.removeLink(this.openSendLinks, linkName) || this.removeLink(this.openReceiveLinks, linkName);
    }

    Mono<Void> isClosed() {
        return this.isClosedMono.asMono();
    }

    @Override
    public Mono<Void> closeAsync() {
        return this.closeAsync(null, null, true);
    }

    Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean disposeLinks) {
        if (this.isDisposed.getAndSet(true)) {
            return this.isClosedMono.asMono();
        }
        AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), errorCondition).log("Setting error condition and disposing session. {}", new Object[]{message});
        return Mono.fromRunnable(() -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> this.disposeWork(errorCondition, disposeLinks));
            }
            catch (IOException e) {
                this.logger.atInfo().log("Error when scheduling work. Manually disposing.", new Object[]{e});
                this.disposeWork(errorCondition, disposeLinks);
            }
            catch (RejectedExecutionException e) {
                this.logger.atInfo().log("RejectedExecutionException when scheduling work.");
                this.disposeWork(errorCondition, disposeLinks);
            }
        }).then(this.isClosedMono.asMono());
    }

    @Override
    public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
        if (this.isDisposed()) {
            return FluxUtil.monoError((LoggingEventBuilder)this.logger.atWarning(), (RuntimeException)((Object)new AmqpException(true, String.format("Cannot create coordinator send link %s from a closed session.", TRANSACTION_LINK_NAME), this.getErrorContext())));
        }
        TransactionCoordinator existing = this.transactionCoordinator.get();
        if (existing != null) {
            this.logger.atVerbose().addKeyValue(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME).log("Returning existing transaction coordinator.");
            return Mono.just((Object)existing);
        }
        return this.createProducer(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME, (org.apache.qpid.proton.amqp.transport.Target)new Coordinator(), this.retryOptions, null, false).map(link -> {
            TransactionCoordinator newCoordinator = new TransactionCoordinator((AmqpSendLink)link, this.messageSerializer);
            if (this.transactionCoordinator.compareAndSet(null, newCoordinator)) {
                return newCoordinator;
            }
            return this.transactionCoordinator.get();
        }).or(this.onClosedError("Connection closed while waiting for transaction coordinator creation.", "n/a", "n/a"));
    }

    protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, ConsumerFactory consumerFactory) {
        if (this.isDisposed()) {
            LoggingEventBuilder logBuilder = this.logger.atWarning().addKeyValue("entityPath", entityPath).addKeyValue("linkName", linkName);
            return FluxUtil.monoError((LoggingEventBuilder)logBuilder, (RuntimeException)((Object)new AmqpException(true, "Cannot create receive link from a closed session.", this.getErrorContext())));
        }
        LinkSubscription existingLink = (LinkSubscription)this.openReceiveLinks.get(linkName);
        if (existingLink != null) {
            ProtonSession.ProtonSessionClosedException error = existingLink.getError();
            if (error != null) {
                return Mono.error((Throwable)((Object)error));
            }
            this.logger.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("Returning existing receive link.");
            return Mono.just((Object)((AmqpReceiveLink)existingLink.getLink()));
        }
        TokenManager tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
        return Mono.when((Publisher[])new Publisher[]{this.onActiveEndpoint(), tokenManager.authorize()}).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    LinkSubscription computed = this.openReceiveLinks.compute(linkName, (linkNameKey, existing) -> {
                        if (existing != null) {
                            this.logger.atInfo().addKeyValue("linkName", linkName).log("Another receive link exists. Disposing of new one.");
                            tokenManager.close();
                            return existing;
                        }
                        this.logger.atInfo().addKeyValue("linkName", linkName).log("Creating a new receiver link.");
                        return this.getSubscription((String)linkNameKey, entityPath, sourceFilters, receiverProperties, receiverDesiredCapabilities, senderSettleMode, receiverSettleMode, tokenManager, consumerFactory);
                    });
                    ProtonSession.ProtonSessionClosedException error = computed.getError();
                    if (error != null) {
                        sink.error((Throwable)((Object)error));
                    } else {
                        sink.success((Object)((AmqpReceiveLink)computed.getLink()));
                    }
                });
            }
            catch (IOException | RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        })).onErrorResume(t -> Mono.error(() -> {
            tokenManager.close();
            return t;
        }));
    }

    protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
        AmqpRetryOptions options;
        Target target = new Target();
        target.setAddress(entityPath);
        AmqpRetryOptions amqpRetryOptions = options = retry != null ? new AmqpRetryOptions(retry.getRetryOptions()) : new AmqpRetryOptions();
        if (timeout != null) {
            options.setTryTimeout(timeout);
        }
        return this.createProducer(linkName, entityPath, (org.apache.qpid.proton.amqp.transport.Target)target, options, linkProperties, true).cast(AmqpLink.class);
    }

    private Mono<AmqpSendLink> createProducer(String linkName, String entityPath, org.apache.qpid.proton.amqp.transport.Target target, AmqpRetryOptions options, Map<Symbol, Object> linkProperties, boolean requiresAuthorization) {
        Mono<Long> authorize;
        TokenManager tokenManager;
        if (this.isDisposed()) {
            LoggingEventBuilder logBuilder = this.logger.atWarning().addKeyValue("entityPath", entityPath).addKeyValue("linkName", linkName);
            return FluxUtil.monoError((LoggingEventBuilder)logBuilder, (RuntimeException)((Object)new AmqpException(true, "Cannot create send link from a closed session.", this.getErrorContext())));
        }
        LinkSubscription existing = (LinkSubscription)this.openSendLinks.get(linkName);
        if (existing != null) {
            ProtonSession.ProtonSessionClosedException error = existing.getError();
            if (error != null) {
                return Mono.error((Throwable)((Object)error));
            }
            this.logger.atVerbose().addKeyValue("linkName", linkName).log("Returning existing send link.");
            return Mono.just((Object)((AmqpSendLink)existing.getLink()));
        }
        if (requiresAuthorization) {
            tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
            authorize = tokenManager.authorize();
        } else {
            tokenManager = null;
            authorize = Mono.empty();
        }
        return Mono.when((Publisher[])new Publisher[]{this.onActiveEndpoint(), authorize}).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    LinkSubscription computed = this.openSendLinks.compute(linkName, (linkNameKey, existingLink) -> {
                        if (existingLink != null) {
                            this.logger.atInfo().addKeyValue("linkName", linkName).log("Another send link exists. Disposing of new one.");
                            if (tokenManager != null) {
                                tokenManager.close();
                            }
                            return existingLink;
                        }
                        this.logger.atInfo().addKeyValue("linkName", linkName).log("Creating a new send link.");
                        return this.getSubscription(linkName, entityPath, target, linkProperties, options, tokenManager);
                    });
                    ProtonSession.ProtonSessionClosedException error = computed.getError();
                    if (error != null) {
                        sink.error((Throwable)((Object)error));
                    } else {
                        sink.success((Object)((AmqpSendLink)computed.getLink()));
                    }
                });
            }
            catch (IOException | RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        }));
    }

    private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String entityPath, org.apache.qpid.proton.amqp.transport.Target target, Map<Symbol, Object> linkProperties, AmqpRetryOptions options, TokenManager tokenManager) {
        Sender sender;
        try {
            sender = this.protonSession.senderUnsafe(linkName);
        }
        catch (ProtonSession.ProtonSessionClosedException e) {
            return new LinkSubscription<AmqpSendLink>(e);
        }
        sender.setTarget(target);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        Source source = new Source();
        if (linkProperties != null && linkProperties.size() > 0) {
            String clientIdentifier = (String)linkProperties.get(AmqpConstants.CLIENT_IDENTIFIER);
            if (!CoreUtils.isNullOrEmpty((CharSequence)clientIdentifier)) {
                source.setAddress(clientIdentifier);
                linkProperties.remove(AmqpConstants.CLIENT_IDENTIFIER);
            }
            sender.setProperties(linkProperties);
        }
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        SendLinkHandler sendLinkHandler = this.handlerProvider.createSendLinkHandler(this.protonSession.getConnectionId(), this.protonSession.getHostname(), linkName, entityPath);
        BaseHandler.setHandler((Extendable)sender, (Handler)sendLinkHandler);
        sender.open();
        AmqpSendLink reactorSender = this.linkProvider.createSendLink(this.amqpConnection, entityPath, sender, sendLinkHandler, this.provider, tokenManager, this.messageSerializer, options, this.timeoutScheduler, this.handlerProvider.getMetricProvider(this.amqpConnection.getFullyQualifiedNamespace(), entityPath));
        Disposable subscription = reactorSender.getEndpointStates().subscribe(state -> {}, error -> {
            if (!this.isDisposed.get()) {
                this.removeLink(this.openSendLinks, linkName);
            }
        }, () -> {
            if (!this.isDisposed.get()) {
                this.logger.atInfo().addKeyValue("linkName", linkName).log("Complete. Removing and disposing send link.");
                this.removeLink(this.openSendLinks, linkName);
            }
        });
        return new LinkSubscription<AmqpSendLink>(reactorSender, subscription, String.format("connectionId[%s] session[%s]: Setting error on receive link.", this.protonSession.getConnectionId(), this.sessionName), null);
    }

    private LinkSubscription<AmqpReceiveLink> getSubscription(String linkName, String entityPath, Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, TokenManager tokenManager, ConsumerFactory consumerFactory) {
        Receiver receiver;
        try {
            receiver = this.protonSession.receiverUnsafe(linkName);
        }
        catch (ProtonSession.ProtonSessionClosedException e) {
            return new LinkSubscription<AmqpReceiveLink>(e);
        }
        Source source = new Source();
        source.setAddress(entityPath);
        if (sourceFilters != null && sourceFilters.size() > 0) {
            source.setFilter(sourceFilters);
        }
        receiver.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        receiver.setSenderSettleMode(senderSettleMode);
        receiver.setReceiverSettleMode(receiverSettleMode);
        Target target = new Target();
        if (receiverProperties != null && !receiverProperties.isEmpty()) {
            receiver.setProperties(receiverProperties);
            String clientIdentifier = (String)receiverProperties.get(AmqpConstants.CLIENT_RECEIVER_IDENTIFIER);
            if (!CoreUtils.isNullOrEmpty((CharSequence)clientIdentifier)) {
                target.setAddress(clientIdentifier);
            }
        }
        receiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        if (receiverDesiredCapabilities != null && receiverDesiredCapabilities.length > 0) {
            receiver.setDesiredCapabilities(receiverDesiredCapabilities);
        }
        AmqpReceiveLink reactorReceiver = consumerFactory.createConsumer(this.amqpConnection, linkName, entityPath, receiver, tokenManager, this.provider, this.handlerProvider, this.linkProvider, this.retryOptions);
        Disposable subscription = reactorReceiver.getEndpointStates().subscribe(state -> {}, error -> {
            if (!this.isDisposed.get()) {
                this.removeLink(this.openReceiveLinks, linkName);
            }
        }, () -> {
            if (!this.isDisposed.get()) {
                this.logger.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", entityPath).log("Complete. Removing receive link.");
                this.removeLink(this.openReceiveLinks, linkName);
            }
        });
        return new LinkSubscription<AmqpReceiveLink>(reactorReceiver, subscription, String.format("connectionId[%s] sessionName[%s]: Setting error on receive link.", this.protonSession.getConnectionId(), this.sessionName), null);
    }

    private <T> Mono<T> onClosedError(String message, String linkName, String entityPath) {
        return Mono.firstWithSignal((Mono[])new Mono[]{this.isClosedMono.asMono(), this.shutdownSignals.next()}).then(Mono.error((Throwable)((Object)new AmqpException(false, String.format("connectionId[%s] entityPath[%s] linkName[%s] Connection closed. %s", this.protonSession.getConnectionId(), entityPath, linkName, message), this.getErrorContext()))));
    }

    private Mono<Void> onActiveEndpoint() {
        return RetryUtil.withRetry(this.getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), this.retryOptions, this.activeTimeoutMessage).then();
    }

    private void handleClose() {
        this.logger.atVerbose().log("Disposing of active links due to session close.");
        this.closeAsync().subscribe();
    }

    private void handleError(Throwable error) {
        ErrorCondition condition;
        this.logger.atVerbose().log("Disposing of active links due to session error.");
        if (error instanceof AmqpException) {
            AmqpException exception = (AmqpException)((Object)error);
            String errorCondition = exception.getErrorCondition() != null ? exception.getErrorCondition().getErrorCondition() : "UNKNOWN";
            condition = new ErrorCondition(Symbol.getSymbol((String)errorCondition), exception.getMessage());
        } else {
            condition = null;
        }
        this.closeAsync(error.getMessage(), condition, true).subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void disposeWork(ErrorCondition errorCondition, boolean disposeLinks) {
        this.protonSession.beginClose(errorCondition);
        ArrayList closingLinks = new ArrayList();
        if (disposeLinks) {
            Object object = this.closeLock;
            synchronized (object) {
                this.openReceiveLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }
                    closingLinks.add(link.closeAsync(errorCondition));
                });
                this.openSendLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }
                    closingLinks.add(link.closeAsync(errorCondition));
                });
            }
        }
        Mono closeLinksMono = Mono.when(closingLinks).timeout(this.retryOptions.getTryTimeout()).onErrorResume(error -> {
            this.logger.atWarning().log("Timed out waiting for all links to close.", new Object[]{error});
            return Mono.empty();
        }).then(Mono.fromRunnable(() -> {
            this.isClosedMono.emitEmpty((signalType, result) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, result).log("Unable to emit shutdown signal.");
                return false;
            });
            this.protonSession.endClose();
            this.subscriptions.dispose();
            this.logger.atInfo().log("session disposal is completed");
        }));
        this.subscriptions.add(closeLinksMono.subscribe());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T extends AmqpLink> boolean removeLink(ConcurrentMap<String, LinkSubscription<T>> openLinks, String key) {
        if (key == null) {
            return false;
        }
        Object object = this.closeLock;
        synchronized (object) {
            LinkSubscription removed = (LinkSubscription)openLinks.remove(key);
            if (removed != null) {
                removed.closeAsync(null).subscribe();
            }
            return removed != null;
        }
    }

    private AmqpErrorContext getErrorContext() {
        return this.protonSession.getErrorContext();
    }

    private static Mono<Void> activeAwaiter(ProtonSessionWrapper protonSession, Duration tryTimeout, Flux<AmqpEndpointState> endpointStates) {
        String connectionId = protonSession.getConnectionId();
        String sessionName = protonSession.getName();
        return endpointStates.filter(state -> state == AmqpEndpointState.ACTIVE).next().switchIfEmpty(Mono.defer(() -> {
            String message = String.format(COMPLETED_WITHOUT_ACTIVE, connectionId, sessionName);
            return Mono.error((Throwable)((Object)new AmqpException(true, message, protonSession.getErrorContext())));
        })).timeout(tryTimeout, Mono.error(() -> {
            String message = String.format(ACTIVE_WAIT_TIMED_OUT, connectionId, sessionName);
            return new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, message, protonSession.getErrorContext());
        })).then();
    }

    private static final class LinkSubscription<T extends AmqpLink> {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final T link;
        private final Disposable subscription;
        private final String errorMessage;
        private final ProtonSession.ProtonSessionClosedException error;

        private LinkSubscription(T link, Disposable subscription, String errorMessage) {
            this.link = link;
            this.subscription = subscription;
            this.errorMessage = errorMessage;
            this.error = null;
        }

        private LinkSubscription(ProtonSession.ProtonSessionClosedException error) {
            this.link = null;
            this.subscription = null;
            this.errorMessage = null;
            this.error = Objects.requireNonNull(error, "'error' cannot be null.");
        }

        public T getLink() {
            return this.link;
        }

        ProtonSession.ProtonSessionClosedException getError() {
            return this.error;
        }

        Mono<Void> closeAsync(ErrorCondition errorCondition) {
            if (this.isDisposed.getAndSet(true) || this.error != null) {
                return Mono.empty();
            }
            this.subscription.dispose();
            if (this.link instanceof ReactorReceiver) {
                return ((ReactorReceiver)this.link).closeAsync(this.errorMessage, errorCondition);
            }
            if (this.link instanceof ReactorSender) {
                return ((ReactorSender)this.link).closeAsync(this.errorMessage, errorCondition);
            }
            this.link.dispose();
            return Mono.empty();
        }

        /* synthetic */ LinkSubscription(AmqpLink x0, Disposable x1, String x2, 1 x3) {
            this(x0, x1, x2);
        }
    }
}

