Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ const config: UserConfig<DefaultTheme.Config> = {
{text: 'Conventional Routing', link:'/guide/messaging/transports/sqs/conventional-routing'},
{text: 'Interoperability', link:'/guide/messaging/transports/sqs/interoperability'},
{text: 'MessageAttributes', link:'/guide/messaging/transports/sqs/message-attributes'},
{text: 'FIFO Queues', link:'/guide/messaging/transports/sqs/fifo-queues'}
{text: 'FIFO Queues', link:'/guide/messaging/transports/sqs/fifo-queues'},
{text: 'Fair Queues', link:'/guide/messaging/transports/sqs/fair-queues'}
]},
{text: 'Amazon SNS', link: '/guide/messaging/transports/sns'},
{text: 'TCP', link: '/guide/messaging/transports/tcp'},
Expand Down
32 changes: 32 additions & 0 deletions docs/guide/messaging/transports/sqs/fair-queues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Fair Queues

[Amazon SQS fair queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) let a **standard** queue mitigate "noisy neighbor" problems in multi-tenant workloads. By assigning a `MessageGroupId` to each message, SQS spreads dwell time more fairly across groups so that one tenant generating a large backlog does not starve the others. Unlike FIFO queues, fair queues imply **no ordering or deduplication semantics** — they keep standard queue throughput while improving fairness.

## Enabling Fair Queues

Wolverine already maps `Envelope.GroupId` to the SQS `MessageGroupId` for [FIFO queues](/guide/messaging/transports/sqs/fifo-queues). For a standard queue this mapping is opt-in via `EnableFairQueueMessageGroups()`, so existing standard queues are unaffected unless you ask for it:

```cs
opts.PublishMessage<OrderPlaced>()
.ToSqsQueue("orders")
.EnableFairQueueMessageGroups();
```

Once enabled, set the group id the same way you would for a FIFO queue — typically a tenant id — through `DeliveryOptions`:

```cs
await messageBus.PublishAsync(new OrderPlaced(orderId), new DeliveryOptions
{
GroupId = tenantId
});
```

The group id can also be assigned with [message partitioning](/guide/messaging/partitioning) rather than per-publish.

::: tip
`EnableFairQueueMessageGroups()` only affects standard queues. FIFO queues (names ending in `.fifo`) always map `MessageGroupId` and `MessageDeduplicationId` regardless of this setting.
:::

## Customizing the Group Id

The group id is derived through the endpoint's `ISqsEnvelopeMapper` via `DetermineGroupId(Envelope)`, which returns `Envelope.GroupId` by default. A custom mapper can override it to source the group id from a header, the message body, or a tenant id when interoperating with non-Wolverine systems.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Wolverine.AmazonSqs.Internal;
using Wolverine.ComplianceTests;
using Wolverine.Configuration;

namespace Wolverine.AmazonSqs.Tests.Internal;

