diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java index 52104c1184fd..5f138fa8a1a5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java @@ -8,7 +8,9 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.models.ErrorContext; +import com.azure.messaging.eventhubs.models.EventPosition; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -40,7 +42,7 @@ public class EventProcessorClient { private final ClientLogger logger = new ClientLogger(EventProcessorClient.class); private final String identifier; - private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean isRunning = new AtomicBoolean(false); private final PartitionPumpManager partitionPumpManager; private final PartitionBasedLoadBalancer partitionBasedLoadBalancer; private final CheckpointStore checkpointStore; @@ -55,13 +57,16 @@ public class EventProcessorClient { * @param consumerGroup The consumer group name used in this event processor to consumer events. * @param partitionProcessorFactory The factory to create new partition processor(s). * @param checkpointStore The store used for reading and updating partition ownership and checkpoints. information. - * @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this - * EventProcessorClient will also include the last enqueued event properties for it's respective partitions. + * @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient + * will also include the last enqueued event properties for it's respective partitions. * @param tracerProvider The tracer implementation. + * @param processError Error handler for any errors that occur outside the context of a partition. + * @param initialPartitionEventPosition Map of initial event positions for partition ids. */ EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup, - Supplier partitionProcessorFactory, CheckpointStore checkpointStore, - boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer processError) { + Supplier partitionProcessorFactory, CheckpointStore checkpointStore, + boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer processError, + Map initialPartitionEventPosition) { Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null."); Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null."); @@ -71,7 +76,7 @@ public class EventProcessorClient { this.identifier = UUID.randomUUID().toString(); logger.info("The instance ID for this event processors is {}", this.identifier); this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory, - eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider); + eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition); EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient(); this.partitionBasedLoadBalancer = new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient, @@ -103,7 +108,7 @@ public String getIdentifier() { * {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop} */ public synchronized void start() { - if (!started.compareAndSet(false, true)) { + if (!isRunning.compareAndSet(false, true)) { logger.info("Event processor is already running"); return; } @@ -127,7 +132,7 @@ public synchronized void start() { * {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop} */ public synchronized void stop() { - if (!started.compareAndSet(true, false)) { + if (!isRunning.compareAndSet(true, false)) { logger.info("Event processor has already stopped"); return; } @@ -135,4 +140,14 @@ public synchronized void stop() { scheduler.get().dispose(); this.partitionPumpManager.stopAllPartitionPumps(); } + + /** + * Returns {@code true} if the event processor is running. If the event processor is already running, calling + * {@link #start()} has no effect. + * + * @return {@code true} if the event processor is running. + */ + public synchronized boolean isRunning() { + return isRunning.get(); + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java index daafe27c3d68..1b1d0637b8df 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java @@ -19,6 +19,8 @@ import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.InitializationContext; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; import java.util.function.Consumer; @@ -73,6 +75,7 @@ public class EventProcessorClientBuilder { private Consumer processPartitionInitialization; private Consumer processPartitionClose; private boolean trackLastEnqueuedEventProperties; + private Map initialPartitionEventPosition = new HashMap<>(); /** * Creates a new instance of {@link EventProcessorClientBuilder}. @@ -289,7 +292,6 @@ public EventProcessorClientBuilder processPartitionClose(Consumer * * @param trackLastEnqueuedEventProperties {@code true} if the resulting events will keep track of the last * enqueued information for that partition; {@code false} otherwise. - * * @return The updated {@link EventProcessorClientBuilder} instance. */ public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) { @@ -297,6 +299,23 @@ public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trac return this; } + /** + * Sets the map containing the event position to use for each partition if a checkpoint for the partition does not + * exist in {@link CheckpointStore}. This map is keyed off of the partition id. If there is no checkpoint in + * {@link CheckpointStore} and there is no entry in this map, the processing of the partition will start from + * {@link EventPosition#latest() latest} position. + * + * @param initialPartitionEventPosition Map of initial event positions for partition ids. + * @return The updated {@link EventProcessorClientBuilder} instance. + */ + public EventProcessorClientBuilder initialPartitionEventPosition( + Map initialPartitionEventPosition) { + + this.initialPartitionEventPosition = Objects.requireNonNull(initialPartitionEventPosition, + "'initialPartitionEventPosition' cannot be null."); + return this; + } + /** * This will create a new {@link EventProcessorClient} configured with the options set in this builder. Each call to * this method will return a new instance of {@link EventProcessorClient}. @@ -322,7 +341,7 @@ public EventProcessorClient buildEventProcessorClient() { final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); return new EventProcessorClient(eventHubClientBuilder, this.consumerGroup, getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, tracerProvider, - processError); + processError, initialPartitionEventPosition); } private Supplier getPartitionProcessorSupplier() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java index 5a89d6d498f0..6caee9db26a7 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java @@ -207,7 +207,11 @@ private Mono loadBalance(final Tuple2, Lis // If the partitions are evenly distributed among all active event processors, no change required. logger.info("Load is balanced"); // renew ownership of already owned partitions - checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe(); + checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet() + .stream() + .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId)) + .collect(Collectors.toList())) + .subscribe(); return; } @@ -216,7 +220,11 @@ private Mono loadBalance(final Tuple2, Lis logger.info("This event processor owns {} partitions and shouldn't own more", ownerPartitionMap.get(ownerId).size()); // renew ownership of already owned partitions - checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe(); + checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet() + .stream() + .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId)) + .collect(Collectors.toList())) + .subscribe(); return; } @@ -345,11 +353,16 @@ private void claimOwnership(final Map partitionOwner PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim); - List currentPartitionsOwned = ownerPartitionsMap.get(ownerId); - currentPartitionsOwned.add(ownershipRequest); + List partitionsToClaim = new ArrayList<>(); + partitionsToClaim.add(ownershipRequest); + partitionsToClaim.addAll(partitionPumpManager.getPartitionPumps() + .keySet() + .stream() + .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId)) + .collect(Collectors.toList())); checkpointStore - .claimOwnership(currentPartitionsOwned) + .claimOwnership(partitionsToClaim) .timeout(Duration.ofMinutes(1)) // TODO: configurable .doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}", partitionOwnership.getPartitionId())) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index a620f1392c2e..6694fd6741f4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -14,6 +14,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; +import com.azure.messaging.eventhubs.implementation.PartitionProcessorException; import com.azure.messaging.eventhubs.models.Checkpoint; import com.azure.messaging.eventhubs.models.CloseContext; import com.azure.messaging.eventhubs.models.CloseReason; @@ -22,6 +23,7 @@ import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.InitializationContext; import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.PartitionOwnership; import com.azure.messaging.eventhubs.models.ReceiveOptions; import java.io.Closeable; @@ -53,6 +55,7 @@ class PartitionPumpManager { private final EventHubClientBuilder eventHubClientBuilder; private final TracerProvider tracerProvider; private final boolean trackLastEnqueuedEventProperties; + private final Map initialPartitionEventPosition; /** * Creates an instance of partition pump manager. @@ -65,15 +68,18 @@ class PartitionPumpManager { * @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient * will also include the last enqueued event properties for it's respective partitions. * @param tracerProvider The tracer implementation. + * @param initialPartitionEventPosition Map of initial event positions for partition ids. */ PartitionPumpManager(CheckpointStore checkpointStore, Supplier partitionProcessorFactory, EventHubClientBuilder eventHubClientBuilder, - boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider) { + boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, + Map initialPartitionEventPosition) { this.checkpointStore = checkpointStore; this.partitionProcessorFactory = partitionProcessorFactory; this.eventHubClientBuilder = eventHubClientBuilder; this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties; this.tracerProvider = tracerProvider; + this.initialPartitionEventPosition = initialPartitionEventPosition; } /** @@ -104,83 +110,112 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi return; } - PartitionContext partitionContext = new PartitionContext(claimedOwnership.getFullyQualifiedNamespace(), - claimedOwnership.getEventHubName(), claimedOwnership.getConsumerGroup(), - claimedOwnership.getPartitionId()); - PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get(); - - InitializationContext initializationContext = new InitializationContext(partitionContext, - EventPosition.earliest()); - partitionProcessor.initialize(initializationContext); - - EventPosition startFromEventPosition = null; - // A checkpoint indicates the last known successfully processed event. - // So, the event position to start a new partition processing should be exclusive of the - // offset/sequence number in the checkpoint. If no checkpoint is available, start from - // the position in set in the InitializationContext (either the earliest event in the partition or - // the user provided initial position) - if (checkpoint != null && checkpoint.getOffset() != null) { - startFromEventPosition = EventPosition.fromOffset(checkpoint.getOffset()); - } else if (checkpoint != null && checkpoint.getSequenceNumber() != null) { - startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber()); - } else { - startFromEventPosition = initializationContext.getInitialPosition(); + try { + PartitionContext partitionContext = new PartitionContext(claimedOwnership.getFullyQualifiedNamespace(), + claimedOwnership.getEventHubName(), claimedOwnership.getConsumerGroup(), + claimedOwnership.getPartitionId()); + PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get(); + + InitializationContext initializationContext = new InitializationContext(partitionContext); + partitionProcessor.initialize(initializationContext); + + EventPosition startFromEventPosition = null; + // A checkpoint indicates the last known successfully processed event. + // So, the event position to start a new partition processing should be exclusive of the + // offset/sequence number in the checkpoint. If no checkpoint is available, start from + // the position in set in the InitializationContext (either the earliest event in the partition or + // the user provided initial position) + if (checkpoint != null && checkpoint.getOffset() != null) { + startFromEventPosition = EventPosition.fromOffset(checkpoint.getOffset()); + } else if (checkpoint != null && checkpoint.getSequenceNumber() != null) { + startFromEventPosition = EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber()); + } else if (initialPartitionEventPosition.containsKey(claimedOwnership.getPartitionId())) { + startFromEventPosition = initialPartitionEventPosition.get(claimedOwnership.getPartitionId()); + } else { + startFromEventPosition = EventPosition.latest(); + } + + ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L) + .setTrackLastEnqueuedEventProperties(trackLastEnqueuedEventProperties); + + EventHubConsumerAsyncClient eventHubConsumer = eventHubClientBuilder.buildAsyncClient() + .createConsumer(claimedOwnership.getConsumerGroup(), EventHubClientBuilder.DEFAULT_PREFETCH_COUNT); + + partitionPumps.put(claimedOwnership.getPartitionId(), eventHubConsumer); + eventHubConsumer + .receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions) + .subscribe(partitionEvent -> processEvent(partitionContext, partitionProcessor, eventHubConsumer, + partitionEvent), + /* EventHubConsumer receive() returned an error */ + ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext), + () -> partitionProcessor.close(new CloseContext(partitionContext, + CloseReason.EVENT_PROCESSOR_SHUTDOWN))); + } catch (Exception ex) { + if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { + cleanup(claimedOwnership, partitionPumps.get(claimedOwnership.getPartitionId())); + } + throw logger.logExceptionAsError( + new PartitionProcessorException( + "Error occurred while starting partition pump for partition " + claimedOwnership.getPartitionId(), + ex)); } - ReceiveOptions receiveOptions = new ReceiveOptions().setOwnerLevel(0L) - .setTrackLastEnqueuedEventProperties(trackLastEnqueuedEventProperties); - - EventHubConsumerAsyncClient eventHubConsumer = eventHubClientBuilder.buildAsyncClient() - .createConsumer(claimedOwnership.getConsumerGroup(), EventHubClientBuilder.DEFAULT_PREFETCH_COUNT); - - partitionPumps.put(claimedOwnership.getPartitionId(), eventHubConsumer); - - // The indentation required by checkstyle is different from the indentation IntelliJ uses. - // @formatter:off - eventHubConsumer.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions) - .subscribe(partitionEvent -> { - EventData eventData = partitionEvent.getData(); - Context processSpanContext = startProcessTracingSpan(eventData, eventHubConsumer.getEventHubName(), - eventHubConsumer.getFullyQualifiedNamespace()); - if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) { - eventData.addContext(SPAN_CONTEXT_KEY, processSpanContext); - } - try { - partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore, - partitionEvent.getLastEnqueuedEventProperties())); - endProcessTracingSpan(processSpanContext, Signal.complete()); - } catch (Throwable throwable) { - /* user code for event processing threw an exception - log and bubble up */ - endProcessTracingSpan(processSpanContext, Signal.error(throwable)); - throw logger.logExceptionAsError(new RuntimeException("Error in event processing callback", - throwable)); - } - }, /* EventHubConsumer receive() returned an error */ - ex -> handleReceiveError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext), - () -> partitionProcessor.close(new CloseContext(partitionContext, - CloseReason.EVENT_PROCESSOR_SHUTDOWN))); - // @formatter:on } - private void handleReceiveError(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer, - PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) { + private void processEvent(PartitionContext partitionContext, PartitionProcessor partitionProcessor, + EventHubConsumerAsyncClient eventHubConsumer, PartitionEvent partitionEvent) { + EventData eventData = partitionEvent.getData(); + Context processSpanContext = startProcessTracingSpan(eventData, eventHubConsumer.getEventHubName(), + eventHubConsumer.getFullyQualifiedNamespace()); + if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) { + eventData.addContext(SPAN_CONTEXT_KEY, processSpanContext); + } try { - logger.warning("Error receiving events for partition {}", partitionContext.getPartitionId(), throwable); - // if there was an error on receive, it also marks the end of the event data stream + partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore, + partitionEvent.getLastEnqueuedEventProperties())); + endProcessTracingSpan(processSpanContext, Signal.complete()); + } catch (Throwable throwable) { + /* user code for event processing threw an exception - log and bubble up */ + endProcessTracingSpan(processSpanContext, Signal.error(throwable)); + throw logger.logExceptionAsError(new PartitionProcessorException("Error in event processing callback", + throwable)); + } + } + + Map getPartitionPumps() { + return this.partitionPumps; + } + + private void handleError(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer, + PartitionProcessor partitionProcessor, Throwable throwable, PartitionContext partitionContext) { + boolean shouldRethrow = true; + if (!(throwable instanceof PartitionProcessorException)) { + shouldRethrow = false; + // If user code threw an exception in processEvent callback, bubble up the exception + logger.warning("Error receiving events from partition {}", partitionContext.getPartitionId(), throwable); partitionProcessor.processError(new ErrorContext(partitionContext, throwable)); - // Any exception while receiving events will result in the processor losing ownership - CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP; - partitionProcessor.close(new CloseContext(partitionContext, closeReason)); - } catch (Exception ex) { - logger.warning(Messages.FAILED_PROCESSING_ERROR_RECEIVE, claimedOwnership.getPartitionId(), ex); + } + // If there was an error on receive, it also marks the end of the event data stream + // Any exception while receiving events will result in the processor losing ownership + CloseReason closeReason = CloseReason.LOST_PARTITION_OWNERSHIP; + partitionProcessor.close(new CloseContext(partitionContext, closeReason)); + cleanup(claimedOwnership, eventHubConsumer); + if (shouldRethrow) { + PartitionProcessorException exception = (PartitionProcessorException) throwable; + throw logger.logExceptionAsError(exception); + } + } + + private void cleanup(PartitionOwnership claimedOwnership, EventHubConsumerAsyncClient eventHubConsumer) { + try { + // close the consumer + logger.info("Closing consumer for partition id {}", claimedOwnership.getPartitionId()); + eventHubConsumer.close(); } finally { - try { - // close the consumer - eventHubConsumer.close(); - } finally { - // finally, remove the partition from partitionPumps map - partitionPumps.remove(claimedOwnership.getPartitionId()); - } + // finally, remove the partition from partitionPumps map + logger.info("Removing partition id {} from list of processing partitions", + claimedOwnership.getPartitionId()); + partitionPumps.remove(claimedOwnership.getPartitionId()); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessorException.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessorException.java new file mode 100644 index 000000000000..2b2f13b00fb0 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionProcessorException.java @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import com.azure.core.exception.AzureException; + +/** + * Exception to wrap around the exception that was thrown from user's process event callback. + */ +public class PartitionProcessorException extends AzureException { + private static final long serialVersionUID = 6842246662817290407L; + + /** + * Creates and instance of this exception with the given message. + * @param message The error message. + */ + public PartitionProcessorException(String message) { + super(message); + } + + /** + * Creates an instance of this exception with the given message and cause. + * @param message The error message. + * @param cause The underlying cause for this exception. + */ + public PartitionProcessorException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/InitializationContext.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/InitializationContext.java index b7171ed26634..91ea189e1050 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/InitializationContext.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/InitializationContext.java @@ -3,7 +3,6 @@ package com.azure.messaging.eventhubs.models; -import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import java.util.Objects; @@ -17,19 +16,15 @@ public class InitializationContext { private final PartitionContext partitionContext; - private EventPosition initialPosition; /** * Creates an instance of InitializationContext for the partition provided in the {@link PartitionContext}. * * @param partitionContext The partition information for which the event processing is going to start. - * @param initialPosition The default initial event position from which the processing will start in the absence of - * a checkpoint in {@link CheckpointStore}. * @throws NullPointerException if {@code partitionContext} or {@code initialPosition}is {@code null}. */ - public InitializationContext(final PartitionContext partitionContext, final EventPosition initialPosition) { + public InitializationContext(final PartitionContext partitionContext) { this.partitionContext = Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null"); - this.initialPosition = Objects.requireNonNull(initialPosition, "'initialPosition' cannot be null"); } /** @@ -40,27 +35,4 @@ public InitializationContext(final PartitionContext partitionContext, final Even public PartitionContext getPartitionContext() { return partitionContext; } - - /** - * Returns the default initial event position from which the processing will start in the absence of a checkpoint in - * {@link CheckpointStore}. - * - * @return The default initial event position from which the processing will start in the absence of a checkpoint in - * {@link CheckpointStore}. - */ - public EventPosition getInitialPosition() { - return initialPosition; - } - - /** - * If a different initial position is desirable for this partition, setting the initial position will start the - * event processing from this position. Note that the checkpoint in {@link CheckpointStore} is given the highest - * priority and if there's a checkpoint in the store, that will be used regardless of what is set in this method. - * - * @param initialPosition The initial event position to start the event processing from. - * @throws NullPointerException if {@code initialPosition} is {@code null}. - */ - public void setInitialPosition(final EventPosition initialPosition) { - this.initialPosition = Objects.requireNonNull(initialPosition, "'initialPosition' cannot be null"); - } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java index 21b799801c8b..2617fbb7ba85 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientErrorHandlingTest.java @@ -3,23 +3,30 @@ package com.azure.messaging.eventhubs; +import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_PREFETCH_COUNT; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; -import com.azure.messaging.eventhubs.implementation.ClientConstants; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.CloseContext; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; +import com.azure.messaging.eventhubs.models.EventPosition; +import com.azure.messaging.eventhubs.models.InitializationContext; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.PartitionOwnership; -import java.net.URI; -import java.net.URISyntaxException; +import com.azure.messaging.eventhubs.models.ReceiveOptions; +import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -35,15 +42,6 @@ public class EventProcessorClientErrorHandlingTest { private static final String NAMESPACE_NAME = "dummyNamespaceName"; private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/"; - private static final String EVENT_HUB_NAME = "eventHubName"; - private static final String SHARED_ACCESS_KEY_NAME = "dummySasKeyName"; - private static final String SHARED_ACCESS_KEY = "dummySasKey"; - private static final String ENDPOINT = getURI(ClientConstants.ENDPOINT_FORMAT, NAMESPACE_NAME, DEFAULT_DOMAIN_NAME) - .toString(); - - private static final String CORRECT_CONNECTION_STRING = String - .format("Endpoint=%s;SharedAccessKeyName=%s;SharedAccessKey=%s;EntityPath=%s", - ENDPOINT, SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, EVENT_HUB_NAME); @Mock private EventHubClientBuilder eventHubClientBuilder; @@ -54,16 +52,10 @@ public class EventProcessorClientErrorHandlingTest { @Mock private EventHubConsumerAsyncClient eventHubConsumer; - private CountDownLatch countDownLatch; + @Mock + private EventData eventData1; - private static URI getURI(String endpointFormat, String namespace, String domainName) { - try { - return new URI(String.format(Locale.US, endpointFormat, namespace, domainName)); - } catch (URISyntaxException exception) { - throw new IllegalArgumentException(String.format(Locale.US, - "Invalid namespace name: %s", namespace), exception); - } - } + private CountDownLatch countDownLatch; @BeforeEach public void setup() { @@ -85,7 +77,52 @@ public void testCheckpointStoreErrors(CheckpointStore checkpointStore) throws In Assertions.assertEquals("NONE", errorContext.getPartitionContext().getPartitionId()); Assertions.assertEquals("cg", errorContext.getPartitionContext().getConsumerGroup()); Assertions.assertTrue(errorContext.getThrowable() instanceof IllegalStateException); - }); + }, new HashMap<>()); + client.start(); + boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); + client.stop(); + Assertions.assertTrue(completed); + } + + @Test + public void testProcessEventHandlerError() throws InterruptedException { + countDownLatch = new CountDownLatch(1); + when(eventHubAsyncClient.createConsumer("cg", DEFAULT_PREFETCH_COUNT)).thenReturn(eventHubConsumer); + when(eventHubConsumer.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))) + .thenReturn(Flux.just(getEvent(eventData1))); + EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", + () -> new BadProcessEventHandler(countDownLatch), new InMemoryCheckpointStore(), false, + null, errorContext -> { }, new HashMap<>()); + client.start(); + boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); + client.stop(); + Assertions.assertTrue(completed); + } + + @Test + public void testInitHandlerError() throws InterruptedException { + countDownLatch = new CountDownLatch(1); + when(eventHubAsyncClient.createConsumer("cg", DEFAULT_PREFETCH_COUNT)).thenReturn(eventHubConsumer); + when(eventHubConsumer.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))) + .thenReturn(Flux.just(getEvent(eventData1))); + EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", + () -> new BadInitHandler(countDownLatch), new InMemoryCheckpointStore(), false, + null, errorContext -> { }, new HashMap<>()); + client.start(); + boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); + client.stop(); + Assertions.assertTrue(completed); + } + + @Test + public void testCloseHandlerError() throws InterruptedException { + countDownLatch = new CountDownLatch(1); + when(eventHubAsyncClient.createConsumer("cg", DEFAULT_PREFETCH_COUNT)).thenReturn(eventHubConsumer); + when(eventHubConsumer.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))) + .thenReturn(Flux.just(getEvent(eventData1))); + EventProcessorClient client = new EventProcessorClient(eventHubClientBuilder, "cg", + () -> new BadCloseHandler(countDownLatch), new InMemoryCheckpointStore(), false, + null, errorContext -> { }, new HashMap<>()); client.start(); boolean completed = countDownLatch.await(3, TimeUnit.SECONDS); client.stop(); @@ -99,6 +136,11 @@ private static Stream checkpointStoreSupplier() { Arguments.of(new ListCheckpointErrorStore())); } + private PartitionEvent getEvent(EventData event) { + PartitionContext context = new PartitionContext("ns", "foo", "bar", "baz"); + return new PartitionEvent(context, event, null); + } + private static class ListOwnershipErrorStore implements CheckpointStore { @@ -191,4 +233,75 @@ public void processError(ErrorContext errorContext) { return; } } + + private static final class BadProcessEventHandler extends PartitionProcessor { + + CountDownLatch countDownLatch; + + BadProcessEventHandler(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void processEvent(EventContext eventContext) { + countDownLatch.countDown(); + throw new IllegalStateException("Process event error"); + } + + @Override + public void processError(ErrorContext errorContext) { + Assertions.fail("Process error handler should not be called when process event throws exception"); + return; + } + } + + private static final class BadInitHandler extends PartitionProcessor { + + CountDownLatch countDownLatch; + + BadInitHandler(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void initialize(InitializationContext initContext) { + countDownLatch.countDown(); + throw new IllegalStateException("Init error"); + } + + @Override + public void processEvent(EventContext eventContext) { + Assertions.fail("Process event handler should not be called when there's an error during initialization"); + } + + @Override + public void processError(ErrorContext errorContext) { + Assertions.fail("Process error handler should not be called when process event throws exception"); + return; + } + } + + private static final class BadCloseHandler extends PartitionProcessor { + + CountDownLatch countDownLatch; + BadCloseHandler(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void close(CloseContext closeContext) { + countDownLatch.countDown(); + throw new IllegalStateException("Close error"); + } + + @Override + public void processEvent(EventContext eventContext) { + // do nothing + } + + @Override + public void processError(ErrorContext errorContext) { + // do nothing + } + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java index 7f1f47ad825b..1cc88baed6ed 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java @@ -140,7 +140,7 @@ public void testWithSimplePartitionProcessor() throws Exception { // Act final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", - () -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }); + () -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>()); eventProcessorClient.start(); TimeUnit.SECONDS.sleep(10); eventProcessorClient.stop(); @@ -170,117 +170,6 @@ public void testWithSimplePartitionProcessor() throws Exception { verify(consumer1, atLeastOnce()).close(); } - /** - * Tests {@link EventProcessorClient} with a partition processor that throws an exception when processing an event. - * - * @throws Exception if an error occurs while running the test. - */ - @Test - public void testWithFaultyPartitionProcessor() throws Exception { - // Arrange - final Tracer tracer1 = mock(Tracer.class); - final List tracers = Collections.singletonList(tracer1); - TracerProvider tracerProvider = new TracerProvider(tracers); - when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); - when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); - when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); - when(eventHubAsyncClient - .createConsumer(anyString(), anyInt())) - .thenReturn(consumer1); - when(consumer1.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))).thenReturn(Flux.just(getEvent(eventData1))); - String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01"; - - final InMemoryCheckpointStore checkpointStore = new InMemoryCheckpointStore(); - final FaultyPartitionProcessor faultyPartitionProcessor = new FaultyPartitionProcessor(); - - when(tracer1.extractContext(eq(diagnosticId), any())).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "value"); - } - ); - when(tracer1.start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "value1") - .addData("scope", (Closeable) () -> { - }) - .addData(PARENT_SPAN_KEY, "value2"); - } - ); - // Act - final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", - () -> faultyPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }); - - eventProcessorClient.start(); - TimeUnit.SECONDS.sleep(10); - eventProcessorClient.stop(); - - // Assert - assertTrue(faultyPartitionProcessor.error); - } - - /** - * Tests process start spans error messages invoked for {@link EventProcessorClient}. - * - * @throws Exception if an error occurs while running the test. - */ - @Test - public void testErrorProcessSpans() throws Exception { - //Arrange - final Tracer tracer1 = mock(Tracer.class); - final List tracers = Collections.singletonList(tracer1); - TracerProvider tracerProvider = new TracerProvider(tracers); - when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); - when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); - when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); - when(eventHubAsyncClient - .createConsumer(anyString(), anyInt())) - .thenReturn(consumer1); - when(eventData1.getSequenceNumber()).thenReturn(1L); - when(eventData2.getSequenceNumber()).thenReturn(2L); - when(eventData1.getOffset()).thenReturn(1L); - when(eventData2.getOffset()).thenReturn(100L); - - String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01"; - Map properties = new HashMap<>(); - properties.put(DIAGNOSTIC_ID_KEY, diagnosticId); - - when(eventData1.getProperties()).thenReturn(properties); - when(consumer1.receiveFromPartition(anyString(), any(EventPosition.class), any(ReceiveOptions.class))).thenReturn(Flux.just(getEvent(eventData1))); - when(tracer1.extractContext(eq(diagnosticId), any())).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "value"); - } - ); - when(tracer1.start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "value1") - .addData("scope", (Closeable) () -> { - }) - .addData(PARENT_SPAN_KEY, "value2"); - } - ); - - final InMemoryCheckpointStore checkpointStore = new InMemoryCheckpointStore(); - - //Act - final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", - FaultyPartitionProcessor::new, checkpointStore, false, tracerProvider, ec -> { }); - eventProcessorClient.start(); - TimeUnit.SECONDS.sleep(10); - eventProcessorClient.stop(); - - //Assert - verify(tracer1, times(1)).extractContext(eq(diagnosticId), any()); - verify(tracer1, times(1)).start(eq("EventHubs.process"), any(), eq(ProcessKind.PROCESS)); - verify(tracer1, times(1)).end(eq(""), any(IllegalStateException.class), any()); - } - /** * Tests process start spans invoked for {@link EventProcessorClient}. * @@ -329,7 +218,7 @@ public void testProcessSpans() throws Exception { //Act final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", - TestPartitionProcessor::new, checkpointStore, false, tracerProvider, ec -> { }); + TestPartitionProcessor::new, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>()); eventProcessorClient.start(); TimeUnit.SECONDS.sleep(10); @@ -354,7 +243,7 @@ public void testWithMultiplePartitions() throws Exception { identifiers.add("1"); identifiers.add("2"); identifiers.add("3"); - final EventPosition position = EventPosition.earliest(); + final EventPosition position = EventPosition.latest(); when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1", "2", "3")); @@ -391,7 +280,7 @@ public void testWithMultiplePartitions() throws Exception { // Act final EventProcessorClient eventProcessorClient = new EventProcessorClient(eventHubClientBuilder, "test-consumer", - TestPartitionProcessor::new, checkpointStore, false, tracerProvider, ec -> { }); + TestPartitionProcessor::new, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>()); eventProcessorClient.start(); final boolean completed = count.await(10, TimeUnit.SECONDS); eventProcessorClient.stop(); @@ -420,21 +309,6 @@ private PartitionEvent getEvent(EventData event) { return new PartitionEvent(context, event, null); } - private static final class FaultyPartitionProcessor extends PartitionProcessor { - - boolean error; - - @Override - public void processError(ErrorContext errorContext) { - error = true; - } - - @Override - public void processEvent(EventContext partitionEvent) { - throw new IllegalStateException(); - } - } - private static final class TestPartitionProcessor extends PartitionProcessor { @Override diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java index 6947ceccb41c..671ed8dd3b0f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancerTest.java @@ -14,6 +14,7 @@ import com.azure.messaging.eventhubs.models.PartitionEvent; import com.azure.messaging.eventhubs.models.PartitionOwnership; import com.azure.messaging.eventhubs.models.ReceiveOptions; +import java.util.HashMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -312,7 +313,7 @@ public void testReceiveFailure() throws Exception { .thenReturn(Flux.error(new IllegalStateException())); PartitionPumpManager partitionPumpManager = new PartitionPumpManager(checkpointStore, - () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider); + () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider, new HashMap<>()); PartitionBasedLoadBalancer loadBalancer = new PartitionBasedLoadBalancer(checkpointStore, eventHubAsyncClient, fqNamespace, eventHubName, consumerGroupName, "owner", TimeUnit.SECONDS.toSeconds(5), partitionPumpManager, ec -> { @@ -337,7 +338,7 @@ public void testCheckpointStoreFailure() throws Exception { List partitionIds = Arrays.asList("1", "2", "3"); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.fromIterable(partitionIds)); PartitionPumpManager partitionPumpManager = new PartitionPumpManager(checkpointStore, - () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider); + () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider, new HashMap<>()); PartitionBasedLoadBalancer loadBalancer = new PartitionBasedLoadBalancer(checkpointStore, eventHubAsyncClient, fqNamespace, eventHubName, consumerGroupName, "owner", TimeUnit.SECONDS.toSeconds(5), partitionPumpManager, ec -> { @@ -363,7 +364,7 @@ public void testEventHubClientFailure() { List partitionIds = new ArrayList<>(); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.fromIterable(partitionIds)); PartitionPumpManager partitionPumpManager = new PartitionPumpManager(checkpointStore, - () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider); + () -> partitionProcessor, eventHubClientBuilder, false, tracerProvider, new HashMap<>()); PartitionBasedLoadBalancer loadBalancer = new PartitionBasedLoadBalancer(checkpointStore, eventHubAsyncClient, fqNamespace, eventHubName, consumerGroupName, "owner", TimeUnit.SECONDS.toSeconds(5), partitionPumpManager, ec -> { @@ -460,7 +461,7 @@ public void processError(ErrorContext eventProcessingErrorContext) { eventProcessingErrorContext.getPartitionContext().getPartitionId(), eventProcessingErrorContext.getThrowable()); } - }, eventHubClientBuilder, false, tracerProvider); + }, eventHubClientBuilder, false, tracerProvider, new HashMap<>()); return new PartitionBasedLoadBalancer(checkpointStore, eventHubAsyncClient, fqNamespace, eventHubName, consumerGroupName, owner, TimeUnit.SECONDS.toSeconds(5), partitionPumpManager, ec -> { });