diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md index 95a194ffe63e..3509b889dfde 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md @@ -1,6 +1,14 @@ # Release History -## 5.3.0-beta.1 (Unreleased) +## 5.3.0-beta.2 (Unreleased) + +## 5.3.0-beta.1 (2020-09-15) + +### Changes + +#### New Features + +- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance. ## 5.2.0 (2020-09-08) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs index 34b87ff51cf4..2d0b9edd0cf7 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/api/Azure.Messaging.EventHubs.Processor.netstandard2.0.cs @@ -45,6 +45,7 @@ public EventProcessorClientOptions() { } public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } + public long? PrefetchSizeInBytes { get { throw null; } set { } } public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj index 6c309647a048..b1082731f4df 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj @@ -1,7 +1,7 @@ Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/ - 5.3.0-beta.1 + 5.3.0-beta.2 5.2.0 Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags) $(RequiredTargetFrameworks) @@ -12,12 +12,12 @@ - + + - diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index af69d24784e3..172bb181f6e0 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -1028,7 +1028,8 @@ private static EventProcessorOptions CreateOptions(EventProcessorClientOptions c MaximumWaitTime = clientOptions.MaximumWaitTime, TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties, LoadBalancingStrategy = clientOptions.LoadBalancingStrategy, - PrefetchCount = clientOptions.PrefetchCount + PrefetchCount = clientOptions.PrefetchCount, + PrefetchSizeInBytes = clientOptions.PrefetchSizeInBytes }; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs index 3fcf22e7d72b..1a0d9825722f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClientOptions.cs @@ -25,6 +25,9 @@ public class EventProcessorClientOptions /// The prefetch count to use when reading events. private int _prefetchCount = 300; + /// The prefetch size limit to use for the partition receiver. + private long? _prefetchSizeInBytes = default; + /// The set of options to use for configuring the connection to the Event Hubs service. private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions(); @@ -135,9 +138,9 @@ public int CacheEventCount } /// - /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to - /// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than - /// readers needing to wait for service operations to complete. + /// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. /// /// /// @@ -147,8 +150,8 @@ public int CacheEventCount /// /// /// - /// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The - /// larger the size of the cache, the more efficiently service operations can be buffered in the background to + /// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. + /// The larger the size of the cache, the more efficiently service operations can be buffered in the background to /// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O. /// /// For scenarios where the size of events is small and many events are flowing through the system, using a larger @@ -172,6 +175,38 @@ public int PrefetchCount } } + /// + /// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. + /// + /// + /// + /// When set to null, the option is considered disabled; otherwise, it will be considered enabled and take + /// precedence over any value specified for the The is an + /// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using + /// the over this option where possible for more accurate control and more predictable throughput. + /// + /// This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or + /// smaller than the number of bytes specified, and will always contain at least one event when the + /// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate + /// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well. + /// + /// + public long? PrefetchSizeInBytes + { + get => _prefetchSizeInBytes; + + set + { + if (value.HasValue) + { + Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes)); + } + _prefetchSizeInBytes = value; + } + } + /// /// Gets or sets the options used for configuring the connection to the Event Hubs service. /// @@ -246,6 +281,7 @@ internal EventProcessorClientOptions Clone() => _maximumWaitTime = _maximumWaitTime, _cacheEventCount = _cacheEventCount, _prefetchCount = _prefetchCount, + _prefetchSizeInBytes = PrefetchSizeInBytes, _connectionOptions = ConnectionOptions.Clone(), _retryOptions = RetryOptions.Clone() }; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj index 2d4629bb7524..672984713724 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Azure.Messaging.EventHubs.Processor.Tests.csproj @@ -9,19 +9,19 @@ + + + + + - - + - - - + - - - + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs index 8d501432ed73..4f2816847b72 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientOptionsTests.cs @@ -32,6 +32,7 @@ public void CloneProducesACopy() MaximumWaitTime = TimeSpan.FromMinutes(65), CacheEventCount = 1, PrefetchCount = 0, + PrefetchSizeInBytes = 200, RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(1), Delay = TimeSpan.FromMinutes(4) }, ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets } }; @@ -46,6 +47,7 @@ public void CloneProducesACopy() Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match."); Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache size of the clone should match."); Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match."); + Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch byte size of the clone should match."); Assert.That(clone.ConnectionOptions.TransportType, Is.EqualTo(options.ConnectionOptions.TransportType), "The connection options of the clone should copy properties."); Assert.That(clone.ConnectionOptions, Is.Not.SameAs(options.ConnectionOptions), "The connection options of the clone should be a copy, not the same instance."); Assert.That(clone.RetryOptions.IsEquivalentTo(options.RetryOptions), Is.True, "The retry options of the clone should be considered equal."); @@ -113,6 +115,39 @@ public void PrefetchCountAllowsZero() Assert.That(() => new EventProcessorClientOptions { PrefetchCount = 0 }, Throws.Nothing); } + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesIsValidated() + { + Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = -1 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsZero() + { + Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsNull() + { + Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = null }, Throws.Nothing); + } + /// /// Verifies functionality of the /// property. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs index 30f814ddbd64..2d746a8e4f69 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs @@ -123,6 +123,7 @@ void assertOptionsMatch(EventProcessorOptions expected, Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor."); Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor."); Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor."); } var clientOptions = new EventProcessorClientOptions @@ -131,7 +132,9 @@ void assertOptionsMatch(EventProcessorOptions expected, RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, Identifier = "OMG, HAI!", MaximumWaitTime = TimeSpan.FromDays(54), - TrackLastEnqueuedEventProperties = true + TrackLastEnqueuedEventProperties = true, + PrefetchCount = 5, + PrefetchSizeInBytes = 500 }; var expectedOptions = InvokeCreateOptions(clientOptions); @@ -1408,7 +1411,8 @@ public void ClientOptionsCanBeTranslated() MaximumWaitTime = TimeSpan.FromDays(54), TrackLastEnqueuedEventProperties = true, LoadBalancingStrategy = LoadBalancingStrategy.Greedy, - PrefetchCount = 9990 + PrefetchCount = 9990, + PrefetchSizeInBytes = 400 }; var defaultOptions = new EventProcessorOptions(); @@ -1424,6 +1428,7 @@ public void ClientOptionsCanBeTranslated() Assert.That(processorOptions.TrackLastEnqueuedEventProperties, Is.EqualTo(clientOptions.TrackLastEnqueuedEventProperties), "The flag for last event tracking should have been set."); Assert.That(processorOptions.LoadBalancingStrategy, Is.EqualTo(clientOptions.LoadBalancingStrategy), "The load balancing strategy should have been set."); Assert.That(processorOptions.PrefetchCount, Is.EqualTo(clientOptions.PrefetchCount), "The prefetch count should have been set."); + Assert.That(processorOptions.PrefetchSizeInBytes, Is.EqualTo(clientOptions.PrefetchSizeInBytes), "The prefetch byte size should have been set."); Assert.That(processorOptions.DefaultStartingPosition, Is.EqualTo(defaultOptions.DefaultStartingPosition), "The default starting position should not have been set."); Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(defaultOptions.LoadBalancingUpdateInterval), "The load balancing interval should not have been set."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index a80caeb6b7f8..d29d024b766a 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -1,6 +1,14 @@ # Release History -## 5.3.0-beta.1 (Unreleased) +## 5.3.0-beta.2 (Unreleased) + +## 5.3.0-beta.1 (2020-09-15) + +### Changes + +#### New Features + +- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance. ## 5.2.0 (2020-09-08) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs index 247e133431ed..c0426c2db98e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs @@ -237,6 +237,7 @@ public ReadEventOptions() { } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } public long? OwnerLevel { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } + public long? PrefetchSizeInBytes { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } @@ -268,6 +269,7 @@ public EventProcessorOptions() { } public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } } public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } + public long? PrefetchSizeInBytes { get { throw null; } set { } } public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] @@ -358,6 +360,7 @@ public PartitionReceiverOptions() { } public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } } public long? OwnerLevel { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } + public long? PrefetchSizeInBytes { get { throw null; } set { } } public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } } public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs index f5aedd29e91f..d8fee6bf3563 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs @@ -424,6 +424,7 @@ public override TransportProducer CreateProducer(string partitionId, /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// /// A configured in the requested manner. /// @@ -433,7 +434,8 @@ public override TransportConsumer CreateConsumer(string consumerGroup, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties, long? ownerLevel, - uint? prefetchCount) + uint? prefetchCount, + long? prefetchSizeInBytes) { Argument.AssertNotClosed(_closed, nameof(AmqpClient)); @@ -446,6 +448,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup, trackLastEnqueuedEventProperties, ownerLevel, prefetchCount, + prefetchSizeInBytes, ConnectionScope, MessageConverter, retryPolicy diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs index 2c0367ddcb18..12e720513448 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs @@ -239,10 +239,11 @@ public virtual async Task OpenManagementLinkAsync(TimeS /// The name of the consumer group in the context of which events should be received. /// The identifier of the Event Hub partition from which events should be received. /// The position of the event in the partition where the link should be filtered to. + /// The timeout to apply when creating the link. /// Controls the number of events received and queued locally without regard to whether an operation was requested. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Indicates whether information on the last enqueued event on the partition is sent as events are received. - /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. /// /// A link for use with consumer operations. @@ -252,6 +253,7 @@ public virtual async Task OpenConsumerLinkAsync(string consum EventPosition eventPosition, TimeSpan timeout, uint prefetchCount, + long? prefetchSizeInBytes, long? ownerLevel, bool trackLastEnqueuedEventProperties, CancellationToken cancellationToken) @@ -273,6 +275,7 @@ public virtual async Task OpenConsumerLinkAsync(string consum eventPosition, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), prefetchCount, + prefetchSizeInBytes, ownerLevel, trackLastEnqueuedEventProperties, cancellationToken @@ -460,6 +463,7 @@ protected virtual async Task CreateManagementLinkAsync( /// The fully qualified endpoint to open the link for. /// The position of the event in the partition where the link should be filtered to. /// Controls the number of events received and queued locally without regard to whether an operation was requested. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The timeout to apply when creating the link. @@ -472,6 +476,7 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon EventPosition eventPosition, TimeSpan timeout, uint prefetchCount, + long? prefetchSizeInBytes, long? ownerLevel, bool trackLastEnqueuedEventProperties, CancellationToken cancellationToken) @@ -510,7 +515,8 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon AutoSendFlow = prefetchCount > 0, SettleType = SettleMode.SettleOnSend, Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters }, - Target = new Target { Address = Guid.NewGuid().ToString() } + Target = new Target { Address = Guid.NewGuid().ToString() }, + TotalCacheSizeInBytes = prefetchSizeInBytes }; linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.ConsumerGroup); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs index 241a8d022e14..4432e0d7d256 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs @@ -121,6 +121,7 @@ internal class AmqpConsumer : TransportConsumer /// The identifier of the Event Hub partition from which events will be received. /// The position of the event in the partition where the consumer should begin reading. /// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The AMQP connection context for operations . @@ -143,6 +144,7 @@ public AmqpConsumer(string eventHubName, bool trackLastEnqueuedEventProperties, long? ownerLevel, uint? prefetchCount, + long? prefetchSizeInBytes, AmqpConnectionScope connectionScope, AmqpMessageConverter messageConverter, EventHubsRetryPolicy retryPolicy) @@ -170,6 +172,7 @@ public AmqpConsumer(string eventHubName, partitionId, CurrentEventPosition, prefetchCount ?? DefaultPrefetchCount, + prefetchSizeInBytes, ownerLevel, trackLastEnqueuedEventProperties, timeout, @@ -382,6 +385,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken) /// The identifier of the Event Hub partition to which the link is bound. /// The place within the partition's event stream to begin consuming events. /// Controls the number of events received and queued locally without regard to whether an operation was requested. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The timeout to apply when creating the link. @@ -393,6 +397,7 @@ private async Task CreateConsumerLinkAsync(string consumerGro string partitionId, EventPosition eventStartingPosition, uint prefetchCount, + long? prefetchSizeInBytes, long? ownerLevel, bool trackLastEnqueuedEventProperties, TimeSpan timeout, @@ -408,6 +413,7 @@ private async Task CreateConsumerLinkAsync(string consumerGro eventStartingPosition, timeout, prefetchCount, + prefetchSizeInBytes, ownerLevel, trackLastEnqueuedEventProperties, cancellationToken).ConfigureAwait(false); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs index 26d1998f3a51..e12ad175f603 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/ReadEventOptions.cs @@ -23,6 +23,9 @@ public class ReadEventOptions /// The prefetch count to use when reading events. private int _prefetchCount = 300; + /// The prefetch size limit to use for the partition receiver. + private long? _prefetchSizeInBytes = default; + /// /// When populated, the owner level indicates that a reading is intended to be performed exclusively for events in the /// requested partition and for the associated consumer group. To do so, reading will attempt to assert ownership @@ -128,9 +131,9 @@ public int CacheEventCount } /// - /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to - /// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than - /// readers needing to wait for service operations to complete. + /// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. /// /// /// @@ -165,6 +168,38 @@ public int PrefetchCount } } + /// + /// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. + /// + /// + /// + /// When set to null, the option is considered disabled; otherwise, it will be considered enabled and take + /// precedence over any value specified for the The is an + /// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using + /// the over this option where possible for more accurate control and more predictable throughput. + /// + /// This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or + /// smaller than the number of bytes specified, and will always contain at least one event when the + /// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate + /// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well. + /// + /// + public long? PrefetchSizeInBytes + { + get => _prefetchSizeInBytes; + + set + { + if (value.HasValue) + { + Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes)); + } + _prefetchSizeInBytes = value; + } + } + /// /// Determines whether the specified is equal to this instance. /// @@ -203,11 +238,12 @@ public int PrefetchCount internal ReadEventOptions Clone() => new ReadEventOptions { - OwnerLevel = OwnerLevel, - TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties, _maximumWaitTime = _maximumWaitTime, _cacheEventCount = _cacheEventCount, _prefetchCount = _prefetchCount, + _prefetchSizeInBytes = _prefetchSizeInBytes, + OwnerLevel = OwnerLevel, + TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties }; } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportClient.cs index b96d5c8b6ea3..f7e0ccf29a07 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportClient.cs @@ -103,6 +103,7 @@ public abstract TransportProducer CreateProducer(string partitionId, /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// /// A configured in the requested manner. /// @@ -112,7 +113,8 @@ public abstract TransportConsumer CreateConsumer(string consumerGroup, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties, long? ownerLevel, - uint? prefetchCount); + uint? prefetchCount, + long? prefetchSizeInBytes); /// /// Closes the connection to the transport client instance. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventHubConnection.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventHubConnection.cs index 65a9165097b1..4c1ae8d7aded 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventHubConnection.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventHubConnection.cs @@ -394,6 +394,7 @@ internal virtual TransportProducer CreateTransportProducer(string partitionId, /// Indicates whether information on the last enqueued event on the partition is sent as events are received. /// The relative priority to associate with the link; for a non-exclusive link, this value should be null. /// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used. + /// The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size. /// /// A configured in the requested manner. /// @@ -403,13 +404,14 @@ internal virtual TransportConsumer CreateTransportConsumer(string consumerGroup, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, - uint? prefetchCount = default) + uint? prefetchCount = default, + long? prefetchSizeInBytes = default) { Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup)); Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); - return InnerClient.CreateConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, trackLastEnqueuedEventProperties, ownerLevel, prefetchCount); + return InnerClient.CreateConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, trackLastEnqueuedEventProperties, ownerLevel, prefetchCount, prefetchSizeInBytes); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs index d84625dbdf76..99a5bfcab0d1 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessorOptions.cs @@ -29,6 +29,9 @@ public class EventProcessorOptions /// The prefetch count to use for the event processor. private int _prefetchCount = 300; + /// The prefetch size limit to use for the partition receiver. + private long? _prefetchSizeInBytes = default; + /// The desired amount of time to allow between load balancing verification attempts. private TimeSpan _loadBalancingUpdateInterval = TimeSpan.FromSeconds(10); @@ -94,9 +97,9 @@ public TimeSpan? MaximumWaitTime } /// - /// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to - /// whether the processor is currently active, intended to help maximize throughput by buffering service operations rather than - /// readers needing to wait for service operations to complete. + /// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. /// /// /// @@ -131,6 +134,38 @@ public int PrefetchCount } } + /// + /// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. + /// + /// + /// + /// When set to null, the option is considered disabled; otherwise, it will be considered enabled and take + /// precedence over any value specified for the The is an + /// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using + /// the over this option where possible for more accurate control and more predictable throughput. + /// + /// This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or + /// smaller than the number of bytes specified, and will always contain at least one event when the + /// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate + /// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well. + /// + /// + public long? PrefetchSizeInBytes + { + get => _prefetchSizeInBytes; + + set + { + if (value.HasValue) + { + Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes)); + } + _prefetchSizeInBytes = value; + } + } + /// /// The desired amount of time to allow between load balancing verification attempts. /// @@ -258,6 +293,7 @@ internal EventProcessorOptions Clone() => _connectionOptions = ConnectionOptions.Clone(), _retryOptions = RetryOptions.Clone(), _prefetchCount = PrefetchCount, + _prefetchSizeInBytes = PrefetchSizeInBytes, _maximumWaitTime = MaximumWaitTime, _loadBalancingUpdateInterval = LoadBalancingUpdateInterval, _partitionOwnershipExpirationInterval = PartitionOwnershipExpirationInterval, diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs index c992248e41a5..53de29667b39 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs @@ -430,7 +430,7 @@ internal virtual TransportConsumer CreateConsumer(string consumerGroup, EventPosition eventPosition, EventHubConnection connection, EventProcessorOptions options) => - connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, options.RetryOptions.ToRetryPolicy(), options.TrackLastEnqueuedEventProperties, prefetchCount: (uint?)options.PrefetchCount, ownerLevel: 0); + connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, options.RetryOptions.ToRetryPolicy(), options.TrackLastEnqueuedEventProperties, prefetchCount: (uint?)options.PrefetchCount, prefetchSizeInBytes: options.PrefetchSizeInBytes, ownerLevel: 0); /// /// Creates a to use for interacting with durable storage. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs old mode 100755 new mode 100644 index 31a28508d4c6..a832bb968565 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs @@ -466,7 +466,7 @@ internal virtual TransportConsumer CreateTransportConsumer(string consumerGroup, EventPosition eventPosition, EventHubsRetryPolicy retryPolicy, PartitionReceiverOptions options) => - Connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, options.TrackLastEnqueuedEventProperties, options.OwnerLevel, (uint?)options.PrefetchCount); + Connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, options.TrackLastEnqueuedEventProperties, options.OwnerLevel, (uint?)options.PrefetchCount, options.PrefetchSizeInBytes); /// /// Receives a batch of from the Event Hub partition this client is associated with. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiverOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiverOptions.cs old mode 100755 new mode 100644 index e4c6c0e2322a..479136e29ea4 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiverOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiverOptions.cs @@ -27,6 +27,9 @@ public class PartitionReceiverOptions /// The prefetch count to use for the partition receiver. private int _prefetchCount = 300; + /// The prefetch size limit to use for the partition receiver. + private long? _prefetchSizeInBytes = default; + /// /// The options used for configuring the connection to the Event Hubs service. /// @@ -103,8 +106,8 @@ public TimeSpan? DefaultMaximumReceiveWaitTime /// /// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to - /// whether a read operation is currently active, intended to help maximize throughput by allowing the partition receiver - /// to read from a local cache rather than waiting on a service request. + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. /// /// /// @@ -124,6 +127,39 @@ public int PrefetchCount } } + /// + /// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to + /// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from + /// from a local cache rather than waiting on a service request. + /// + /// + /// + /// When set to null, the option is considered disabled; otherwise, it will be considered enabled and take + /// precedence over any value specified for the The is an + /// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using + /// the over this option where possible for more accurate control and more predictable throughput. + /// + /// This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or + /// smaller than the number of bytes specified, and will always contain at least one event when the + /// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate + /// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well. + /// + /// + public long? PrefetchSizeInBytes + { + get => _prefetchSizeInBytes; + + set + { + if (value.HasValue) + { + Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes)); + } + + _prefetchSizeInBytes = value; + } + } + /// /// Indicates whether or not the reader should request information on the last enqueued event on the partition /// associated with a given event, and track that information as events are read. @@ -181,8 +217,9 @@ internal PartitionReceiverOptions Clone() => _connectionOptions = ConnectionOptions.Clone(), _retryOptions = RetryOptions.Clone(), _defaultMaximumReceiveWaitTime = DefaultMaximumReceiveWaitTime, - OwnerLevel = OwnerLevel, _prefetchCount = PrefetchCount, + _prefetchSizeInBytes = PrefetchSizeInBytes, + OwnerLevel = OwnerLevel, TrackLastEnqueuedEventProperties = TrackLastEnqueuedEventProperties }; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index 3ce07f263c72..493c86c46da5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -46,9 +46,6 @@ public class EventHubProducerClient : IAsyncDisposable /// The set of default publishing options to use when no specific options are requested. private static readonly SendEventOptions DefaultSendOptions = new SendEventOptions(); - /// The set of default publishing options to use when no specific options are requested. - private static readonly CreateBatchOptions DefaultCreateBatchOptions = new CreateBatchOptions(); - /// Sets how long a dedicated would sit in memory before its would remove and close it. private static readonly TimeSpan PartitionProducerLifespan = TimeSpan.FromMinutes(5); @@ -614,7 +611,7 @@ public virtual async Task SendAsync(EventDataBatch eventBatch, public virtual async ValueTask CreateBatchAsync(CreateBatchOptions options, CancellationToken cancellationToken = default) { - options = options?.Clone() ?? DefaultCreateBatchOptions; + options = options?.Clone() ?? new CreateBatchOptions(); AssertSinglePartitionReference(options.PartitionId, options.PartitionKey); TransportEventBatch transportBatch = await PartitionProducerPool.EventHubProducer.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpClientTests.cs index 7dfb51389f77..85c9905d693f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpClientTests.cs @@ -700,7 +700,7 @@ public async Task CreateConsumerValidatesClosed() var client = new AmqpClient("my.eventhub.com", "somePath", credential.Object, new EventHubConnectionOptions()); await client.CloseAsync(cancellationSource.Token); - Assert.That(() => client.CreateConsumer("group", "0", EventPosition.Earliest, Mock.Of(), false, null, null), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); + Assert.That(() => client.CreateConsumer("group", "0", EventPosition.Earliest, Mock.Of(), false, null, null, null), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs index 501aea67371e..e06736ad275a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs @@ -285,7 +285,7 @@ public void OpenConsumerLinkAsyncValidatesTheConsumerGroup(string consumerGroup) var identifier = "customIdentIFIER"; using var scope = new AmqpConnectionScope(endpoint, eventHub, credential.Object, transport, null, identifier); - Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, CancellationToken.None), Throws.InstanceOf()); + Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, CancellationToken.None), Throws.InstanceOf()); } /// @@ -307,7 +307,7 @@ public void OpenConsumerLinkAsyncValidatesThePartitionId(string partitionId) var identifier = "customIdentIFIER"; using var scope = new AmqpConnectionScope(endpoint, eventHub, credential.Object, transport, null, identifier); - Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, CancellationToken.None), Throws.InstanceOf()); + Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, CancellationToken.None), Throws.InstanceOf()); } /// @@ -332,7 +332,7 @@ public void OpenConsumerLinkAsyncRespectsTokenCancellation() var cancellationSource = new CancellationTokenSource(); cancellationSource.Cancel(); - Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, cancellationSource.Token), Throws.InstanceOf()); + Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, cancellationSource.Token), Throws.InstanceOf()); } /// @@ -355,7 +355,7 @@ public void OpenConsumerLinkAsyncRespectsDisposal() var scope = new AmqpConnectionScope(endpoint, eventHub, credential.Object, transport, null, identifier); scope.Dispose(); - Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, CancellationToken.None), Throws.InstanceOf()); + Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, CancellationToken.None), Throws.InstanceOf()); } /// @@ -372,6 +372,7 @@ public async Task OpenConsumerLinkAsyncRequestsTheLink() var partitionId = "0"; var ownerLevel = 95; var prefetchCount = 300U; + var prefetchSizeInBytes = 4242L; var trackLastEvent = false; var position = EventPosition.Latest; var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); @@ -407,6 +408,7 @@ public async Task OpenConsumerLinkAsyncRequestsTheLink() ItExpr.Is(value => value == position), ItExpr.IsAny(), ItExpr.Is(value => value == prefetchCount), + ItExpr.Is(value => value == prefetchSizeInBytes), ItExpr.Is(value => value == ownerLevel), ItExpr.Is(value => value == trackLastEvent), ItExpr.Is(value => value == cancellationSource.Token)) @@ -421,7 +423,7 @@ public async Task OpenConsumerLinkAsyncRequestsTheLink() .Returns(Task.CompletedTask) .Verifiable(); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, ownerLevel, trackLastEvent, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.EqualTo(mockLink), "The mock return was incorrect"); mockScope.VerifyAll(); @@ -441,6 +443,7 @@ public async Task OpenConsumerLinkAsyncConfiguresTheLink() var partitionId = "0"; var ownerLevel = 459; var prefetchCount = 697U; + var prefetchSizeInBytes = 12342L; var trackLastEvent = true; var position = EventPosition.Latest; var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); @@ -485,7 +488,7 @@ public async Task OpenConsumerLinkAsyncConfiguresTheLink() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, ownerLevel, trackLastEvent, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var linkSource = (Source)link.Settings.Source; @@ -515,6 +518,7 @@ public async Task OpenConsumerLinkAsyncRespectsTheOwnerLevelOption() var partitionId = "0"; var ownerLevel = default(long?); var prefetchCount = 697U; + var prefetchSizeInBytes = 12342L; var trackLastEvent = false; var position = EventPosition.Latest; var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); @@ -559,7 +563,7 @@ public async Task OpenConsumerLinkAsyncRespectsTheOwnerLevelOption() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, ownerLevel, trackLastEvent, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.OwnerLevel, long.MinValue), Is.EqualTo(long.MinValue), "The owner level should have been used."); } @@ -578,6 +582,7 @@ public async Task OpenConsumerLinkAsyncRespectsTheTrackLastEventOption() var partitionId = "0"; var ownerLevel = 9987; var prefetchCount = 697U; + var prefetchSizeInBytes = 12342L; var trackLastEvent = false; var position = EventPosition.Latest; var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); @@ -622,7 +627,7 @@ public async Task OpenConsumerLinkAsyncRespectsTheTrackLastEventOption() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, ownerLevel, trackLastEvent, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); Assert.That(link.Settings.DesiredCapabilities, Is.Null, "There should have not have been a set of desired capabilities created, as we're not tracking the last event."); } @@ -641,6 +646,7 @@ public async Task OpenConsumerLinkAsyncManagesActiveLinks() var partitionId = "0"; var ownerLevel = 459; var prefetchCount = 697U; + var prefetchSizeInBytes = 12342L; var trackLastEvent = false; var position = EventPosition.Latest; var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); @@ -689,7 +695,7 @@ public async Task OpenConsumerLinkAsyncManagesActiveLinks() Assert.That(activeLinks, Is.Not.Null, "The set of active links was null."); Assert.That(activeLinks.Count, Is.Zero, "There should be no active links when none have been created."); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, ownerLevel, trackLastEvent, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), prefetchCount, prefetchSizeInBytes, ownerLevel, trackLastEvent, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); Assert.That(activeLinks.Count, Is.EqualTo(1), "There should be an active link being tracked."); @@ -778,7 +784,7 @@ public async Task OpenConsumerLinkAsyncConfiguresAuthorizationRefresh() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var activeLinks = GetActiveLinks(mockScope.Object); @@ -857,7 +863,7 @@ public async Task OpenConsumerLinkAsyncRefreshesAuthorization() ItExpr.IsAny()) .Returns(Task.CompletedTask); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The link produced was null"); var activeLinks = GetActiveLinks(mockScope.Object); @@ -1559,7 +1565,7 @@ public async Task DisposeClosesActiveLinks() Assert.That(activeLinks, Is.Not.Null, "The set of active links was null."); Assert.That(activeLinks.Count, Is.Zero, "There should be no active links when none have been created."); - var producerLink = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, false, cancellationSource.Token); + var producerLink = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 0, null, null, false, cancellationSource.Token); Assert.That(producerLink, Is.Not.Null, "The producer link produced was null"); var managementLink = await mockScope.Object.OpenManagementLinkAsync(TimeSpan.FromDays(1), cancellationSource.Token); @@ -1632,7 +1638,7 @@ public async Task DisposeStopsManagingLinkAuthorizations() Assert.That(managedAuthorizations, Is.Not.Null, "The set of managed authorizations was null."); Assert.That(managedAuthorizations.Count, Is.Zero, "There should be no managed authorizations when none have been created."); - var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 12, 555, true, cancellationSource.Token); + var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, TimeSpan.FromDays(1), 12, 555, 777, true, cancellationSource.Token); Assert.That(link, Is.Not.Null, "The producer link produced was null"); Assert.That(managedAuthorizations.Count, Is.EqualTo(1), "There should be a managed authorization being tracked."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConsumerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConsumerTests.cs index c465cfe2a979..bc4e5c66f013 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConsumerTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConsumerTests.cs @@ -43,7 +43,7 @@ public static IEnumerable RetryOptionTestCases() [TestCase("")] public void ConstructorRequiresTheEventHubName(string eventHub) { - Assert.That(() => new AmqpConsumer(eventHub, "$DEFAULT", "0", EventPosition.Earliest, true, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); + Assert.That(() => new AmqpConsumer(eventHub, "$DEFAULT", "0", EventPosition.Earliest, true, null, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); } /// @@ -55,7 +55,7 @@ public void ConstructorRequiresTheEventHubName(string eventHub) [TestCase("")] public void ConstructorRequiresTheConsumerGroup(string group) { - Assert.That(() => new AmqpConsumer("myHub", group, "0", EventPosition.Earliest, true, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); + Assert.That(() => new AmqpConsumer("myHub", group, "0", EventPosition.Earliest, true, null, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); } /// @@ -67,7 +67,7 @@ public void ConstructorRequiresTheConsumerGroup(string group) [TestCase("")] public void ConstructorRequiresThePartition(string partition) { - Assert.That(() => new AmqpConsumer("aHub", "$DEFAULT", partition, EventPosition.Earliest, true, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); + Assert.That(() => new AmqpConsumer("aHub", "$DEFAULT", partition, EventPosition.Earliest, true, null, null, null, Mock.Of(), Mock.Of(), Mock.Of()), Throws.InstanceOf()); } /// @@ -77,7 +77,7 @@ public void ConstructorRequiresThePartition(string partition) [Test] public void ConstructorRequiresTheConnectionScope() { - Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.FromSequenceNumber(123), true, null, null, null, Mock.Of(), Mock.Of()), Throws.ArgumentNullException); + Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.FromSequenceNumber(123), true, null, null, null, null, Mock.Of(), Mock.Of()), Throws.ArgumentNullException); } /// @@ -87,7 +87,7 @@ public void ConstructorRequiresTheConnectionScope() [Test] public void ConstructorRequiresTheMessageConverter() { - Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.FromSequenceNumber(123), true, null, null, Mock.Of(), null, Mock.Of()), Throws.ArgumentNullException); + Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.FromSequenceNumber(123), true, null, null, null, Mock.Of(), null, Mock.Of()), Throws.ArgumentNullException); } /// @@ -97,7 +97,7 @@ public void ConstructorRequiresTheMessageConverter() [Test] public void ConstructorRequiresTheRetryPolicy() { - Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.Latest, true, null, null, Mock.Of(), Mock.Of(), null), Throws.ArgumentNullException); + Assert.That(() => new AmqpConsumer("theMostAwesomeHubEvar", "$DEFAULT", "0", EventPosition.Latest, true, null, null, null, Mock.Of(), Mock.Of(), null), Throws.ArgumentNullException); } /// @@ -108,7 +108,7 @@ public void ConstructorRequiresTheRetryPolicy() [Test] public async Task CloseMarksTheConsumerAsClosed() { - var consumer = new AmqpConsumer("aHub", "$DEFAULT", "0", EventPosition.Earliest, true, null, null, Mock.Of(), Mock.Of(), Mock.Of()); + var consumer = new AmqpConsumer("aHub", "$DEFAULT", "0", EventPosition.Earliest, true, null, null, null, Mock.Of(), Mock.Of(), Mock.Of()); Assert.That(consumer.IsClosed, Is.False, "The consumer should not be closed on creation"); await consumer.CloseAsync(CancellationToken.None); @@ -123,7 +123,7 @@ public async Task CloseMarksTheConsumerAsClosed() [Test] public void CloseRespectsTheCancellationToken() { - var consumer = new AmqpConsumer("aHub", "$DEFAULT", "0", EventPosition.Earliest, true, null, null, Mock.Of(), Mock.Of(), Mock.Of()); + var consumer = new AmqpConsumer("aHub", "$DEFAULT", "0", EventPosition.Earliest, true, null, null, null, Mock.Of(), Mock.Of(), Mock.Of()); using var cancellationSource = new CancellationTokenSource(); cancellationSource.Cancel(); @@ -154,7 +154,7 @@ public void ReceiveAsyncValidatesTheMaximumMessageCount(int count) using var cancellationSource = new CancellationTokenSource(); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(count, null, cancellationSource.Token), Throws.InstanceOf()); } @@ -179,7 +179,7 @@ public void ReceiveAsyncRespectsTheCancellationTokenIfSetWhenCalled() using var cancellationSource = new CancellationTokenSource(); cancellationSource.Cancel(); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf()); } @@ -220,11 +220,12 @@ public void ReceiveAsyncAppliesTheRetryPolicy(EventHubsRetryOptions retryOptions It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .Throws(retriableException); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf(retriableException.GetType())); mockScope @@ -234,6 +235,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(EventHubsRetryOptions retryOptions It.Is(value => value == eventPosition), It.IsAny(), It.IsAny(), + It.IsAny(), It.Is(value => value == ownerLevel), It.Is(value => value == trackLastEnqueued), It.IsAny()), @@ -276,11 +278,12 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(EventHubs It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .Throws(retriableException); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf(retriableException.GetType())); mockScope @@ -290,6 +293,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(EventHubs It.Is(value => value == eventPosition), It.IsAny(), It.IsAny(), + It.IsAny(), It.Is(value => value == ownerLevel), It.Is(value => value == trackLastEnqueued), It.IsAny()), @@ -333,11 +337,12 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(EventHubsRetryOptions It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .Throws(retriableException); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf(retriableException.GetType())); mockScope @@ -347,6 +352,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(EventHubsRetryOptions It.Is(value => value == eventPosition), It.IsAny(), It.IsAny(), + It.IsAny(), It.Is(value => value == ownerLevel), It.Is(value => value == trackLastEnqueued), It.IsAny()), @@ -388,11 +394,12 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled() It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .Throws(embeddedException); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf()); mockScope @@ -402,6 +409,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled() It.Is(value => value == eventPosition), It.IsAny(), It.IsAny(), + It.IsAny(), It.Is(value => value == ownerLevel), It.Is(value => value == trackLastEnqueued), It.IsAny()), @@ -443,11 +451,12 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled() It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) .Throws(embeddedException); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, trackLastEnqueued, ownerLevel, null, null, mockScope.Object, Mock.Of(), retryPolicy); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf()); mockScope @@ -457,6 +466,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled() It.Is(value => value == eventPosition), It.IsAny(), It.IsAny(), + It.IsAny(), It.Is(value => value == ownerLevel), It.Is(value => value == trackLastEnqueued), It.IsAny()), @@ -484,7 +494,7 @@ public async Task ReceiveAsyncValidatesClosed() using var cancellationSource = new CancellationTokenSource(); - var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, mockScope.Object, Mock.Of(), retryPolicy); + var consumer = new AmqpConsumer(eventHub, consumerGroup, partition, eventPosition, true, null, null, null, mockScope.Object, Mock.Of(), retryPolicy); await consumer.CloseAsync(cancellationSource.Token); Assert.That(async () => await consumer.ReceiveAsync(100, null, cancellationSource.Token), Throws.InstanceOf().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.ClientClosed)); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj index 56b09260efbd..2db4a1b05c4b 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Azure.Messaging.EventHubs.Tests.csproj @@ -13,19 +13,19 @@ + + + + + - - + - - + - - - - + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionTests.cs index 5cab2995be99..9aeb7e103c16 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionTests.cs @@ -842,7 +842,8 @@ public override TransportConsumer CreateConsumer(string consumerGroup, EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, - uint? prefetchCount = default) + uint? prefetchCount = default, + long? prefechSize = default) { CreateConsumerCalledWith = (consumerGroup, partitionId, eventPosition, retryPolicy, trackLastEnqueuedEventProperties, ownerLevel, prefetchCount); return default; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs index ee761e08b20b..c1e04f49a1eb 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs @@ -318,6 +318,45 @@ public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCounts() } } + /// + /// Verifies that the is able to + /// connect to the Event Hubs service and perform operations. + /// + /// + [Test] + public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCountsAndPrefetchSize() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + var sourceEvents = EventGenerator.CreateEvents(200).ToList(); + + await using (var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString)) + { + var partition = (await consumer.GetPartitionIdsAsync(cancellationSource.Token)).First(); + await SendEventsAsync(connectionString, sourceEvents, new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token); + + // Read the events and validate the resulting state. + + var readOptions = new ReadEventOptions { PrefetchCount = 150, CacheEventCount = 50, PrefetchSizeInBytes = 128 }; + var readState = await ReadEventsFromPartitionAsync(consumer, partition, sourceEvents.Count, cancellationSource.Token, readOptions: readOptions); + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); + + foreach (var sourceEvent in sourceEvents) + { + var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); + } + } + + cancellationSource.Cancel(); + } + } + /// /// Verifies that the is able to /// connect to the Event Hubs service and perform operations. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs index 062ec708fc93..4fea56334cbf 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientTests.cs @@ -524,7 +524,7 @@ public async Task ReadEventsFromPartitionAsyncRespectsThePrefetchCount() .Returns(Task.FromResult(new[] { "0", "1" })); mockConnection - .Setup(conn => conn.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Setup(conn => conn.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(transportConsumer); await using var enumerator = consumer.ReadEventsFromPartitionAsync("0", EventPosition.FromOffset(12), options).GetAsyncEnumerator(); @@ -538,7 +538,8 @@ public async Task ReadEventsFromPartitionAsyncRespectsThePrefetchCount() It.IsAny(), It.IsAny(), It.IsAny(), - (uint)options.PrefetchCount), + (uint)options.PrefetchCount, + It.IsAny()), Times.Once, "The transport consumer should have been created with the configured prefetch count."); } @@ -1258,7 +1259,7 @@ public async Task ReadEventsAsyncWithOptionsRespectsThePrefetchCount() .Returns(Task.FromResult(new[] { "0", "1" })); mockConnection - .Setup(conn => conn.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Setup(conn => conn.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(transportConsumer); await using var enumerator = consumer.ReadEventsAsync(options).GetAsyncEnumerator(); @@ -1272,7 +1273,8 @@ public async Task ReadEventsAsyncWithOptionsRespectsThePrefetchCount() It.IsAny(), It.IsAny(), It.IsAny(), - (uint)options.PrefetchCount), + (uint)options.PrefetchCount, + It.IsAny()), Times.AtLeastOnce(), "The transport consumer should have been created with the configured prefetch count."); } @@ -2453,7 +2455,8 @@ internal override TransportConsumer CreateTransportConsumer(string consumerGroup EventHubsRetryPolicy retryPolicy, bool trackLastEnqueuedEventProperties = true, long? ownerLevel = default, - uint? prefetchCount = default) => TransportConsumerFactory(); + uint? prefetchCount = default, + long? prefetchSizeInBytes = default) => TransportConsumerFactory(); internal override TransportClient CreateTransportClient(string fullyQualifiedNamespace, string eventHubName, diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs index 3cb9a118ba92..549ad21f6201 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/ReadOptionsTests.cs @@ -29,7 +29,8 @@ public void CloneProducesACopy() TrackLastEnqueuedEventProperties = false, MaximumWaitTime = TimeSpan.FromMinutes(65), CacheEventCount = 1, - PrefetchCount = 0 + PrefetchCount = 0, + PrefetchSizeInBytes = 0 }; ReadEventOptions clone = options.Clone(); @@ -40,6 +41,7 @@ public void CloneProducesACopy() Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match."); Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache count of the clone should match."); Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match."); + Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch size of the clone should match."); } /// @@ -97,5 +99,38 @@ public void PrefetchCountAllowsZero() { Assert.That(() => new ReadEventOptions { PrefetchCount = 0 }, Throws.Nothing); } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesIsValidated() + { + Assert.That(() => new ReadEventOptions { PrefetchSizeInBytes = -1 }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsZero() + { + Assert.That(() => new ReadEventOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsNull() + { + Assert.That(() => new ReadEventOptions { PrefetchSizeInBytes = null }, Throws.Nothing); + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs index a650a2eeb197..01734cdf07e8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorOptionsTests.cs @@ -48,6 +48,7 @@ public void CloneProducesACopy() Assert.That(clone.RetryOptions, Is.Not.SameAs(options.RetryOptions), "The retry options of the clone should be a copy, not the same instance."); Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time should match."); Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count should match."); + Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch size should match."); Assert.That(clone.LoadBalancingUpdateInterval, Is.EqualTo(options.LoadBalancingUpdateInterval), "The load balancing update interval should match."); Assert.That(clone.PartitionOwnershipExpirationInterval, Is.EqualTo(options.PartitionOwnershipExpirationInterval), "The partition ownership interval should match."); Assert.That(clone.Identifier, Is.EqualTo(options.Identifier), "The identifier should match."); @@ -129,6 +130,42 @@ public void PrefetchCountAllowsZero() Assert.That(() => new EventProcessorOptions { PrefetchCount = 0 }, Throws.Nothing); } + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + [TestCase(-1)] + [TestCase(-10)] + [TestCase(-100)] + public void PrefetchSizeInBytesIsValidated(int count) + { + Assert.That(() => new EventProcessorOptions { PrefetchSizeInBytes = count }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsZero() + { + Assert.That(() => new EventProcessorOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsNull() + { + Assert.That(() => new EventProcessorOptions { PrefetchSizeInBytes = null }, Throws.Nothing); + } + /// /// Verifies functionality of the /// property. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs old mode 100755 new mode 100644 index 8bc8f7ce0214..6fa2d88d79d9 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.PartitionProcessing.cs @@ -382,7 +382,7 @@ public async Task CreatePartitionProcessorCreatesTheTransportConsumer() var partition = new EventProcessorPartition { PartitionId = "99" }; var position = EventPosition.FromOffset(12); - var options = new EventProcessorOptions { TrackLastEnqueuedEventProperties = false, PrefetchCount = 37, LoadBalancingUpdateInterval = TimeSpan.FromMinutes(1) }; + var options = new EventProcessorOptions { TrackLastEnqueuedEventProperties = false, PrefetchCount = 37, PrefetchSizeInBytes = 44, LoadBalancingUpdateInterval = TimeSpan.FromMinutes(1) }; var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var mockConnection = new Mock(); var mockConsumer = new Mock(); @@ -402,7 +402,7 @@ public async Task CreatePartitionProcessorCreatesTheTransportConsumer() .Returns(Task.CompletedTask); mockConnection - .Setup(connection => connection.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Setup(connection => connection.CreateTransportConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(mockConsumer.Object); var partitionProcessor = mockProcessor.Object.CreatePartitionProcessor(partition, position, cancellationSource); @@ -418,7 +418,8 @@ public async Task CreatePartitionProcessorCreatesTheTransportConsumer() It.IsAny(), options.TrackLastEnqueuedEventProperties, 0, - (uint?)options.PrefetchCount), + (uint?)options.PrefetchCount, + options.PrefetchSizeInBytes), Times.Once); cancellationSource.Cancel(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverLiveTests.cs index 66e0250a7729..193e5524c6db 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverLiveTests.cs @@ -45,7 +45,7 @@ public async Task ReceiverWithNoOptionsCanRead(EventHubsTransportType transportT await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); @@ -76,7 +76,7 @@ public async Task ReceiverWithOptionsCanRead(EventHubsTransportType transportTyp await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var options = new PartitionReceiverOptions(); options.RetryOptions.MaximumRetries = 7; @@ -223,7 +223,7 @@ public async Task ReceiverCanReadBatchOfZeroLengthEvents() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -259,7 +259,48 @@ public async Task ReceiverCanReadBatchOfEvents() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); + Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); + } + } + + cancellationSource.Cancel(); + } + } + + /// + /// Verifies that the is able to + /// connect to the Event Hubs service and perform operations. + /// + /// + [Test] + public async Task ReceiverCanReadBatchOfEventsWithPrefetchSize() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + var sourceEvents = EventGenerator.CreateEvents(200).ToList(); + + var partition = (await QueryPartitionsAsync(connectionString, cancellationSource.Token)).First(); + await SendEventsAsync(connectionString, sourceEvents, new CreateBatchOptions { PartitionId = partition }, cancellationSource.Token); + + var recieverOptions = new PartitionReceiverOptions + { + PrefetchSizeInBytes = 64 + }; + + await using (var receiver = new PartitionReceiver(EventHubConsumerClient.DefaultConsumerGroupName, partition, EventPosition.Earliest, connectionString, recieverOptions)) + { + var readState = await ReadEventsAsync(receiver, sourceEvents.Count, cancellationSource.Token); + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); + + foreach (var sourceEvent in sourceEvents) + { + var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -303,7 +344,7 @@ public async Task ReceiverCanReadEventsWithCustomProperties() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -340,7 +381,7 @@ public async Task ReceiverCanReadEventsUsingAnIdentityCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -376,7 +417,7 @@ public async Task ReceiverCanReadFromEarliest() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -427,7 +468,7 @@ public async Task ReceiverCanReadFromLatest() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -491,7 +532,7 @@ public async Task ReceiverCanReadFromOffset(bool isInclusive) foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -555,7 +596,7 @@ public async Task ReceiverCanReadFromSequenceNumber(bool isInclusive) foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -614,7 +655,7 @@ public async Task ReceiverCanReadFromEnqueuedTime() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -658,10 +699,10 @@ public async Task ReceiverCanReadFromMultipleConsumerGroups() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom receiver group." ); + Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom receiver group."); Assert.That(sourceEvent.IsEquivalentTo(customReadEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the custom receiver group."); - Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default receiver group." ); + Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default receiver group."); Assert.That(sourceEvent.IsEquivalentTo(defaultReadEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the default receiver group."); } } @@ -681,7 +722,7 @@ public async Task ReceiverCannotReadFromInvalidConsumerGroup() await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var invalidConsumerGroup = "ThisIsFake"; var partition = (await QueryPartitionsAsync(EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName), cancellationSource.Token)).First(); @@ -829,7 +870,7 @@ public async Task ReceiverCannotReadFromInvalidPartition() await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); await using (var receiver = new PartitionReceiver(EventHubConsumerClient.DefaultConsumerGroupName, "-1", EventPosition.Earliest, EventHubsTestEnvironment.Instance.EventHubsConnectionString, scope.EventHubName)) { @@ -1065,7 +1106,7 @@ public async Task ExclusiveReceiverSupercedesNonExclusiveActiveReader() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1122,7 +1163,7 @@ public async Task ReceiverWithHigherOwnerLevelSupercedesActiveReader() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1392,7 +1433,7 @@ public async Task ReceiverCanRetrievePartitionProperties(EventHubsTransportType [Test] public async Task ReceiverCannotRetrievePartitionPropertiesWhenConnectionIsClosed() { - await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) + await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverOptionsTests.cs old mode 100755 new mode 100644 index 1a5eeb28dce2..37eccd640714 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverOptionsTests.cs @@ -45,6 +45,7 @@ public void CloneProducesACopy() Assert.That(clone.DefaultMaximumReceiveWaitTime, Is.EqualTo(options.DefaultMaximumReceiveWaitTime), "The maximum wait time should match."); Assert.That(clone.OwnerLevel, Is.EqualTo(options.OwnerLevel), "The owner level of the clone should match."); Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count should match."); + Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch size should match."); Assert.That(clone.TrackLastEnqueuedEventProperties, Is.EqualTo(options.TrackLastEnqueuedEventProperties), "Tracking of last enqueued events should match."); } @@ -97,5 +98,41 @@ public void PrefetchCountIsValidated(int count) { Assert.That(() => new PartitionReceiverOptions { PrefetchCount = count }, Throws.InstanceOf()); } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + [TestCase(-1)] + [TestCase(-10)] + [TestCase(-100)] + public void PrefetchSizeInBytesIsValidated(int count) + { + Assert.That(() => new PartitionReceiverOptions { PrefetchSizeInBytes = count }, Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowsNull() + { + Assert.That(() => new PartitionReceiverOptions { PrefetchSizeInBytes = null }, Throws.Nothing); + } + + /// + /// Verifies functionality of the + /// property. + /// + /// + [Test] + public void PrefetchSizeInBytesAllowZero() + { + Assert.That(() => new PartitionReceiverOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing); + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverTests.cs old mode 100755 new mode 100644 index 5ffa975b5e73..96a05ff3dd5e --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/PartitionReceiverTests.cs @@ -511,6 +511,7 @@ public void CreateTransportConsumerDelegatesToTheConnection() { OwnerLevel = 99, PrefetchCount = 42, + PrefetchSizeInBytes = 43, TrackLastEnqueuedEventProperties = false }; @@ -524,7 +525,8 @@ public void CreateTransportConsumerDelegatesToTheConnection() expectedRetryPolicy, expectedOptions.TrackLastEnqueuedEventProperties, expectedOptions.OwnerLevel, - (uint?)expectedOptions.PrefetchCount), + (uint?)expectedOptions.PrefetchCount, + expectedOptions.PrefetchSizeInBytes), Times.Once); } @@ -557,7 +559,8 @@ public void ReadLastEnqueuedEventPropertiesDelegatesToTheTransportConsumer() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object); @@ -590,7 +593,8 @@ public void ReadLastEnqueuedEventPropertiesAllowsTheOperationWhenTheOptionIsUnse It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object); @@ -806,7 +810,8 @@ public async Task ReceiveBatchAsyncDelegatesToTheTransportConsumer() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); mockConsumer @@ -851,7 +856,8 @@ public async Task ReceiveBatchAsyncUsesTheDefaultMaximumReceiveWaitTimeWhenNoWai It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); mockConsumer @@ -895,7 +901,8 @@ public async Task ReceiveBatchAsyncRespectsTheDefaultMaximumWaitTimeBeingUnset() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); mockConsumer @@ -961,7 +968,8 @@ public async Task CloseAsyncClosesTheTransportConsumer() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object); @@ -1005,7 +1013,8 @@ public void CloseAsyncSurfacesExceptionsForTheTransportConsumer() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object); @@ -1101,7 +1110,8 @@ public async Task CloseAsyncLogsNormalClose() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object); @@ -1158,7 +1168,8 @@ public async Task CloseAsyncLogsErrorDuringClose() It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny())) .Returns(mockConsumer.Object); var receiver = new PartitionReceiver("cg", "pid", EventPosition.Earliest, mockConnection.Object);