Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont

try
{
await invoker.InvokeAsync(@event, (MessageBus)bus, cancellationToken, tenantId: @event.TenantId);
await invoker.InvokeAsync(@event, (MessageBus)bus, cancellationToken, options: new DeliveryOptions{TenantId = @event.TenantId});
sequence = @event.Sequence;
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ public InnerDataInvoker(IMessageInvoker inner)
_inner = inner;
}

public Task<T1> InvokeAsync<T1>(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null,
string? tenantId = null)
public Task<T1> InvokeAsync<T1>(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null,
DeliveryOptions? options = null)
{
throw new NotSupportedException();
}

public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null,
string? tenantId = null)
public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null,
DeliveryOptions? options = null)
{
if (message is IEvent<T> e)
{
return _inner.InvokeAsync(e.Data, bus, cancellation, timeout, tenantId);
return _inner.InvokeAsync(e.Data, bus, cancellation, timeout, options);
}

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ namespace Wolverine.Marten.Subscriptions;

internal class NulloMessageInvoker : IMessageInvoker
{
public Task<T> InvokeAsync<T>(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null,
string? tenantId = null)
public Task<T> InvokeAsync<T>(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null,
DeliveryOptions? options = null)
{
throw new NotSupportedException();
}

public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default, TimeSpan? timeout = null,
string? tenantId = null)
public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null,
DeliveryOptions? options = null)
{
return Task.CompletedTask;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Samples/DocumentationSamples/EnqueueSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public static async Task invoke_locally(IMessageBus bus)
{
// Execute the message inline
await bus.InvokeAsync(new Message1());

// Execute the message inline, but this time pass in
// messaging metadata for Wolverine
await bus.InvokeAsync(new Message1(),
new DeliveryOptions { TenantId = "one", SagaId = "two" }.WithHeader("user.id", "admin"));
}

#endregion
Expand Down
7 changes: 7 additions & 0 deletions src/Samples/DocumentationSamples/MessageBusBasics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public async Task invoke_debit_account(IMessageBus bus)
public async Task invoke_math_operations(IMessageBus bus)
{
var results = await bus.InvokeAsync<Results>(new Numbers(3, 4));

// Same functionality, but this time we'll configure the active
// tenant id and add a message header
var results2 = await bus.InvokeAsync<Results>(new Numbers(5, 6), new DeliveryOptions
{
TenantId = "north.america"
}.WithHeader("user.id", "professor"));
}

#endregion
Expand Down
1 change: 1 addition & 0 deletions src/Testing/MessageRoutingTests/MessageRoutingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ protected void assertExternalListenersAre(string uriList)
var actual = runtime
.Endpoints
.ActiveListeners()
.Where(x => x.Uri.Scheme != "stub")
.Where(x => x.Endpoint.Role == EndpointRole.Application)
.OrderBy(x => x.Uri.ToString())
.Select(x => x.Uri).ToArray();
Expand Down
118 changes: 118 additions & 0 deletions src/Testing/SlowTests/invoke_async_with_delivery_options.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.Tracking;
using Wolverine.Transports.Tcp;
using Wolverine.Util;
using Xunit;

namespace SlowTests;

public class invoke_async_with_delivery_options : IAsyncLifetime
{
private IHost _publisher;
private IHost _receiver;

public async Task InitializeAsync()
{
var publisherPort = PortFinder.GetAvailablePort();
var receiverPort = PortFinder.GetAvailablePort();


_publisher = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.DisableConventionalDiscovery();
opts.PublishAllMessages().ToPort(receiverPort);
opts.ListenAtPort(publisherPort);
}).StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ListenAtPort(receiverPort);
}).StartAsync();

}

public async Task DisposeAsync()
{
await _publisher.StopAsync();
await _receiver.StopAsync();
}

[Fact]
public async Task invoke_locally()
{
var bus = _receiver.MessageBus();
await bus.InvokeAsync(new WithHeaders(),
new DeliveryOptions { TenantId = "millers" }.WithHeader("name", "Chewie")
.WithHeader("breed", "indeterminate"));
}

