Skip to content

Commit a70eb1a

Browse files
pakrymjsquire
andauthored
Retrieve checkpoints one by one (Azure#18127)
Co-authored-by: Jesse Squire <[email protected]>
1 parent 67f0787 commit a70eb1a

File tree

10 files changed

+883
-87
lines changed

10 files changed

+883
-87
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
2121
protected override Azure.Messaging.EventHubs.EventHubConnection CreateConnection() { throw null; }
2222
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
2323
public override bool Equals(object obj) { throw null; }
24+
protected override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
2425
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
2526
public override int GetHashCode() { throw null; }
2627
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint>> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken) { throw null; }

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,5 +395,70 @@ public virtual void InvalidCheckpointFound(string partitionId,
395395
WriteEvent(35, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
396396
}
397397
}
398+
399+
/// <summary>
400+
/// Indicates that an attempt to retrieve a checkpoint has started.
401+
/// </summary>
402+
///
403+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
404+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
405+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
406+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
407+
///
408+
[Event(36, Level = EventLevel.Informational, Message = "Starting to retrieve checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'.")]
409+
public virtual void GetCheckpointStart(string fullyQualifiedNamespace,
410+
string eventHubName,
411+
string consumerGroup,
412+
string partitionId)
413+
{
414+
if (IsEnabled())
415+
{
416+
WriteEvent(36, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty);
417+
}
418+
}
419+
420+
/// <summary>
421+
/// Indicates that an attempt to retrieve a checkpoint has completed.
422+
/// </summary>
423+
///
424+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
425+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
426+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
427+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
428+
///
429+
[Event(37, Level = EventLevel.Informational, Message = "Completed retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'. PartitionId: '{3}'.")]
430+
public virtual void GetCheckpointComplete(string fullyQualifiedNamespace,
431+
string eventHubName,
432+
string consumerGroup,
433+
string partitionId)
434+
{
435+
if (IsEnabled())
436+
{
437+
WriteEvent(37, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId);
438+
}
439+
}
440+
441+
/// <summary>
442+
/// Indicates that an unhandled exception was encountered while retrieving a checkpoint.
443+
/// </summary>
444+
///
445+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
446+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
447+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
448+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
449+
/// <param name="errorMessage">The message for the exception that occurred.</param>
450+
///
451+
[Event(38, Level = EventLevel.Error, Message = "An exception occurred when retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'; ErrorMessage: '{4}'.")]
452+
public virtual void GetCheckpointError(string fullyQualifiedNamespace,
453+
string eventHubName,
454+
string consumerGroup,
455+
string partitionId,
456+
string errorMessage)
457+
{
458+
if (IsEnabled())
459+
{
460+
WriteEvent(38, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty, errorMessage ?? string.Empty);
461+
}
462+
}
398463
}
399464
}

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,43 @@ protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpo
775775
return ProcessCheckpointStartingPositions(checkpoints);
776776
}
777777

778+
/// <summary>
779+
/// Returns a checkpoint for the Event Hub, consumer group, and partition ID associated with the
780+
/// event processor instance, so that processing for a given partition can be properly initialized.
781+
/// </summary>
782+
///
783+
/// <param name="partitionId">The ID of the partition for which to retrieve the checkpoint.</param>
784+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down.</param>
785+
///
786+
/// <returns>The checkpoint for the processor to take into account when initializing partition.</returns>
787+
///
788+
/// <remarks>
789+
/// Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStartingPosition" /> will
790+
/// be used to initialize the partition for processing.
791+
///
792+
/// In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place,
793+
/// it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom
794+
/// starting location set.
795+
/// </remarks>
796+
///
797+
protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken)
798+
{
799+
var checkpoint = await StorageManager.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken).ConfigureAwait(false);
800+
801+
// If there was no initialization handler, no custom starting positions
802+
// could have been specified. Return the checkpoint without further processing.
803+
804+
if (_partitionInitializingAsync == null)
805+
{
806+
return checkpoint;
807+
}
808+
809+
// Process the checkpoints to inject mock checkpoints for partitions that
810+
// specify a custom default and do not have an actual checkpoint.
811+
812+
return checkpoint ?? CreateCheckpointWithDefaultStartingPosition(partitionId);
813+
}
814+
778815
/// <summary>
779816
/// Produces a list of the ownership assignments for partitions between each of the cooperating event processor
780817
/// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to allow
@@ -1041,18 +1078,31 @@ private IEnumerable<EventProcessorCheckpoint> ProcessCheckpointStartingPositions
10411078
{
10421079
if (!knownCheckpoints.Contains(partition))
10431080
{
1044-
yield return new EventProcessorCheckpoint
1045-
{
1046-
FullyQualifiedNamespace = FullyQualifiedNamespace,
1047-
EventHubName = EventHubName,
1048-
ConsumerGroup = ConsumerGroup,
1049-
PartitionId = partition,
1050-
StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition position) ? position : DefaultStartingPosition
1051-
};
1081+
yield return CreateCheckpointWithDefaultStartingPosition(partition);
10521082
}
10531083
}
10541084
}
10551085

1086+
/// <summary>
1087+
/// Creates a checkpoint with a default starting position set.
1088+
/// </summary>
1089+
///
1090+
/// <param name="partitionId">The partition id.</param>
1091+
///
1092+
/// <returns>Returns an artificial checkpoint for a provided partition with a starting position set to <see cref="DefaultStartingPosition"/>.</returns>
1093+
///
1094+
private EventProcessorCheckpoint CreateCheckpointWithDefaultStartingPosition(string partitionId)
1095+
{
1096+
return new EventProcessorCheckpoint
1097+
{
1098+
FullyQualifiedNamespace = FullyQualifiedNamespace,
1099+
EventHubName = EventHubName,
1100+
ConsumerGroup = ConsumerGroup,
1101+
PartitionId = partitionId,
1102+
StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partitionId, out EventPosition position) ? position : DefaultStartingPosition
1103+
};
1104+
}
1105+
10561106
/// <summary>
10571107
/// Invokes a specified action only if this <see cref="EventProcessorClient" /> instance is not running.
10581108
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,5 +224,42 @@ partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamesp
224224
///
225225
partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName) =>
226226
Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName);
227+
228+
/// <summary>
229+
/// Indicates that an attempt to retrieve a checkpoint has started.
230+
/// </summary>
231+
///
232+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
233+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
234+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
235+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
236+
///
237+
partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
238+
Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
239+
240+
/// <summary>
241+
/// Indicates that an attempt to retrieve a checkpoint has completed.
242+
/// </summary>
243+
///
244+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
245+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
246+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
247+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
248+
///
249+
partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
250+
Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
251+
252+
/// <summary>
253+
/// Indicates that an unhandled exception was encountered while retrieving a checkpoint.
254+
/// </summary>
255+
///
256+
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
257+
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
258+
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
259+
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
260+
/// <param name="exception">The message for the exception that occurred.</param>
261+
///
262+
partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception) =>
263+
Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message);
227264
}
228265
}

0 commit comments

Comments
 (0)