// GH-2886: standard (non-FIFO) SQS queues can opt into AWS "fair queues" by mapping
// Envelope.GroupId onto the SQS MessageGroupId. The mapping flows through the envelope
// mapper (ISqsEnvelopeMapper.DetermineGroupId) and is gated per-endpoint by
// EnableFairQueueMessageGroups(). FIFO queues keep their existing group + deduplication
// behavior unconditionally; standard queues never set a deduplication id.
public class sqs_fair_queue_message_groups
{
private static AmazonSqsQueue QueueFor(string name, bool enableFairQueues)
{
var queue = new AmazonSqsQueue(name, new AmazonSqsTransport())
{
Mapper = new DefaultSqsEnvelopeMapper(),
EnableFairQueueMessageGroups = enableFairQueues
};

return queue;
}

private static SendMessageBatchRequestEntryView FirstEntry(AmazonSqsQueue queue, Envelope envelope)
{
var batch = new OutgoingSqsBatch(queue, NullLogger.Instance, [envelope]);
var entry = batch.Request.Entries.ShouldHaveSingleItem();
return new SendMessageBatchRequestEntryView(entry.MessageGroupId, entry.MessageDeduplicationId);
}

private record SendMessageBatchRequestEntryView(string? MessageGroupId, string? MessageDeduplicationId);

[Fact]
public void default_mapper_maps_group_id_from_envelope()
{
var envelope = ObjectMother.Envelope();
envelope.GroupId = "tenant-1";

new DefaultSqsEnvelopeMapper().DetermineGroupId(envelope).ShouldBe("tenant-1");
}

[Fact]
public void standard_queue_without_opt_in_does_not_set_message_group_id()
{
var envelope = ObjectMother.Envelope();
envelope.GroupId = "tenant-1";

FirstEntry(QueueFor("standard", enableFairQueues: false), envelope)
.MessageGroupId.ShouldBeNull();
}

[Fact]
public void standard_queue_with_opt_in_sets_message_group_id()
{
var envelope = ObjectMother.Envelope();
envelope.GroupId = "tenant-1";

FirstEntry(QueueFor("standard", enableFairQueues: true), envelope)
.MessageGroupId.ShouldBe("tenant-1");
}

[Fact]
public void standard_queue_with_opt_in_but_no_group_id_leaves_it_unset()
{
var envelope = ObjectMother.Envelope();
envelope.GroupId = null;

FirstEntry(QueueFor("standard", enableFairQueues: true), envelope)
.MessageGroupId.ShouldBeNull();
}

[Fact]
public void standard_queue_never_sets_deduplication_id()
{
// Deduplication is a FIFO-only concept. A standard fair queue must not receive one
// even when the envelope happens to carry a deduplication id.
var envelope = ObjectMother.Envelope();
envelope.GroupId = "tenant-1";
envelope.DeduplicationId = "dedup-1";

FirstEntry(QueueFor("standard", enableFairQueues: true), envelope)
.MessageDeduplicationId.ShouldBeNull();
}

[Fact]
public void fifo_queue_sets_group_and_deduplication_regardless_of_opt_in()
{
// FIFO behavior is unchanged: MessageGroupId + MessageDeduplicationId are always mapped,
// independent of the EnableFairQueueMessageGroups() flag.
var envelope = ObjectMother.Envelope();
envelope.GroupId = "tenant-1";
envelope.DeduplicationId = "dedup-1";

var view = FirstEntry(QueueFor("orders.fifo", enableFairQueues: false), envelope);

view.MessageGroupId.ShouldBe("tenant-1");
view.MessageDeduplicationId.ShouldBe("dedup-1");
}

[Fact]
public void enable_fair_queue_message_groups_configuration_sets_flag()
{
var queue = new AmazonSqsQueue("standard", new AmazonSqsTransport());
var configuration = new AmazonSqsSubscriberConfiguration(queue);

configuration.EnableFairQueueMessageGroups();
((IDelayedEndpointConfiguration)configuration).Apply();

queue.EnableFairQueueMessageGroups.ShouldBeTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ public AmazonSqsSubscriberConfiguration ConfigureQueueCreation(Action<CreateQueu
return this;
}

/// <summary>
/// Opt this standard (non-FIFO) queue into Amazon SQS fair queues by mapping
/// <see cref="Envelope.GroupId"/> (set through <c>DeliveryOptions.GroupId</c> or message
/// partitioning) to the SQS <c>MessageGroupId</c> on outgoing messages. This improves
/// fairness for multi-tenant workloads and implies no ordering or deduplication semantics.
/// Has no effect on FIFO queues, which always set <c>MessageGroupId</c>. See
/// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
/// </summary>
public AmazonSqsSubscriberConfiguration EnableFairQueueMessageGroups()
{
add(e => e.EnableFairQueueMessageGroups = true);
return this;
}

/// Opt to send messages as raw JSON without any Wolverine metadata
/// </summary>
/// <param name="defaultMessageType">Optional. If both sending and receiving from this queue, you will want to specify a default message type</param>
Expand Down
10 changes: 10 additions & 0 deletions src/Transports/AWS/Wolverine.AmazonSqs/ISqsEnvelopeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ public interface ISqsEnvelopeMapper
IEnumerable<KeyValuePair<string, MessageAttributeValue>> ToAttributes(Envelope envelope);

void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<string, MessageAttributeValue> attributes);

/// <summary>
/// Determine the SQS <c>MessageGroupId</c> for an outgoing message. This is applied to FIFO
/// queues and, when <c>EnableFairQueueMessageGroups()</c> is set, to standard queues to opt into
/// SQS fair queues. Return <c>null</c> to leave <c>MessageGroupId</c> unset. The default maps
/// <see cref="Envelope.GroupId"/>; override to source the group id from a header, tenant id, etc.
/// </summary>
string? DetermineGroupId(Envelope envelope) => envelope.GroupId;
}

