Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ async Task ProcessMessage(ProcessMessageEventArgs args)

[Test]
[TestCase(1)]
[TestCase(10)]
[TestCase(20)]
public async Task AutoLockRenewalWorks(int numThreads)
{
Expand Down Expand Up @@ -203,28 +204,32 @@ public async Task AutoLockRenewalWorks(int numThreads)
var completionSourceIndex = -1;

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;
processor.ProcessErrorAsync += args =>
{
// If the connection drops due to network flakiness
// after the message is received but before we
// complete it, we will get a message lock
// lost exception. We are still able to verify
// that the message will be completed eventually.
var exception = (ServiceBusException)args.Exception;
if (!(args.Exception is ServiceBusException sbEx) ||
sbEx.Reason != ServiceBusException.FailureReason.MessageLockLost)
{
Assert.Fail(args.Exception.ToString());
}
return Task.CompletedTask;
};
await processor.StartProcessingAsync();

async Task ProcessMessage(ProcessMessageEventArgs args)
{
try
{
var message = args.Message;
var lockedUntil = message.LockedUntil;
await Task.Delay(lockDuration);
Assert.That(message.LockedUntil > lockedUntil, $"{lockedUntil},{DateTime.UtcNow}");
await args.CompleteMessageAsync(message, args.CancellationToken);
Interlocked.Increment(ref messageCt);
}
finally
{
var setIndex = Interlocked.Increment(ref completionSourceIndex);
if (setIndex < numThreads)
{
completionSources[setIndex].SetResult(true);
}
}
var message = args.Message;
var lockedUntil = message.LockedUntil;
await Task.Delay(lockDuration);
await args.CompleteMessageAsync(message, args.CancellationToken);
Interlocked.Increment(ref messageCt);
var setIndex = Interlocked.Increment(ref completionSourceIndex);
completionSources[setIndex].SetResult(true);
}
await Task.WhenAll(completionSources.Select(source => source.Task));
Assert.IsTrue(processor.IsProcessing);
Expand Down Expand Up @@ -278,8 +283,10 @@ public async Task MaxAutoLockRenewalDurationRespected(int numThreads, int autoLo
async Task ProcessMessage(ProcessMessageEventArgs args)
{
var message = args.Message;
// wait 2x lock duration in case the
// lock was renewed already
await Task.Delay(lockDuration.Add(lockDuration));
var lockedUntil = message.LockedUntil;
await Task.Delay(lockDuration.Add(TimeSpan.FromSeconds(1)));
if (!args.CancellationToken.IsCancellationRequested)
{
// only do the assertion if cancellation wasn't requested as otherwise
Expand Down Expand Up @@ -405,7 +412,7 @@ Task ProcessErrors(ProcessErrorEventArgs args)
{
await processor.StartProcessingAsync();
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 20)
while (stopwatch.Elapsed.TotalSeconds <= 30)
{
if (exceptionReceivedHandlerCalled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ Task ErrorHandler(ProcessErrorEventArgs args)
}
await processor.StartProcessingAsync();
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 10)
while (stopwatch.Elapsed.TotalSeconds <= 30)
{
if (exceptionReceivedHandlerCalled)
{
Expand Down Expand Up @@ -440,7 +440,7 @@ Task ErrorHandler(ProcessErrorEventArgs args)
}
await processor.StartProcessingAsync();
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 10)
while (stopwatch.Elapsed.TotalSeconds <= 30)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it may have some non-determinism due to the timings. You may want to consider setting a TaskCompletionSource in your exception handler and awaiting that here with a Task.WaitAny timeout. I'd also advise allowing for a few minutes in that timeout due to how things run in CI. (Event Hubs had some issues with shorter timeouts like this when parallelism caused a test to suspend for longer than we expected)

To illustrate, I'm thinking something like:

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(ServiceBusTestEnvironment.Instance.TestExecutionTimeLimit);

// stuff...

await Task.WhenAny(exceptionHandlerCalledSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
cancellationSource.Cancel();

// stuff...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair - this was copied wholesale from Track 1. If there is any more flakiness I will make this update.

{
if (exceptionReceivedHandlerCalled)
{
Expand Down Expand Up @@ -569,28 +569,32 @@ public async Task AutoLockRenewalWorks(int numThreads)
var completionSourceIndex = -1;

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;
processor.ProcessErrorAsync += args =>
{
// If the connection drops due to network flakiness
// after the message is received but before we
// complete it, we will get a session lock
// lost exception. We are still able to verify
// that the message will be completed eventually.
var exception = (ServiceBusException)args.Exception;
if (!(args.Exception is ServiceBusException sbEx) ||
sbEx.Reason != ServiceBusException.FailureReason.SessionLockLost)
{
Assert.Fail(args.Exception.ToString());
}
return Task.CompletedTask;
};
await processor.StartProcessingAsync();

async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
try
{
var message = args.Message;
var lockedUntil = args.SessionLockedUntil;
await Task.Delay(lockDuration);
Assert.That(args.SessionLockedUntil > lockedUntil, $"{lockedUntil},{DateTime.UtcNow}");
await args.CompleteMessageAsync(message, args.CancellationToken);
Interlocked.Increment(ref messageCt);
}
finally
{
var setIndex = Interlocked.Increment(ref completionSourceIndex);
if (setIndex < numThreads)
{
completionSources[setIndex].SetResult(true);
}
}
var message = args.Message;
var lockedUntil = args.SessionLockedUntil;
await Task.Delay(lockDuration);
await args.CompleteMessageAsync(message, args.CancellationToken);
Interlocked.Increment(ref messageCt);
var setIndex = Interlocked.Increment(ref completionSourceIndex);
completionSources[setIndex].SetResult(true);
}
await Task.WhenAll(completionSources.Select(source => source.Task));
await processor.StopProcessingAsync();
Expand Down Expand Up @@ -659,10 +663,10 @@ Task SessionErrorHandler(ProcessErrorEventArgs eventArgs)
async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
var message = args.Message;
var lockedUntil = args.SessionLockedUntil;
// wait 2x lock duration in case the
// lock was renewed already
await Task.Delay(lockDuration.Add(lockDuration));
var lockedUntil = args.SessionLockedUntil;
if (!args.CancellationToken.IsCancellationRequested)
{
// only do the assertion if cancellation wasn't requested as otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ public async Task MaxWaitTimeRespected()
{
RetryOptions = new ServiceBusRetryOptions
{
TryTimeout = TimeSpan.FromSeconds(20)
TryTimeout = TimeSpan.FromSeconds(20),
MaxRetries = 0
}
});

Expand Down