Skip to content

Commit ff77085

Browse files
authored
[Event Hubs Client] Idempotent Publishing Tweaks (Azure#17906)
# Summary The focus of these changes is to allow querying partition publishing properties when idempotent publishing is not enabled, returning an empty set of data with the feature flags. Several areas of documentation were also enhanced to provide additional details and context. # Last Upstream Rebase Tuesday, December 12, 8:19am (EST)
1 parent 8864890 commit ff77085

File tree

5 files changed

+93
-3
lines changed

5 files changed

+93
-3
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,20 +441,35 @@ public virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(strin
441441

442442
/// <summary>
443443
/// A set of information about the state of publishing for a partition, as observed by the <see cref="EventHubProducerClient" />. This
444-
/// data can always be read, but will only be populated with information relevant to the features which are active for the producer client.
444+
/// data can always be read, but will only be populated with information relevant to the active features for the producer client.
445445
/// </summary>
446446
///
447447
/// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
448448
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
449449
///
450450
/// <returns>The set of information about the publishing state of the requested partition, within the context of this producer.</returns>
451451
///
452+
/// <remarks>
453+
/// The state of a partition is only understood by the <see cref="EventHubProducerClient" /> after events have been published to that
454+
/// partition; calling this method for a partition before events have been published to it will return an empty set of properties.
455+
/// </remarks>
456+
///
452457
public virtual async Task<PartitionPublishingProperties> GetPartitionPublishingPropertiesAsync(string partitionId,
453458
CancellationToken cancellationToken = default)
454459
{
455460
Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
456461
Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
457462

463+
// If the producer does not require stateful partitions, return an empty
464+
// instance.
465+
466+
if (!RequiresStatefulPartitions(Options))
467+
{
468+
return PartitionPublishingProperties.Empty;
469+
}
470+
471+
// If the state has not yet been initialized, then do so now.
472+
458473
var partitionState = PartitionState.GetOrAdd(partitionId, new PartitionPublishingState(partitionId));
459474
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
460475

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ public class PartitionPublishingOptions
6868
/// The starting number that should be used for the automatic sequencing of events for the associated partition, when published by this producer.
6969
/// </summary>
7070
///
71-
/// <value>The starting sequence number to associate with the partition; if <c>null</c>, the Event Hubs service will control the value.</value>
71+
/// <value>
72+
/// <para>The starting sequence number to associate with the partition; if <c>null</c>, the Event Hubs service will control the value.</para>
73+
///
74+
/// <para>The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will increase as events are published.
75+
/// When more than <see cref="int.MaxValue" /> events have been published, the sequence number will roll over to <c>0</c>.</para>
76+
/// </value>
7277
///
7378
/// <remarks>
7479
/// The starting sequence number is only recognized and relevant when certain features of the producer are enabled. For example, it is used by

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,27 @@ namespace Azure.Messaging.EventHubs.Producer
1111
///
1212
public class PartitionPublishingProperties
1313
{
14+
/// <summary>An empty set of properties.</summary>
15+
private static PartitionPublishingProperties s_emptyInstance;
16+
17+
/// <summary>
18+
/// Returns a set of properties that represents an empty set of properties
19+
/// suitable for use when partitions are not inherently stateful.
20+
/// </summary>
21+
///
22+
internal static PartitionPublishingProperties Empty
23+
{
24+
get
25+
{
26+
// The race condition here is benign; because the resulting properties
27+
// are not mutable, there is not impact to having the reference updated after
28+
// initial creation.
29+
30+
s_emptyInstance ??= new PartitionPublishingProperties(false, null, null, null);
31+
return s_emptyInstance;
32+
}
33+
}
34+
1435
/// <summary>
1536
/// Indicates whether or not idempotent publishing is enabled for the producer and, by extension, the associated partition.
1637
/// </summary>
@@ -36,6 +57,12 @@ public class PartitionPublishingProperties
3657
/// successfully.
3758
/// </summary>
3859
///
60+
/// <value>
61+
/// The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will
62+
/// increase as events are published. When more than <see cref="int.MaxValue" /> events have been published,
63+
/// the sequence number will roll over to <c>0</c>.
64+
/// </value>
65+
///
3966
public int? LastPublishedSequenceNumber { get; }
4067

4168
/// <summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ internal class PartitionPublishingState
5454
/// successfully.
5555
/// </summary>
5656
///
57+
/// <value>
58+
/// The sequence number will be in the range of <c>0</c> - <see cref="int.MaxValue"/> (inclusive) and will
59+
/// increase as events are published. When more than <see cref="int.MaxValue" /> events have been published,
60+
/// the sequence number will roll over to <c>0</c>.
61+
/// </value>
62+
///
5763
public int? LastPublishedSequenceNumber { get; set; }
5864

5965
/// <summary>

sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionStat
440440
/// </summary>
441441
///
442442
[Test]
443-
public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState()
443+
public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionStateWhenIdempotentPublishingEnabled()
444444
{
445445
var expectedPartition = "5";
446446
var expectedProperties = new PartitionPublishingProperties(true, 123, 456, 798);
@@ -479,6 +479,41 @@ public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState()
479479
"Partition state should not have been initialized twice.");
480480
}
481481

482+
/// <summary>
483+
/// Verifies functionality of the <see cref="EventHubProducerClient.GetPartitionPublishingPropertiesAsync" />
484+
/// method.
485+
/// </summary>
486+
///
487+
[Test]
488+
public async Task ReadPartitionPublishingPropertiesAsyncReturnsEmptyPartitionStateWhenIdempotentPublishingDisabled()
489+
{
490+
var expectedPartition = "5";
491+
var expectedProperties = PartitionPublishingProperties.Empty;
492+
var mockTransport = new Mock<TransportProducer>();
493+
var connection = new MockConnection(() => mockTransport.Object);
494+
495+
var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions
496+
{
497+
EnableIdempotentPartitions = false
498+
});
499+
500+
using var cancellationSource = new CancellationTokenSource();
501+
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
502+
503+
var readProperties = await producer.GetPartitionPublishingPropertiesAsync(expectedPartition, cancellationSource.Token);
504+
505+
Assert.That(readProperties, Is.Not.Null, "The read properties should have been created.");
506+
Assert.That(readProperties.ProducerGroupId, Is.EqualTo(expectedProperties.ProducerGroupId), "The producer group should match.");
507+
Assert.That(readProperties.OwnerLevel, Is.EqualTo(expectedProperties.OwnerLevel), "The owner level should match.");
508+
Assert.That(readProperties.LastPublishedSequenceNumber, Is.EqualTo(expectedProperties.LastPublishedSequenceNumber), "The sequence number should match.");
509+
510+
mockTransport
511+
.Verify(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(
512+
It.IsAny<CancellationToken>()),
513+
Times.Never,
514+
"Partition state should not have been initialized.");
515+
}
516+
482517
/// <summary>
483518
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
484519
/// method.
@@ -724,6 +759,8 @@ public void SendIdempotentRequiresThePartition()
724759
EnableIdempotentPartitions = true
725760
});
726761

762+
Assert.That(async () => await producer.SendAsync(events), Throws.InstanceOf<InvalidOperationException>(), "Idempotent publishing requires the send options.");
763+
727764
var sendOptions = new SendEventOptions();
728765
Assert.That(async () => await producer.SendAsync(events, sendOptions), Throws.InstanceOf<InvalidOperationException>(), "Automatic routing cannot be used with idempotent publishing.");
729766

0 commit comments

Comments
 (0)