Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,19 @@ void loadBalance() {
* Retrieve the list of partition ids from the Event Hub.
*/
Mono<List<String>> partitionsMono;
if (partitionsCache.get() == null || partitionsCache.get().isEmpty()) {
if (CoreUtils.isNullOrEmpty(partitionsCache.get())) {
// Call Event Hubs service to get the partition ids if the cache is empty
logger.info("Getting partitions from Event Hubs service for {}", eventHubName);
partitionsMono = eventHubAsyncClient
.getPartitionIds()
.timeout(Duration.ofMinutes(1))
.collectList();
} else {
partitionsMono = Mono.just(partitionsCache.get());
// we have the partitions, the client can be closed now
closeClient();
}


Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
.then()
Expand Down Expand Up @@ -170,8 +172,9 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
}
partitionsCache.set(partitionIds);
int numberOfPartitions = partitionIds.size();
logger.info("CheckpointStore returned {} ownership records", partitionOwnershipMap.size());
logger.info("Event Hubs service returned {} partitions", numberOfPartitions);
logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
numberOfPartitions);

if (!isValid(partitionOwnershipMap)) {
// User data is corrupt.
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -275,6 +278,18 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
});
}

/*
* Closes the client used by load balancer to get the partitions.
*/
private void closeClient() {
try {
// this is an idempotent operation, calling close on an already closed client is just a no-op.
this.eventHubAsyncClient.close();
} catch (Exception ex) {
logger.warning("Failed to close the client", ex);
}
}

/*
* This method renews the ownership of currently owned partitions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -166,6 +169,21 @@ public void onNext(AmqpReceiveLink next) {
}
},
error -> {
if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN
&& amqpException.getContext() != null
&& amqpException.getContext() instanceof LinkErrorContext) {
LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext();
if (currentLink != null
&& !currentLink.getLinkName().equals(errorContext.getTrackingId())) {
logger.info("EntityPath[{}]: Link lost signal received for a link "
+ "that is not current. Ignoring the error. Current link {}, link lost {}",
entityPath, linkName, errorContext.getTrackingId());
return;
}
}
}
currentLink = null;
logger.warning("linkName[{}] entityPath[{}]. Error occurred in link.", linkName, entityPath);
onError(error);
Expand Down