Skip to content

Commit

Permalink
New IConfigureLocalQueue mechanism for fine tuning local queues even …
Browse files Browse the repository at this point in the history
…when using sticky handler assignments. Closes GH-1213
  • Loading branch information
jeremydmiller committed Jan 16, 2025
1 parent d55890e commit 5289de1
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/guide/durability/postgresql.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ that they are utilizing the transactional inbox and outbox. The PostgreSQL queue
```cs
opts.ListenToPostgresqlQueue("sender").BufferedInMemory();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs#L58-L62' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_postgres_queue_to_buffered' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs#L64-L68' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_postgres_queue_to_buffered' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Using this option just means that the PostgreSQL queues can be used for both sending or receiving with no integration
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/durability/sqlserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ that they are utilizing the transactional inbox and outbox. The Sql Server queue
```cs
opts.ListenToSqlServerQueue("sender").BufferedInMemory();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/SqlServerTests/Transport/compliance_tests.cs#L61-L65' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_sql_server_queue_to_buffered' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/SqlServerTests/Transport/compliance_tests.cs#L67-L71' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_sql_server_queue_to_buffered' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Using this option just means that the Sql Server queues can be used for both sending or receiving with no integration
Expand Down
14 changes: 10 additions & 4 deletions docs/guide/handlers/sticky.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ message as an input.
```cs
public class StickyMessage;
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L218-L222' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_stickymessage' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L233-L237' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_stickymessage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And we're going to handle that `StickyMessage` message separately with two different handler types:
Expand All @@ -51,7 +51,7 @@ public static class GreenStickyHandler
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L224-L244' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_sticky_handler_attribute' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L239-L259' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_sticky_handler_attribute' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
Expand Down Expand Up @@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder()
opts.ListenAtPort(4000).Named("blue");
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L156-L166' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_named_listener_endpoint' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L171-L181' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_named_listener_endpoint' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]`
Expand Down Expand Up @@ -119,7 +119,13 @@ using var host = await Host.CreateDefaultBuilder()

}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L171-L189' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sticky_handlers_by_endpoint_with_fluent_interface' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/sticky_message_handlers.cs#L186-L204' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sticky_handlers_by_endpoint_with_fluent_interface' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Configuring Local Queues <Badge type="tip" text="3.7" />

There is a world of reasons why you might want to fine tune the behavior of local queues (sequential ordering? parallelism? circuit breakers?), but the
"sticky" handler usage did make it a little harder to configure the exact right local queue for a sticky handler. To alleviate that, see the
[IConfigureLocalQueue](/guide/messaging/transports/local.html#using-iconfigurelocalqueue-to-configure-local-queues) usage.


2 changes: 1 addition & 1 deletion docs/guide/messaging/transports/external-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ builder.UseWolverine(opts =>
.Sequential();
});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs#L189-L252' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_external_database_messaging' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs#L190-L253' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_external_database_messaging' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

So a couple things to know:
Expand Down
56 changes: 56 additions & 0 deletions docs/guide/messaging/transports/local.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,62 @@ using var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/EnqueueSamples.cs#L21-L45' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_local_queues' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Using IConfigureLocalQueue to Configure Local Queues <Badge type="tip" text="3.7" />

::: info
This feature was added in reaction to the newer "sticky" handler to local queue usage, but it's perfectly usable for
message types that are happily handled without any "sticky" handler configuration.
:::

The advent of ["sticky handlers"](/guide/handlers/sticky) or the [separated handler mode](/guide/handlers/#multiple-handlers-for-the-same-message-type) for better Wolverine usage in modular monoliths admittedly
made it a little harder to fine tune the local queue behavior for different message types or message handlers without understanding
the Wolverine naming conventions. To get back to leaning more on the type system, Wolverine introduced the static `IConfigureLocalQueue`
interface that can be implemented on any handler type to configure the local queue where that handler would run:

<!-- snippet: sample_IConfigureLocalQueue -->
<a id='snippet-sample_iconfigurelocalqueue'></a>
```cs
/// <summary>
/// Helps mark a handler to configure the local queue that its messages
/// would be routed to. It's probably only useful to use this with "sticky" handlers
/// that run on an isolated local queue
/// </summary>
public interface IConfigureLocalQueue
{
static abstract void Configure(LocalQueueConfiguration configuration);
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Wolverine/Configuration/IConfigureLocalQueue.cs#L5-L17' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfigurelocalqueue' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
Static interfaces can only be used on non-static types, so even if all your message handler *methods* are static, the
handler type itself cannot be static. Just a .NET quirk.
:::

To use this, just implement that interface on any message handler type:

<!-- snippet: sample_using_IConfigureLocalQueue -->
<a id='snippet-sample_using_iconfigurelocalqueue'></a>
```cs
public class MultipleMessage1Handler : IConfigureLocalQueue
{
public static void Handle(MultipleMessage message)
{

}

// This method is configuring the local queue that executes this
// handler to be strictly ordered
public static void Configure(LocalQueueConfiguration configuration)
{
configuration.Sequential();
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs#L102-L119' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_iconfigurelocalqueue' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Durable Local Messages

The local worker queues can optionally be designated as "durable," meaning that local messages would be persisted until they can be successfully processed to provide a guarantee that the message will be successfully processed in the case of the running application faulting or having been shut down prematurely (assuming that other nodes are running or it's restarted later of course).
Expand Down
134 changes: 134 additions & 0 deletions src/Testing/CoreTests/Acceptance/configuring_local_queues.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using JasperFx.Core.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Attributes;
using Wolverine.Configuration;
using Wolverine.Tracking;
using Wolverine.Transports.Local;
using Xunit;

namespace CoreTests.Acceptance;

public class configuring_local_queues : IntegrationContext
{
public configuring_local_queues(DefaultApp @default) : base(@default)
{
}

[Fact]
public void apply_to_normal_non_sticky_default_routed_handler()
{
var runtime = Host.GetRuntime();
runtime.Endpoints.EndpointByName("Frank")
.ShouldBeOfType<LocalQueue>().Uri.ShouldBe(new Uri("local://coretests.acceptance.simplemessage/"));
}

[Fact]
public void apply_to_sticky_handlers()
{
var runtime = Host.GetRuntime();
runtime.Endpoints.EndpointByName("blue")
.ShouldBeOfType<LocalQueue>().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1);

runtime.Endpoints.EndpointByName("green")
.ShouldBeOfType<LocalQueue>().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000);
}

[Fact]
public async Task use_with_separated_mode()
{
using var host = await new HostBuilder().UseWolverine(opts =>
{
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
}).StartAsync();

var runtime = host.GetRuntime();
runtime.Endpoints.EndpointByName(typeof(MultipleMessage1Handler).FullNameInCode().ToLowerInvariant())
.ShouldBeOfType<LocalQueue>().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1);

runtime.Endpoints.EndpointByName(typeof(MultipleMessage2Handler).FullNameInCode().ToLowerInvariant())
.ShouldBeOfType<LocalQueue>().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000);
}
}

