Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* EventProcessorClient provides a convenient mechanism to consume events from all partitions of an Event Hub in the
Expand Down Expand Up @@ -54,33 +54,31 @@ public class EventProcessorClient {
* @param eventHubClientBuilder The {@link EventHubClientBuilder}.
* @param consumerGroup The consumer group name used in this event processor to consumer events.
* @param partitionProcessorFactory The factory to create new partition processor(s).
* @param initialEventPosition Initial event position to start consuming events.
* @param checkpointStore The store used for reading and updating partition ownership and checkpoints. information.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
*/
EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup,
Supplier<PartitionProcessor> partitionProcessorFactory, EventPosition initialEventPosition,
CheckpointStore checkpointStore, boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider) {
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError) {

Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Objects.requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null.");
Objects.requireNonNull(initialEventPosition, "initialEventPosition cannot be null.");

this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
this.identifier = UUID.randomUUID().toString();
logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
initialEventPosition, eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT),
eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT),
consumerGroup.toLowerCase(Locale.ROOT), identifier, TimeUnit.MINUTES.toSeconds(1),
partitionPumpManager);
partitionPumpManager, processError);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ public EventProcessorClient buildEventProcessorClient() {

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
return new EventProcessorClient(eventHubClientBuilder, this.consumerGroup,
getPartitionProcessorSupplier(), EventPosition.earliest(), checkpointStore,
trackLastEnqueuedEventProperties, tracerProvider);
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, tracerProvider,
processError);
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.function.Consumer;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -47,6 +50,8 @@ final class PartitionBasedLoadBalancer {
private final long inactiveTimeLimitInSeconds;
private final PartitionPumpManager partitionPumpManager;
private final String fullyQualifiedNamespace;
private final Consumer<ErrorContext> processError;
private final PartitionContext partitionAgnosticContext;

/**
* Creates an instance of PartitionBasedLoadBalancer for the given Event Hub name and consumer group.
Expand All @@ -60,11 +65,13 @@ final class PartitionBasedLoadBalancer {
* assuming the owner of the partition is inactive.
* @param partitionPumpManager The partition pump manager that keeps track of all EventHubConsumers and partitions
* that this {@link EventProcessorClient} is processing.
* @param processError The callback that will be called when an error occurs while running the load balancer.
*/
PartitionBasedLoadBalancer(final CheckpointStore checkpointStore,
final EventHubAsyncClient eventHubAsyncClient, final String fullyQualifiedNamespace,
final String eventHubName, final String consumerGroupName, final String ownerId,
final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager) {
final long inactiveTimeLimitInSeconds, final PartitionPumpManager partitionPumpManager,
final Consumer<ErrorContext> processError) {
this.checkpointStore = checkpointStore;
this.eventHubAsyncClient = eventHubAsyncClient;
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
Expand All @@ -73,6 +80,9 @@ final class PartitionBasedLoadBalancer {
this.ownerId = ownerId;
this.inactiveTimeLimitInSeconds = inactiveTimeLimitInSeconds;
this.partitionPumpManager = partitionPumpManager;
this.processError = processError;
this.partitionAgnosticContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
consumerGroupName, "NONE");
}

/**
Expand Down Expand Up @@ -107,8 +117,12 @@ void loadBalance() {
Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
// if there was an error, log warning and TODO: call user provided error handler
.doOnError(ex -> logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage()))
.subscribe();
.subscribe(ignored -> { },
ex -> {
logger.warning(Messages.LOAD_BALANCING_FAILED, ex.getMessage());
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
}, () -> logger.info("Load balancing completed successfully"));
}

/*
Expand Down Expand Up @@ -350,7 +364,10 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
.stream()
.forEach(po -> partitionPumpManager.startPartitionPump(po,
ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
});
},
ex -> {
throw logger.logExceptionAsError(new RuntimeException("Error while listing checkpoints", ex));
});
}

private PartitionOwnership createPartitionOwnershipRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
Expand All @@ -12,28 +17,21 @@
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import reactor.core.publisher.Signal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import reactor.core.publisher.Signal;

/**
* The partition pump manager that keeps track of all the partition pumps started by this {@link EventProcessorClient}.
Expand All @@ -52,7 +50,6 @@ class PartitionPumpManager {
private final CheckpointStore checkpointStore;
private final Map<String, EventHubConsumerAsyncClient> partitionPumps = new ConcurrentHashMap<>();
private final Supplier<PartitionProcessor> partitionProcessorFactory;
private final EventPosition initialEventPosition;
private final EventHubClientBuilder eventHubClientBuilder;
private final TracerProvider tracerProvider;
private final boolean trackLastEnqueuedEventProperties;
Expand All @@ -63,21 +60,17 @@ class PartitionPumpManager {
* @param checkpointStore The partition manager that is used to store and update checkpoints.
* @param partitionProcessorFactory The partition processor factory that is used to create new instances of {@link
* PartitionProcessor} when new partition pumps are started.
* @param initialEventPosition The initial event position to use when a new partition pump is created and no
* checkpoint for the partition is available.
* @param eventHubClientBuilder The client builder used to create new clients (and new connections) for each
* partition processed by this {@link EventProcessorClient}.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient
* will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
*/
PartitionPumpManager(CheckpointStore checkpointStore,
Supplier<PartitionProcessor> partitionProcessorFactory, EventPosition initialEventPosition,
EventHubClientBuilder eventHubClientBuilder, boolean trackLastEnqueuedEventProperties,
TracerProvider tracerProvider) {
Supplier<PartitionProcessor> partitionProcessorFactory, EventHubClientBuilder eventHubClientBuilder,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider) {
this.checkpointStore = checkpointStore;
this.partitionProcessorFactory = partitionProcessorFactory;
this.initialEventPosition = initialEventPosition;
this.eventHubClientBuilder = eventHubClientBuilder;
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
this.tracerProvider = tracerProvider;
Expand Down Expand Up @@ -121,12 +114,17 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionProcessor.initialize(initializationContext);

EventPosition startFromEventPosition = null;
// A checkpoint indicates the last known successfully processed event.
// So, the event position to start a new partition processing should be exclusive of the
// offset/sequence number in the checkpoint. If no checkpoint is available, start from
// the position in set in the InitializationContext (either the earliest event in the partition or
// the user provided initial position)
if (checkpoint != null && checkpoint.getOffset() != null) {
startFromEventPosition = EventPosition.fromOffset(checkpoint.getOffset());
} else if (checkpoint != null && checkpoint.getSequenceNumber() != null) {
startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber(), true);
startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber());
} else {
startFromEventPosition = initialEventPosition;
startFromEventPosition = initializationContext.getInitialPosition();
}

ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L)
Expand All @@ -151,42 +149,27 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore,
partitionEvent.getLastEnqueuedEventProperties()));
endProcessTracingSpan(processSpanContext, Signal.complete());
} catch (Exception ex) {
/* event processing threw an exception */
handleProcessingError(claimedOwnership, partitionProcessor, ex, partitionContext);
endProcessTracingSpan(processSpanContext, Signal.error(ex));

} catch (Throwable throwable) {
/* user code for event processing threw an exception - log and bubble up */
endProcessTracingSpan(processSpanContext, Signal.error(throwable));
throw logger.logExceptionAsError(new RuntimeException("Error in event processing callback",
throwable));
}
}, /* EventHubConsumer receive() returned an error */
ex -> handleReceiveError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN)));
// @formatter:on
}

