diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index cff5e834ca28..9dadc7ba4232 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -441,7 +441,7 @@ public virtual async Task GetPartitionPropertiesAsync(strin /// /// A set of information about the state of publishing for a partition, as observed by the . This - /// data can always be read, but will only be populated with information relevant to the features which are active for the producer client. + /// data can always be read, but will only be populated with information relevant to the active features for the producer client. /// /// /// The unique identifier of a partition associated with the Event Hub. @@ -449,12 +449,27 @@ public virtual async Task GetPartitionPropertiesAsync(strin /// /// The set of information about the publishing state of the requested partition, within the context of this producer. /// + /// + /// The state of a partition is only understood by the after events have been published to that + /// partition; calling this method for a partition before events have been published to it will return an empty set of properties. + /// + /// public virtual async Task GetPartitionPublishingPropertiesAsync(string partitionId, CancellationToken cancellationToken = default) { Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient)); Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); + // If the producer does not require stateful partitions, return an empty + // instance. + + if (!RequiresStatefulPartitions(Options)) + { + return PartitionPublishingProperties.Empty; + } + + // If the state has not yet been initialized, then do so now. + var partitionState = PartitionState.GetOrAdd(partitionId, new PartitionPublishingState(partitionId)); cancellationToken.ThrowIfCancellationRequested(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs index 88537813d0e0..ae9dbd67c0c4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs @@ -68,7 +68,12 @@ public class PartitionPublishingOptions /// The starting number that should be used for the automatic sequencing of events for the associated partition, when published by this producer. /// /// - /// The starting sequence number to associate with the partition; if null, the Event Hubs service will control the value. + /// + /// The starting sequence number to associate with the partition; if null, the Event Hubs service will control the value. + /// + /// The sequence number will be in the range of 0 - (inclusive) and will increase as events are published. + /// When more than events have been published, the sequence number will roll over to 0. + /// /// /// /// The starting sequence number is only recognized and relevant when certain features of the producer are enabled. For example, it is used by diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs index 2f605aee18cd..4d50d71e1bb8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs @@ -11,6 +11,27 @@ namespace Azure.Messaging.EventHubs.Producer /// public class PartitionPublishingProperties { + /// An empty set of properties. + private static PartitionPublishingProperties s_emptyInstance; + + /// + /// Returns a set of properties that represents an empty set of properties + /// suitable for use when partitions are not inherently stateful. + /// + /// + internal static PartitionPublishingProperties Empty + { + get + { + // The race condition here is benign; because the resulting properties + // are not mutable, there is not impact to having the reference updated after + // initial creation. + + s_emptyInstance ??= new PartitionPublishingProperties(false, null, null, null); + return s_emptyInstance; + } + } + /// /// Indicates whether or not idempotent publishing is enabled for the producer and, by extension, the associated partition. /// @@ -36,6 +57,12 @@ public class PartitionPublishingProperties /// successfully. /// /// + /// + /// The sequence number will be in the range of 0 - (inclusive) and will + /// increase as events are published. When more than events have been published, + /// the sequence number will roll over to 0. + /// + /// public int? LastPublishedSequenceNumber { get; } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs index da2b110cbaee..331471d8ee02 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs @@ -54,6 +54,12 @@ internal class PartitionPublishingState /// successfully. /// /// + /// + /// The sequence number will be in the range of 0 - (inclusive) and will + /// increase as events are published. When more than events have been published, + /// the sequence number will roll over to 0. + /// + /// public int? LastPublishedSequenceNumber { get; set; } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs index 1fa7157dea60..51ce9ac7125e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -440,7 +440,7 @@ public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionStat /// /// [Test] - public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState() + public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionStateWhenIdempotentPublishingEnabled() { var expectedPartition = "5"; var expectedProperties = new PartitionPublishingProperties(true, 123, 456, 798); @@ -479,6 +479,41 @@ public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState() "Partition state should not have been initialized twice."); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task ReadPartitionPublishingPropertiesAsyncReturnsEmptyPartitionStateWhenIdempotentPublishingDisabled() + { + var expectedPartition = "5"; + var expectedProperties = PartitionPublishingProperties.Empty; + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + + var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions + { + EnableIdempotentPartitions = false + }); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var readProperties = await producer.GetPartitionPublishingPropertiesAsync(expectedPartition, cancellationSource.Token); + + Assert.That(readProperties, Is.Not.Null, "The read properties should have been created."); + Assert.That(readProperties.ProducerGroupId, Is.EqualTo(expectedProperties.ProducerGroupId), "The producer group should match."); + Assert.That(readProperties.OwnerLevel, Is.EqualTo(expectedProperties.OwnerLevel), "The owner level should match."); + Assert.That(readProperties.LastPublishedSequenceNumber, Is.EqualTo(expectedProperties.LastPublishedSequenceNumber), "The sequence number should match."); + + mockTransport + .Verify(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync( + It.IsAny()), + Times.Never, + "Partition state should not have been initialized."); + } + /// /// Verifies functionality of the /// method. @@ -724,6 +759,8 @@ public void SendIdempotentRequiresThePartition() EnableIdempotentPartitions = true }); + Assert.That(async () => await producer.SendAsync(events), Throws.InstanceOf(), "Idempotent publishing requires the send options."); + var sendOptions = new SendEventOptions(); Assert.That(async () => await producer.SendAsync(events, sendOptions), Throws.InstanceOf(), "Automatic routing cannot be used with idempotent publishing.");