public class DefaultSqsEnvelopeMapper : ISqsEnvelopeMapper
Expand All @@ -36,4 +44,6 @@ public void ReadEnvelopeData(Envelope envelope, string messageBody,
var buffer = Convert.FromBase64String(messageBody);
EnvelopeSerializer.ReadEnvelopeData(envelope, buffer);
}

public string? DetermineGroupId(Envelope envelope) => envelope.GroupId;
}
24 changes: 22 additions & 2 deletions src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ internal AmazonSqsQueue(string queueName, AmazonSqsTransport parent) : base(

internal bool IsFifoQueue => QueueName.EndsWith(".fifo", StringComparison.OrdinalIgnoreCase);

/// <summary>
/// Opt this standard (non-FIFO) queue into Amazon SQS fair queues by mapping
/// <see cref="Envelope.GroupId"/> to the SQS <c>MessageGroupId</c> on outgoing messages.
/// This has no effect on FIFO queues, which always set <c>MessageGroupId</c>, and implies
/// no ordering or deduplication semantics. Default is <c>false</c>. See
/// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
/// </summary>
public bool EnableFairQueueMessageGroups { get; set; }

// Set by the AmazonSqsTransport parent
internal string? QueueUrl { get; private set; }

Expand Down Expand Up @@ -248,16 +257,27 @@ internal async Task SendMessageAsync(Envelope envelope, ILogger logger)
var request = new SendMessageRequest(QueueUrl, body);
if (IsFifoQueue)
{
if (envelope.GroupId.IsNotEmpty())
var groupId = Mapper.DetermineGroupId(envelope);
if (groupId.IsNotEmpty())
{
request.MessageGroupId = envelope.GroupId;
request.MessageGroupId = groupId;
}

if (envelope.DeduplicationId.IsNotEmpty())
{
request.MessageDeduplicationId = envelope.DeduplicationId;
}
}
else if (EnableFairQueueMessageGroups)
{
// SQS fair queues: a MessageGroupId on a standard queue improves tenant fairness.
// No deduplication semantics apply to standard queues. See GH-2886.
var groupId = Mapper.DetermineGroupId(envelope);
if (groupId.IsNotEmpty())
{
request.MessageGroupId = groupId;
}
}

foreach (var attribute in Mapper.ToAttributes(envelope))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,26 @@ public OutgoingSqsBatch(AmazonSqsQueue queue, ILogger logger, IEnumerable<Envelo
var entry = new SendMessageBatchRequestEntry(envelope.Id.ToString(), queue.Mapper!.BuildMessageBody(envelope));
if (queue.IsFifoQueue)
{
if (envelope.GroupId.IsNotEmpty())
var groupId = queue.Mapper.DetermineGroupId(envelope);
if (groupId.IsNotEmpty())
{
entry.MessageGroupId = envelope.GroupId;
entry.MessageGroupId = groupId;
}
if (envelope.DeduplicationId.IsNotEmpty())
{
entry.MessageDeduplicationId = envelope.DeduplicationId;
}
}
else if (queue.EnableFairQueueMessageGroups)
{
// SQS fair queues: a MessageGroupId on a standard queue improves tenant fairness.
// No deduplication semantics apply to standard queues. See GH-2886.
var groupId = queue.Mapper.DetermineGroupId(envelope);
if (groupId.IsNotEmpty())
{
entry.MessageGroupId = groupId;
}
}

foreach (var attribute in queue.Mapper.ToAttributes(envelope))
{
Expand Down
Loading