Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Mocha.Transport.RabbitMQ.Middlewares;

/// <summary>
/// Provides pre-configured RabbitMQ-specific dispatch middleware configurations.
/// </summary>
public static class RabbitMQDispatchMiddlewares
{
/// <summary>
/// Middleware configuration that extracts a routing key from the message and writes it to the dispatch headers.
/// </summary>
public static readonly DispatchMiddlewareConfiguration RoutingKey = RabbitMQRoutingKeyMiddleware.Create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Mocha.Middlewares;

namespace Mocha.Transport.RabbitMQ.Middlewares;

/// <summary>
/// Dispatch middleware that extracts a routing key from the message using a
/// <see cref="RabbitMQRoutingKeyExtractor"/> stored on the message type's feature collection,
/// and writes it to the dispatch context headers for the terminal to read.
/// </summary>
internal sealed class RabbitMQRoutingKeyMiddleware
{
/// <summary>
/// Invokes the middleware, extracting the routing key if an extractor is configured on the message type.
/// </summary>
/// <param name="context">The dispatch context.</param>
/// <param name="next">The next delegate in the dispatch pipeline.</param>
public ValueTask InvokeAsync(IDispatchContext context, DispatchDelegate next)
{
if (context.MessageType is not null
&& context.Message is not null
&& context.MessageType.Features.TryGet<RabbitMQRoutingKeyExtractor>(out var extractor))
{
var routingKey = extractor.Extract(context.Message);
if (routingKey is not null)
{
context.Headers.Set(RabbitMQMessageHeaders.RoutingKey, routingKey);
}
}

return next(context);
}

private static readonly RabbitMQRoutingKeyMiddleware s_instance = new();

/// <summary>
/// Creates a <see cref="DispatchMiddlewareConfiguration"/> that wraps the routing key middleware singleton.
/// </summary>
/// <returns>A middleware configuration keyed as "RabbitMQRoutingKey".</returns>
public static DispatchMiddlewareConfiguration Create()
=> new(static (_, next) => ctx => s_instance.InvokeAsync(ctx, next), "RabbitMQRoutingKey");
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ private async ValueTask DispatchAsync(
if (Exchange is not null)
{
exchangeName = Exchange.CachedName;

if (envelope.Headers is not null
&& envelope.Headers.TryGet(RabbitMQMessageHeaders.RoutingKey, out var rk))
{
routingKey = new CachedString(rk);
}
}
else if (Queue is not null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ internal static class RabbitMQMessageHeaders
/// </summary>
public static readonly ContextDataKey<string> ContentType = new("x-content-type");

/// <summary>
/// Header key for the AMQP routing key, used to route messages to the correct exchange binding.
/// </summary>
public static readonly ContextDataKey<string> RoutingKey = new("x-routing-key");

/// <summary>
/// Header key for the list of message type names enclosed in the envelope, used for polymorphic deserialization.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Mocha.Transport.RabbitMQ;

/// <summary>
/// Extension methods for configuring RabbitMQ routing keys on message type descriptors.
/// </summary>
public static class RabbitMQRoutingKeyExtensions
{
/// <summary>
/// Configures a routing key extractor for this message type, used when publishing to RabbitMQ exchanges.
/// </summary>
/// <typeparam name="TMessage">The message type.</typeparam>
/// <param name="descriptor">The message type descriptor.</param>
/// <param name="extractor">A function that extracts the routing key from a message instance.</param>
/// <returns>The descriptor for method chaining.</returns>
public static IMessageTypeDescriptor UseRabbitMQRoutingKey<TMessage>(
this IMessageTypeDescriptor descriptor,
Func<TMessage, string?> extractor)
{
var features = descriptor.Extend().Configuration.Features;

features.Set(RabbitMQRoutingKeyExtractor.Create(extractor));

return descriptor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Mocha.Transport.RabbitMQ;

/// <summary>
/// Extracts a routing key from a message instance, stored as a feature on <see cref="MessageType"/>
/// to support transport-specific exchange routing.
/// </summary>
internal sealed class RabbitMQRoutingKeyExtractor(Func<object, string?> extractor)
{
/// <summary>
/// Extracts the routing key from the specified message.
/// </summary>
/// <param name="message">The message to extract the routing key from.</param>
/// <returns>The routing key, or <c>null</c> if none could be determined.</returns>
public string? Extract(object message) => extractor(message);

public static RabbitMQRoutingKeyExtractor Create<TMessage>(Func<TMessage, string?> extractor)
{
return new RabbitMQRoutingKeyExtractor(msg => extractor((TMessage)msg));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ internal static IRabbitMQMessagingTransportDescriptor AddDefaults(
descriptor
.UseReceive(RabbitMQReceiveMiddlewares.Parsing, after: RabbitMQReceiveMiddlewares.Acknowledgement.Key);

descriptor
.UseDispatch(RabbitMQDispatchMiddlewares.RoutingKey, before: DispatchMiddlewares.Serialization.Key);

return descriptor;
}
}
8 changes: 8 additions & 0 deletions src/Mocha/src/Mocha/MessageTypes/MessageType.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Immutable;
using Mocha.Features;

namespace Mocha;

Expand Down Expand Up @@ -42,6 +43,11 @@ private ImmutableDictionary<MessageContentType, IMessageSerializer> _serializer
/// </summary>
public MessageContentType? DefaultContentType { get; private set; }

/// <summary>
/// Gets the feature collection associated with this message type, providing transport-specific extensibility.
/// </summary>
public IFeatureCollection Features { get; private set; } = FeatureCollection.Empty;

/// <summary>
/// Gets a value indicating whether the underlying CLR type is an interface.
/// </summary>
Expand Down Expand Up @@ -71,6 +77,8 @@ public void Initialize(IMessagingConfigurationContext context, MessageTypeConfig
IsInternal = configuration.IsInternal;
DefaultContentType = configuration.DefaultContentType;

Features = configuration.GetFeatures().ToReadOnly();

_serializerRegistry =
context.Messages.Serializers ?? throw new InvalidOperationException("Serializer registry is required");

Expand Down
Loading
Loading