diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs index a5aa8f6c0c14..0fa02e957c60 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs @@ -21,6 +21,7 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt protected override Azure.Messaging.EventHubs.EventHubConnection CreateConnection() { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } + protected override System.Threading.Tasks.Task GetCheckpointAsync(string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override int GetHashCode() { throw null; } protected override System.Threading.Tasks.Task> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken) { throw null; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs index 455c353c9dd6..49c1c9966aaf 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs @@ -395,5 +395,70 @@ public virtual void InvalidCheckpointFound(string partitionId, WriteEvent(35, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty); } } + + /// + /// Indicates that an attempt to retrieve a checkpoint has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + [Event(36, Level = EventLevel.Informational, Message = "Starting to retrieve checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'.")] + public virtual void GetCheckpointStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) + { + if (IsEnabled()) + { + WriteEvent(36, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty); + } + } + + /// + /// Indicates that an attempt to retrieve a checkpoint has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + [Event(37, Level = EventLevel.Informational, Message = "Completed retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'. PartitionId: '{3}'.")] + public virtual void GetCheckpointComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) + { + if (IsEnabled()) + { + WriteEvent(37, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId); + } + } + + /// + /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The message for the exception that occurred. + /// + [Event(38, Level = EventLevel.Error, Message = "An exception occurred when retrieving checkpoint for FullyQualifiedNamespace: '{0}'; EventHubName: '{1}'; ConsumerGroup: '{2}'; PartitionId: '{3}'; ErrorMessage: '{4}'.")] + public virtual void GetCheckpointError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + string errorMessage) + { + if (IsEnabled()) + { + WriteEvent(38, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, partitionId ?? string.Empty, errorMessage ?? string.Empty); + } + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index 557b5b02a61f..7994a152e391 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -775,6 +775,43 @@ protected override async Task> ListCheckpo return ProcessCheckpointStartingPositions(checkpoints); } + /// + /// Returns a checkpoint for the Event Hub, consumer group, and partition ID associated with the + /// event processor instance, so that processing for a given partition can be properly initialized. + /// + /// + /// The ID of the partition for which to retrieve the checkpoint. + /// A instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. + /// + /// The checkpoint for the processor to take into account when initializing partition. + /// + /// + /// Should a partition not have a corresponding checkpoint, the will + /// be used to initialize the partition for processing. + /// + /// In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place, + /// it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom + /// starting location set. + /// + /// + protected override async Task GetCheckpointAsync(string partitionId, CancellationToken cancellationToken) + { + var checkpoint = await StorageManager.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken).ConfigureAwait(false); + + // If there was no initialization handler, no custom starting positions + // could have been specified. Return the checkpoint without further processing. + + if (_partitionInitializingAsync == null) + { + return checkpoint; + } + + // Process the checkpoints to inject mock checkpoints for partitions that + // specify a custom default and do not have an actual checkpoint. + + return checkpoint ?? CreateCheckpointWithDefaultStartingPosition(partitionId); + } + /// /// Produces a list of the ownership assignments for partitions between each of the cooperating event processor /// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to allow @@ -1041,18 +1078,31 @@ private IEnumerable ProcessCheckpointStartingPositions { if (!knownCheckpoints.Contains(partition)) { - yield return new EventProcessorCheckpoint - { - FullyQualifiedNamespace = FullyQualifiedNamespace, - EventHubName = EventHubName, - ConsumerGroup = ConsumerGroup, - PartitionId = partition, - StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partition, out EventPosition position) ? position : DefaultStartingPosition - }; + yield return CreateCheckpointWithDefaultStartingPosition(partition); } } } + /// + /// Creates a checkpoint with a default starting position set. + /// + /// + /// The partition id. + /// + /// Returns an artificial checkpoint for a provided partition with a starting position set to . + /// + private EventProcessorCheckpoint CreateCheckpointWithDefaultStartingPosition(string partitionId) + { + return new EventProcessorCheckpoint + { + FullyQualifiedNamespace = FullyQualifiedNamespace, + EventHubName = EventHubName, + ConsumerGroup = ConsumerGroup, + PartitionId = partitionId, + StartingPosition = PartitionStartingPositionDefaults.TryGetValue(partitionId, out EventPosition position) ? position : DefaultStartingPosition + }; + } + /// /// Invokes a specified action only if this instance is not running. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs index 8eaad10767d2..9056902721c5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs @@ -224,5 +224,42 @@ partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamesp /// partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName) => Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName); + + /// + /// Indicates that an attempt to retrieve a checkpoint has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) => + Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + + /// + /// Indicates that an attempt to retrieve a checkpoint has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) => + Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The message for the exception that occurred. + /// + partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception) => + Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message); } } \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 115c53bd05da..3c0f7d2e2cae 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -1732,6 +1732,383 @@ public void AlreadyCanceledTokenMakesUpdateCheckpointAsyncThrow() Assert.That(async () => await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token), Throws.InstanceOf()); } + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task GetCheckpointLogsStartAndComplete() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.Offset, "0"} + }) + }; + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, + new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); + mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresent() + { + var expectedOffset = 13; + var expectedStartingPosition = EventPosition.FromOffset(expectedOffset, false); + + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.Offset, expectedOffset.ToString()}, + {BlobMetadataKey.SequenceNumber, "7777"} + }) + }; + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Not.Null, "A set of checkpoint should have been returned."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenNoOffsetIsPresent() + { + var expectedSequence = 133; + var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false); + + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, expectedSequence.ToString()} + }) + }; + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumber() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()} + }) + }; + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, new BasicRetryPolicy(new EventHubsRetryOptions())); + + target.Logger = mockLogger.Object; + + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointPreferredNewCheckpointOverLegacy() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, "960182"}, + {BlobMetadataKey.Offset, "14"} + }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Not.Null, "A single checkpoint should have been returned."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); + Assert.That(checkpoint.PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLegacyCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); + Assert.That(checkpoint.PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [TestCase("null", "0")] + [TestCase("null", "78")] + [TestCase("\"\"", "0")] + [TestCase("\"\"", "78")] + public async Task GetCheckpointSkipsCheckpointsWhenOffsetIsNullOrEmptyInLegacyCheckpoint(string offset, string sequenceNumber) + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"Offset\":" + offset + "," + + "\"SequenceNumber\":" + sequenceNumber + "," + + "\"PartitionId\":\"8\"," + + "\"Owner\":\"cc397fe0-6771-4eaa-a8df-1997efeb3c87\"," + + "\"Token\":\"ab00a395-4c39-4939-89d5-10c04b4553af\"," + + "\"Epoch\":3" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumberLegacyCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386}"); + }); + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly. + /// + /// + [TestCase("")] + [TestCase("{\"PartitionId\":\"0\",\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\",")] + [TestCase("\0\0\0")] + public async Task GetCheckpointConsidersDataInvalidWithLegacyCheckpointBlobContainingInvalidJson(string json) + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes(json); + }); + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), initializeWithLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + + Assert.That(checkpoint, Is.Null, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted when errors occur. + /// + /// + [Test] + public void GetCheckpointLogsErrors() + { + var expectedException = new DllNotFoundException("BOOM!"); + var mockLog = new Mock(); + var mockContainerClient = new MockBlobContainerClient() { GetBlobsAsyncException = expectedException }; + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + + target.Logger = mockLog.Object; + + mockContainerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", client => + { + client.GetPropertiesException = expectedException; + }); + + Assert.That(async () => await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()), Throws.Exception.EqualTo(expectedException)); + mockLog.Verify(m => m.GetCheckpointError(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", expectedException.Message)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on success. + /// + /// + [Test] + public async Task GetCheckpointLogsInvalidCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot", + new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) + }; + var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, + new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + mockLog.Verify(m => m.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on failure. + /// + /// + [Test] + public async Task GetCheckpointForMissingPartitionReturnsNull() + { + var ex = new RequestFailedException(0, "foo", BlobErrorCode.ContainerNotFound.ToString(), null); + + var target = new BlobsCheckpointStore(new MockBlobContainerClient(getBlobsAsyncException: ex), + new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); + target.Logger = mockLog.Object; + + var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); + Assert.That(checkpoint, Is.Null); + } + private class MockBlobContainerClient : BlobContainerClient { public override Uri Uri { get; } @@ -1766,7 +2143,21 @@ public override AsyncPageable GetBlobsAsync(BlobTraits traits = BlobTr public override BlobClient GetBlobClient(string blobName) { - return BlobClients[blobName]; + if (BlobClients.TryGetValue(blobName, out var client)) + { + return client; + } + + var blob = Blobs.SingleOrDefault(c => c.Name == blobName); + if (blob != null) + { + return new MockBlobClient(blobName) + { + Properties = BlobsModelFactory.BlobProperties(metadata: blob.Metadata) + }; + } + + return new MockBlobClient(blobName); } internal MockBlobContainerClient AddBlobClient(string name, Action configure) @@ -1783,8 +2174,10 @@ private class MockBlobClient : BlobClient public override string Name { get; } internal BlobInfo BlobInfo; + internal BlobProperties Properties; internal Exception UploadBlobException; internal Exception BlobClientSetMetadataException; + internal DllNotFoundException GetPropertiesException; internal byte[] Content; internal Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; internal Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; @@ -1841,6 +2234,21 @@ public override async Task DownloadToAsync(Stream destination, Cancell await destination.WriteAsync(Content, 0, Content.Length, cancellationToken); return Mock.Of(); } + + public override Task> GetPropertiesAsync(BlobRequestConditions conditions = null, CancellationToken cancellationToken = new CancellationToken()) + { + if (GetPropertiesException != null) + { + throw GetPropertiesException; + } + + if (Properties == null) + { + throw new RequestFailedException(404, BlobErrorCode.BlobNotFound.ToString(), BlobErrorCode.BlobNotFound.ToString(), default); + } + + return Task.FromResult(Response.FromValue(Properties, Mock.Of())); + } } private class MockAsyncPageable : AsyncPageable diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 94a13fe70306..7be427791973 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -302,45 +302,16 @@ async Task> listCheckpointsAsync(CancellationToke await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) { var partitionId = blob.Name.Substring(prefix.Length); - var startingPosition = default(EventPosition?); - var offset = default(long?); - var sequenceNumber = default(long?); + EventProcessorCheckpoint checkpoint = CreateCheckpoint(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, blob.Metadata); - if (blob.Metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var result)) + if (checkpoint != null) { - offset = result; - startingPosition = EventPosition.FromOffset(result, false); - } - else if (blob.Metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out result)) - { - sequenceNumber = result; - startingPosition = EventPosition.FromSequenceNumber(result, false); - } - - // If either the offset or the sequence number was not populated, - // this is not a valid checkpoint. - - if (startingPosition.HasValue) - { - checkpoints.Add(new BlobStorageCheckpoint - { - FullyQualifiedNamespace = fullyQualifiedNamespace, - EventHubName = eventHubName, - ConsumerGroup = consumerGroup, - PartitionId = partitionId, - StartingPosition = startingPosition.Value, - Offset = offset, - SequenceNumber = sequenceNumber - }); - } - else - { - InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + checkpoints.Add(checkpoint); } } return checkpoints; - }; + } async Task> listLegacyCheckpointsAsync(List existingCheckpoints, CancellationToken listCheckpointsToken) { @@ -364,44 +335,11 @@ async Task> listLegacyCheckpointsAsync(List> listLegacyCheckpointsAsync(List + /// Retrieves a checkpoints in a data store for a given namespace, Event Hub, consumer group and partition ID. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// A instance to signal the request to cancel the operation. + /// + /// A initialized with checkpoint properties if the checkpoint exists, otherwise null. + /// + public override async Task GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken) + { + async Task getCheckpointAsync(CancellationToken listCheckpointsToken) + { + try + { + var blobName = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()) + partitionId; + var blob = await ContainerClient + .GetBlobClient(blobName) + .GetPropertiesAsync(cancellationToken: listCheckpointsToken).ConfigureAwait(false); + + var checkpoint = CreateCheckpoint(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, blob.Value.Metadata); + + return checkpoint; + } + catch (RequestFailedException e) when (e.Status == 404) + { + // ignore + } + + try + { + if (InitializeWithLegacyCheckpoints) + { + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId; + return await CreateLegacyCheckpoint(fullyQualifiedNamespace, eventHubName, consumerGroup, legacyPrefix, partitionId, listCheckpointsToken).ConfigureAwait(false); + } + } + catch (RequestFailedException e) when (e.Status == 404) + { + // ignore + } + + return null; + } + + try + { + GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + return await ApplyRetryPolicy(getCheckpointAsync, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, ex); + throw; + } + finally + { + GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); + } + } + + /// + /// Creates a checkpoint instance based on the blob metadata. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The metadata of the blob that represents the checkpoint. + /// + /// A initialized with checkpoint properties if the checkpoint exists, otherwise null. + /// + private EventProcessorCheckpoint CreateCheckpoint(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + IDictionary metadata) + { + var startingPosition = default(EventPosition?); + var offset = default(long?); + var sequenceNumber = default(long?); + + if (metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var result)) + { + offset = result; + startingPosition = EventPosition.FromOffset(result, false); + } + else if (metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out result)) + { + sequenceNumber = result; + startingPosition = EventPosition.FromSequenceNumber(result, false); + } + + // If either the offset or the sequence number was not populated, + // this is not a valid checkpoint. + + if (!startingPosition.HasValue) + { + InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + return null; + } + + return new BlobStorageCheckpoint() + { + FullyQualifiedNamespace = fullyQualifiedNamespace, + EventHubName = eventHubName, + ConsumerGroup = consumerGroup, + PartitionId = partitionId, + StartingPosition = startingPosition.Value, + Offset = offset, + SequenceNumber = sequenceNumber + }; + } + + /// + /// Creates a checkpoint instance based on the blob name for a legacy checkpoint format. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The name of the blob that represents the checkpoint. + /// A instance to signal the request to cancel the operation. + /// + /// A initialized with checkpoint properties if the checkpoint exists, otherwise null. + /// + private async Task CreateLegacyCheckpoint(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string blobName, + string partitionId, + CancellationToken cancellationToken) + { + var startingPosition = default(EventPosition?); + + BlobBaseClient blobClient = ContainerClient.GetBlobClient(blobName); + using var memoryStream = new MemoryStream(); + await blobClient.DownloadToAsync(memoryStream, cancellationToken).ConfigureAwait(false); + + if (TryReadLegacyCheckpoint( + memoryStream.GetBuffer().AsSpan(0, (int) memoryStream.Length), + out long? offset, + out long? sequenceNumber)) + { + if (offset.HasValue) + { + startingPosition = EventPosition.FromOffset(offset.Value, false); + } + else + { + // Skip checkpoints without an offset without logging an error + return null; + } + } + + if (!startingPosition.HasValue) + { + InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + + return null; + } + + return new BlobStorageCheckpoint + { + FullyQualifiedNamespace = fullyQualifiedNamespace, + EventHubName = eventHubName, + ConsumerGroup = consumerGroup, + PartitionId = partitionId, + StartingPosition = startingPosition.Value, + Offset = offset, + SequenceNumber = sequenceNumber + }; + } + /// /// Updates the checkpoint using the given information for the associated partition and consumer group in the storage blob service. /// @@ -718,6 +835,40 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); + /// + /// Indicates that an attempt to retrieve a checkpoint has started. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId); + + /// + /// Indicates that an attempt to retrieve a checkpoint has completed. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// + partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId); + + /// + /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoint are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the checkpoint is associated with. + /// The partition id the specific checkpoint is associated with. + /// The message for the exception that occurred. + /// + partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception); + /// /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs index 45f1424a4848..b5f58b55a82d 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -58,6 +59,37 @@ public abstract Task> ListCheckpointsAsync string consumerGroup, CancellationToken cancellationToken); + /// + /// Retrieves a checkpoint information from the chosen storage service. The default implementation calls and selects a checkpoint by id. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The identifier of the partition to read a checkpoint for. + /// A instance to signal the request to cancel the operation. + /// + /// An instance if a checkpoint is found for a particular partition otherwise, null. + /// + public virtual async Task GetCheckpointAsync(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + CancellationToken cancellationToken) + { + var checkpoints = await ListCheckpointsAsync(fullyQualifiedNamespace, eventHubName, consumerGroup, cancellationToken).ConfigureAwait(false); + + foreach (EventProcessorCheckpoint checkpoint in checkpoints) + { + if (string.Equals(checkpoint.PartitionId, partitionId, StringComparison.Ordinal)) + { + return checkpoint; + } + } + + return null; + } + /// /// Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs index 9d41dd04626b..6fd957fdd302 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs @@ -355,6 +355,7 @@ protected EventProcessor(int eventBatchMaximumCount, string consumerGroup, strin protected internal virtual Azure.Messaging.EventHubs.EventHubConnection CreateConnection() { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } + protected virtual System.Threading.Tasks.Task GetCheckpointAsync(string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override int GetHashCode() { throw null; } protected abstract System.Threading.Tasks.Task> ListCheckpointsAsync(System.Threading.CancellationToken cancellationToken); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs index bf1d0276806f..7727f0e6e20e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs @@ -764,6 +764,7 @@ async Task performProcessing() /// /// Produces a list of the available checkpoints for the Event Hub and consumer group associated with the /// event processor instance, so that processing for a given set of partitions can be properly initialized. + /// It's recommended that is implemented as well as to achieve an optimal performance. /// /// /// A instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. @@ -781,6 +782,41 @@ async Task performProcessing() /// protected abstract Task> ListCheckpointsAsync(CancellationToken cancellationToken); + /// + /// Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the + /// event processor instance, so that processing for a given partition can be properly initialized. + /// The default implementation calls the and filters results by . + /// It's recommended that this method is overriden in implementations to achieve an optimal performance. + /// + /// + /// The identifier of the partition for which to retrieve the checkpoint. + /// A instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. + /// + /// The checkpoint for the processor to take into account when initializing partition. + /// + /// + /// Should a partition not have a corresponding checkpoint, the will + /// be used to initialize the partition for processing. + /// + /// In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place, + /// it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom + /// starting location set. + /// + /// + protected virtual async Task GetCheckpointAsync(string partitionId, + CancellationToken cancellationToken) + { + foreach (var checkpoint in await ListCheckpointsAsync(cancellationToken).ConfigureAwait(false)) + { + if (checkpoint.PartitionId == partitionId) + { + return checkpoint; + } + } + + return null; + } + /// /// Produces a list of the ownership assignments for partitions between each of the cooperating event processor /// instances for a given Event Hub and consumer group pairing. This method is used when load balancing to allow @@ -1357,18 +1393,13 @@ private async Task TryStartProcessingPartitionAsync(string partitionId, cancellationToken.ThrowIfCancellationRequested(); operationDescription = Resources.OperationListCheckpoints; - var checkpoints = await ListCheckpointsAsync(cancellationToken).ConfigureAwait(false); - operationDescription = Resources.OperationClaimOwnership; - // Determine the starting position for processing the partition. - foreach (var checkpoint in checkpoints) + var checkpoint = await GetCheckpointAsync(partitionId, cancellationToken).ConfigureAwait(false); + operationDescription = Resources.OperationClaimOwnership; + if (checkpoint != null) { - if (checkpoint.PartitionId == partitionId) - { - startingPosition = checkpoint.StartingPosition; - break; - } + startingPosition = checkpoint.StartingPosition; } // Create and register the partition processor. Ownership of the cancellationSource is transferred @@ -1604,6 +1635,21 @@ public override async Task> ListCheckpoint public override Task UpdateCheckpointAsync(EventProcessorCheckpoint checkpoint, EventData eventData, CancellationToken cancellationToken) => throw new NotImplementedException(); + + /// + /// Retrieves a checkpoint information from the chosen storage service. The default implementation calls and selects a checkpoint by id. + /// + /// + /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The identifier of the partition to read a checkpoint for. + /// A instance to signal the request to cancel the operation. + /// + /// An instance if a checkpoint is found for a particular partition otherwise, null. + /// + public override async Task GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken) + => await Processor.GetCheckpointAsync(partitionId, cancellationToken).ConfigureAwait(false); } /// diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 81bed083b312..3c184d4b960c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -67,6 +67,11 @@ protected override async Task> Lis return await _checkpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); } + protected override async Task GetCheckpointAsync(string partitionId, CancellationToken cancellationToken) + { + return await _checkpointStore.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, cancellationToken).ConfigureAwait(false); + } + internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default) { await _checkpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint()