diff --git a/docs/guide/migrating-to-wolverine.md b/docs/guide/migrating-to-wolverine.md index 2ed753995..b34cbca67 100644 --- a/docs/guide/migrating-to-wolverine.md +++ b/docs/guide/migrating-to-wolverine.md @@ -374,6 +374,106 @@ opts.Policies.RegisterInteropMessageAssembly(typeof(SharedMessages).Assembly); Supported transports: RabbitMQ, Azure Service Bus, Amazon SQS/SNS. See the full [interoperability guide](/tutorials/interop#interop-with-masstransit). +### MassTransit Shim Interfaces + +Wolverine provides shim interfaces in the `Wolverine.Shims.MassTransit` namespace that mimic MassTransit's +core consumer API while delegating to Wolverine's `IMessageBus` and `IMessageContext`. These shims let you +keep your existing `IConsumer` handler signatures working under Wolverine during migration. + +::: tip +The shim interfaces are included in the core Wolverine NuGet package -- no additional packages are needed. +While these shims ease migration, the Wolverine team recommends eventually moving to Wolverine's native +convention-based handlers for the best developer experience. +::: + +#### Automatic Handler Discovery + +Wolverine automatically discovers classes implementing `IConsumer` during its normal +[handler discovery](/guide/handlers/discovery) assembly scanning -- no explicit registration is needed. +The `ConsumeContext`, `IPublishEndpoint`, and `ISendEndpointProvider` types are automatically +resolved in handler methods via Wolverine's built-in code generation. + +Just make sure the assembly containing your `IConsumer` implementations is included in Wolverine's +discovery. By default, Wolverine scans the application assembly and any assemblies explicitly added via +`opts.Discovery.IncludeAssembly()`. See the [handler discovery documentation](/guide/handlers/discovery) +for more details on controlling which assemblies are scanned. + +#### Available Interfaces + +| MassTransit Shim | Delegates To | Purpose | +|-----------------|-------------|---------| +| `IConsumer` | `IWolverineHandler` | Consumer/handler discovery marker | +| `ConsumeContext` | `IMessageContext` | Message access, Send/Publish/Respond inside consumers | +| `IPublishEndpoint` | `IMessageBus` | Publish events outside of consumers | +| `ISendEndpointProvider` | `IMessageBus` | Send commands outside of consumers | + +#### Using IConsumer\ + +The `IConsumer` shim extends `IWolverineHandler`, so implementing it automatically registers your +consumer with Wolverine's handler discovery: + +```csharp +using Wolverine.Shims.MassTransit; + +public class OrderConsumer : IConsumer +{ + public async Task Consume(ConsumeContext context) + { + var order = new Order(context.Message.OrderId); + + // ConsumeContext delegates to Wolverine's IMessageContext + await context.Publish(new OrderSubmitted { OrderId = context.Message.OrderId }); + await context.RespondAsync(new SubmitOrderResponse { Success = true }); + } +} +``` + +#### Using IPublishEndpoint / ISendEndpointProvider + +Inject these interfaces to send and publish messages outside of consumers: + +```csharp +using Wolverine.Shims.MassTransit; + +public class OrderController : ControllerBase +{ + private readonly ISendEndpointProvider _sender; + private readonly IPublishEndpoint _publisher; + + public OrderController(ISendEndpointProvider sender, IPublishEndpoint publisher) + { + _sender = sender; + _publisher = publisher; + } + + [HttpPost] + public async Task PlaceOrder(PlaceOrderRequest request) + { + await _sender.Send(new SubmitOrder(request.OrderId)); + return Accepted(); + } + + [HttpPost("notify")] + public async Task NotifyOrderShipped(string orderId) + { + await _publisher.Publish(new OrderShipped { OrderId = orderId }); + return Ok(); + } +} +``` + +#### ConsumeContext Properties + +The `ConsumeContext` shim exposes common MassTransit properties mapped to Wolverine: + +| ConsumeContext Property | Wolverine Source | +|------------------------|-----------------| +| `Message` | The message instance | +| `MessageId` | `Envelope.Id` | +| `CorrelationId` | `IMessageContext.CorrelationId` | +| `ConversationId` | `Envelope.ConversationId` | +| `Headers` | `Envelope.Headers` | + ### Migration Checklist **Phase 1: Coexistence** @@ -588,6 +688,129 @@ opts.Policies.RegisterInteropMessageAssembly(typeof(SharedMessages).Assembly); Wolverine detects message types from standard NServiceBus headers. You may need [message type aliases](/guide/messages#message-type-name-or-alias) to bridge naming differences. See the full [interoperability guide](/tutorials/interop#interop-with-nservicebus). +### NServiceBus Shim Interfaces + +Wolverine provides shim interfaces in the `Wolverine.Shims.NServiceBus` namespace that mimic the core NServiceBus +API surface while delegating to Wolverine's `IMessageBus` and `IMessageContext` under the hood. These shims let +you migrate handler code incrementally without rewriting every handler signature at once. + +::: tip +The shim interfaces are included in the core Wolverine NuGet package -- no additional packages are needed. +While these shims ease migration, the Wolverine team recommends eventually moving to Wolverine's native +convention-based handlers and pure function style for the best developer experience. +::: + +#### Automatic Handler Discovery + +Wolverine automatically discovers classes implementing `IHandleMessages` during its normal +[handler discovery](/guide/handlers/discovery) assembly scanning -- no explicit registration is needed. +The `IMessageHandlerContext` parameter in `Handle(T message, IMessageHandlerContext context)` is +automatically resolved via Wolverine's built-in code generation. + +Just make sure the assembly containing your `IHandleMessages` implementations is included in +Wolverine's discovery. By default, Wolverine scans the application assembly and any assemblies +explicitly added via `opts.Discovery.IncludeAssembly()`. See the +[handler discovery documentation](/guide/handlers/discovery) for more details on controlling which +assemblies are scanned. + +#### DI Registration for Non-Handler Interfaces + +If you need to inject NServiceBus shim interfaces (`IMessageSession`, `IEndpointInstance`, +`IUniformSession`, `ITransactionalSession`) into services outside of message handlers via +constructor injection, register them with: + +```csharp +builder.Host.UseWolverine(opts => +{ + opts.UseNServiceBusShims(); + + // Your Wolverine configuration... +}); +``` + +#### Available Interfaces + +| NServiceBus Shim | Delegates To | Purpose | +|-----------------|-------------|---------| +| `IMessageSession` | `IMessageBus` | Send/Publish outside of handlers | +| `IEndpointInstance` | `IMessageBus` + `IHost` | Running endpoint with lifecycle | +| `IMessageHandlerContext` | `IMessageContext` | Send/Publish/Reply inside handlers | +| `IUniformSession` | `IMessageBus` | Unified Send/Publish (inside or outside handlers) | +| `ITransactionalSession` | `IMessageBus` | Transactional Send/Publish (Open/Commit are obsolete) | +| `IHandleMessages` | `IWolverineHandler` | Handler discovery marker | + +#### Using IHandleMessages\ + +The `IHandleMessages` shim extends `IWolverineHandler`, so implementing it automatically registers your +handler with Wolverine's handler discovery: + +```csharp +using Wolverine.Shims.NServiceBus; + +// This handler is discovered by Wolverine via the IWolverineHandler marker +public class OrderHandler : IHandleMessages +{ + public async Task Handle(PlaceOrder message, IMessageHandlerContext context) + { + // context.Send, context.Publish, context.Reply all delegate to Wolverine + await context.Publish(new OrderPlaced(message.OrderId)); + await context.Reply(new PlaceOrderResponse { Success = true }); + } +} +``` + +#### Using IMessageSession / IEndpointInstance + +Inject `IMessageSession` or `IEndpointInstance` to send and publish messages outside of handlers: + +```csharp +using Wolverine.Shims.NServiceBus; + +public class OrderController : ControllerBase +{ + private readonly IMessageSession _session; + + public OrderController(IMessageSession session) => _session = session; + + [HttpPost] + public async Task PlaceOrder(PlaceOrderRequest request) + { + await _session.Send(new PlaceOrder(request.OrderId)); + return Accepted(); + } +} +``` + +#### NServiceBus-Style Options + +The shims include `SendOptions`, `PublishOptions`, and `ReplyOptions` classes that map to Wolverine's +`DeliveryOptions`: + +```csharp +var options = new SendOptions(); +options.SetDestination("remote-endpoint"); // routes to a named endpoint +options.SetHeader("tenant-id", "acme"); // adds a header +options.DelayDeliveryWith(TimeSpan.FromMinutes(5)); // schedules delivery + +await session.Send(new PlaceOrder("ABC-123"), options); +``` + +#### ITransactionalSession + +`ITransactionalSession` delegates `Send` and `Publish` to `IMessageBus`. The `Open()` and `Commit()` +lifecycle methods are marked `[Obsolete]` and throw `NotSupportedException` because Wolverine handles +transactional messaging automatically via its built-in [outbox](/guide/durability/): + +```csharp +// These methods are obsolete -- just delete the calls +// session.Open(); // throws NotSupportedException +// session.Commit(); // throws NotSupportedException + +// Send and Publish work normally +await session.Send(new PlaceOrder("ABC-123")); +await session.Publish(new OrderPlaced("ABC-123")); +``` + ### Migration Checklist **Phase 1: Coexistence** @@ -623,6 +846,62 @@ to bridge naming differences. See the full [interoperability guide](/tutorials/i For a detailed comparison of MediatR and Wolverine, see the dedicated [Wolverine for MediatR Users](/introduction/from-mediatr) guide. +### MediatR Shim Interfaces + +Wolverine provides shim interfaces in the `Wolverine.Shims.MediatR` namespace that let you keep your existing +MediatR handler signatures working under Wolverine without any code changes. These shims are included in the +core Wolverine NuGet package. + +::: tip +These shim interfaces are marker types that Wolverine's [handler discovery](/guide/handlers/discovery) +recognizes via `IWolverineHandler`. No additional DI registration is needed -- just change your `using` +statements from `MediatR` to `Wolverine.Shims.MediatR` and remove the MediatR NuGet packages. +::: + +#### Available Interfaces + +| MediatR Shim | Purpose | +|-------------|---------| +| `IRequest` | Marker for request messages that return a response of type `T` | +| `IRequest` | Marker for request messages that do not return a response | +| `IRequestHandler` | Handler for requests with a response (extends `IWolverineHandler`) | +| `IRequestHandler` | Handler for requests without a response (extends `IWolverineHandler`) | + +#### Usage + +Simply change the `using` directive from `MediatR` to `Wolverine.Shims.MediatR`: + +```csharp +// Before: using MediatR; +using Wolverine.Shims.MediatR; + +public record CreateOrder(string OrderId) : IRequest; +public record OrderResult(string OrderId, string Status); + +public class CreateOrderHandler : IRequestHandler +{ + public Task Handle(CreateOrder request, CancellationToken cancellationToken) + { + return Task.FromResult(new OrderResult(request.OrderId, "Created")); + } +} +``` + +Invoke using Wolverine's `IMessageBus`: + +```csharp +// Before: var result = await mediator.Send(new CreateOrder("ABC-123")); +var result = await bus.InvokeAsync(new CreateOrder("ABC-123")); +``` + +#### Migration Steps + +1. Replace `using MediatR;` with `using Wolverine.Shims.MediatR;` in your handler files +2. Replace `IMediator.Send()` calls with `IMessageBus.InvokeAsync()` at call sites +3. Replace `IMediator.Publish()` calls with `IMessageBus.PublishAsync()` +4. Remove the MediatR NuGet packages +5. Over time, consider removing the shim interfaces and adopting Wolverine's native convention-based handlers + The key differences in summary: - **No `IRequest` / `IRequestHandler`** -- Wolverine handlers are discovered by convention diff --git a/src/Testing/CoreTests/Shims/masstransit_end_to_end.cs b/src/Testing/CoreTests/Shims/masstransit_end_to_end.cs new file mode 100644 index 000000000..b1d5c2e9e --- /dev/null +++ b/src/Testing/CoreTests/Shims/masstransit_end_to_end.cs @@ -0,0 +1,136 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime.Handlers; +using Wolverine.Shims.MassTransit; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace CoreTests.Shims; + +public class masstransit_end_to_end : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _host = null!; + + public masstransit_end_to_end(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.IncludeType(); + opts.IncludeType(); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task consumer_handles_message_and_publishes_cascading_event() + { + PlaceOrderConsumer.Reset(); + OrderPlacedCascadeHandler.Reset(); + + var session = await _host.InvokeMessageAndWaitAsync(new MtPlaceOrder("ORD-001")); + + PlaceOrderConsumer.LastOrderId.ShouldBe("ORD-001"); + OrderPlacedCascadeHandler.LastOrderId.ShouldBe("ORD-001"); + } + + [Fact] + public async Task consumer_receives_message_metadata_via_consume_context() + { + PlaceOrderConsumer.Reset(); + + await _host.InvokeMessageAndWaitAsync(new MtPlaceOrder("ORD-002")); + + PlaceOrderConsumer.LastOrderId.ShouldBe("ORD-002"); + PlaceOrderConsumer.ReceivedContext.ShouldBeTrue(); + } + + [Fact] + public void print_generated_code_for_consumer() + { + // Force code generation by resolving the handler + _host.GetRuntime().Handlers.HandlerFor(); + + var graph = _host.Services.GetRequiredService(); + var chain = graph.ChainFor(); + chain.ShouldNotBeNull(); + + _output.WriteLine("=== Generated Code for MtPlaceOrder (MassTransit IConsumer) ==="); + _output.WriteLine(chain.SourceCode); + } + + [Fact] + public void print_generated_code_for_cascaded_handler() + { + // Force code generation by resolving the handler + _host.GetRuntime().Handlers.HandlerFor(); + + var graph = _host.Services.GetRequiredService(); + var chain = graph.ChainFor(); + chain.ShouldNotBeNull(); + + _output.WriteLine("=== Generated Code for MtOrderPlaced cascade handler ==="); + _output.WriteLine(chain.SourceCode); + } +} + +// --- MassTransit shim message types --- + +public record MtPlaceOrder(string OrderId); + +public record MtOrderPlaced(string OrderId); + +// --- MassTransit IConsumer handler --- + +public class PlaceOrderConsumer : IConsumer +{ + public static string? LastOrderId; + public static bool ReceivedContext; + + public static void Reset() + { + LastOrderId = null; + ReceivedContext = false; + } + + public async Task Consume(ConsumeContext context) + { + LastOrderId = context.Message.OrderId; + ReceivedContext = true; + + // Publish a cascading event via the ConsumeContext + await context.Publish(new MtOrderPlaced(context.Message.OrderId)); + } +} + +// --- Handler for the cascaded event --- + +public class OrderPlacedCascadeHandler +{ + public static string? LastOrderId; + + public static void Reset() + { + LastOrderId = null; + } + + public void Handle(MtOrderPlaced message) + { + LastOrderId = message.OrderId; + } +} diff --git a/src/Testing/CoreTests/Shims/masstransit_shim_tests.cs b/src/Testing/CoreTests/Shims/masstransit_shim_tests.cs new file mode 100644 index 000000000..376c6cc16 --- /dev/null +++ b/src/Testing/CoreTests/Shims/masstransit_shim_tests.cs @@ -0,0 +1,226 @@ +using NSubstitute; +using Wolverine.Shims.MassTransit; +using Xunit; + +namespace CoreTests.Shims; + +public class wolverine_consume_context_tests +{ + private readonly IMessageContext _context; + + public wolverine_consume_context_tests() + { + _context = Substitute.For(); + } + + [Fact] + public void message_returns_the_message() + { + var message = new TestMtCommand("hello"); + var consumeContext = new WolverineConsumeContext(_context, message); + + consumeContext.Message.ShouldBe(message); + } + + [Fact] + public void message_id_returns_envelope_id() + { + var envelope = new Envelope { Id = Guid.Parse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") }; + _context.Envelope.Returns(envelope); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.MessageId.ShouldBe(Guid.Parse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")); + } + + [Fact] + public void message_id_returns_null_when_no_envelope() + { + _context.Envelope.Returns((Envelope?)null); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.MessageId.ShouldBeNull(); + } + + [Fact] + public void correlation_id_delegates_to_context() + { + _context.CorrelationId.Returns("my-correlation"); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.CorrelationId.ShouldBe("my-correlation"); + } + + [Fact] + public void conversation_id_returns_envelope_conversation_id() + { + var conversationId = Guid.NewGuid(); + var envelope = new Envelope { ConversationId = conversationId }; + _context.Envelope.Returns(envelope); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.ConversationId.ShouldBe(conversationId); + } + + [Fact] + public void headers_returns_envelope_headers() + { + var envelope = new Envelope(); + envelope.Headers["key1"] = "value1"; + _context.Envelope.Returns(envelope); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.Headers["key1"].ShouldBe("value1"); + } + + [Fact] + public void headers_returns_empty_when_no_envelope() + { + _context.Envelope.Returns((Envelope?)null); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + + consumeContext.Headers.Count.ShouldBe(0); + } + + [Fact] + public async Task publish_delegates_to_context_publish_async() + { + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + var eventMessage = new TestMtEvent("created"); + + await consumeContext.Publish(eventMessage); + + await _context.Received(1).PublishAsync(eventMessage, null); + } + + [Fact] + public async Task send_delegates_to_context_send_async() + { + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + var command = new TestMtCommand("forward"); + + await consumeContext.Send(command); + + await _context.Received(1).SendAsync(command, null); + } + + [Fact] + public async Task send_with_destination_uses_endpoint() + { + var destinationUri = new Uri("rabbitmq://localhost/remote-queue"); + var endpoint = Substitute.For(); + _context.EndpointFor(destinationUri).Returns(endpoint); + + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + var command = new TestMtCommand("forward"); + + await consumeContext.Send(command, destinationUri); + + await endpoint.Received(1).SendAsync(command, Arg.Any()); + } + + [Fact] + public async Task respond_async_delegates_to_respond_to_sender() + { + var consumeContext = new WolverineConsumeContext(_context, new TestMtCommand("test")); + var response = new TestMtResponse("ok"); + + await consumeContext.RespondAsync(response); + + await _context.Received(1).RespondToSenderAsync(response); + } +} + +public class message_bus_implements_masstransit_shim_interfaces +{ + [Fact] + public void message_bus_implements_publish_endpoint() + { + typeof(IPublishEndpoint).IsAssignableFrom(typeof(Wolverine.Runtime.MessageBus)) + .ShouldBeTrue(); + } + + [Fact] + public void message_bus_implements_send_endpoint_provider() + { + typeof(ISendEndpointProvider).IsAssignableFrom(typeof(Wolverine.Runtime.MessageBus)) + .ShouldBeTrue(); + } +} + +public class consume_context_variable_source_tests +{ + private readonly ConsumeContextVariableSource _source = new(); + + [Fact] + public void matches_consume_context_of_t() + { + _source.Matches(typeof(ConsumeContext)).ShouldBeTrue(); + } + + [Fact] + public void does_not_match_unrelated_type() + { + _source.Matches(typeof(string)).ShouldBeFalse(); + } + + [Fact] + public void does_not_match_non_generic_type() + { + _source.Matches(typeof(IMessageBus)).ShouldBeFalse(); + } + + [Fact] + public void creates_variable_of_correct_type() + { + var variable = _source.Create(typeof(ConsumeContext)); + variable.VariableType.ShouldBe(typeof(ConsumeContext)); + } +} + +public class masstransit_interface_variable_source_tests +{ + private readonly MassTransitInterfaceVariableSource _source = new(); + + [Fact] + public void matches_publish_endpoint() + { + _source.Matches(typeof(IPublishEndpoint)).ShouldBeTrue(); + } + + [Fact] + public void matches_send_endpoint_provider() + { + _source.Matches(typeof(ISendEndpointProvider)).ShouldBeTrue(); + } + + [Fact] + public void does_not_match_unrelated_type() + { + _source.Matches(typeof(string)).ShouldBeFalse(); + } + + [Fact] + public void creates_publish_endpoint_variable() + { + var variable = _source.Create(typeof(IPublishEndpoint)); + variable.VariableType.ShouldBe(typeof(IPublishEndpoint)); + } + + [Fact] + public void creates_send_endpoint_provider_variable() + { + var variable = _source.Create(typeof(ISendEndpointProvider)); + variable.VariableType.ShouldBe(typeof(ISendEndpointProvider)); + } +} + +// Test message types +public record TestMtCommand(string Data); +public record TestMtEvent(string Data); +public record TestMtResponse(string Data); diff --git a/src/Testing/CoreTests/Shims/nservicebus_end_to_end.cs b/src/Testing/CoreTests/Shims/nservicebus_end_to_end.cs new file mode 100644 index 000000000..c667150af --- /dev/null +++ b/src/Testing/CoreTests/Shims/nservicebus_end_to_end.cs @@ -0,0 +1,138 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime.Handlers; +using Wolverine.Shims.NServiceBus; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace CoreTests.Shims; + +public class nservicebus_end_to_end : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _host = null!; + + public nservicebus_end_to_end(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNServiceBusShims(); + + opts.IncludeType(); + opts.IncludeType(); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task handler_handles_message_and_publishes_cascading_event() + { + NsbSubmitOrderHandler.Reset(); + NsbOrderSubmittedCascadeHandler.Reset(); + + var session = await _host.InvokeMessageAndWaitAsync(new NsbSubmitOrder("ORD-101")); + + NsbSubmitOrderHandler.LastOrderId.ShouldBe("ORD-101"); + NsbOrderSubmittedCascadeHandler.LastOrderId.ShouldBe("ORD-101"); + } + + [Fact] + public async Task handler_receives_message_handler_context() + { + NsbSubmitOrderHandler.Reset(); + + await _host.InvokeMessageAndWaitAsync(new NsbSubmitOrder("ORD-102")); + + NsbSubmitOrderHandler.LastOrderId.ShouldBe("ORD-102"); + NsbSubmitOrderHandler.ReceivedContext.ShouldBeTrue(); + } + + [Fact] + public void print_generated_code_for_handler() + { + // Force code generation by resolving the handler + _host.GetRuntime().Handlers.HandlerFor(); + + var graph = _host.Services.GetRequiredService(); + var chain = graph.ChainFor(); + chain.ShouldNotBeNull(); + + _output.WriteLine("=== Generated Code for NsbSubmitOrder (NServiceBus IHandleMessages) ==="); + _output.WriteLine(chain.SourceCode); + } + + [Fact] + public void print_generated_code_for_cascaded_handler() + { + // Force code generation by resolving the handler + _host.GetRuntime().Handlers.HandlerFor(); + + var graph = _host.Services.GetRequiredService(); + var chain = graph.ChainFor(); + chain.ShouldNotBeNull(); + + _output.WriteLine("=== Generated Code for NsbOrderSubmitted cascade handler ==="); + _output.WriteLine(chain.SourceCode); + } +} + +// --- NServiceBus shim message types --- + +public record NsbSubmitOrder(string OrderId); + +public record NsbOrderSubmitted(string OrderId); + +// --- NServiceBus IHandleMessages handler --- + +public class NsbSubmitOrderHandler : IHandleMessages +{ + public static string? LastOrderId; + public static bool ReceivedContext; + + public static void Reset() + { + LastOrderId = null; + ReceivedContext = false; + } + + public async Task Handle(NsbSubmitOrder message, IMessageHandlerContext context) + { + LastOrderId = message.OrderId; + ReceivedContext = true; + + // Publish a cascading event via the IMessageHandlerContext + await context.Publish(new NsbOrderSubmitted(message.OrderId)); + } +} + +// --- Handler for the cascaded event --- + +public class NsbOrderSubmittedCascadeHandler +{ + public static string? LastOrderId; + + public static void Reset() + { + LastOrderId = null; + } + + public void Handle(NsbOrderSubmitted message) + { + LastOrderId = message.OrderId; + } +} diff --git a/src/Testing/CoreTests/Shims/nservicebus_shim_tests.cs b/src/Testing/CoreTests/Shims/nservicebus_shim_tests.cs new file mode 100644 index 000000000..33d2bf02c --- /dev/null +++ b/src/Testing/CoreTests/Shims/nservicebus_shim_tests.cs @@ -0,0 +1,470 @@ +using NSubstitute; +using Wolverine.Shims.NServiceBus; +using Xunit; + +namespace CoreTests.Shims; + +public class nservicebus_options_tests +{ + [Fact] + public void send_options_sets_headers() + { + var options = new SendOptions(); + options.SetHeader("key1", "value1"); + options.SetHeader("key2", "value2"); + + var delivery = options.ToDeliveryOptions(); + + delivery.Headers["key1"].ShouldBe("value1"); + delivery.Headers["key2"].ShouldBe("value2"); + } + + [Fact] + public void send_options_sets_destination() + { + var options = new SendOptions(); + options.SetDestination("my-endpoint"); + + options.Destination.ShouldBe("my-endpoint"); + } + + [Fact] + public void send_options_delay_delivery() + { + var options = new SendOptions(); + options.DelayDeliveryWith(TimeSpan.FromMinutes(5)); + + var delivery = options.ToDeliveryOptions(); + + delivery.ScheduleDelay.ShouldBe(TimeSpan.FromMinutes(5)); + delivery.ScheduledTime.ShouldBeNull(); + } + + [Fact] + public void send_options_do_not_deliver_before() + { + var scheduledTime = DateTimeOffset.UtcNow.AddHours(1); + var options = new SendOptions(); + options.DoNotDeliverBefore(scheduledTime); + + var delivery = options.ToDeliveryOptions(); + + delivery.ScheduledTime.ShouldBe(scheduledTime); + delivery.ScheduleDelay.ShouldBeNull(); + } + + [Fact] + public void send_options_delay_clears_scheduled_time() + { + var options = new SendOptions(); + options.DoNotDeliverBefore(DateTimeOffset.UtcNow.AddHours(1)); + options.DelayDeliveryWith(TimeSpan.FromMinutes(5)); + + var delivery = options.ToDeliveryOptions(); + + delivery.ScheduleDelay.ShouldBe(TimeSpan.FromMinutes(5)); + delivery.ScheduledTime.ShouldBeNull(); + } + + [Fact] + public void publish_options_sets_headers() + { + var options = new PublishOptions(); + options.SetHeader("event-type", "created"); + + var delivery = options.ToDeliveryOptions(); + + delivery.Headers["event-type"].ShouldBe("created"); + } + + [Fact] + public void reply_options_sets_headers() + { + var options = new ReplyOptions(); + options.SetHeader("reply-key", "reply-value"); + + var delivery = options.ToDeliveryOptions(); + + delivery.Headers["reply-key"].ShouldBe("reply-value"); + } + + [Fact] + public void get_headers_returns_empty_when_none_set() + { + var options = new SendOptions(); + options.GetHeaders().Count.ShouldBe(0); + } +} + +public class wolverine_message_session_tests +{ + private readonly IMessageBus _bus; + private readonly WolverineMessageSession _session; + + public wolverine_message_session_tests() + { + _bus = Substitute.For(); + _session = new WolverineMessageSession(_bus); + } + + [Fact] + public async Task send_delegates_to_bus_send_async() + { + var message = new TestNsbCommand("hello"); + + await _session.Send(message); + + await _bus.Received(1).SendAsync(message, null); + } + + [Fact] + public async Task send_with_options_delegates_with_delivery_options() + { + var message = new TestNsbCommand("hello"); + var options = new SendOptions(); + options.SetHeader("key", "value"); + + await _session.Send(message, options); + + await _bus.Received(1).SendAsync(message, + Arg.Is(d => d.Headers["key"] == "value")); + } + + [Fact] + public async Task send_with_destination_uses_endpoint() + { + var message = new TestNsbCommand("hello"); + var options = new SendOptions(); + options.SetDestination("remote-endpoint"); + + var endpoint = Substitute.For(); + _bus.EndpointFor("remote-endpoint").Returns(endpoint); + + await _session.Send(message, options); + + await endpoint.Received(1).SendAsync(message, Arg.Any()); + await _bus.DidNotReceive().SendAsync(Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task publish_delegates_to_bus_publish_async() + { + var message = new TestNsbEvent("created"); + + await _session.Publish(message); + + await _bus.Received(1).PublishAsync(message, null); + } + + [Fact] + public async Task publish_with_options_delegates_with_delivery_options() + { + var message = new TestNsbEvent("created"); + var options = new PublishOptions(); + options.SetHeader("event-source", "test"); + + await _session.Publish(message, options); + + await _bus.Received(1).PublishAsync(message, + Arg.Is(d => d.Headers["event-source"] == "test")); + } +} + +public class wolverine_endpoint_instance_tests +{ + private readonly IMessageBus _bus; + private readonly Microsoft.Extensions.Hosting.IHost _host; + private readonly WolverineEndpointInstance _instance; + + public wolverine_endpoint_instance_tests() + { + _bus = Substitute.For(); + _host = Substitute.For(); + _instance = new WolverineEndpointInstance(_bus, _host); + } + + [Fact] + public async Task send_delegates_to_bus() + { + var message = new TestNsbCommand("hello"); + + await _instance.Send(message); + + await _bus.Received(1).SendAsync(message, null); + } + + [Fact] + public async Task publish_delegates_to_bus() + { + var message = new TestNsbEvent("created"); + + await _instance.Publish(message); + + await _bus.Received(1).PublishAsync(message, null); + } + + [Fact] + public async Task stop_delegates_to_host() + { + await _instance.Stop(); + + await _host.Received(1).StopAsync(Arg.Any()); + } + + [Fact] + public async Task send_with_destination_uses_endpoint() + { + var message = new TestNsbCommand("hello"); + var options = new SendOptions(); + options.SetDestination("remote"); + + var endpoint = Substitute.For(); + _bus.EndpointFor("remote").Returns(endpoint); + + await _instance.Send(message, options); + + await endpoint.Received(1).SendAsync(message, Arg.Any()); + } +} + +public class wolverine_message_handler_context_tests +{ + private readonly IMessageContext _context; + private readonly WolverineMessageHandlerContext _handlerContext; + + public wolverine_message_handler_context_tests() + { + _context = Substitute.For(); + _handlerContext = new WolverineMessageHandlerContext(_context); + } + + [Fact] + public void message_id_returns_envelope_id() + { + var envelope = new Envelope { Id = Guid.Parse("12345678-1234-1234-1234-123456789012") }; + _context.Envelope.Returns(envelope); + + _handlerContext.MessageId.ShouldBe("12345678-1234-1234-1234-123456789012"); + } + + [Fact] + public void message_id_returns_empty_when_no_envelope() + { + _context.Envelope.Returns((Envelope?)null); + + _handlerContext.MessageId.ShouldBe(string.Empty); + } + + [Fact] + public void reply_to_address_returns_envelope_reply_uri() + { + var envelope = new Envelope { ReplyUri = new Uri("tcp://localhost:1234") }; + _context.Envelope.Returns(envelope); + + _handlerContext.ReplyToAddress.ShouldStartWith("tcp://localhost:1234"); + } + + [Fact] + public void reply_to_address_returns_null_when_no_reply_uri() + { + var envelope = new Envelope(); + _context.Envelope.Returns(envelope); + + _handlerContext.ReplyToAddress.ShouldBeNull(); + } + + [Fact] + public void message_headers_returns_envelope_headers() + { + var envelope = new Envelope(); + envelope.Headers["key1"] = "value1"; + _context.Envelope.Returns(envelope); + + _handlerContext.MessageHeaders["key1"].ShouldBe("value1"); + } + + [Fact] + public void message_headers_returns_empty_when_no_envelope() + { + _context.Envelope.Returns((Envelope?)null); + + _handlerContext.MessageHeaders.Count.ShouldBe(0); + } + + [Fact] + public void correlation_id_delegates_to_context() + { + _context.CorrelationId.Returns("my-correlation-id"); + + _handlerContext.CorrelationId.ShouldBe("my-correlation-id"); + } + + [Fact] + public async Task send_delegates_to_context_send_async() + { + var message = new TestNsbCommand("hello"); + + await _handlerContext.Send(message); + + await _context.Received(1).SendAsync(message, null); + } + + [Fact] + public async Task send_with_destination_uses_endpoint() + { + var message = new TestNsbCommand("hello"); + var options = new SendOptions(); + options.SetDestination("remote"); + + var endpoint = Substitute.For(); + _context.EndpointFor("remote").Returns(endpoint); + + await _handlerContext.Send(message, options); + + await endpoint.Received(1).SendAsync(message, Arg.Any()); + } + + [Fact] + public async Task publish_delegates_to_context_publish_async() + { + var message = new TestNsbEvent("created"); + + await _handlerContext.Publish(message); + + await _context.Received(1).PublishAsync(message, null); + } + + [Fact] + public async Task reply_delegates_to_respond_to_sender() + { + var message = new TestNsbResponse("ok"); + + await _handlerContext.Reply(message); + + await _context.Received(1).RespondToSenderAsync(message); + } + + [Fact] + public async Task forward_current_message_sends_to_destination() + { + var originalMessage = new TestNsbCommand("forward-me"); + var envelope = new Envelope(originalMessage); + _context.Envelope.Returns(envelope); + + var endpoint = Substitute.For(); + _context.EndpointFor("other-endpoint").Returns(endpoint); + + await _handlerContext.ForwardCurrentMessageTo("other-endpoint"); + + await endpoint.Received(1).SendAsync(originalMessage, Arg.Any()); + } + + [Fact] + public async Task forward_current_message_throws_when_no_envelope() + { + _context.Envelope.Returns((Envelope?)null); + + await Should.ThrowAsync(() => + _handlerContext.ForwardCurrentMessageTo("other-endpoint")); + } +} + +public class wolverine_uniform_session_tests +{ + private readonly IMessageBus _bus; + private readonly WolverineUniformSession _session; + + public wolverine_uniform_session_tests() + { + _bus = Substitute.For(); + _session = new WolverineUniformSession(_bus); + } + + [Fact] + public async Task send_delegates_to_bus() + { + var message = new TestNsbCommand("hello"); + + await _session.Send(message); + + await _bus.Received(1).SendAsync(message, null); + } + + [Fact] + public async Task publish_delegates_to_bus() + { + var message = new TestNsbEvent("created"); + + await _session.Publish(message); + + await _bus.Received(1).PublishAsync(message, null); + } + + [Fact] + public async Task send_with_destination_uses_endpoint() + { + var message = new TestNsbCommand("hello"); + var options = new SendOptions(); + options.SetDestination("target"); + + var endpoint = Substitute.For(); + _bus.EndpointFor("target").Returns(endpoint); + + await _session.Send(message, options); + + await endpoint.Received(1).SendAsync(message, Arg.Any()); + } +} + +public class wolverine_transactional_session_tests +{ + private readonly IMessageBus _bus; + private readonly WolverineTransactionalSession _session; + + public wolverine_transactional_session_tests() + { + _bus = Substitute.For(); + _session = new WolverineTransactionalSession(_bus); + } + + [Fact] + public async Task send_delegates_to_bus() + { + var message = new TestNsbCommand("hello"); + + await _session.Send(message); + + await _bus.Received(1).SendAsync(message, null); + } + + [Fact] + public async Task publish_delegates_to_bus() + { + var message = new TestNsbEvent("created"); + + await _session.Publish(message); + + await _bus.Received(1).PublishAsync(message, null); + } + + [Fact] + public async Task open_throws_not_supported() + { +#pragma warning disable CS0618 // Obsolete + await Should.ThrowAsync(() => _session.Open()); +#pragma warning restore CS0618 + } + + [Fact] + public async Task commit_throws_not_supported() + { +#pragma warning disable CS0618 // Obsolete + await Should.ThrowAsync(() => _session.Commit()); +#pragma warning restore CS0618 + } +} + +// Test message types +public record TestNsbCommand(string Data); +public record TestNsbEvent(string Data); +public record TestNsbResponse(string Data); diff --git a/src/Wolverine/Attributes/WolverineMessageWrapperAttribute.cs b/src/Wolverine/Attributes/WolverineMessageWrapperAttribute.cs new file mode 100644 index 000000000..1a2c0c9d9 --- /dev/null +++ b/src/Wolverine/Attributes/WolverineMessageWrapperAttribute.cs @@ -0,0 +1,10 @@ +namespace Wolverine.Attributes; + +/// +/// When applied to a generic interface or class, tells Wolverine that this type wraps a message +/// and that the first generic type argument is the actual message type for handler discovery. +/// This allows Wolverine to correctly route messages to handlers that accept wrapper types +/// such as ConsumeContext<T> from MassTransit shims. +/// +[AttributeUsage(AttributeTargets.Interface | AttributeTargets.Class)] +public class WolverineMessageWrapperAttribute : Attribute; diff --git a/src/Wolverine/Runtime/Handlers/MethodInfoExtensions.cs b/src/Wolverine/Runtime/Handlers/MethodInfoExtensions.cs index 41ae69c2b..7f027cafc 100644 --- a/src/Wolverine/Runtime/Handlers/MethodInfoExtensions.cs +++ b/src/Wolverine/Runtime/Handlers/MethodInfoExtensions.cs @@ -1,4 +1,5 @@ using System.Reflection; +using Wolverine.Attributes; namespace Wolverine.Runtime.Handlers; @@ -19,6 +20,17 @@ public static class MethodInfoExtensions } var parameters = method.GetParameters(); - return parameters.FirstOrDefault()?.ParameterType; + var paramType = parameters.FirstOrDefault()?.ParameterType; + + if (paramType is { IsGenericType: true }) + { + var genericDef = paramType.GetGenericTypeDefinition(); + if (genericDef.GetCustomAttribute() != null) + { + return paramType.GetGenericArguments()[0]; + } + } + + return paramType; } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/MessageBus.MassTransit.cs b/src/Wolverine/Runtime/MessageBus.MassTransit.cs new file mode 100644 index 000000000..2d6b0dc66 --- /dev/null +++ b/src/Wolverine/Runtime/MessageBus.MassTransit.cs @@ -0,0 +1,22 @@ +using Wolverine.Shims.MassTransit; + +namespace Wolverine.Runtime; + +public partial class MessageBus : IPublishEndpoint, ISendEndpointProvider +{ + async Task IPublishEndpoint.Publish(T message) + { + await PublishAsync(message); + } + + async Task ISendEndpointProvider.Send(T message) + { + await SendAsync(message); + } + + async Task ISendEndpointProvider.Send(T message, Uri destinationAddress) + { + var endpoint = EndpointFor(destinationAddress); + await endpoint.SendAsync(message); + } +} diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index ed5c50dde..7de169798 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -7,7 +7,7 @@ namespace Wolverine.Runtime; -public class MessageBus : IMessageBus, IMessageContext +public partial class MessageBus : IMessageBus, IMessageContext { public static MessageBus Build(IWolverineRuntime runtime, string correlationId) => new MessageBus(runtime, correlationId); diff --git a/src/Wolverine/Shims/MassTransit/ConsumeContext.cs b/src/Wolverine/Shims/MassTransit/ConsumeContext.cs new file mode 100644 index 000000000..9ff61d532 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/ConsumeContext.cs @@ -0,0 +1,65 @@ +using Wolverine.Attributes; + +namespace Wolverine.Shims.MassTransit; + +/// +/// MassTransit-compatible consume context interface. +/// Provides access to the message being consumed and messaging operations. +/// +/// The message type +[WolverineMessageWrapper] +public interface ConsumeContext where T : class +{ + /// + /// The message being consumed. + /// + T Message { get; } + + /// + /// The unique identifier of the message. + /// Maps to . + /// + Guid? MessageId { get; } + + /// + /// The correlation identifier for tracking related messages. + /// Maps to . + /// + string? CorrelationId { get; } + + /// + /// The conversation identifier for the logical workflow. + /// Maps to . + /// + Guid? ConversationId { get; } + + /// + /// The headers of the message being consumed. + /// Maps to . + /// + IReadOnlyDictionary Headers { get; } + + /// + /// Publishes an event message. + /// Maps to . + /// + Task Publish(TMessage message) where TMessage : class; + + /// + /// Sends a command message. + /// Maps to . + /// + Task Send(TMessage message) where TMessage : class; + + /// + /// Sends a command message to a specific endpoint. + /// Maps to then . + /// + Task Send(TMessage message, Uri destinationAddress) where TMessage : class; + + /// + /// Sends a response back to the request originator. + /// Maps to . + /// + Task RespondAsync(TMessage message) where TMessage : class; +} diff --git a/src/Wolverine/Shims/MassTransit/ConsumeContextVariableSource.cs b/src/Wolverine/Shims/MassTransit/ConsumeContextVariableSource.cs new file mode 100644 index 000000000..6f19d81fb --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/ConsumeContextVariableSource.cs @@ -0,0 +1,58 @@ +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; + +namespace Wolverine.Shims.MassTransit; + +/// +/// Code generation variable source that creates +/// instances from the current and the message. +/// +internal class ConsumeContextVariableSource : IVariableSource +{ + public bool Matches(Type type) + { + return type.Closes(typeof(ConsumeContext<>)); + } + + public Variable Create(Type type) + { + var messageType = type.GetGenericArguments()[0]; + return new ConsumeContextFrame(messageType).Variable; + } +} + +internal class ConsumeContextFrame : SyncFrame +{ + private readonly Type _messageType; + private Variable? _context; + private Variable? _message; + + public ConsumeContextFrame(Type messageType) + { + _messageType = messageType; + var consumeContextType = typeof(WolverineConsumeContext<>).MakeGenericType(messageType); + Variable = new Variable(typeof(ConsumeContext<>).MakeGenericType(messageType), this); + } + + public Variable Variable { get; } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _context = chain.FindVariable(typeof(IMessageContext)); + yield return _context; + + _message = chain.FindVariable(_messageType); + yield return _message; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + var concreteType = typeof(WolverineConsumeContext<>).MakeGenericType(_messageType); + writer.WriteLine( + $"var {Variable.Usage} = new {concreteType.FullNameInCode()}({_context!.Usage}, {_message!.Usage});"); + + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Wolverine/Shims/MassTransit/IConsumer.cs b/src/Wolverine/Shims/MassTransit/IConsumer.cs new file mode 100644 index 000000000..8664c3623 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/IConsumer.cs @@ -0,0 +1,16 @@ +namespace Wolverine.Shims.MassTransit; + +/// +/// MassTransit-compatible consumer interface. +/// Implementing this interface marks the class for Wolverine's handler discovery +/// via . +/// +/// The message type to consume +public interface IConsumer : IWolverineHandler where T : class +{ + /// + /// Consumes a message of type . + /// + /// The consume context providing access to the message and messaging operations + Task Consume(ConsumeContext context); +} diff --git a/src/Wolverine/Shims/MassTransit/IPublishEndpoint.cs b/src/Wolverine/Shims/MassTransit/IPublishEndpoint.cs new file mode 100644 index 000000000..d3c1338b1 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/IPublishEndpoint.cs @@ -0,0 +1,13 @@ +namespace Wolverine.Shims.MassTransit; + +/// +/// MassTransit-compatible interface for publishing event messages. +/// Delegates to Wolverine's . +/// +public interface IPublishEndpoint +{ + /// + /// Publishes an event message to all subscribers. + /// + Task Publish(T message) where T : class; +} diff --git a/src/Wolverine/Shims/MassTransit/ISendEndpointProvider.cs b/src/Wolverine/Shims/MassTransit/ISendEndpointProvider.cs new file mode 100644 index 000000000..1f2a02e20 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/ISendEndpointProvider.cs @@ -0,0 +1,18 @@ +namespace Wolverine.Shims.MassTransit; + +/// +/// MassTransit-compatible interface for sending command messages. +/// Delegates to Wolverine's . +/// +public interface ISendEndpointProvider +{ + /// + /// Sends a command message. + /// + Task Send(T message) where T : class; + + /// + /// Sends a command message to a specific endpoint. + /// + Task Send(T message, Uri destinationAddress) where T : class; +} diff --git a/src/Wolverine/Shims/MassTransit/MassTransitInterfaceVariableSource.cs b/src/Wolverine/Shims/MassTransit/MassTransitInterfaceVariableSource.cs new file mode 100644 index 000000000..7fc8718c7 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/MassTransitInterfaceVariableSource.cs @@ -0,0 +1,26 @@ +using JasperFx.CodeGeneration.Model; + +namespace Wolverine.Shims.MassTransit; + +/// +/// Code generation variable source that provides and +/// as cast variables from the current message context. +/// Since directly implements both interfaces, +/// the generated code simply casts the existing context variable. +/// +internal class MassTransitInterfaceVariableSource : IVariableSource +{ + public bool Matches(Type type) + { + return type == typeof(IPublishEndpoint) || type == typeof(ISendEndpointProvider); + } + + public Variable Create(Type type) + { + // The "context" variable is a MessageContext (which extends MessageBus), + // and MessageBus implements IPublishEndpoint and ISendEndpointProvider. + // Create a cast from the existing context variable. + var contextVariable = new Variable(typeof(IMessageContext), "context"); + return new CastVariable(contextVariable, type); + } +} diff --git a/src/Wolverine/Shims/MassTransit/WolverineConsumeContext.cs b/src/Wolverine/Shims/MassTransit/WolverineConsumeContext.cs new file mode 100644 index 000000000..8802eacb8 --- /dev/null +++ b/src/Wolverine/Shims/MassTransit/WolverineConsumeContext.cs @@ -0,0 +1,49 @@ +namespace Wolverine.Shims.MassTransit; + +/// +/// Wolverine-backed implementation of . +/// Delegates all operations to . +/// +/// The message type +public class WolverineConsumeContext : ConsumeContext where T : class +{ + private readonly IMessageContext _context; + + public WolverineConsumeContext(IMessageContext context, T message) + { + _context = context; + Message = message; + } + + public T Message { get; } + + public Guid? MessageId => _context.Envelope?.Id; + + public string? CorrelationId => _context.CorrelationId; + + public Guid? ConversationId => _context.Envelope?.ConversationId; + + public IReadOnlyDictionary Headers => + _context.Envelope?.Headers ?? (IReadOnlyDictionary)new Dictionary(); + + public async Task Publish(TMessage message) where TMessage : class + { + await _context.PublishAsync(message); + } + + public async Task Send(TMessage message) where TMessage : class + { + await _context.SendAsync(message); + } + + public async Task Send(TMessage message, Uri destinationAddress) where TMessage : class + { + var endpoint = _context.EndpointFor(destinationAddress); + await endpoint.SendAsync(message); + } + + public async Task RespondAsync(TMessage message) where TMessage : class + { + await _context.RespondToSenderAsync(message); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/ExtendableOptions.cs b/src/Wolverine/Shims/NServiceBus/ExtendableOptions.cs new file mode 100644 index 000000000..3ced98eb3 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/ExtendableOptions.cs @@ -0,0 +1,50 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// Base class for NServiceBus-compatible message options. +/// Maps internally to Wolverine's . +/// +public abstract class ExtendableOptions +{ + private Dictionary? _headers; + + /// + /// Sets a header key/value pair on the outgoing message. + /// + public void SetHeader(string key, string value) + { + _headers ??= new Dictionary(); + _headers[key] = value; + } + + /// + /// Gets all headers set on this options instance. + /// + public IReadOnlyDictionary GetHeaders() + { + return _headers ?? (IReadOnlyDictionary)new Dictionary(); + } + + /// + /// Sets the message identity for this options instance. + /// + public string? MessageId { get; set; } + + /// + /// Converts these NServiceBus-compatible options to Wolverine's . + /// + internal virtual DeliveryOptions ToDeliveryOptions() + { + var options = new DeliveryOptions(); + + if (_headers is { Count: > 0 }) + { + foreach (var kvp in _headers) + { + options.Headers[kvp.Key] = kvp.Value; + } + } + + return options; + } +} diff --git a/src/Wolverine/Shims/NServiceBus/IEndpointInstance.cs b/src/Wolverine/Shims/NServiceBus/IEndpointInstance.cs new file mode 100644 index 000000000..bd5df1725 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/IEndpointInstance.cs @@ -0,0 +1,15 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible interface representing a running endpoint instance. +/// Extends with lifecycle management. +/// Delegates to Wolverine's . +/// +public interface IEndpointInstance : IMessageSession +{ + /// + /// Stops the endpoint instance. + /// Maps to stopping the underlying host. + /// + Task Stop(); +} diff --git a/src/Wolverine/Shims/NServiceBus/IHandleMessages.cs b/src/Wolverine/Shims/NServiceBus/IHandleMessages.cs new file mode 100644 index 000000000..1318be93b --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/IHandleMessages.cs @@ -0,0 +1,17 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible message handler interface. +/// Implementing this interface marks the class for Wolverine's handler discovery +/// via . +/// +/// The message type to handle +public interface IHandleMessages : IWolverineHandler +{ + /// + /// Handles a message of type . + /// + /// The message to handle + /// The handler context providing access to message metadata and messaging operations + Task Handle(T message, IMessageHandlerContext context); +} diff --git a/src/Wolverine/Shims/NServiceBus/IMessageHandlerContext.cs b/src/Wolverine/Shims/NServiceBus/IMessageHandlerContext.cs new file mode 100644 index 000000000..4bffc0b92 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/IMessageHandlerContext.cs @@ -0,0 +1,57 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible interface available during message handling. +/// Provides access to message metadata and the ability to send, publish, and reply. +/// Delegates to Wolverine's . +/// +public interface IMessageHandlerContext +{ + /// + /// The unique identifier of the message currently being handled. + /// Maps to . + /// + string MessageId { get; } + + /// + /// The address to which replies should be sent. + /// Maps to . + /// + string? ReplyToAddress { get; } + + /// + /// The headers of the message currently being handled. + /// Maps to . + /// + IReadOnlyDictionary MessageHeaders { get; } + + /// + /// The correlation identifier for the current message workflow. + /// Maps to . + /// + string? CorrelationId { get; } + + /// + /// Sends a command message. + /// Maps to . + /// + Task Send(object message, SendOptions? options = null); + + /// + /// Publishes an event message. + /// Maps to . + /// + Task Publish(object message, PublishOptions? options = null); + + /// + /// Sends a reply back to the originator of the current message. + /// Maps to . + /// + Task Reply(object message, ReplyOptions? options = null); + + /// + /// Sends the current message to a different endpoint for processing. + /// Maps to sending the current message body to the specified destination. + /// + Task ForwardCurrentMessageTo(string destination); +} diff --git a/src/Wolverine/Shims/NServiceBus/IMessageSession.cs b/src/Wolverine/Shims/NServiceBus/IMessageSession.cs new file mode 100644 index 000000000..72823699f --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/IMessageSession.cs @@ -0,0 +1,20 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible interface for sending and publishing messages outside of a message handler. +/// Delegates to Wolverine's . +/// +public interface IMessageSession +{ + /// + /// Sends a command message. In NServiceBus, Send is for commands (point-to-point). + /// Maps to . + /// + Task Send(object message, SendOptions? options = null); + + /// + /// Publishes an event message. In NServiceBus, Publish is for events (pub-sub). + /// Maps to . + /// + Task Publish(object message, PublishOptions? options = null); +} diff --git a/src/Wolverine/Shims/NServiceBus/ITransactionalSession.cs b/src/Wolverine/Shims/NServiceBus/ITransactionalSession.cs new file mode 100644 index 000000000..865ed2f91 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/ITransactionalSession.cs @@ -0,0 +1,22 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible transactional session interface. +/// Extends with transaction lifecycle. +/// Wolverine handles transactional messaging automatically, so the Open/Commit +/// methods are obsolete and will throw . +/// +public interface ITransactionalSession : IUniformSession +{ + /// + /// Opens the transactional session. + /// + [Obsolete("Wolverine handles transactions automatically. Delete this usage.")] + Task Open(); + + /// + /// Commits the transactional session. + /// + [Obsolete("Wolverine handles transactions automatically. Delete this usage.")] + Task Commit(); +} diff --git a/src/Wolverine/Shims/NServiceBus/IUniformSession.cs b/src/Wolverine/Shims/NServiceBus/IUniformSession.cs new file mode 100644 index 000000000..aa3969f24 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/IUniformSession.cs @@ -0,0 +1,21 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible unified interface for sending and publishing that works +/// both inside and outside of message handlers. +/// Delegates to Wolverine's . +/// +public interface IUniformSession +{ + /// + /// Sends a command message. + /// Maps to . + /// + Task Send(object message, SendOptions? options = null); + + /// + /// Publishes an event message. + /// Maps to . + /// + Task Publish(object message, PublishOptions? options = null); +} diff --git a/src/Wolverine/Shims/NServiceBus/MessageHandlerContextVariableSource.cs b/src/Wolverine/Shims/NServiceBus/MessageHandlerContextVariableSource.cs new file mode 100644 index 000000000..f04273a6c --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/MessageHandlerContextVariableSource.cs @@ -0,0 +1,51 @@ +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using JasperFx.Core.Reflection; + +namespace Wolverine.Shims.NServiceBus; + +/// +/// Code generation variable source that creates +/// instances from the current . +/// This eliminates the need for service location when resolving +/// in handler methods. +/// +internal class MessageHandlerContextVariableSource : IVariableSource +{ + public bool Matches(Type type) + { + return type == typeof(IMessageHandlerContext); + } + + public Variable Create(Type type) + { + return new MessageHandlerContextFrame().Variable; + } +} + +internal class MessageHandlerContextFrame : SyncFrame +{ + private Variable? _context; + + public MessageHandlerContextFrame() + { + Variable = new Variable(typeof(IMessageHandlerContext), this); + } + + public Variable Variable { get; } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _context = chain.FindVariable(typeof(IMessageContext)); + yield return _context; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine( + $"var {Variable.Usage} = new {typeof(WolverineMessageHandlerContext).FullNameInCode()}({_context!.Usage});"); + + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/NServiceBusShimExtensions.cs b/src/Wolverine/Shims/NServiceBus/NServiceBusShimExtensions.cs new file mode 100644 index 000000000..a08e91cb9 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/NServiceBusShimExtensions.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Wolverine.Shims.NServiceBus; + +/// +/// Extension methods to register NServiceBus shim interfaces with Wolverine. +/// +public static class NServiceBusShimExtensions +{ + /// + /// Registers NServiceBus shim DI services for constructor injection. + /// is automatically resolved in handler methods + /// via code generation and does not require this call. + /// This registers , , + /// , and for + /// constructor injection in services outside of message handlers. + /// + public static WolverineOptions UseNServiceBusShims(this WolverineOptions options) + { + options.Services.AddScoped(sp => + new WolverineMessageSession(sp.GetRequiredService())); + + options.Services.AddScoped(sp => + new WolverineEndpointInstance( + sp.GetRequiredService(), + sp.GetRequiredService())); + + options.Services.AddScoped(sp => + new WolverineUniformSession(sp.GetRequiredService())); + + options.Services.AddScoped(sp => + new WolverineTransactionalSession(sp.GetRequiredService())); + + return options; + } +} diff --git a/src/Wolverine/Shims/NServiceBus/PublishOptions.cs b/src/Wolverine/Shims/NServiceBus/PublishOptions.cs new file mode 100644 index 000000000..1419fd59e --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/PublishOptions.cs @@ -0,0 +1,9 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible options for publishing an event message. +/// Maps internally to Wolverine's . +/// +public class PublishOptions : ExtendableOptions +{ +} diff --git a/src/Wolverine/Shims/NServiceBus/ReplyOptions.cs b/src/Wolverine/Shims/NServiceBus/ReplyOptions.cs new file mode 100644 index 000000000..03767627f --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/ReplyOptions.cs @@ -0,0 +1,9 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible options for replying to the sender of the current message. +/// Maps internally to Wolverine's . +/// +public class ReplyOptions : ExtendableOptions +{ +} diff --git a/src/Wolverine/Shims/NServiceBus/SendOptions.cs b/src/Wolverine/Shims/NServiceBus/SendOptions.cs new file mode 100644 index 000000000..4009a0f3f --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/SendOptions.cs @@ -0,0 +1,62 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// NServiceBus-compatible options for sending a command message. +/// Maps internally to Wolverine's . +/// +public class SendOptions : ExtendableOptions +{ + /// + /// Sets the destination endpoint for this send operation. + /// Maps to Wolverine's . + /// + public string? Destination { get; set; } + + /// + /// Sets the destination endpoint for the send operation. + /// + public void SetDestination(string destination) + { + Destination = destination; + } + + /// + /// Delays the delivery of the message by the specified time span. + /// Maps to . + /// + public void DelayDeliveryWith(TimeSpan delay) + { + Delay = delay; + ScheduledTime = null; + } + + /// + /// Delays the delivery of the message until the specified time. + /// Maps to . + /// + public void DoNotDeliverBefore(DateTimeOffset at) + { + ScheduledTime = at; + Delay = null; + } + + internal TimeSpan? Delay { get; set; } + internal DateTimeOffset? ScheduledTime { get; set; } + + internal override DeliveryOptions ToDeliveryOptions() + { + var options = base.ToDeliveryOptions(); + + if (Delay.HasValue) + { + options.ScheduleDelay = Delay; + } + + if (ScheduledTime.HasValue) + { + options.ScheduledTime = ScheduledTime; + } + + return options; + } +} diff --git a/src/Wolverine/Shims/NServiceBus/WolverineEndpointInstance.cs b/src/Wolverine/Shims/NServiceBus/WolverineEndpointInstance.cs new file mode 100644 index 000000000..2ad8a82e6 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/WolverineEndpointInstance.cs @@ -0,0 +1,44 @@ +using Microsoft.Extensions.Hosting; + +namespace Wolverine.Shims.NServiceBus; + +/// +/// Wolverine-backed implementation of . +/// Delegates messaging to and lifecycle to . +/// +public class WolverineEndpointInstance : IEndpointInstance +{ + private readonly IMessageBus _bus; + private readonly IHost _host; + + public WolverineEndpointInstance(IMessageBus bus, IHost host) + { + _bus = bus; + _host = host; + } + + public async Task Send(object message, SendOptions? options = null) + { + var deliveryOptions = options?.ToDeliveryOptions(); + + if (options?.Destination != null) + { + var endpoint = _bus.EndpointFor(options.Destination); + await endpoint.SendAsync(message, deliveryOptions); + } + else + { + await _bus.SendAsync(message, deliveryOptions); + } + } + + public async Task Publish(object message, PublishOptions? options = null) + { + await _bus.PublishAsync(message, options?.ToDeliveryOptions()); + } + + public async Task Stop() + { + await _host.StopAsync(); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/WolverineMessageHandlerContext.cs b/src/Wolverine/Shims/NServiceBus/WolverineMessageHandlerContext.cs new file mode 100644 index 000000000..b1f705dab --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/WolverineMessageHandlerContext.cs @@ -0,0 +1,60 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// Wolverine-backed implementation of . +/// Delegates all operations to . +/// +public class WolverineMessageHandlerContext : IMessageHandlerContext +{ + private readonly IMessageContext _context; + + public WolverineMessageHandlerContext(IMessageContext context) + { + _context = context; + } + + public string MessageId => _context.Envelope?.Id.ToString() ?? string.Empty; + + public string? ReplyToAddress => _context.Envelope?.ReplyUri?.ToString(); + + public IReadOnlyDictionary MessageHeaders => + _context.Envelope?.Headers ?? (IReadOnlyDictionary)new Dictionary(); + + public string? CorrelationId => _context.CorrelationId; + + public async Task Send(object message, SendOptions? options = null) + { + var deliveryOptions = options?.ToDeliveryOptions(); + + if (options?.Destination != null) + { + var endpoint = _context.EndpointFor(options.Destination); + await endpoint.SendAsync(message, deliveryOptions); + } + else + { + await _context.SendAsync(message, deliveryOptions); + } + } + + public async Task Publish(object message, PublishOptions? options = null) + { + await _context.PublishAsync(message, options?.ToDeliveryOptions()); + } + + public async Task Reply(object message, ReplyOptions? options = null) + { + await _context.RespondToSenderAsync(message); + } + + public async Task ForwardCurrentMessageTo(string destination) + { + if (_context.Envelope?.Message == null) + { + throw new InvalidOperationException("No current message to forward."); + } + + var endpoint = _context.EndpointFor(destination); + await endpoint.SendAsync(_context.Envelope.Message); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/WolverineMessageSession.cs b/src/Wolverine/Shims/NServiceBus/WolverineMessageSession.cs new file mode 100644 index 000000000..87610e330 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/WolverineMessageSession.cs @@ -0,0 +1,35 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// Wolverine-backed implementation of . +/// Delegates all operations to . +/// +public class WolverineMessageSession : IMessageSession +{ + private readonly IMessageBus _bus; + + public WolverineMessageSession(IMessageBus bus) + { + _bus = bus; + } + + public async Task Send(object message, SendOptions? options = null) + { + var deliveryOptions = options?.ToDeliveryOptions(); + + if (options?.Destination != null) + { + var endpoint = _bus.EndpointFor(options.Destination); + await endpoint.SendAsync(message, deliveryOptions); + } + else + { + await _bus.SendAsync(message, deliveryOptions); + } + } + + public async Task Publish(object message, PublishOptions? options = null) + { + await _bus.PublishAsync(message, options?.ToDeliveryOptions()); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/WolverineTransactionalSession.cs b/src/Wolverine/Shims/NServiceBus/WolverineTransactionalSession.cs new file mode 100644 index 000000000..74ae3b3db --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/WolverineTransactionalSession.cs @@ -0,0 +1,50 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// Wolverine-backed implementation of . +/// Delegates messaging to . +/// Open/Commit are not supported because Wolverine handles transactions automatically. +/// +public class WolverineTransactionalSession : ITransactionalSession +{ + private readonly IMessageBus _bus; + + public WolverineTransactionalSession(IMessageBus bus) + { + _bus = bus; + } + + public async Task Send(object message, SendOptions? options = null) + { + var deliveryOptions = options?.ToDeliveryOptions(); + + if (options?.Destination != null) + { + var endpoint = _bus.EndpointFor(options.Destination); + await endpoint.SendAsync(message, deliveryOptions); + } + else + { + await _bus.SendAsync(message, deliveryOptions); + } + } + + public async Task Publish(object message, PublishOptions? options = null) + { + await _bus.PublishAsync(message, options?.ToDeliveryOptions()); + } + + [Obsolete("Wolverine handles transactions automatically. Delete this usage.")] + public Task Open() + { + throw new NotSupportedException( + "Wolverine handles transactions automatically. Remove calls to Open()."); + } + + [Obsolete("Wolverine handles transactions automatically. Delete this usage.")] + public Task Commit() + { + throw new NotSupportedException( + "Wolverine handles transactions automatically. Remove calls to Commit()."); + } +} diff --git a/src/Wolverine/Shims/NServiceBus/WolverineUniformSession.cs b/src/Wolverine/Shims/NServiceBus/WolverineUniformSession.cs new file mode 100644 index 000000000..353cd3ad9 --- /dev/null +++ b/src/Wolverine/Shims/NServiceBus/WolverineUniformSession.cs @@ -0,0 +1,35 @@ +namespace Wolverine.Shims.NServiceBus; + +/// +/// Wolverine-backed implementation of . +/// Delegates all operations to . +/// +public class WolverineUniformSession : IUniformSession +{ + private readonly IMessageBus _bus; + + public WolverineUniformSession(IMessageBus bus) + { + _bus = bus; + } + + public async Task Send(object message, SendOptions? options = null) + { + var deliveryOptions = options?.ToDeliveryOptions(); + + if (options?.Destination != null) + { + var endpoint = _bus.EndpointFor(options.Destination); + await endpoint.SendAsync(message, deliveryOptions); + } + else + { + await _bus.SendAsync(message, deliveryOptions); + } + } + + public async Task Publish(object message, PublishOptions? options = null) + { + await _bus.PublishAsync(message, options?.ToDeliveryOptions()); + } +} diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index bdc4a6258..7cae2d432 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -117,6 +117,14 @@ public WolverineOptions(string? assemblyName) CodeGeneration = new GenerationRules("Internal.Generated"); CodeGeneration.Sources.Add(new NowTimeVariableSource()); CodeGeneration.Sources.Add(new TenantIdSource()); + + // MassTransit shim variable sources + CodeGeneration.Sources.Add(new Shims.MassTransit.ConsumeContextVariableSource()); + CodeGeneration.Sources.Add(new Shims.MassTransit.MassTransitInterfaceVariableSource()); + + // NServiceBus shim variable sources + CodeGeneration.Sources.Add(new Shims.NServiceBus.MessageHandlerContextVariableSource()); + CodeGeneration.Assemblies.Add(GetType().Assembly); if (assemblyName != null)