diff --git a/docs/guide/durability/postgresql.md b/docs/guide/durability/postgresql.md index ce2d5507..1278e481 100644 --- a/docs/guide/durability/postgresql.md +++ b/docs/guide/durability/postgresql.md @@ -117,7 +117,7 @@ that they are utilizing the transactional inbox and outbox. The PostgreSQL queue ```cs opts.ListenToPostgresqlQueue("sender").BufferedInMemory(); ``` -snippet source | anchor +snippet source | anchor Using this option just means that the PostgreSQL queues can be used for both sending or receiving with no integration diff --git a/docs/guide/durability/sqlserver.md b/docs/guide/durability/sqlserver.md index 5f080985..07da1097 100644 --- a/docs/guide/durability/sqlserver.md +++ b/docs/guide/durability/sqlserver.md @@ -91,7 +91,7 @@ that they are utilizing the transactional inbox and outbox. The Sql Server queue ```cs opts.ListenToSqlServerQueue("sender").BufferedInMemory(); ``` -snippet source | anchor +snippet source | anchor Using this option just means that the Sql Server queues can be used for both sending or receiving with no integration diff --git a/docs/guide/handlers/sticky.md b/docs/guide/handlers/sticky.md index ce025f86..41e22dc6 100644 --- a/docs/guide/handlers/sticky.md +++ b/docs/guide/handlers/sticky.md @@ -25,7 +25,7 @@ message as an input. ```cs public class StickyMessage; ``` -snippet source | anchor +snippet source | anchor And we're going to handle that `StickyMessage` message separately with two different handler types: @@ -51,7 +51,7 @@ public static class GreenStickyHandler } } ``` -snippet source | anchor +snippet source | anchor ::: tip @@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder() opts.ListenAtPort(4000).Named("blue"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]` @@ -119,7 +119,13 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor +## Configuring Local Queues + +There is a world of reasons why you might want to fine tune the behavior of local queues (sequential ordering? parallelism? circuit breakers?), but the +"sticky" handler usage did make it a little harder to configure the exact right local queue for a sticky handler. To alleviate that, see the +[IConfigureLocalQueue](/guide/messaging/transports/local.html#using-iconfigurelocalqueue-to-configure-local-queues) usage. + diff --git a/docs/guide/messaging/transports/external-tables.md b/docs/guide/messaging/transports/external-tables.md index bf3d4bea..204e8f85 100644 --- a/docs/guide/messaging/transports/external-tables.md +++ b/docs/guide/messaging/transports/external-tables.md @@ -74,7 +74,7 @@ builder.UseWolverine(opts => .Sequential(); }); ``` -snippet source | anchor +snippet source | anchor So a couple things to know: diff --git a/docs/guide/messaging/transports/local.md b/docs/guide/messaging/transports/local.md index 3375da73..b769a8ee 100644 --- a/docs/guide/messaging/transports/local.md +++ b/docs/guide/messaging/transports/local.md @@ -206,6 +206,62 @@ using var host = await Host.CreateDefaultBuilder() snippet source | anchor +## Using IConfigureLocalQueue to Configure Local Queues + +::: info +This feature was added in reaction to the newer "sticky" handler to local queue usage, but it's perfectly usable for +message types that are happily handled without any "sticky" handler configuration. +::: + +The advent of ["sticky handlers"](/guide/handlers/sticky) or the [separated handler mode](/guide/handlers/#multiple-handlers-for-the-same-message-type) for better Wolverine usage in modular monoliths admittedly +made it a little harder to fine tune the local queue behavior for different message types or message handlers without understanding +the Wolverine naming conventions. To get back to leaning more on the type system, Wolverine introduced the static `IConfigureLocalQueue` +interface that can be implemented on any handler type to configure the local queue where that handler would run: + + + +```cs +/// +/// Helps mark a handler to configure the local queue that its messages +/// would be routed to. It's probably only useful to use this with "sticky" handlers +/// that run on an isolated local queue +/// +public interface IConfigureLocalQueue +{ + static abstract void Configure(LocalQueueConfiguration configuration); +} +``` +snippet source | anchor + + +::: tip +Static interfaces can only be used on non-static types, so even if all your message handler *methods* are static, the +handler type itself cannot be static. Just a .NET quirk. +::: + +To use this, just implement that interface on any message handler type: + + + +```cs +public class MultipleMessage1Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + // This method is configuring the local queue that executes this + // handler to be strictly ordered + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } +} +``` +snippet source | anchor + + ## Durable Local Messages The local worker queues can optionally be designated as "durable," meaning that local messages would be persisted until they can be successfully processed to provide a guarantee that the message will be successfully processed in the case of the running application faulting or having been shut down prematurely (assuming that other nodes are running or it's restarted later of course). diff --git a/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs b/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs new file mode 100644 index 00000000..a440f0f3 --- /dev/null +++ b/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs @@ -0,0 +1,134 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Attributes; +using Wolverine.Configuration; +using Wolverine.Tracking; +using Wolverine.Transports.Local; +using Xunit; + +namespace CoreTests.Acceptance; + +public class configuring_local_queues : IntegrationContext +{ + public configuring_local_queues(DefaultApp @default) : base(@default) + { + } + + [Fact] + public void apply_to_normal_non_sticky_default_routed_handler() + { + var runtime = Host.GetRuntime(); + runtime.Endpoints.EndpointByName("Frank") + .ShouldBeOfType().Uri.ShouldBe(new Uri("local://coretests.acceptance.simplemessage/")); + } + + [Fact] + public void apply_to_sticky_handlers() + { + var runtime = Host.GetRuntime(); + runtime.Endpoints.EndpointByName("blue") + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1); + + runtime.Endpoints.EndpointByName("green") + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000); + } + + [Fact] + public async Task use_with_separated_mode() + { + using var host = await new HostBuilder().UseWolverine(opts => + { + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + }).StartAsync(); + + var runtime = host.GetRuntime(); + runtime.Endpoints.EndpointByName(typeof(MultipleMessage1Handler).FullNameInCode().ToLowerInvariant()) + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1); + + runtime.Endpoints.EndpointByName(typeof(MultipleMessage2Handler).FullNameInCode().ToLowerInvariant()) + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000); + } +} + +public record SimpleMessage; + +public class SimpleMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + // Just got to do something to prove out the configuration + configuration.Named("Frank"); + } + + public static void Handle(SimpleMessage message) + { + + } +} + +public record StuckMessage; + +[StickyHandler("blue")] +public class BlueStuckMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } + + public static void Handle(StuckMessage message) + { + + } +} + +[StickyHandler("green")] +public class GreenStuckMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.MaximumParallelMessages(1000); + } + + public static void Handle(StuckMessage message) + { + + } +} + +public record MultipleMessage; + +#region sample_using_IConfigureLocalQueue + +public class MultipleMessage1Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + // This method is configuring the local queue that executes this + // handler to be strictly ordered + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } +} + +#endregion + +public class MultipleMessage2Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.MaximumParallelMessages(1000); + } +} + + diff --git a/src/Wolverine/Configuration/IConfigureLocalQueue.cs b/src/Wolverine/Configuration/IConfigureLocalQueue.cs index 160c1c09..7ace1566 100644 --- a/src/Wolverine/Configuration/IConfigureLocalQueue.cs +++ b/src/Wolverine/Configuration/IConfigureLocalQueue.cs @@ -2,6 +2,8 @@ namespace Wolverine.Configuration; +#region sample_IConfigureLocalQueue + /// /// Helps mark a handler to configure the local queue that its messages /// would be routed to. It's probably only useful to use this with "sticky" handlers @@ -9,5 +11,7 @@ namespace Wolverine.Configuration; /// public interface IConfigureLocalQueue { - static abstract void Configure(LocalQueueConfiguration configuration); -} \ No newline at end of file + static abstract void Configure(LocalQueueConfiguration configuration); +} + +#endregion \ No newline at end of file diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 5780560a..01cf7f19 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -17,6 +17,7 @@ using Wolverine.Runtime.Scheduled; using Wolverine.Runtime.Serialization; using Wolverine.Transports; +using Wolverine.Transports.Local; using Wolverine.Util; namespace Wolverine.Runtime.Handlers; @@ -309,6 +310,17 @@ IEnumerable explodeChains(HandlerChain chain) foreach (var configuration in _configurations) configuration(); registerMessageTypes(); + + tryApplyLocalQueueConfiguration(options); + } + + private void tryApplyLocalQueueConfiguration(WolverineOptions options) + { + var local = options.Transports.GetOrCreate(); + foreach (var chain in Chains) + { + local.ApplyConfiguration(chain); + } } private void registerMessageTypes() diff --git a/src/Wolverine/Transports/Local/LocalTransport.cs b/src/Wolverine/Transports/Local/LocalTransport.cs index 81780785..58ed600d 100644 --- a/src/Wolverine/Transports/Local/LocalTransport.cs +++ b/src/Wolverine/Transports/Local/LocalTransport.cs @@ -4,6 +4,7 @@ using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Runtime.Agents; +using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Routing; using Wolverine.Util; @@ -234,4 +235,53 @@ internal LocalQueueConfiguration ConfigureQueueFor(Type messageType) return configuration; } + + internal void ApplyConfiguration(HandlerChain chain) + { + // Gotta go recursive + foreach (var handlerChain in chain.ByEndpoint) + { + ApplyConfiguration(handlerChain); + } + + var configured = chain.Handlers.Select(x => x.HandlerType) + .Where(x => x.CanBeCastTo(typeof(IConfigureLocalQueue))).ToArray(); + + if (!configured.Any()) return; + + // Is it sticky? + if (chain.Endpoints.OfType().Any()) + { + foreach (var handlerType in configured) + { + var applier = typeof(Applier<>).CloseAndBuildAs(handlerType); + foreach (var localQueue in chain.Endpoints.OfType()) + { + applier.Apply(new LocalQueueConfiguration(localQueue)); + } + } + } + else + { + var configuration = ConfigureQueueFor(chain.MessageType); + foreach (var handlerType in configured) + { + typeof(Applier<>).CloseAndBuildAs(handlerType).Apply(configuration); + } + } + } + + private interface IApplier + { + void Apply(LocalQueueConfiguration configuration); + } + + private class Applier : IApplier where T : IConfigureLocalQueue + { + public void Apply(LocalQueueConfiguration configuration) + { + T.Configure(configuration); + } + } + } \ No newline at end of file