Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/Testing/CoreTests/Acceptance/remote_invocation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,53 @@ public async Task happy_path_with_explicit_endpoint_name()
response.Name.ShouldBe("Croaker");
}

[Fact]
public async Task happy_path_with_explicit_endpoint_name_with_response_and_delivery_options()
{
Response1 response = null!;

Func<IMessageContext, Task<Response1>> fetch = async c =>
response = await c.EndpointFor("Receiver2")
.InvokeAsync<Response1>(new Request1 { Name = "Croaker" },
new DeliveryOptions(){ TenantId = "TheTenant" });

var session = await _sender.TrackActivity()
.AlsoTrack(_receiver1, _receiver2)
.Timeout(5.Seconds())
.ExecuteAndWaitAsync(fetch);

var send = session.FindEnvelopesWithMessageType<Request1>()
.Single(x => x.MessageEventType == MessageEventType.Sent);

send.Envelope.DeliverBy.ShouldNotBeNull();

var envelope = session.Received.SingleEnvelope<Response1>();
envelope.Source.ShouldBe("Receiver2");
envelope.Message.ShouldBe(response);
envelope.TenantId.ShouldBe("TheTenant");
response.Name.ShouldBe("Croaker");
}

[Fact]
public async Task happy_path_with_explicit_endpoint_name_and_delivery_options()
{
var (session, ack) = await _sender.TrackActivity()
.AlsoTrack(_receiver1, _receiver2)
.Timeout(5.Seconds())
.SendMessageAndWaitForAcknowledgementAsync(c =>
c.EndpointFor("Receiver2").InvokeAsync(new Request2 { Name = "Croaker" },
new DeliveryOptions() { TenantId = "TheTenant" }));

var send = session.FindEnvelopesWithMessageType<Request2>()
.Single(x => x.MessageEventType == MessageEventType.Sent);

send.Envelope.DeliverBy.ShouldNotBeNull();

var envelope = session.Received.SingleEnvelope<Acknowledgement>();
envelope.Source.ShouldBe("Receiver2");
envelope.TenantId.ShouldBe("TheTenant");
}

[Fact]
public async Task happy_path_with_explicit_uri_destination()
{
Expand Down
144 changes: 144 additions & 0 deletions src/Testing/CoreTests/TestMessageContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,150 @@ public async Task invoke_with_expected_response_and_filter_miss_to_endpoint_by_n
ex.Message.ShouldStartWith("There is no matching expectation for the request message");
}

[Fact]
public async Task invoke_acknowledgement_with_delivery_options_to_endpoint_by_uri()
{
var uri = "something://one".ToUri();
var message1 = new Message1();

await theContext.EndpointFor(uri).InvokeAsync(message1, new DeliveryOptions().WithHeader("ack-test", "value"));

var envelope = theSpy.Sent.ShouldHaveEnvelopeForMessageType<Message1>();
envelope.Destination.ShouldBe(uri);
envelope.Headers["ack-test"].ShouldBe("value");
}

