diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQDispatchMiddlewares.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQDispatchMiddlewares.cs new file mode 100644 index 00000000000..d8cff184511 --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQDispatchMiddlewares.cs @@ -0,0 +1,12 @@ +namespace Mocha.Transport.RabbitMQ.Middlewares; + +/// +/// Provides pre-configured RabbitMQ-specific dispatch middleware configurations. +/// +public static class RabbitMQDispatchMiddlewares +{ + /// + /// Middleware configuration that extracts a routing key from the message and writes it to the dispatch headers. + /// + public static readonly DispatchMiddlewareConfiguration RoutingKey = RabbitMQRoutingKeyMiddleware.Create(); +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQRoutingKeyMiddleware.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQRoutingKeyMiddleware.cs new file mode 100644 index 00000000000..2e21f25703d --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Middlewares/Dispatch/RabbitMQRoutingKeyMiddleware.cs @@ -0,0 +1,41 @@ +using Mocha.Middlewares; + +namespace Mocha.Transport.RabbitMQ.Middlewares; + +/// +/// Dispatch middleware that extracts a routing key from the message using a +/// stored on the message type's feature collection, +/// and writes it to the dispatch context headers for the terminal to read. +/// +internal sealed class RabbitMQRoutingKeyMiddleware +{ + /// + /// Invokes the middleware, extracting the routing key if an extractor is configured on the message type. + /// + /// The dispatch context. + /// The next delegate in the dispatch pipeline. + public ValueTask InvokeAsync(IDispatchContext context, DispatchDelegate next) + { + if (context.MessageType is not null + && context.Message is not null + && context.MessageType.Features.TryGet(out var extractor)) + { + var routingKey = extractor.Extract(context.Message); + if (routingKey is not null) + { + context.Headers.Set(RabbitMQMessageHeaders.RoutingKey, routingKey); + } + } + + return next(context); + } + + private static readonly RabbitMQRoutingKeyMiddleware s_instance = new(); + + /// + /// Creates a that wraps the routing key middleware singleton. + /// + /// A middleware configuration keyed as "RabbitMQRoutingKey". + public static DispatchMiddlewareConfiguration Create() + => new(static (_, next) => ctx => s_instance.InvokeAsync(ctx, next), "RabbitMQRoutingKey"); +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs index 951534d8b60..d2d2844bda5 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs @@ -108,6 +108,12 @@ private async ValueTask DispatchAsync( if (Exchange is not null) { exchangeName = Exchange.CachedName; + + if (envelope.Headers is not null + && envelope.Headers.TryGet(RabbitMQMessageHeaders.RoutingKey, out var rk)) + { + routingKey = new CachedString(rk); + } } else if (Queue is not null) { diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageHeaders.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageHeaders.cs index f56d3230df7..c5924bdd01a 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageHeaders.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessageHeaders.cs @@ -43,6 +43,11 @@ internal static class RabbitMQMessageHeaders /// public static readonly ContextDataKey ContentType = new("x-content-type"); + /// + /// Header key for the AMQP routing key, used to route messages to the correct exchange binding. + /// + public static readonly ContextDataKey RoutingKey = new("x-routing-key"); + /// /// Header key for the list of message type names enclosed in the envelope, used for polymorphic deserialization. /// diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtensions.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtensions.cs new file mode 100644 index 00000000000..99059400fd4 --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtensions.cs @@ -0,0 +1,25 @@ +namespace Mocha.Transport.RabbitMQ; + +/// +/// Extension methods for configuring RabbitMQ routing keys on message type descriptors. +/// +public static class RabbitMQRoutingKeyExtensions +{ + /// + /// Configures a routing key extractor for this message type, used when publishing to RabbitMQ exchanges. + /// + /// The message type. + /// The message type descriptor. + /// A function that extracts the routing key from a message instance. + /// The descriptor for method chaining. + public static IMessageTypeDescriptor UseRabbitMQRoutingKey( + this IMessageTypeDescriptor descriptor, + Func extractor) + { + var features = descriptor.Extend().Configuration.Features; + + features.Set(RabbitMQRoutingKeyExtractor.Create(extractor)); + + return descriptor; + } +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtractor.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtractor.cs new file mode 100644 index 00000000000..7beaa540a16 --- /dev/null +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQRoutingKeyExtractor.cs @@ -0,0 +1,20 @@ +namespace Mocha.Transport.RabbitMQ; + +/// +/// Extracts a routing key from a message instance, stored as a feature on +/// to support transport-specific exchange routing. +/// +internal sealed class RabbitMQRoutingKeyExtractor(Func extractor) +{ + /// + /// Extracts the routing key from the specified message. + /// + /// The message to extract the routing key from. + /// The routing key, or null if none could be determined. + public string? Extract(object message) => extractor(message); + + public static RabbitMQRoutingKeyExtractor Create(Func extractor) + { + return new RabbitMQRoutingKeyExtractor(msg => extractor((TMessage)msg)); + } +} diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Extensions/RabbitMQTransportDescriptorExtensions.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Extensions/RabbitMQTransportDescriptorExtensions.cs index 9ef4a7a1c24..ca4f6919b6b 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Extensions/RabbitMQTransportDescriptorExtensions.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/Extensions/RabbitMQTransportDescriptorExtensions.cs @@ -19,6 +19,9 @@ internal static IRabbitMQMessagingTransportDescriptor AddDefaults( descriptor .UseReceive(RabbitMQReceiveMiddlewares.Parsing, after: RabbitMQReceiveMiddlewares.Acknowledgement.Key); + descriptor + .UseDispatch(RabbitMQDispatchMiddlewares.RoutingKey, before: DispatchMiddlewares.Serialization.Key); + return descriptor; } } diff --git a/src/Mocha/src/Mocha/MessageTypes/MessageType.cs b/src/Mocha/src/Mocha/MessageTypes/MessageType.cs index bf1d04b752f..ad470f2f5b6 100644 --- a/src/Mocha/src/Mocha/MessageTypes/MessageType.cs +++ b/src/Mocha/src/Mocha/MessageTypes/MessageType.cs @@ -1,4 +1,5 @@ using System.Collections.Immutable; +using Mocha.Features; namespace Mocha; @@ -42,6 +43,11 @@ private ImmutableDictionary _serializer /// public MessageContentType? DefaultContentType { get; private set; } + /// + /// Gets the feature collection associated with this message type, providing transport-specific extensibility. + /// + public IFeatureCollection Features { get; private set; } = FeatureCollection.Empty; + /// /// Gets a value indicating whether the underlying CLR type is an interface. /// @@ -71,6 +77,8 @@ public void Initialize(IMessagingConfigurationContext context, MessageTypeConfig IsInternal = configuration.IsInternal; DefaultContentType = configuration.DefaultContentType; + Features = configuration.GetFeatures().ToReadOnly(); + _serializerRegistry = context.Messages.Serializers ?? throw new InvalidOperationException("Serializer registry is required"); diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/RoutingKeyTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/RoutingKeyTests.cs new file mode 100644 index 00000000000..31e60a1ac41 --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/RoutingKeyTests.cs @@ -0,0 +1,261 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.RabbitMQ.Tests.Helpers; + +namespace Mocha.Transport.RabbitMQ.Tests.Behaviors; + +[Collection("RabbitMQ")] +public class RoutingKeyTests +{ + private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(30); + private static readonly TimeSpan s_negativeTimeout = TimeSpan.FromSeconds(2); + private readonly RabbitMQFixture _fixture; + + public RoutingKeyTests(RabbitMQFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task PublishAsync_Should_RouteToMatchingQueue_When_RoutingKeyMatchesBindingPattern() + { + // arrange + var usRecorder = new MessageRecorder(); + var euRecorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddKeyedSingleton("us", usRecorder) + .AddKeyedSingleton("eu", euRecorder) + .AddMessageBus() + .AddConsumer() + .AddConsumer() + .AddMessage(m => m.UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(t => + { + t.BindHandlersExplicitly(); + + t.DeclareExchange("region-topic").Type(RabbitMQExchangeType.Topic); + t.DeclareQueue("us-queue"); + t.DeclareQueue("eu-queue"); + t.DeclareBinding("region-topic", "us-queue").RoutingKey("us.*"); + t.DeclareBinding("region-topic", "eu-queue").RoutingKey("eu.*"); + + t.Endpoint("us-ep").Consumer().Queue("us-queue"); + t.Endpoint("eu-ep").Consumer().Queue("eu-queue"); + t.DispatchEndpoint("region-dispatch").ToExchange("region-topic").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync( + new RegionEvent { Region = "us.east", Payload = "hello-us" }, + CancellationToken.None); + + // assert + Assert.True(await usRecorder.WaitAsync(s_timeout), "US handler did not receive the event"); + var usMessage = Assert.Single(usRecorder.Messages); + var usEvent = Assert.IsType(usMessage); + Assert.Equal("us.east", usEvent.Region); + Assert.Equal("hello-us", usEvent.Payload); + + Assert.False( + await euRecorder.WaitAsync(s_negativeTimeout), + "EU handler should not have received a message routed to us.*"); + Assert.Empty(euRecorder.Messages); + } + + [Fact] + public async Task PublishAsync_Should_RouteToAllMatchingQueues_When_WildcardBindingMatches() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddConsumer() + .AddMessage(m => m.UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(t => + { + t.BindHandlersExplicitly(); + + t.DeclareExchange("wildcard-topic").Type(RabbitMQExchangeType.Topic); + t.DeclareQueue("catch-all-queue"); + t.DeclareBinding("wildcard-topic", "catch-all-queue").RoutingKey("#"); + + t.Endpoint("catch-all-ep").Consumer().Queue("catch-all-queue"); + t.DispatchEndpoint("wildcard-dispatch").ToExchange("wildcard-topic").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync( + new RegionEvent { Region = "us.east", Payload = "msg-1" }, + CancellationToken.None); + await messageBus.PublishAsync( + new RegionEvent { Region = "eu.west", Payload = "msg-2" }, + CancellationToken.None); + await messageBus.PublishAsync( + new RegionEvent { Region = "ap.southeast.sg", Payload = "msg-3" }, + CancellationToken.None); + + // assert + Assert.True( + await recorder.WaitAsync(s_timeout, expectedCount: 3), + "Catch-all handler did not receive all 3 events within timeout"); + + Assert.Equal(3, recorder.Messages.Count); + + var payloads = recorder + .Messages.Cast() + .Select(e => e.Payload) + .OrderBy(p => p, StringComparer.Ordinal) + .ToList(); + + Assert.Equal(["msg-1", "msg-2", "msg-3"], payloads); + } + + [Fact] + public async Task PublishAsync_Should_NotDeliverToQueue_When_RoutingKeyDoesNotMatchBinding() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddConsumer() + .AddMessage(m => m.UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(t => + { + t.BindHandlersExplicitly(); + + t.DeclareExchange("nomatch-topic").Type(RabbitMQExchangeType.Topic); + t.DeclareQueue("nomatch-queue"); + t.DeclareBinding("nomatch-topic", "nomatch-queue").RoutingKey("eu.*"); + + t.Endpoint("nomatch-ep").Consumer().Queue("nomatch-queue"); + t.DispatchEndpoint("nomatch-dispatch").ToExchange("nomatch-topic").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act - publish with a routing key that does not match "eu.*" + await messageBus.PublishAsync( + new RegionEvent { Region = "us.east", Payload = "should-not-arrive" }, + CancellationToken.None); + + // assert + Assert.False( + await recorder.WaitAsync(s_negativeTimeout), + "Handler should not have received a message when the routing key does not match the binding"); + Assert.Empty(recorder.Messages); + } + + [Fact] + public async Task PublishAsync_Should_SetRoutingKeyHeader_When_ExtractorConfigured() + { + // arrange + var tracker = new RoutingKeyTracker(); + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(tracker) + .AddSingleton(recorder) + .AddMessageBus() + .AddConsumer() + .AddMessage(m => m.UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(t => + { + t.BindHandlersExplicitly(); + + t.DeclareExchange("header-topic").Type(RabbitMQExchangeType.Topic); + t.DeclareQueue("header-queue"); + t.DeclareBinding("header-topic", "header-queue").RoutingKey("#"); + + t.Endpoint("header-ep").Consumer().Queue("header-queue"); + t.DispatchEndpoint("header-dispatch") + .ToExchange("header-topic") + .Publish() + .UseDispatch( + new DispatchMiddlewareConfiguration( + (_, next) => + context => + { + if (context.Headers.TryGet(RabbitMQMessageHeaders.RoutingKey, out var routingKey)) + { + tracker.CapturedKeys.Add(routingKey); + } + + return next(context); + }, + "routing-key-spy"), + after: "RabbitMQRoutingKey"); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync( + new RegionEvent { Region = "ap.southeast", Payload = "header-check" }, + CancellationToken.None); + + // assert + Assert.True(await recorder.WaitAsync(s_timeout), "Consumer did not receive the message within timeout"); + + var capturedKey = Assert.Single(tracker.CapturedKeys); + Assert.Equal("ap.southeast", capturedKey); + } + + public sealed class RegionEvent + { + public required string Region { get; init; } + public required string Payload { get; init; } + } + + public sealed class UsRegionConsumer([FromKeyedServices("us")] MessageRecorder recorder) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + recorder.Record(context.Message); + return default; + } + } + + public sealed class EuRegionConsumer([FromKeyedServices("eu")] MessageRecorder recorder) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + recorder.Record(context.Message); + return default; + } + } + + public sealed class CatchAllConsumer(MessageRecorder recorder) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + recorder.Record(context.Message); + return default; + } + } + + public sealed class RoutingKeyTracker + { + public ConcurrentBag CapturedKeys { get; } = []; + } +} diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQRoutingKeyTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQRoutingKeyTests.cs new file mode 100644 index 00000000000..3746c57e3a4 --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/RabbitMQRoutingKeyTests.cs @@ -0,0 +1,182 @@ +using Microsoft.Extensions.DependencyInjection; +using Mocha.Middlewares; +using Mocha.Transport.RabbitMQ.Middlewares; +using Mocha.Transport.RabbitMQ.Tests.Helpers; + +namespace Mocha.Transport.RabbitMQ.Tests; + +public class RabbitMQRoutingKeyTests +{ + [Fact] + public void UseRabbitMQRoutingKey_Should_StoreExtractorOnMessageType() + { + // arrange & act + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.UseRabbitMQRoutingKey(msg => msg.OrderId))); + + // assert + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + Assert.True(messageType!.Features.TryGet(out var extractor)); + + var order = new OrderCreated { OrderId = "ORD-123" }; + Assert.Equal("ORD-123", extractor.Extract(order)); + } + + [Fact] + public void UseRabbitMQRoutingKey_Should_ExtractCorrectValue_When_CompositeKey() + { + // arrange & act + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.UseRabbitMQRoutingKey(msg => $"{msg.OrderId}.priority"))); + + // assert + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + Assert.True(messageType!.Features.TryGet(out var extractor)); + + var order = new OrderCreated { OrderId = "ORD-456" }; + Assert.Equal("ORD-456.priority", extractor.Extract(order)); + } + + [Fact] + public void UseRabbitMQRoutingKey_Should_ReturnNull_When_ExtractorReturnsNull() + { + // arrange & act + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.UseRabbitMQRoutingKey(_ => null))); + + // assert + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + Assert.True(messageType!.Features.TryGet(out var extractor)); + + var order = new OrderCreated { OrderId = "ORD-789" }; + Assert.Null(extractor.Extract(order)); + } + + [Fact] + public void UseRabbitMQRoutingKey_Should_NotStoreExtractor_When_NotConfigured() + { + // arrange & act + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.Publish(r => r.ToRabbitMQExchange("orders")))); + + // assert + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + Assert.False(messageType!.Features.TryGet(out _)); + } + + [Fact] + public async Task Middleware_Should_SetRoutingKeyHeader_When_ExtractorConfigured() + { + // arrange + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.UseRabbitMQRoutingKey(msg => msg.OrderId))); + + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + + var context = new DispatchContext(); + context.MessageType = messageType; + context.Message = new OrderCreated { OrderId = "ORD-999" }; + + string? capturedRoutingKey = null; + DispatchDelegate terminal = ctx => + { + ctx.Headers.TryGetValue("x-routing-key", out var value); + capturedRoutingKey = value as string; + return default; + }; + + var middleware = new RabbitMQRoutingKeyMiddleware(); + + // act + await middleware.InvokeAsync(context, terminal); + + // assert + Assert.Equal("ORD-999", capturedRoutingKey); + } + + [Fact] + public async Task Middleware_Should_NotSetRoutingKeyHeader_When_ExtractorNotConfigured() + { + // arrange + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.Publish(r => r.ToRabbitMQExchange("orders")))); + + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + + var context = new DispatchContext(); + context.MessageType = messageType; + context.Message = new OrderCreated { OrderId = "ORD-000" }; + + var nextCalled = false; + DispatchDelegate terminal = _ => + { + nextCalled = true; + return default; + }; + + var middleware = new RabbitMQRoutingKeyMiddleware(); + + // act + await middleware.InvokeAsync(context, terminal); + + // assert + Assert.True(nextCalled); + Assert.False(context.Headers.TryGetValue("x-routing-key", out _)); + } + + [Fact] + public async Task Middleware_Should_NotSetRoutingKeyHeader_When_ExtractorReturnsNull() + { + // arrange + var runtime = CreateRuntime( + b => b.AddMessage( + d => d.UseRabbitMQRoutingKey(_ => null))); + + var messageType = runtime.Messages.GetMessageType(typeof(OrderCreated)); + Assert.NotNull(messageType); + + var context = new DispatchContext(); + context.MessageType = messageType; + context.Message = new OrderCreated { OrderId = "ORD-111" }; + + var nextCalled = false; + DispatchDelegate terminal = _ => + { + nextCalled = true; + return default; + }; + + var middleware = new RabbitMQRoutingKeyMiddleware(); + + // act + await middleware.InvokeAsync(context, terminal); + + // assert + Assert.True(nextCalled); + Assert.False(context.Headers.TryGetValue("x-routing-key", out _)); + } + + private static MessagingRuntime CreateRuntime(Action configure) + { + var services = new ServiceCollection(); + services.AddSingleton(new MessageRecorder()); + var builder = services.AddMessageBus(); + configure(builder); + var runtime = builder + .AddRabbitMQ(t => t.ConnectionProvider(_ => new StubConnectionProvider())) + .BuildRuntime(); + return runtime; + } +} diff --git a/website/src/docs/mocha/v1/transports/rabbitmq.md b/website/src/docs/mocha/v1/transports/rabbitmq.md index 84815b67154..731b8379fa2 100644 --- a/website/src/docs/mocha/v1/transports/rabbitmq.md +++ b/website/src/docs/mocha/v1/transports/rabbitmq.md @@ -428,6 +428,163 @@ For prefetch tuning guidance from first principles, see [CloudAMQP Best Practice All auto-provisioned resources are durable by default and survive broker restarts. +# Routing keys + +RabbitMQ uses a `routing_key` field on every published message to decide which queues receive it. When you publish to a **topic exchange**, the broker compares the message's routing key against binding patterns on each queue. Queues whose pattern matches get the message. Queues that don't match never see it. + +**Direct exchanges** work the same way, but require an exact match instead of a pattern. + +**Fanout exchanges** ignore routing keys entirely - every bound queue gets every message. + +Routing keys are useful when you need to split a single message stream across different consumers based on a property of the message itself: + +- **Disconnecting producers from consumers** - publish messages without knowing which queues or services will consume them. Consumers can bind with patterns to receive only the messages they care about. +- **Multi-tenant routing** - route messages to tenant-specific queues (`tenant-a.orders`, `tenant-b.orders`) +- **Region-based routing** - route to regional processors (`us.east`, `eu.west`) +- **Priority routing** - separate high-priority and low-priority messages (`priority.high`, `priority.low`) + +For a full treatment of topic exchange routing, see the [RabbitMQ Topics Tutorial](https://www.rabbitmq.com/tutorials/tutorial-five-dotnet). + +## Configure routing key extraction + +To set a routing key on published messages, call `UseRabbitMQRoutingKey()` when registering the message type: + +```csharp +builder.Services + .AddMessageBus() + .AddMessage(m => m + .UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(); +``` + +The extractor function runs at dispatch time for each message. It receives the message instance and returns the routing key string. Return `null` to publish without a routing key. + +`UseRabbitMQRoutingKey()` is configured on `AddMessage()`, not on the transport or endpoint. This keeps routing key logic next to the message definition where it belongs. + +### Composite routing keys + +Combine multiple properties into a single routing key using string interpolation: + +```csharp +builder.Services + .AddMessageBus() + .AddMessage(m => m + .UseRabbitMQRoutingKey(msg => $"{msg.TenantId}.{msg.Region}")) + .AddRabbitMQ(); +``` + +This produces routing keys like `acme.us.east` or `contoso.eu.west`, which you can match with topic exchange binding patterns like `acme.#` or `*.eu.*`. + +## Topic exchange example + +This example routes region-tagged events to different queues based on their routing key. The US queue receives messages matching `us.*`, and the EU queue receives messages matching `eu.*`. + +```mermaid +graph LR + P[Publisher] -->|"region = us.east"| E[Topic Exchange
region-events] + E -->|"us.* ✓"| QA[Queue
us-orders] + E -->|"eu.* ✗"| QB[Queue
eu-orders] + QA --> CA[US Consumer] + + style QB stroke-dasharray: 5 5 +``` + +### Define the message type + +```csharp +public sealed class RegionEvent +{ + public required string Region { get; init; } + public required string Payload { get; init; } +} +``` + +### Wire up the bus + +```csharp +builder.Services + .AddMessageBus() + .AddConsumer() + .AddConsumer() + .AddMessage(m => m + .UseRabbitMQRoutingKey(msg => msg.Region)) + .AddRabbitMQ(transport => + { + transport.BindHandlersExplicitly(); + + // Declare topology: one topic exchange, two queues, two bindings with patterns + transport.DeclareExchange("region-events") + .Type(RabbitMQExchangeType.Topic); + + transport.DeclareQueue("us-orders"); + transport.DeclareQueue("eu-orders"); + + transport.DeclareBinding("region-events", "us-orders") + .RoutingKey("us.*"); + transport.DeclareBinding("region-events", "eu-orders") + .RoutingKey("eu.*"); + + // Bind consumers to queues + transport.Endpoint("us-ep") + .Consumer() + .Queue("us-orders"); + transport.Endpoint("eu-ep") + .Consumer() + .Queue("eu-orders"); + + // Dispatch to the topic exchange + transport.DispatchEndpoint("region-dispatch") + .ToExchange("region-events") + .Publish(); + }); +``` + +When you publish a `RegionEvent` with `Region = "us.east"`, the routing key middleware extracts `"us.east"` from the message and sets it on the AMQP publish. The topic exchange matches `"us.east"` against `us.*` (match) and `eu.*` (no match). Only the US queue receives the message. + +### Topic exchange binding patterns + +| Pattern | Matches | Does not match | +| --------- | ---------------------------- | ------------------------ | +| `us.*` | `us.east`, `us.west` | `us.east.az1`, `eu.west` | +| `eu.#` | `eu.west`, `eu.west.az1` | `us.east` | +| `#` | Everything | - | +| `*.*.az1` | `us.east.az1`, `eu.west.az1` | `us.east` | + +`*` matches exactly one word. `#` matches zero or more words. Words are separated by dots. + +## Direct exchange routing keys + +Direct exchanges use exact-match routing keys instead of patterns. A message with routing key `"priority-high"` reaches only queues bound with exactly `"priority-high"`. + +```csharp +builder.Services + .AddMessageBus() + .AddConsumer() + .AddMessage(m => m + .UseRabbitMQRoutingKey(msg => $"priority-{msg.Priority}")) + .AddRabbitMQ(transport => + { + transport.BindHandlersExplicitly(); + + transport.DeclareExchange("task-routing") + .Type(RabbitMQExchangeType.Direct); + + transport.DeclareQueue("high-priority-tasks"); + transport.DeclareBinding("task-routing", "high-priority-tasks") + .RoutingKey("priority-high"); + + transport.Endpoint("high-priority-ep") + .Consumer() + .Queue("high-priority-tasks"); + + transport.DispatchEndpoint("task-dispatch") + .ToExchange("task-routing") + .Publish(); + }); +``` + +Messages with `Priority = "high"` reach the queue. Messages with any other priority are dropped by the exchange (unless another queue is bound with a matching routing key). + # Next steps - [Transports Overview](/docs/mocha/v1/transports) - Understand the transport abstraction and lifecycle.