diff --git a/docs/guide/diagnostics.md b/docs/guide/diagnostics.md index e9581d446..9a08910c4 100644 --- a/docs/guide/diagnostics.md +++ b/docs/guide/diagnostics.md @@ -166,6 +166,6 @@ public static void using_preview_subscriptions(IMessageBus bus) } } ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/durability/dead-letter-storage.md b/docs/guide/durability/dead-letter-storage.md index 7551e5651..8a94ebf09 100644 --- a/docs/guide/durability/dead-letter-storage.md +++ b/docs/guide/durability/dead-letter-storage.md @@ -82,7 +82,7 @@ app.MapDeadLettersEndpoints() ; ``` -snippet source | anchor +snippet source | anchor ### Using the Dead Letters REST API diff --git a/docs/guide/durability/index.md b/docs/guide/durability/index.md index 4ff2c57c8..78581e8cf 100644 --- a/docs/guide/durability/index.md +++ b/docs/guide/durability/index.md @@ -169,9 +169,15 @@ using var host = await Host.CreateDefaultBuilder() // so that the durability agent can recover it and force // it to be sent opts.Durability.OutboxStaleTime = 1.Hours(); + + // Same for the inbox, but it's configured independently + // This should *never* be necessary and the Wolverine + // team has no clue why this could ever happen and a message + // could get "stuck", but yet, here this is: + opts.Durability.InboxStaleTime = 10.Minutes(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that this will still respect the "deliver by" semantics. This is part of the polling that Wolverine normally does diff --git a/docs/guide/durability/marten/ancillary-stores.md b/docs/guide/durability/marten/ancillary-stores.md index 5ba21dc40..152961b51 100644 --- a/docs/guide/durability/marten/ancillary-stores.md +++ b/docs/guide/durability/marten/ancillary-stores.md @@ -26,7 +26,7 @@ public interface IPlayerStore : IDocumentStore; public interface IThingStore : IDocumentStore; ``` -snippet source | anchor +snippet source | anchor We can add Wolverine integration to both through a similar call to `IntegrateWithWolverine()` as normal as shown below: @@ -71,12 +71,15 @@ theHost = await Host.CreateDefaultBuilder() tenancy.AddSingleTenantDatabase(tenant3ConnectionString, "tenant3"); }); m.DatabaseSchemaName = "things"; - }).IntegrateWithWolverine(x => x.MainConnectionString = Servers.PostgresConnectionString); + }).IntegrateWithWolverine(x => + { + x.MainConnectionString = Servers.PostgresConnectionString; + }); opts.Services.AddResourceSetupOnStartup(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Let's specifically zoom in on this code from within the big sample above: @@ -89,7 +92,7 @@ Let's specifically zoom in on this code from within the big sample above: // for all modules for more efficient usage of resources opts.Durability.MessageStorageSchemaName = "wolverine"; ``` -snippet source | anchor +snippet source | anchor If you are using separate Marten document stores for different modules in your application, you can easily make Wolverine @@ -116,7 +119,7 @@ public static class PlayerMessageHandler } } ``` -snippet source | anchor +snippet source | anchor ::: info diff --git a/docs/guide/durability/marten/distribution.md b/docs/guide/durability/marten/distribution.md index 8175c9af2..96ec84b13 100644 --- a/docs/guide/durability/marten/distribution.md +++ b/docs/guide/durability/marten/distribution.md @@ -152,6 +152,6 @@ var host = await Host.CreateDefaultBuilder() opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Auto; }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/durability/marten/event-sourcing.md b/docs/guide/durability/marten/event-sourcing.md index 20a1116ed..a2f0bea7e 100644 --- a/docs/guide/durability/marten/event-sourcing.md +++ b/docs/guide/durability/marten/event-sourcing.md @@ -697,7 +697,7 @@ public static class RaiseIfValidatedHandler } } ``` -snippet source | anchor +snippet source | anchor ## Archiving Streams diff --git a/docs/guide/extensions.md b/docs/guide/extensions.md index 5c33d2336..51913aa3b 100644 --- a/docs/guide/extensions.md +++ b/docs/guide/extensions.md @@ -242,7 +242,7 @@ var app = builder.Build(); // you will need to explicitly call this *before* MapWolverineEndpoints() await app.Services.ApplyAsyncWolverineExtensions(); ``` -snippet source | anchor +snippet source | anchor ## Wolverine Plugin Modules diff --git a/docs/guide/handlers/discovery.md b/docs/guide/handlers/discovery.md index edea4fe60..c41dc8e49 100644 --- a/docs/guide/handlers/discovery.md +++ b/docs/guide/handlers/discovery.md @@ -220,7 +220,7 @@ using var host = await Host.CreateDefaultBuilder() .UseWolverine(opts => { // No automatic discovery of handlers - opts.DisableConventionalDiscovery(); + opts.Discovery.DisableConventionalDiscovery(); }).StartAsync(); ``` snippet source | anchor diff --git a/docs/guide/handlers/middleware.md b/docs/guide/handlers/middleware.md index 3d71f4cd0..157fef73c 100644 --- a/docs/guide/handlers/middleware.md +++ b/docs/guide/handlers/middleware.md @@ -515,7 +515,7 @@ public interface IHandlerPolicy : IWolverinePolicy void Apply(IReadOnlyList chains, GenerationRules rules, IServiceContainer container); } ``` -snippet source | anchor +snippet source | anchor Here's a simple sample that registers middleware on each handler chain: diff --git a/docs/guide/http/endpoints.md b/docs/guide/http/endpoints.md index 6f2ad70e2..020386645 100644 --- a/docs/guide/http/endpoints.md +++ b/docs/guide/http/endpoints.md @@ -329,7 +329,7 @@ and register that strategy within our `MapWolverineEndpoints()` set up like so: // Customizing parameter handling opts.AddParameterHandlingStrategy(); ``` -snippet source | anchor +snippet source | anchor And lastly, here's the application within an HTTP endpoint for extra context: @@ -431,7 +431,28 @@ HTTP endpoint method, and Wolverine already generates code separately for the tw As of Wolverine 5.7, you can also technically use `HttpContext` arguments in the message handler usage *if* you are carefully accounting for that being null as shown in this sample: -snippet: sample_HybridHandler_with_null_HttpContext + + +```cs +public record DoHybrid(string Message); + +public static class HybridHandler +{ + [WolverinePost("/hybrid")] + public static async Task HandleAsync(DoHybrid command, HttpContext? context) + { + // What this, because it will be null if this is used within + // a message handler! + if (context != null) + { + context.Response.ContentType = "text/plain"; + await context.Response.WriteAsync(command.Message); + } + } +} +``` +snippet source | anchor + diff --git a/docs/guide/http/fluentvalidation.md b/docs/guide/http/fluentvalidation.md index 0e81b00a3..a6e779950 100644 --- a/docs/guide/http/fluentvalidation.md +++ b/docs/guide/http/fluentvalidation.md @@ -50,7 +50,7 @@ app.MapWolverineEndpoints(opts => // Wolverine.Http.FluentValidation opts.UseFluentValidationProblemDetailMiddleware(); ``` -snippet source | anchor +snippet source | anchor ## AsParameters Binding diff --git a/docs/guide/http/marten.md b/docs/guide/http/marten.md index 2080bb6fd..b4eb1dd14 100644 --- a/docs/guide/http/marten.md +++ b/docs/guide/http/marten.md @@ -148,7 +148,7 @@ public static OrderShipped Ship(ShipOrder2 command, [Aggregate] Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor Using this version of the "aggregate workflow", you no longer have to supply a command in the request body, so you could @@ -167,7 +167,7 @@ public static OrderShipped Ship3([Aggregate] Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor A couple other notes: @@ -315,7 +315,7 @@ public static (OrderStatus, Events) Post(MarkItemReady command, Order order) return (new OrderStatus(order.Id, order.IsReadyToShip()), events); } ``` -snippet source | anchor +snippet source | anchor ### Responding with the Updated Aggregate @@ -341,7 +341,7 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command, ); } ``` -snippet source | anchor +snippet source | anchor ## Reading the Latest Version of an Aggregate @@ -360,7 +360,7 @@ an HTTP endpoint method, use the `[ReadAggregate]` attribute like this: [WolverineGet("/orders/latest/{id}")] public static Order GetLatest(Guid id, [ReadAggregate] Order order) => order; ``` -snippet source | anchor +snippet source | anchor If the aggregate doesn't exist, the HTTP request will stop with a 404 status code. @@ -380,7 +380,7 @@ Register it in `WolverineHttpOptions` like this: ```cs opts.UseMartenCompiledQueryResultPolicy(); ``` -snippet source | anchor +snippet source | anchor If you now return a compiled query from an Endpoint the result will get directly streamed to the client as JSON. Short circuiting JSON deserialization. diff --git a/docs/guide/http/mediator.md b/docs/guide/http/mediator.md index 686bc213d..9808aa151 100644 --- a/docs/guide/http/mediator.md +++ b/docs/guide/http/mediator.md @@ -45,7 +45,7 @@ app.MapPostToWolverine("/wolverine/request"); app.MapDeleteToWolverine("/wolverine/request"); app.MapPutToWolverine("/wolverine/request"); ``` -snippet source | anchor +snippet source | anchor With this mechanism, Wolverine is able to optimize the runtime function for Minimal API by eliminating IoC service locations diff --git a/docs/guide/http/middleware.md b/docs/guide/http/middleware.md index 9a966328a..99ebe67f8 100644 --- a/docs/guide/http/middleware.md +++ b/docs/guide/http/middleware.md @@ -49,7 +49,7 @@ Which is registered like this (or as described in [`Registering Middleware by Me opts.AddMiddlewareByMessageType(typeof(FakeAuthenticationMiddleware)); opts.AddMiddlewareByMessageType(typeof(CanShipOrderMiddleWare)); ``` -snippet source | anchor +snippet source | anchor The key point to notice there is that `IResult` is a "return value" of the middleware. In the case of an HTTP endpoint, diff --git a/docs/guide/http/multi-tenancy.md b/docs/guide/http/multi-tenancy.md index f058331e1..98d745973 100644 --- a/docs/guide/http/multi-tenancy.md +++ b/docs/guide/http/multi-tenancy.md @@ -275,7 +275,7 @@ public static string NoTenantNoProblem() return "hey"; } ``` -snippet source | anchor +snippet source | anchor If the above usage completely disabled all tenant id detection or validation, in the case of an endpoint that *might* be @@ -293,7 +293,7 @@ public static string MaybeTenanted(IMessageBus bus) return bus.TenantId ?? "none"; } ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/http/policies.md b/docs/guide/http/policies.md index 5492e27c9..db31836da 100644 --- a/docs/guide/http/policies.md +++ b/docs/guide/http/policies.md @@ -65,7 +65,7 @@ app.MapWolverineEndpoints(opts => // Wolverine.Http.FluentValidation opts.UseFluentValidationProblemDetailMiddleware(); ``` -snippet source | anchor +snippet source | anchor The `HttpChain` model is a configuration time structure that Wolverine.Http will use at runtime to create the full @@ -97,7 +97,7 @@ app.MapWolverineEndpoints(opts => // Wolverine.Http.FluentValidation opts.UseFluentValidationProblemDetailMiddleware(); ``` -snippet source | anchor +snippet source | anchor ## Resource Writer Policies @@ -132,7 +132,7 @@ If you need special handling of a primary return type you can implement `IResour ```cs opts.AddResourceWriterPolicy(); ``` -snippet source | anchor +snippet source | anchor Resource writer policies registered this way will be applied in order before all built in policies. diff --git a/docs/guide/http/querystring.md b/docs/guide/http/querystring.md index 044798a38..14a3d509e 100644 --- a/docs/guide/http/querystring.md +++ b/docs/guide/http/querystring.md @@ -120,7 +120,7 @@ public static class QueryOrdersEndpoint } } ``` -snippet source | anchor +snippet source | anchor Because we've used the `[FromQuery]` attribute on a parameter argument that's not a simple type, Wolverine is trying to bind diff --git a/docs/guide/logging.md b/docs/guide/logging.md index 0c25b2629..582edbacd 100644 --- a/docs/guide/logging.md +++ b/docs/guide/logging.md @@ -82,9 +82,11 @@ below: ```cs -public class QuietMessage; +public record QuietMessage; -public class QuietMessageHandler +public record VerboseMessage; + +public class QuietAndVerboseMessageHandler { [WolverineLogging( telemetryEnabled:false, @@ -94,9 +96,27 @@ public class QuietMessageHandler { Console.WriteLine("Hush!"); } + + [WolverineLogging( + // Enable Open Telemetry tracing + TelemetryEnabled = true, + + // Log on successful completion of this message + SuccessLogLevel = LogLevel.Information, + + // Log on execution being complete, but before Wolverine does its own book keeping + ExecutionLogLevel = LogLevel.Information, + + // Throw in yet another contextual logging statement + // at the beginning of message execution + MessageStartingLevel = LogLevel.Debug)] + public void Handle(VerboseMessage message) + { + Console.WriteLine("Tell me about it!"); + } } ``` -snippet source | anchor +snippet source | anchor @@ -162,7 +182,7 @@ public class AuditedMessage [Audit("AccountIdentifier")] public int AccountId; } ``` -snippet source | anchor +snippet source | anchor Or if you are okay using a common message interface for common identification like "this message targets an account/organization/tenant/client" @@ -180,7 +200,7 @@ public interface IAccountMessage // A possible command that uses our marker interface above public record DebitAccount(int AccountId, decimal Amount) : IAccountMessage; ``` -snippet source | anchor +snippet source | anchor You can specify audited members through this syntax: diff --git a/docs/guide/messaging/message-bus.md b/docs/guide/messaging/message-bus.md index 4cb24db3d..fd7bf53e3 100644 --- a/docs/guide/messaging/message-bus.md +++ b/docs/guide/messaging/message-bus.md @@ -425,4 +425,33 @@ public static async Task SendMessagesWithDeliveryOptions(IMessageBus bus) snippet source | anchor +## Sending Raw Message Data +In some particular cases, you may want to use Wolverine to send a message to another system (or the same system) +when you already have the raw binary message data but not an actual .NET message object. An example use case is integrating +scheduling libraries like Quartz.NET or Hangfire where you might be persisting a `byte[]` for a message to be +sent via Wolverine at a certain time. + +Regardless of why you need to do this, Wolverine has a capability to do exactly this, but with the proviso that +you will have to select the messaging endpoint first. To make this concrete, +let's say that you've got this application set up: + +snippet: sample_simple_rabbit_mq_setup_for_raw_messages + +And some more context for the subsequent sample usages: + +snippet: sample_context_for_raw_message_sending + +The simplest possible usage is when you can assume that the receiving Wolverine +endpoint or downstream system will "know" what the message type is without you +having to tell it: + +snippet: sample_simple_usage_of_sending_by_raw_data + +Note that in this case, you'll have to help Wolverine out by explicitly choosing +the destination for the raw message data by either using a `Uri` or the endpoint name. + +You can also specify the .NET message type to help Wolverine create the necessary +metadata for the outgoing message like so: + +snippet: sample_more_advanced_usage_of_raw_message_sending~~~~ \ No newline at end of file diff --git a/src/Testing/CoreTests/EnvelopeTests.cs b/src/Testing/CoreTests/EnvelopeTests.cs index 42facb908..b5fbabfc6 100644 --- a/src/Testing/CoreTests/EnvelopeTests.cs +++ b/src/Testing/CoreTests/EnvelopeTests.cs @@ -620,5 +620,21 @@ public void message_type() { theHandledEnvelope.MessageType.ShouldBe(theOriginal.MessageType); } + + [Fact] + public void set_the_message_type_1() + { + var envelope = new Envelope(); + envelope.SetMessageType(typeof(Message1)); + envelope.MessageType.ShouldBe(typeof(Message1).ToMessageTypeName()); + } + + [Fact] + public void set_the_message_type_2() + { + var envelope = new Envelope(); + envelope.SetMessageType(); + envelope.MessageType.ShouldBe(typeof(Message1).ToMessageTypeName()); + } } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/sending_raw_messages.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/sending_raw_messages.cs new file mode 100644 index 000000000..efa17a4f7 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/sending_raw_messages.cs @@ -0,0 +1,240 @@ +using System.Diagnostics; +using System.Text; +using Marten.Services; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class sending_raw_messages +{ + [Fact] + public async Task send_end_to_end_with_default_message_type_name() + { + var theQueueName = RabbitTesting.NextQueueName(); + using var publisher = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishAllMessages() + .ToRabbitQueue(theQueueName).SendInline(); + }).StartAsync(); + + + using var receiver = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.ListenToRabbitQueue(theQueueName).DefaultIncomingMessage() + + .DefaultIncomingMessage() + .PreFetchCount(10).ProcessInline(); + }); + + var messageData = receiver.GetRuntime().Options.DefaultSerializer + .Write(new Envelope() { Message = new RawMessage("Kareem Hunt") }); + + + var tracked = await publisher.TrackActivity() + .AlsoTrack(receiver) + .IncludeExternalTransports() + .ExecuteAndWaitAsync(c => c.EndpointFor(theQueueName).SendRawMessageAsync(messageData, typeof(RawMessage))); + + var received = tracked.Received.SingleEnvelope(); + received.Message.ShouldBeOfType().Name.ShouldBe("Kareem Hunt"); + } + + [Fact] + public async Task send_end_to_end_with_supplied_message_type_name() + { + var theQueueName = RabbitTesting.NextQueueName(); + using var publisher = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishAllMessages() + .ToRabbitQueue(theQueueName).SendInline(); + }).StartAsync(); + + + using var receiver = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.ListenToRabbitQueue(theQueueName) + .PreFetchCount(10).ProcessInline(); + }); + + var messageData = receiver.GetRuntime().Options.DefaultSerializer + .Write(new Envelope() { Message = new RawMessage("Nohl Williams") }); + + + var tracked = await publisher.TrackActivity() + .AlsoTrack(receiver) + .IncludeExternalTransports() + .ExecuteAndWaitAsync(c => c.EndpointFor(theQueueName).SendRawMessageAsync(messageData, typeof(RawMessage))); + + var received = tracked.Received.SingleEnvelope(); + received.Message.ShouldBeOfType().Name.ShouldBe("Nohl Williams"); + } + + [Fact] + public async Task send_end_to_end_customize_envelope() + { + var theQueueName = RabbitTesting.NextQueueName(); + using var publisher = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishAllMessages() + .ToRabbitQueue(theQueueName).SendInline(); + }).StartAsync(); + + + using var receiver = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.ListenToRabbitQueue(theQueueName) + .PreFetchCount(10).ProcessInline(); + }); + + var messageData = receiver.GetRuntime().Options.DefaultSerializer + .Write(new Envelope() { Message = new RawMessage("Nohl Williams") }); + + + var tracked = await publisher.TrackActivity() + .AlsoTrack(receiver) + .IncludeExternalTransports() + .ExecuteAndWaitAsync(c => + { + return c + .EndpointFor(theQueueName) + .SendRawMessageAsync(messageData, configure: e => + { + e.SetMessageType(); + e.Headers["name"] = "Chris Jones"; + }); + }); + + var received = tracked.Received.SingleEnvelope(); + received.Message.ShouldBeOfType().Name.ShouldBe("Nohl Williams"); + received.Headers["name"].ShouldBe("Chris Jones"); + } + + + public static async Task send_messages_with_raw_data() + { + #region sample_simple_rabbit_mq_setup_for_raw_messages + + var builder = Host.CreateApplicationBuilder(); + var connectionString = builder.Configuration.GetConnectionString("rabbit"); + + builder.UseWolverine(opts => + { + opts.UseRabbitMq(connectionString).AutoProvision(); + + opts.ListenToRabbitQueue("batches") + + // Pay attention to this. This helps Wolverine + // "know" that if the message type isn't specified + // on the incoming Rabbit MQ message to assume that + // the .NET message type is RunBatch + .DefaultIncomingMessage() + + // The default endpoint name would be "batches" anyway, but still + // good to show this if you want to use more readable names: + .Named("batches"); + + opts.ListenToRabbitQueue("control"); + }); + + using var host = builder.Build(); + await host.StartAsync(); + + #endregion + + #region sample_context_for_raw_message_sending + + // Helper method for testing in Wolverine that + // gives you a new IMessageBus instance without having to + // muck around with scoped service providers + IMessageBus bus = host.MessageBus(); + + // The raw message data, but pretend this was sourced from a database + // table or some other non-Wolverine storage in your system + byte[] messageData + = Encoding.Default.GetBytes("{\"Name\": \"George Karlaftis\"}"); + + #endregion + + + #region sample_simple_usage_of_sending_by_raw_data + + // Simplest possible usage. This can work because the + // listening endpoint has a configured default message + // type + await bus + + // choosing the destination endpoint by its name + // Rabbit MQ queues use the queue name by default + .EndpointFor("batches") + .SendRawMessageAsync(messageData); + + // Same usage, but locate by the Wolverine Uri + await bus + + // choosing the destination endpoint by its name + // Rabbit MQ queues use the queue name by default + .EndpointFor(new Uri("rabbitmq://queue/batches")) + .SendRawMessageAsync(messageData); + + #endregion + + #region sample_more_advanced_usage_of_raw_message_sending + + await bus + .EndpointFor(new Uri("rabbitmq://queue/control")) + + // In this case I helped Wolverine out by telling it + // what the .NET message type is for this message + .SendRawMessageAsync(messageData, typeof(RunBatch)); + + await bus + .EndpointFor(new Uri("rabbitmq://queue/control")) + + // In this case I helped Wolverine out by telling it + // what the .NET message type is for this message + .SendRawMessageAsync(messageData, configure: env => + { + + // Alternative usage to just work directly + // with Wolverine's Envelope wrapper + env.SetMessageType(); + + // And you can do *anything* with message metadata + // using the Envelope wrapper + // Use a little bit of caution with this though + env.Headers["user"] = "jack"; + }); + + #endregion + + } +} + +public record RunBatch(string Name); + +public record RawMessage(string Name); + +public static class RawMessageHandler +{ + public static void Handle(RawMessage m) => Debug.WriteLine("Got raw message " + m.Name); +} \ No newline at end of file diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index 5333a783d..fe8af338f 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -229,6 +229,26 @@ public object? Message /// public string? MessageType { get; set; } + /// + /// Set the MessageType to Wolverine's message type name for + /// T + /// + /// + public void SetMessageType() + { + SetMessageType(typeof(T)); + } + + /// + /// Set the MessageType to Wolverine's message type name for + /// this message type + /// + /// + public void SetMessageType(Type messageType) + { + MessageType = messageType.ToMessageTypeName(); + } + /// /// Location where any replies should be sent /// diff --git a/src/Wolverine/IDestinationEndpoint.cs b/src/Wolverine/IDestinationEndpoint.cs index 6a2e2a55b..b08c275b4 100644 --- a/src/Wolverine/IDestinationEndpoint.cs +++ b/src/Wolverine/IDestinationEndpoint.cs @@ -40,4 +40,15 @@ Task InvokeAsync(object message, CancellationToken cancellation /// Task InvokeAsync(object message, CancellationToken cancellation = default, TimeSpan? timeout = null) where T : class; + + + /// + /// Send a message by its raw binary contents and optionally configure how Wolverine + /// will send this + /// + /// + /// The .NET type for this message if known. If supplied, this will help Wolverine apply any configured sending policies for this message type + /// + /// + ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Action? configure = null); } \ No newline at end of file diff --git a/src/Wolverine/Runtime/DestinationEndpoint.cs b/src/Wolverine/Runtime/DestinationEndpoint.cs index 3ad61cf6e..3c6b2129d 100644 --- a/src/Wolverine/Runtime/DestinationEndpoint.cs +++ b/src/Wolverine/Runtime/DestinationEndpoint.cs @@ -47,7 +47,48 @@ public ValueTask SendAsync(T message, DeliveryOptions? options = null) options?.Override(envelope); // adjust for local, scheduled send - if (envelope.IsScheduledForLater(DateTimeOffset.Now) && !_endpoint.Agent!.SupportsNativeScheduledSend) + if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow) && !_endpoint.Agent!.SupportsNativeScheduledSend) + { + var localDurableQueue = + _parent.Runtime.Endpoints.GetOrBuildSendingAgent(TransportConstants.DurableLocalUri); + envelope = envelope.ForScheduledSend(localDurableQueue); + } + + _parent.TrackEnvelopeCorrelation(envelope, Activity.Current); + + return _parent.PersistOrSendAsync(envelope); + } + + public ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Action? configure = null) + { + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + + if (data.Length == 0) + { + throw new ArgumentOutOfRangeException(nameof(data), "Zero length data is not valid for this usage"); + } + + var envelope = new Envelope + { + Data = data, + Sender = _parent.Runtime.Endpoints.GetOrBuildSendingAgent(_endpoint.Uri) + }; + + if (messageType != null) + { + envelope.SetMessageType(messageType); + + var route = _endpoint.RouteFor(messageType, _parent.Runtime); + foreach (var rule in route.Rules) rule.Modify(envelope); + } + + configure?.Invoke(envelope); + + // adjust for local, scheduled send + if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow) && !_endpoint.Agent!.SupportsNativeScheduledSend) { var localDurableQueue = _parent.Runtime.Endpoints.GetOrBuildSendingAgent(TransportConstants.DurableLocalUri); diff --git a/src/Wolverine/TestMessageContext.cs b/src/Wolverine/TestMessageContext.cs index 4ee86033b..2e7a8ac84 100644 --- a/src/Wolverine/TestMessageContext.cs +++ b/src/Wolverine/TestMessageContext.cs @@ -322,5 +322,10 @@ public Task InvokeAsync(object message, CancellationToken cancellation = d var response = _parent.findResponse(message, _destination, _endpointName); return Task.FromResult(response); } + + public ValueTask SendRawMessageAsync(byte[] data, Type? messageType = null, Action? configure = null) + { + throw new NotImplementedException(); + } } } \ No newline at end of file