diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 775acde60..78a59023b 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -190,6 +190,7 @@ const config: UserConfig = { ] }, {text: 'Partitioned Sequential Messaging', link: '/guide/messaging/partitioning'}, + {text: 'Unknown Messages', link: '/guide/messaging/unknown'}, {text: 'Endpoint Specific Operations', link: '/guide/messaging/endpoint-operations'}, {text: 'Broadcast to a Specific Topic', link: '/guide/messaging/broadcast-to-topic'}, {text: 'Message Expiration', link: '/guide/messaging/expiration'}, diff --git a/docs/guide/configuration.md b/docs/guide/configuration.md index aad538aa1..94efe4398 100644 --- a/docs/guide/configuration.md +++ b/docs/guide/configuration.md @@ -201,4 +201,32 @@ within different modules within your system, you can use [Wolverine extensions]( You can also use the `IServiceCollection.ConfigureWolverine()` method to add configuration to your Wolverine application from outside the main `UseWolverine()` code as shown below: -snippet: sample_using_configure_wolverine \ No newline at end of file + + +```cs +var builder = Host.CreateApplicationBuilder(); + +// Baseline Wolverine configuration +builder.Services.AddWolverine(opts => +{ + +}); + +// This would be applied as an extension +builder.Services.ConfigureWolverine(w => +{ + // There is a specific helper for this, but just go for it + // as an easy example + w.Durability.Mode = DurabilityMode.Solo; +}); + +using var host = builder.Build(); + +host.Services.GetRequiredService() + .Options + .Durability + .Mode + .ShouldBe(DurabilityMode.Solo); +``` +snippet source | anchor + diff --git a/docs/guide/durability/index.md b/docs/guide/durability/index.md index 74f37d29a..989e41255 100644 --- a/docs/guide/durability/index.md +++ b/docs/guide/durability/index.md @@ -158,7 +158,21 @@ migration subsystem You also have this setting to force Wolverine to automatically "bump" and older messages that seem to be stalled in the outbox table: -snippet: sample_configuring_outbox_stale_timeout + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Bump any persisted message in the outbox tables + // that is more than an hour old to be globally owned + // so that the durability agent can recover it and force + // it to be sent + opts.Durability.OutboxStaleTime = 1.Hours(); + }).StartAsync(); +``` +snippet source | anchor + Note that this will still respect the "deliver by" semantics. This is part of the polling that Wolverine normally does against the inbox/outbox/node storage tables. Note that this will only happen if the setting above has a non-null diff --git a/docs/guide/durability/postgresql.md b/docs/guide/durability/postgresql.md index a7e6bc7a3..ad1911849 100644 --- a/docs/guide/durability/postgresql.md +++ b/docs/guide/durability/postgresql.md @@ -54,7 +54,16 @@ as an optimization. This is not enabled by default just to avoid causing databas migrations in a minor point release. Note that this will have some significant benefits for inbox/outbox metrics gathering in the future: -snippet: sample_enabling_inbox_partitioning + + +```cs +var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.EnableInboxPartitioning = true; +``` +snippet source | anchor + ## PostgreSQL Messaging Transport diff --git a/docs/guide/durability/sagas.md b/docs/guide/durability/sagas.md index 5a0382928..295a21994 100644 --- a/docs/guide/durability/sagas.md +++ b/docs/guide/durability/sagas.md @@ -558,12 +558,52 @@ output was getting too verbose. `Saga` types are officially message handlers to still use the `public static void Configure(HandlerChain)` mechanism for one off configurations to every message handler method on the `Saga` like this: -snippet: sample_overriding_logging_on_saga + + +```cs +public class RevisionedSaga : Wolverine.Saga +{ + // This works just the same as on any other message handler + // type + public static void Configure(HandlerChain chain) + { + chain.ProcessingLogLevel = LogLevel.None; + chain.SuccessLogLevel = LogLevel.None; + } +``` +snippet source | anchor + Or if you wanted to just do it globally, something like this approach: -snippet: sample_turn_down_logging_for_sagas + + +```cs +public class TurnDownLoggingOnSagas : IChainPolicy +{ + public void Apply(IReadOnlyList chains, GenerationRules rules, IServiceContainer container) + { + foreach (var sagaChain in chains.OfType()) + { + sagaChain.ProcessingLogLevel = LogLevel.None; + sagaChain.SuccessLogLevel = LogLevel.None; + } + } +} +``` +snippet source | anchor + and register that policy something like this: -snippet: sample_configuring_chain_policy_on_sagas \ No newline at end of file + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Policies.Add(); + }).StartAsync(); +``` +snippet source | anchor + diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index d459e969b..764d02c00 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -164,7 +164,7 @@ opts.MessagePartitioning }); }); ``` -snippet source | anchor +snippet source | anchor The built in rules *at this point* include: diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index 8a4f640d8..cbeb63f3a 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -198,7 +198,7 @@ public static class KafkaInstrumentation } } ``` -snippet source | anchor +snippet source | anchor ## Connecting to Multiple Brokers @@ -229,7 +229,7 @@ using var host = await Host.CreateDefaultBuilder() // Other configuration }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that the `Uri` scheme within Wolverine for any endpoints from a "named" Kafka broker is the name that you supply @@ -240,4 +240,21 @@ for the broker. So in the example above, you might see `Uri` values for `emea:// Hey, you might have an application that only consumes Kafka messages, but there are a *few* diagnostics in Wolverine that try to send messages. To completely eliminate that, you can disable all message sending in Wolverine like this: -snippet: sample_disable_all_kafka_sending \ No newline at end of file + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts + .UseKafka("localhost:9092") + + // Tell Wolverine that this application will never + // produce messages to turn off any diagnostics that might + // try to "ping" a topic and result in errors + .ConsumeOnly(); + + }).StartAsync(); +``` +snippet source | anchor + diff --git a/docs/guide/messaging/unknown.md b/docs/guide/messaging/unknown.md new file mode 100644 index 000000000..a3c5b45b1 --- /dev/null +++ b/docs/guide/messaging/unknown.md @@ -0,0 +1,108 @@ +# Unknown Messages + +When Wolverine receives a message from the outside world, it's keying off the [message type name](/guide/messages.html#message-type-name-or-alias) from the `Envelope` to +"know" what message type it's receiving and therefore, which handler(s) to execute. It's an imperfect world of course, +so it's perfectly possible that your system will receive a message from the outside world with a message type name that +your system does not recognize. + +Out of the box Wolverine will simply log that it received an unknown message type and discard the message, but there are +means to take additional actions on "missing handler" messages where Wolverine does not recognize the message type. + +## Move to the Dead Letter Queue + +You can declaratively tell Wolverine to persist every message received with an unknown message type name +to the dead letter queue with this flag: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + var connectionString = builder.Configuration.GetConnectionString("rabbit"); + opts.UseRabbitMq(connectionString).UseConventionalRouting(); + + // All unknown message types received should be placed into + // the proper dead letter queue mechanism + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; +}); +``` +snippet source | anchor + + +The message will be moved to the dead letter queue mechanism for the listening endpoint where the message was received. + +## Custom Actions + +::: note +The missing handlers are additive, meaning that you can provide more than one and Wolverine will try to execute +each one that is registered for the missing handler behavior. +::: + +You can direct Wolverine to take custom actions on messages received with unknown message type names by providing +a custom implementation of this interface: + + + +```cs +namespace Wolverine; + +/// +/// Hook interface to receive notifications of envelopes received +/// that do not match any known handlers within the system +/// +public interface IMissingHandler +{ + /// + /// Executes for unhandled envelopes + /// + /// + /// + /// + ValueTask HandleAsync(IEnvelopeLifecycle context, IWolverineRuntime root); +} +``` +snippet source | anchor + + +Here's a made up sample that theoretically posts a message to a Slack room by sending a Wolverine message in response: + + + +```cs +public class MyCustomActionForMissingHandlers : IMissingHandler +{ + public ValueTask HandleAsync(IEnvelopeLifecycle context, IWolverineRuntime root) + { + var bus = new MessageBus(root); + return bus.PublishAsync(new PostInSlack("Incidents", + $"Got an unknown message with type '{context.Envelope.MessageType}' and id {context.Envelope.Id}")); + } +} +``` +snippet source | anchor + + +And simply registering that with your application's IoC container against the `IMissingHandler` interface like this: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // configuration + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; +}); + +builder.Services.AddSingleton(); +``` +snippet source | anchor + + +## Tracked Session Testing + +Just know that the [Tracked Session](/guide/testing.html#integration-testing-with-tracked-sessions) subsystem for integration +testing exposes a separate record collection for `NoHandlers` and reports when that happens through its output for hopefully +easy troubleshooting on test failures. + diff --git a/docs/tutorials/interop.md b/docs/tutorials/interop.md index f4d6db9a9..a7b68fdf1 100644 --- a/docs/tutorials/interop.md +++ b/docs/tutorials/interop.md @@ -145,7 +145,7 @@ public class OurKafkaJsonMapper : IKafkaEnvelopeMapper } } ``` -snippet source | anchor +snippet source | anchor Which is essentially how the built in "Raw JSON" mapper works in external transport mappers. In the envelope mapper above diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs index 20451666e..d1170a99a 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs @@ -1,6 +1,7 @@ using System.Data.Common; using Weasel.Core; using Wolverine.Persistence.Durability; +using Wolverine.Runtime.Interop; using Wolverine.Transports; using DbCommandBuilder = Weasel.Core.DbCommandBuilder; diff --git a/src/Samples/DocumentationSamples/MissingHandlerSample.cs b/src/Samples/DocumentationSamples/MissingHandlerSample.cs new file mode 100644 index 000000000..f429b5d45 --- /dev/null +++ b/src/Samples/DocumentationSamples/MissingHandlerSample.cs @@ -0,0 +1,42 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.Runtime; + +namespace DocumentationSamples; + +public record PostInSlack(string Room, string Message); + +#region sample_MyCustomActionForMissingHandlers + +public class MyCustomActionForMissingHandlers : IMissingHandler +{ + public ValueTask HandleAsync(IEnvelopeLifecycle context, IWolverineRuntime root) + { + var bus = new MessageBus(root); + return bus.PublishAsync(new PostInSlack("Incidents", + $"Got an unknown message with type '{context.Envelope.MessageType}' and id {context.Envelope.Id}")); + } +} + +#endregion + +public static class ConfigureMissingHandlers +{ + public static async Task configure() + { + #region sample_registering_custom_missing_handler + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + // configuration + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + }); + + builder.Services.AddSingleton(); + + #endregion + } +} + diff --git a/src/Testing/CoreTests/Configuration/missing_handler_behavior.cs b/src/Testing/CoreTests/Configuration/missing_handler_behavior.cs new file mode 100644 index 000000000..4f915eff0 --- /dev/null +++ b/src/Testing/CoreTests/Configuration/missing_handler_behavior.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit; + +namespace CoreTests.Configuration; + +// This is really to test core Wolverine behavior +public class missing_handler_behavior +{ + + + [Fact] + public async Task no_registered_missing_handlers_for_default_behavior() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine() + .StartAsync(); + + host.Services.GetServices().ShouldBeEmpty(); + } + + [Fact] + public async Task no_registered_missing_handlers_move_to_dead_letter_queue() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + }) + .StartAsync(); + + host.Services.GetServices().ShouldContain(x => x is MoveUnknownMessageToDeadLetterQueue); + } + + [Fact] + public async Task reentrant_config_for_overloads() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + opts.UnknownMessageBehavior = UnknownMessageBehavior.LogOnly; + }) + .StartAsync(); + + host.Services.GetServices().ShouldBeEmpty(); + } + + [Fact] + public async Task reentrant_config_for_overloads_2() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + opts.UnknownMessageBehavior = UnknownMessageBehavior.LogOnly; + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + }) + .StartAsync(); + + host.Services.GetServices().ShouldContain(x => x is MoveUnknownMessageToDeadLetterQueue); + } + + +} + diff --git a/src/Testing/CoreTests/WolverineOptionsTests.cs b/src/Testing/CoreTests/WolverineOptionsTests.cs index 952bfe6e6..a931c8b7a 100644 --- a/src/Testing/CoreTests/WolverineOptionsTests.cs +++ b/src/Testing/CoreTests/WolverineOptionsTests.cs @@ -27,6 +27,12 @@ public void publish_agent_events_should_be_false_by_default() new WolverineOptions().Policies.PublishAgentEvents.ShouldBeFalse(); } + [Fact] + public void default_dead_letter_queue_behavior_is_discard() + { + new WolverineOptions().UnknownMessageBehavior.ShouldBe(UnknownMessageBehavior.LogOnly); + } + [Fact] public void default_service_location_policy_should_be_allowed_by_warn() { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs new file mode 100644 index 000000000..5fca425ca --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/moving_unknown_message_type_to_dlq.cs @@ -0,0 +1,104 @@ +using IntegrationTests; +using JasperFx.Core; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Persistence.Durability.DeadLetterManagement; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Transports.SharedMemory; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class moving_unknown_message_type_to_dlq : IAsyncLifetime +{ + private IHost _sender; + private IHost _receiver; + + public static async Task TestSample() + { + #region sample_unknown_messages_go_to_dead_letter_queue + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + var connectionString = builder.Configuration.GetConnectionString("rabbit"); + opts.UseRabbitMq(connectionString).UseConventionalRouting(); + + // All unknown message types received should be placed into + // the proper dead letter queue mechanism + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + }); + + #endregion + } + + public async Task InitializeAsync() + { + await SharedMemoryQueueManager.ClearAllAsync(); + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + + opts.PublishMessage().ToRabbitQueue("durable"); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().AutoProvision().DisableDeadLetterQueueing(); + + opts.UnknownMessageBehavior = UnknownMessageBehavior.DeadLetterQueue; + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "missing"); + + // Forcing this to use the database backed DLQ + opts.ListenToRabbitQueue("durable").UseDurableInbox(); + }).StartAsync(); + + // Empty it out before you do anything here + await _receiver.RebuildAllEnvelopeStorageAsync(); + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } + + [Fact] + public async Task send_message_with_no_handler_to_durable() + { + var tracked = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .Timeout(1.Minutes()) + .IncludeExternalTransports() + .SendMessageAndWaitAsync(new ToDurable("One")); + + var records = tracked.NoHandlers.Envelopes(); + records.Any(x => x.MessageType == typeof(ToDurable).ToMessageTypeName()).ShouldBeTrue(); + + var storage = _receiver.GetRuntime().Storage; + var deadLetters = await storage.DeadLetters.QueryAsync(new DeadLetterEnvelopeQuery(TimeRange.AllTime()), + CancellationToken.None); + + var envelope = deadLetters.Envelopes.Single(); + envelope.MessageType.ShouldBe(typeof(ToDurable).ToMessageTypeName()); + envelope.ExceptionType.ShouldBe("Wolverine.Runtime.Interop.UnknownMessageTypeNameException"); + envelope.ExceptionMessage.ShouldBe("Unknown message type: 'Wolverine.RabbitMQ.Tests.ToDurable'"); + + } +} + +public record ToInline(string Name); + +public record ToBuffered(string Name); + +public record ToDurable(string Name); diff --git a/src/Wolverine/IMissingHandler.cs b/src/Wolverine/IMissingHandler.cs index 29ff4eb5d..94c46fb98 100644 --- a/src/Wolverine/IMissingHandler.cs +++ b/src/Wolverine/IMissingHandler.cs @@ -1,8 +1,8 @@ using Wolverine.Runtime; - -namespace Wolverine; +using Wolverine.Runtime.Interop; #region sample_IMissingHandler +namespace Wolverine; /// /// Hook interface to receive notifications of envelopes received @@ -19,4 +19,12 @@ public interface IMissingHandler ValueTask HandleAsync(IEnvelopeLifecycle context, IWolverineRuntime root); } -#endregion \ No newline at end of file +#endregion + +internal class MoveUnknownMessageToDeadLetterQueue : IMissingHandler +{ + public async ValueTask HandleAsync(IEnvelopeLifecycle context, IWolverineRuntime root) + { + await context.MoveToDeadLetterQueueAsync(new UnknownMessageTypeNameException($"Unknown message type: '{context.Envelope!.MessageType}'")); + } +} \ No newline at end of file diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index 46b96bd23..9f8110f4f 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -7,6 +7,7 @@ using JasperFx.Core.Reflection; using JasperFx.Descriptors; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Wolverine.Configuration; using Wolverine.Persistence; using Wolverine.Persistence.MultiTenancy; @@ -59,6 +60,21 @@ public enum WolverineMetricsMode Hybrid } +public enum UnknownMessageBehavior +{ + /// + /// Default behavior. Wolverine will log a message that there was no known handler + /// for the incoming message, but otherwise discard it + /// + LogOnly, + + /// + /// Wolverine will log a message that there was no known handler for the incoming message, + /// then move it to the appropriate dead letter queue for the receiving endpoint + /// + DeadLetterQueue +} + public class MetricsOptions { /// @@ -80,6 +96,7 @@ public sealed partial class WolverineOptions { private readonly List> _lazyActions = []; private AutoCreate? _autoBuildMessageStorageOnStartup; + private UnknownMessageBehavior _unknownMessageBehavior = UnknownMessageBehavior.LogOnly; public WolverineOptions() : this(null) { @@ -362,6 +379,25 @@ public IEnumerable FindEndpointsWithHandlerType(Type handlerType) // This helps govern some command line work internal bool LightweightMode { get; set; } + public UnknownMessageBehavior UnknownMessageBehavior + { + get => _unknownMessageBehavior; + set + { + _unknownMessageBehavior = value; + if (value == UnknownMessageBehavior.DeadLetterQueue) + { + Services.TryAddSingleton(); + } + else + { + Services.RemoveAll(x => + !x.IsKeyedService && x.ServiceType == typeof(IMissingHandler) && x.ImplementationType == + typeof(MoveUnknownMessageToDeadLetterQueue)); + } + } + } + internal void ReadJasperFxOptions(JasperFxOptions jasperfx) { ServiceName ??= jasperfx.ServiceName;