Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Adjusted retries to consider an unreachable host address as terminal. Previously, all socket-based errors were considered transient and would be retried.
- Updated the `ServiceBusMessage` constructor that takes a `ServiceBusReceivedMessage` to no longer copy over the
`x-opt-partition-id` key as this is meant to apply only to the original message.
- Drain excess credits when attempting to receive using sessions to ensure FIFO ordering.

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -370,15 +371,23 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
maxWaitTime ?? timeout,
cancellationToken).ConfigureAwait(false);

IReadOnlyCollection<AmqpMessage> messageList =
messagesReceived as IReadOnlyCollection<AmqpMessage> ?? messagesReceived.ToList();

// If this is a session receiver and we didn't receive all requested messages, we need to drain the credits
// to ensure FIFO ordering within each session.
if (_isSessionReceiver && messageList.Count < maxMessages)
{
await link.DrainAsyc(cancellationToken).ConfigureAwait(false);
}

List<ServiceBusReceivedMessage> receivedMessages = null;
// If event messages were received, then package them for consumption and
// return them.
foreach (AmqpMessage message in messagesReceived)
foreach (AmqpMessage message in messageList)
{
// Getting the count of the underlying collection is good for performance/allocations to prevent the list from growing
receivedMessages ??= messagesReceived is IReadOnlyCollection<AmqpMessage> readOnlyList
? new List<ServiceBusReceivedMessage>(readOnlyList.Count)
: new List<ServiceBusReceivedMessage>();
receivedMessages ??= new List<ServiceBusReceivedMessage>(messageList.Count);

if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void ResetFeatureSwitch()
[TestCase(false)]
public async Task SenderReceiverActivitiesDisabled(bool useSessions)
{
using var listener = new TestActivitySourceListener(DiagnosticProperty.DiagnosticNamespace);
using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));

await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: useSessions))
{
Expand Down Expand Up @@ -72,7 +72,7 @@ public async Task SenderReceiverActivitiesDisabled(bool useSessions)
}
else
{
await receiver.RenewMessageLockAsync(received[-1]);
await receiver.RenewMessageLockAsync(received[1]);
}

// schedule
Expand Down Expand Up @@ -262,7 +262,7 @@ public async Task ProcessorActivities()
var messages = ServiceBusTestUtilities.GetMessages(messageCt);

using var listener = new TestActivitySourceListener(
DiagnosticProperty.DiagnosticNamespace,
source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace),
activityStartedCallback: activity =>
{
if (activity.OperationName == DiagnosticProperty.ProcessMessageActivityName)
Expand Down Expand Up @@ -321,7 +321,7 @@ public async Task SessionProcessorActivities()
var messages = ServiceBusTestUtilities.GetMessages(messageCt, "sessionId");

using var listener = new TestActivitySourceListener(
DiagnosticProperty.DiagnosticNamespace,
source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace),
activityStartedCallback: activity =>
{
if (activity.OperationName == DiagnosticProperty.ProcessSessionMessageActivityName)
Expand Down Expand Up @@ -374,7 +374,7 @@ public async Task SessionProcessorActivities()
public async Task RuleManagerActivities()
{
using var _ = SetAppConfigSwitch();
using var listener = new TestActivitySourceListener(DiagnosticProperty.DiagnosticNamespace);
using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));

await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,5 +1144,55 @@ public async Task CannotDeadLetterAfterLinkReconnect()
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task SessionOrderingIsGuaranteed()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var receiver = await client.AcceptSessionAsync(scope.QueueName, "session");
var sender = client.CreateSender(scope.QueueName);

CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60));

var receive = ReceiveMessagesAsync();

var send = SendMessagesAsync();

await Task.WhenAll(send, receive);

async Task SendMessagesAsync()
{
while (!cts.IsCancellationRequested)
{
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}

async Task ReceiveMessagesAsync()
{
long lastSequenceNumber = 0;
while (!cts.IsCancellationRequested)
{
var messages = await receiver.ReceiveMessagesAsync(10);
foreach (var message in messages)
{
if (message.SequenceNumber != lastSequenceNumber + 1)
{
Assert.Fail(
$"Last sequence number: {lastSequenceNumber}, current sequence number: {message.SequenceNumber}");
}

lastSequenceNumber = message.SequenceNumber;

await receiver.CompleteMessageAsync(message);
}
}
}
}
}
}
}