diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs index 1adc9a873ce4..f33932fe4f35 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs @@ -757,20 +757,13 @@ public virtual async Task EnqueueEventAsync(EventData eventData, AssertValidPartition(partitionId, _partitionHash); } - // Annotate the event with the current time; this is intended to help ensure that - // publishing can apply the maximum wait time correctly and will be removed when - // the event as added to a batch. - - var amqpMessage = eventData.GetRawAmqpMessage(); - amqpMessage.SetEnqueuedTime(GetCurrentTime()); - // If there was a partition key requested, calculate the assigned partition and // annotate the event so that it is preserved by the Event Hubs broker. if (!string.IsNullOrEmpty(partitionKey)) { partitionId = PartitionResolver.AssignForPartitionKey(partitionKey, _partitions); - amqpMessage.SetPartitionKey(partitionKey); + eventData.GetRawAmqpMessage().SetPartitionKey(partitionKey); } // If no partition was assigned, assign one for automatic routing. @@ -939,27 +932,18 @@ public virtual async Task EnqueueEventsAsync(IEnumerable events, // Enumerate the events and enqueue them. - var enqueueTime = GetCurrentTime(); - foreach (var eventData in events) { var eventPartitionId = partitionId; - var amqpMessage = eventData.GetRawAmqpMessage(); // If there is an associated partition key, annotate the event so that it is // preserved by the Event Hubs broker. if (!string.IsNullOrEmpty(partitionKey)) { - amqpMessage.SetPartitionKey(partitionKey); + eventData.GetRawAmqpMessage().SetPartitionKey(partitionKey); } - // Annotate the event with the current time; this is intended to help ensure that - // publishing can apply the maximum wait time correctly and will be removed when - // the event as added to a batch. - - amqpMessage.SetEnqueuedTime(enqueueTime); - // If no partition was assigned, assign one for automatic routing. if (string.IsNullOrEmpty(eventPartitionId)) @@ -2111,7 +2095,7 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) => { // There should be only one instance of this background publishing task running, so it is safe to assume // no other publishing operations are active. Reset the operation cancellation source to ensure that - // any prior cancellation or disposal does not prevent cancelling operations created here. + // any prior cancellation or disposal does not prevent canceling operations created here. var activeOperationCancellationSource = new CancellationTokenSource(); var existingSource = Interlocked.Exchange(ref _activeSendOperationsCancellationSource, activeOperationCancellationSource); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs index cb674d977e9a..c203baaf2bae 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs @@ -1832,8 +1832,7 @@ public async Task EnqueueEventsAsyncEnqueuesForAutomaticRouting() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -1911,8 +1910,7 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionKey() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -1981,8 +1979,7 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionAssignment() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -2063,8 +2060,7 @@ public async Task EnqueueEventsAsyncWaitsWhenFull() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } await enqueueTask; @@ -2581,6 +2577,13 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting() .Setup(producer => producer.GetPartitionIdsAsync(It.IsAny())) .ReturnsAsync(partitions); + mockBufferedProducer + .Setup(producer => producer.PublishBatchToPartition( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + mockBufferedProducer.Object.PartitionResolver = mockPartitionResolver.Object; mockBufferedProducer.Object.SendEventBatchFailedAsync += args => Task.CompletedTask; @@ -2602,11 +2605,10 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting() ++readEventCount; Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2684,8 +2686,7 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionKey() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2754,8 +2755,7 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionAssignment() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2837,8 +2837,7 @@ public async Task EnqueueEventAsyncWaitsWhenFull() Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); } - cancellationSource.Token.ThrowIfCancellationRequested(); - await Task.Delay(50); + await Task.Delay(10, cancellationSource.Token); } Assert.That(readEventCount, Is.EqualTo(1), "An event should have been available to read.");