Skip to content

Commit da8f4d0

Browse files
authored
[Event Hubs Client] Publish Events Without Batch (#11651)
The focus of these changes is to allow the publishing of events without the overhead of creating an explicit batch. In this scenario the caller assumes the risk with respect to exceeding the maximum size allowed by the Event Hubs service, trading safety for lower overhead and higher throughput.
1 parent edff9b2 commit da8f4d0

File tree

7 files changed

+74
-138
lines changed

7 files changed

+74
-138
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,10 @@ public enum ProcessingStoppedReason
409409
}
410410
namespace Azure.Messaging.EventHubs.Producer
411411
{
412-
public partial class CreateBatchOptions
412+
public partial class CreateBatchOptions : Azure.Messaging.EventHubs.Producer.SendEventOptions
413413
{
414414
public CreateBatchOptions() { }
415415
public long? MaximumSizeInBytes { get { throw null; } set { } }
416-
public string PartitionId { get { throw null; } set { } }
417-
public string PartitionKey { get { throw null; } set { } }
418416
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
419417
public override bool Equals(object obj) { throw null; }
420418
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
@@ -455,6 +453,8 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
455453
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
456454
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
457455
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
456+
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
457+
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
458458
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
459459
public override string ToString() { throw null; }
460460
}
@@ -470,6 +470,18 @@ public EventHubProducerClientOptions() { }
470470
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
471471
public override string ToString() { throw null; }
472472
}
473+
public partial class SendEventOptions
474+
{
475+
public SendEventOptions() { }
476+
public string PartitionId { get { throw null; } set { } }
477+
public string PartitionKey { get { throw null; } set { } }
478+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
479+
public override bool Equals(object obj) { throw null; }
480+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
481+
public override int GetHashCode() { throw null; }
482+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
483+
public override string ToString() { throw null; }
484+
}
473485
}
474486
namespace Microsoft.Extensions.Azure
475487
{

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs

Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace Azure.Messaging.EventHubs.Producer
1111
/// behaves and is sent to the Event Hubs service.
1212
/// </summary>
1313
///
14-
public class CreateBatchOptions
14+
public class CreateBatchOptions : SendEventOptions
1515
{
1616
/// <summary>The requested maximum size to allow for the batch, in bytes.</summary>
1717
private long? _maximumSizeInBytes = null;
@@ -40,58 +40,6 @@ public long? MaximumSizeInBytes
4040
}
4141
}
4242

43-
/// <summary>
44-
/// Allows a hashing key to be provided for the batch of events, which instructs the Event Hubs
45-
/// service map this key to a specific partition but allowing the service to choose an arbitrary,
46-
/// partition for this batch of events and any other batches using the same partition hashing key.
47-
///
48-
/// The selection of a partition is stable for a given partition hashing key. Should any other
49-
/// batches of events be sent using the same exact partition hashing key, the Event Hubs service will
50-
/// route them all to the same partition.
51-
///
52-
/// This should be specified only when there is a need to group events by partition, but there is
53-
/// flexibility into which partition they are routed. If ensuring that a batch of events is sent
54-
/// only to a specific partition, it is recommended that the identifier of the position be
55-
/// specified directly when sending the batch.
56-
/// </summary>
57-
///
58-
/// <value>
59-
/// If the producer wishes to influence the automatic routing of events to partitions, the partition
60-
/// hashing key to associate with the event or batch of events; otherwise, <c>null</c>.
61-
/// </value>
62-
///
63-
/// <remarks>
64-
/// If the <see cref="CreateBatchOptions.PartitionKey" /> is specified, then no <see cref="CreateBatchOptions.PartitionId" />
65-
/// may be set when sending.
66-
/// </remarks>
67-
///
68-
public string PartitionKey { get; set; }
69-
70-
/// <summary>
71-
/// If specified, events will be published to this specific partition. If the identifier is not
72-
/// specified, the Event Hubs service will be responsible for routing events automatically to an available partition.
73-
/// </summary>
74-
///
75-
/// <value>
76-
/// If the producer wishes the events to be automatically routed to partitions, <c>null</c>; otherwise, the identifier
77-
/// of the desired partition.
78-
/// </value>
79-
///
80-
/// <remarks>
81-
/// If the <see cref="CreateBatchOptions.PartitionId" /> is specified, then no <see cref="CreateBatchOptions.PartitionKey" />
82-
/// may be set when sending.
83-
///
84-
/// <para>Allowing automatic routing of partitions is recommended when:</para>
85-
/// <para>- The sending of events needs to be highly available.</para>
86-
/// <para>- The event data should be evenly distributed among all available partitions.</para>
87-
///
88-
/// If no partition is specified, the following rules are used for automatically selecting one:
89-
/// <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
90-
/// <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.</para>
91-
/// </remarks>
92-
///
93-
public string PartitionId { get; set; }
94-
9543
/// <summary>
9644
/// Creates a new copy of the current <see cref="CreateBatchOptions" />, cloning its attributes into a new instance.
9745
/// </summary>
@@ -106,15 +54,6 @@ internal CreateBatchOptions Clone() =>
10654
_maximumSizeInBytes = MaximumSizeInBytes
10755
};
10856

