-
Notifications
You must be signed in to change notification settings - Fork 5.1k
Retrieve checkpoints one by one #18127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
pakrym
merged 9 commits into
Azure:master
from
pakrym:pakrym/Retreive-checkpoints-one-by-one
Jan 25, 2021
Merged
Changes from 2 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
2e447c1
Retreive checkpoints one by one
pakrym 5287378
export-api
pakrym bdb1ff4
fic lofic
pakrym 94f9e28
nits
pakrym 4d38238
doc comments
pakrym 1165a69
tests
pakrym 9cdb296
rev
pakrym e2ccca3
doc
pakrym ec02f5e
Apply suggestions from code review
pakrym File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -395,5 +395,70 @@ public virtual void InvalidCheckpointFound(string partitionId, | |
| WriteEvent(35, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Indicates that an attempt to retrieve a checkpoint has started. | ||
| /// </summary> | ||
| /// | ||
| /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param> | ||
| /// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param> | ||
| /// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param> | ||
| /// <param name="partitionId">The partition id the specific checkpoint is associated with.</param> | ||
| /// | ||
| [Event(36, Level = EventLevel.Informational, Message = "Starting to retrieve checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'.")] | ||
| public void GetCheckpointStart(string fullyQualifiedNamespace, | ||
| string eventHubName, | ||
| string consumerGroup, | ||
| string partitionId) | ||
| { | ||
| if (IsEnabled()) | ||
| { | ||
| WriteEvent(36, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Indicates that an attempt to retrieve a list of checkpoints has completed. | ||
| /// </summary> | ||
| /// | ||
| /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param> | ||
| /// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param> | ||
| /// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param> | ||
| /// <param name="partitionId">The partition id the specific checkpoint is associated with.</param> | ||
| /// | ||
| [Event(37, Level = EventLevel.Informational, Message = "Completed retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'. PartitionId: '{3}'.")] | ||
| public virtual void GetCheckpointComplete(string fullyQualifiedNamespace, | ||
| string eventHubName, | ||
| string consumerGroup, | ||
| string partitionId) | ||
| { | ||
| if (IsEnabled()) | ||
| { | ||
| WriteEvent(37, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints. | ||
|
||
| /// </summary> | ||
| /// | ||
| /// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param> | ||
| /// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param> | ||
| /// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param> | ||
| /// <param name="partitionId">The partition id the specific checkpoint is associated with.</param> | ||
| /// <param name="errorMessage">The message for the exception that occurred.</param> | ||
| /// | ||
| [Event(38, Level = EventLevel.Error, Message = "An exception occurred when retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'; ErrorMessage: '{4}'.")] | ||
| public virtual void GetCheckpointError(string fullyQualifiedNamespace, | ||
| string eventHubName, | ||
| string consumerGroup, | ||
| string partitionId, | ||
| string errorMessage) | ||
| { | ||
| if (IsEnabled()) | ||
| { | ||
| WriteEvent(38, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty, errorMessage ?? string.Empty); | ||
| } | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -775,6 +775,43 @@ protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpo | |
| return ProcessCheckpointStartingPositions(checkpoints); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Returns a checkpoint for the Event Hub, consumer group, and partition ID associated with the | ||
| /// event processor instance, so that processing for a given partition can be properly initialized. | ||
| /// </summary> | ||
| /// | ||
| /// <param name="partitionId"></param> | ||
|
||
| /// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down.</param> | ||
| /// | ||
| /// <returns>The checkpoint for the processor to take into account when initializing partition.</returns> | ||
| /// | ||
| /// <remarks> | ||
| /// Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStartingPosition" /> will | ||
| /// be used to initialize the partition for processing. | ||
| /// | ||
| /// In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place, | ||
| /// it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom | ||
| /// starting location set. | ||
| /// </remarks> | ||
| /// | ||
| protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken) | ||
| { | ||
| var checkpoint = await StorageManager.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // If there was no initialization handler, no custom starting positions | ||
| // could have been specified. Return the checkpoint without further processing. | ||
|
|
||
| if (_partitionInitializingAsync == null) | ||
| { | ||
| return checkpoint; | ||
| } | ||
|
|
||
| // Process the checkpoints to inject mock checkpoints for partitions that | ||
| // specify a custom default and do not have an actual checkpoint. | ||
|
|
||
| return checkpoint ?? CreateCheckpointWithDefaultStartingPosition(partitionId); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Produces a list of the ownership assignments for partitions between each of the cooperating event processor | ||
| /// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to allow | ||
|
|
@@ -1041,18 +1078,31 @@ private IEnumerable<EventProcessorCheckpoint> ProcessCheckpointStartingPositions | |
| { | ||
| if (!knownCheckpoints.Contains(partition)) | ||
| { | ||
| yield return new EventProcessorCheckpoint | ||
| { | ||
| FullyQualifiedNamespace = FullyQualifiedNamespace, | ||
| EventHubName = EventHubName, | ||
| ConsumerGroup = ConsumerGroup, | ||
| PartitionId = partition, | ||
| StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition position) ? position : DefaultStartingPosition | ||
| }; | ||
| yield return CreateCheckpointWithDefaultStartingPosition(partition); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Creates a checkpoint with a default starting position set. | ||
| /// </summary> | ||
| /// | ||
| /// <param name="partitionId">The partition id.</param> | ||
| /// | ||
| /// <returns>Returns an artificial checkpoint for a provided partition with a starting position set to <see cref="DefaultStartingPosition"/>.</returns> | ||
| /// | ||
| private EventProcessorCheckpoint CreateCheckpointWithDefaultStartingPosition(string partitionId) | ||
| { | ||
| return new EventProcessorCheckpoint | ||
| { | ||
| FullyQualifiedNamespace = FullyQualifiedNamespace, | ||
| EventHubName = EventHubName, | ||
| ConsumerGroup = ConsumerGroup, | ||
| PartitionId = partitionId, | ||
| StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partitionId, out EventPosition position) ? position : DefaultStartingPosition | ||
| }; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Invokes a specified action only if this <see cref="EventProcessorClient" /> instance is not running. | ||
| /// </summary> | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this a single checkpoint?