Skip to content

Commit 9caa2a0

Browse files
authored
[Event Hubs Client] Idempotent Producer Client (#15125)
The focus of these changes is to enable the `EventDataBatch` to reserve appropriate space for publishing sequence numbers, when required by the active producer configuration and to complete the client API surface by implementing the `ReadPartitionPublishingProperties` method.
1 parent ec88e46 commit 9caa2a0

17 files changed

+531
-92
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,10 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
464464
public override int GetHashCode() { throw null; }
465465
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
466466
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
467+
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Producer.PartitionPublishingProperties> ReadPartitionPublishingPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
467468
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
468469
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
469-
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
470+
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
470471
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
471472
public override string ToString() { throw null; }
472473
}
@@ -490,6 +491,12 @@ public PartitionPublishingOptions() { }
490491
public short? OwnerLevel { get { throw null; } set { } }
491492
public long? ProducerGroupId { get { throw null; } set { } }
492493
public int? StartingSequenceNumber { get { throw null; } set { } }
494+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
495+
public override bool Equals(object obj) { throw null; }
496+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
497+
public override int GetHashCode() { throw null; }
498+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
499+
public override string ToString() { throw null; }
493500
}
494501
public partial class PartitionPublishingProperties
495502
{
@@ -498,6 +505,12 @@ protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnab
498505
public int? LastPublishedSequenceNumber { get { throw null; } }
499506
public short? OwnerLevel { get { throw null; } }
500507
public long? ProducerGroupId { get { throw null; } }
508+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
509+
public override bool Equals(object obj) { throw null; }
510+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
511+
public override int GetHashCode() { throw null; }
512+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
513+
public override string ToString() { throw null; }
501514
}
502515
public partial class SendEventOptions
503516
{

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,17 @@ internal class AmqpEventBatch : TransportEventBatch
5252
public override long SizeInBytes => _sizeBytes;
5353

5454
/// <summary>
55-
/// The publishing sequence number assigned to the first event in the batch at the time
56-
/// the batch was successfully published.
55+
/// A flag that indicates whether space should be reserved for a publishing
56+
/// sequence number when the event size is measured. If <c>false</c>, a sequence
57+
/// number is not used in size calculations.
5758
/// </summary>
5859
///
5960
/// <remarks>
60-
/// The starting published sequence number is only populated and relevant when certain features
61+
/// The sequence number is only populated and relevant when certain features
6162
/// of the producer are enabled. For example, it is used by idempotent publishing.
6263
/// </remarks>
6364
///
64-
public override int? StartingPublishedSequenceNumber { get; set; }
65+
public override bool ReserveSpaceForSequenceNumber { get; }
6566

6667
/// <summary>
6768
/// The count of events contained in the batch.
@@ -93,9 +94,11 @@ internal class AmqpEventBatch : TransportEventBatch
9394
///
9495
/// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.</param>
9596
/// <param name="options">The set of options to apply to the batch.</param>
97+
/// <param name="reserveSpaceForSequenceNumber">A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If <c>false</c>, a sequence number is not used in size calculations.</param>
9698
///
9799
public AmqpEventBatch(AmqpMessageConverter messageConverter,
98-
CreateBatchOptions options)
100+
CreateBatchOptions options,
101+
bool reserveSpaceForSequenceNumber)
99102
{
100103
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
101104
Argument.AssertNotNull(options, nameof(options));
@@ -104,13 +107,13 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
104107
MessageConverter = messageConverter;
105108
Options = options;
106109
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
110+
ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber;
107111

108112
// Initialize the size by reserving space for the batch envelope.
109113

110114
using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
111115
ReservedSize = envelope.SerializedMessageSize;
112116
_sizeBytes = ReservedSize;
113-
114117
}
115118

116119
/// <summary>
@@ -127,7 +130,12 @@ public override bool TryAdd(EventData eventData)
127130
Argument.AssertNotNull(eventData, nameof(eventData));
128131
Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch));
129132

130-
AmqpMessage eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);
133+
if (ReserveSpaceForSequenceNumber)
134+
{
135+
eventData.PendingPublishSequenceNumber = int.MaxValue;
136+
}
137+
138+
var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);
131139

132140
try
133141
{
@@ -152,6 +160,7 @@ public override bool TryAdd(EventData eventData)
152160
}
153161
finally
154162
{
163+
eventData.PendingPublishSequenceNumber = default;
155164
eventMessage?.Dispose();
156165
}
157166
}

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Globalization;
7+
using System.Runtime.CompilerServices;
78
using System.Runtime.ExceptionServices;
89
using System.Threading;
910
using System.Threading.Tasks;
@@ -277,9 +278,9 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
277278
// default to the maximum size allowed by the link.
278279

