diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 7bc42c968..acd5e5c24 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -214,7 +214,8 @@ const config: UserConfig = { {text: 'Conventional Routing', link:'/guide/messaging/transports/sqs/conventional-routing'}, {text: 'Interoperability', link:'/guide/messaging/transports/sqs/interoperability'}, {text: 'MessageAttributes', link:'/guide/messaging/transports/sqs/message-attributes'}, - {text: 'FIFO Queues', link:'/guide/messaging/transports/sqs/fifo-queues'} + {text: 'FIFO Queues', link:'/guide/messaging/transports/sqs/fifo-queues'}, + {text: 'Fair Queues', link:'/guide/messaging/transports/sqs/fair-queues'} ]}, {text: 'Amazon SNS', link: '/guide/messaging/transports/sns'}, {text: 'TCP', link: '/guide/messaging/transports/tcp'}, diff --git a/docs/guide/messaging/transports/sqs/fair-queues.md b/docs/guide/messaging/transports/sqs/fair-queues.md new file mode 100644 index 000000000..f519be425 --- /dev/null +++ b/docs/guide/messaging/transports/sqs/fair-queues.md @@ -0,0 +1,32 @@ +# Fair Queues + +[Amazon SQS fair queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) let a **standard** queue mitigate "noisy neighbor" problems in multi-tenant workloads. By assigning a `MessageGroupId` to each message, SQS spreads dwell time more fairly across groups so that one tenant generating a large backlog does not starve the others. Unlike FIFO queues, fair queues imply **no ordering or deduplication semantics** — they keep standard queue throughput while improving fairness. + +## Enabling Fair Queues + +Wolverine already maps `Envelope.GroupId` to the SQS `MessageGroupId` for [FIFO queues](/guide/messaging/transports/sqs/fifo-queues). For a standard queue this mapping is opt-in via `EnableFairQueueMessageGroups()`, so existing standard queues are unaffected unless you ask for it: + +```cs +opts.PublishMessage() + .ToSqsQueue("orders") + .EnableFairQueueMessageGroups(); +``` + +Once enabled, set the group id the same way you would for a FIFO queue — typically a tenant id — through `DeliveryOptions`: + +```cs +await messageBus.PublishAsync(new OrderPlaced(orderId), new DeliveryOptions +{ + GroupId = tenantId +}); +``` + +The group id can also be assigned with [message partitioning](/guide/messaging/partitioning) rather than per-publish. + +::: tip +`EnableFairQueueMessageGroups()` only affects standard queues. FIFO queues (names ending in `.fifo`) always map `MessageGroupId` and `MessageDeduplicationId` regardless of this setting. +::: + +## Customizing the Group Id + +The group id is derived through the endpoint's `ISqsEnvelopeMapper` via `DetermineGroupId(Envelope)`, which returns `Envelope.GroupId` by default. A custom mapper can override it to source the group id from a header, the message body, or a tenant id when interoperating with non-Wolverine systems. diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Internal/sqs_fair_queue_message_groups.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Internal/sqs_fair_queue_message_groups.cs new file mode 100644 index 000000000..0d5a0913b --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Internal/sqs_fair_queue_message_groups.cs @@ -0,0 +1,114 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Wolverine.AmazonSqs.Internal; +using Wolverine.ComplianceTests; +using Wolverine.Configuration; + +namespace Wolverine.AmazonSqs.Tests.Internal; + +// GH-2886: standard (non-FIFO) SQS queues can opt into AWS "fair queues" by mapping +// Envelope.GroupId onto the SQS MessageGroupId. The mapping flows through the envelope +// mapper (ISqsEnvelopeMapper.DetermineGroupId) and is gated per-endpoint by +// EnableFairQueueMessageGroups(). FIFO queues keep their existing group + deduplication +// behavior unconditionally; standard queues never set a deduplication id. +public class sqs_fair_queue_message_groups +{ + private static AmazonSqsQueue QueueFor(string name, bool enableFairQueues) + { + var queue = new AmazonSqsQueue(name, new AmazonSqsTransport()) + { + Mapper = new DefaultSqsEnvelopeMapper(), + EnableFairQueueMessageGroups = enableFairQueues + }; + + return queue; + } + + private static SendMessageBatchRequestEntryView FirstEntry(AmazonSqsQueue queue, Envelope envelope) + { + var batch = new OutgoingSqsBatch(queue, NullLogger.Instance, [envelope]); + var entry = batch.Request.Entries.ShouldHaveSingleItem(); + return new SendMessageBatchRequestEntryView(entry.MessageGroupId, entry.MessageDeduplicationId); + } + + private record SendMessageBatchRequestEntryView(string? MessageGroupId, string? MessageDeduplicationId); + + [Fact] + public void default_mapper_maps_group_id_from_envelope() + { + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "tenant-1"; + + new DefaultSqsEnvelopeMapper().DetermineGroupId(envelope).ShouldBe("tenant-1"); + } + + [Fact] + public void standard_queue_without_opt_in_does_not_set_message_group_id() + { + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "tenant-1"; + + FirstEntry(QueueFor("standard", enableFairQueues: false), envelope) + .MessageGroupId.ShouldBeNull(); + } + + [Fact] + public void standard_queue_with_opt_in_sets_message_group_id() + { + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "tenant-1"; + + FirstEntry(QueueFor("standard", enableFairQueues: true), envelope) + .MessageGroupId.ShouldBe("tenant-1"); + } + + [Fact] + public void standard_queue_with_opt_in_but_no_group_id_leaves_it_unset() + { + var envelope = ObjectMother.Envelope(); + envelope.GroupId = null; + + FirstEntry(QueueFor("standard", enableFairQueues: true), envelope) + .MessageGroupId.ShouldBeNull(); + } + + [Fact] + public void standard_queue_never_sets_deduplication_id() + { + // Deduplication is a FIFO-only concept. A standard fair queue must not receive one + // even when the envelope happens to carry a deduplication id. + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "tenant-1"; + envelope.DeduplicationId = "dedup-1"; + + FirstEntry(QueueFor("standard", enableFairQueues: true), envelope) + .MessageDeduplicationId.ShouldBeNull(); + } + + [Fact] + public void fifo_queue_sets_group_and_deduplication_regardless_of_opt_in() + { + // FIFO behavior is unchanged: MessageGroupId + MessageDeduplicationId are always mapped, + // independent of the EnableFairQueueMessageGroups() flag. + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "tenant-1"; + envelope.DeduplicationId = "dedup-1"; + + var view = FirstEntry(QueueFor("orders.fifo", enableFairQueues: false), envelope); + + view.MessageGroupId.ShouldBe("tenant-1"); + view.MessageDeduplicationId.ShouldBe("dedup-1"); + } + + [Fact] + public void enable_fair_queue_message_groups_configuration_sets_flag() + { + var queue = new AmazonSqsQueue("standard", new AmazonSqsTransport()); + var configuration = new AmazonSqsSubscriberConfiguration(queue); + + configuration.EnableFairQueueMessageGroups(); + ((IDelayedEndpointConfiguration)configuration).Apply(); + + queue.EnableFairQueueMessageGroups.ShouldBeTrue(); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsSubscriberConfiguration.cs b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsSubscriberConfiguration.cs index f0b3d2e4a..2fc3c18ab 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsSubscriberConfiguration.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsSubscriberConfiguration.cs @@ -27,6 +27,20 @@ public AmazonSqsSubscriberConfiguration ConfigureQueueCreation(Action + /// Opt this standard (non-FIFO) queue into Amazon SQS fair queues by mapping + /// (set through DeliveryOptions.GroupId or message + /// partitioning) to the SQS MessageGroupId on outgoing messages. This improves + /// fairness for multi-tenant workloads and implies no ordering or deduplication semantics. + /// Has no effect on FIFO queues, which always set MessageGroupId. See + /// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html + /// + public AmazonSqsSubscriberConfiguration EnableFairQueueMessageGroups() + { + add(e => e.EnableFairQueueMessageGroups = true); + return this; + } + /// Opt to send messages as raw JSON without any Wolverine metadata /// /// Optional. If both sending and receiving from this queue, you will want to specify a default message type diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/ISqsEnvelopeMapper.cs b/src/Transports/AWS/Wolverine.AmazonSqs/ISqsEnvelopeMapper.cs index 11871fb63..889b3c416 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/ISqsEnvelopeMapper.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/ISqsEnvelopeMapper.cs @@ -14,6 +14,14 @@ public interface ISqsEnvelopeMapper IEnumerable> ToAttributes(Envelope envelope); void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary attributes); + + /// + /// Determine the SQS MessageGroupId for an outgoing message. This is applied to FIFO + /// queues and, when EnableFairQueueMessageGroups() is set, to standard queues to opt into + /// SQS fair queues. Return null to leave MessageGroupId unset. The default maps + /// ; override to source the group id from a header, tenant id, etc. + /// + string? DetermineGroupId(Envelope envelope) => envelope.GroupId; } public class DefaultSqsEnvelopeMapper : ISqsEnvelopeMapper @@ -36,4 +44,6 @@ public void ReadEnvelopeData(Envelope envelope, string messageBody, var buffer = Convert.FromBase64String(messageBody); EnvelopeSerializer.ReadEnvelopeData(envelope, buffer); } + + public string? DetermineGroupId(Envelope envelope) => envelope.GroupId; } \ No newline at end of file diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs index b827cef3c..b9e0167d6 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs @@ -54,6 +54,15 @@ internal AmazonSqsQueue(string queueName, AmazonSqsTransport parent) : base( internal bool IsFifoQueue => QueueName.EndsWith(".fifo", StringComparison.OrdinalIgnoreCase); + /// + /// Opt this standard (non-FIFO) queue into Amazon SQS fair queues by mapping + /// to the SQS MessageGroupId on outgoing messages. + /// This has no effect on FIFO queues, which always set MessageGroupId, and implies + /// no ordering or deduplication semantics. Default is false. See + /// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html + /// + public bool EnableFairQueueMessageGroups { get; set; } + // Set by the AmazonSqsTransport parent internal string? QueueUrl { get; private set; } @@ -248,9 +257,10 @@ internal async Task SendMessageAsync(Envelope envelope, ILogger logger) var request = new SendMessageRequest(QueueUrl, body); if (IsFifoQueue) { - if (envelope.GroupId.IsNotEmpty()) + var groupId = Mapper.DetermineGroupId(envelope); + if (groupId.IsNotEmpty()) { - request.MessageGroupId = envelope.GroupId; + request.MessageGroupId = groupId; } if (envelope.DeduplicationId.IsNotEmpty()) @@ -258,6 +268,16 @@ internal async Task SendMessageAsync(Envelope envelope, ILogger logger) request.MessageDeduplicationId = envelope.DeduplicationId; } } + else if (EnableFairQueueMessageGroups) + { + // SQS fair queues: a MessageGroupId on a standard queue improves tenant fairness. + // No deduplication semantics apply to standard queues. See GH-2886. + var groupId = Mapper.DetermineGroupId(envelope); + if (groupId.IsNotEmpty()) + { + request.MessageGroupId = groupId; + } + } foreach (var attribute in Mapper.ToAttributes(envelope)) { diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs index 56421bbc4..d335ab9f1 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/SqsSenderProtocol.cs @@ -61,15 +61,26 @@ public OutgoingSqsBatch(AmazonSqsQueue queue, ILogger logger, IEnumerable