[Fact]
public async Task invoke_with_expected_outcome_locally()
{
var bus = _receiver.MessageBus();
var answer = await bus.InvokeAsync<Answer>(new DoMath(3, 4, "blue", "tom"),
new DeliveryOptions { TenantId = "blue" }.WithHeader("user-id", "tom"));

answer.Sum.ShouldBe(7);
}

[Fact]
public async Task invoke_remotely()
{
var tracked = await _publisher.TrackActivity()
.AlsoTrack(_receiver)
.IncludeExternalTransports()
.ExecuteAndWaitAsync(c => c.InvokeAsync(new WithHeaders(),
new DeliveryOptions { TenantId = "millers" }.WithHeader("name", "Chewie")
.WithHeader("breed", "indeterminate")));

var envelope = tracked.Received.SingleEnvelope<WithHeaders>();
envelope.TenantId.ShouldBe("millers");
envelope.Headers["name"].ShouldBe("Chewie");
}

[Fact]
public async Task invoke_with_expected_outcome_remotely()
{
Answer answer = null;
Func<IMessageContext, Task> action = async c => answer = await c.InvokeAsync<Answer>(new DoMath(3, 4, "blue", "tom"),
new DeliveryOptions { TenantId = "blue" }.WithHeader("user-id", "tom"));
var tracked = await _publisher.TrackActivity()
.AlsoTrack(_receiver)
.IncludeExternalTransports()
.ExecuteAndWaitAsync(action);

var envelope = tracked.Received.SingleEnvelope<DoMath>();
envelope.TenantId.ShouldBe("blue");
envelope.Headers["user-id"].ShouldBe("tom");

answer.Sum.ShouldBe(7);
}
}

public record WithHeaders;
public record DoMath(int X, int Y, string ExpectedTenantId, string UserIdHeader);
public record Answer(int Sum, int Product);

public static class InvokeMessageHandler
{
public static void Handle(WithHeaders message, Envelope envelope)
{
// Our family dog
envelope.Headers["name"].ShouldBe("Chewie");
envelope.Headers["breed"].ShouldBe("indeterminate");
envelope.TenantId.ShouldBe("millers");
}

public static Answer Handle(DoMath message, Envelope envelope)
{
envelope.TenantId.ShouldBe(message.ExpectedTenantId);
envelope.Headers["user-id"].ShouldBe(message.UserIdHeader);
return new Answer(message.X + message.Y, message.X * message.Y);
}
}
26 changes: 26 additions & 0 deletions src/Wolverine/IMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public interface ICommandBus
/// <param name="timeout">Optional timeout</param>
/// <returns></returns>
Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = default);

/// <summary>
/// Execute the message handling for this message *right now* and wait for the completion.
/// If the message is handled locally, this delegates immediately
/// If the message is handled remotely, the message is sent and the method waits for the response
/// </summary>
/// <param name="message"></param>
/// <param name="options">Use to pass in extra metadata like headers or group id or correlation information to the command execution</param>
/// <param name="cancellation"></param>
/// <param name="timeout">Optional timeout</param>
/// <returns></returns>
Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default);

/// <summary>
/// Execute the message handling for this message *right now* and wait for the completion and the designated response
Expand All @@ -69,6 +81,20 @@ public interface ICommandBus
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = default, TimeSpan? timeout = default);

/// <summary>
/// Execute the message handling for this message *right now* and wait for the completion and the designated response
/// type T.
/// If the message is handled locally, this delegates immediately
/// If the message is handled remotely, the message is sent and the method waits for the response
/// </summary>
/// <param name="message"></param>
/// <param name="options">Use to pass in extra metadata like headers or group id or correlation information to the command execution</param>
/// <param name="cancellation"></param>
/// <param name="timeout">Optional timeout</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<T> InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation = default, TimeSpan? timeout = default);
}

/// <summary>
Expand Down
12 changes: 8 additions & 4 deletions src/Wolverine/Runtime/Handlers/Executor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,18 @@ public async Task InvokeInlineAsync(Envelope envelope, CancellationToken cancell
}

