/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.SynchronousMessageSubscriber;
import com.azure.messaging.servicebus.SynchronousReceiveWork;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@ServiceClient(builder=ServiceBusClientBuilder.class)
public final class ServiceBusReceiverClient
implements AutoCloseable {
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverClient.class);
    private final AtomicInteger idGenerator = new AtomicInteger();
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final Duration operationTimeout;
    private final boolean isPrefetchDisabled;
    private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber = new AtomicReference();
    private final AtomicBoolean syncSubscribed = new AtomicBoolean(false);
    private final ServiceBusTracer tracer;

    ServiceBusReceiverClient(ServiceBusReceiverAsyncClient asyncClient, boolean isPrefetchDisabled, Duration operationTimeout) {
        this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        this.operationTimeout = Objects.requireNonNull(operationTimeout, "'operationTimeout' cannot be null.");
        this.isPrefetchDisabled = isPrefetchDisabled;
        this.tracer = asyncClient.getInstrumentation().getTracer();
    }

    public String getFullyQualifiedNamespace() {
        return this.asyncClient.getFullyQualifiedNamespace();
    }

    public String getEntityPath() {
        return this.asyncClient.getEntityPath();
    }

    public String getSessionId() {
        return this.asyncClient.getSessionId();
    }

    public String getIdentifier() {
        return this.asyncClient.getIdentifier();
    }

    public void abandon(ServiceBusReceivedMessage message) {
        this.asyncClient.abandon(message).block(this.operationTimeout);
    }

    public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
        this.asyncClient.abandon(message, options).block(this.operationTimeout);
    }

    public void complete(ServiceBusReceivedMessage message) {
        this.asyncClient.complete(message).block(this.operationTimeout);
    }

    public void complete(ServiceBusReceivedMessage message, CompleteOptions options) {
        this.asyncClient.complete(message, options).block(this.operationTimeout);
    }

    public void defer(ServiceBusReceivedMessage message) {
        this.asyncClient.defer(message).block(this.operationTimeout);
    }

    public void defer(ServiceBusReceivedMessage message, DeferOptions options) {
        this.asyncClient.defer(message, options).block(this.operationTimeout);
    }

    public void deadLetter(ServiceBusReceivedMessage message) {
        this.asyncClient.deadLetter(message).block(this.operationTimeout);
    }

    public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
        this.asyncClient.deadLetter(message, options).block(this.operationTimeout);
    }

    public byte[] getSessionState() {
        return (byte[])this.asyncClient.getSessionState().block(this.operationTimeout);
    }

    public ServiceBusReceivedMessage peekMessage() {
        return this.peekMessage(this.asyncClient.getReceiverOptions().getSessionId());
    }

    ServiceBusReceivedMessage peekMessage(String sessionId) {
        return (ServiceBusReceivedMessage)this.asyncClient.peekMessage(sessionId).block(this.operationTimeout);
    }

    public ServiceBusReceivedMessage peekMessage(long sequenceNumber) {
        return this.peekMessage(sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    ServiceBusReceivedMessage peekMessage(long sequenceNumber, String sessionId) {
        return (ServiceBusReceivedMessage)this.asyncClient.peekMessage(sequenceNumber, sessionId).block(this.operationTimeout);
    }

    public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages) {
        return this.peekMessages(maxMessages, this.asyncClient.getReceiverOptions().getSessionId());
    }

    IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, String sessionId) {
        if (maxMessages <= 0) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
        }
        Flux<ServiceBusReceivedMessage> messages = this.tracer.traceSyncReceive("ServiceBus.peekMessages", (Flux<ServiceBusReceivedMessage>)this.asyncClient.peekMessages(maxMessages, sessionId).timeout(this.operationTimeout));
        return this.fromFluxAndSubscribe(messages);
    }

    public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
        return this.peekMessages(maxMessages, sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
        if (maxMessages <= 0) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
        }
        Flux<ServiceBusReceivedMessage> messages = this.tracer.traceSyncReceive("ServiceBus.peekMessages", (Flux<ServiceBusReceivedMessage>)this.asyncClient.peekMessages(maxMessages, sequenceNumber, sessionId).timeout(this.operationTimeout));
        return this.fromFluxAndSubscribe(messages);
    }

    public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages) {
        return this.receiveMessages(maxMessages, this.operationTimeout);
    }

    public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime) {
        if (maxMessages <= 0) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
        }
        if (Objects.isNull(maxWaitTime)) {
            throw LOGGER.logExceptionAsError((RuntimeException)new NullPointerException("'maxWaitTime' cannot be null."));
        }
        if (maxWaitTime.isNegative() || maxWaitTime.isZero()) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("'maxWaitTime' cannot be zero or less. maxWaitTime: " + maxWaitTime));
        }
        Sinks.Many emitter = Sinks.many().replay().all();
        this.queueWork(maxMessages, maxWaitTime, (Sinks.Many<ServiceBusReceivedMessage>)emitter);
        Flux<ServiceBusReceivedMessage> messagesFlux = this.tracer.traceSyncReceive("ServiceBus.receiveMessages", (Flux<ServiceBusReceivedMessage>)emitter.asFlux());
        messagesFlux.subscribe();
        return new IterableStream(messagesFlux);
    }

    public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) {
        return this.receiveDeferredMessage(sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) {
        return (ServiceBusReceivedMessage)this.asyncClient.receiveDeferredMessage(sequenceNumber, sessionId).block(this.operationTimeout);
    }

    public IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers) {
        return this.receiveDeferredMessageBatch(sequenceNumbers, this.asyncClient.getReceiverOptions().getSessionId());
    }

    IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers, String sessionId) {
        Flux<ServiceBusReceivedMessage> messages = this.tracer.traceSyncReceive("ServiceBus.receiveDeferredMessageBatch", (Flux<ServiceBusReceivedMessage>)this.asyncClient.receiveDeferredMessages(sequenceNumbers, sessionId).timeout(this.operationTimeout));
        return this.fromFluxAndSubscribe(messages);
    }

    public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message) {
        return (OffsetDateTime)this.asyncClient.renewMessageLock(message).block(this.operationTimeout);
    }

    public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError) {
        String lockToken = message != null ? message.getLockToken() : "null";
        Consumer<Throwable> throwableConsumer = onError != null ? onError : error -> LOGGER.atWarning().addKeyValue("lockToken", lockToken).log("Exception occurred while renewing lock token.", new Object[]{error});
        this.asyncClient.renewMessageLock(message, maxLockRenewalDuration).subscribe(v -> LOGGER.atVerbose().addKeyValue("lockToken", lockToken).log("Completed renewing lock token."), throwableConsumer, () -> LOGGER.atVerbose().addKeyValue("lockToken", lockToken).log("Auto message lock renewal operation completed"));
    }

    public OffsetDateTime renewSessionLock() {
        return (OffsetDateTime)this.asyncClient.renewSessionLock().block(this.operationTimeout);
    }

    public void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError) {
        this.renewSessionLock(this.asyncClient.getReceiverOptions().getSessionId(), maxLockRenewalDuration, onError);
    }

    public void setSessionState(byte[] sessionState) {
        this.asyncClient.setSessionState(sessionState).block(this.operationTimeout);
    }

    public ServiceBusTransactionContext createTransaction() {
        return (ServiceBusTransactionContext)this.asyncClient.createTransaction().block(this.operationTimeout);
    }

    public void commitTransaction(ServiceBusTransactionContext transactionContext) {
        this.asyncClient.commitTransaction(transactionContext).block(this.operationTimeout);
    }

    public void rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        this.asyncClient.rollbackTransaction(transactionContext).block(this.operationTimeout);
    }

    @Override
    public void close() {
        SynchronousMessageSubscriber messageSubscriber = this.synchronousMessageSubscriber.get();
        if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
            messageSubscriber.dispose();
        }
        this.asyncClient.close();
    }

    private void queueWork(int maximumMessageCount, Duration maxWaitTime, Sinks.Many<ServiceBusReceivedMessage> emitter) {
        long id = this.idGenerator.getAndIncrement();
        SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);
        SynchronousMessageSubscriber messageSubscriber = this.synchronousMessageSubscriber.get();
        if (messageSubscriber != null) {
            messageSubscriber.queueWork(work);
            return;
        }
        messageSubscriber = this.synchronousMessageSubscriber.updateAndGet(subscriber -> {
            if (subscriber == null) {
                return new SynchronousMessageSubscriber(this.asyncClient, work, this.isPrefetchDisabled, this.operationTimeout);
            }
            return subscriber;
        });
        if (!this.syncSubscribed.getAndSet(true)) {
            this.asyncClient.receiveMessagesNoBackPressure().subscribeWith((Subscriber)messageSubscriber);
        } else {
            messageSubscriber.queueWork(work);
        }
        LOGGER.atVerbose().addKeyValue("workId", work.getId()).log("Receive request queued up.");
    }

    void renewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer<Throwable> onError) {
        Consumer<Throwable> throwableConsumer = onError != null ? onError : error -> LOGGER.atWarning().addKeyValue("sessionId", sessionId).log("Exception occurred while renewing session.", new Object[]{error});
        this.asyncClient.renewSessionLock(maxLockRenewalDuration).subscribe(v -> LOGGER.atVerbose().addKeyValue("sessionId", sessionId).log("Completed renewing session"), throwableConsumer, () -> LOGGER.atVerbose().addKeyValue("sessionId", sessionId).log("Auto session lock renewal operation completed."));
    }

    private <T> IterableStream<T> fromFluxAndSubscribe(Flux<T> flux) {
        Flux cached = flux.cache();
        cached.subscribe();
        return new IterableStream(cached);
    }
}

