Skip to content

Commit f9b7fcc

Browse files
Processor bug (Azure#14280)
1 parent 52cc445 commit f9b7fcc

File tree

5 files changed

+89
-42
lines changed

5 files changed

+89
-42
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -429,47 +429,52 @@ public virtual async Task StartProcessingAsync(
429429
CancellationToken cancellationToken = default)
430430
{
431431
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
432-
if (ActiveReceiveTask == null)
432+
bool releaseGuard = false;
433+
try
433434
{
434-
Logger.StartProcessingStart(Identifier);
435-
bool releaseGuard = false;
435+
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
436+
releaseGuard = true;
436437

437-
try
438+
if (ActiveReceiveTask == null)
438439
{
439-
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
440-
releaseGuard = true;
441-
ValidateMessageHandler();
442-
ValidateErrorHandler();
443-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
440+
Logger.StartProcessingStart(Identifier);
444441

445-
InitializeReceiverManagers();
442+
try
443+
{
444+
ValidateMessageHandler();
445+
ValidateErrorHandler();
446+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
446447

447-
// We expect the token source to be null, but we are playing safe.
448+
InitializeReceiverManagers();
448449

449-
RunningTaskTokenSource?.Cancel();
450-
RunningTaskTokenSource?.Dispose();
451-
RunningTaskTokenSource = new CancellationTokenSource();
450+
// We expect the token source to be null, but we are playing safe.
452451

453-
// Start the main running task.
454-
ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token);
455-
}
456-
catch (Exception exception)
457-
{
458-
Logger.StartProcessingException(Identifier, exception.ToString());
459-
throw;
460-
}
461-
finally
462-
{
463-
if (releaseGuard)
452+
RunningTaskTokenSource?.Cancel();
453+
RunningTaskTokenSource?.Dispose();
454+
RunningTaskTokenSource = new CancellationTokenSource();
455+
456+
// Start the main running task.
457+
ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token);
458+
}
459+
catch (Exception exception)
464460
{
465-
ProcessingStartStopSemaphore.Release();
461+
Logger.StartProcessingException(Identifier, exception.ToString());
462+
throw;
466463
}
464+
465+
Logger.StartProcessingComplete(Identifier);
466+
}
467+
else
468+
{
469+
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
467470
}
468-
Logger.StartProcessingComplete(Identifier);
469471
}
470-
else
472+
finally
471473
{
472-
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
474+
if (releaseGuard)
475+
{
476+
ProcessingStartStopSemaphore.Release();
477+
}
473478
}
474479
}
475480

@@ -554,19 +559,18 @@ private void ValidateMessageHandler()
554559
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="ServiceBusProcessor" /> will keep running.</param>
555560
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default)
556561
{
562+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
557563
bool releaseGuard = false;
558564
try
559565
{
566+
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
567+
releaseGuard = true;
568+
560569
if (ActiveReceiveTask != null)
561570
{
562571
Logger.StopProcessingStart(Identifier);
563572
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
564573

565-
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
566-
releaseGuard = true;
567-
568-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
569-
570574
// Cancel the current running task.
571575

572576
RunningTaskTokenSource.Cancel();
@@ -594,6 +598,10 @@ await receiverManager.CloseReceiverIfNeeded(
594598
.ConfigureAwait(false);
595599
}
596600
}
601+
else
602+
{
603+
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
604+
}
597605
}
598606
catch (Exception exception)
599607
{

sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,7 @@
294294
<data name="SendViaCannotBeUsedWithEntityInConnectionString" xml:space="preserve">
295295
<value>When sending via a different entity, an entity path is not allowed to specified in the connection string.</value>
296296
</data>
297-
</root>
297+
<data name="MessageProcessorIsNotRunning" xml:space="preserve">
298+
<value>The message processor is not currently running. It needs to be started before it can be stopped.</value>
299+
</data>
300+
</root>

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,11 +1303,11 @@ async Task MessageHandler(ProcessMessageEventArgs arg)
13031303
}
13041304

13051305
await processor.StartProcessingAsync();
1306-
var cts = new CancellationTokenSource();
1307-
cts.Cancel();
1306+
await processor.StopProcessingAsync();
1307+
13081308
Assert.That(
1309-
async () => await processor.StopProcessingAsync(cts.Token),
1310-
Throws.InstanceOf<TaskCanceledException>());
1309+
async () => await processor.StopProcessingAsync(),
1310+
Throws.InstanceOf<InvalidOperationException>());
13111311

13121312
mockLogger
13131313
.Verify(
@@ -1320,9 +1320,6 @@ async Task MessageHandler(ProcessMessageEventArgs arg)
13201320
processor.Identifier,
13211321
It.IsAny<string>()),
13221322
Times.Once);
1323-
1324-
// actually stop processing
1325-
await processor.StopProcessingAsync();
13261323
}
13271324

13281325
private Mock<ServiceBusConnection> GetMockConnection(Mock<TransportReceiver> mockTransportReceiver)

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Diagnostics;
67
using System.Linq;
78
using System.Threading;
@@ -414,6 +415,35 @@ Task ProcessErrors(ProcessErrorEventArgs args)
414415
await processor.StopProcessingAsync();
415416
}
416417

418+
[Test]
419+
public void StartStopMultipleTimes()
420+
{
421+
var invalidQueueName = "nonexistentqueuename";
422+
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
423+
ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
424+
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
425+
processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
426+
processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;
427+
428+
var startTasks = new List<Task>
429+
{
430+
processor.StartProcessingAsync(),
431+
processor.StartProcessingAsync()
432+
};
433+
Assert.That(
434+
async () => await Task.WhenAll(startTasks),
435+
Throws.InstanceOf<InvalidOperationException>());
436+
437+
var stopTasks = new List<Task>()
438+
{
439+
processor.StopProcessingAsync(),
440+
processor.StopProcessingAsync()
441+
};
442+
Assert.That(
443+
async () => await Task.WhenAll(stopTasks),
444+
Throws.InstanceOf<InvalidOperationException>());
445+
}
446+
417447
[Test]
418448
public async Task CannotAddHandlerWhileProcessorIsRunning()
419449
{

0 commit comments

Comments
 (0)