public async Task<T> InvokeAsync<T>(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = null)
TimeSpan? timeout = null, DeliveryOptions? options = null)
{
var envelope = new Envelope(message)
{
ReplyUri = TransportConstants.RepliesUri,
ReplyRequested = typeof(T).ToMessageTypeName(),
ResponseType = typeof(T),
TenantId = tenantId ?? bus.TenantId,
TenantId = options?.TenantId ?? bus.TenantId,
DoNotCascadeResponse = true
};

options?.Override(envelope);

bus.TrackEnvelopeCorrelation(envelope, Activity.Current);

Expand All @@ -150,12 +152,14 @@ public async Task<T> InvokeAsync<T>(object message, MessageBus bus, Cancellation
}

public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = null)
TimeSpan? timeout = null, DeliveryOptions? options = null)
{
var envelope = new Envelope(message)
{
TenantId = tenantId ?? bus.TenantId
TenantId = options?.TenantId ?? bus.TenantId
};

options?.Override(envelope);

bus.TrackEnvelopeCorrelation(envelope, Activity.Current);
return InvokeInlineAsync(envelope, cancellation);
Expand Down
4 changes: 2 additions & 2 deletions src/Wolverine/Runtime/Handlers/NoHandlerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Task<InvokeResult> InvokeAsync(MessageContext context, CancellationToken
}

public Task<T> InvokeAsync<T>(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = default)
TimeSpan? timeout = null, DeliveryOptions? options = null)
{
if (Exception != null)
{
Expand All @@ -58,7 +58,7 @@ public Task<T> InvokeAsync<T>(object message, MessageBus bus, CancellationToken
}

public Task InvokeAsync(object message, MessageBus bus, CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = default)
TimeSpan? timeout = null, DeliveryOptions? options = null)
{
if (Exception != null)
{
Expand Down
30 changes: 28 additions & 2 deletions src/Wolverine/Runtime/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@ public Task<T> InvokeAsync<T>(object message, CancellationToken cancellation = d
return Runtime.FindInvoker(message.GetType()).InvokeAsync<T>(message, this, cancellation, timeout);
}

public Task InvokeAsync(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = default)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

Runtime.AssertHasStarted();

return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, options);
}

public Task<T> InvokeAsync<T>(object message, DeliveryOptions options, CancellationToken cancellation = default,
TimeSpan? timeout = default)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

Runtime.AssertHasStarted();

return Runtime.FindInvoker(message.GetType()).InvokeAsync<T>(message, this, cancellation, timeout, options);
}

public Task InvokeForTenantAsync(string tenantId, object message, CancellationToken cancellation = default,
TimeSpan? timeout = default)
{
Expand All @@ -129,7 +155,7 @@ public Task InvokeForTenantAsync(string tenantId, object message, CancellationTo

Runtime.AssertHasStarted();

return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, tenantId);
return Runtime.FindInvoker(message.GetType()).InvokeAsync(message, this, cancellation, timeout, new DeliveryOptions{TenantId = tenantId});
}

public Task<T> InvokeForTenantAsync<T>(string tenantId, object message, CancellationToken cancellation = default,
Expand All @@ -142,7 +168,7 @@ public Task<T> InvokeForTenantAsync<T>(string tenantId, object message, Cancella

Runtime.AssertHasStarted();

return Runtime.FindInvoker(message.GetType()).InvokeAsync<T>(message, this, cancellation, timeout, tenantId);
return Runtime.FindInvoker(message.GetType()).InvokeAsync<T>(message, this, cancellation, timeout, new DeliveryOptions{TenantId = tenantId});
}

public IReadOnlyList<Envelope> PreviewSubscriptions(object message)
Expand Down
4 changes: 2 additions & 2 deletions src/Wolverine/Runtime/Routing/IMessageInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ public interface IMessageInvoker
{
Task<T> InvokeAsync<T>(object message, MessageBus bus,
CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = null);
TimeSpan? timeout = null, DeliveryOptions? options = null);

Task InvokeAsync(object message, MessageBus bus,
CancellationToken cancellation = default,
TimeSpan? timeout = null, string? tenantId = null);
TimeSpan? timeout = null, DeliveryOptions? options = null);
}
Loading
Loading