Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Changed the approach that the event processor uses to validate permissions on startup to ensure that it does not interrupt other processors already running by temporarily asserting ownership of a partition.

### Other Changes

## 5.8.1 (2023-03-09)
Expand Down
2 changes: 2 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Changed the approach that the event processor uses to validate permissions on startup to ensure that it does not interrupt other processors already running by temporarily asserting ownership of a partition.

### Other Changes

## 5.8.1 (2023-03-09)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ public virtual void StopProcessing(CancellationToken cancellationToken = default
/// <param name="eventPosition">The position in the event stream where the consumer should begin reading.</param>
/// <param name="connection">The connection to use for the consumer.</param>
/// <param name="options">The options to use for configuring the consumer.</param>
/// <param name="exclusive"><c>true</c> if this should be an exclusive consumer; otherwise, <c>false</c>.</param>
///
/// <returns>An <see cref="TransportConsumer" /> with the requested configuration.</returns>
///
Expand All @@ -581,8 +582,19 @@ internal virtual TransportConsumer CreateConsumer(string consumerGroup,
string consumerIdentifier,
EventPosition eventPosition,
EventHubConnection connection,
EventProcessorOptions options) =>
connection.CreateTransportConsumer(consumerGroup, partitionId, consumerIdentifier, eventPosition, options.RetryOptions.ToRetryPolicy(), options.TrackLastEnqueuedEventProperties, InvalidateConsumerWhenPartitionIsStolen, prefetchCount: (uint?)options.PrefetchCount, prefetchSizeInBytes: options.PrefetchSizeInBytes, ownerLevel: 0);
EventProcessorOptions options,
bool exclusive) =>
connection.CreateTransportConsumer(
consumerGroup,
partitionId,
consumerIdentifier,
eventPosition,
options.RetryOptions.ToRetryPolicy(),
options.TrackLastEnqueuedEventProperties,
InvalidateConsumerWhenPartitionIsStolen,
prefetchCount: (uint?)options.PrefetchCount,
prefetchSizeInBytes: options.PrefetchSizeInBytes,
ownerLevel: exclusive ? 0 : null);

/// <summary>
/// Performs the tasks needed to process a batch of events.
Expand Down Expand Up @@ -767,7 +779,7 @@ async Task performProcessing()
{
try
{
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options);
consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options, true);

// Register for notification when the cancellation token is triggered. Attempt to close the consumer
// in response to force-close the link and short-circuit any receive operation that is blocked and
Expand Down Expand Up @@ -2029,12 +2041,17 @@ private async Task ValidateEventHubsConnectionAsync(CancellationToken cancellati
// To ensure validity of the requested consumer group and that at least one partition exists,
// attempt to read from a partition.

var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], "validate", EventPosition.Earliest, connection, Options);
var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false);

try
{
await consumer.ReceiveAsync(1, TimeSpan.FromMilliseconds(5), cancellationToken).ConfigureAwait(false);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
{
// This is expected when another processor is already running; no action is needed, as it
// validates that the reader was able to connect.
}
finally
{
await consumer.CloseAsync(cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task ReadLastEnqueuedEventPropertiesReadsPropertiesWhenThePartition
.Returns(mockConnection.Object);

mockProcessor
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>()))
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>(), It.IsAny<bool>()))
.Returns(Mock.Of<SettableTransportConsumer>());

mockProcessor
Expand Down Expand Up @@ -132,7 +132,7 @@ public async Task ReadLastEnqueuedEventPropertiesThrowsWhenThePartitionIsNotOwne
.Returns(mockConnection.Object);

mockProcessor
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>()))
.Setup(processor => processor.CreateConsumer(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<EventPosition>(), It.IsAny<EventHubConnection>(), It.IsAny<EventProcessorOptions>(), It.IsAny<bool>()))
.Returns(Mock.Of<SettableTransportConsumer>());

mockProcessor
Expand Down
Loading