diff --git a/src/Persistence/Wolverine.Marten/Subscriptions/InlineInvoker.cs b/src/Persistence/Wolverine.Marten/Subscriptions/InlineInvoker.cs index 4a5680d53..98b8187ef 100644 --- a/src/Persistence/Wolverine.Marten/Subscriptions/InlineInvoker.cs +++ b/src/Persistence/Wolverine.Marten/Subscriptions/InlineInvoker.cs @@ -30,7 +30,7 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont try { - await invoker.InvokeAsync(@event, (MessageBus)bus, cancellationToken, tenantId: @event.TenantId); + await invoker.InvokeAsync(@event, (MessageBus)bus, cancellationToken, options: new DeliveryOptions{TenantId = @event.TenantId}); sequence = @event.Sequence; } catch (Exception e) diff --git a/src/Persistence/Wolverine.Marten/Subscriptions/InnerDataInvoker.cs b/src/Persistence/Wolverine.Marten/Subscriptions/InnerDataInvoker.cs index 275ffe0e6..70e0432b3 100644 --- a/src/Persistence/Wolverine.Marten/Subscriptions/InnerDataInvoker.cs +++ b/src/Persistence/Wolverine.Marten/Subscriptions/InnerDataInvoker.cs @@ -14,18 +14,20 @@ public InnerDataInvoker(IMessageInvoker inner) _inner = inner; } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { throw new NotSupportedException(); } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { if (message is IEvent e) { - return _inner.InvokeAsync(e.Data, bus, cancellation, timeout, tenantId); + return _inner.InvokeAsync(e.Data, bus, cancellation, timeout, options); } return Task.CompletedTask; diff --git a/src/Persistence/Wolverine.Marten/Subscriptions/NulloMessageInvoker.cs b/src/Persistence/Wolverine.Marten/Subscriptions/NulloMessageInvoker.cs index 211df203d..be292be7e 100644 --- a/src/Persistence/Wolverine.Marten/Subscriptions/NulloMessageInvoker.cs +++ b/src/Persistence/Wolverine.Marten/Subscriptions/NulloMessageInvoker.cs @@ -5,14 +5,16 @@ namespace Wolverine.Marten.Subscriptions; internal class NulloMessageInvoker : IMessageInvoker { - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { throw new NotSupportedException(); } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { return Task.CompletedTask; } diff --git a/src/Samples/DocumentationSamples/EnqueueSamples.cs b/src/Samples/DocumentationSamples/EnqueueSamples.cs index f8c22f220..7a186c96a 100644 --- a/src/Samples/DocumentationSamples/EnqueueSamples.cs +++ b/src/Samples/DocumentationSamples/EnqueueSamples.cs @@ -12,6 +12,11 @@ public static async Task invoke_locally(IMessageBus bus) { // Execute the message inline await bus.InvokeAsync(new Message1()); + + // Execute the message inline, but this time pass in + // messaging metadata for Wolverine + await bus.InvokeAsync(new Message1(), + new DeliveryOptions { TenantId = "one", SagaId = "two" }.WithHeader("user.id", "admin")); } #endregion diff --git a/src/Samples/DocumentationSamples/MessageBusBasics.cs b/src/Samples/DocumentationSamples/MessageBusBasics.cs index 4ceaccd5d..b4225157a 100644 --- a/src/Samples/DocumentationSamples/MessageBusBasics.cs +++ b/src/Samples/DocumentationSamples/MessageBusBasics.cs @@ -49,6 +49,13 @@ public async Task invoke_debit_account(IMessageBus bus) public async Task invoke_math_operations(IMessageBus bus) { var results = await bus.InvokeAsync(new Numbers(3, 4)); + + // Same functionality, but this time we'll configure the active + // tenant id and add a message header + var results2 = await bus.InvokeAsync(new Numbers(5, 6), new DeliveryOptions + { + TenantId = "north.america" + }.WithHeader("user.id", "professor")); } #endregion diff --git a/src/Testing/MessageRoutingTests/MessageRoutingContext.cs b/src/Testing/MessageRoutingTests/MessageRoutingContext.cs index 7cfab9763..d902ca8bb 100644 --- a/src/Testing/MessageRoutingTests/MessageRoutingContext.cs +++ b/src/Testing/MessageRoutingTests/MessageRoutingContext.cs @@ -41,6 +41,7 @@ protected void assertExternalListenersAre(string uriList) var actual = runtime .Endpoints .ActiveListeners() + .Where(x => x.Uri.Scheme != "stub") .Where(x => x.Endpoint.Role == EndpointRole.Application) .OrderBy(x => x.Uri.ToString()) .Select(x => x.Uri).ToArray(); diff --git a/src/Testing/SlowTests/invoke_async_with_delivery_options.cs b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs new file mode 100644 index 000000000..f03ad8e31 --- /dev/null +++ b/src/Testing/SlowTests/invoke_async_with_delivery_options.cs @@ -0,0 +1,118 @@ +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.Tracking; +using Wolverine.Transports.Tcp; +using Wolverine.Util; +using Xunit; + +namespace SlowTests; + +public class invoke_async_with_delivery_options : IAsyncLifetime +{ + private IHost _publisher; + private IHost _receiver; + + public async Task InitializeAsync() + { + var publisherPort = PortFinder.GetAvailablePort(); + var receiverPort = PortFinder.GetAvailablePort(); + + + _publisher = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.DisableConventionalDiscovery(); + opts.PublishAllMessages().ToPort(receiverPort); + opts.ListenAtPort(publisherPort); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ListenAtPort(receiverPort); + }).StartAsync(); + + } + + public async Task DisposeAsync() + { + await _publisher.StopAsync(); + await _receiver.StopAsync(); + } + + [Fact] + public async Task invoke_locally() + { + var bus = _receiver.MessageBus(); + await bus.InvokeAsync(new WithHeaders(), + new DeliveryOptions { TenantId = "millers" }.WithHeader("name", "Chewie") + .WithHeader("breed", "indeterminate")); + } + + [Fact] + public async Task invoke_with_expected_outcome_locally() + { + var bus = _receiver.MessageBus(); + var answer = await bus.InvokeAsync(new DoMath(3, 4, "blue", "tom"), + new DeliveryOptions { TenantId = "blue" }.WithHeader("user-id", "tom")); + + answer.Sum.ShouldBe(7); + } + + [Fact] + public async Task invoke_remotely() + { + var tracked = await _publisher.TrackActivity() + .AlsoTrack(_receiver) + .IncludeExternalTransports() + .ExecuteAndWaitAsync(c => c.InvokeAsync(new WithHeaders(), + new DeliveryOptions { TenantId = "millers" }.WithHeader("name", "Chewie") + .WithHeader("breed", "indeterminate"))); + + var envelope = tracked.Received.SingleEnvelope(); + envelope.TenantId.ShouldBe("millers"); + envelope.Headers["name"].ShouldBe("Chewie"); + } + + [Fact] + public async Task invoke_with_expected_outcome_remotely() + { + Answer answer = null; + Func action = async c => answer = await c.InvokeAsync(new DoMath(3, 4, "blue", "tom"), + new DeliveryOptions { TenantId = "blue" }.WithHeader("user-id", "tom")); + var tracked = await _publisher.TrackActivity() + .AlsoTrack(_receiver) + .IncludeExternalTransports() + .ExecuteAndWaitAsync(action); + + var envelope = tracked.Received.SingleEnvelope(); + envelope.TenantId.ShouldBe("blue"); + envelope.Headers["user-id"].ShouldBe("tom"); + + answer.Sum.ShouldBe(7); + } +} + +public record WithHeaders; +public record DoMath(int X, int Y, string ExpectedTenantId, string UserIdHeader); +public record Answer(int Sum, int Product); + +public static class InvokeMessageHandler +{ + public static void Handle(WithHeaders message, Envelope envelope) + { + // Our family dog + envelope.Headers["name"].ShouldBe("Chewie"); + envelope.Headers["breed"].ShouldBe("indeterminate"); + envelope.TenantId.ShouldBe("millers"); + } + + public static Answer Handle(DoMath message, Envelope envelope) + { + envelope.TenantId.ShouldBe(message.ExpectedTenantId); + envelope.Headers["user-id"].ShouldBe(message.UserIdHeader); + return new Answer(message.X + message.Y, message.X * message.Y); + } +} \ No newline at end of file diff --git a/src/Wolverine/IMessageBus.cs b/src/Wolverine/IMessageBus.cs index 192f64380..8de864026 100644 --- a/src/Wolverine/IMessageBus.cs +++ b/src/Wolverine/IMessageBus.cs @@ -56,6 +56,18 @@ public interface ICommandBus /// Optional timeout /// Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = default); + + /// + /// Execute the message handling for this message *right now* and wait for the completion. + /// If the message is handled locally, this delegates immediately + /// If the message is handled remotely, the message is sent and the method waits for the response + /// + /// + /// Use to pass in extra metadata like headers or group id or correlation information to the command execution + /// + /// Optional timeout + /// + Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default); /// /// Execute the message handling for this message *right now* and wait for the completion and the designated response @@ -69,6 +81,20 @@ public interface ICommandBus /// /// Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = default); + + /// + /// Execute the message handling for this message *right now* and wait for the completion and the designated response + /// type T. + /// If the message is handled locally, this delegates immediately + /// If the message is handled remotely, the message is sent and the method waits for the response + /// + /// + /// Use to pass in extra metadata like headers or group id or correlation information to the command execution + /// + /// Optional timeout + /// + /// + Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default); } /// diff --git a/src/Wolverine/Runtime/Handlers/Executor.cs b/src/Wolverine/Runtime/Handlers/Executor.cs index 9643b5328..8369df38c 100644 --- a/src/Wolverine/Runtime/Handlers/Executor.cs +++ b/src/Wolverine/Runtime/Handlers/Executor.cs @@ -126,16 +126,18 @@ public async Task InvokeInlineAsync(Envelope envelope, CancellationToken cancell } public async Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null) + TimeSpan? timeout = null, DeliveryOptions? options = null) { var envelope = new Envelope(message) { ReplyUri = TransportConstants.RepliesUri, ReplyRequested = typeof(T).ToMessageTypeName(), ResponseType = typeof(T), - TenantId = tenantId ?? bus.TenantId, + TenantId = options?.TenantId ?? bus.TenantId, DoNotCascadeResponse = true }; + + options?.Override(envelope); bus.TrackEnvelopeCorrelation(envelope, Activity.Current); @@ -150,12 +152,14 @@ public async Task InvokeAsync(object message, MessageBus bus, Cancellation } public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null) + TimeSpan? timeout = null, DeliveryOptions? options = null) { var envelope = new Envelope(message) { - TenantId = tenantId ?? bus.TenantId + TenantId = options?.TenantId ?? bus.TenantId }; + + options?.Override(envelope); bus.TrackEnvelopeCorrelation(envelope, Activity.Current); return InvokeInlineAsync(envelope, cancellation); diff --git a/src/Wolverine/Runtime/Handlers/NoHandlerExecutor.cs b/src/Wolverine/Runtime/Handlers/NoHandlerExecutor.cs index 38cdb3e17..38cf1cf53 100644 --- a/src/Wolverine/Runtime/Handlers/NoHandlerExecutor.cs +++ b/src/Wolverine/Runtime/Handlers/NoHandlerExecutor.cs @@ -47,7 +47,7 @@ public Task InvokeAsync(MessageContext context, CancellationToken } public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = default) + TimeSpan? timeout = null, DeliveryOptions? options = null) { if (Exception != null) { @@ -58,7 +58,7 @@ public Task InvokeAsync(object message, MessageBus bus, CancellationToken } public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = default) + TimeSpan? timeout = null, DeliveryOptions? options = null) { if (Exception != null) { diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index fb66ffde0..f521c79fd 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -119,6 +119,32 @@ public Task InvokeAsync(object message, CancellationToken cancellation = d return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout); } + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = default) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + Runtime.AssertHasStarted(); + + return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, options); + } + + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = default) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + Runtime.AssertHasStarted(); + + return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, options); + } + public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default, TimeSpan? timeout = default) { @@ -129,7 +155,7 @@ public Task InvokeForTenantAsync(string tenantId, object message, CancellationTo Runtime.AssertHasStarted(); - return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, tenantId); + return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, new DeliveryOptions{TenantId = tenantId}); } public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default, @@ -142,7 +168,7 @@ public Task InvokeForTenantAsync(string tenantId, object message, Cancella Runtime.AssertHasStarted(); - return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, tenantId); + return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, new DeliveryOptions{TenantId = tenantId}); } public IReadOnlyList PreviewSubscriptions(object message) diff --git a/src/Wolverine/Runtime/Routing/IMessageInvoker.cs b/src/Wolverine/Runtime/Routing/IMessageInvoker.cs index 598cc411f..f0df0eb67 100644 --- a/src/Wolverine/Runtime/Routing/IMessageInvoker.cs +++ b/src/Wolverine/Runtime/Routing/IMessageInvoker.cs @@ -4,9 +4,9 @@ public interface IMessageInvoker { Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null); + TimeSpan? timeout = null, DeliveryOptions? options = null); Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null); + TimeSpan? timeout = null, DeliveryOptions? options = null); } \ No newline at end of file diff --git a/src/Wolverine/Runtime/Routing/MessageRoute.cs b/src/Wolverine/Runtime/Routing/MessageRoute.cs index 35fd4412f..e61d246aa 100644 --- a/src/Wolverine/Runtime/Routing/MessageRoute.cs +++ b/src/Wolverine/Runtime/Routing/MessageRoute.cs @@ -74,9 +74,9 @@ public MessageRoute(Type messageType, Endpoint endpoint, IWolverineRuntime runti public IList Rules { get; } = new List(); public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null) + TimeSpan? timeout = null, DeliveryOptions? options = null) { - return InvokeAsync(message, bus, cancellation, timeout, tenantId); + return InvokeAsync(message, bus, cancellation, timeout, options); } public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, @@ -145,13 +145,13 @@ public MessageSubscriptionDescriptor Describe() public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, - TimeSpan? timeout = null, string? tenantId = null) + TimeSpan? timeout = null, DeliveryOptions? options = null) { - return RemoteInvokeAsync(message, bus, cancellation, timeout, tenantId); + return RemoteInvokeAsync(message, bus, cancellation, timeout, options); } internal async Task RemoteInvokeAsync(object message, MessageBus bus, CancellationToken cancellation, - TimeSpan? timeout, string? tenantId, string? topicName = null) + TimeSpan? timeout, DeliveryOptions? options, string? topicName = null) { if (message == null) { @@ -170,9 +170,11 @@ internal async Task RemoteInvokeAsync(object message, MessageBus bus, Canc var envelope = new Envelope(message, Sender) { - TenantId = tenantId ?? bus.TenantId, + TenantId = options?.TenantId ?? bus.TenantId, TopicName = topicName }; + + options?.Override(envelope); foreach (var rule in Rules) rule.Modify(envelope); if (typeof(T) == typeof(Acknowledgement)) diff --git a/src/Wolverine/Runtime/Routing/TopicRouting.cs b/src/Wolverine/Runtime/Routing/TopicRouting.cs index dcd6de670..9b1f7f082 100644 --- a/src/Wolverine/Runtime/Routing/TopicRouting.cs +++ b/src/Wolverine/Runtime/Routing/TopicRouting.cs @@ -63,25 +63,27 @@ public MessageSubscriptionDescriptor Describe() }; } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { if (message is T typedMessage) { _route ??= _topicEndpoint.RouteFor(typeof(T), bus.Runtime); var topicName = _topicSource(typedMessage); - return _route.RemoteInvokeAsync(message, bus, cancellation, timeout, tenantId, topicName); + return _route.RemoteInvokeAsync(message, bus, cancellation, timeout, options, topicName); } throw new InvalidOperationException( $"The message of type {message.GetType().FullNameInCode()} cannot be routed as a message of type {typeof(T).FullNameInCode()}"); } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { - return InvokeAsync(message, bus, cancellation, timeout, tenantId); + return InvokeAsync(message, bus, cancellation, timeout, options); } public override string ToString() diff --git a/src/Wolverine/Runtime/WolverineRuntime.cs b/src/Wolverine/Runtime/WolverineRuntime.cs index 6679be337..37a25b3a3 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.cs @@ -137,14 +137,16 @@ public IMessageInvoker FindInvoker(string envelopeMessageType) internal class NulloMessageInvoker : IMessageInvoker { - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { throw new NotSupportedException(); } - public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null, - string? tenantId = null) + public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, + TimeSpan? timeout = null, + DeliveryOptions? options = null) { return Task.CompletedTask; } diff --git a/src/Wolverine/TestMessageContext.cs b/src/Wolverine/TestMessageContext.cs index 2e7a8ac84..24cca0408 100644 --- a/src/Wolverine/TestMessageContext.cs +++ b/src/Wolverine/TestMessageContext.cs @@ -201,6 +201,27 @@ Task ICommandBus.InvokeAsync(object message, CancellationToken cancellatio return Task.FromResult(response); } + Task ICommandBus.InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation, + TimeSpan? timeout) + { + _invoked.Add(message); + return Task.CompletedTask; + } + + Task ICommandBus.InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation, + TimeSpan? timeout) + { + var envelope = new Envelope(message) + { + + }; + + _invoked.Add(envelope); + + var response = findResponse(message); + return Task.FromResult(response); + } + public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default, TimeSpan? timeout = default) {