109-
/// <summary>
110-
/// Converts the <see cref="CreateBatchOptions" /> into an equivalent
111-
/// <see cref="SendEventOptions" /> instance.
112-
/// </summary>
113-
///
114-
/// <returns>A set of sending options equivalent to those represented by the batch options.</returns>
115-
///
116-
internal SendEventOptions ToSendOptions() => new SendEventOptions(PartitionId, PartitionKey);
117-
11857
/// <summary>
11958
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
12059
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -370,53 +370,64 @@ internal virtual async Task SendAsync(EventData eventData,
370370
}
371371

372372
/// <summary>
373-
/// Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
374-
/// maximum size of a single batch, an exception will be triggered and the send will fail.
373+
/// Sends a set of events to the associated Event Hub using a batched approach. Because the batch is implicitly created, the size of the event set is not
374+
/// validated until this method is invoked. The call will fail if the size of the specified set of events exceeds the maximum allowable size of a single batch.
375375
/// </summary>
376376
///
377-
/// <param name="events">The set of event data to send.</param>
377+
/// <param name="eventBatch">The set of event data to send.</param>
378378
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
379379
///
380380
/// <returns>A task to be resolved on when the operation has completed.</returns>
381381
///
382-
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)"/>
382+
/// <exception cref="EventHubsException">
383+
/// Occurs when the set of events exceeds the maximum size allowed in a single batch, as determined by the Event Hubs service. The <see cref="EventHubsException.Reason" /> will be set to
384+
/// <see cref="EventHubsException.FailureReason.MessageSizeExceeded"/> in this case.
385+
/// </exception>
386+
///
387+
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
388+
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
389+
/// <seealso cref="CreateBatchAsync(CancellationToken)" />
383390
///
384-
internal virtual async Task SendAsync(IEnumerable<EventData> events,
385-
CancellationToken cancellationToken = default) => await SendAsync(events, null, cancellationToken).ConfigureAwait(false);
391+
public virtual async Task SendAsync(IEnumerable<EventData> eventBatch,
392+
CancellationToken cancellationToken = default) => await SendAsync(eventBatch, null, cancellationToken).ConfigureAwait(false);
386393

