diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 619de810941c..ed47a73fd47c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -369,12 +369,20 @@ private async Task> ReceiveMessagesAsyn messagesReceived as IReadOnlyCollection ?? messagesReceived?.ToList() ?? s_emptyAmqpMessageList; // If this is a session receiver and we didn't receive all requested messages, we need to drain the credits - // to ensure FIFO ordering within each session. We exclude session processor since those will always receive a single message - // at a time, and if there are no messages, the session will be closed. The session won't be closed in the case that - // MaxConcurrentCallsPerSession > 1, but in that case FIFO is not possible to guarantee. - if (_isSessionReceiver && !_isProcessor && messageList.Count < maxMessages) + // to ensure FIFO ordering within each session. We exclude session processors, since those will always receive a single message + // at a time. If there are no messages, the session will be closed unless the processor was configured to receive from specific sessions. + // The session won't be closed in the case that MaxConcurrentCallsPerSession > 1, but with concurrency, it is not possible to guarantee ordering. + if (_isSessionReceiver && (!_isProcessor || SessionId != null) && messageList.Count < maxMessages) { await link.DrainAsyc(cancellationToken).ConfigureAwait(false); + + // These workarounds are necessary in order to resume prefetching after the link has been drained + // https://github.com/Azure/azure-amqp/issues/252#issuecomment-1942734342 + if (_prefetchCount > 0) + { + link.Settings.TotalLinkCredit = 0; + link.SetTotalLinkCredit((uint)_prefetchCount, true, true); + } } List receivedMessages = null; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index 68b0cc918598..1d31aaf0c47b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -2680,18 +2680,26 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) } [Test] - public async Task SessionOrderingIsGuaranteedProcessor() + [TestCase(true, true)] + [TestCase(true, false)] + [TestCase(false, true)] + [TestCase(false, false)] + public async Task SessionOrderingIsGuaranteedProcessor(bool prefetch, bool useSpecificSession) { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); long lastSequenceNumber = 0; + var options = new ServiceBusSessionProcessorOptions + { + MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1, PrefetchCount = prefetch ? 5 : 0 + }; + if (useSpecificSession) + { + options.SessionIds.Add("session"); + } - await using var processor = client.CreateSessionProcessor(scope.QueueName, - new ServiceBusSessionProcessorOptions - { - MaxConcurrentCallsPerSession = 1, MaxConcurrentSessions = 1 - }); + await using var processor = client.CreateSessionProcessor(scope.QueueName, options); processor.ProcessMessageAsync += ProcessMessage; processor.ProcessErrorAsync += SessionErrorHandler; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index c09a645044df..598fb02fe334 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -1150,12 +1150,17 @@ public async Task CannotDeadLetterAfterLinkReconnect() } [Test] - public async Task SessionOrderingIsGuaranteed() + [TestCase(true)] + [TestCase(false)] + public async Task SessionOrderingIsGuaranteed(bool prefetch) { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); - var receiver = await client.AcceptSessionAsync(scope.QueueName, "session"); + var receiver = await client.AcceptSessionAsync(scope.QueueName, "session", new ServiceBusSessionReceiverOptions + { + PrefetchCount = prefetch ? 5 : 0 + }); var sender = client.CreateSender(scope.QueueName); CancellationTokenSource cts = new CancellationTokenSource();