diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 0ab2a23ec780..f68f4d598a2f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -37,7 +37,7 @@ public class AmqpChannelProcessor extends Mono implements Processor, private final Object lock = new Object(); private final AmqpRetryPolicy retryPolicy; - private final String connectionId; + private final String fullyQualifiedNamespace; private final String entityPath; private final Function> endpointStatesFunction; @@ -48,9 +48,10 @@ public class AmqpChannelProcessor extends Mono implements Processor, private volatile Disposable connectionSubscription; private volatile Disposable retrySubscription; - public AmqpChannelProcessor(String connectionId, String entityPath, + public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, Function> endpointStatesFunction, AmqpRetryPolicy retryPolicy, ClientLogger logger) { - this.connectionId = Objects.requireNonNull(connectionId, "'connectionId' cannot be null."); + this.fullyQualifiedNamespace = Objects + .requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction, "'endpointStates' cannot be null."); @@ -71,7 +72,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T amqpChannel) { - logger.info("connectionId[{}] entityPath[{}]: Setting next AMQP channel.", connectionId, entityPath); + logger.info("namespace[{}] entityPath[{}]: Setting next AMQP channel.", fullyQualifiedNamespace, entityPath); Objects.requireNonNull(amqpChannel, "'amqpChannel' cannot be null."); @@ -84,7 +85,8 @@ public void onNext(T amqpChannel) { currentChannel = amqpChannel; final ConcurrentLinkedDeque> currentSubscribers = subscribers; - subscribers = new ConcurrentLinkedDeque<>(); + logger.info("namespace[{}] entityPath[{}]: Next AMQP channel received, updating {} current " + + "subscribers: {}", fullyQualifiedNamespace, entityPath, subscribers.size(), subscribers); currentSubscribers.forEach(subscription -> subscription.onNext(amqpChannel)); @@ -93,6 +95,8 @@ public void onNext(T amqpChannel) { // Connection was successfully opened, we can reset the retry interval. if (state == AmqpEndpointState.ACTIVE) { retryAttempts.set(0); + logger.info("namespace[{}] entityPath[{}]: Channel is now active.", + fullyQualifiedNamespace, entityPath); } }, error -> { @@ -101,9 +105,11 @@ public void onNext(T amqpChannel) { }, () -> { if (isDisposed()) { - logger.info("Channel is disposed."); + logger.info("namespace[{}] entityPath[{}]: Channel is disposed.", + fullyQualifiedNamespace, entityPath); } else { - logger.info("Channel closed."); + logger.info("namespace[{}] entityPath[{}]: Channel is closed.", + fullyQualifiedNamespace, entityPath); setAndClearChannel(); } }); @@ -179,6 +185,8 @@ public void onError(Throwable throwable) { synchronized (lock) { final ConcurrentLinkedDeque> currentSubscribers = subscribers; subscribers = new ConcurrentLinkedDeque<>(); + logger.info("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} " + + "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size()); currentSubscribers.forEach(subscriber -> subscriber.onError(throwable)); } @@ -192,7 +200,8 @@ public void onComplete() { synchronized (lock) { final ConcurrentLinkedDeque> currentSubscribers = subscribers; subscribers = new ConcurrentLinkedDeque<>(); - + logger.info("namespace[{}] entityPath[{}]: AMQP channel processor completed. Notifying {} " + + "subscribers.", fullyQualifiedNamespace, entityPath, currentSubscribers.size()); currentSubscribers.forEach(subscriber -> subscriber.onComplete()); } } @@ -205,8 +214,8 @@ public void subscribe(CoreSubscriber actual) { actual.onError(lastError); } else { Operators.error(actual, logger.logExceptionAsError(new IllegalStateException( - String.format("connectionId[%s] entityPath[%s]: Cannot subscribe. Processor is already terminated.", - connectionId, entityPath)))); + String.format("namespace[%s] entityPath[%s]: Cannot subscribe. Processor is already terminated.", + fullyQualifiedNamespace, entityPath)))); } return; @@ -223,6 +232,8 @@ public void subscribe(CoreSubscriber actual) { } subscribers.add(subscriber); + logger.info("Added a subscriber {} to AMQP channel processor. Total " + + "subscribers = {}", subscriber, subscribers.size()); if (!isRetryPending.get()) { requestUpstream(); @@ -253,25 +264,25 @@ public boolean isDisposed() { private void requestUpstream() { if (currentChannel != null) { - logger.verbose("connectionId[{}] entityPath[{}]: Connection exists, not requesting another.", - connectionId, entityPath); + logger.verbose("namespace[{}] entityPath[{}]: Connection exists, not requesting another.", + fullyQualifiedNamespace, entityPath); return; } else if (isDisposed()) { - logger.verbose("connectionId[{}] entityPath[{}]: Is already disposed.", connectionId, entityPath); + logger.verbose("namespace[{}] entityPath[{}]: Is already disposed.", fullyQualifiedNamespace, entityPath); return; } final Subscription subscription = UPSTREAM.get(this); if (subscription == null) { - logger.warning("connectionId[{}] entityPath[{}]: There is no upstream subscription.", - connectionId, entityPath); + logger.warning("namespace[{}] entityPath[{}]: There is no upstream subscription.", + fullyQualifiedNamespace, entityPath); return; } // subscribe(CoreSubscriber) may have requested a subscriber already. if (!isRequested.getAndSet(true)) { - logger.info("connectionId[{}] entityPath[{}]: Connection not requested, yet. Requesting one.", - connectionId, entityPath); + logger.info("namespace[{}] entityPath[{}]: Connection not requested, yet. Requesting one.", + fullyQualifiedNamespace, entityPath); subscription.request(1); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java index f89b954bf064..d0b070343d4d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java @@ -69,7 +69,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { }); this.emitterProcessor = amqpReceiveLinkProcessor - .map(message -> onMessageReceived(message)) + .map(this::onMessageReceived) .doOnNext(event -> { // Keep track of the last position so if the link goes down, we don't start from the original location. final Long offset = event.getData().getOffset(); @@ -92,7 +92,11 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { public void close() { if (!isDisposed.getAndSet(true)) { emitterProcessor.onComplete(); - amqpReceiveLinkProcessor.cancel(); + if (!amqpReceiveLinkProcessor.isTerminated()) { + // cancel only if the processor is not already terminated. + amqpReceiveLinkProcessor.cancel(); + } + logger.info("Closed consumer for partition {}", this.partitionId); } }