[Fact]
public async Task invoke_acknowledgement_with_delivery_options_to_endpoint_by_name()
{
var message1 = new Message1();

await theContext.EndpointFor("endpoint1").InvokeAsync(message1, new DeliveryOptions().WithHeader("ack-name-test", "value"));

var envelope = theSpy.Sent.ShouldHaveEnvelopeForMessageType<Message1>();
envelope.EndpointName.ShouldBe("endpoint1");
envelope.Headers["ack-name-test"].ShouldBe("value");
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_no_filter_hit()
{
var response = new NumberResponse(11);
theSpy.WhenInvokedMessageOf<NumberRequest>().RespondWith(response);

var result = await theContext.InvokeAsync<NumberResponse>(
new NumberRequest(3, 4),
new DeliveryOptions().WithHeader("custom", "value"));

result.ShouldBeSameAs(response);

var envelope = theSpy.Invoked.OfType<Envelope>().Last();
envelope.Headers["custom"].ShouldBe("value");
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_and_filter_hit()
{
var response1 = new NumberResponse(11);
var response2 = new NumberResponse(12);
theSpy.WhenInvokedMessageOf<NumberRequest>(x => x.X == 3).RespondWith(response1);
theSpy.WhenInvokedMessageOf<NumberRequest>(x => x.X == 5).RespondWith(response2);

var result1 = await theContext.InvokeAsync<NumberResponse>(
new NumberRequest(3, 4),
new DeliveryOptions().WithHeader("test", "one"));

result1.ShouldBeSameAs(response1);

var result2 = await theContext.InvokeAsync<NumberResponse>(
new NumberRequest(5, 4),
new DeliveryOptions().WithHeader("test", "two"));

result2.ShouldBeSameAs(response2);
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_and_filter_miss()
{
var response1 = new NumberResponse(11);
theSpy.WhenInvokedMessageOf<NumberRequest>(x => x.X == 100).RespondWith(response1);

var ex = await Should.ThrowAsync<Exception>(async () =>
{
await theContext.InvokeAsync<NumberResponse>(
new NumberRequest(3, 4),
new DeliveryOptions().WithHeader("test", "value"));
});

ex.Message.ShouldStartWith("There is no matching expectation for the request message");
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_to_endpoint_by_uri()
{
var response = new NumberResponse(11);
var destination = new Uri("stub://one");
theSpy.WhenInvokedMessageOf<NumberRequest>(destination: destination).RespondWith(response);

var result = await theContext.EndpointFor(destination)
.InvokeAsync<NumberResponse>(
new NumberRequest(4, 5),
new DeliveryOptions().WithHeader("uri-test", "value"));

result.ShouldBeSameAs(response);

var envelope = theSpy.Invoked.OfType<Envelope>().Last();
envelope.Headers["uri-test"].ShouldBe("value");
envelope.Destination.ShouldBe(destination);
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_and_filter_to_endpoint_by_uri()
{
var response = new NumberResponse(11);
var destination = new Uri("stub://one");
theSpy.WhenInvokedMessageOf<NumberRequest>(x => x.X == 4, destination: destination).RespondWith(response);

var result = await theContext.EndpointFor(destination)
.InvokeAsync<NumberResponse>(
new NumberRequest(4, 5),
new DeliveryOptions().WithHeader("filter-uri-test", "value"));

result.ShouldBeSameAs(response);
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_to_endpoint_by_name()
{
var response = new NumberResponse(11);
theSpy.WhenInvokedMessageOf<NumberRequest>(endpointName: "one").RespondWith(response);

var result = await theContext.EndpointFor("one")
.InvokeAsync<NumberResponse>(
new NumberRequest(4, 5),
new DeliveryOptions().WithHeader("name-test", "value"));

result.ShouldBeSameAs(response);

var envelope = theSpy.Invoked.OfType<Envelope>().Last();
envelope.Headers["name-test"].ShouldBe("value");
envelope.EndpointName.ShouldBe("one");
}

[Fact]
public async Task invoke_with_expected_response_and_delivery_options_and_filter_to_endpoint_by_name()
{
var response = new NumberResponse(11);
theSpy.WhenInvokedMessageOf<NumberRequest>(x => x.X == 4, endpointName: "one").RespondWith(response);

var result = await theContext.EndpointFor("one")
.InvokeAsync<NumberResponse>(
new NumberRequest(4, 5),
new DeliveryOptions().WithHeader("filter-name-test", "value"));

result.ShouldBeSameAs(response);
}

public static async Task set_up_invoke_expectations()
{
#region sample_using_invoke_with_expected_response_with_test_message_context
Expand Down
23 changes: 23 additions & 0 deletions src/Wolverine/IDestinationEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ public interface IDestinationEndpoint
Task<Acknowledgement> InvokeAsync(object message, CancellationToken cancellation = default,
TimeSpan? timeout = null);

/// <summary>
/// Execute the message at this destination
/// </summary>
/// <param name="message"></param>
/// <param name="options">Use to pass in extra metadata like headers or group id or correlation information to the command execution</param>
/// <param name="cancellation"></param>
/// <param name="timeout"></param>
/// <returns></returns>
Task<Acknowledgement> InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = null);

/// <summary>
/// Execute the summary at this destination and retrieve the expected
/// response from the destination
Expand All @@ -41,6 +52,18 @@ Task<Acknowledgement> InvokeAsync(object message, CancellationToken cancellation
Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = default, TimeSpan? timeout = null)
where T : class;

/// <summary>
/// Execute the summary at this destination and retrieve the expected
/// response from the destination
/// </summary>
/// <param name="message"></param>
/// <param name="options">Use to pass in extra metadata like headers or group id or correlation information to the command execution</param>
/// <param name="cancellation"></param>
/// <param name="timeout"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<T> InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = null)
where T : class;

/// <summary>
/// Send a message by its raw binary contents and optionally configure how Wolverine
Expand Down
26 changes: 26 additions & 0 deletions src/Wolverine/Runtime/DestinationEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ public Task<Acknowledgement> InvokeAsync(object message, CancellationToken cance
return route.InvokeAsync<Acknowledgement>(message, _parent, cancellation, timeout);
}

public Task<Acknowledgement> InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = null)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime);
return route.InvokeAsync<Acknowledgement>(message, _parent, cancellation, timeout, options);
}

public Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = default, TimeSpan? timeout = null)
where T : class
{
Expand All @@ -125,4 +137,18 @@ public Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = d
var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime);
return route.InvokeAsync<T>(message, _parent, cancellation, timeout);
}

public Task<T> InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = null) where T : class
{
_parent.Runtime.RegisterMessageType(typeof(T));

if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var route = _endpoint.RouteFor(message.GetType(), _parent.Runtime);
return route.InvokeAsync<T>(message, _parent, cancellation, timeout, options);
}
}
32 changes: 28 additions & 4 deletions src/Wolverine/TestMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,8 @@ Task ICommandBus.InvokeAsync(object message, DeliveryOptions options, Cancellati
Task<T> ICommandBus.InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation,
TimeSpan? timeout)
{
var envelope = new Envelope(message)
{

};
var envelope = new Envelope(message);
options.Override(envelope);

_invoked.Add(envelope);

Expand Down Expand Up @@ -329,6 +327,16 @@ public Task<Acknowledgement> InvokeAsync(object message, CancellationToken cance
return Task.FromResult(new Acknowledgement());
}

public Task<Acknowledgement> InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = null)
{
var envelope = new Envelope { Message = message, Destination = _destination, EndpointName = _endpointName };
options.Override(envelope);

_parent._sent.Add(envelope);
return Task.FromResult(new Acknowledgement());
}

public Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = default,
TimeSpan? timeout = null) where T : class
{
Expand All @@ -344,6 +352,22 @@ public Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = d
return Task.FromResult(response);
}

public Task<T> InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = null) where T : class
{
var envelope = new Envelope(message)
{
EndpointName = _endpointName,
Destination = _destination
};
options.Override(envelope);

_parent._invoked.Add(envelope);

var response = _parent.findResponse<T>(message, _destination, _endpointName);
return Task.FromResult(response);
}

public ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Action<Envelope>? configure = null)
{
throw new NotImplementedException();
Expand Down
Loading