Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions docs/guide/messaging/transports/sqs/index.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# Using Amazon SQS

::: warning
At this moment, Wolverine cannot support request/reply mechanics (`IMessageBus.InvokeAsync<T>()`) with Amazon SQS.
:::

Wolverine supports [Amazon SQS](https://aws.amazon.com/sqs/) as a messaging transport through the WolverineFx.AmazonSqs package.

## Connecting to the Broker
Expand Down Expand Up @@ -181,3 +177,64 @@ opts.UseAmazonSqsTransport()
```

The default delimiter between the prefix and the original name is `-` for Amazon SQS (e.g., `dev-john-orders`).

## Request/Reply <Badge type="tip" text="5.14" />

[Request/reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) mechanics (`IMessageBus.InvokeAsync<T>()`) are supported with the Amazon SQS transport when system queues are enabled. Wolverine creates a dedicated per-node response queue named like `wolverine-response-[service name]-[node id]` that is used to receive replies.

To enable request/reply support, call `EnableSystemQueues()` on the SQS transport configuration:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport()
.AutoProvision()

// Enable system queues for request/reply support
.EnableSystemQueues();
}).StartAsync();
```

::: tip
Unlike Azure Service Bus and RabbitMQ where system queues are enabled by default, SQS system queues require explicit opt-in via `EnableSystemQueues()`. This is because creating SQS queues requires IAM permissions that your application may not have.
:::

System queues are automatically cleaned up when your application shuts down. Wolverine also tags each system queue with a `wolverine:last-active` timestamp and runs a background keep-alive timer. On startup, Wolverine scans for orphaned system queues (from crashed nodes) with the `wolverine-response-` or `wolverine-control-` prefix and deletes any that have been inactive for more than 5 minutes.

## Wolverine Control Queues <Badge type="tip" text="5.14" />

You can opt into using SQS queues for intra-node communication that Wolverine needs for leader election and background worker distribution. Using SQS for this feature is more efficient than the built-in database control queues that Wolverine uses otherwise, and is necessary for message storage options like RavenDb that do not have a built-in control queue mechanism.

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport()
.AutoProvision()

// This enables Wolverine to use SQS queues
// created at runtime for communication between
// Wolverine nodes
.EnableWolverineControlQueues();
}).StartAsync();
```

Calling `EnableWolverineControlQueues()` implicitly enables system queues and request/reply support as well.

## Disabling System Queues <Badge type="tip" text="5.14" />

If your application does not have IAM permissions to create or delete queues, you can explicitly disable system queues:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport()
.AutoProvision()
.SystemQueuesAreEnabled(false);

opts.ListenToSqsQueue("send-and-receive");
opts.PublishAllMessages().ToSqsQueue("send-and-receive");
}).StartAsync();
```
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ public async Task InitializeAsync()
await SenderIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision().AutoPurgeOnStartup();
.AutoProvision().AutoPurgeOnStartup()
.EnableSystemQueues();

opts.ListenToSqsQueue("sender-" + number);
});

await ReceiverIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision().AutoPurgeOnStartup();
.AutoProvision().AutoPurgeOnStartup()
.EnableSystemQueues();

opts.ListenToSqsQueue("receiver-" + number).Named("receiver").BufferedInMemory();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ await SenderIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision()
.AutoPurgeOnStartup();
.AutoPurgeOnStartup()
.EnableSystemQueues();

opts.ListenToSqsQueue("sender-" + number);

Expand All @@ -36,7 +37,8 @@ await ReceiverIs(opts =>
{
opts.UseAmazonSqsTransportLocally()
.AutoProvision()
.AutoPurgeOnStartup();
.AutoPurgeOnStartup()
.EnableSystemQueues();

opts.ListenToSqsQueue("receiver-" + number).Named("receiver").ProcessInline();
});
Expand Down
12 changes: 12 additions & 0 deletions src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ internal async Task SetupAsync(IAmazonSQS client)
var response = await client.CreateQueueAsync(Configuration);

QueueUrl = response.QueueUrl;