public record SimpleMessage;

public class SimpleMessageHandler : IConfigureLocalQueue
{
public static void Configure(LocalQueueConfiguration configuration)
{
// Just got to do something to prove out the configuration
configuration.Named("Frank");
}

public static void Handle(SimpleMessage message)
{

}
}

public record StuckMessage;

[StickyHandler("blue")]
public class BlueStuckMessageHandler : IConfigureLocalQueue
{
public static void Configure(LocalQueueConfiguration configuration)
{
configuration.Sequential();
}

public static void Handle(StuckMessage message)
{

}
}

[StickyHandler("green")]
public class GreenStuckMessageHandler : IConfigureLocalQueue
{
public static void Configure(LocalQueueConfiguration configuration)
{
configuration.MaximumParallelMessages(1000);
}

public static void Handle(StuckMessage message)
{

}
}

public record MultipleMessage;

#region sample_using_IConfigureLocalQueue

public class MultipleMessage1Handler : IConfigureLocalQueue
{
public static void Handle(MultipleMessage message)
{

}

// This method is configuring the local queue that executes this
// handler to be strictly ordered
public static void Configure(LocalQueueConfiguration configuration)
{
configuration.Sequential();
}
}

#endregion

public class MultipleMessage2Handler : IConfigureLocalQueue
{
public static void Handle(MultipleMessage message)
{

}

public static void Configure(LocalQueueConfiguration configuration)
{
configuration.MaximumParallelMessages(1000);
}
}


8 changes: 6 additions & 2 deletions src/Wolverine/Configuration/IConfigureLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

namespace Wolverine.Configuration;

#region sample_IConfigureLocalQueue

/// <summary>
/// Helps mark a handler to configure the local queue that its messages
/// would be routed to. It's probably only useful to use this with "sticky" handlers
/// that run on an isolated local queue
/// </summary>
public interface IConfigureLocalQueue
{
static abstract void Configure(LocalQueueConfiguration configuration);
}
static abstract void Configure(LocalQueueConfiguration configuration);
}

#endregion
12 changes: 12 additions & 0 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Wolverine.Runtime.Scheduled;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Local;
using Wolverine.Util;

namespace Wolverine.Runtime.Handlers;
Expand Down Expand Up @@ -309,6 +310,17 @@ IEnumerable<HandlerChain> explodeChains(HandlerChain chain)
foreach (var configuration in _configurations) configuration();

registerMessageTypes();

tryApplyLocalQueueConfiguration(options);
}

private void tryApplyLocalQueueConfiguration(WolverineOptions options)
{
var local = options.Transports.GetOrCreate<LocalTransport>();
foreach (var chain in Chains)
{
local.ApplyConfiguration(chain);
}
}

private void registerMessageTypes()
Expand Down
50 changes: 50 additions & 0 deletions src/Wolverine/Transports/Local/LocalTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Routing;
using Wolverine.Util;

Expand Down Expand Up @@ -234,4 +235,53 @@ internal LocalQueueConfiguration ConfigureQueueFor(Type messageType)

return configuration;
}

internal void ApplyConfiguration(HandlerChain chain)
{
// Gotta go recursive
foreach (var handlerChain in chain.ByEndpoint)
{
ApplyConfiguration(handlerChain);
}

var configured = chain.Handlers.Select(x => x.HandlerType)
.Where(x => x.CanBeCastTo(typeof(IConfigureLocalQueue))).ToArray();

if (!configured.Any()) return;

// Is it sticky?
if (chain.Endpoints.OfType<LocalQueue>().Any())
{
foreach (var handlerType in configured)
{
var applier = typeof(Applier<>).CloseAndBuildAs<IApplier>(handlerType);
foreach (var localQueue in chain.Endpoints.OfType<LocalQueue>())
{
applier.Apply(new LocalQueueConfiguration(localQueue));
}
}
}
else
{
var configuration = ConfigureQueueFor(chain.MessageType);
foreach (var handlerType in configured)
{
typeof(Applier<>).CloseAndBuildAs<IApplier>(handlerType).Apply(configuration);
}
}
}

private interface IApplier
{
void Apply(LocalQueueConfiguration configuration);
}

private class Applier<T> : IApplier where T : IConfigureLocalQueue
{
public void Apply(LocalQueueConfiguration configuration)
{
T.Configure(configuration);
}
}

}

0 comments on commit 5289de1

Please sign in to comment.