diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 7c6e464548e5..dae03a906856 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -18,6 +18,7 @@ import reactor.core.publisher.Operators; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; @@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY; public class AmqpChannelProcessor extends Mono implements Processor, CoreSubscriber, Disposable { @@ -64,8 +66,10 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction, "'endpointStates' cannot be null."); this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null."); - this.logger = Objects.requireNonNull(logger, "'logger' cannot be null."); + Map loggingContext = new HashMap<>(1); + loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null.")); + this.logger = new ClientLogger(getClass(), loggingContext); this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 59a1db3dc5f3..c5a18728e1bf 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -50,6 +50,13 @@ public class BlobCheckpointStore implements CheckpointStore { private static final String CHECKPOINT_PATH = "/checkpoint/"; private static final String OWNERSHIP_PATH = "/ownership/"; + // logging keys, consistent across all AMQP libraries and human-readable + private static final String PARTITION_ID_LOG_KEY = "partitionId"; + private static final String OWNER_ID_LOG_KEY = "ownerId"; + private static final String SEQUENCE_NUMBER_LOG_KEY = "sequenceNumber"; + private static final String BLOB_NAME_LOG_KEY = "blobName"; + private static final String OFFSET_LOG_KEY = "offset"; + /** * An empty string. */ @@ -103,20 +110,27 @@ private Flux listBlobs(String prefix, Function> convert private Mono convertToCheckpoint(BlobItem blobItem) { String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR); - logger.verbose(Messages.FOUND_BLOB_FOR_PARTITION, blobItem.getName()); + logger.atVerbose() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .log(Messages.FOUND_BLOB_FOR_PARTITION); if (names.length == 5) { // Blob names should be of the pattern // fullyqualifiednamespace/eventhub/consumergroup/checkpoints/ // While we can further check if the partition id is numeric, it may not necessarily be the case in future. if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) { - logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName()); + logger.atWarning() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB); return Mono.empty(); } Map metadata = blobItem.getMetadata(); - logger.verbose(Messages.CHECKPOINT_INFO, blobItem.getName(), metadata.get(SEQUENCE_NUMBER), - metadata.get(OFFSET)); + logger.atVerbose() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .addKeyValue(SEQUENCE_NUMBER_LOG_KEY, metadata.get(SEQUENCE_NUMBER)) + .addKeyValue(OFFSET_LOG_KEY, metadata.get(OFFSET)) + .log(Messages.CHECKPOINT_INFO); Long sequenceNumber = null; Long offset = null; @@ -176,7 +190,9 @@ public Flux claimOwnership(List requeste .uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, blobRequestConditions) .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { - logger.verbose(Messages.CLAIM_ERROR, partitionId, error.getMessage()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_LOG_KEY, partitionId) + .log(Messages.CLAIM_ERROR, error); return Mono.empty(); }, Mono::empty); } else { @@ -184,12 +200,16 @@ public Flux claimOwnership(List requeste blobRequestConditions.setIfMatch(partitionOwnership.getETag()); return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions) .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { - logger.verbose(Messages.CLAIM_ERROR, partitionId, error); + logger.atVerbose() + .addKeyValue(PARTITION_ID_LOG_KEY, partitionId) + .log(Messages.CLAIM_ERROR, error); return Mono.empty(); }, Mono::empty); } } catch (Exception ex) { - logger.warning(Messages.CLAIM_ERROR, partitionOwnership.getPartitionId(), ex); + logger.atWarning() + .addKeyValue(PARTITION_ID_LOG_KEY, partitionOwnership.getPartitionId()) + .log(Messages.CLAIM_ERROR, ex); return Mono.empty(); } }); @@ -252,19 +272,21 @@ private String getBlobName(String fullyQualifiedNamespace, String eventHubName, } private Mono convertToPartitionOwnership(BlobItem blobItem) { - logger.verbose(Messages.FOUND_BLOB_FOR_PARTITION, blobItem.getName()); + logger.atVerbose() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .log(Messages.FOUND_BLOB_FOR_PARTITION); + String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR); if (names.length == 5) { // Blob names should be of the pattern // fullyqualifiednamespace/eventhub/consumergroup/ownership/ // While we can further check if the partition id is numeric, it may not necessarily be the case in future. if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) { - logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName()); + logger.atWarning() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB); return Mono.empty(); } - logger - .verbose(Messages.BLOB_OWNER_INFO, blobItem.getName(), - blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING)); BlobItemProperties blobProperties = blobItem.getProperties(); @@ -273,6 +295,11 @@ private Mono convertToPartitionOwnership(BlobItem blobItem) ownerId = EMPTY_STRING; } + logger.atVerbose() + .addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()) + .addKeyValue(OWNER_ID_LOG_KEY, ownerId) + .log(Messages.BLOB_OWNER_INFO); + PartitionOwnership partitionOwnership = new PartitionOwnership() .setFullyQualifiedNamespace(names[0]) .setEventHubName(names[1]) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/eventhubs-checkpointstore-blob-messages.properties b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/eventhubs-checkpointstore-blob-messages.properties index 76dcc830fd7b..a548bcbd5692 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/eventhubs-checkpointstore-blob-messages.properties +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/eventhubs-checkpointstore-blob-messages.properties @@ -1,5 +1,5 @@ -NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob {} -CLAIM_ERROR=Couldn't claim ownership of partition {} -FOUND_BLOB_FOR_PARTITION=Found blob for partition {} -BLOB_OWNER_INFO=Blob {} is owned by {} -CHECKPOINT_INFO=Blob {} has checkpoint with sequence number {} and offset {} +NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob. +CLAIM_ERROR=Couldn't claim ownership of partition. +FOUND_BLOB_FOR_PARTITION=Found blob for partition. +BLOB_OWNER_INFO=Blob ownership info. +CHECKPOINT_INFO=Blob checkpoint info. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java index 505c3f080e6b..e52815ce140a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java @@ -54,6 +54,7 @@ public class EventData { */ static final Set RESERVED_SYSTEM_PROPERTIES; + private static final ClientLogger LOGGER = new ClientLogger(EventData.class); private final Map properties; private final SystemProperties systemProperties; private final AmqpAnnotatedMessage annotatedMessage; @@ -140,11 +141,11 @@ public EventData(BinaryData body) { break; case SEQUENCE: case VALUE: - new ClientLogger(EventData.class).warning("Message body type '{}' is not supported in EH. " + LOGGER.warning("Message body type '{}' is not supported in EH. " + " Getting contents of body may throw.", annotatedMessage.getBody().getBodyType()); break; default: - throw new ClientLogger(EventData.class).logExceptionAsError(new IllegalArgumentException( + throw LOGGER.logExceptionAsError(new IllegalArgumentException( "Body type not valid " + annotatedMessage.getBody().getBodyType())); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index f24abda7de1f..39dd06ebe1e5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY; + /** * This class provides a fluent builder API to aid the instantiation of {@link EventHubProducerAsyncClient}, {@link * EventHubProducerClient}, {@link EventHubConsumerAsyncClient}, and {@link EventHubConsumerClient}. Calling any of the @@ -748,7 +750,9 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m } final String connectionId = StringUtil.getRandomString("MF"); - logger.info("connectionId[{}]: Emitting a single connection.", connectionId); + logger.atInfo() + .addKeyValue(CONNECTION_ID_KEY, connectionId) + .log("Emitting a single connection."); final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider( connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(), diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java index da7142d5ebdc..9e21c88e6a39 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java @@ -30,8 +30,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.LINK_NAME_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.SIGNAL_TYPE_KEY; import static com.azure.core.util.FluxUtil.fluxError; import static com.azure.core.util.FluxUtil.monoError; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; /** * An asynchronous consumer responsible for reading {@link EventData} from either a specific Event Hub partition @@ -413,8 +417,11 @@ private Flux createConsumer(String linkName, String partitionId, } private void removeLink(String linkName, String partitionId, SignalType signalType) { - logger.info("linkName[{}], partitionId[{}], signal[{}]: Receiving completed.", - linkName, partitionId, signalType); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(PARTITION_ID_KEY, partitionId) + .addKeyValue(SIGNAL_TYPE_KEY, signalType) + .log("Receiving completed."); final EventHubPartitionAsyncConsumer consumer = openPartitionConsumers.remove(linkName); @@ -434,8 +441,11 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName, // final Mono receiveLinkMono = connectionProcessor .flatMap(connection -> { - logger.info("connectionId[{}] linkName[{}] Creating receive consumer for partition '{}'", - connection.getId(), linkName, partitionId); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(PARTITION_ID_KEY, partitionId) + .addKeyValue(CONNECTION_ID_KEY, connection.getId()) + .log("Creating receive consumer for partition."); return connection.createReceiveLink(linkName, entityPath, initialPosition.get().get(), receiveOptions); }); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java index d9c567698e8f..f252f0fe1507 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java @@ -21,6 +21,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + /** * A synchronous consumer responsible for reading {@link EventData} from an Event Hub partition in the context of * a specific consumer group. @@ -287,7 +289,10 @@ private void queueWork(String partitionId, int maximumMessageCount, EventPositio final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maximumWaitTime, emitter); final SynchronousEventSubscriber syncSubscriber = new SynchronousEventSubscriber(work); - logger.info("Started synchronous event subscriber for partition '{}'.", partitionId); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Started synchronous event subscriber."); + consumer.receiveFromPartition(partitionId, startingPosition, receiveOptions).subscribeWith(syncSubscriber); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java index 4b462850d08a..9085564ef2e2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java @@ -103,7 +103,7 @@ public Message serialize(T object) { if (!(object instanceof EventData)) { throw logger.logExceptionAsError(new IllegalArgumentException( - "Cannot serialize object that is not EventData. Clazz: " + object.getClass())); + "Cannot serialize object that is not EventData. Class: " + object.getClass())); } final EventData eventData = (EventData) object; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java index 7c8fa1bcbe9e..178e473fb8aa 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java @@ -19,6 +19,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + /** * A package-private consumer responsible for reading {@link EventData} from a specific Event Hub partition in the * context of a specific consumer group. @@ -72,10 +75,11 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable { if (offset != null) { currentOffset = offset; } else { - logger.warning( - "Offset for received event should not be null. Partition Id: {}. Consumer group: {}. Data: {}", - event.getPartitionContext().getPartitionId(), event.getPartitionContext().getConsumerGroup(), - event.getData().getBodyAsString()); + logger.atWarning() + .addKeyValue(PARTITION_ID_KEY, event.getPartitionContext().getPartitionId()) + .addKeyValue(CONSUMER_GROUP_KEY, event.getPartitionContext().getConsumerGroup()) + .addKeyValue("data", () -> event.getData().getBodyAsString()) + .log("Offset for received event should not be null."); } }); } @@ -90,7 +94,9 @@ public void close() { // cancel only if the processor is not already terminated. amqpReceiveLinkProcessor.cancel(); } - logger.info("Closed consumer for partition {}", this.partitionId); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, this.partitionId) + .log("Closed consumer."); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index 7be4f4059c45..874c623f9d02 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -56,6 +56,8 @@ import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_TRACING_SERVICE_NAME; import static com.azure.messaging.eventhubs.implementation.ClientConstants.MAX_MESSAGE_LENGTH_BYTES; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_KEY_KEY; /** * An asynchronous producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped @@ -510,12 +512,19 @@ public Mono send(EventDataBatch batch) { } if (!CoreUtils.isNullOrEmpty(batch.getPartitionId())) { - logger.verbose("Sending batch with size[{}] to partitionId[{}].", batch.getCount(), batch.getPartitionId()); + logger.atVerbose() + .addKeyValue("size", batch.getCount()) + .addKeyValue(PARTITION_ID_KEY, batch.getPartitionId()) + .log("Sending batch."); } else if (!CoreUtils.isNullOrEmpty(batch.getPartitionKey())) { - logger.verbose("Sending batch with size[{}] with partitionKey[{}].", - batch.getCount(), batch.getPartitionKey()); + logger.atVerbose() + .addKeyValue("size", batch.getCount()) + .addKeyValue(PARTITION_KEY_KEY, batch.getPartitionKey()) + .log("Sending batch."); } else { - logger.verbose("Sending batch with size[{}] to be distributed round-robin in service.", batch.getCount()); + logger.atVerbose() + .addKeyValue("size", batch.getCount()) + .log("Sending batch to be distributed round-robin in service."); } final String partitionKey = batch.getPartitionKey(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java index 2ce4b85ed269..443fc37e0544 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java @@ -10,6 +10,7 @@ import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventPosition; import java.time.Duration; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -40,7 +41,7 @@ public class EventProcessorClient { private static final long BASE_JITTER_IN_SECONDS = 2; // the initial delay jitter before starting the processor - private final ClientLogger logger = new ClientLogger(EventProcessorClient.class); + private final ClientLogger logger; private final String identifier; private final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -90,12 +91,16 @@ public class EventProcessorClient { this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null"); this.identifier = UUID.randomUUID().toString(); + + Map loggingContext = new HashMap<>(); + loggingContext.put("eventProcessorId", identifier); + + this.logger = new ClientLogger(EventProcessorClient.class, loggingContext); this.fullyQualifiedNamespace = eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT); this.eventHubName = eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT); this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT); this.loadBalancerUpdateInterval = loadBalancerUpdateInterval; - logger.info("The instance ID for this event processors is {}", this.identifier); this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory, eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition, maxBatchSize, maxWaitTime, batchReceiveMode); @@ -138,7 +143,7 @@ public synchronized void start() { logger.info("Event processor is already running"); return; } - logger.info("Starting a new event processor instance with id {}", this.identifier); + logger.info("Starting a new event processor instance."); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); scheduler.set(executor); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java index 2bb6d9ffd079..2b17ea587844 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java @@ -5,6 +5,7 @@ import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.logging.LogLevel; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.messaging.eventhubs.models.PartitionOwnership; @@ -27,6 +28,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -112,7 +116,9 @@ void loadBalance() { return; } - logger.info("Starting load balancer for {}", this.ownerId); + logger.atInfo() + .addKeyValue(OWNER_ID_KEY, this.ownerId) + .log("Starting load balancer."); /* * Retrieve current partition ownership details from the datastore. */ @@ -127,7 +133,10 @@ void loadBalance() { Mono> partitionsMono; if (CoreUtils.isNullOrEmpty(partitionsCache.get())) { // Call Event Hubs service to get the partition ids if the cache is empty - logger.info("Getting partitions from Event Hubs service for {}", eventHubName); + logger.atInfo() + .addKeyValue(ENTITY_PATH_KEY, eventHubName) + .log("Getting partitions from Event Hubs service."); + partitionsMono = eventHubAsyncClient .getPartitionIds() .timeout(Duration.ofMinutes(1)) @@ -144,7 +153,7 @@ void loadBalance() { .repeat(() -> LoadBalancingStrategy.GREEDY == loadBalancingStrategy && morePartitionsToClaim.get()) .subscribe(ignored -> { }, ex -> { - logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage(), ex); + logger.warning(Messages.LOAD_BALANCING_FAILED, ex); ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex); processError.accept(errorContext); isLoadBalancerRunning.set(false); @@ -200,7 +209,10 @@ private Mono loadBalance(final Tuple2, Lis // add the current event processor to the map if it doesn't exist ownerPartitionMap.putIfAbsent(this.ownerId, new ArrayList<>()); - logger.verbose("Current partition distribution {}", format(ownerPartitionMap)); + + if (logger.canLogAtLevel(LogLevel.VERBOSE)) { + logger.verbose("Current partition distribution {}", format(ownerPartitionMap)); + } if (CoreUtils.isNullOrEmpty(activePartitionOwnershipMap)) { /* @@ -350,8 +362,11 @@ private String findPartitionToSteal(final Map> .max(Comparator.comparingInt(entry -> entry.getValue().size())) .get(); int numberOfPartitions = ownerWithMaxPartitions.getValue().size(); - logger.info("Owner id {} owns {} partitions, stealing a partition from it", ownerWithMaxPartitions.getKey(), - numberOfPartitions); + + logger.atInfo() + .addKeyValue(OWNER_ID_KEY, ownerWithMaxPartitions.getKey()) + .log("Owner owns {} partitions, stealing a partition from it.", numberOfPartitions); + return ownerWithMaxPartitions.getValue().get(RANDOM.nextInt(numberOfPartitions)).getPartitionId(); } @@ -417,7 +432,10 @@ private Map removeInactivePartitionOwnerships( private void claimOwnership(final Map partitionOwnershipMap, Map> ownerPartitionsMap, final String partitionIdToClaim) { - logger.info("Attempting to claim ownership of partition {}", partitionIdToClaim); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionIdToClaim) + .log("Attempting to claim ownership of partition."); + PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim); @@ -435,11 +453,13 @@ private void claimOwnership(final Map partitionOwner morePartitionsToClaim.set(true); checkpointStore .claimOwnership(partitionsToClaim) - .doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}", - partitionOwnership.getPartitionId())) + .doOnNext(partitionOwnership -> logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionOwnership.getPartitionId()) + .log("Successfully claimed ownership.")) .doOnError(ex -> logger - .warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, ownershipRequest.getPartitionId(), - ex.getMessage(), ex)) + .atWarning() + .addKeyValue(PARTITION_ID_KEY, ownershipRequest.getPartitionId()) + .log(Messages.FAILED_TO_CLAIM_OWNERSHIP, ex)) .collectList() .zipWhen(ownershipList -> checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroupName) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPump.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPump.java index 48789f27f421..81ce6368626d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPump.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPump.java @@ -6,6 +6,8 @@ import com.azure.core.util.logging.ClientLogger; import reactor.core.scheduler.Scheduler; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + /** * Contains the event hub consumer and scheduler that continuously receive events. */ @@ -40,7 +42,9 @@ public void close() { try { client.close(); } catch (Exception error) { - logger.info("partitionId[{}] Exception occurred disposing of consumer client.", partitionId, error); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Exception occurred disposing of consumer client.", error); } finally { scheduler.dispose(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 2c387e433099..ef1b11a69fd3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -47,6 +47,8 @@ import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE; import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_TRACING_SERVICE_NAME; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.SEQUENCE_NUMBER_KEY; /** * The partition pump manager that keeps track of all the partition pumps started by this {@link EventProcessorClient}. @@ -124,7 +126,9 @@ void stopAllPartitionPumps() { try { eventHubConsumer.close(); } catch (Exception ex) { - logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex); + logger.atWarning() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, ex); } finally { partitionPumps.remove(partitionId); } @@ -142,19 +146,27 @@ void verifyPartitionConnection(PartitionOwnership ownership) { final PartitionPump partitionPump = partitionPumps.get(partitionId); if (partitionPump == null) { - logger.info("eventHubName[{}] partitionId[{}] No partition pump found for ownership record.", - ownership.getEventHubName(), partitionId); + + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .addKeyValue(ENTITY_PATH_KEY, ownership.getEventHubName()) + .log("No partition pump found for ownership record."); return; } final EventHubConsumerAsyncClient consumerClient = partitionPump.getClient(); if (consumerClient.isConnectionClosed()) { - logger.info("eventHubName[{}] partitionId[{}] Connection closed for partition. Removing the consumer.", - ownership.getEventHubName(), partitionId); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .addKeyValue(ENTITY_PATH_KEY, ownership.getEventHubName()) + .log("Connection closed for partition. Removing the consumer."); + try { partitionPump.close(); } catch (Exception ex) { - logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex); + logger.atWarning() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, ex); } finally { partitionPumps.remove(partitionId); } @@ -169,7 +181,10 @@ void verifyPartitionConnection(PartitionOwnership ownership) { */ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) { if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { - logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, claimedOwnership.getPartitionId()) + .log("Consumer is already running."); + return; } @@ -197,8 +212,12 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi } else { startFromEventPosition = EventPosition.latest(); } - logger.info("Starting event processing from {} for partition {}", startFromEventPosition, - claimedOwnership.getPartitionId()); + + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, claimedOwnership.getPartitionId()) + .addKeyValue("eventPosition", startFromEventPosition) + .log("Starting event processing."); + ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L) .setTrackLastEnqueuedEventProperties(trackLastEnqueuedEventProperties); @@ -216,9 +235,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi .receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions) .doOnNext(partitionEvent -> { if (logger.canLogAtLevel(LogLevel.VERBOSE)) { - logger.verbose("On next {}, {}, {}", - partitionContext.getEventHubName(), partitionContext.getPartitionId(), - partitionEvent.getData().getSequenceNumber()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName()) + .addKeyValue(SEQUENCE_NUMBER_KEY, partitionEvent.getData().getSequenceNumber()) + .log("On next."); } }); @@ -248,8 +269,10 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { cleanup(claimedOwnership, partitionPumps.get(claimedOwnership.getPartitionId())); } - throw logger.logExceptionAsError( - new PartitionProcessorException( + + throw logger.atError() + .addKeyValue(PARTITION_ID_KEY, claimedOwnership.getPartitionId()) + .log(new PartitionProcessorException( "Error occurred while starting partition pump for partition " + claimedOwnership.getPartitionId(), ex)); } @@ -269,14 +292,19 @@ private void processEvent(PartitionContext partitionContext, PartitionProcessor } try { if (logger.canLogAtLevel(LogLevel.VERBOSE)) { - logger.verbose("Processing event {}, {}", partitionContext.getEventHubName(), - partitionContext.getPartitionId()); + + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName()) + .log("Processing event."); } partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore, eventContext.getLastEnqueuedEventProperties())); if (logger.canLogAtLevel(LogLevel.VERBOSE)) { - logger.verbose("Completed processing event {}, {}", partitionContext.getEventHubName(), - partitionContext.getPartitionId()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName()) + .log("Completed processing event."); } endProcessTracingSpan(processSpanContext, Signal.complete()); } catch (Throwable throwable) { @@ -301,13 +329,17 @@ private void processEvents(PartitionContext partitionContext, PartitionProcessor EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, eventDataList, checkpointStore, lastEnqueuedEventProperties[0]); if (logger.canLogAtLevel(LogLevel.VERBOSE)) { - logger.verbose("Processing event batch {}, {}", partitionContext.getEventHubName(), - partitionContext.getPartitionId()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName()) + .log("Processing event batch."); } partitionProcessor.processEventBatch(eventBatchContext); if (logger.canLogAtLevel(LogLevel.VERBOSE)) { - logger.verbose("Completed processing event batch{}, {}", partitionContext.getEventHubName(), - partitionContext.getPartitionId()); + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName()) + .log("Completed processing event batch."); } } else { EventData eventData = (partitionEventBatch.size() == 1 @@ -335,7 +367,11 @@ private void handleError(PartitionOwnership claimedOwnership, PartitionPump part if (!(throwable instanceof PartitionProcessorException)) { shouldRethrow = false; // If user code threw an exception in processEvent callback, bubble up the exception - logger.warning("Error receiving events from partition {}", partitionContext.getPartitionId(), throwable); + + logger.atWarning() + .addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId()) + .log("Error receiving events from partition.", throwable); + partitionProcessor.processError(new ErrorContext(partitionContext, throwable)); } // If there was an error on receive, it also marks the end of the event data stream @@ -352,12 +388,14 @@ private void handleError(PartitionOwnership claimedOwnership, PartitionPump part private void cleanup(PartitionOwnership claimedOwnership, PartitionPump partitionPump) { try { // close the consumer - logger.info("Closing consumer for partition id {}", claimedOwnership.getPartitionId()); + logger.atInfo().addKeyValue(PARTITION_ID_KEY, claimedOwnership.getPartitionId()) + .log("Closing consumer."); + partitionPump.close(); } finally { // finally, remove the partition from partitionPumps map - logger.info("Removing partition id {} from list of processing partitions", - claimedOwnership.getPartitionId()); + logger.atInfo().addKeyValue(PARTITION_ID_KEY, claimedOwnership.getPartitionId()) + .log("Removing partition from list of processing partitions."); partitionPumps.remove(claimedOwnership.getPartitionId()); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java index 68c5cb5a7c49..a71763663f3e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java @@ -33,6 +33,11 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Supplier; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.LINK_NAME_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CREDITS_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.TRACKING_ID_KEY; + /** * Processes AMQP receive links into a stream of AMQP messages. */ @@ -146,15 +151,20 @@ public void onNext(AmqpReceiveLink next) { Objects.requireNonNull(next, "'next' cannot be null."); if (isTerminated()) { - logger.warning("linkName[{}] entityPath[{}]. Got another link when we have already terminated processor.", - next.getLinkName(), next.getEntityPath()); + logger.atWarning() + .addKeyValue(LINK_NAME_KEY, next.getLinkName()) + .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath()) + .log("Got another link when we have already terminated processor."); Operators.onNextDropped(next, currentContext()); return; } final String linkName = next.getLinkName(); - logger.info("linkName[{}] entityPath[{}]. Setting next AMQP receive link.", linkName, entityPath); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath()) + .log("Setting next AMQP receive link."); final AmqpReceiveLink oldChannel; final Disposable oldSubscription; @@ -178,8 +188,11 @@ public void onNext(AmqpReceiveLink next) { if (credits < 1) { linkHasNoCredits.compareAndSet(false, true); } else { - logger.info("linkName[{}] entityPath[{}] credits[{}] Link is empty. Adding more credits.", - linkName, entityPath, credits); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, next.getEntityPath()) + .addKeyValue(CREDITS_KEY, credits) + .log("Link is empty. Adding more credits."); } } @@ -198,20 +211,27 @@ public void onNext(AmqpReceiveLink next) { final int creditsToAdd = getCreditsToAdd(); final int total = Math.max(prefetch, creditsToAdd); - logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.", - linkName, prefetch, creditsToAdd); + logger.atVerbose() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("prefetch", prefetch) + .addKeyValue(CREDITS_KEY, creditsToAdd) + .log("Adding initial credits."); + operation = next.addCredits(total); } return operation; }) .subscribe(noop -> { - }, error -> logger.info("linkName[{}] was already closed. Could not add credits.", linkName)), + }, error -> logger.atInfo().addKeyValue(LINK_NAME_KEY, linkName).log("Link was already closed. Could not add credits.")), next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe( state -> { // Connection was successfully opened, we can reset the retry interval. if (state == AmqpEndpointState.ACTIVE) { - logger.info("linkName[{}] credits[{}] is active.", linkName, next.getCredits()); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(CREDITS_KEY, next.getCredits()) + .log("Link is active."); retryAttempts.set(0); } }, @@ -224,9 +244,11 @@ public void onNext(AmqpReceiveLink next) { LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext(); if (currentLink != null && !currentLink.getLinkName().equals(errorContext.getTrackingId())) { - logger.info("linkName[{}] entityPath[{}] trackingId[{}] Link lost signal received" - + " for a link that is not current. Ignoring the error.", - linkName, entityPath, errorContext.getTrackingId()); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .addKeyValue(TRACKING_ID_KEY, errorContext.getTrackingId()) + .log("Link lost signal received for a link that is not current. Ignoring the error."); return; } } @@ -238,13 +260,18 @@ public void onNext(AmqpReceiveLink next) { () -> { if (parentConnection.isDisposed() || isTerminated() || UPSTREAM.get(this) == Operators.cancelledSubscription()) { - logger.info("linkName[{}] entityPath[{}] Terminal state reached. Disposing of link " - + "processor.", linkName, entityPath); + + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Terminal state reached. Disposing of link processor."); dispose(); } else { - logger.info("linkName[{}] entityPath[{}] Receive link endpoint states are closed. " - + "Requesting another.", linkName, entityPath); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Receive link endpoint states are closed. Requesting another."); final AmqpReceiveLink existing = currentLink; currentLink = null; @@ -283,9 +310,10 @@ public void subscribe(CoreSubscriber actual) { final boolean terminateSubscriber = isTerminated() || (currentLink == null && upstream == Operators.cancelledSubscription()); if (isTerminated()) { - logger.info("linkName[{}] entityPath[{}]. AmqpReceiveLink is already terminated.", - currentLinkName, entityPath); - + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("AmqpReceiveLink is already terminated."); } else if (currentLink == null && upstream == Operators.cancelledSubscription()) { logger.info("There is no current link and upstream is terminated."); } @@ -320,17 +348,22 @@ public void subscribe(CoreSubscriber actual) { public void onError(Throwable throwable) { Objects.requireNonNull(throwable, "'throwable' is required."); - logger.info("linkName[{}] Error on receive link.", currentLinkName, throwable); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .log("Error on receive link.", throwable); if (isTerminated() || isCancelled) { - logger.info("linkName[{}] AmqpReceiveLinkProcessor is terminated. Cannot process another error.", - currentLinkName, throwable); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .log("AmqpReceiveLinkProcessor is terminated. Cannot process another error.", throwable); Operators.onErrorDropped(throwable, currentContext()); return; } if (parentConnection.isDisposed()) { - logger.info("linkName[{}] Parent connection is disposed. Not reopening on error.", currentLinkName); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .log("Parent connection is disposed. Not reopening on error."); } lastError = throwable; @@ -349,7 +382,9 @@ public void onError(Throwable throwable) { */ @Override public void onComplete() { - logger.info("linkName[{}] Receive link completed from upstream.", currentLinkName); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .log("Receive link completed from upstream."); UPSTREAM.set(this, Operators.cancelledSubscription()); } @@ -360,7 +395,9 @@ public void dispose() { return; } - logger.info("linkName[{}] Disposing receive link.", currentLinkName); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .log("Disposing receive link."); drain(); onDispose(); @@ -485,8 +522,11 @@ private void drainQueue() { try { subscriber.onNext(message); } catch (Exception e) { - logger.error("linkName[{}] entityPath[{}] Exception occurred while handling downstream onNext " - + "operation.", currentLinkName, entityPath, e); + logger.atError() + .addKeyValue(LINK_NAME_KEY, currentLinkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Exception occurred while handling downstream onNext operation.", e); + throw logger.logExceptionAsError(Exceptions.propagate( Operators.onOperatorError(upstream, e, message, subscriber.currentContext()))); } @@ -546,16 +586,24 @@ private void addCreditsToLink(String message) { final int credits = getCreditsToAdd(); if (link == null) { - logger.verbose("entityPath[{}] creditsToAdd[{}] There is no link to add credits to.", - entityPath, credits); + + logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .addKeyValue(CREDITS_KEY, credits) + .log("There is no link to add credits to."); + return; } final String linkName = link.getLinkName(); if (credits < 1) { - logger.verbose("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no additional credits to add.", - linkName, entityPath, credits); + logger.atVerbose() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .addKeyValue(CREDITS_KEY, credits) + .log("There are no additional credits to add."); + return; } @@ -564,13 +612,19 @@ private void addCreditsToLink(String message) { // many events to buffer on the client and also control the throughput. If users need higher throughput, // they can set a higher prefetch number and allocate larger heap size accordingly. if (currentLinkCredits < prefetch) { - logger.info("linkName[{}] entityPath[{}] creditsToAdd[{}] Link running low on credits. Adding more. " - + "{}", linkName, entityPath, credits, message); - + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .addKeyValue(CREDITS_KEY, credits) + .addKeyValue("message", message) + .log("Link running low on credits. Adding more."); link.addCredits(credits).subscribe(noop -> { }, error -> { - logger.info("linkName[{}] entityPath[{}] was already closed. Could not add credits.", - linkName, entityPath); + logger.atInfo() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Link was already closed. Could not add credits."); + linkHasNoCredits.compareAndSet(false, true); }); } @@ -609,8 +663,10 @@ private void disposeReceiver(AmqpReceiveLink link) { try { ((AsyncCloseable) link).closeAsync().subscribe(); } catch (Exception error) { - logger.warning("linkName[{}] entityPath[{}] Unable to dispose of link.", link.getLinkName(), - link.getEntityPath(), error); + logger.atWarning() + .addKeyValue(LINK_NAME_KEY, link.getLinkName()) + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Unable to dispose of link.", error); } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java index b4afd24461d5..03aee9811827 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java @@ -13,6 +13,23 @@ public final class ClientConstants { public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20); public static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(60); + // Logging context keys + // in sync with azure-core-amqp, but duplicate to minimize dependency + public static final String CONNECTION_ID_KEY = "connectionId"; + public static final String LINK_NAME_KEY = "linkName"; + public static final String ENTITY_PATH_KEY = "entityPath"; + public static final String SIGNAL_TYPE_KEY = "signalType"; + + // EventHubs specific logging context keys + public static final String PARTITION_ID_KEY = "partitionId"; + public static final String PARTITION_KEY_KEY = "partitionKey"; + public static final String SEQUENCE_NUMBER_KEY = "sequenceNumber"; + public static final String CONSUMER_GROUP_KEY = "consumerGroup"; + public static final String OWNER_ID_KEY = "ownerId"; + public static final String TRACKING_ID_KEY = "trackingId"; + public static final String WORK_ID_KEY = "workId"; + public static final String CREDITS_KEY = "credits"; + /** * The default maximum allowable size, in bytes, for a batch to be sent. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubConnectionProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubConnectionProcessor.java index 404c528990c5..90f114bf86d1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubConnectionProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubConnectionProcessor.java @@ -19,11 +19,15 @@ public class EventHubConnectionProcessor extends AmqpChannelProcessor channel.getEndpointStates(), RetryUtil.getRetryPolicy(retryOptions), new ClientLogger(EventHubConnectionProcessor.class)); + // TODO(limolkova) switch to new AmqpChannelProcessor constructor after azure-core-amqp is released + // this is to avoid strict dependency on the latest (and potentially beta) azure-core-amqp + // super(fullyQualifiedNamespace, channel -> channel.getEndpointStates(), + // RetryUtil.getRetryPolicy(retryOptions), Collections.singletonMap(ENTITY_PATH_KEY, eventHubName)); + this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null."); this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null."); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java index dcd57fb75dba..65db5881b034 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java @@ -27,6 +27,12 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import java.util.HashMap; +import java.util.Map; + +import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.ENTITY_PATH_KEY; + /** * A proton-j AMQP connection to an Azure Event Hub instance. Adds additional support for management operations. */ @@ -35,7 +41,7 @@ public class EventHubReactorAmqpConnection extends ReactorConnection implements private static final String MANAGEMENT_LINK_NAME = "mgmt"; private static final String MANAGEMENT_ADDRESS = "$management"; - private final ClientLogger logger = new ClientLogger(EventHubReactorAmqpConnection.class); + private final ClientLogger logger; private final TokenCredential tokenCredential; private final String connectionId; private final ReactorProvider reactorProvider; @@ -72,6 +78,10 @@ public EventHubReactorAmqpConnection(String connectionId, ConnectionOptions conn this.retryOptions = connectionOptions.getRetry(); this.tokenCredential = connectionOptions.getTokenCredential(); this.scheduler = connectionOptions.getScheduler(); + + Map loggingContext = new HashMap<>(1); + loggingContext.put(CONNECTION_ID_KEY, connectionId); + this.logger = new ClientLogger(EventHubReactorAmqpConnection.class, loggingContext); } @Override @@ -96,7 +106,9 @@ public Mono getManagementNode() { @Override public Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions) { return createSession(entityPath).flatMap(session -> { - logger.verbose("Get or create producer for path: '{}'", entityPath); + logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Get or create producer."); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy) @@ -119,7 +131,9 @@ public Mono createReceiveLink(String linkName, String entityPat ReceiveOptions options) { return createSession(entityPath).cast(EventHubSession.class) .flatMap(session -> { - logger.verbose("Get or create consumer for path: '{}'", entityPath); + logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Get or create consumer."); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); return session.createConsumer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy, diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessor.java index 7f9b25be30b9..8f0fbd663d40 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessor.java @@ -14,6 +14,8 @@ import com.azure.messaging.eventhubs.models.InitializationContext; import java.util.function.Consumer; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + /** * An abstract class defining all the operations that a partition processor can perform. Users of {@link * EventProcessorClient} should extend from this class and implement {@link #processEvent(EventContext)} for @@ -42,8 +44,9 @@ public abstract class PartitionProcessor { * @param initializationContext The initialization context before events from the partition are processed. */ public void initialize(InitializationContext initializationContext) { - logger.info("Initializing partition processor for partition {}", - initializationContext.getPartitionContext().getPartitionId()); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, initializationContext.getPartitionContext().getPartitionId()) + .log("Initializing partition processor for partition"); } /** @@ -81,8 +84,8 @@ public void processEventBatch(EventBatchContext eventBatchContext) { * events is closed. */ public void close(CloseContext closeContext) { - logger.info("Closing partition processor for partition {} with close reason {}", - closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason()); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, closeContext.getPartitionContext().getPartitionId()) + .log("Closing partition processor with close reason {}", closeContext.getCloseReason()); } - } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java index b588d7107d9e..4e397159501b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java @@ -9,21 +9,29 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Timer; import java.util.TimerTask; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.WORK_ID_KEY; + /** * Subscriber that takes {@link SynchronousReceiveWork} and publishes events to them in the order received. */ public class SynchronousEventSubscriber extends BaseSubscriber { private final Timer timer = new Timer(); - private final ClientLogger logger = new ClientLogger(SynchronousEventSubscriber.class); + private final ClientLogger logger; private final SynchronousReceiveWork work; private volatile Subscription subscription; public SynchronousEventSubscriber(SynchronousReceiveWork work) { this.work = Objects.requireNonNull(work, "'work' cannot be null."); + Map loggingContext = new HashMap<>(); + loggingContext.put(WORK_ID_KEY, this.work.getId()); + + this.logger = new ClientLogger(SynchronousEventSubscriber.class, loggingContext); } /** @@ -37,10 +45,12 @@ protected void hookOnSubscribe(Subscription subscription) { this.subscription = subscription; } - logger.info("Work: {}, Pending: {}, Scheduling receive timeout task.", work.getId(), work.getNumberOfEvents()); + logger.atInfo() + .addKeyValue("pendingEvents", work.getNumberOfEvents()) + .log("Scheduling receive timeout task."); subscription.request(work.getNumberOfEvents()); - timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis()); + timer.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), work.getTimeout().toMillis()); } /** @@ -54,7 +64,7 @@ protected void hookOnNext(PartitionEvent value) { work.next(value); if (work.isTerminal()) { - logger.info("Work: {}. Completed. Closing Flux and cancelling subscription.", work.getId()); + logger.info("Work completed. Closing Flux and cancelling subscription."); dispose(); } } @@ -87,18 +97,17 @@ public void dispose() { } private static class ReceiveTimeoutTask extends TimerTask { - private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class); - private final long workId; + private final ClientLogger logger; private final Runnable onDispose; - ReceiveTimeoutTask(long workId, Runnable onDispose) { - this.workId = workId; + ReceiveTimeoutTask(Runnable onDispose, ClientLogger logger) { this.onDispose = onDispose; + this.logger = logger; } @Override public void run() { - logger.info("Work: {}. Timeout encountered, disposing of subscriber.", workId); + logger.info("Timeout encountered, disposing of subscriber."); onDispose.run(); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java index 1ec6fd0ddee2..c206d0bea7a4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousReceiveWork.java @@ -11,6 +11,8 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.WORK_ID_KEY; + /** * Represents a synchronous receive request. * @@ -100,7 +102,9 @@ public void next(PartitionEvent event) { */ public synchronized void complete() { if (!isTerminal || emitter.isCancelled()) { - logger.info("Id: {}. Completing task.", id); + logger.atInfo() + .addKeyValue(WORK_ID_KEY, id) + .log("Completing task."); isTerminal = true; emitter.complete(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/eventhubs-messages.properties b/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/eventhubs-messages.properties index 6a33e29adf1c..ed72392a1802 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/eventhubs-messages.properties +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/eventhubs-messages.properties @@ -1,14 +1,14 @@ -PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type AutoCloseable, but type: %s. Not closing the scope and span +PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type AutoCloseable, but type: %s. Not closing the scope and span. MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not setting body contents. REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. EVENT_DATA_DOES_NOT_FIT=EventData does not fit into maximum number of batches. '%s' CANNOT_SEND_EVENT_BATCH_EMPTY=Cannot send an EventBatch that is empty. ERROR_SENDING_BATCH=Error sending batch. -FAILED_TO_CLAIM_OWNERSHIP=Failed to claim ownership of partition {} - {} -LOAD_BALANCING_FAILED=Load balancing for event processor failed - {} -EVENT_PROCESSOR_RUN_END=EventProcessor.run() endTracingSpan().close() failed with an error %s -FAILED_CLOSE_CONSUMER_PARTITION=Failed to close consumer for partition {} -ERROR_OCCURRED_IN_SUBSCRIBER_ERROR=Error occurred in subscriber. Error: {} +FAILED_TO_CLAIM_OWNERSHIP=Failed to claim ownership. +LOAD_BALANCING_FAILED=Load balancing for event processor failed. +EVENT_PROCESSOR_RUN_END=EventProcessor.run() endTracingSpan().close() failed with an error. %s +FAILED_CLOSE_CONSUMER_PARTITION=Failed to close consumer for partition. +ERROR_OCCURRED_IN_SUBSCRIBER_ERROR=Error occurred in subscriber. EXCEPTION_OCCURRED_WHILE_EMITTING=Exception occurred while emitting next received event. CLASS_NOT_A_SUPPORTED_TYPE=Class '%s' is not a supported deserializable type. -ENCODING_TYPE_NOT_SUPPORTED=Encoding Type: %s is not supported +ENCODING_TYPE_NOT_SUPPORTED=Encoding Type: %s is not supported. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java index 2871d174e652..be441204adc8 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java @@ -16,6 +16,10 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.SEQUENCE_NUMBER_KEY; + /** * A simple in-memory implementation of a {@link CheckpointStore}. This implementation keeps track of partition * ownership details including checkpointing information in-memory. Using this implementation will only facilitate @@ -81,9 +85,11 @@ public Flux claimOwnership(List requeste || partitionOwnershipMap.get(partitionOwnership.getPartitionId()).getETag() .equals(partitionOwnership.getETag()); }) - .doOnNext(partitionOwnership -> logger - .info("Ownership of partition {} claimed by {}", partitionOwnership.getPartitionId(), - partitionOwnership.getOwnerId())) + .doOnNext(partitionOwnership -> + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionOwnership.getPartitionId()) + .addKeyValue(OWNER_ID_KEY, partitionOwnership.getOwnerId()) + .log("Ownership claimed.")) .map(partitionOwnership -> { partitionOwnership.setETag(UUID.randomUUID().toString()) .setLastModifiedTime(System.currentTimeMillis()); @@ -119,8 +125,10 @@ public Mono updateCheckpoint(Checkpoint checkpoint) { String prefix = prefixBuilder(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup(), CHECKPOINT); checkpointsMap.put(prefix + SEPARATOR + checkpoint.getPartitionId(), checkpoint); - logger.info("Updated checkpoint for partition {} with sequence number {}", checkpoint.getPartitionId(), - checkpoint.getSequenceNumber()); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, checkpoint.getPartitionId()) + .addKeyValue(SEQUENCE_NUMBER_KEY, checkpoint.getSequenceNumber()) + .log("Updated checkpoint."); return Mono.empty(); } }