private void handleProcessingError(PartitionOwnership claimedOwnership, PartitionProcessor partitionProcessor,
Throwable error, PartitionContext partitionContext) {
try {
// There was an error in process event (user provided code), call process error and if that
// also fails just log and continue
partitionProcessor.processError(new ErrorContext(partitionContext, error));
} catch (Exception ex) {
logger.warning(Messages.FAILED_WHILE_PROCESSING_ERROR, claimedOwnership.getPartitionId(), ex);
}
// @formatter:on
}

private void handleReceiveError(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer,
PartitionProcessor partitionProcessor, Throwable error, PartitionContext partitionContext) {
PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) {
try {
logger.warning("Error receiving events for partition {}", partitionContext.getPartitionId(), throwable);
// if there was an error on receive, it also marks the end of the event data stream
partitionProcessor.processError(new ErrorContext(partitionContext, error));
CloseReason closeReason = CloseReason.EVENT_HUB_EXCEPTION;
// If the exception indicates that the partition was stolen (i.e some other consumer with same ownerlevel
// started consuming the partition), update the closeReason
// TODO: Find right exception type to determine stolen partition
if (error instanceof AmqpException) {
closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
}
partitionProcessor.processError(new ErrorContext(partitionContext, throwable));
// Any exception while receiving events will result in the processor losing ownership
CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP;
partitionProcessor.close(new CloseContext(partitionContext, closeReason));
} catch (Exception ex) {
logger.warning(Messages.FAILED_PROCESSING_ERROR_RECEIVE, claimedOwnership.getPartitionId(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,4 @@ public enum CloseReason {
* PartitionProcessor#close(CloseContext)} will be called with this reason.
*/
EVENT_PROCESSOR_SHUTDOWN,

/**
* If a non-retryable exception occured when receiving events from Event Hub, this reason will be provided when
* {@link PartitionProcessor#close(CloseContext)} is called.
*/
EVENT_HUB_EXCEPTION
}
Loading