Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
protected override Azure.Messaging.EventHubs.EventHubConnection CreateConnection() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
protected override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint>> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,5 +395,70 @@ public virtual void InvalidCheckpointFound(string partitionId,
WriteEvent(35, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
}
}

/// <summary>
/// Indicates that an attempt to retrieve a checkpoint has started.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
///
[Event(36, Level = EventLevel.Informational, Message = "Starting to retrieve checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'.")]
public virtual void GetCheckpointStart(string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string partitionId)
{
if (IsEnabled())
{
WriteEvent(36, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty);
}
}

/// <summary>
/// Indicates that an attempt to retrieve a checkpoint has completed.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
///
[Event(37, Level = EventLevel.Informational, Message = "Completed retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'. PartitionId: '{3}'.")]
public virtual void GetCheckpointComplete(string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string partitionId)
{
if (IsEnabled())
{
WriteEvent(37, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId);
}
}

/// <summary>
/// Indicates that an unhandled exception was encountered while retrieving a checkpoint.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
///
[Event(38, Level = EventLevel.Error, Message = "An exception occurred when retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'; ErrorMessage: '{4}'.")]
public virtual void GetCheckpointError(string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string partitionId,
string errorMessage)
{
if (IsEnabled())
{
WriteEvent(38, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty, errorMessage ?? string.Empty);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,43 @@ protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpo
return ProcessCheckpointStartingPositions(checkpoints);
}

/// <summary>
/// Returns a checkpoint for the Event Hub, consumer group, and partition ID associated with the
/// event processor instance, so that processing for a given partition can be properly initialized.
/// </summary>
///
/// <param name="partitionId">The ID of the partition for which to retrieve the checkpoint.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down.</param>
///
/// <returns>The checkpoint for the processor to take into account when initializing partition.</returns>
///
/// <remarks>
/// Should a partition not have a corresponding checkpoint, the <see cref="EventProcessorOptions.DefaultStartingPosition" /> will
/// be used to initialize the partition for processing.
///
/// In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place,
/// it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom
/// starting location set.
/// </remarks>
///
protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken)
{
var checkpoint = await StorageManager.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken).ConfigureAwait(false);

// If there was no initialization handler, no custom starting positions
// could have been specified. Return the checkpoint without further processing.

if (_partitionInitializingAsync == null)
{
return checkpoint;
}

// Process the checkpoints to inject mock checkpoints for partitions that
// specify a custom default and do not have an actual checkpoint.

return checkpoint ?? CreateCheckpointWithDefaultStartingPosition(partitionId);
}

/// <summary>
/// Produces a list of the ownership assignments for partitions between each of the cooperating event processor
/// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to allow
Expand Down Expand Up @@ -1041,18 +1078,31 @@ private IEnumerable<EventProcessorCheckpoint> ProcessCheckpointStartingPositions
{
if (!knownCheckpoints.Contains(partition))
{
yield return new EventProcessorCheckpoint
{
FullyQualifiedNamespace = FullyQualifiedNamespace,
EventHubName = EventHubName,
ConsumerGroup = ConsumerGroup,
PartitionId = partition,
StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition position) ? position : DefaultStartingPosition
};
yield return CreateCheckpointWithDefaultStartingPosition(partition);
}
}
}

/// <summary>
/// Creates a checkpoint with a default starting position set.
/// </summary>
///
/// <param name="partitionId">The partition id.</param>
///
/// <returns>Returns an artificial checkpoint for a provided partition with a starting position set to <see cref="DefaultStartingPosition"/>.</returns>
///
private EventProcessorCheckpoint CreateCheckpointWithDefaultStartingPosition(string partitionId)
{
return new EventProcessorCheckpoint
{
FullyQualifiedNamespace = FullyQualifiedNamespace,
EventHubName = EventHubName,
ConsumerGroup = ConsumerGroup,
PartitionId = partitionId,
StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partitionId, out EventPosition position) ? position : DefaultStartingPosition
};
}

/// <summary>
/// Invokes a specified action only if this <see cref="EventProcessorClient" /> instance is not running.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,5 +224,42 @@ partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamesp
///
partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName) =>
Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName);

/// <summary>
/// Indicates that an attempt to retrieve a checkpoint has started.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
///
partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);

/// <summary>
/// Indicates that an attempt to retrieve a checkpoint has completed.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
///
partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);

/// <summary>
/// Indicates that an unhandled exception was encountered while retrieving a checkpoint.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The partition id the specific checkpoint is associated with.</param>
/// <param name="exception">The message for the exception that occurred.</param>
///
partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception) =>
Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message);
}
}
Loading