From 9643c4b1771e004b66ea789af2a65dda61b4d281 Mon Sep 17 00:00:00 2001 From: jolov Date: Mon, 12 Feb 2024 17:14:05 -0800 Subject: [PATCH 1/4] Fix for drain when prefetch is enabled --- .../src/Amqp/AmqpReceiver.cs | 12 +++++++---- .../Processor/SessionProcessorLiveTests.cs | 20 +++++++++++++------ .../Receiver/SessionReceiverLiveTests.cs | 9 +++++++-- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 619de810941c..72fba9ab6127 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -369,12 +369,16 @@ private async Task> ReceiveMessagesAsyn messagesReceived as IReadOnlyCollection ?? messagesReceived?.ToList() ?? s_emptyAmqpMessageList; // If this is a session receiver and we didn't receive all requested messages, we need to drain the credits - // to ensure FIFO ordering within each session. We exclude session processor since those will always receive a single message - // at a time, and if there are no messages, the session will be closed. The session won't be closed in the case that - // MaxConcurrentCallsPerSession > 1, but in that case FIFO is not possible to guarantee. - if (_isSessionReceiver && !_isProcessor && messageList.Count < maxMessages) + // to ensure FIFO ordering within each session. We exclude session processors where the since those will always receive a single message + // at a time, and if there are no messages, the session will be closed unless the processor was configured to receive from specific sessions. + // The session won't be closed in the case that MaxConcurrentCallsPerSession > 1, but in that case FIFO is not possible to guarantee. + if (_isSessionReceiver && (!_isProcessor || SessionId != null) && messageList.Count < maxMessages) { await link.DrainAsyc(cancellationToken).ConfigureAwait(false); + if (_prefetchCount > 0) + { + link.SetTotalLinkCredit((uint)_prefetchCount, true, false); + } } List receivedMessages = null; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index 68b0cc918598..1d31aaf0c47b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -2680,18 +2680,26 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) } [Test] - public async Task SessionOrderingIsGuaranteedProcessor() + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + [TestCase(false, false)] + public async Task SessionOrderingIsGuaranteedProcessor(bool prefetch, bool useSpecificSession) { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); long lastSequenceNumber = 0; + var options = new ServiceBusSessionProcessorOptions + { + MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1, PrefetchCount = prefetch ? 5 : 0 + }; + if (useSpecificSession) + { + options.SessionIds.Add("session"); + } - await using var processor = client.CreateSessionProcessor(scope.QueueName, - new ServiceBusSessionProcessorOptions - { - MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1 - }); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); processor.ProcessMessageAsync += ProcessMessage; processor.ProcessErrorAsync += SessionErrorHandler; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index c09a645044df..598fb02fe334 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -1150,12 +1150,17 @@ public async Task CannotDeadLetterAfterLinkReconnect() } [Test] - public async Task SessionOrderingIsGuaranteed() + [TestCase(true)] + [TestCase(false)] + public async Task SessionOrderingIsGuaranteed(bool prefetch) { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - var receiver = await client.AcceptSessionAsync(scope.QueueName, "session"); + var receiver = await client.AcceptSessionAsync(scope.QueueName, "session", new ServiceBusSessionReceiverOptions + { + PrefetchCount = prefetch ? 5 : 0 + }); var sender = client.CreateSender(scope.QueueName); CancellationTokenSource cts = new CancellationTokenSource(); From 812211659236b2cc3d1fac1bd586226884e657d3 Mon Sep 17 00:00:00 2001 From: jolov Date: Tue, 13 Feb 2024 14:19:27 -0800 Subject: [PATCH 2/4] Fix to prefetch drain --- .../Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 72fba9ab6127..3f15fac1f1ef 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -377,7 +377,8 @@ private async Task> ReceiveMessagesAsyn await link.DrainAsyc(cancellationToken).ConfigureAwait(false); if (_prefetchCount > 0) { - link.SetTotalLinkCredit((uint)_prefetchCount, true, false); + link.Settings.TotalLinkCredit = 0; + link.SetTotalLinkCredit((uint)_prefetchCount, true, true); } } From c022265959fcf3ff14ada15bc46526c19c1de449 Mon Sep 17 00:00:00 2001 From: jolov Date: Tue, 13 Feb 2024 14:21:25 -0800 Subject: [PATCH 3/4] Add comment --- .../Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 3f15fac1f1ef..0c138f3a038e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -375,6 +375,9 @@ private async Task> ReceiveMessagesAsyn if (_isSessionReceiver && (!_isProcessor || SessionId != null) && messageList.Count < maxMessages) { await link.DrainAsyc(cancellationToken).ConfigureAwait(false); + + // These workarounds are necessary in order to resume prefetching after the link has been drained + // https://github.com/Azure/azure-amqp/issues/252#issuecomment-1942734342 if (_prefetchCount > 0) { link.Settings.TotalLinkCredit = 0; From 7643ae0bb791d24e84e573cf78bddfb2496812df Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Tue, 13 Feb 2024 14:21:58 -0800 Subject: [PATCH 4/4] Update sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs Co-authored-by: Jesse Squire --- .../Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 0c138f3a038e..ed47a73fd47c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -369,9 +369,9 @@ private async Task> ReceiveMessagesAsyn messagesReceived as IReadOnlyCollection ?? messagesReceived?.ToList() ?? s_emptyAmqpMessageList; // If this is a session receiver and we didn't receive all requested messages, we need to drain the credits - // to ensure FIFO ordering within each session. We exclude session processors where the since those will always receive a single message - // at a time, and if there are no messages, the session will be closed unless the processor was configured to receive from specific sessions. - // The session won't be closed in the case that MaxConcurrentCallsPerSession > 1, but in that case FIFO is not possible to guarantee. + // to ensure FIFO ordering within each session. We exclude session processors, since those will always receive a single message + // at a time. If there are no messages, the session will be closed unless the processor was configured to receive from specific sessions. + // The session won't be closed in the case that MaxConcurrentCallsPerSession > 1, but with concurrency, it is not possible to guarantee ordering. if (_isSessionReceiver && (!_isProcessor || SessionId != null) && messageList.Count < maxMessages) { await link.DrainAsyc(cancellationToken).ConfigureAwait(false);