Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Operators;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand All @@ -27,6 +28,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
Expand Down Expand Up @@ -64,8 +66,10 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
"'endpointStates' cannot be null.");
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");
this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");

Map<String, Object> loggingContext = new HashMap<>(1);
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));
this.logger = new ClientLogger(getClass(), loggingContext);
this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public class BlobCheckpointStore implements CheckpointStore {
private static final String CHECKPOINT_PATH = "/checkpoint/";
private static final String OWNERSHIP_PATH = "/ownership/";

// logging keys, consistent across all AMQP libraries and human-readable
private static final String PARTITION_ID_LOG_KEY = "partitionId";
private static final String OWNER_ID_LOG_KEY = "ownerId";
private static final String SEQUENCE_NUMBER_LOG_KEY = "sequenceNumber";
private static final String BLOB_NAME_LOG_KEY = "blobName";
private static final String OFFSET_LOG_KEY = "offset";

/**
* An empty string.
*/
Expand Down Expand Up @@ -103,20 +110,27 @@ private <T> Flux<T> listBlobs(String prefix, Function<BlobItem, Mono<T>> convert

private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR);
logger.verbose(Messages.FOUND_BLOB_FOR_PARTITION, blobItem.getName());
logger.atVerbose()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.log(Messages.FOUND_BLOB_FOR_PARTITION);
if (names.length == 5) {
// Blob names should be of the pattern
// fullyqualifiednamespace/eventhub/consumergroup/checkpoints/<partitionId>
            // While we could further check that the partition id is numeric, that may not necessarily be the case in the future.

if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) {
logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName());
logger.atWarning()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
return Mono.empty();
}

Map<String, String> metadata = blobItem.getMetadata();
logger.verbose(Messages.CHECKPOINT_INFO, blobItem.getName(), metadata.get(SEQUENCE_NUMBER),
metadata.get(OFFSET));
logger.atVerbose()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.addKeyValue(SEQUENCE_NUMBER_LOG_KEY, metadata.get(SEQUENCE_NUMBER))
.addKeyValue(OFFSET_LOG_KEY, metadata.get(OFFSET))
.log(Messages.CHECKPOINT_INFO);

Long sequenceNumber = null;
Long offset = null;
Expand Down Expand Up @@ -176,20 +190,26 @@ public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requeste
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null,
blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.verbose(Messages.CLAIM_ERROR, partitionId, error.getMessage());
logger.atVerbose()
.addKeyValue(PARTITION_ID_LOG_KEY, partitionId)
.log(Messages.CLAIM_ERROR, error);
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobRequestConditions.setIfMatch(partitionOwnership.getETag());
return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.verbose(Messages.CLAIM_ERROR, partitionId, error);
logger.atVerbose()
.addKeyValue(PARTITION_ID_LOG_KEY, partitionId)
.log(Messages.CLAIM_ERROR, error);
return Mono.empty();
}, Mono::empty);
}
} catch (Exception ex) {
logger.warning(Messages.CLAIM_ERROR, partitionOwnership.getPartitionId(), ex);
logger.atWarning()
.addKeyValue(PARTITION_ID_LOG_KEY, partitionOwnership.getPartitionId())
.log(Messages.CLAIM_ERROR, ex);
return Mono.empty();
}
});
Expand Down Expand Up @@ -252,19 +272,21 @@ private String getBlobName(String fullyQualifiedNamespace, String eventHubName,
}

private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem) {
logger.verbose(Messages.FOUND_BLOB_FOR_PARTITION, blobItem.getName());
logger.atVerbose()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.log(Messages.FOUND_BLOB_FOR_PARTITION);

String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR);
if (names.length == 5) {
// Blob names should be of the pattern
// fullyqualifiednamespace/eventhub/consumergroup/ownership/<partitionId>
            // While we could further check that the partition id is numeric, that may not necessarily be the case in the future.
if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) {
logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName());
logger.atWarning()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
return Mono.empty();
}
logger
.verbose(Messages.BLOB_OWNER_INFO, blobItem.getName(),
blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING));

BlobItemProperties blobProperties = blobItem.getProperties();

Expand All @@ -273,6 +295,11 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
ownerId = EMPTY_STRING;
}

logger.atVerbose()
.addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName())
.addKeyValue(OWNER_ID_LOG_KEY, ownerId)
.log(Messages.BLOB_OWNER_INFO);

