Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

### Bugs Fixed

- Fixed a race condition that could lead to a synchronization primitive being double-released if `IsRunning` was called concurrently while starting or stopping the processor.

- Fixed an issue with event processor validation where an exception for quota exceeded may inappropriately be surfaced when starting the processor.

### Other Changes

- Updated the `Microsoft.Azure.Amqp` dependency to 2.6.4, which enables support for TLS 1.3.
Expand Down
4 changes: 4 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

- Adjusted retries to consider an unreachable host address as terminal. Previously, all socket-based errors were considered transient and would be retried.

- Fixed a race condition that could lead to a synchronization primitive being double-released if `IsRunning` was called concurrently while starting or stopping an event processor.

- Fixed an issue with event processor validation where an exception for quota exceeded may inappropriately be surfaced when starting the processor.

### Other Changes

- Updated the `Microsoft.Azure.Amqp` dependency to 2.6.4, which enables support for TLS 1.3.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Processor;
using Microsoft.Azure.Amqp.Framing;

namespace Azure.Messaging.EventHubs.Primitives
{
Expand Down Expand Up @@ -146,19 +147,16 @@ internal EventProcessorStatus Status

if (_runningProcessorTask == null)
{
try
if (!ProcessorRunningGuard.Wait(100))
{
if (!ProcessorRunningGuard.Wait(100))
{
return (_statusOverride ?? EventProcessorStatus.NotRunning);
}

statusOverride = _statusOverride;
}
finally
{
ProcessorRunningGuard.Release();
return (_statusOverride ?? EventProcessorStatus.NotRunning);
}

// If we reach this point, the semaphore was acquired and should
// be released.

statusOverride = _statusOverride;
ProcessorRunningGuard.Release();
}
else
{
Expand Down Expand Up @@ -1302,7 +1300,6 @@ private async Task StartProcessingInternalAsync(bool async,
Logger.EventProcessorStart(Identifier, EventHubName, ConsumerGroup);

var capturedValidationException = default(Exception);
var releaseGuard = false;

try
{
Expand All @@ -1318,7 +1315,6 @@ private async Task StartProcessingInternalAsync(bool async,
ProcessorRunningGuard.Wait(cancellationToken);
}

releaseGuard = true;
_statusOverride = EventProcessorStatus.Starting;

// If the processor is already running, then it was started before the
Expand Down Expand Up @@ -1420,7 +1416,7 @@ private async Task StartProcessingInternalAsync(bool async,
// If the cancellation token was signaled during the attempt to acquire the
// semaphore, it cannot be safely released; ensure that it is held.

if (releaseGuard)
if (ProcessorRunningGuard.CurrentCount == 0)
{
ProcessorRunningGuard.Release();
}
Expand Down Expand Up @@ -1466,7 +1462,6 @@ private async Task StopProcessingInternalAsync(bool async,
Logger.EventProcessorStop(Identifier, EventHubName, ConsumerGroup);

var processingException = default(Exception);
var releaseGuard = false;

try
{
Expand All @@ -1482,7 +1477,6 @@ private async Task StopProcessingInternalAsync(bool async,
ProcessorRunningGuard.Wait(cancellationToken);
}

releaseGuard = true;
_statusOverride = EventProcessorStatus.Stopping;

// If the processor is not running, then it was never started or has been stopped
Expand Down Expand Up @@ -1585,7 +1579,7 @@ private async Task StopProcessingInternalAsync(bool async,
// If the cancellation token was signaled during the attempt to acquire the
// semaphore, it cannot be safely released; ensure that it is held.

if (releaseGuard)
if (ProcessorRunningGuard.CurrentCount == 0)
{
ProcessorRunningGuard.Release();
}
Expand Down Expand Up @@ -2065,17 +2059,20 @@ private async Task ValidateEventHubsConnectionAsync(CancellationToken cancellati
await using var connectionAwaiter = connection.ConfigureAwait(false);

var properties = await connection.GetPropertiesAsync(RetryPolicy, cancellationToken).ConfigureAwait(false);
var partitionIndex = new Random().Next(0, (properties.PartitionIds.Length - 1));

// To ensure validity of the requested consumer group and that at least one partition exists,
// attempt to read from a partition.

var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false);
var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[partitionIndex], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false);

try
{
await consumer.ReceiveAsync(1, TimeSpan.FromMilliseconds(5), cancellationToken).ConfigureAwait(false);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
catch (EventHubsException ex)
when ((ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
|| (ex.Reason == EventHubsException.FailureReason.QuotaExceeded))
{
// This is expected when another processor is already running; no action is needed, as it
// validates that the reader was able to connect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,16 +761,13 @@ public virtual async Task<int> EnqueueEventAsync(EventData eventData,

if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false))
{
var releaseGuard = false;

try
{
if (!_stateGuard.Wait(0, cancellationToken))
{
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
}

releaseGuard = true;
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));

// StartPublishingAsync will verify that publishing is not already taking
Expand All @@ -781,7 +778,7 @@ public virtual async Task<int> EnqueueEventAsync(EventData eventData,
}
finally
{
if (releaseGuard)
if (_stateGuard.CurrentCount == 0)
{
_stateGuard.Release();
}
Expand Down Expand Up @@ -923,16 +920,13 @@ public virtual async Task<int> EnqueueEventsAsync(IEnumerable<EventData> events,

if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false))
{
var releaseGuard = false;

try
{
if (!_stateGuard.Wait(0, cancellationToken))
{
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
}

releaseGuard = true;
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));

// StartPublishingAsync will verify that publishing is not already taking
Expand All @@ -943,7 +937,7 @@ public virtual async Task<int> EnqueueEventsAsync(IEnumerable<EventData> events,
}
finally
{
if (releaseGuard)
if (_stateGuard.CurrentCount == 0)
{
_stateGuard.Release();
}
Expand Down Expand Up @@ -1035,15 +1029,11 @@ public virtual async Task FlushAsync(CancellationToken cancellationToken = defau
{
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));

var releaseGuard = false;

if (!_stateGuard.Wait(0, cancellationToken))
{
await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false);
}

releaseGuard = true;

try
{
Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient));
Expand All @@ -1055,7 +1045,7 @@ public virtual async Task FlushAsync(CancellationToken cancellationToken = defau
}
finally
{
if (releaseGuard)
if (_stateGuard.CurrentCount == 0)
{
_stateGuard.Release();
}
Expand Down Expand Up @@ -1083,7 +1073,6 @@ public virtual async Task CloseAsync(bool flush = true,
return;
}

var releaseGuard = false;
var capturedExceptions = default(List<Exception>);

try
Expand All @@ -1095,8 +1084,6 @@ public virtual async Task CloseAsync(bool flush = true,

// If we've reached this point without an exception, the guard is held.

releaseGuard = true;

if (_isClosed)
{
return;
Expand Down Expand Up @@ -1195,7 +1182,7 @@ public virtual async Task CloseAsync(bool flush = true,
}
finally
{
if (releaseGuard)
if (_stateGuard.CurrentCount == 0)
{
_stateGuard.Release();
}
Expand Down