From 0e73302b73aeb639260d4df3592a31ce606bf278 Mon Sep 17 00:00:00 2001 From: Srikanta Date: Thu, 29 Oct 2020 15:52:10 -0700 Subject: [PATCH 1/2] Close client and check for link id when link is stolen --- .../eventhubs/PartitionBasedLoadBalancer.java | 23 +++++++++++++++---- .../AmqpReceiveLinkProcessor.java | 18 +++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java index 60085e873de9..2bb6d9ffd079 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java @@ -125,17 +125,19 @@ void loadBalance() { * Retrieve the list of partition ids from the Event Hub. */ Mono> partitionsMono; - if (partitionsCache.get() == null || partitionsCache.get().isEmpty()) { + if (CoreUtils.isNullOrEmpty(partitionsCache.get())) { // Call Event Hubs service to get the partition ids if the cache is empty + logger.info("Getting partitions from Event Hubs service for {}", eventHubName); partitionsMono = eventHubAsyncClient .getPartitionIds() .timeout(Duration.ofMinutes(1)) .collectList(); } else { partitionsMono = Mono.just(partitionsCache.get()); + // we have the partitions, the client can be closed now + closeClient(); } - Mono.zip(partitionOwnershipMono, partitionsMono) .flatMap(this::loadBalance) .then() @@ -170,8 +172,9 @@ private Mono loadBalance(final Tuple2, Lis } partitionsCache.set(partitionIds); int numberOfPartitions = partitionIds.size(); - logger.info("CheckpointStore returned {} ownership records", partitionOwnershipMap.size()); - logger.info("Event Hubs service returned {} partitions", numberOfPartitions); + logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(), + numberOfPartitions); + if (!isValid(partitionOwnershipMap)) { // User data is corrupt. throw logger.logExceptionAsError(Exceptions.propagate( @@ -275,6 +278,18 @@ private Mono loadBalance(final Tuple2, Lis }); } + /* + * Closes the client used by load balancer to get the partitions. + */ + private void closeClient() { + try { + // this is an idempotent operation, calling close on an already closed client is just a no-op. + this.eventHubAsyncClient.close(); + } catch (Exception ex) { + logger.warning("Failed to close the client", ex); + } + } + /* * This method renews the ownership of currently owned partitions */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java index c3dbc0a137fe..7ab8b457dbef 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java @@ -5,6 +5,9 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.exception.LinkErrorContext; import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.message.Message; @@ -166,6 +169,21 @@ public void onNext(AmqpReceiveLink next) { } }, error -> { + if (error instanceof AmqpException) { + AmqpException amqpException = (AmqpException) error; + if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN + && amqpException.getContext() != null + && amqpException.getContext() instanceof LinkErrorContext) { + LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext(); + if (currentLink != null + && !currentLink.getLinkName().equals(errorContext.getTrackingId())) { + logger.info("EntityPath[{}]: Link lost signal received for a link " + + "that is not current. Ignoring the error. Current link {}, link lost {}", + entityPath, linkName, errorContext.getTrackingId()); + return; + } + } + } currentLink = null; logger.warning("linkName[{}] entityPath[{}]. Error occurred in link.", linkName, entityPath); onError(error); From 1a73212d0a7e0ae3d7b4ebb85b56ec4715ebd7bc Mon Sep 17 00:00:00 2001 From: Srikanta Date: Thu, 29 Oct 2020 16:29:48 -0700 Subject: [PATCH 2/2] Fix checkstyle --- .../eventhubs/implementation/AmqpReceiveLinkProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java index 7ab8b457dbef..3e49c28f33be 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java @@ -177,8 +177,8 @@ public void onNext(AmqpReceiveLink next) { LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext(); if (currentLink != null && !currentLink.getLinkName().equals(errorContext.getTrackingId())) { - logger.info("EntityPath[{}]: Link lost signal received for a link " + - "that is not current. Ignoring the error. Current link {}, link lost {}", + logger.info("EntityPath[{}]: Link lost signal received for a link " + + "that is not current. Ignoring the error. Current link {}, link lost {}", entityPath, linkName, errorContext.getTrackingId()); return; }