279280
options.MaximumSizeInBytes ??= MaximumMessageSize;
280-
281281
Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes));
282-
return new AmqpEventBatch(MessageConverter, options);
282+
283+
return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures));
283284
}
284285

285286
/// <summary>
@@ -574,6 +575,18 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
574575
return link;
575576
}
576577

578+
/// <summary>
579+
/// Determines if measuring a sequence number is required to accurately calculate
580+
/// the size of an event.
581+
/// </summary>
582+
///
583+
/// <param name="activeFeatures">The set of features which are active for the producer.</param>
584+
///
585+
/// <returns><c>true</c> if a sequence number should be measured; otherwise, <c>false</c>.</returns>
586+
///
587+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
588+
private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0);
589+
577590
/// <summary>
578591
/// Uses the minimum value of the two specified <see cref="TimeSpan" /> instances.
579592
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,17 @@ internal abstract class TransportEventBatch : IDisposable
3131
public abstract long SizeInBytes { get; }
3232

3333
/// <summary>
34-
/// The publishing sequence number assigned to the first event in the batch at the time
35-
/// the batch was successfully published.
34+
/// A flag that indicates whether space should be reserved for a publishing
35+
/// sequence number when the event size is measured. If <c>false</c>, a sequence
36+
/// number is not used in size calculations.
3637
/// </summary>
3738
///
38-
/// <value>
39-
/// The sequence number of the first event in the batch, if the batch was successfully
40-
/// published by a sequence-aware producer. If the producer was not configured to apply
41-
/// sequence numbering or if the batch has not yet been successfully published, this member
42-
/// will be <c>null</c>.
43-
///</value>
44-
///
4539
/// <remarks>
46-
/// The starting published sequence number is only populated and relevant when certain features
40+
/// The sequence number is only populated and relevant when certain features
4741
/// of the producer are enabled. For example, it is used by idempotent publishing.
4842
/// </remarks>
4943
///
50-
public abstract int? StartingPublishedSequenceNumber { get; set; }
44+
public abstract bool ReserveSpaceForSequenceNumber { get; }
5145

5246
/// <summary>
5347
/// The count of events contained in the batch.

sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ protected EventData(ReadOnlyMemory<byte> eventBody,
323323
/// Transitions the pending publishing sequence number to the published sequence number.
324324
/// </summary>
325325
///
326-
internal void CommitPublishingSequenceNumber()
326+
internal void CommitPublishingState()
327327
{
328328
PublishedSequenceNumber = PendingPublishSequenceNumber;
329329
PendingPublishSequenceNumber = default;

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs

100755100644
Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,6 @@ public long? MaximumSizeInBytes
4040
}
4141
}
4242

43-
/// <summary>
44-
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
45-
/// </summary>
46-
///
47-
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
48-
///
49-
internal CreateBatchOptions Clone() =>
50-
new CreateBatchOptions
51-
{
52-
PartitionId = PartitionId,
53-
PartitionKey = PartitionKey,
54-
_maximumSizeInBytes = MaximumSizeInBytes
55-
};
56-
5743
/// <summary>
5844
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
5945
/// </summary>
@@ -82,5 +68,19 @@ internal CreateBatchOptions Clone() =>
8268
///
8369
[EditorBrowsable(EditorBrowsableState.Never)]
8470
public override string ToString() => base.ToString();
71+
72+
/// <summary>
73+
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
74+
/// </summary>
75+
///
76+
/// <returns>A new copy of <see cref="CreateBatchOptions" />.</returns>
77+
///
78+
internal new CreateBatchOptions Clone() =>
79+
new CreateBatchOptions
80+
{
81+
PartitionId = PartitionId,
82+
PartitionKey = PartitionKey,
83+
_maximumSizeInBytes = _maximumSizeInBytes
84+
};
8585
}
8686
}

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventDataBatch.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,7 @@ public sealed class EventDataBatch : IDisposable
5858
/// of the producer are enabled. For example, it is used by idempotent publishing.
5959
/// </remarks>
6060
///
61-
public int? StartingPublishedSequenceNumber
62-
{
63-
get => InnerBatch.StartingPublishedSequenceNumber;
64-
internal set => InnerBatch.StartingPublishedSequenceNumber = value;
65-
}
61+
public int? StartingPublishedSequenceNumber { get; internal set; }
6662

6763
/// <summary>
6864
/// The count of events contained in the batch.

0 commit comments

Comments
 (0)