Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -757,20 +757,13 @@ public virtual async Task<int> EnqueueEventAsync(EventData eventData,
AssertValidPartition(partitionId, _partitionHash);
}

// Annotate the event with the current time; this is intended to help ensure that
// publishing can apply the maximum wait time correctly and will be removed when
// the event as added to a batch.

var amqpMessage = eventData.GetRawAmqpMessage();
amqpMessage.SetEnqueuedTime(GetCurrentTime());

// If there was a partition key requested, calculate the assigned partition and
// annotate the event so that it is preserved by the Event Hubs broker.

if (!string.IsNullOrEmpty(partitionKey))
{
partitionId = PartitionResolver.AssignForPartitionKey(partitionKey, _partitions);
amqpMessage.SetPartitionKey(partitionKey);
eventData.GetRawAmqpMessage().SetPartitionKey(partitionKey);
}

// If no partition was assigned, assign one for automatic routing.
Expand Down Expand Up @@ -939,27 +932,18 @@ public virtual async Task<int> EnqueueEventsAsync(IEnumerable<EventData> events,

// Enumerate the events and enqueue them.

var enqueueTime = GetCurrentTime();

foreach (var eventData in events)
{
var eventPartitionId = partitionId;
var amqpMessage = eventData.GetRawAmqpMessage();

// If there is an associated partition key, annotate the event so that it is
// preserved by the Event Hubs broker.

if (!string.IsNullOrEmpty(partitionKey))
{
amqpMessage.SetPartitionKey(partitionKey);
eventData.GetRawAmqpMessage().SetPartitionKey(partitionKey);
}

// Annotate the event with the current time; this is intended to help ensure that
// publishing can apply the maximum wait time correctly and will be removed when
// the event as added to a batch.

amqpMessage.SetEnqueuedTime(enqueueTime);

// If no partition was assigned, assign one for automatic routing.

if (string.IsNullOrEmpty(eventPartitionId))
Expand Down Expand Up @@ -2111,7 +2095,7 @@ private Task RunPublishingAsync(CancellationToken cancellationToken) =>
{
// There should be only one instance of this background publishing task running, so it is safe to assume
// no other publishing operations are active. Reset the operation cancellation source to ensure that
// any prior cancellation or disposal does not prevent cancelling operations created here.
// any prior cancellation or disposal does not prevent canceling operations created here.

var activeOperationCancellationSource = new CancellationTokenSource();
var existingSource = Interlocked.Exchange(ref _activeSendOperationsCancellationSource, activeOperationCancellationSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1832,8 +1832,7 @@ public async Task EnqueueEventsAsyncEnqueuesForAutomaticRouting()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1911,8 +1910,7 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionKey()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1981,8 +1979,7 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionAssignment()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -2063,8 +2060,7 @@ public async Task EnqueueEventsAsyncWaitsWhenFull()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

await enqueueTask;
Expand Down Expand Up @@ -2581,6 +2577,13 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting()
.Setup(producer => producer.GetPartitionIdsAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(partitions);

mockBufferedProducer
.Setup(producer => producer.PublishBatchToPartition(
It.IsAny<EventHubBufferedProducerClient.PartitionPublishingState>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

mockBufferedProducer.Object.PartitionResolver = mockPartitionResolver.Object;
mockBufferedProducer.Object.SendEventBatchFailedAsync += args => Task.CompletedTask;

Expand All @@ -2602,11 +2605,10 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting()
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2684,8 +2686,7 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionKey()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2754,8 +2755,7 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionAssignment()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2837,8 +2837,7 @@ public async Task EnqueueEventAsyncWaitsWhenFull()
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
await Task.Delay(10, cancellationSource.Token);
}

Assert.That(readEventCount, Is.EqualTo(1), "An event should have been available to read.");
Expand Down