387394
/// <summary>
388-
/// Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
389-
/// maximum size of a single batch, an exception will be triggered and the send will fail.
395+
/// Sends a set of events to the associated Event Hub using a batched approach. Because the batch is implicitly created, the size of the event set is not
396+
/// validated until this method is invoked. The call will fail if the size of the specified set of events exceeds the maximum allowable size of a single batch.
390397
/// </summary>
391398
///
392-
/// <param name="events">The set of event data to send.</param>
399+
/// <param name="eventBatch">The set of event data to send.</param>
393400
/// <param name="options">The set of options to consider when sending this batch.</param>
394401
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
395402
///
396403
/// <returns>A task to be resolved on when the operation has completed.</returns>
397404
///
398-
/// <seealso cref="SendAsync(EventData, CancellationToken)" />
399-
/// <seealso cref="SendAsync(EventData, SendEventOptions, CancellationToken)" />
405+
/// <exception cref="EventHubsException">
406+
/// Occurs when the set of events exceeds the maximum size allowed in a single batch, as determined by the Event Hubs service. The <see cref="EventHubsException.Reason" /> will be set to
407+
/// <see cref="EventHubsException.FailureReason.MessageSizeExceeded"/> in this case.
408+
/// </exception>
409+
///
400410
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
401411
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
412+
/// <seealso cref="CreateBatchAsync(CreateBatchOptions, CancellationToken)" />
402413
///
403-
internal virtual async Task SendAsync(IEnumerable<EventData> events,
404-
SendEventOptions options,
405-
CancellationToken cancellationToken = default)
414+
public virtual async Task SendAsync(IEnumerable<EventData> eventBatch,
415+
SendEventOptions options,
416+
CancellationToken cancellationToken = default)
406417
{
407418
options ??= DefaultSendOptions;
408419

409-
Argument.AssertNotNull(events, nameof(events));
420+
Argument.AssertNotNull(eventBatch, nameof(eventBatch));
410421
AssertSinglePartitionReference(options.PartitionId, options.PartitionKey);
411422

412423
int attempts = 0;
413424

414-
events = (events as IList<EventData>) ?? events.ToList();
415-
InstrumentMessages(events);
425+
eventBatch = (eventBatch as IList<EventData>) ?? eventBatch.ToList();
426+
InstrumentMessages(eventBatch);
416427

417428
var diagnosticIdentifiers = new List<string>();
418429

419-
foreach (var eventData in events)
430+
foreach (var eventData in eventBatch)
420431
{
421432
if (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out var identifier))
422433
{
@@ -433,8 +444,7 @@ internal virtual async Task SendAsync(IEnumerable<EventData> events,
433444
try
434445
{
435446
await using var _ = pooledProducer.ConfigureAwait(false);
436-
437-
await pooledProducer.TransportProducer.SendAsync(events, options, cancellationToken).ConfigureAwait(false);
447+
await pooledProducer.TransportProducer.SendAsync(eventBatch, options, cancellationToken).ConfigureAwait(false);
438448

439449
return;
440450
}
@@ -468,10 +478,6 @@ internal virtual async Task SendAsync(IEnumerable<EventData> events,
468478
///
469479
/// <returns>A task to be resolved on when the operation has completed.</returns>
470480
///
471-
/// <seealso cref="SendAsync(EventData, CancellationToken)" />
472-
/// <seealso cref="SendAsync(EventData, SendEventOptions, CancellationToken)" />
473-
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
474-
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
475481
/// <seealso cref="CreateBatchAsync(CancellationToken)" />
476482
///
477483
public virtual async Task SendAsync(EventDataBatch eventBatch,
@@ -535,6 +541,7 @@ public virtual async Task SendAsync(EventDataBatch eventBatch,
535541
/// <returns>An <see cref="EventDataBatch" /> with the default batch options.</returns>
536542
///
537543
/// <seealso cref="CreateBatchAsync(CreateBatchOptions, CancellationToken)" />
544+
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
538545
///
539546
public virtual async ValueTask<EventDataBatch> CreateBatchAsync(CancellationToken cancellationToken = default) => await CreateBatchAsync(null, cancellationToken).ConfigureAwait(false);
540547

@@ -552,7 +559,8 @@ public virtual async Task SendAsync(EventDataBatch eventBatch,
552559
///
553560
/// <returns>An <see cref="EventDataBatch" /> with the requested <paramref name="options"/>.</returns>
554561
///
555-
/// <seealso cref="CreateBatchAsync(CreateBatchOptions, CancellationToken)" />
562+
/// <seealso cref="CreateBatchAsync(CancellationToken)" />
563+
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
556564
///
557565
public virtual async ValueTask<EventDataBatch> CreateBatchAsync(CreateBatchOptions options,
558566
CancellationToken cancellationToken = default)
@@ -561,7 +569,7 @@ public virtual async ValueTask<EventDataBatch> CreateBatchAsync(CreateBatchOptio
561569
AssertSinglePartitionReference(options.PartitionId, options.PartitionKey);
562570

563571
TransportEventBatch transportBatch = await PartitionProducerPool.EventHubProducer.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false);
564-
return new EventDataBatch(transportBatch, FullyQualifiedNamespace, EventHubName, options.ToSendOptions());
572+
return new EventDataBatch(transportBatch, FullyQualifiedNamespace, EventHubName, options);
565573
}
566574

567575
/// <summary>

sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ namespace Azure.Messaging.EventHubs.Producer
77
{
88
/// <summary>
99
/// The set of options that can be specified to influence the way in which events
10-
/// are sent to the Event Hubs service.
10+
/// are published to the Event Hubs service.
1111
/// </summary>
1212
///
13-
internal class SendEventOptions
13+
public class SendEventOptions
1414
{
1515
/// <summary>
1616
/// Allows a hashing key to be provided for the batch of events, which instructs the Event Hubs

0 commit comments

Comments
 (0)