From eed31d7ac353b8d6cae0acb7e014bbfdf8942aad Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 11 Aug 2021 18:54:05 -0700
Subject: [PATCH 01/14] Add ability to set max calls across all sessions
---
.../src/Processor/ServiceBusProcessor.cs | 21 +++++--
.../Processor/ServiceBusSessionProcessor.cs | 14 ++++-
.../Processor/SessionProcessorLiveTests.cs | 62 +++++++++++++++++++
3 files changed, 91 insertions(+), 6 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index bfd70249a8ca..7aa9b274f885 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -960,16 +960,29 @@ public void UpdateConcurrency(int maxConcurrentCalls)
}
}
- internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession)
+ internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCalls = default)
{
Argument.AssertAtLeast(maxConcurrentSessions, 1, nameof(maxConcurrentSessions));
Argument.AssertAtLeast(maxConcurrentCallsPerSession, 1, nameof(maxConcurrentCallsPerSession));
+ if (maxConcurrentCalls.HasValue)
+ {
+ Argument.AssertAtLeast(maxConcurrentCalls.Value, 1, nameof(maxConcurrentCalls));
+ }
+
lock (_maxConcurrencySyncLock)
{
- _maxConcurrentCalls = _sessionIds.Length > 0
- ? Math.Min(_sessionIds.Length, maxConcurrentSessions)
- : maxConcurrentSessions * maxConcurrentCallsPerSession;
+ if (maxConcurrentCalls.HasValue)
+ {
+ _maxConcurrentCalls = maxConcurrentCalls.Value;
+ }
+ else
+ {
+ _maxConcurrentCalls = _sessionIds.Length > 0
+ ? Math.Min(_sessionIds.Length, maxConcurrentSessions)
+ : maxConcurrentSessions * maxConcurrentCallsPerSession;
+ }
+
_maxConcurrentSessions = maxConcurrentSessions;
_maxConcurrentCallsPerSession = maxConcurrentCallsPerSession;
WakeLoop();
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
index 30b5b7382cfa..9ac39086c3f3 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
@@ -310,9 +310,19 @@ public async ValueTask DisposeAsync()
/// property.
/// The new max concurrent calls per session value. This will be reflect in the
/// .
- public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession)
+ /// If specified, limits the total max concurrent calls to this value. This does not override the
+ /// limits specified in and , but acts as further limit
+ /// to the total calls. As an example, suppose you want to allow 100 concurrent invocations of the message handler,
+ /// and you want to process up to 20 sessions concurrently. You can try setting maxConcurrentSessions to 20, and maxConcurrentCallsPerSession to 5.
+ /// However, in practice, your queue might typically have only 10 sessions with messages at a given time. So in order to achieve
+ /// your desired throughput, you can instead set maxConcurrentCallsPerSession to 10. This would mean that if your queue ever did have
+ /// 20 sessions at a time, you would be doing 200 invocations. In order to prevent this, you can set maxConcurrentCalls to 100.
+ /// This allows the processor to attempt to scale up to the maxConcurrentCallsPerSession when the number of available sessions is lower,
+ /// while still being able to accept new sessions without breaking your throughput requirement as the number of available sessions
+ /// increases.
+ public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCalls = default)
{
- InnerProcessor.UpdateConcurrency(maxConcurrentSessions, maxConcurrentCallsPerSession);
+ InnerProcessor.UpdateConcurrency(maxConcurrentSessions, maxConcurrentCallsPerSession, maxConcurrentCalls);
}
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index c0c199f0459f..e18d36205bc0 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1935,6 +1935,68 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
}
}
+ [Test]
+ public async Task CanUpdateMaxCallsAcrossAllSessionsConcurrency()
+ {
+ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
+ {
+ await using var client = CreateClient();
+ var sender = client.CreateSender(scope.QueueName);
+ int messageCount = 100;
+ await sender.SendMessagesAsync(GetMessages(messageCount, "sessionId"));
+
+ await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
+ {
+ MaxConcurrentSessions = 1,
+ MaxConcurrentCallsPerSession = 1
+ });
+
+ int receivedCount = 0;
+ var tcs = new TaskCompletionSource();
+
+ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
+ {
+ if (args.CancellationToken.IsCancellationRequested)
+ {
+ await args.AbandonMessageAsync(args.Message);
+ }
+
+ var ct = Interlocked.Increment(ref receivedCount);
+ if (ct == messageCount)
+ {
+ tcs.SetResult(true);
+ }
+
+ if (ct == 5)
+ {
+ processor.UpdateConcurrency(5, 20, 50);
+ Assert.AreEqual(5, processor.MaxConcurrentSessions);
+ Assert.AreEqual(20, processor.MaxConcurrentCallsPerSession);
+ }
+ if (ct == 50)
+ {
+ // tasks will generally be 50 here, but allow some forgiveness as this is not deterministic
+ Assert.GreaterOrEqual(processor.InnerProcessor._tasks.Count, 25);
+ Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 50);
+ processor.UpdateConcurrency(1, 1);
+ Assert.AreEqual(1, processor.MaxConcurrentSessions);
+ Assert.AreEqual(1, processor.MaxConcurrentCallsPerSession);
+ }
+ if (ct == 95)
+ {
+ Assert.LessOrEqual(processor.InnerProcessor._tasks.Where(t => !t.Task.IsCompleted).Count(), 1);
+ }
+ }
+
+ processor.ProcessMessageAsync += ProcessMessage;
+ processor.ProcessErrorAsync += SessionErrorHandler;
+
+ await processor.StartProcessingAsync();
+ await tcs.Task;
+ await processor.StopProcessingAsync();
+ }
+ }
+
private Task SessionErrorHandler(ProcessErrorEventArgs args)
{
// If the connection drops due to network flakiness
From 8e2c96bfec8a832935abe46e4d77fb91dcf53002 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 12 Aug 2021 11:35:17 -0700
Subject: [PATCH 02/14] Add to options
---
...ure.Messaging.ServiceBus.netstandard2.0.cs | 4 ++-
.../src/Processor/ServiceBusProcessor.cs | 26 +++++++++-----
.../Processor/ServiceBusSessionProcessor.cs | 16 ++++++---
.../ServiceBusSessionProcessorOptions.cs | 35 +++++++++++++++++++
.../Processor/SessionProcessorLiveTests.cs | 26 ++++++++------
5 files changed, 83 insertions(+), 24 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs
index 9d8417064ef8..201beca2c985 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs
@@ -384,6 +384,7 @@ protected ServiceBusSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient
public virtual bool IsClosed { get { throw null; } }
public virtual bool IsProcessing { get { throw null; } }
public virtual System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } }
+ public virtual int MaxConcurrentCallsAcrossAllSessions { get { throw null; } }
public virtual int MaxConcurrentCallsPerSession { get { throw null; } }
public virtual int MaxConcurrentSessions { get { throw null; } }
public virtual int PrefetchCount { get { throw null; } }
@@ -407,13 +408,14 @@ protected ServiceBusSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient
public virtual System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
- public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession) { }
+ public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default(int?)) { }
}
public partial class ServiceBusSessionProcessorOptions
{
public ServiceBusSessionProcessorOptions() { }
public bool AutoCompleteMessages { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
+ public int? MaxConcurrentCallsAcrossAllSessions { get { throw null; } set { } }
public int MaxConcurrentCallsPerSession { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 7aa9b274f885..c9b207cb00cd 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -188,7 +188,6 @@ public virtual bool IsClosed
///
/// Initializes a new instance of the class.
///
- ///
/// The connection to use for communication with the Service Bus service.
/// The queue name or subscription path to process messages from.
/// Whether or not the processor is associated with a session entity.
@@ -199,6 +198,7 @@ public virtual bool IsClosed
/// Only applies if isSessionEntity is true.
/// The max number of concurrent calls per session.
/// Only applies if isSessionEntity is true.
+ /// The max number of concurrent calls across all sessions.
/// If this is for a session processor, will contain the session processor instance.
internal ServiceBusProcessor(
ServiceBusConnection connection,
@@ -208,6 +208,7 @@ internal ServiceBusProcessor(
string[] sessionIds = default,
int maxConcurrentSessions = default,
int maxConcurrentCallsPerSession = default,
+ int? maxConcurrentCallsAcrossAllSessions = default,
ServiceBusSessionProcessor sessionProcessor = default)
{
Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath));
@@ -232,9 +233,16 @@ internal ServiceBusProcessor(
if (isSessionEntity)
{
- _maxConcurrentCalls = _sessionIds.Length > 0
- ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
- : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
+ if (maxConcurrentCallsAcrossAllSessions.HasValue)
+ {
+ _maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
+ }
+ else
+ {
+ _maxConcurrentCalls = _sessionIds.Length > 0
+ ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
+ : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
+ }
}
var maxAcceptSessions = Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
@@ -960,21 +968,21 @@ public void UpdateConcurrency(int maxConcurrentCalls)
}
}
- internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCalls = default)
+ internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default)
{
Argument.AssertAtLeast(maxConcurrentSessions, 1, nameof(maxConcurrentSessions));
Argument.AssertAtLeast(maxConcurrentCallsPerSession, 1, nameof(maxConcurrentCallsPerSession));
- if (maxConcurrentCalls.HasValue)
+ if (maxConcurrentCallsAcrossAllSessions.HasValue)
{
- Argument.AssertAtLeast(maxConcurrentCalls.Value, 1, nameof(maxConcurrentCalls));
+ Argument.AssertAtLeast(maxConcurrentCallsAcrossAllSessions.Value, 1, nameof(maxConcurrentCallsAcrossAllSessions));
}
lock (_maxConcurrencySyncLock)
{
- if (maxConcurrentCalls.HasValue)
+ if (maxConcurrentCallsAcrossAllSessions.HasValue)
{
- _maxConcurrentCalls = maxConcurrentCalls.Value;
+ _maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
}
else
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
index 9ac39086c3f3..23ec44e6dae1 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
@@ -79,6 +79,13 @@ public class ServiceBusSessionProcessor : IAsyncDisposable
///
public virtual int MaxConcurrentCallsPerSession => InnerProcessor.MaxConcurrentCallsPerSession;
+ /// Gets the maximum number of concurrent calls to the
+ /// message handler the processor should initiate across all sessions.
+ ///
+ ///
+ /// The maximum number of concurrent calls to the message handler.
+ public virtual int MaxConcurrentCallsAcrossAllSessions => InnerProcessor.MaxConcurrentCalls;
+
///
public virtual string FullyQualifiedNamespace => InnerProcessor.FullyQualifiedNamespace;
@@ -104,6 +111,7 @@ internal ServiceBusSessionProcessor(
options.SessionIds.ToArray(),
options.MaxConcurrentSessions,
options.MaxConcurrentCallsPerSession,
+ options.MaxConcurrentCallsAcrossAllSessions,
this);
}
@@ -310,19 +318,19 @@ public async ValueTask DisposeAsync()
/// property.
/// The new max concurrent calls per session value. This will be reflect in the
/// .
- /// If specified, limits the total max concurrent calls to this value. This does not override the
+ /// If specified, limits the total max concurrent calls to this value. This does not override the
/// limits specified in and , but acts as further limit
/// to the total calls. As an example, suppose you want to allow 100 concurrent invocations of the message handler,
/// and you want to process up to 20 sessions concurrently. You can try setting maxConcurrentSessions to 20, and maxConcurrentCallsPerSession to 5.
/// However, in practice, your queue might typically have only 10 sessions with messages at a given time. So in order to achieve
/// your desired throughput, you can instead set maxConcurrentCallsPerSession to 10. This would mean that if your queue ever did have
- /// 20 sessions at a time, you would be doing 200 invocations. In order to prevent this, you can set maxConcurrentCalls to 100.
+ /// 20 sessions at a time, you would be doing 200 invocations. In order to prevent this, you can set maxConcurrentCallsAcrossAllSessions to 100.
/// This allows the processor to attempt to scale up to the maxConcurrentCallsPerSession when the number of available sessions is lower,
/// while still being able to accept new sessions without breaking your throughput requirement as the number of available sessions
/// increases.
- public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCalls = default)
+ public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default)
{
- InnerProcessor.UpdateConcurrency(maxConcurrentSessions, maxConcurrentCallsPerSession, maxConcurrentCalls);
+ InnerProcessor.UpdateConcurrency(maxConcurrentSessions, maxConcurrentCallsPerSession, maxConcurrentCallsAcrossAllSessions);
}
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs
index b104f874be50..3ddb7646116e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs
@@ -138,6 +138,41 @@ public int MaxConcurrentCallsPerSession
}
private int _maxConcurrentCallsPerSessions = 1;
+ /// Gets or sets the maximum number of concurrent calls to the
+ /// message handler the processor should initiate across all sessions. If not specified, this value
+ /// will be computed based on the and properties.
+ ///
+ ///
+ /// The maximum number of concurrent calls to the message handler.
+ ///
+ /// A value that is not positive is attempted to be set for the property.
+ ///
+ ///
+ /// This does not override the limits specified in and , but acts as further limit
+ /// to the total calls. As an example, suppose you want to allow 100 concurrent invocations of the message handler,
+ /// and you want to process up to 20 sessions concurrently. You can try setting MaxConcurrentSessions to 20, and MaxConcurrentCallsPerSession to 5.
+ /// However, in practice, your queue might typically have only 10 sessions with messages at a given time. So in order to achieve
+ /// your desired throughput, you can instead set MaxConcurrentCallsPerSession to 10. This would mean that if your queue ever did have
+ /// 20 sessions at a time, you would be doing 200 invocations. In order to prevent this, you can set MaxConcurrentCallsAcrossAllSessions to 100.
+ /// This allows the processor to attempt to scale up to the MaxConcurrentCallsPerSession when the number of available sessions is lower,
+ /// while still being able to accept new sessions without breaking your throughput requirement as the number of available sessions
+ /// increases.
+ ///
+ public int? MaxConcurrentCallsAcrossAllSessions
+ {
+ get => _maxConcurrentCallsAcrossAllSessions;
+
+ set
+ {
+ if (value.HasValue)
+ {
+ Argument.AssertAtLeast(value.Value, 1, nameof(MaxConcurrentCallsAcrossAllSessions));
+ _maxConcurrentCallsAcrossAllSessions = value;
+ }
+ }
+ }
+ private int? _maxConcurrentCallsAcrossAllSessions;
+
///
/// Gets an optional list of session IDs to scope
/// the to. If the list is
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index e18d36205bc0..fa054c80605a 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1943,12 +1943,14 @@ public async Task CanUpdateMaxCallsAcrossAllSessionsConcurrency()
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 100;
- await sender.SendMessagesAsync(GetMessages(messageCount, "sessionId"));
+ await sender.SendMessagesAsync(GetMessages(10, "sessionId1"));
+ await sender.SendMessagesAsync(GetMessages(10, "sessionId2"));
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
- MaxConcurrentSessions = 1,
- MaxConcurrentCallsPerSession = 1
+ MaxConcurrentSessions = 10,
+ MaxConcurrentCallsPerSession = 10,
+ MaxConcurrentCallsAcrossAllSessions = 50
});
int receivedCount = 0;
@@ -1969,22 +1971,26 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
if (ct == 5)
{
- processor.UpdateConcurrency(5, 20, 50);
- Assert.AreEqual(5, processor.MaxConcurrentSessions);
- Assert.AreEqual(20, processor.MaxConcurrentCallsPerSession);
+ Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, processor.MaxConcurrentCallsAcrossAllSessions);
+ // send messages to a bunch of sessions
+ for (int i = 0; i < 40; i++)
+ {
+ await sender.SendMessagesAsync(GetMessages(2, $"session{i}"));
+ }
}
if (ct == 50)
{
// tasks will generally be 50 here, but allow some forgiveness as this is not deterministic
Assert.GreaterOrEqual(processor.InnerProcessor._tasks.Count, 25);
Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 50);
- processor.UpdateConcurrency(1, 1);
- Assert.AreEqual(1, processor.MaxConcurrentSessions);
- Assert.AreEqual(1, processor.MaxConcurrentCallsPerSession);
+ processor.UpdateConcurrency(50, 10, 20);
+ Assert.AreEqual(50, processor.MaxConcurrentSessions);
+ Assert.AreEqual(10, processor.MaxConcurrentCallsPerSession);
+ Assert.AreEqual(20, processor.MaxConcurrentCallsAcrossAllSessions);
}
if (ct == 95)
{
- Assert.LessOrEqual(processor.InnerProcessor._tasks.Where(t => !t.Task.IsCompleted).Count(), 1);
+ Assert.LessOrEqual(processor.InnerProcessor._tasks.Where(t => !t.Task.IsCompleted).Count(), processor.MaxConcurrentCallsAcrossAllSessions);
}
}
From 2092b0175c1a76fcaa94a61d7a1a54f8ec1388b4 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 12 Aug 2021 13:12:22 -0700
Subject: [PATCH 03/14] Include more details in test error handler
---
.../tests/Processor/SessionProcessorLiveTests.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index fa054c80605a..5ed0972dfe1c 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -2012,7 +2012,7 @@ private Task SessionErrorHandler(ProcessErrorEventArgs args)
// that the message will be completed eventually.
if (args.Exception is not ServiceBusException { Reason: ServiceBusFailureReason.SessionLockLost })
{
- Assert.Fail(args.Exception.ToString());
+ Assert.Fail($"Error source: {args.ErrorSource}, Exception: {args.Exception}");
}
return Task.CompletedTask;
}
From 02c83a02d6cbba5b04959c4e3edc6b38e48ba7e9 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 12 Aug 2021 14:08:55 -0700
Subject: [PATCH 04/14] fix test
---
.../tests/Processor/SessionProcessorLiveTests.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index 5ed0972dfe1c..d86294ef5b9c 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1982,7 +1982,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
// tasks will generally be 50 here, but allow some forgiveness as this is not deterministic
Assert.GreaterOrEqual(processor.InnerProcessor._tasks.Count, 25);
- Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 50);
+ Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 60);
processor.UpdateConcurrency(50, 10, 20);
Assert.AreEqual(50, processor.MaxConcurrentSessions);
Assert.AreEqual(10, processor.MaxConcurrentCallsPerSession);
From b130ff50535abc91c9f365b6b5bc5481beab2c8d Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 12 Aug 2021 16:44:10 -0700
Subject: [PATCH 05/14] Throw exception of MaxCallsAcrossSessions is larger
than allowed
---
.../src/Client/ServiceBusClient.cs | 4 ++
.../src/Processor/ServiceBusProcessor.cs | 45 ++++++++++---------
.../Processor/ServiceBusSessionProcessor.cs | 4 ++
.../src/Resources.Designer.cs | 9 ++++
.../src/Resources.resx | 3 ++
.../Processor/SessionProcessorLiveTests.cs | 12 +++++
6 files changed, 56 insertions(+), 21 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs
index c64278b05366..b80a2bc8a305 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs
@@ -639,6 +639,10 @@ public virtual ServiceBusProcessor CreateProcessor(
/// The set of to use for configuring the
/// .
/// A scoped to the specified queue.
+ ///
+ /// The value specified for was greater than the product of
+ /// and .
+ ///
public virtual ServiceBusSessionProcessor CreateSessionProcessor(
string queueName,
ServiceBusSessionProcessorOptions options = default)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index c9b207cb00cd..99ea7fa402af 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -233,16 +233,7 @@ internal ServiceBusProcessor(
if (isSessionEntity)
{
- if (maxConcurrentCallsAcrossAllSessions.HasValue)
- {
- _maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
- }
- else
- {
- _maxConcurrentCalls = _sessionIds.Length > 0
- ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
- : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
- }
+ SetMaxConcurrentCallsAcrossSessions(maxConcurrentCallsAcrossAllSessions);
}
var maxAcceptSessions = Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
@@ -980,23 +971,35 @@ internal void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCall
lock (_maxConcurrencySyncLock)
{
- if (maxConcurrentCallsAcrossAllSessions.HasValue)
- {
- _maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
- }
- else
- {
- _maxConcurrentCalls = _sessionIds.Length > 0
- ? Math.Min(_sessionIds.Length, maxConcurrentSessions)
- : maxConcurrentSessions * maxConcurrentCallsPerSession;
- }
-
_maxConcurrentSessions = maxConcurrentSessions;
_maxConcurrentCallsPerSession = maxConcurrentCallsPerSession;
+
+ SetMaxConcurrentCallsAcrossSessions(maxConcurrentCallsAcrossAllSessions);
WakeLoop();
}
}
+ private void SetMaxConcurrentCallsAcrossSessions(int? maxConcurrentCallsAcrossAllSessions)
+ {
+ int calculatedMaxConcurrentCalls = _sessionIds.Length > 0
+ ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
+ : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
+ if (maxConcurrentCallsAcrossAllSessions.HasValue)
+ {
+ if (maxConcurrentCallsAcrossAllSessions.Value > calculatedMaxConcurrentCalls)
+ {
+ throw new ArgumentOutOfRangeException(Resources.InvalidMaxConcurrentCalls);
+ }
+ _maxConcurrentCalls = maxConcurrentCallsAcrossAllSessions.Value;
+ }
+ else
+ {
+ _maxConcurrentCalls = _sessionIds.Length > 0
+ ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
+ : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
+ }
+ }
+
private void WakeLoop()
{
// wake up the handler loop
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
index 23ec44e6dae1..87af7e8ce974 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs
@@ -328,6 +328,10 @@ public async ValueTask DisposeAsync()
/// This allows the processor to attempt to scale up to the maxConcurrentCallsPerSession when the number of available sessions is lower,
/// while still being able to accept new sessions without breaking your throughput requirement as the number of available sessions
/// increases.
+ ///
+ /// The value specified for was greater than the product of
+ /// and .
+ ///
public void UpdateConcurrency(int maxConcurrentSessions, int maxConcurrentCallsPerSession, int? maxConcurrentCallsAcrossAllSessions = default)
{
InnerProcessor.UpdateConcurrency(maxConcurrentSessions, maxConcurrentCallsPerSession, maxConcurrentCallsAcrossAllSessions);
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs
index a52a6ad8d051..c7de8a906845 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs
@@ -330,6 +330,15 @@ internal static string InvalidFullyQualifiedNamespace {
}
}
+ ///
+ /// Looks up a localized string similar to The value for MaxConcurrentCallsAcrossAllSessions is greater than the product of MaxConcurrentSessions and MaxConcurrentCallsPerSessions. Either set the value so that it is lower than this product, or increase the values of MaxConcurrentSessions and MaxConcurrentCallsPerSession so that the product exceeds the value you are attempting to set here..
+ ///
+ internal static string InvalidMaxConcurrentCalls {
+ get {
+ return ResourceManager.GetString("InvalidMaxConcurrentCalls", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to The shared access signature could not be parsed; it was either malformed or incorrectly encoded..
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx
index 3c589cf6285c..3d9613b28265 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx
@@ -316,4 +316,7 @@
An attempt was made to reconnect the {0} sender link as part of a cross-entity transaction. This could not be completed successfully because the {1} receiver
needs to be re-established first. Attempting the transaction again will allow the correct link to be established first.
+
+ The value for MaxConcurrentCallsAcrossAllSessions is greater than the product of MaxConcurrentSessions and MaxConcurrentCallsPerSessions. Either set the value so that it is lower than this product, or increase the values of MaxConcurrentSessions and MaxConcurrentCallsPerSession so that the product exceeds the value you are attempting to set here.
+
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index d86294ef5b9c..dd74f3623c3e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1946,6 +1946,15 @@ public async Task CanUpdateMaxCallsAcrossAllSessionsConcurrency()
await sender.SendMessagesAsync(GetMessages(10, "sessionId1"));
await sender.SendMessagesAsync(GetMessages(10, "sessionId2"));
+ Assert.That(
+ () => client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
+ {
+ MaxConcurrentSessions = 10,
+ MaxConcurrentCallsPerSession = 10,
+ MaxConcurrentCallsAcrossAllSessions = 500
+ }),
+ Throws.InstanceOf());
+
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 10,
@@ -1983,6 +1992,9 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
// tasks will generally be 50 here, but allow some forgiveness as this is not deterministic
Assert.GreaterOrEqual(processor.InnerProcessor._tasks.Count, 25);
Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 60);
+ Assert.That(
+ () => processor.UpdateConcurrency(10, 10, 200),
+ Throws.InstanceOf());
processor.UpdateConcurrency(50, 10, 20);
Assert.AreEqual(50, processor.MaxConcurrentSessions);
Assert.AreEqual(10, processor.MaxConcurrentCallsPerSession);
From af22c6420c15b007b090a4b157976e3a3816447d Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 12 Aug 2021 19:53:57 -0700
Subject: [PATCH 06/14] Fix test
---
.../tests/Processor/SessionProcessorLiveTests.cs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index dd74f3623c3e..11a2c97e4a4f 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1953,7 +1953,7 @@ public async Task CanUpdateMaxCallsAcrossAllSessionsConcurrency()
MaxConcurrentCallsPerSession = 10,
MaxConcurrentCallsAcrossAllSessions = 500
}),
- Throws.InstanceOf());
+ Throws.InstanceOf());
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
@@ -1994,7 +1994,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
Assert.LessOrEqual(processor.InnerProcessor._tasks.Count, 60);
Assert.That(
() => processor.UpdateConcurrency(10, 10, 200),
- Throws.InstanceOf());
+ Throws.InstanceOf());
processor.UpdateConcurrency(50, 10, 20);
Assert.AreEqual(50, processor.MaxConcurrentSessions);
Assert.AreEqual(10, processor.MaxConcurrentCallsPerSession);
From 7aa7565a770cca7449ac9ee29446d6b2377db7ac Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sat, 14 Aug 2021 21:55:09 -0700
Subject: [PATCH 07/14] PR FB
---
.../src/Processor/ServiceBusProcessor.cs | 39 +++++++++++++------
1 file changed, 27 insertions(+), 12 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 99ea7fa402af..0f2760073f45 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -46,10 +46,10 @@ public class ServiceBusProcessor : IAsyncDisposable
/// The primitive for ensuring that the service is not overloaded with
/// accept session requests.
///
- private SemaphoreSlim MaxConcurrentAcceptSessionsSemaphore { get; }
+ private readonly SemaphoreSlim _maxConcurrentAcceptSessionsSemaphore = new(0, int.MaxValue);
/// The primitive for synchronizing access during start and close operations.
- private readonly SemaphoreSlim _processingStartStopSemaphore = new SemaphoreSlim(1, 1);
+ private readonly SemaphoreSlim _processingStartStopSemaphore = new(1, 1);
private CancellationTokenSource RunningTaskTokenSource { get; set; }
@@ -123,6 +123,8 @@ public class ServiceBusProcessor : IAsyncDisposable
internal int MaxConcurrentCallsPerSession => _maxConcurrentCallsPerSession;
private volatile int _maxConcurrentCallsPerSession;
+ private int _currentAcceptSessions;
+
internal TimeSpan? MaxReceiveWaitTime { get; }
///
@@ -184,6 +186,7 @@ public virtual bool IsClosed
internal readonly List<(Task Task, CancellationTokenSource Cts, ReceiverManager ReceiverManager)> _tasks = new();
private readonly List _orphanedReceiverManagers = new();
private CancellationTokenSource _handlerCts = new();
+ private int MaxAcceptSessions => Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
///
/// Initializes a new instance of the class.
@@ -236,11 +239,6 @@ internal ServiceBusProcessor(
SetMaxConcurrentCallsAcrossSessions(maxConcurrentCallsAcrossAllSessions);
}
- var maxAcceptSessions = Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
- MaxConcurrentAcceptSessionsSemaphore = new SemaphoreSlim(
- maxAcceptSessions,
- maxAcceptSessions);
-
AutoCompleteMessages = Options.AutoCompleteMessages;
IsSessionProcessor = isSessionEntity;
@@ -606,7 +604,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
new SessionReceiverManager(
_sessionProcessor,
sessionId,
- MaxConcurrentAcceptSessionsSemaphore,
+ _maxConcurrentAcceptSessionsSemaphore,
_scopeFactory,
KeepOpenOnReceiveTimeout));
}
@@ -633,7 +631,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
new SessionReceiverManager(
_sessionProcessor,
null,
- MaxConcurrentAcceptSessionsSemaphore,
+ _maxConcurrentAcceptSessionsSemaphore,
_scopeFactory,
KeepOpenOnReceiveTimeout));
}
@@ -994,9 +992,7 @@ private void SetMaxConcurrentCallsAcrossSessions(int? maxConcurrentCallsAcrossAl
}
else
{
- _maxConcurrentCalls = _sessionIds.Length > 0
- ? Math.Min(_sessionIds.Length, _maxConcurrentSessions)
- : _maxConcurrentSessions * _maxConcurrentCallsPerSession;
+ _maxConcurrentCalls = calculatedMaxConcurrentCalls;
}
}
@@ -1055,6 +1051,25 @@ private async Task ReconcileConcurrencyAsync()
}
}
+ if (IsSessionProcessor)
+ {
+ int maxAcceptSessions = Math.Min(maxConcurrentCalls, 2 * Environment.ProcessorCount);
+ int diffAcceptSessions = maxAcceptSessions - _currentAcceptSessions;
+ if (diffAcceptSessions > 0)
+ {
+ _maxConcurrentAcceptSessionsSemaphore.Release(diffAcceptSessions);
+ }
+ else
+ {
+ int diffAcceptLimit = Math.Abs(diffAcceptSessions);
+ for (int i = 0; i < diffAcceptLimit; i++)
+ {
+ await _maxConcurrentAcceptSessionsSemaphore.WaitAsync().ConfigureAwait(false);
+ }
+ }
+ _currentAcceptSessions = maxAcceptSessions;
+ }
+
ReconcileReceiverManagers(maxConcurrentSessions);
_currentConcurrentCalls = maxConcurrentCalls;
From 7cd171a28c706059e75508795cc8c14789431135 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sat, 14 Aug 2021 21:56:02 -0700
Subject: [PATCH 08/14] Remove unused
---
.../src/Processor/ServiceBusProcessor.cs | 1 -
1 file changed, 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
index 0f2760073f45..f17bda78aa69 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs
@@ -186,7 +186,6 @@ public virtual bool IsClosed
internal readonly List<(Task Task, CancellationTokenSource Cts, ReceiverManager ReceiverManager)> _tasks = new();
private readonly List _orphanedReceiverManagers = new();
private CancellationTokenSource _handlerCts = new();
- private int MaxAcceptSessions => Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
///
/// Initializes a new instance of the class.
From 38d6f8240b821a94fab5346543e277f72c40a0cd Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sat, 14 Aug 2021 22:01:59 -0700
Subject: [PATCH 09/14] flaky test
---
.../tests/Processor/SessionProcessorTests.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
index 6bfd6bb71697..93aa43f9fa7e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs
@@ -385,7 +385,7 @@ public async Task CloseRespectsCancellationToken()
var cts = new CancellationTokenSource();
// mutate the cancellation token to distinguish it from CancellationToken.None
- cts.CancelAfter(100);
+ cts.CancelAfter(500);
await mockSessionProcessor.Object.CloseAsync(cts.Token);
mockProcessor.Verify(p => p.StopProcessingAsync(It.Is(ct => ct == cts.Token)));
From 7e065b7da8addcda93c813104be6ea80014842ae Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sat, 14 Aug 2021 22:35:41 -0700
Subject: [PATCH 10/14] Tweak test
---
.../tests/Processor/SessionProcessorLiveTests.cs | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index 11a2c97e4a4f..8fd7d40ae521 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1822,7 +1822,7 @@ public async Task CanUpdateSessionConcurrency()
{
MaxConcurrentSessions = 1,
MaxConcurrentCallsPerSession = 1,
- SessionIdleTimeout = TimeSpan.FromSeconds(5)
+ SessionIdleTimeout = TimeSpan.FromSeconds(3)
});
int receivedCount = 0;
@@ -1852,7 +1852,7 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
Assert.GreaterOrEqual(processor.InnerProcessor._tasks.Count, 20);
}
- if (ct == 90)
+ if (ct == 75)
{
processor.UpdateConcurrency(1, 1);
Assert.AreEqual(1, processor.MaxConcurrentSessions);
@@ -1959,7 +1959,8 @@ public async Task CanUpdateMaxCallsAcrossAllSessionsConcurrency()
{
MaxConcurrentSessions = 10,
MaxConcurrentCallsPerSession = 10,
- MaxConcurrentCallsAcrossAllSessions = 50
+ MaxConcurrentCallsAcrossAllSessions = 50,
+ SessionIdleTimeout = TimeSpan.FromSeconds(3)
});
int receivedCount = 0;
From 97686aab7b003891f5523e69c93e18be7574c596 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sun, 15 Aug 2021 14:20:41 -0700
Subject: [PATCH 11/14] Flaky tests
---
.../src/Processor/SessionReceiverManager.cs | 5 +----
.../tests/Processor/ProcessorTests.cs | 2 +-
.../tests/Receiver/ReceiverLiveTests.cs | 2 ++
3 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs
index a1cbeec891ce..9f4d0875437e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs
@@ -295,10 +295,7 @@ await ProcessOneMessageWithinScopeAsync(
}
}
catch (Exception ex)
- when (ex is not TaskCanceledException ||
- // If the user manually throws a TCE, then we should log it.
- (!_sessionCancellationSource.IsCancellationRequested &&
- !processorCancellationToken.IsCancellationRequested))
+ when (ex is not TaskCanceledException)
{
if (ex is ServiceBusException sbException)
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
index 39b2c0330f0d..b977ca93116f 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs
@@ -425,7 +425,7 @@ public async Task CloseRespectsCancellationToken()
var cts = new CancellationTokenSource();
// mutate the cancellation token to distinguish it from CancellationToken.None
- cts.CancelAfter(100);
+ cts.CancelAfter(500);
await mockProcessor.Object.CloseAsync(cts.Token);
mockProcessor.Verify(p => p.StopProcessingAsync(It.Is(ct => ct == cts.Token)));
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
index 0cb38dd720d1..dc65e3ad067e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs
@@ -57,6 +57,8 @@ await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession:
await using var receiverWithPrefetch = client.CreateReceiver(scope.QueueName,
options: new ServiceBusReceiverOptions() {PrefetchCount = 10});
+ // establish the receive link
+ await receiverWithPrefetch.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
var stopwatch = new Stopwatch();
stopwatch.Start();
await receiverWithPrefetch.ReceiveMessagesAsync(10, TimeSpan.Zero).ConfigureAwait(false);
From 1ce7a8da9e4d09a9f5dfcf95c1a3c81e0c393914 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sun, 15 Aug 2021 15:54:00 -0700
Subject: [PATCH 12/14] Improve event source message
---
.../src/Diagnostics/ServiceBusEventSource.cs | 4 ++--
.../tests/Processor/SessionProcessorLiveTests.cs | 5 ++---
2 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
index d593c5a6d149..f4662a1cb879 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
@@ -881,7 +881,7 @@ public void ProcessorClientClosedException(string identifier)
}
}
- [Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor. (Identifier '{0}'). Error Message: '{1}'")]
+ [Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor or scaling down concurrency. (Identifier '{0}'). Error Message: '{1}'")]
public void ProcessorStoppingReceiveCanceled(string identifier, string exception)
{
if (IsEnabled())
@@ -890,7 +890,7 @@ public void ProcessorStoppingReceiveCanceled(string identifier, string exception
}
}
- [Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
+ [Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor or scaling down concurrency. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
public void ProcessorStoppingAcceptSessionCanceled(string fullyQualifiedNamespace, string entityPath, string exception)
{
if (IsEnabled())
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
index 8fd7d40ae521..1c83cfd75220 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs
@@ -1828,11 +1828,11 @@ public async Task CanUpdateSessionConcurrency()
int receivedCount = 0;
var tcs = new TaskCompletionSource();
- Task ProcessMessage(ProcessSessionMessageEventArgs args)
+ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
if (args.CancellationToken.IsCancellationRequested)
{
- return Task.CompletedTask;
+ await args.AbandonMessageAsync(args.Message);
}
var ct = Interlocked.Increment(ref receivedCount);
@@ -1862,7 +1862,6 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
Assert.LessOrEqual(processor.InnerProcessor._tasks.Where(t => !t.Task.IsCompleted).Count(), 1);
}
- return Task.CompletedTask;
}
processor.ProcessMessageAsync += ProcessMessage;
From ba2ec6def55cd66ff6788641c610d549cd19e838 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Sun, 15 Aug 2021 20:49:35 -0700
Subject: [PATCH 13/14] Use CT for AmqpObject.OpenAsync
---
.../src/Amqp/AmqpConnectionScope.cs | 40 +------------------
1 file changed, 1 insertion(+), 39 deletions(-)
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
index c635020ed184..c14d83bea84b 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
@@ -1046,40 +1046,9 @@ protected virtual async Task OpenAmqpObjectAsync(
string entityPath = default,
bool isProcessor = default)
{
- CancellationTokenRegistration registration;
try
{
- var openObjectCompletionSource = new TaskCompletionSource