Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -40,7 +42,7 @@ public class EventProcessorClient {
private final ClientLogger logger = new ClientLogger(EventProcessorClient.class);

private final String identifier;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the no arg constructor is false. new AtomicBoolean()

private final PartitionPumpManager partitionPumpManager;
private final PartitionBasedLoadBalancer partitionBasedLoadBalancer;
private final CheckpointStore checkpointStore;
Expand All @@ -55,13 +57,16 @@ public class EventProcessorClient {
* @param consumerGroup The consumer group name used in this event processor to consumer events.
* @param partitionProcessorFactory The factory to create new partition processor(s).
* @param checkpointStore The store used for reading and updating partition ownership and checkpoints. information.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient
* will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
* @param processError Error handler for any errors that occur outside the context of a partition.
* @param initialPartitionEventPosition Map of initial event positions for partition ids.
*/
EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup,
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError) {
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError,
Map<String, EventPosition> initialPartitionEventPosition) {

Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Expand All @@ -71,7 +76,7 @@ public class EventProcessorClient {
this.identifier = UUID.randomUUID().toString();
logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
Expand Down Expand Up @@ -103,7 +108,7 @@ public String getIdentifier() {
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop}
*/
public synchronized void start() {
if (!started.compareAndSet(false, true)) {
if (!isRunning.compareAndSet(false, true)) {
logger.info("Event processor is already running");
return;
}
Expand All @@ -127,12 +132,22 @@ public synchronized void start() {
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop}
*/
public synchronized void stop() {
if (!started.compareAndSet(true, false)) {
if (!isRunning.compareAndSet(true, false)) {
logger.info("Event processor has already stopped");
return;
}
runner.get().dispose();
scheduler.get().dispose();
this.partitionPumpManager.stopAllPartitionPumps();
}

/**
* Returns {@code true} if the event processor is running. If the event processor is already running, calling
* {@link #start()} has no effect.
*
* @return {@code true} if the event processor is running.
*/
public synchronized boolean isRunning() {
return isRunning.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.function.Consumer;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class EventProcessorClientBuilder {
private Consumer<InitializationContext> processPartitionInitialization;
private Consumer<CloseContext> processPartitionClose;
private boolean trackLastEnqueuedEventProperties;
private Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();

/**
* Creates a new instance of {@link EventProcessorClientBuilder}.
Expand Down Expand Up @@ -289,14 +292,30 @@ public EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext>
*
* @param trackLastEnqueuedEventProperties {@code true} if the resulting events will keep track of the last
* enqueued information for that partition; {@code false} otherwise.
*
* @return The updated {@link EventProcessorClientBuilder} instance.
*/
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) {
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
return this;
}

/**
* Sets the map containing the event position to use for each partition if a checkpoint for the partition does not
* exist in {@link CheckpointStore}. This map is keyed off of the partition id. If there is no checkpoint in
* {@link CheckpointStore} and there is no entry in this map, the processing of the partition will start from
* {@link EventPosition#latest() latest} position.
*
* @param initialPartitionEventPosition Map of initial event positions for partition ids.
* @return The updated {@link EventProcessorClientBuilder} instance.
*/
public EventProcessorClientBuilder initialPartitionEventPosition(
Map<String, EventPosition> initialPartitionEventPosition) {

this.initialPartitionEventPosition = Objects.requireNonNull(initialPartitionEventPosition,
"'initialPartitionEventPosition' cannot be null.");
return this;
}

/**
* This will create a new {@link EventProcessorClient} configured with the options set in this builder. Each call to
* this method will return a new instance of {@link EventProcessorClient}.
Expand All @@ -322,7 +341,7 @@ public EventProcessorClient buildEventProcessorClient() {
final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
return new EventProcessorClient(eventHubClientBuilder, this.consumerGroup,
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, tracerProvider,
processError);
processError, initialPartitionEventPosition);
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
// If the partitions are evenly distributed among all active event processors, no change required.
logger.info("Load is balanced");
// renew ownership of already owned partitions
checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe();
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe();
return;
}

Expand All @@ -216,7 +220,11 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
logger.info("This event processor owns {} partitions and shouldn't own more",
ownerPartitionMap.get(ownerId).size());
// renew ownership of already owned partitions
checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe();
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe();
return;
}

Expand Down Expand Up @@ -345,11 +353,16 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap,
partitionIdToClaim);

List<PartitionOwnership> currentPartitionsOwned = ownerPartitionsMap.get(ownerId);
currentPartitionsOwned.add(ownershipRequest);
List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
partitionsToClaim.add(ownershipRequest);
partitionsToClaim.addAll(partitionPumpManager.getPartitionPumps()
.keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()));

checkpointStore
.claimOwnership(currentPartitionsOwned)
.claimOwnership(partitionsToClaim)
.timeout(Duration.ofMinutes(1)) // TODO: configurable
.doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}",
partitionOwnership.getPartitionId()))
Expand Down
Loading