if (Role == EndpointRole.System)
{
await client.TagQueueAsync(new TagQueueRequest
{
QueueUrl = QueueUrl,
Tags = new Dictionary<string, string>
{
["wolverine:last-active"] = DateTime.UtcNow.ToString("o")
}
});
}
}
catch (Exception e)
{
Expand Down
137 changes: 134 additions & 3 deletions src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Amazon.SQS;
using Amazon.SQS.Model;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Spectre.Console;
using Wolverine.Configuration;
using Wolverine.Runtime;
Expand All @@ -12,9 +13,16 @@ namespace Wolverine.AmazonSqs.Internal;
public class AmazonSqsTransport : BrokerTransport<AmazonSqsQueue>
{
public const string DeadLetterQueueName = DeadLetterQueueConstants.DefaultQueueName;

public const string ResponseEndpointName = "AmazonSqsResponses";
public const char Separator = '-';

private static readonly TimeSpan OrphanThreshold = TimeSpan.FromMinutes(5);
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(2);
private const string LastActiveTagKey = "wolverine:last-active";

internal readonly List<AmazonSqsQueue> SystemQueues = new();
private Task? _keepAliveTask;

public AmazonSqsTransport(string protocol) : base(protocol, "Amazon SQS", ["aws", "sqs"])
{
Queues = new LightweightCache<string, AmazonSqsQueue>(name => new AmazonSqsQueue(name, this));
Expand Down Expand Up @@ -46,6 +54,12 @@ internal AmazonSqsTransport(IAmazonSQS client) : this()
public bool UseLocalStackInDevelopment { get; set; }
public bool DisableDeadLetterQueues { get; set; }

/// <summary>
/// Is this transport connection allowed to build and use response and control queues
/// for just this node? Default is false, requiring explicit opt-in.
/// </summary>
public bool SystemQueuesEnabled { get; set; }

public static string SanitizeSqsName(string identifier)
{
//AWS requires FIFO queues to have a `.fifo` suffix
Expand Down Expand Up @@ -95,10 +109,127 @@ protected override AmazonSqsQueue findEndpointByUri(Uri uri)
return Queues.Where(x => x.Uri.OriginalString == uri.OriginalString).FirstOrDefault() ?? Queues[uri.OriginalString.Split("//")[1].TrimEnd('/')];
}

public override ValueTask ConnectAsync(IWolverineRuntime runtime)
protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime)
{
if (!SystemQueuesEnabled) return;

// Lowercase the name because Uri normalizes the host portion to lowercase,
// and SQS queue names are case-sensitive. Without this, the sender creates
// "wolverine-response-MyApp-123" but the receiver resolves the reply URI
// to "wolverine-response-myapp-123" (lowercased by Uri), creating a different queue.
var responseName = SanitizeSqsName(
$"wolverine.response.{runtime.Options.ServiceName}.{runtime.DurabilitySettings.AssignedNodeNumber}")
.ToLowerInvariant();

var queue = Queues[responseName];
queue.Mode = EndpointMode.BufferedInMemory;
queue.IsListener = true;
queue.EndpointName = ResponseEndpointName;
queue.IsUsedForReplies = true;
queue.Role = EndpointRole.System;
queue.DeadLetterQueueName = null;
queue.Configuration.Attributes ??= new Dictionary<string, string>();
queue.Configuration.Attributes["MessageRetentionPeriod"] = "300";

SystemQueues.Add(queue);
}

public override async ValueTask ConnectAsync(IWolverineRuntime runtime)
{
Client ??= BuildClient(runtime);
return ValueTask.CompletedTask;

if (SystemQueuesEnabled)
{
await CleanupOrphanedSystemQueuesAsync(runtime);
StartSystemQueueKeepAlive(runtime.DurabilitySettings.Cancellation, runtime);
}
}

internal async Task CleanupOrphanedSystemQueuesAsync(IWolverineRuntime runtime)
{
var logger = runtime.LoggerFactory.CreateLogger<AmazonSqsTransport>();
var prefixes = new[] { "wolverine-response-", "wolverine-control-" };

foreach (var prefix in prefixes)
{
try
{
var response = await Client!.ListQueuesAsync(new ListQueuesRequest { QueueNamePrefix = prefix });

foreach (var queueUrl in response.QueueUrls)
{
try
{
var tags = await Client.ListQueueTagsAsync(new ListQueueTagsRequest { QueueUrl = queueUrl });

if (tags.Tags.TryGetValue(LastActiveTagKey, out var lastActiveStr)
&& DateTimeOffset.TryParse(lastActiveStr, out var lastActive))
{
if (DateTimeOffset.UtcNow - lastActive > OrphanThreshold)
{
await Client.DeleteQueueAsync(new DeleteQueueRequest(queueUrl));
logger.LogInformation("Deleted orphaned Wolverine system queue {QueueUrl}", queueUrl);
}
}
else
{
// No valid tag — consider it orphaned
await Client.DeleteQueueAsync(new DeleteQueueRequest(queueUrl));
logger.LogInformation("Deleted untagged Wolverine system queue {QueueUrl}", queueUrl);
}
}
catch (Exception e)
{
logger.LogWarning(e, "Error checking orphaned queue {QueueUrl}", queueUrl);
}
}
}
catch (Exception e)
{
logger.LogWarning(e, "Error listing queues with prefix {Prefix}", prefix);
}
}
}

internal async Task TagSystemQueueAsync(string queueUrl)
{
await Client!.TagQueueAsync(new TagQueueRequest
{
QueueUrl = queueUrl,
Tags = new Dictionary<string, string>
{
[LastActiveTagKey] = DateTime.UtcNow.ToString("o")
}
});
}

internal void StartSystemQueueKeepAlive(CancellationToken cancellation, IWolverineRuntime runtime)
{
if (_keepAliveTask != null) return;

var logger = runtime.LoggerFactory.CreateLogger<AmazonSqsTransport>();

_keepAliveTask = Task.Run(async () =>
{
using var timer = new PeriodicTimer(KeepAliveInterval);
while (await timer.WaitForNextTickAsync(cancellation))
{
foreach (var queue in SystemQueues)
{
if (queue.QueueUrl.IsNotEmpty())
{
try
{
await TagSystemQueueAsync(queue.QueueUrl!);
}
catch (Exception e)
{
logger.LogWarning(e, "Error refreshing keep-alive tag for system queue {QueueName}", queue.QueueName);
}
}
}
}
}, cancellation);
}

public override IEnumerable<PropertyColumn> DiagnosticColumns()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Amazon.Runtime;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Transports;

Expand Down Expand Up @@ -83,4 +84,58 @@ public AmazonSqsTransportConfiguration DisableAllNativeDeadLetterQueues()
Transport.DisableDeadLetterQueues = true;
return this;
}

/// <summary>
/// Enable Wolverine system queues for request/reply support.
/// Creates a per-node response queue that is automatically cleaned up.
/// </summary>
/// <returns></returns>
public AmazonSqsTransportConfiguration EnableSystemQueues()
{
Transport.SystemQueuesEnabled = true;
return this;
}

/// <summary>
/// Control whether Wolverine creates system queues for responses and retries.
/// Should be set to false if the application lacks permissions to create queues.
/// </summary>
/// <param name="enabled"></param>
/// <returns></returns>
public AmazonSqsTransportConfiguration SystemQueuesAreEnabled(bool enabled)
{
Transport.SystemQueuesEnabled = enabled;
return this;
}

/// <summary>
/// Enable Wolverine control queues for inter-node communication
/// (leader election, node coordination).
/// </summary>
/// <returns></returns>
public AmazonSqsTransportConfiguration EnableWolverineControlQueues()
{
Transport.SystemQueuesEnabled = true;

// Lowercase to match URI normalization (see tryBuildSystemEndpoints comment)
var queueName = AmazonSqsTransport.SanitizeSqsName(
"wolverine.control." + Options.Durability.AssignedNodeNumber)
.ToLowerInvariant();

var queue = Transport.Queues[queueName];
queue.Mode = EndpointMode.BufferedInMemory;
queue.IsListener = true;
queue.EndpointName = "Control";
queue.IsUsedForReplies = true;
queue.Role = EndpointRole.System;
queue.DeadLetterQueueName = null;
queue.Configuration.Attributes ??= new Dictionary<string, string>();
queue.Configuration.Attributes["MessageRetentionPeriod"] = "300";

Options.Transports.NodeControlEndpoint = queue;

Transport.SystemQueues.Add(queue);

return this;
}
}
1 change: 1 addition & 0 deletions src/Wolverine/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
[assembly: InternalsVisibleTo("Wolverine.RabbitMq")]
[assembly: InternalsVisibleTo("Wolverine.RabbitMq.Tests")]
[assembly: InternalsVisibleTo("Wolverine.AzureServiceBus")]
[assembly: InternalsVisibleTo("Wolverine.AmazonSqs")]
[assembly: InternalsVisibleTo("Wolverine.ConfluentKafka")]
[assembly: InternalsVisibleTo("Wolverine.AzureServiceBus.Tests")]
[assembly: InternalsVisibleTo("PersistenceTests")]
Expand Down
Loading