diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 297d414b6a0e..11ff5cf49413 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -128,12 +128,19 @@ void stopAllPartitionPumps() { * @param ownership The partition ownership information for which the connection state will be verified. */ void verifyPartitionConnection(PartitionOwnership ownership) { - if (partitionPumps.containsKey(ownership.getPartitionId())) { - EventHubConsumerAsyncClient consumerClient = partitionPumps.get(ownership.getPartitionId()); + String partitionId = ownership.getPartitionId(); + if (partitionPumps.containsKey(partitionId)) { + EventHubConsumerAsyncClient consumerClient = partitionPumps.get(partitionId); if (consumerClient.isConnectionClosed()) { logger.info("Connection closed for {}, partition {}. Removing the consumer.", - ownership.getEventHubName(), ownership.getPartitionId()); - partitionPumps.remove(ownership.getPartitionId()); + ownership.getEventHubName(), partitionId); + try { + partitionPumps.get(partitionId).close(); + } catch (Exception ex) { + logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex); + } finally { + partitionPumps.remove(partitionId); + } } } }