Skip to content

Commit 3d59356

Browse files
authored
[Event Hubs Client] Idempotent Producer Client (#15034)
The focus of these changes is implementing the idempotent publishing feature infrastructure into the `EventHubProducerClient` and associated types, refactoring the current structure to support idempotent and non-idempotent publishing as unique code paths. Not included in these changes is the addition of the `ReadPartitionPublishingProperties` member and live tests; those will be covered in dedicated work streams.
1 parent 4ed7fdf commit 3d59356

27 files changed

+2827
-354
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs

100755100644
Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx

100755100644
Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
<value>The requested transport type, '{0}' is not supported.</value>
173173
</data>
174174
<data name="CannotSendWithPartitionIdAndPartitionKey" xml:space="preserve">
175-
<value>A producer created for a specific partition cannot send events using a partition key. This producer is associated with partition '{0}'.</value>
175+
<value>An event cannot be published using both a partition key and a partition identifier. This operation specified partition key `{0}` and partition id `{1}`.</value>
176176
</data>
177177
<data name="UnsupportedCredential" xml:space="preserve">
178178
<value>The credential is not a known and supported credential type. Please use a JWT credential or shared key credential.</value>
@@ -289,6 +289,12 @@
289289
<value>One or more exceptions occured during event processing. Please see the inner exceptions for more detail.</value>
290290
</data>
291291
<data name="OnlyOneSharedAccessAuthorizationMayBeSpecified" xml:space="preserve">
292-
<value>The authorization for a connection string may specifiy a shared key or precomputed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
292+
<value>The authorization for a connection string may specify a shared key or pre-computed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
293+
</data>
294+
<data name="CannotPublishToGateway" xml:space="preserve">
295+
<value>The producer was configured to use features that require publishing to a specific partition. Publishing with automatic routing or using a partition key is not supported by this producer.</value>
296+
</data>
297+
<data name="IdempotentAlreadyPublished" xml:space="preserve">
298+
<value>These events have already been successfully published. When idempotent publishing is enabled, events that were acknowledged by the Event Hubs service may not be published again.</value>
293299
</data>
294300
</root>

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
465465
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
466466
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
467467
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
468-
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
468+
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
469469
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
470470
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
471471
public override string ToString() { throw null; }
@@ -495,9 +495,9 @@ public partial class PartitionPublishingProperties
495495
{
496496
protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnabled, long? producerGroupId, short? ownerLevel, int? lastPublishedSequenceNumber) { }
497497
public bool IsIdempotentPublishingEnabled { get { throw null; } }
498-
public int? LastPublishedSequenceNumber { get { throw null; } set { } }
499-
public short? OwnerLevel { get { throw null; } set { } }
500-
public long? ProducerGroupId { get { throw null; } set { } }
498+
public int? LastPublishedSequenceNumber { get { throw null; } }
499+
public short? OwnerLevel { get { throw null; } }
500+
public long? ProducerGroupId { get { throw null; } }
501501
}
502502
public partial class SendEventOptions
503503
{

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs

100755100644
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Azure.Messaging.EventHubs.Consumer;
1414
using Azure.Messaging.EventHubs.Core;
1515
using Azure.Messaging.EventHubs.Diagnostics;
16+
using Azure.Messaging.EventHubs.Producer;
1617
using Microsoft.Azure.Amqp;
1718

1819
namespace Azure.Messaging.EventHubs.Amqp
@@ -374,11 +375,15 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
374375
/// </summary>
375376
///
376377
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
378+
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
379+
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
377380
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
378381
///
379382
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
380383
///
381384
public override TransportProducer CreateProducer(string partitionId,
385+
TransportProducerFeatures requestedFeatures,
386+
PartitionPublishingOptions partitionOptions,
382387
EventHubsRetryPolicy retryPolicy)
383388
{
384389
Argument.AssertNotClosed(_closed, nameof(AmqpClient));
@@ -389,7 +394,9 @@ public override TransportProducer CreateProducer(string partitionId,
389394
partitionId,
390395
ConnectionScope,
391396
MessageConverter,
392-
retryPolicy
397+
retryPolicy,
398+
requestedFeatures,
399+
partitionOptions
393400
);
394401
}
395402

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs

Lines changed: 113 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ internal class AmqpProducer : TransportProducer
5858
///
5959
private string PartitionId { get; }
6060

61+
/// <summary>
62+
/// The flags specifying the set of special transport features that this producer has opted-into.
63+
/// </summary>
64+
///
65+
private TransportProducerFeatures ActiveFeatures { get; }
66+
6167
/// <summary>
6268
/// The policy to use for determining retry behavior for when an operation fails.
6369
/// </summary>
@@ -92,6 +98,17 @@ internal class AmqpProducer : TransportProducer
9298
///
9399
private long? MaximumMessageSize { get; set; }
94100

101+
/// <summary>
102+
/// The set of partition publishing properties active for this producer at the time it was initialized.
103+
/// </summary>
104+
///
105+
/// <remarks>
106+
/// It is important to note that these properties are a snapshot of the service state at the time when the
107+
/// producer was initialized; they do not necessarily represent the current state of the service.
108+
/// </remarks>
109+
///
110+
private PartitionPublishingProperties InitializedPartitionProperties { get; set; }
111+
95112
/// <summary>
96113
/// Initializes a new instance of the <see cref="AmqpProducer"/> class.
97114
/// </summary>
@@ -101,6 +118,8 @@ internal class AmqpProducer : TransportProducer
101118
/// <param name="connectionScope">The AMQP connection context for operations.</param>
102119
/// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.</param>
103120
/// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
121+
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
122+
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
104123
///
105124
/// <remarks>
106125
/// As an internal type, this class performs only basic sanity checks against its arguments. It
@@ -115,8 +134,12 @@ public AmqpProducer(string eventHubName,
115134
string partitionId,
116135
AmqpConnectionScope connectionScope,
117136
AmqpMessageConverter messageConverter,
118-
EventHubsRetryPolicy retryPolicy)
137+
EventHubsRetryPolicy retryPolicy,
138+
TransportProducerFeatures requestedFeatures = TransportProducerFeatures.None,
139+
PartitionPublishingOptions partitionOptions = null)
119140
{
141+
partitionOptions ??= new PartitionPublishingOptions();
142+
120143
Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
121144
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
122145
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
@@ -127,9 +150,10 @@ public AmqpProducer(string eventHubName,
127150
RetryPolicy = retryPolicy;
128151
ConnectionScope = connectionScope;
129152
MessageConverter = messageConverter;
153+
ActiveFeatures = requestedFeatures;
130154

131155
SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
132-
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None),
156+
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, partitionOptions, timeout, CancellationToken.None),
133157
link =>
134158
{
135159
link.Session?.SafeClose();
@@ -211,7 +235,6 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
211235
if (!MaximumMessageSize.HasValue)
212236
{
213237
var failedAttemptCount = 0;
214-
var retryDelay = default(TimeSpan?);
215238
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
216239

217240
while ((!cancellationToken.IsCancellationRequested) && (!_closed))
@@ -223,13 +246,13 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
223246
}
224247
catch (Exception ex)
225248
{
226-
Exception activeEx = ex.TranslateServiceException(EventHubName);
249+
++failedAttemptCount;
227250

228251
// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
229252
// Otherwise, bubble the exception.
230253

231-
++failedAttemptCount;
232-
retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
254+
var activeEx = ex.TranslateServiceException(EventHubName);
255+
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
233256

234257
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
235258
{
@@ -247,13 +270,7 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
247270
}
248271
}
249272

250-
// If MaximumMessageSize has not been populated nor exception thrown
251-
// by this point, then cancellation has been requested.
252-
253-
if (!MaximumMessageSize.HasValue)
254-
{
255-
throw new TaskCanceledException();
256-
}
273+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
257274
}
258275

