From e81d2f4ab1308dfa73af81ed63532971cbef2361 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Fri, 24 Apr 2020 13:55:16 -0700
Subject: [PATCH 1/5] Don't autocomplete messages that have already been
settled; Remove lock renewal from processor event args
---
.../src/Processor/ProcessMessageEventArgs.cs | 48 ++---
.../ProcessSessionMessageEventArgs.cs | 54 +++---
.../src/Processor/ServiceBusProcessor.cs | 41 +++--
.../src/Receiver/ServiceBusSessionReceiver.cs | 6 +
.../tests/Infrastructure/TestException.cs | 12 ++
.../tests/Processor/ProcessorLiveTests.cs | 164 +++++++++++++++++-
.../tests/Processor/ProcessorTests.cs | 105 +++++++++++
.../Processor/SessionProcessorLiveTests.cs | 162 +++++++++++++++++
.../tests/Processor/SessionProcessorTests.cs | 118 +++++++++++++
9 files changed, 645 insertions(+), 65 deletions(-)
create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestException.cs
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
index 9363de28d312..c03303800ae0 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
@@ -9,7 +9,8 @@
namespace Azure.Messaging.ServiceBus
{
///
- /// Contains information about a receiver that has attempted to receive a message from the Azure Service Bus entity.
+ /// The contain event args that are specific
+ /// to the that is being processed.
///
public class ProcessMessageEventArgs : EventArgs
{
@@ -24,6 +25,12 @@ public class ProcessMessageEventArgs : EventArgs
///
public CancellationToken CancellationToken { get; }
+ ///
+ /// Indicates whether the user has attempted to settle the message as part of their callback.
+ /// If they have done so, we will not autocomplete.
+ ///
+ internal bool UserSettled { get; set; }
+
private readonly ServiceBusReceiver _receiver;
///
@@ -50,9 +57,12 @@ internal ProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusRe
public async Task AbandonAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _receiver.AbandonAsync(message, propertiesToModify, cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
///
@@ -62,11 +72,14 @@ await _receiver.AbandonAsync(message, propertiesToModify, cancellationToken)
///
public async Task CompleteAsync(
ServiceBusReceivedMessage message,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _receiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
///
@@ -80,13 +93,16 @@ public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
string deadLetterReason,
string deadLetterErrorDescription = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _receiver.DeadLetterAsync(
message,
deadLetterReason,
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
///
@@ -98,12 +114,15 @@ await _receiver.DeadLetterAsync(
public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _receiver.DeadLetterAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
///
@@ -115,25 +134,14 @@ await _receiver.DeadLetterAsync(
public async Task DeferAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _receiver.DeferAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
-
- ///
- ///
- ///
- ///
- ///
- ///
- public async Task RenewMessageLockAsync(
- ServiceBusReceivedMessage message,
- CancellationToken cancellationToken = default) =>
- await _receiver.RenewMessageLockAsync(
- message,
- cancellationToken)
- .ConfigureAwait(false);
+ UserSettled = true;
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
index 72db6e1f07e6..742d8342a59e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
@@ -11,7 +11,7 @@ namespace Azure.Messaging.ServiceBus
{
///
/// The contain event args that are specific
- /// to the that is being processed.
+ /// to the and session that is being processed.
///
public class ProcessSessionMessageEventArgs : EventArgs
{
@@ -42,6 +42,12 @@ public class ProcessSessionMessageEventArgs : EventArgs
///
public DateTimeOffset SessionLockedUntil => _sessionReceiver.SessionLockedUntil;
+ ///
+ /// Indicates whether the user has attempted to settle the message as part of their callback.
+ /// If they have done so, we will not autocomplete.
+ ///
+ internal bool UserSettled { get; set; }
+
///
/// Initializes a new instance of the class.
///
@@ -86,25 +92,6 @@ public virtual async Task SetSessionStateAsync(
CancellationToken cancellationToken = default) =>
await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);
- ///
- /// Renews the lock on the session specified by the . The lock will be renewed based on the setting specified on the entity.
- ///
- ///
- /// An optional instance to signal the request to cancel the operation.
- ///
- ///
- ///
- /// When you get session receiver, the session is locked for this receiver by the service for a duration as specified during the Queue/Subscription creation.
- /// If processing of the session requires longer than this duration, the session-lock needs to be renewed.
- /// For each renewal, it resets the time the session is locked by the LockDuration set on the Entity.
- ///
- ///
- /// Renewal of session renews all the messages in the session as well. Each individual message need not be renewed.
- ///
- ///
- public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default) =>
- await _sessionReceiver.RenewSessionLockAsync(cancellationToken).ConfigureAwait(false);
-
///
/// Abandons a . This will make the message available again for immediate processing as the lock on the message held by the processor will be released.
///
@@ -123,9 +110,12 @@ public virtual async Task RenewSessionLockAsync(CancellationToken cancellationTo
public async Task AbandonAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _sessionReceiver.AbandonAsync(message, propertiesToModify, cancellationToken)
- .ConfigureAwait(false);
+ .ConfigureAwait(false);
+ UserSettled = true;
+ }
///
/// Completes a . This will delete the message from the service.
@@ -141,11 +131,14 @@ await _sessionReceiver.AbandonAsync(message, propertiesToModify, cancellationTok
/// A task to be resolved on when the operation has completed.
public async Task CompleteAsync(
ServiceBusReceivedMessage message,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _sessionReceiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
/// Moves a message to the deadletter sub-queue.
@@ -167,13 +160,16 @@ public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
string deadLetterReason,
string deadLetterErrorDescription = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _sessionReceiver.DeadLetterAsync(
message,
deadLetterReason,
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
///
/// Moves a message to the deadletter sub-queue.
@@ -193,12 +189,15 @@ await _sessionReceiver.DeadLetterAsync(
public async Task DeadLetterAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _sessionReceiver.DeadLetterAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
/// Defers the processing for a message.
///
@@ -219,11 +218,14 @@ await _sessionReceiver.DeadLetterAsync(
public async Task DeferAsync(
ServiceBusReceivedMessage message,
IDictionary propertiesToModify = default,
- CancellationToken cancellationToken = default) =>
+ CancellationToken cancellationToken = default)
+ {
await _sessionReceiver.DeferAsync(
message,
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
+ UserSettled = true;
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 6b89ed61ed21..268bd39c65b5 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -701,6 +701,7 @@ private async Task ProcessOneMessage(
ServiceBusErrorSource errorSource = ServiceBusErrorSource.Receive;
CancellationTokenSource renewLockCancellationTokenSource = null;
Task renewLock = null;
+ bool userSettled = false;
try
{
@@ -717,24 +718,36 @@ private async Task ProcessOneMessage(
if (IsSessionProcessor)
{
- await OnProcessSessionMessageAsync(
- new ProcessSessionMessageEventArgs(
- message,
- (ServiceBusSessionReceiver)receiver,
- processorCancellationToken))
- .ConfigureAwait(false);
+ var args = new ProcessSessionMessageEventArgs(
+ message,
+ (ServiceBusSessionReceiver)receiver,
+ processorCancellationToken);
+ try
+ {
+ await OnProcessSessionMessageAsync(args).ConfigureAwait(false);
+ }
+ finally
+ {
+ userSettled = args.UserSettled;
+ }
}
else
{
- await OnProcessMessageAsync(
- new ProcessMessageEventArgs(
+ var args = new ProcessMessageEventArgs(
message,
receiver,
- processorCancellationToken))
- .ConfigureAwait(false);
+ processorCancellationToken);
+ try
+ {
+ await OnProcessMessageAsync(args).ConfigureAwait(false);
+ }
+ finally
+ {
+ userSettled = args.UserSettled;
+ }
}
- if (ReceiveMode == ReceiveMode.PeekLock && AutoComplete)
+ if (ReceiveMode == ReceiveMode.PeekLock && AutoComplete && !userSettled)
{
errorSource = ServiceBusErrorSource.Complete;
// don't pass the processor cancellation token
@@ -754,8 +767,10 @@ await receiver.CompleteAsync(
await RaiseExceptionReceived(
new ProcessErrorEventArgs(ex, errorSource, FullyQualifiedNamespace, EntityPath)).ConfigureAwait(false);
- // if the message or session lock was lost, do not attempt to abandon the message
- if (ReceiveMode == ReceiveMode.PeekLock &&
+ // if the user settled the message, or if the message or session lock was lost,
+ // do not attempt to abandon the message
+ if (!userSettled &&
+ ReceiveMode == ReceiveMode.PeekLock &&
(!(ex is ServiceBusException sbException) ||
(sbException.Reason != ServiceBusException.FailureReason.SessionLockLost) &&
sbException.Reason != ServiceBusException.FailureReason.MessageLockLost))
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs
index db0fe8f3109b..de955a450770 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs
@@ -74,6 +74,12 @@ internal ServiceBusSessionReceiver(
{
}
+ ///
+ /// Initializes a new instance of the class for mocking.
+ ///
+ ///
+ protected ServiceBusSessionReceiver() : base() { }
+
///
/// Gets the session state.
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestException.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestException.cs
new file mode 100644
index 000000000000..c53855f1c0b3
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestException.cs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+
+namespace Azure.Messaging.ServiceBus.Tests.Infrastructure
+{
+ public class TestException : Exception
+ {
+
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
index bfbc20d408ff..dd3886417b51 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
@@ -6,6 +6,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus.Tests.Infrastructure;
using NUnit.Framework;
namespace Azure.Messaging.ServiceBus.Tests.Receiver
@@ -60,9 +61,6 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
var message = args.Message;
if (!autoComplete)
{
- var lockedUntil = message.LockedUntil;
- await args.RenewMessageLockAsync(message);
- Assert.That(message.LockedUntil > lockedUntil);
await args.CompleteAsync(message, args.CancellationToken);
}
Interlocked.Increment(ref messageCt);
@@ -87,6 +85,89 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
}
}
+ [Test]
+ [TestCase(1)]
+ [TestCase(5)]
+ [TestCase(10)]
+ [TestCase(20)]
+ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(
+ enablePartitioning: false,
+ enableSession: false))
+ {
+ await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
+ ServiceBusSender sender = client.CreateSender(scope.QueueName);
+
+ // use double the number of threads so we can make sure we test that we don't
+ // retrieve more messages than expected when there are more messages available
+ using ServiceBusMessageBatch batch = await sender.CreateBatchAsync();
+ var messageSendCt = numThreads * 2;
+ ServiceBusMessageBatch messageBatch = AddMessages(batch, messageSendCt);
+
+ await sender.SendBatchAsync(messageBatch);
+
+ var options = new ServiceBusProcessorOptions
+ {
+ MaxConcurrentCalls = numThreads,
+ AutoComplete = true,
+ MaxReceiveWaitTime = TimeSpan.FromSeconds(30)
+ };
+ var processor = client.CreateProcessor(scope.QueueName, options);
+ int messageCt = 0;
+
+ TaskCompletionSource[] completionSources = Enumerable
+ .Range(0, numThreads)
+ .Select(index => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))
+ .ToArray();
+ var completionSourceIndex = -1;
+
+ processor.ProcessMessageAsync += ProcessMessage;
+ processor.ProcessErrorAsync += ExceptionHandler;
+ await processor.StartProcessingAsync();
+
+ async Task ProcessMessage(ProcessMessageEventArgs args)
+ {
+ try
+ {
+ var message = args.Message;
+ switch (numThreads)
+ {
+ case 1:
+ await args.CompleteAsync(message, args.CancellationToken);
+ break;
+ case 5:
+ await args.AbandonAsync(message);
+ break;
+ case 10:
+ await args.DeadLetterAsync(message);
+ break;
+ case 20:
+ await args.DeferAsync(message);
+ break;
+ }
+ Interlocked.Increment(ref messageCt);
+ }
+ finally
+ {
+
+ var setIndex = Interlocked.Increment(ref completionSourceIndex);
+ if (setIndex < numThreads)
+ {
+ completionSources[setIndex].SetResult(true);
+ }
+ }
+ }
+ await Task.WhenAll(completionSources.Select(source => source.Task));
+ await processor.StopProcessingAsync();
+
+ // we complete each task after one message being processed, so the total number of messages
+ // processed should equal the number of threads, but it's possible that we may process a few more per thread.
+ Assert.IsTrue(messageCt >= numThreads);
+ Assert.IsTrue(messageCt < messageSendCt);
+ }
+ }
+
[Test]
[TestCase(1)]
[TestCase(20)]
@@ -383,11 +464,9 @@ public async Task CannotAddHandlerWhileProcessorIsRunning()
[Test]
public async Task StopProcessingDoesNotCancelAutoCompletion()
{
- var lockDuration = TimeSpan.FromSeconds(5);
await using (var scope = await ServiceBusScope.CreateWithQueue(
enablePartitioning: false,
- enableSession: false,
- lockDuration: lockDuration))
+ enableSession: false))
{
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
@@ -414,5 +493,78 @@ Task ProcessMessage(ProcessMessageEventArgs args)
Assert.IsNull(msg);
}
}
+
+ [Test]
+ [TestCase("")]
+ [TestCase("Abandon")]
+ [TestCase("Complete")]
+ [TestCase("Defer")]
+ [TestCase("Deadletter")]
+ [TestCase("DeadletterOverload")]
+ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(string settleMethod)
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(
+ enablePartitioning: false,
+ enableSession: false))
+ {
+ await using var client = GetClient();
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendAsync(GetMessage());
+ var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
+ {
+ AutoComplete = true
+ });
+ var tcs = new TaskCompletionSource();
+
+ async Task ProcessMessage(ProcessMessageEventArgs args)
+ {
+ switch (settleMethod)
+ {
+ case "Abandon":
+ await args.AbandonAsync(args.Message);
+ break;
+ case "Complete":
+ await args.CompleteAsync(args.Message);
+ break;
+ case "Defer":
+ await args.DeferAsync(args.Message);
+ break;
+ case "Deadletter":
+ await args.DeadLetterAsync(args.Message);
+ break;
+ case "DeadletterOverload":
+ await args.DeadLetterAsync(args.Message, "reason");
+ break;
+ }
+ throw new TestException();
+ }
+
+ Task ExceptionHandler(ProcessErrorEventArgs args)
+ {
+ tcs.SetResult(true);
+ if (!(args.Exception is TestException))
+ {
+ Assert.Fail(args.Exception.ToString());
+ }
+ return Task.CompletedTask;
+ }
+ processor.ProcessMessageAsync += ProcessMessage;
+ processor.ProcessErrorAsync += ExceptionHandler;
+
+ await processor.StartProcessingAsync();
+ await tcs.Task;
+ await processor.StopProcessingAsync();
+ var receiver = client.CreateReceiver(scope.QueueName);
+ var msg = await receiver.ReceiveAsync(TimeSpan.FromSeconds(5));
+ if (settleMethod == "" || settleMethod == "Abandon")
+ {
+ Assert.IsNotNull(msg);
+ }
+ else
+ {
+ Assert.IsNull(msg);
+ }
+ }
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
index ff2456ac0bd2..03a337ee2bf5 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -113,5 +114,109 @@ public void ProcessorOptionsSetOnClient()
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime);
}
+
+ [Test]
+ public async Task UserSettledPropertySetCorrectly()
+ {
+ var msg = new ServiceBusReceivedMessage();
+ var args = new ProcessMessageEventArgs(
+ msg,
+ new Mock().Object,
+ CancellationToken.None);
+
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.AbandonAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ await args.CompleteAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeadLetterAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeadLetterAsync(msg, "reason");
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeferAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+ }
+
+ [Test]
+ public void UserSettledPropertySetCorrectlyOnException()
+ {
+ var msg = new ServiceBusReceivedMessage();
+ var mockReceiver = new Mock();
+
+ mockReceiver
+ .Setup(receiver => receiver.AbandonAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeferAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.CompleteAsync(
+ It.IsAny(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeadLetterAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeadLetterAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ var args = new ProcessMessageEventArgs(
+ msg,
+ mockReceiver.Object,
+ CancellationToken.None);
+
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.AbandonAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ Assert.That(async () => await args.CompleteAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeadLetterAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeadLetterAsync(msg, "reason"),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeferAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index f6ad52c47a2d..66c4b0de238c 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -9,6 +9,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus.Tests.Infrastructure;
using Moq;
using NUnit.Framework;
@@ -122,6 +123,91 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
}
}
+ [Test]
+ [TestCase(1)]
+ [TestCase(5)]
+ [TestCase(10)]
+ [TestCase(20)]
+ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(
+ enablePartitioning: false,
+ enableSession: true))
+ {
+ await using var client = GetClient();
+ ServiceBusSender sender = client.CreateSender(scope.QueueName);
+
+ // send 1 message for each thread and use a different session for each message
+ ConcurrentDictionary sessions = new ConcurrentDictionary();
+ for (int i = 0; i < numThreads; i++)
+ {
+ var sessionId = Guid.NewGuid().ToString();
+ await sender.SendAsync(GetMessage(sessionId));
+ sessions.TryAdd(sessionId, true);
+ }
+ var options = new ServiceBusProcessorOptions
+ {
+ MaxConcurrentCalls = numThreads,
+ AutoComplete = true
+ };
+ var processor = client.CreateSessionProcessor(scope.QueueName, options);
+ int messageCt = 0;
+
+ TaskCompletionSource[] completionSources = Enumerable
+ .Range(0, numThreads)
+ .Select(index => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously))
+ .ToArray();
+
+ var completionSourceIndex = -1;
+
+ processor.ProcessMessageAsync += ProcessMessage;
+ processor.ProcessErrorAsync += ExceptionHandler;
+ await processor.StartProcessingAsync();
+
+ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
+ {
+ try
+ {
+ var message = args.Message;
+ switch (numThreads)
+ {
+ case 1:
+ await args.CompleteAsync(message, args.CancellationToken);
+ break;
+ case 5:
+ await args.AbandonAsync(message);
+ break;
+ case 10:
+ await args.DeadLetterAsync(message);
+ break;
+ case 20:
+ await args.DeferAsync(message);
+ break;
+ }
+ Interlocked.Increment(ref messageCt);
+ sessions.TryRemove(message.SessionId, out bool _);
+ Assert.AreEqual(message.SessionId, args.SessionId);
+ Assert.IsNotNull(args.SessionLockedUntil);
+ }
+ finally
+ {
+ var setIndex = Interlocked.Increment(ref completionSourceIndex);
+ completionSources[setIndex].SetResult(true);
+ }
+ }
+ await Task.WhenAll(completionSources.Select(source => source.Task));
+ await processor.StopProcessingAsync();
+
+ // there is only one message for each session, and one
+ // thread for each session, so the total messages processed
+ // should equal the number of threads
+ Assert.AreEqual(numThreads, messageCt);
+
+ // we should have received messages from each of the sessions
+ Assert.AreEqual(0, sessions.Count);
+ }
+ }
+
[Test]
[TestCase(1, true)]
[TestCase(5, false)]
@@ -545,5 +631,81 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args)
}
}
+ [Test]
+ [TestCase("")]
+ [TestCase("Abandon")]
+ [TestCase("Complete")]
+ [TestCase("Defer")]
+ [TestCase("Deadletter")]
+ [TestCase("DeadletterOverload")]
+ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(string settleMethod)
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(
+ enablePartitioning: false,
+ enableSession: true))
+ {
+ await using var client = GetClient();
+ var sender = client.CreateSender(scope.QueueName);
+ await sender.SendAsync(GetMessage("sessionId"));
+ var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusProcessorOptions
+ {
+ AutoComplete = true
+ });
+ var tcs = new TaskCompletionSource();
+
+ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
+ {
+ switch (settleMethod)
+ {
+ case "Abandon":
+ await args.AbandonAsync(args.Message);
+ break;
+ case "Complete":
+ await args.CompleteAsync(args.Message);
+ break;
+ case "Defer":
+ await args.DeferAsync(args.Message);
+ break;
+ case "Deadletter":
+ await args.DeadLetterAsync(args.Message);
+ break;
+ case "DeadletterOverload":
+ await args.DeadLetterAsync(args.Message, "reason");
+ break;
+ }
+ throw new TestException();
+ }
+
+ Task ExceptionHandler(ProcessErrorEventArgs args)
+ {
+ tcs.SetResult(true);
+ if (!(args.Exception is TestException))
+ {
+ Assert.Fail(args.Exception.ToString());
+ }
+ return Task.CompletedTask;
+ }
+ processor.ProcessMessageAsync += ProcessMessage;
+ processor.ProcessErrorAsync += ExceptionHandler;
+
+ await processor.StartProcessingAsync();
+ await tcs.Task;
+ await processor.StopProcessingAsync();
+
+ if (settleMethod == "" || settleMethod == "Abandon")
+ {
+ var receiver = await client.CreateSessionReceiverAsync(scope.QueueName);
+ var msg = await receiver.ReceiveAsync(TimeSpan.FromSeconds(5));
+ Assert.IsNotNull(msg);
+ }
+ else
+ {
+ Assert.That(
+ async () => await GetNoRetryClient().CreateSessionReceiverAsync(scope.QueueName),
+ Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusException.FailureReason.ServiceTimeout));
+ }
+ }
+ }
+
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
index 0a3f241f4b23..29dc9c4cf646 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
@@ -2,7 +2,11 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Generic;
using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Moq;
using NUnit.Framework;
namespace Azure.Messaging.ServiceBus.Tests.Processor
@@ -32,5 +36,119 @@ public void ProcessorOptionsSetOnClient()
Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration);
Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace);
}
+
+
+ [Test]
+ public async Task UserSettledPropertySetCorrectly()
+ {
+ var msg = new ServiceBusReceivedMessage();
+ var args = new ProcessSessionMessageEventArgs(
+ msg,
+ new Mock().Object,
+ CancellationToken.None);
+
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.AbandonAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ await args.CompleteAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeadLetterAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeadLetterAsync(msg, "reason");
+ Assert.IsTrue(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.DeferAsync(msg);
+ Assert.IsTrue(args.UserSettled);
+
+ // getting or setting session state doesn't count as settling
+ args.UserSettled = false;
+ await args.GetSessionStateAsync();
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ await args.SetSessionStateAsync(new byte[] { });
+ Assert.IsFalse(args.UserSettled);
+ }
+
+ [Test]
+ public void UserSettledPropertySetCorrectlyOnException()
+ {
+ var msg = new ServiceBusReceivedMessage();
+ var mockReceiver = new Mock();
+
+ mockReceiver
+ .Setup(receiver => receiver.AbandonAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeferAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.CompleteAsync(
+ It.IsAny(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeadLetterAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ mockReceiver
+ .Setup(receiver => receiver.DeadLetterAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Throws(new Exception());
+
+ var args = new ProcessSessionMessageEventArgs(
+ msg,
+ mockReceiver.Object,
+ CancellationToken.None);
+
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.AbandonAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ Assert.That(async () => await args.CompleteAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeadLetterAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeadLetterAsync(msg, "reason"),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+
+ args.UserSettled = false;
+ Assert.That(async () => await args.DeferAsync(msg),
+ Throws.InstanceOf());
+ Assert.IsFalse(args.UserSettled);
+ }
}
}
From 73d07c41979b95ff3c3217ffd54250eb4e90e60a Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 27 Apr 2020 16:14:32 -0700
Subject: [PATCH 2/5] Add session init/close events
---
.../src/Processor/ProcessSessionEventArgs.cs | 77 ++++++++++++
.../ProcessSessionMessageEventArgs.cs | 3 +-
.../src/Processor/ServiceBusProcessor.cs | 114 ++++++++++++++++++
.../Processor/ServiceBusSessionProcessor.cs | 39 ++++++
.../tests/Processor/ProcessorLiveTests.cs | 2 +-
.../Processor/SessionProcessorLiveTests.cs | 101 +++++++++++++---
.../tests/Processor/SessionProcessorTests.cs | 92 ++++++++++++++
7 files changed, 406 insertions(+), 22 deletions(-)
create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
new file mode 100644
index 000000000000..9127d065ffa2
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
@@ -0,0 +1,77 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Azure.Messaging.ServiceBus
+{
+ ///
+ /// The contain event args related to the session being processed.
+ ///
+ public class ProcessSessionEventArgs : EventArgs
+ {
+ ///
+ /// The processor's instance which will be
+ /// cancelled in the event that is called.
+ ///
+ public CancellationToken CancellationToken { get; }
+
+ ///
+ /// The that will be used for all settlement methods for the args.
+ ///
+ private readonly ServiceBusSessionReceiver _sessionReceiver;
+
+ ///
+ /// The Session Id associated with the session being processed.
+ ///
+ public string SessionId => _sessionReceiver.SessionId;
+
+ ///
+ /// Gets the that the current session is locked until.
+ ///
+ public DateTimeOffset SessionLockedUntil => _sessionReceiver.SessionLockedUntil;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The that will be used for all settlement methods
+ /// for the args.
+ /// The processor's instance which will be cancelled in the event that is called.
+ internal ProcessSessionEventArgs(
+ ServiceBusSessionReceiver receiver,
+ CancellationToken cancellationToken)
+ {
+ _sessionReceiver = receiver;
+ CancellationToken = cancellationToken;
+ }
+
+ ///
+ /// Gets the session state.
+ ///
+ ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// The session state as byte array.
+ public virtual async Task GetSessionStateAsync(
+ CancellationToken cancellationToken = default) =>
+ await _sessionReceiver.GetSessionStateAsync(cancellationToken).ConfigureAwait(false);
+
+ ///
+ /// Set a custom state on the session which can be later retrieved using
+ ///
+ ///
+ /// A byte array of session state
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// This state is stored on Service Bus forever unless you set an empty state on it.
+ ///
+ /// A task to be resolved on when the operation has completed.
+ public virtual async Task SetSessionStateAsync(
+ byte[] sessionState,
+ CancellationToken cancellationToken = default) =>
+ await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
index 742d8342a59e..d06051f185d9 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -49,7 +48,7 @@ public class ProcessSessionMessageEventArgs : EventArgs
internal bool UserSettled { get; set; }
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The current .
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 268bd39c65b5..6de4a7544900 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -30,6 +30,10 @@ public class ServiceBusProcessor
private Func _processErrorAsync = default;
+ private Func _sessionInitializingAsync;
+
+ private Func _sessionClosingAsync;
+
private SemaphoreSlim MessageHandlerSemaphore;
///
@@ -68,6 +72,44 @@ private Task OnProcessMessageAsync(ProcessMessageEventArgs args) =>
/// The set of arguments to identify the context of the error to be processed.
private Task OnProcessErrorAsync(ProcessErrorEventArgs eventArgs) => _processErrorAsync(eventArgs);
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private async Task OnSessionInitializingAsync(
+ ServiceBusSessionReceiver receiver,
+ CancellationToken cancellationToken)
+ {
+ // Handlers cannot be changed while the processor is running; it is safe to check and call
+ // without capturing a local reference.
+ if (_sessionInitializingAsync != null)
+ {
+ var args = new ProcessSessionEventArgs(receiver, cancellationToken);
+ await _sessionInitializingAsync(args).ConfigureAwait(false);
+ }
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private async Task OnSessionClosingAsync(
+ ServiceBusSessionReceiver receiver,
+ CancellationToken cancellationToken)
+ {
+ // Handlers cannot be changed while the processor is running; it is safe to check and call
+ // without capturing a local reference.
+ if (_sessionClosingAsync != null)
+ {
+ var args = new ProcessSessionEventArgs(receiver, cancellationToken);
+ await _sessionClosingAsync(args).ConfigureAwait(false);
+ }
+ }
+
///
/// The fully qualified Service Bus namespace that the receiver is associated with. This is likely
/// to be similar to {yournamespace}.servicebus.windows.net.
@@ -141,6 +183,7 @@ private set
private int _maxConcurrentCalls;
private int _maxConcurrentAcceptSessions;
+
private const int DefaultMaxConcurrentCalls = 1;
private const int DefaultMaxConcurrentSessions = 8;
@@ -342,6 +385,68 @@ public event Func ProcessErrorAsync
}
}
+ ///
+ /// Optional event that can be set to be notified when a new session is about to be processed.
+ ///
+ [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "Guidance does not apply; this is an event.")]
+ [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the standard .NET event pattern; override via the associated On<> method.")]
+ internal event Func SessionInitializingAsync
+ {
+ add
+ {
+ Argument.AssertNotNull(value, nameof(SessionInitializingAsync));
+
+ if (_sessionInitializingAsync != default)
+ {
+ throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
+ }
+ EnsureNotRunningAndInvoke(() => _sessionInitializingAsync = value);
+
+ }
+
+ remove
+ {
+ Argument.AssertNotNull(value, nameof(SessionInitializingAsync));
+ if (_sessionInitializingAsync != value)
+ {
+ throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
+ }
+ EnsureNotRunningAndInvoke(() => _sessionInitializingAsync = default);
+ }
+ }
+
+ ///
+ /// Optional event that can be set to be notified when a session is about to be closed for processing.
+ /// This means that the most recent ReceiveAsync call timed out so there are currently no messages
+ /// available to be received for the session.
+ ///
+ [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "Guidance does not apply; this is an event.")]
+ [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the standard .NET event pattern; override via the associated On<> method.")]
+ internal event Func SessionClosingAsync
+ {
+ add
+ {
+ Argument.AssertNotNull(value, nameof(SessionClosingAsync));
+
+ if (_sessionClosingAsync != default)
+ {
+ throw new NotSupportedException(Resources.HandlerHasAlreadyBeenAssigned);
+ }
+ EnsureNotRunningAndInvoke(() => _sessionClosingAsync = value);
+
+ }
+
+ remove
+ {
+ Argument.AssertNotNull(value, nameof(SessionClosingAsync));
+ if (_sessionClosingAsync != value)
+ {
+ throw new ArgumentException(Resources.HandlerHasNotBeenAssigned);
+ }
+ EnsureNotRunningAndInvoke(() => _sessionClosingAsync = default);
+ }
+ }
+
///
/// Signals the to begin processing messages. Should this method be called while the processor
/// is running, no action is taken.
@@ -581,6 +686,11 @@ private async Task GetAndProcessMessagesAsync(
connection: _connection,
options: receiverOptions,
cancellationToken: cancellationToken).ConfigureAwait(false);
+
+ await OnSessionInitializingAsync(
+ (ServiceBusSessionReceiver)receiver,
+ cancellationToken).
+ ConfigureAwait(false);
}
catch (ServiceBusException ex)
when (ex.Reason == ServiceBusException.FailureReason.ServiceTimeout)
@@ -621,6 +731,10 @@ private async Task GetAndProcessMessagesAsync(
// 2. break out of the loop to allow requesting another session from the service
if (IsSessionProcessor && _sessionId == null)
{
+ await OnSessionClosingAsync(
+ (ServiceBusSessionReceiver)receiver,
+ cancellationToken)
+ .ConfigureAwait(false);
await CancelTask(
renewSessionLockCancellationSource,
renewSessionLock).ConfigureAwait(false);
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
index 5f518a028f85..969717341875 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
@@ -152,6 +152,45 @@ public event Func ProcessErrorAsync
}
}
+ ///
+ /// Optional event that can be set to be notified when a new session is about to be processed.
+ ///
+ [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "Guidance does not apply; this is an event.")]
+ [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the standard .NET event pattern; override via the associated On<> method.")]
+ public event Func SessionInitializingAsync
+ {
+ add
+ {
+ _innerProcessor.SessionInitializingAsync += value;
+
+ }
+
+ remove
+ {
+ _innerProcessor.SessionInitializingAsync -= value;
+ }
+ }
+
+ ///
+ /// Optional event that can be set to be notified when a session is about to be closed for processing.
+ /// This means that the most recent ReceiveAsync call timed out so there are currently no messages
+ /// available to be received for the session.
+ ///
+ [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "Guidance does not apply; this is an event.")]
+ [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the standard .NET event pattern; override via the associated On<> method.")]
+ public event Func SessionClosingAsync
+ {
+ add
+ {
+ _innerProcessor.SessionClosingAsync += value;
+ }
+
+ remove
+ {
+ _innerProcessor.SessionClosingAsync -= value;
+ }
+ }
+
///
/// Signals the to begin processing messages. Should this method be called while the processor
/// is running, no action is taken.
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
index dd3886417b51..e727f37f5294 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs
@@ -105,7 +105,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
var messageSendCt = numThreads * 2;
ServiceBusMessageBatch messageBatch = AddMessages(batch, messageSendCt);
- await sender.SendBatchAsync(messageBatch);
+ await sender.SendAsync(messageBatch);
var options = new ServiceBusProcessorOptions
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index 66c4b0de238c..e5a34a343de7 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -18,7 +18,7 @@ namespace Azure.Messaging.ServiceBus.Tests.Processor
public class SessionProcessorLiveTests : ServiceBusLiveTestBase
{
[Test]
- public async Task CannotAddHandlerWhileProcessorIsRunning()
+ public async Task CannotRemoveHandlersWhileProcessorIsRunning()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(
enablePartitioning: false,
@@ -30,22 +30,68 @@ public async Task CannotAddHandlerWhileProcessorIsRunning()
Func eventHandler = eventArgs => Task.CompletedTask;
Func errorHandler = eventArgs => Task.CompletedTask;
+ Func sessionhandler = eventArgs => Task.CompletedTask;
processor.ProcessMessageAsync += eventHandler;
processor.ProcessErrorAsync += errorHandler;
+ processor.SessionInitializingAsync += sessionhandler;
+ processor.SessionClosingAsync += sessionhandler;
await processor.StartProcessingAsync();
Assert.That(() => processor.ProcessMessageAsync -= eventHandler, Throws.InstanceOf());
Assert.That(() => processor.ProcessErrorAsync -= errorHandler, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync -= sessionhandler, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync -= sessionhandler, Throws.InstanceOf());
await processor.StopProcessingAsync();
// Once stopped, the processor should allow handlers to be removed, and re-added.
Assert.That(() => processor.ProcessMessageAsync -= eventHandler, Throws.Nothing);
Assert.That(() => processor.ProcessErrorAsync -= errorHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionInitializingAsync -= sessionhandler, Throws.Nothing);
+ Assert.That(() => processor.SessionClosingAsync -= sessionhandler, Throws.Nothing);
Assert.That(() => processor.ProcessMessageAsync += eventHandler, Throws.Nothing);
Assert.That(() => processor.ProcessErrorAsync += errorHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionInitializingAsync += sessionhandler, Throws.Nothing);
+ Assert.That(() => processor.SessionClosingAsync += sessionhandler, Throws.Nothing);
+ }
+ }
+
+ [Test]
+ public async Task CannotAddHandlersWhileProcessorIsRunning()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(
+ enablePartitioning: false,
+ enableSession: true))
+ {
+ await using var client = GetClient();
+
+ var processor = client.CreateSessionProcessor(scope.QueueName);
+
+ Func eventHandler = eventArgs => Task.CompletedTask;
+ Func errorHandler = eventArgs => Task.CompletedTask;
+ Func sessionhandler = eventArgs => Task.CompletedTask;
+ processor.ProcessMessageAsync += eventHandler;
+ processor.ProcessErrorAsync += errorHandler;
+
+ await processor.StartProcessingAsync();
+
+ Assert.That(() => processor.ProcessMessageAsync -= eventHandler, Throws.InstanceOf());
+ Assert.That(() => processor.ProcessErrorAsync -= errorHandler, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync += sessionhandler, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync += sessionhandler, Throws.InstanceOf());
+
+ await processor.StopProcessingAsync();
+
+ // Once stopped, the processor should allow handlers to be removed, and re-added.
+ Assert.That(() => processor.ProcessMessageAsync -= eventHandler, Throws.Nothing);
+ Assert.That(() => processor.ProcessErrorAsync -= errorHandler, Throws.Nothing);
+
+ Assert.That(() => processor.ProcessMessageAsync += eventHandler, Throws.Nothing);
+ Assert.That(() => processor.ProcessErrorAsync += errorHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionInitializingAsync += sessionhandler, Throws.Nothing);
+ Assert.That(() => processor.SessionClosingAsync += sessionhandler, Throws.Nothing);
}
}
@@ -64,12 +110,12 @@ public async Task ProcessSessionEvent(int numThreads, bool autoComplete)
ServiceBusSender sender = client.CreateSender(scope.QueueName);
// send 1 message for each thread and use a different session for each message
- ConcurrentDictionary sessions = new ConcurrentDictionary();
+ ConcurrentDictionary sessions = new ConcurrentDictionary();
for (int i = 0; i < numThreads; i++)
{
var sessionId = Guid.NewGuid().ToString();
await sender.SendAsync(GetMessage(sessionId));
- sessions.TryAdd(sessionId, true);
+ sessions.TryAdd(sessionId, null);
}
var options = new ServiceBusProcessorOptions
{
@@ -78,6 +124,7 @@ public async Task ProcessSessionEvent(int numThreads, bool autoComplete)
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;
+ int sessionEventCt = 0;
TaskCompletionSource[] completionSources = Enumerable
.Range(0, numThreads)
@@ -88,28 +135,41 @@ public async Task ProcessSessionEvent(int numThreads, bool autoComplete)
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;
+ processor.SessionInitializingAsync += SessionInitHandler;
+ processor.SessionClosingAsync += SessionCloseHandler;
+
await processor.StartProcessingAsync();
+ async Task SessionInitHandler(ProcessSessionEventArgs args)
+ {
+ Interlocked.Increment(ref sessionEventCt);
+ byte[] state = GetRandomBuffer(100);
+ await args.SetSessionStateAsync(state);
+ sessions[args.SessionId] = state;
+ }
+
+ async Task SessionCloseHandler(ProcessSessionEventArgs args)
+ {
+ Interlocked.Increment(ref sessionEventCt);
+ var setIndex = Interlocked.Increment(ref completionSourceIndex);
+ completionSources[setIndex].SetResult(true);
+ sessions.TryRemove(args.SessionId, out byte[] state);
+ byte[] getState = await args.GetSessionStateAsync();
+ Assert.AreEqual(state, getState);
+ }
+
async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
- try
- {
- var message = args.Message;
- if (!autoComplete)
- {
- await args.CompleteAsync(message);
- }
- Interlocked.Increment(ref messageCt);
- sessions.TryRemove(message.SessionId, out bool _);
- Assert.AreEqual(message.SessionId, args.SessionId);
- Assert.IsNotNull(args.SessionLockedUntil);
- }
- finally
+ var message = args.Message;
+ if (!autoComplete)
{
- var setIndex = Interlocked.Increment(ref completionSourceIndex);
- completionSources[setIndex].SetResult(true);
+ await args.CompleteAsync(message);
}
+ Interlocked.Increment(ref messageCt);
+ Assert.AreEqual(message.SessionId, args.SessionId);
+ Assert.IsNotNull(args.SessionLockedUntil);
}
+
await Task.WhenAll(completionSources.Select(source => source.Task));
await processor.StopProcessingAsync();
@@ -120,6 +180,9 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
// we should have received messages from each of the sessions
Assert.AreEqual(0, sessions.Count);
+
+ // there are two session events per session
+ Assert.AreEqual(numThreads * 2, sessionEventCt);
}
}
@@ -626,7 +689,7 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args)
await tcs.Task;
await processor.StopProcessingAsync();
Assert.That(
- async() => await GetNoRetryClient().CreateSessionReceiverAsync(scope.QueueName),
+ async () => await GetNoRetryClient().CreateSessionReceiverAsync(scope.QueueName),
Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusException.FailureReason.ServiceTimeout));
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
index 29dc9c4cf646..0a391d068972 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
@@ -13,6 +13,98 @@ namespace Azure.Messaging.ServiceBus.Tests.Processor
{
public class SessionProcessorTests : ServiceBusTestBase
{
+ [Test]
+ public void CannotAddNullHandler()
+ {
+ var processor = new ServiceBusSessionProcessor(
+ GetMockedConnection(),
+ "entityPath",
+ new ServiceBusProcessorOptions());
+
+ Assert.That(() => processor.ProcessMessageAsync += null, Throws.InstanceOf());
+ Assert.That(() => processor.ProcessErrorAsync += null, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync += null, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync += null, Throws.InstanceOf());
+ }
+
+ [Test]
+ public void CannotAddTwoHandlersToTheSameEvent()
+ {
+ var processor = new ServiceBusSessionProcessor(
+ GetMockedConnection(),
+ "entityPath",
+ new ServiceBusProcessorOptions());
+
+ processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
+ processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;
+ processor.SessionInitializingAsync += eventArgs => Task.CompletedTask;
+ processor.SessionClosingAsync += eventArgs => Task.CompletedTask;
+
+ Assert.That(() => processor.ProcessMessageAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.ProcessErrorAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ }
+
+ [Test]
+ public void CannotRemoveHandlerThatHasNotBeenAdded()
+ {
+ var processor = new ServiceBusSessionProcessor(
+ GetMockedConnection(),
+ "entityPath",
+ new ServiceBusProcessorOptions());
+
+ // First scenario: no handler has been set.
+
+ Assert.That(() => processor.ProcessMessageAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.ProcessErrorAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+
+ // Second scenario: there is a handler set, but it's not the one we are trying to remove.
+
+ processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
+ processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;
+ processor.SessionInitializingAsync += eventArgs => Task.CompletedTask;
+ processor.SessionClosingAsync += eventArgs => Task.CompletedTask;
+
+ Assert.That(() => processor.ProcessMessageAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.ProcessErrorAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionInitializingAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ Assert.That(() => processor.SessionClosingAsync -= eventArgs => Task.CompletedTask, Throws.InstanceOf());
+ }
+
+ [Test]
+ public void CanRemoveHandlerThatHasBeenAdded()
+ {
+ var processor = new ServiceBusSessionProcessor(
+ GetMockedConnection(),
+ "entityPath",
+ new ServiceBusProcessorOptions());
+
+ Func eventHandler = eventArgs => Task.CompletedTask;
+ Func errorHandler = eventArgs => Task.CompletedTask;
+ Func sessionInitHandler = eventArgs => Task.CompletedTask;
+ Func sessionCloseHandler = eventArgs => Task.CompletedTask;
+
+ processor.ProcessMessageAsync += eventHandler;
+ processor.ProcessErrorAsync += errorHandler;
+ processor.SessionInitializingAsync += sessionInitHandler;
+ processor.SessionClosingAsync += sessionCloseHandler;
+
+ Assert.That(() => processor.ProcessMessageAsync -= eventHandler, Throws.Nothing);
+ Assert.That(() => processor.ProcessErrorAsync -= errorHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionInitializingAsync -= sessionInitHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionClosingAsync -= sessionCloseHandler, Throws.Nothing);
+
+ // Assert that handlers can be added again.
+
+ Assert.That(() => processor.ProcessMessageAsync += eventHandler, Throws.Nothing);
+ Assert.That(() => processor.ProcessErrorAsync += errorHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionInitializingAsync += sessionInitHandler, Throws.Nothing);
+ Assert.That(() => processor.SessionClosingAsync += sessionCloseHandler, Throws.Nothing);
+ }
+
[Test]
public void ProcessorOptionsSetOnClient()
{
From be75453102ca29be41e28236664e66078cbf8284 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 27 Apr 2020 18:12:21 -0700
Subject: [PATCH 3/5] PR comments
---
.../src/Processor/ProcessMessageEventArgs.cs | 14 +++---
.../ProcessSessionMessageEventArgs.cs | 14 +++---
.../src/Processor/ServiceBusProcessor.cs | 4 +-
.../tests/Processor/ProcessorTests.cs | 40 ++++++++--------
.../tests/Processor/SessionProcessorTests.cs | 48 +++++++++----------
5 files changed, 60 insertions(+), 60 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
index c03303800ae0..a0cf1c2b2668 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessMessageEventArgs.cs
@@ -26,10 +26,10 @@ public class ProcessMessageEventArgs : EventArgs
public CancellationToken CancellationToken { get; }
///
- /// Indicates whether the user has attempted to settle the message as part of their callback.
+ /// Indicates whether the user has settled the message as part of their callback.
/// If they have done so, we will not autocomplete.
///
- internal bool UserSettled { get; set; }
+ internal bool IsMessageSettled { get; set; }
private readonly ServiceBusReceiver _receiver;
@@ -61,7 +61,7 @@ public async Task AbandonAsync(
{
await _receiver.AbandonAsync(message, propertiesToModify, cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -78,7 +78,7 @@ await _receiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -101,7 +101,7 @@ await _receiver.DeadLetterAsync(
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -121,7 +121,7 @@ await _receiver.DeadLetterAsync(
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -141,7 +141,7 @@ await _receiver.DeferAsync(
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
index d06051f185d9..7a6e11e2f0ab 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionMessageEventArgs.cs
@@ -42,10 +42,10 @@ public class ProcessSessionMessageEventArgs : EventArgs
public DateTimeOffset SessionLockedUntil => _sessionReceiver.SessionLockedUntil;
///
- /// Indicates whether the user has attempted to settle the message as part of their callback.
+ /// Indicates whether the user has settled the message as part of their callback.
/// If they have done so, we will not autocomplete.
///
- internal bool UserSettled { get; set; }
+ internal bool IsMessageSettled { get; set; }
///
/// Initializes a new instance of the class.
@@ -113,7 +113,7 @@ public async Task AbandonAsync(
{
await _sessionReceiver.AbandonAsync(message, propertiesToModify, cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -136,7 +136,7 @@ await _sessionReceiver.CompleteAsync(
message,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -167,7 +167,7 @@ await _sessionReceiver.DeadLetterAsync(
deadLetterErrorDescription,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
///
@@ -195,7 +195,7 @@ await _sessionReceiver.DeadLetterAsync(
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
/// Defers the processing for a message.
@@ -224,7 +224,7 @@ await _sessionReceiver.DeferAsync(
propertiesToModify,
cancellationToken)
.ConfigureAwait(false);
- UserSettled = true;
+ IsMessageSettled = true;
}
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 6de4a7544900..b7368be158fc 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -842,7 +842,7 @@ private async Task ProcessOneMessage(
}
finally
{
- userSettled = args.UserSettled;
+ userSettled = args.IsMessageSettled;
}
}
else
@@ -857,7 +857,7 @@ private async Task ProcessOneMessage(
}
finally
{
- userSettled = args.UserSettled;
+ userSettled = args.IsMessageSettled;
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
index 03a337ee2bf5..6c951183e3b1 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
@@ -124,26 +124,26 @@ public async Task UserSettledPropertySetCorrectly()
new Mock().Object,
CancellationToken.None);
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.AbandonAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
await args.CompleteAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeadLetterAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeadLetterAsync(msg, "reason");
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeferAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
}
[Test]
@@ -192,31 +192,31 @@ public void UserSettledPropertySetCorrectlyOnException()
mockReceiver.Object,
CancellationToken.None);
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.AbandonAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
Assert.That(async () => await args.CompleteAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeadLetterAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeadLetterAsync(msg, "reason"),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeferAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
}
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
index 0a391d068972..4a93888b0c26 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
@@ -139,35 +139,35 @@ public async Task UserSettledPropertySetCorrectly()
new Mock().Object,
CancellationToken.None);
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.AbandonAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
await args.CompleteAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeadLetterAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeadLetterAsync(msg, "reason");
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.DeferAsync(msg);
- Assert.IsTrue(args.UserSettled);
+ Assert.IsTrue(args.IsMessageSettled);
// getting or setting session state doesn't count as settling
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.GetSessionStateAsync();
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
await args.SetSessionStateAsync(new byte[] { });
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
}
[Test]
@@ -216,31 +216,31 @@ public void UserSettledPropertySetCorrectlyOnException()
mockReceiver.Object,
CancellationToken.None);
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.AbandonAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
Assert.That(async () => await args.CompleteAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeadLetterAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeadLetterAsync(msg, "reason"),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
- args.UserSettled = false;
+ args.IsMessageSettled = false;
Assert.That(async () => await args.DeferAsync(msg),
Throws.InstanceOf());
- Assert.IsFalse(args.UserSettled);
+ Assert.IsFalse(args.IsMessageSettled);
}
}
}
From aea085a3ca96b7e81f7f844fa531ca8b6367e230 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 27 Apr 2020 18:28:32 -0700
Subject: [PATCH 4/5] Flaky test
---
.../tests/Processor/SessionProcessorLiveTests.cs | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index e5a34a343de7..d37f6999fe54 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -264,7 +264,16 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
// there is only one message for each session, and one
// thread for each session, so the total messages processed
// should equal the number of threads
- Assert.AreEqual(numThreads, messageCt);
+ if (numThreads == 5)
+ {
+ // when abandoning, it is possible we could process the same message more than once
+ // since the service will make the message available immediately
+ Assert.LessOrEqual(numThreads, messageCt);
+ }
+ else
+ {
+ Assert.AreEqual(numThreads, messageCt);
+ }
// we should have received messages from each of the sessions
Assert.AreEqual(0, sessions.Count);
From 82411ee108fe3c9c331acb8bf3fbf698216c0438 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Mon, 27 Apr 2020 19:34:57 -0700
Subject: [PATCH 5/5] Fix doc strings
---
.../src/Processor/ProcessSessionEventArgs.cs | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
index 9127d065ffa2..c1edb86c7451 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ProcessSessionEventArgs.cs
@@ -19,7 +19,8 @@ public class ProcessSessionEventArgs : EventArgs
public CancellationToken CancellationToken { get; }
///
- /// The that will be used for all settlement methods for the args.
+ /// The that will be used for setting and getting session
+ /// state.
///
private readonly ServiceBusSessionReceiver _sessionReceiver;