diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md index 01ca9777dc08..ebed42792936 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md @@ -8,6 +8,10 @@ ### Bugs Fixed +- Fixed a race condition that could lead to a synchronization primitive being double-released if `IsRunning` was called concurrently while starting or stopping the processor. + +- Fixed an issue with event processor validation where an exception for quota exceeded may inappropriately be surfaced when starting the processor. + ### Other Changes - Updated the `Microsoft.Azure.Amqp` dependency to 2.6.4, which enables support for TLS 1.3. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index 5cafe4b4b564..a1f1b288b7f6 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -10,6 +10,10 @@ - Adjusted retries to consider an unreachable host address as terminal. Previously, all socket-based errors were considered transient and would be retried. +- Fixed a race condition that could lead to a synchronization primitive being double-released if `IsRunning` was called concurrently while starting or stopping an event processor. + +- Fixed an issue with event processor validation where an exception for quota exceeded may inappropriately be surfaced when starting the processor. + ### Other Changes - Updated the `Microsoft.Azure.Amqp` dependency to 2.6.4, which enables support for TLS 1.3. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs index aca6a427fedc..db9ec6544e44 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs @@ -20,6 +20,7 @@ using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Diagnostics; using Azure.Messaging.EventHubs.Processor; +using Microsoft.Azure.Amqp.Framing; namespace Azure.Messaging.EventHubs.Primitives { @@ -146,19 +147,16 @@ internal EventProcessorStatus Status if (_runningProcessorTask == null) { - try + if (!ProcessorRunningGuard.Wait(100)) { - if (!ProcessorRunningGuard.Wait(100)) - { - return (_statusOverride ?? EventProcessorStatus.NotRunning); - } - - statusOverride = _statusOverride; - } - finally - { - ProcessorRunningGuard.Release(); + return (_statusOverride ?? EventProcessorStatus.NotRunning); } + + // If we reach this point, the semaphore was acquired and should + // be released. + + statusOverride = _statusOverride; + ProcessorRunningGuard.Release(); } else { @@ -1302,7 +1300,6 @@ private async Task StartProcessingInternalAsync(bool async, Logger.EventProcessorStart(Identifier, EventHubName, ConsumerGroup); var capturedValidationException = default(Exception); - var releaseGuard = false; try { @@ -1318,7 +1315,6 @@ private async Task StartProcessingInternalAsync(bool async, ProcessorRunningGuard.Wait(cancellationToken); } - releaseGuard = true; _statusOverride = EventProcessorStatus.Starting; // If the processor is already running, then it was started before the @@ -1420,7 +1416,7 @@ private async Task StartProcessingInternalAsync(bool async, // If the cancellation token was signaled during the attempt to acquire the // semaphore, it cannot be safely released; ensure that it is held. - if (releaseGuard) + if (ProcessorRunningGuard.CurrentCount == 0) { ProcessorRunningGuard.Release(); } @@ -1466,7 +1462,6 @@ private async Task StopProcessingInternalAsync(bool async, Logger.EventProcessorStop(Identifier, EventHubName, ConsumerGroup); var processingException = default(Exception); - var releaseGuard = false; try { @@ -1482,7 +1477,6 @@ private async Task StopProcessingInternalAsync(bool async, ProcessorRunningGuard.Wait(cancellationToken); } - releaseGuard = true; _statusOverride = EventProcessorStatus.Stopping; // If the processor is not running, then it was never started or has been stopped @@ -1585,7 +1579,7 @@ private async Task StopProcessingInternalAsync(bool async, // If the cancellation token was signaled during the attempt to acquire the // semaphore, it cannot be safely released; ensure that it is held. - if (releaseGuard) + if (ProcessorRunningGuard.CurrentCount == 0) { ProcessorRunningGuard.Release(); } @@ -2065,17 +2059,20 @@ private async Task ValidateEventHubsConnectionAsync(CancellationToken cancellati await using var connectionAwaiter = connection.ConfigureAwait(false); var properties = await connection.GetPropertiesAsync(RetryPolicy, cancellationToken).ConfigureAwait(false); + var partitionIndex = new Random().Next(0, (properties.PartitionIds.Length - 1)); // To ensure validity of the requested consumer group and that at least one partition exists, // attempt to read from a partition. - var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[0], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false); + var consumer = CreateConsumer(ConsumerGroup, properties.PartitionIds[partitionIndex], $"SV-{ Identifier }", EventPosition.Earliest, connection, Options, false); try { await consumer.ReceiveAsync(1, TimeSpan.FromMilliseconds(5), cancellationToken).ConfigureAwait(false); } - catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected) + catch (EventHubsException ex) + when ((ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected) + || (ex.Reason == EventHubsException.FailureReason.QuotaExceeded)) { // This is expected when another processor is already running; no action is needed, as it // validates that the reader was able to connect. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs index 24b9a2737357..09052da5eeac 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubBufferedProducerClient.cs @@ -761,8 +761,6 @@ public virtual async Task EnqueueEventAsync(EventData eventData, if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false)) { - var releaseGuard = false; - try { if (!_stateGuard.Wait(0, cancellationToken)) @@ -770,7 +768,6 @@ public virtual async Task EnqueueEventAsync(EventData eventData, await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false); } - releaseGuard = true; Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient)); // StartPublishingAsync will verify that publishing is not already taking @@ -781,7 +778,7 @@ public virtual async Task EnqueueEventAsync(EventData eventData, } finally { - if (releaseGuard) + if (_stateGuard.CurrentCount == 0) { _stateGuard.Release(); } @@ -923,8 +920,6 @@ public virtual async Task EnqueueEventsAsync(IEnumerable events, if ((!IsPublishing) || (_producerManagementTask?.IsCompleted ?? false)) { - var releaseGuard = false; - try { if (!_stateGuard.Wait(0, cancellationToken)) @@ -932,7 +927,6 @@ public virtual async Task EnqueueEventsAsync(IEnumerable events, await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false); } - releaseGuard = true; Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient)); // StartPublishingAsync will verify that publishing is not already taking @@ -943,7 +937,7 @@ public virtual async Task EnqueueEventsAsync(IEnumerable events, } finally { - if (releaseGuard) + if (_stateGuard.CurrentCount == 0) { _stateGuard.Release(); } @@ -1035,15 +1029,11 @@ public virtual async Task FlushAsync(CancellationToken cancellationToken = defau { Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient)); - var releaseGuard = false; - if (!_stateGuard.Wait(0, cancellationToken)) { await _stateGuard.WaitAsync(cancellationToken).ConfigureAwait(false); } - releaseGuard = true; - try { Argument.AssertNotClosed(_isClosed, nameof(EventHubBufferedProducerClient)); @@ -1055,7 +1045,7 @@ public virtual async Task FlushAsync(CancellationToken cancellationToken = defau } finally { - if (releaseGuard) + if (_stateGuard.CurrentCount == 0) { _stateGuard.Release(); } @@ -1083,7 +1073,6 @@ public virtual async Task CloseAsync(bool flush = true, return; } - var releaseGuard = false; var capturedExceptions = default(List); try @@ -1095,8 +1084,6 @@ public virtual async Task CloseAsync(bool flush = true, // If we've reached this point without an exception, the guard is held. - releaseGuard = true; - if (_isClosed) { return; @@ -1195,7 +1182,7 @@ public virtual async Task CloseAsync(bool flush = true, } finally { - if (releaseGuard) + if (_stateGuard.CurrentCount == 0) { _stateGuard.Release(); }