259276
// Ensure that there was a maximum size populated; if none was provided,
@@ -265,6 +282,74 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
265282
return new AmqpEventBatch(MessageConverter, options);
266283
}
267284

285+
/// <summary>
286+
/// Reads the set of partition publishing properties active for this producer at the time it was initialized.
287+
/// </summary>
288+
///
289+
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
290+
///
291+
/// <returns>The set of <see cref="PartitionPublishingProperties" /> observed when the producer was initialized.</returns>
292+
///
293+
/// <remarks>
294+
/// It is important to note that these properties are a snapshot of the service state at the time when the
295+
/// producer was initialized; they do not necessarily represent the current state of the service.
296+
/// </remarks>
297+
///
298+
public override async ValueTask<PartitionPublishingProperties> ReadInitializationPublishingPropertiesAsync(CancellationToken cancellationToken)
299+
{
300+
Argument.AssertNotClosed(_closed, nameof(AmqpProducer));
301+
302+
// If the properties were already initialized, use them.
303+
304+
if (InitializedPartitionProperties != null)
305+
{
306+
return InitializedPartitionProperties;
307+
}
308+
309+
// Initialize the properties by forcing the link to be opened.
310+
311+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
312+
313+
var failedAttemptCount = 0;
314+
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
315+
316+
while ((!cancellationToken.IsCancellationRequested) && (!_closed))
317+
{
318+
try
319+
{
320+
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false);
321+
break;
322+
}
323+
catch (Exception ex)
324+
{
325+
++failedAttemptCount;
326+
327+
// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
328+
// Otherwise, bubble the exception.
329+
330+
var activeEx = ex.TranslateServiceException(EventHubName);
331+
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
332+
333+
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
334+
{
335+
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
336+
tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
337+
}
338+
else if (ex is AmqpException)
339+
{
340+
ExceptionDispatchInfo.Capture(activeEx).Throw();
341+
}
342+
else
343+
{
344+
throw;
345+
}
346+
}
347+
}
348+
349+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
350+
return InitializedPartitionProperties;
351+
}
352+
268353
/// <summary>
269354
/// Closes the connection to the transport producer instance.
270355
/// </summary>
@@ -430,6 +515,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
430515
/// </summary>
431516
///
432517
/// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>null</c>.</param>
518+
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
433519
/// <param name="timeout">The timeout to apply when creating the link.</param>
434520
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
435521
///
@@ -443,6 +529,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
443529
/// </remarks>
444530
///
445531
protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAsync(string partitionId,
532+
PartitionPublishingOptions partitionOptions,
446533
TimeSpan timeout,
447534
CancellationToken cancellationToken)
448535
{
@@ -465,6 +552,19 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
465552
await Task.Delay(15, cancellationToken).ConfigureAwait(false);
466553
MaximumMessageSize = (long)link.Settings.MaxMessageSize;
467554
}
555+
556+
if (InitializedPartitionProperties == null)
557+
{
558+
if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0)
559+
{
560+
throw new NotImplementedException(nameof(CreateLinkAndEnsureProducerStateAsync));
561+
}
562+
else
563+
{
564+
InitializedPartitionProperties = new PartitionPublishingProperties(false, null, null, null);
565+
}
566+
}
567+
468568
}
469569
catch (Exception ex)
470570
{

0 commit comments

Comments
 (0)