diff --git a/docs/guide/messaging/transports/sqs/index.md b/docs/guide/messaging/transports/sqs/index.md index e581958bb..89625f6f4 100644 --- a/docs/guide/messaging/transports/sqs/index.md +++ b/docs/guide/messaging/transports/sqs/index.md @@ -1,9 +1,5 @@ # Using Amazon SQS -::: warning -At this moment, Wolverine cannot support request/reply mechanics (`IMessageBus.InvokeAsync()`) with Amazon SQS. -::: - Wolverine supports [Amazon SQS](https://aws.amazon.com/sqs/) as a messaging transport through the WolverineFx.AmazonSqs package. ## Connecting to the Broker @@ -181,3 +177,64 @@ opts.UseAmazonSqsTransport() ``` The default delimiter between the prefix and the original name is `-` for Amazon SQS (e.g., `dev-john-orders`). + +## Request/Reply + +[Request/reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) mechanics (`IMessageBus.InvokeAsync()`) are supported with the Amazon SQS transport when system queues are enabled. Wolverine creates a dedicated per-node response queue named like `wolverine-response-[service name]-[node id]` that is used to receive replies. + +To enable request/reply support, call `EnableSystemQueues()` on the SQS transport configuration: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAmazonSqsTransport() + .AutoProvision() + + // Enable system queues for request/reply support + .EnableSystemQueues(); + }).StartAsync(); +``` + +::: tip +Unlike Azure Service Bus and RabbitMQ where system queues are enabled by default, SQS system queues require explicit opt-in via `EnableSystemQueues()`. This is because creating SQS queues requires IAM permissions that your application may not have. +::: + +System queues are automatically cleaned up when your application shuts down. Wolverine also tags each system queue with a `wolverine:last-active` timestamp and runs a background keep-alive timer. On startup, Wolverine scans for orphaned system queues (from crashed nodes) with the `wolverine-response-` or `wolverine-control-` prefix and deletes any that have been inactive for more than 5 minutes. + +## Wolverine Control Queues + +You can opt into using SQS queues for intra-node communication that Wolverine needs for leader election and background worker distribution. Using SQS for this feature is more efficient than the built-in database control queues that Wolverine uses otherwise, and is necessary for message storage options like RavenDb that do not have a built-in control queue mechanism. + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAmazonSqsTransport() + .AutoProvision() + + // This enables Wolverine to use SQS queues + // created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + }).StartAsync(); +``` + +Calling `EnableWolverineControlQueues()` implicitly enables system queues and request/reply support as well. + +## Disabling System Queues + +If your application does not have IAM permissions to create or delete queues, you can explicitly disable system queues: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAmazonSqsTransport() + .AutoProvision() + .SystemQueuesAreEnabled(false); + + opts.ListenToSqsQueue("send-and-receive"); + opts.PublishAllMessages().ToSqsQueue("send-and-receive"); + }).StartAsync(); +``` diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/BufferedSendingAndReceivingCompliance.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/BufferedSendingAndReceivingCompliance.cs index 4472d7d34..af2aa367e 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/BufferedSendingAndReceivingCompliance.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/BufferedSendingAndReceivingCompliance.cs @@ -22,7 +22,8 @@ public async Task InitializeAsync() await SenderIs(opts => { opts.UseAmazonSqsTransportLocally() - .AutoProvision().AutoPurgeOnStartup(); + .AutoProvision().AutoPurgeOnStartup() + .EnableSystemQueues(); opts.ListenToSqsQueue("sender-" + number); }); @@ -30,7 +31,8 @@ await SenderIs(opts => await ReceiverIs(opts => { opts.UseAmazonSqsTransportLocally() - .AutoProvision().AutoPurgeOnStartup(); + .AutoProvision().AutoPurgeOnStartup() + .EnableSystemQueues(); opts.ListenToSqsQueue("receiver-" + number).Named("receiver").BufferedInMemory(); }); diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/InlineSendingAndReceivingCompliance.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/InlineSendingAndReceivingCompliance.cs index c77baa104..12ddc08d4 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/InlineSendingAndReceivingCompliance.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/InlineSendingAndReceivingCompliance.cs @@ -25,7 +25,8 @@ await SenderIs(opts => { opts.UseAmazonSqsTransportLocally() .AutoProvision() - .AutoPurgeOnStartup(); + .AutoPurgeOnStartup() + .EnableSystemQueues(); opts.ListenToSqsQueue("sender-" + number); @@ -36,7 +37,8 @@ await ReceiverIs(opts => { opts.UseAmazonSqsTransportLocally() .AutoProvision() - .AutoPurgeOnStartup(); + .AutoPurgeOnStartup() + .EnableSystemQueues(); opts.ListenToSqsQueue("receiver-" + number).Named("receiver").ProcessInline(); }); diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs index c24814429..f6922eb80 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs @@ -282,6 +282,18 @@ internal async Task SetupAsync(IAmazonSQS client) var response = await client.CreateQueueAsync(Configuration); QueueUrl = response.QueueUrl; + + if (Role == EndpointRole.System) + { + await client.TagQueueAsync(new TagQueueRequest + { + QueueUrl = QueueUrl, + Tags = new Dictionary + { + ["wolverine:last-active"] = DateTime.UtcNow.ToString("o") + } + }); + } } catch (Exception e) { diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs index 30f84fe87..0c16b11ba 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransport.cs @@ -2,6 +2,7 @@ using Amazon.SQS; using Amazon.SQS.Model; using JasperFx.Core; +using Microsoft.Extensions.Logging; using Spectre.Console; using Wolverine.Configuration; using Wolverine.Runtime; @@ -12,9 +13,16 @@ namespace Wolverine.AmazonSqs.Internal; public class AmazonSqsTransport : BrokerTransport { public const string DeadLetterQueueName = DeadLetterQueueConstants.DefaultQueueName; - + public const string ResponseEndpointName = "AmazonSqsResponses"; public const char Separator = '-'; + private static readonly TimeSpan OrphanThreshold = TimeSpan.FromMinutes(5); + private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(2); + private const string LastActiveTagKey = "wolverine:last-active"; + + internal readonly List SystemQueues = new(); + private Task? _keepAliveTask; + public AmazonSqsTransport(string protocol) : base(protocol, "Amazon SQS", ["aws", "sqs"]) { Queues = new LightweightCache(name => new AmazonSqsQueue(name, this)); @@ -46,6 +54,12 @@ internal AmazonSqsTransport(IAmazonSQS client) : this() public bool UseLocalStackInDevelopment { get; set; } public bool DisableDeadLetterQueues { get; set; } + /// + /// Is this transport connection allowed to build and use response and control queues + /// for just this node? Default is false, requiring explicit opt-in. + /// + public bool SystemQueuesEnabled { get; set; } + public static string SanitizeSqsName(string identifier) { //AWS requires FIFO queues to have a `.fifo` suffix @@ -95,10 +109,127 @@ protected override AmazonSqsQueue findEndpointByUri(Uri uri) return Queues.Where(x => x.Uri.OriginalString == uri.OriginalString).FirstOrDefault() ?? Queues[uri.OriginalString.Split("//")[1].TrimEnd('/')]; } - public override ValueTask ConnectAsync(IWolverineRuntime runtime) + protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) + { + if (!SystemQueuesEnabled) return; + + // Lowercase the name because Uri normalizes the host portion to lowercase, + // and SQS queue names are case-sensitive. Without this, the sender creates + // "wolverine-response-MyApp-123" but the receiver resolves the reply URI + // to "wolverine-response-myapp-123" (lowercased by Uri), creating a different queue. + var responseName = SanitizeSqsName( + $"wolverine.response.{runtime.Options.ServiceName}.{runtime.DurabilitySettings.AssignedNodeNumber}") + .ToLowerInvariant(); + + var queue = Queues[responseName]; + queue.Mode = EndpointMode.BufferedInMemory; + queue.IsListener = true; + queue.EndpointName = ResponseEndpointName; + queue.IsUsedForReplies = true; + queue.Role = EndpointRole.System; + queue.DeadLetterQueueName = null; + queue.Configuration.Attributes ??= new Dictionary(); + queue.Configuration.Attributes["MessageRetentionPeriod"] = "300"; + + SystemQueues.Add(queue); + } + + public override async ValueTask ConnectAsync(IWolverineRuntime runtime) { Client ??= BuildClient(runtime); - return ValueTask.CompletedTask; + + if (SystemQueuesEnabled) + { + await CleanupOrphanedSystemQueuesAsync(runtime); + StartSystemQueueKeepAlive(runtime.DurabilitySettings.Cancellation, runtime); + } + } + + internal async Task CleanupOrphanedSystemQueuesAsync(IWolverineRuntime runtime) + { + var logger = runtime.LoggerFactory.CreateLogger(); + var prefixes = new[] { "wolverine-response-", "wolverine-control-" }; + + foreach (var prefix in prefixes) + { + try + { + var response = await Client!.ListQueuesAsync(new ListQueuesRequest { QueueNamePrefix = prefix }); + + foreach (var queueUrl in response.QueueUrls) + { + try + { + var tags = await Client.ListQueueTagsAsync(new ListQueueTagsRequest { QueueUrl = queueUrl }); + + if (tags.Tags.TryGetValue(LastActiveTagKey, out var lastActiveStr) + && DateTimeOffset.TryParse(lastActiveStr, out var lastActive)) + { + if (DateTimeOffset.UtcNow - lastActive > OrphanThreshold) + { + await Client.DeleteQueueAsync(new DeleteQueueRequest(queueUrl)); + logger.LogInformation("Deleted orphaned Wolverine system queue {QueueUrl}", queueUrl); + } + } + else + { + // No valid tag — consider it orphaned + await Client.DeleteQueueAsync(new DeleteQueueRequest(queueUrl)); + logger.LogInformation("Deleted untagged Wolverine system queue {QueueUrl}", queueUrl); + } + } + catch (Exception e) + { + logger.LogWarning(e, "Error checking orphaned queue {QueueUrl}", queueUrl); + } + } + } + catch (Exception e) + { + logger.LogWarning(e, "Error listing queues with prefix {Prefix}", prefix); + } + } + } + + internal async Task TagSystemQueueAsync(string queueUrl) + { + await Client!.TagQueueAsync(new TagQueueRequest + { + QueueUrl = queueUrl, + Tags = new Dictionary + { + [LastActiveTagKey] = DateTime.UtcNow.ToString("o") + } + }); + } + + internal void StartSystemQueueKeepAlive(CancellationToken cancellation, IWolverineRuntime runtime) + { + if (_keepAliveTask != null) return; + + var logger = runtime.LoggerFactory.CreateLogger(); + + _keepAliveTask = Task.Run(async () => + { + using var timer = new PeriodicTimer(KeepAliveInterval); + while (await timer.WaitForNextTickAsync(cancellation)) + { + foreach (var queue in SystemQueues) + { + if (queue.QueueUrl.IsNotEmpty()) + { + try + { + await TagSystemQueueAsync(queue.QueueUrl!); + } + catch (Exception e) + { + logger.LogWarning(e, "Error refreshing keep-alive tag for system queue {QueueName}", queue.QueueName); + } + } + } + } + }, cancellation); } public override IEnumerable DiagnosticColumns() diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs index 19db3c6a4..c8df667c9 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsTransportConfiguration.cs @@ -1,4 +1,5 @@ using Amazon.Runtime; +using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Transports; @@ -83,4 +84,58 @@ public AmazonSqsTransportConfiguration DisableAllNativeDeadLetterQueues() Transport.DisableDeadLetterQueues = true; return this; } + + /// + /// Enable Wolverine system queues for request/reply support. + /// Creates a per-node response queue that is automatically cleaned up. + /// + /// + public AmazonSqsTransportConfiguration EnableSystemQueues() + { + Transport.SystemQueuesEnabled = true; + return this; + } + + /// + /// Control whether Wolverine creates system queues for responses and retries. + /// Should be set to false if the application lacks permissions to create queues. + /// + /// + /// + public AmazonSqsTransportConfiguration SystemQueuesAreEnabled(bool enabled) + { + Transport.SystemQueuesEnabled = enabled; + return this; + } + + /// + /// Enable Wolverine control queues for inter-node communication + /// (leader election, node coordination). + /// + /// + public AmazonSqsTransportConfiguration EnableWolverineControlQueues() + { + Transport.SystemQueuesEnabled = true; + + // Lowercase to match URI normalization (see tryBuildSystemEndpoints comment) + var queueName = AmazonSqsTransport.SanitizeSqsName( + "wolverine.control." + Options.Durability.AssignedNodeNumber) + .ToLowerInvariant(); + + var queue = Transport.Queues[queueName]; + queue.Mode = EndpointMode.BufferedInMemory; + queue.IsListener = true; + queue.EndpointName = "Control"; + queue.IsUsedForReplies = true; + queue.Role = EndpointRole.System; + queue.DeadLetterQueueName = null; + queue.Configuration.Attributes ??= new Dictionary(); + queue.Configuration.Attributes["MessageRetentionPeriod"] = "300"; + + Options.Transports.NodeControlEndpoint = queue; + + Transport.SystemQueues.Add(queue); + + return this; + } } \ No newline at end of file diff --git a/src/Wolverine/AssemblyAttributes.cs b/src/Wolverine/AssemblyAttributes.cs index af6376d1e..b5bda4701 100644 --- a/src/Wolverine/AssemblyAttributes.cs +++ b/src/Wolverine/AssemblyAttributes.cs @@ -17,6 +17,7 @@ [assembly: InternalsVisibleTo("Wolverine.RabbitMq")] [assembly: InternalsVisibleTo("Wolverine.RabbitMq.Tests")] [assembly: InternalsVisibleTo("Wolverine.AzureServiceBus")] +[assembly: InternalsVisibleTo("Wolverine.AmazonSqs")] [assembly: InternalsVisibleTo("Wolverine.ConfluentKafka")] [assembly: InternalsVisibleTo("Wolverine.AzureServiceBus.Tests")] [assembly: InternalsVisibleTo("PersistenceTests")]