From 33bb0dc82cc87feec08e0621c9c2d4cbcda775dc Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Tue, 24 Feb 2026 21:41:12 +0100 Subject: [PATCH] Add DeliveryOptions overloads to InvokeAsync methods Introduce new InvokeAsync overloads on IDestinationEndpoint and implementations to accept DeliveryOptions, enabling callers to specify headers, tenant IDs, and other metadata when invoking messages. --- .../CoreTests/Acceptance/remote_invocation.cs | 47 ++++++ .../CoreTests/TestMessageContextTests.cs | 144 ++++++++++++++++++ src/Wolverine/IDestinationEndpoint.cs | 23 +++ src/Wolverine/Runtime/DestinationEndpoint.cs | 26 ++++ src/Wolverine/TestMessageContext.cs | 32 +++- 5 files changed, 268 insertions(+), 4 deletions(-) diff --git a/src/Testing/CoreTests/Acceptance/remote_invocation.cs b/src/Testing/CoreTests/Acceptance/remote_invocation.cs index afd5006a6..33ee2f3eb 100644 --- a/src/Testing/CoreTests/Acceptance/remote_invocation.cs +++ b/src/Testing/CoreTests/Acceptance/remote_invocation.cs @@ -160,6 +160,53 @@ public async Task happy_path_with_explicit_endpoint_name() response.Name.ShouldBe("Croaker"); } + [Fact] + public async Task happy_path_with_explicit_endpoint_name_with_response_and_delivery_options() + { + Response1 response = null!; + + Func> fetch = async c => + response = await c.EndpointFor("Receiver2") + .InvokeAsync(new Request1 { Name = "Croaker" }, + new DeliveryOptions(){ TenantId = "TheTenant" }); + + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver1, _receiver2) + .Timeout(5.Seconds()) + .ExecuteAndWaitAsync(fetch); + + var send = session.FindEnvelopesWithMessageType() + .Single(x => x.MessageEventType == MessageEventType.Sent); + + send.Envelope.DeliverBy.ShouldNotBeNull(); + + var envelope = session.Received.SingleEnvelope(); + envelope.Source.ShouldBe("Receiver2"); + envelope.Message.ShouldBe(response); + envelope.TenantId.ShouldBe("TheTenant"); + response.Name.ShouldBe("Croaker"); + } + + [Fact] + public async Task happy_path_with_explicit_endpoint_name_and_delivery_options() + { + var (session, ack) = await _sender.TrackActivity() + .AlsoTrack(_receiver1, _receiver2) + .Timeout(5.Seconds()) + .SendMessageAndWaitForAcknowledgementAsync(c => + c.EndpointFor("Receiver2").InvokeAsync(new Request2 { Name = "Croaker" }, + new DeliveryOptions() { TenantId = "TheTenant" })); + + var send = session.FindEnvelopesWithMessageType() + .Single(x => x.MessageEventType == MessageEventType.Sent); + + send.Envelope.DeliverBy.ShouldNotBeNull(); + + var envelope = session.Received.SingleEnvelope(); + envelope.Source.ShouldBe("Receiver2"); + envelope.TenantId.ShouldBe("TheTenant"); + } + [Fact] public async Task happy_path_with_explicit_uri_destination() { diff --git a/src/Testing/CoreTests/TestMessageContextTests.cs b/src/Testing/CoreTests/TestMessageContextTests.cs index b7ff858ae..b1208ff16 100644 --- a/src/Testing/CoreTests/TestMessageContextTests.cs +++ b/src/Testing/CoreTests/TestMessageContextTests.cs @@ -318,6 +318,150 @@ public async Task invoke_with_expected_response_and_filter_miss_to_endpoint_by_n ex.Message.ShouldStartWith("There is no matching expectation for the request message"); } + [Fact] + public async Task invoke_acknowledgement_with_delivery_options_to_endpoint_by_uri() + { + var uri = "something://one".ToUri(); + var message1 = new Message1(); + + await theContext.EndpointFor(uri).InvokeAsync(message1, new DeliveryOptions().WithHeader("ack-test", "value")); + + var envelope = theSpy.Sent.ShouldHaveEnvelopeForMessageType(); + envelope.Destination.ShouldBe(uri); + envelope.Headers["ack-test"].ShouldBe("value"); + } + + [Fact] + public async Task invoke_acknowledgement_with_delivery_options_to_endpoint_by_name() + { + var message1 = new Message1(); + + await theContext.EndpointFor("endpoint1").InvokeAsync(message1, new DeliveryOptions().WithHeader("ack-name-test", "value")); + + var envelope = theSpy.Sent.ShouldHaveEnvelopeForMessageType(); + envelope.EndpointName.ShouldBe("endpoint1"); + envelope.Headers["ack-name-test"].ShouldBe("value"); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_no_filter_hit() + { + var response = new NumberResponse(11); + theSpy.WhenInvokedMessageOf().RespondWith(response); + + var result = await theContext.InvokeAsync( + new NumberRequest(3, 4), + new DeliveryOptions().WithHeader("custom", "value")); + + result.ShouldBeSameAs(response); + + var envelope = theSpy.Invoked.OfType().Last(); + envelope.Headers["custom"].ShouldBe("value"); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_and_filter_hit() + { + var response1 = new NumberResponse(11); + var response2 = new NumberResponse(12); + theSpy.WhenInvokedMessageOf(x => x.X == 3).RespondWith(response1); + theSpy.WhenInvokedMessageOf(x => x.X == 5).RespondWith(response2); + + var result1 = await theContext.InvokeAsync( + new NumberRequest(3, 4), + new DeliveryOptions().WithHeader("test", "one")); + + result1.ShouldBeSameAs(response1); + + var result2 = await theContext.InvokeAsync( + new NumberRequest(5, 4), + new DeliveryOptions().WithHeader("test", "two")); + + result2.ShouldBeSameAs(response2); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_and_filter_miss() + { + var response1 = new NumberResponse(11); + theSpy.WhenInvokedMessageOf(x => x.X == 100).RespondWith(response1); + + var ex = await Should.ThrowAsync(async () => + { + await theContext.InvokeAsync( + new NumberRequest(3, 4), + new DeliveryOptions().WithHeader("test", "value")); + }); + + ex.Message.ShouldStartWith("There is no matching expectation for the request message"); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_to_endpoint_by_uri() + { + var response = new NumberResponse(11); + var destination = new Uri("stub://one"); + theSpy.WhenInvokedMessageOf(destination: destination).RespondWith(response); + + var result = await theContext.EndpointFor(destination) + .InvokeAsync( + new NumberRequest(4, 5), + new DeliveryOptions().WithHeader("uri-test", "value")); + + result.ShouldBeSameAs(response); + + var envelope = theSpy.Invoked.OfType().Last(); + envelope.Headers["uri-test"].ShouldBe("value"); + envelope.Destination.ShouldBe(destination); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_and_filter_to_endpoint_by_uri() + { + var response = new NumberResponse(11); + var destination = new Uri("stub://one"); + theSpy.WhenInvokedMessageOf(x => x.X == 4, destination: destination).RespondWith(response); + + var result = await theContext.EndpointFor(destination) + .InvokeAsync( + new NumberRequest(4, 5), + new DeliveryOptions().WithHeader("filter-uri-test", "value")); + + result.ShouldBeSameAs(response); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_to_endpoint_by_name() + { + var response = new NumberResponse(11); + theSpy.WhenInvokedMessageOf(endpointName: "one").RespondWith(response); + + var result = await theContext.EndpointFor("one") + .InvokeAsync( + new NumberRequest(4, 5), + new DeliveryOptions().WithHeader("name-test", "value")); + + result.ShouldBeSameAs(response); + + var envelope = theSpy.Invoked.OfType().Last(); + envelope.Headers["name-test"].ShouldBe("value"); + envelope.EndpointName.ShouldBe("one"); + } + + [Fact] + public async Task invoke_with_expected_response_and_delivery_options_and_filter_to_endpoint_by_name() + { + var response = new NumberResponse(11); + theSpy.WhenInvokedMessageOf(x => x.X == 4, endpointName: "one").RespondWith(response); + + var result = await theContext.EndpointFor("one") + .InvokeAsync( + new NumberRequest(4, 5), + new DeliveryOptions().WithHeader("filter-name-test", "value")); + + result.ShouldBeSameAs(response); + } + public static async Task set_up_invoke_expectations() { #region sample_using_invoke_with_expected_response_with_test_message_context diff --git a/src/Wolverine/IDestinationEndpoint.cs b/src/Wolverine/IDestinationEndpoint.cs index b08c275b4..043e696ce 100644 --- a/src/Wolverine/IDestinationEndpoint.cs +++ b/src/Wolverine/IDestinationEndpoint.cs @@ -29,6 +29,17 @@ public interface IDestinationEndpoint Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = null); + /// + /// Execute the message at this destination + /// + /// + /// Use to pass in extra metadata like headers or group id or correlation information to the command execution + /// + /// + /// + Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = null); + /// /// Execute the summary at this destination and retrieve the expected /// response from the destination @@ -41,6 +52,18 @@ Task InvokeAsync(object message, CancellationToken cancellation Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = null) where T : class; + /// + /// Execute the summary at this destination and retrieve the expected + /// response from the destination + /// + /// + /// Use to pass in extra metadata like headers or group id or correlation information to the command execution + /// + /// + /// + /// + Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = null) + where T : class; /// /// Send a message by its raw binary contents and optionally configure how Wolverine diff --git a/src/Wolverine/Runtime/DestinationEndpoint.cs b/src/Wolverine/Runtime/DestinationEndpoint.cs index 3c6b2129d..9fd203327 100644 --- a/src/Wolverine/Runtime/DestinationEndpoint.cs +++ b/src/Wolverine/Runtime/DestinationEndpoint.cs @@ -112,6 +112,18 @@ public Task InvokeAsync(object message, CancellationToken cance return route.InvokeAsync(message, _parent, cancellation, timeout); } + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = null) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime); + return route.InvokeAsync(message, _parent, cancellation, timeout, options); + } + public Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = null) where T : class { @@ -125,4 +137,18 @@ public Task InvokeAsync(object message, CancellationToken cancellation = d var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime); return route.InvokeAsync(message, _parent, cancellation, timeout); } + + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = null) where T : class + { + _parent.Runtime.RegisterMessageType(typeof(T)); + + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime); + return route.InvokeAsync(message, _parent, cancellation, timeout, options); + } } \ No newline at end of file diff --git a/src/Wolverine/TestMessageContext.cs b/src/Wolverine/TestMessageContext.cs index 24cca0408..9c15cb17f 100644 --- a/src/Wolverine/TestMessageContext.cs +++ b/src/Wolverine/TestMessageContext.cs @@ -211,10 +211,8 @@ Task ICommandBus.InvokeAsync(object message, DeliveryOptions options, Cancellati Task ICommandBus.InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation, TimeSpan? timeout) { - var envelope = new Envelope(message) - { - - }; + var envelope = new Envelope(message); + options.Override(envelope); _invoked.Add(envelope); @@ -329,6 +327,16 @@ public Task InvokeAsync(object message, CancellationToken cance return Task.FromResult(new Acknowledgement()); } + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = null) + { + var envelope = new Envelope { Message = message, Destination = _destination, EndpointName = _endpointName }; + options.Override(envelope); + + _parent._sent.Add(envelope); + return Task.FromResult(new Acknowledgement()); + } + public Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = null) where T : class { @@ -344,6 +352,22 @@ public Task InvokeAsync(object message, CancellationToken cancellation = d return Task.FromResult(response); } + public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, + TimeSpan? timeout = null) where T : class + { + var envelope = new Envelope(message) + { + EndpointName = _endpointName, + Destination = _destination + }; + options.Override(envelope); + + _parent._invoked.Add(envelope); + + var response = _parent.findResponse(message, _destination, _endpointName); + return Task.FromResult(response); + } + public ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Action? configure = null) { throw new NotImplementedException();