PartitionOwnership partitionOwnership = new PartitionOwnership()
.setFullyQualifiedNamespace(names[0])
.setEventHubName(names[1])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob {}
CLAIM_ERROR=Couldn't claim ownership of partition {}
FOUND_BLOB_FOR_PARTITION=Found blob for partition {}
BLOB_OWNER_INFO=Blob {} is owned by {}
CHECKPOINT_INFO=Blob {} has checkpoint with sequence number {} and offset {}
NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob.
CLAIM_ERROR=Couldn't claim ownership of partition.
FOUND_BLOB_FOR_PARTITION=Found blob for partition.
BLOB_OWNER_INFO=Blob ownership info.
CHECKPOINT_INFO=Blob checkpoint info.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class EventData {
*/
static final Set<String> RESERVED_SYSTEM_PROPERTIES;

private static final ClientLogger LOGGER = new ClientLogger(EventData.class);
private final Map<String, Object> properties;
private final SystemProperties systemProperties;
private final AmqpAnnotatedMessage annotatedMessage;
Expand Down Expand Up @@ -140,11 +141,11 @@ public EventData(BinaryData body) {
break;
case SEQUENCE:
case VALUE:
new ClientLogger(EventData.class).warning("Message body type '{}' is not supported in EH. "
LOGGER.warning("Message body type '{}' is not supported in EH. "
+ " Getting contents of body may throw.", annotatedMessage.getBody().getBodyType());
break;
default:
throw new ClientLogger(EventData.class).logExceptionAsError(new IllegalArgumentException(
throw LOGGER.logExceptionAsError(new IllegalArgumentException(
"Body type not valid " + annotatedMessage.getBody().getBodyType()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;

/**
* This class provides a fluent builder API to aid the instantiation of {@link EventHubProducerAsyncClient}, {@link
* EventHubProducerClient}, {@link EventHubConsumerAsyncClient}, and {@link EventHubConsumerClient}. Calling any of the
Expand Down Expand Up @@ -748,7 +750,9 @@ private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer m
}

final String connectionId = StringUtil.getRandomString("MF");
logger.info("connectionId[{}]: Emitting a single connection.", connectionId);
logger.atInfo()
.addKeyValue(CONNECTION_ID_KEY, connectionId)
.log("Emitting a single connection.");

final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.SIGNAL_TYPE_KEY;
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;

/**
* An <b>asynchronous</b> consumer responsible for reading {@link EventData} from either a specific Event Hub partition
Expand Down Expand Up @@ -413,8 +417,11 @@ private Flux<PartitionEvent> createConsumer(String linkName, String partitionId,
}

private void removeLink(String linkName, String partitionId, SignalType signalType) {
logger.info("linkName[{}], partitionId[{}], signal[{}]: Receiving completed.",
linkName, partitionId, signalType);
logger.atInfo()
.addKeyValue(LINK_NAME_KEY, linkName)
.addKeyValue(PARTITION_ID_KEY, partitionId)
.addKeyValue(SIGNAL_TYPE_KEY, signalType)
.log("Receiving completed.");

final EventHubPartitionAsyncConsumer consumer = openPartitionConsumers.remove(linkName);

Expand All @@ -434,8 +441,11 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
//
final Mono<AmqpReceiveLink> receiveLinkMono = connectionProcessor
.flatMap(connection -> {
logger.info("connectionId[{}] linkName[{}] Creating receive consumer for partition '{}'",
connection.getId(), linkName, partitionId);
logger.atInfo()
.addKeyValue(LINK_NAME_KEY, linkName)
.addKeyValue(PARTITION_ID_KEY, partitionId)
.addKeyValue(CONNECTION_ID_KEY, connection.getId())
.log("Creating receive consumer for partition.");
return connection.createReceiveLink(linkName, entityPath, initialPosition.get().get(), receiveOptions);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;

/**
* A <b>synchronous</b> consumer responsible for reading {@link EventData} from an Event Hub partition in the context of
* a specific consumer group.
Expand Down Expand Up @@ -287,7 +289,10 @@ private void queueWork(String partitionId, int maximumMessageCount, EventPositio
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maximumWaitTime,
emitter);
final SynchronousEventSubscriber syncSubscriber = new SynchronousEventSubscriber(work);
logger.info("Started synchronous event subscriber for partition '{}'.", partitionId);
logger.atInfo()
.addKeyValue(PARTITION_ID_KEY, partitionId)
.log("Started synchronous event subscriber.");

consumer.receiveFromPartition(partitionId, startingPosition, receiveOptions).subscribeWith(syncSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public <T> Message serialize(T object) {

if (!(object instanceof EventData)) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"Cannot serialize object that is not EventData. Clazz: " + object.getClass()));
"Cannot serialize object that is not EventData. Class: " + object.getClass()));
}

final EventData eventData = (EventData) object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONSUMER_GROUP_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;

/**
* A package-private consumer responsible for reading {@link EventData} from a specific Event Hub partition in the
* context of a specific consumer group.
Expand Down Expand Up @@ -72,10 +75,11 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
if (offset != null) {
currentOffset = offset;
} else {
logger.warning(
"Offset for received event should not be null. Partition Id: {}. Consumer group: {}. Data: {}",
event.getPartitionContext().getPartitionId(), event.getPartitionContext().getConsumerGroup(),
event.getData().getBodyAsString());
logger.atWarning()
.addKeyValue(PARTITION_ID_KEY, event.getPartitionContext().getPartitionId())
.addKeyValue(CONSUMER_GROUP_KEY, event.getPartitionContext().getConsumerGroup())
.addKeyValue("data", () -> event.getData().getBodyAsString())
.log("Offset for received event should not be null.");
}
});
}
Expand All @@ -90,7 +94,9 @@ public void close() {
// cancel only if the processor is not already terminated.
amqpReceiveLinkProcessor.cancel();
}
logger.info("Closed consumer for partition {}", this.partitionId);
logger.atInfo()
.addKeyValue(PARTITION_ID_KEY, this.partitionId)
.log("Closed consumer.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.MAX_MESSAGE_LENGTH_BYTES;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_KEY_KEY;

/**
* An <b>asynchronous</b> producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped
Expand Down Expand Up @@ -510,12 +512,19 @@ public Mono<Void> send(EventDataBatch batch) {
}

if (!CoreUtils.isNullOrEmpty(batch.getPartitionId())) {
logger.verbose("Sending batch with size[{}] to partitionId[{}].", batch.getCount(), batch.getPartitionId());
logger.atVerbose()
.addKeyValue("size", batch.getCount())
.addKeyValue(PARTITION_ID_KEY, batch.getPartitionId())
.log("Sending batch.");
} else if (!CoreUtils.isNullOrEmpty(batch.getPartitionKey())) {
logger.verbose("Sending batch with size[{}] with partitionKey[{}].",
batch.getCount(), batch.getPartitionKey());
logger.atVerbose()
.addKeyValue("size", batch.getCount())
.addKeyValue(PARTITION_KEY_KEY, batch.getPartitionKey())
.log("Sending batch.");
} else {
logger.verbose("Sending batch with size[{}] to be distributed round-robin in service.", batch.getCount());
logger.atVerbose()
.addKeyValue("size", batch.getCount())
.log("Sending batch to be distributed round-robin in service.");
}

final String partitionKey = batch.getPartitionKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -40,7 +41,7 @@
public class EventProcessorClient {

private static final long BASE_JITTER_IN_SECONDS = 2; // the initial delay jitter before starting the processor
private final ClientLogger logger = new ClientLogger(EventProcessorClient.class);
private final ClientLogger logger;

private final String identifier;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
Expand Down Expand Up @@ -90,12 +91,16 @@ public class EventProcessorClient {

this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
this.identifier = UUID.randomUUID().toString();

Map<String, Object> loggingContext = new HashMap<>();
loggingContext.put("eventProcessorId", identifier);

this.logger = new ClientLogger(EventProcessorClient.class, loggingContext);
this.fullyQualifiedNamespace = eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT);
this.eventHubName = eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT);
this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT);
this.loadBalancerUpdateInterval = loadBalancerUpdateInterval;

logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition,
maxBatchSize, maxWaitTime, batchReceiveMode);
Expand Down Expand Up @@ -138,7 +143,7 @@ public synchronized void start() {
logger.info("Event processor is already running");
return;
}
logger.info("Starting a new event processor instance with id {}", this.identifier);
logger.info("Starting a new event processor instance.");

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
scheduler.set(executor);
Expand Down
Loading