diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index c1f5391bb..8cfccee00 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -80,7 +80,9 @@ const config: UserConfig = { {text: 'Modular Monoliths', link: '/tutorials/modular-monolith'}, {text: 'Event Sourcing and CQRS with Marten', link: '/tutorials/cqrs-with-marten'}, {text: 'Railway Programming with Wolverine', link: '/tutorials/railway-programming'}, - {text: 'Interoperability with Non-Wolverine Systems', link: '/tutorials/interop'} + {text: 'Interoperability with Non-Wolverine Systems', link: '/tutorials/interop'}, + {text: 'Leader Election and Agents', link: '/tutorials/leader-election'}, + {text: 'Dealing with Concurrency', link: '/tutorials/concurrency'} ] }, { @@ -185,6 +187,7 @@ const config: UserConfig = { {text: 'External Database Tables', link: '/guide/messaging/transports/external-tables'} ] }, + {text: 'Partitioned Sequential Messaging', link: '/guide/messaging/partitioning'}, {text: 'Endpoint Specific Operations', link: '/guide/messaging/endpoint-operations'}, {text: 'Broadcast to a Specific Topic', link: '/guide/messaging/broadcast-to-topic'}, {text: 'Message Expiration', link: '/guide/messaging/expiration'}, diff --git a/docs/guide/command-line.md b/docs/guide/command-line.md index ffd1e9888..e03a8ea25 100644 --- a/docs/guide/command-line.md +++ b/docs/guide/command-line.md @@ -1,5 +1,7 @@ # Command Line Integration +@[youtube](3C5bacH0akU) + With help from its [JasperFx](https://github.com/JasperFx) team mate [Oakton](https://jasperfx.github.io/oakton), Wolverine supports quite a few command line diagnostic and resource management tools. 
To get started, apply Oakton as the command line parser in your applications as shown in the last line of code in this sample application bootstrapping from Wolverine's [Getting Started](/tutorials/getting-started): diff --git a/docs/guide/durability/marten/ancillary-stores.md b/docs/guide/durability/marten/ancillary-stores.md index c8dc676f1..f4e5cc476 100644 --- a/docs/guide/durability/marten/ancillary-stores.md +++ b/docs/guide/durability/marten/ancillary-stores.md @@ -26,7 +26,7 @@ public interface IPlayerStore : IDocumentStore; public interface IThingStore : IDocumentStore; ``` -snippet source | anchor +snippet source | anchor We can add Wolverine integration to both through a similar call to `IntegrateWithWolverine()` as normal as shown below: @@ -76,7 +76,7 @@ theHost = await Host.CreateDefaultBuilder() opts.Services.AddResourceSetupOnStartup(); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor Let's specifically zoom in on this code from within the big sample above: @@ -89,7 +89,7 @@ Let's specifically zoom in on this code from within the big sample above: // for all modules for more efficient usage of resources opts.Durability.MessageStorageSchemaName = "wolverine"; ``` -snippet source | anchor +snippet source | anchor If you are using separate Marten document stores for different modules in your application, you can easily make Wolverine @@ -116,7 +116,7 @@ public static class PlayerMessageHandler } } ``` -snippet source | anchor +snippet source | anchor ::: info diff --git a/docs/guide/durability/marten/event-sourcing.md b/docs/guide/durability/marten/event-sourcing.md index c0eb8166a..c408c8284 100644 --- a/docs/guide/durability/marten/event-sourcing.md +++ b/docs/guide/durability/marten/event-sourcing.md @@ -894,5 +894,5 @@ public class when_transfering_money } } ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/extensions.md b/docs/guide/extensions.md index a99dfba35..1564fd37f 100644 --- 
a/docs/guide/extensions.md +++ b/docs/guide/extensions.md @@ -109,7 +109,7 @@ internal class DisableExternalTransports : IWolverineExtension } } ``` -snippet source | anchor +snippet source | anchor And that extension is just added to the application's IoC container at test bootstrapping time like this: @@ -123,7 +123,7 @@ public static IServiceCollection DisableAllExternalWolverineTransports(this ISer return services; } ``` -snippet source | anchor +snippet source | anchor In usage, the `IWolverineExtension` objects added to the IoC container are applied *after* the inner configuration diff --git a/docs/guide/handlers/error-handling.md b/docs/guide/handlers/error-handling.md index 8aa7608fa..1148c298e 100644 --- a/docs/guide/handlers/error-handling.md +++ b/docs/guide/handlers/error-handling.md @@ -1,5 +1,7 @@ # Error Handling +@[youtube](k5WdzL85kGs) + It's an imperfect world and almost inevitable that your Wolverine message handlers will occasionally throw exceptions as message handling fails. Maybe because a piece of infrastructure is down, maybe you get transient network issues, or maybe a database is overloaded. 
diff --git a/docs/guide/handlers/multi-tenancy.md b/docs/guide/handlers/multi-tenancy.md index fab8f915c..a4dcc6a3b 100644 --- a/docs/guide/handlers/multi-tenancy.md +++ b/docs/guide/handlers/multi-tenancy.md @@ -62,7 +62,7 @@ public static IEnumerable Handle(IncomingMessage message) TenantId = "one" }); ``` -snippet source | anchor +snippet source | anchor ## Referencing the TenantId diff --git a/docs/guide/http/endpoints.md b/docs/guide/http/endpoints.md index d8646315b..a36a9a3bb 100644 --- a/docs/guide/http/endpoints.md +++ b/docs/guide/http/endpoints.md @@ -174,7 +174,7 @@ public static OrderShipped Ship(ShipOrder command, Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor ## JSON Handling diff --git a/docs/guide/http/marten.md b/docs/guide/http/marten.md index b7f5f7f6a..4cbdbe6c0 100644 --- a/docs/guide/http/marten.md +++ b/docs/guide/http/marten.md @@ -135,7 +135,7 @@ public static OrderShipped Ship(ShipOrder2 command, [Aggregate] Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor Using this version of the "aggregate workflow", you no longer have to supply a command in the request body, so you could @@ -154,7 +154,7 @@ public static OrderShipped Ship3([Aggregate] Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor A couple other notes: @@ -247,7 +247,7 @@ public class Order public bool IsShipped() => Shipped.HasValue; } ``` -snippet source | anchor +snippet source | anchor To append a single event to an event stream from an HTTP endpoint, you can use a return value like so: @@ -266,7 +266,7 @@ public static OrderShipped Ship(ShipOrder command, Order order) return new OrderShipped(); } ``` -snippet source | anchor +snippet source | anchor Or potentially append multiple events using the `Events` type as a return value like this sample: @@ -302,7 +302,7 @@ public static (OrderStatus, Events) Post(MarkItemReady command, Order 
order) return (new OrderStatus(order.Id, order.IsReadyToShip()), events); } ``` -snippet source | anchor +snippet source | anchor ### Responding with the Updated Aggregate @@ -328,7 +328,7 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command, ); } ``` -snippet source | anchor +snippet source | anchor ## Reading the Latest Version of an Aggregate @@ -347,7 +347,7 @@ an HTTP endpoint method, use the `[ReadAggregate]` attribute like this: [WolverineGet("/orders/latest/{id}")] public static Order GetLatest(Guid id, [ReadAggregate] Order order) => order; ``` -snippet source | anchor +snippet source | anchor If the aggregate doesn't exist, the HTTP request will stop with a 404 status code. diff --git a/docs/guide/http/problemdetails.md b/docs/guide/http/problemdetails.md index 179c4a096..b1632a529 100644 --- a/docs/guide/http/problemdetails.md +++ b/docs/guide/http/problemdetails.md @@ -137,7 +137,7 @@ public static ProblemDetails Before(IShipOrder command, Order order) return WolverineContinue.NoProblems; } ``` -snippet source | anchor +snippet source | anchor ## Within Message Handlers diff --git a/docs/guide/http/querystring.md b/docs/guide/http/querystring.md index c0b068e6b..044798a38 100644 --- a/docs/guide/http/querystring.md +++ b/docs/guide/http/querystring.md @@ -120,7 +120,7 @@ public static class QueryOrdersEndpoint } } ``` -snippet source | anchor +snippet source | anchor Because we've used the `[FromQuery]` attribute on a parameter argument that's not a simple type, Wolverine is trying to bind diff --git a/docs/guide/messaging/expiration.md b/docs/guide/messaging/expiration.md index d0b5d31d2..1f807730a 100644 --- a/docs/guide/messaging/expiration.md +++ b/docs/guide/messaging/expiration.md @@ -20,7 +20,7 @@ public DateTimeOffset? 
DeliverBy set => _deliverBy = value?.ToUniversalTime(); } ``` -snippet source | anchor +snippet source | anchor At runtime, Wolverine will: diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md new file mode 100644 index 000000000..d459e969b --- /dev/null +++ b/docs/guide/messaging/partitioning.md @@ -0,0 +1,422 @@ +# Partitioned Sequential Messaging + +::: tip +Concurrency can be hard, especially anytime there is any element of a system like +the storage for an entity or event stream or saga that is sensitive to simultaneous writes. I won't tell +you *not* to worry about this because you absolutely should be concerned with concurrency, but fortunately +Wolverine has [some helpful functionality to help you manage concurrency in your system](/tutorials/concurrency). +::: + +"Partitioned Sequential Messaging" is a feature in Wolverine that tries to guarantee sequential processing +*within* groups of messages related to some sort of business domain entity within your system while also allowing +work to be processed in parallel for better throughput *between* groups of messages. + +At this point, Wolverine supports this feature for: + +1. Purely local processing within the current process +2. "Partitioning" the publishing of messages to external transports like Rabbit MQ or Amazon SQS over a range of queues where we have built + specific support for this feature +3. "Partitioning" the processing of messages received from any external transport within a single process + +## How It Works + +Let's jump right to a concrete example. Let's say you're building an order management system, so you're processing +plenty of command messages against a single `Order`. 
You also expect -- or already know from testing or production issues +-- that in normal operation you can expect your system to receive messages simultaneously that impact the same +`Order` and that when that happens your system either throws up from concurrent writes to the same entity or event stream +or even worse, you possibly get incorrect or incomplete system state when changes from one command are overwritten by +changes from another command against the same `Order`. + +With all of that being said, let's utilize Wolverine's "Partitioned Sequential Messaging" feature to alleviate the concurrent +access to any single `Order`, while hopefully allowing work against different `Order` entities to happily proceed in parallel. + +First though, just to make this easy, let's make a little marker interface for our internal message types that will +make it easy for Wolverine to know which `Order` a given command relates to: + + + +```cs +public interface IOrderCommand +{ + public string OrderId { get; } +} + +public record ApproveOrder(string OrderId) : IOrderCommand; +public record CancelOrder(string OrderId) : IOrderCommand; +``` +snippet source | anchor + + +If we were only running our system on a single node so we only care about a single process, we can do this: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.MessagePartitioning + // First, we're going to tell Wolverine how to determine the + // message group id + .ByMessage(x => x.OrderId) + + // Next we're setting up a publishing rule to local queues + .PublishToPartitionedLocalMessaging("orders", 4, topology => + { + topology.MessagesImplementing(); + + + // this feature exists + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + // Just showing you how to make additional Wolverine configuration + // for all the local queues built from this usage + topology.ConfigureQueues(queue => + { + queue.TelemetryEnabled(true); + }); + }); +}); +``` +snippet 
source | anchor + + +So let's talk about what we set up in the code above. First, we've taught Wolverine how to determine the group +id of any message that implements the `IOrderCommand` interface. Next we've told Wolverine to publish any message +implementing our `IOrderCommand` interface to one of four [local queues](/guide/messaging/transports/local) named "orders1", "orders2", "orders3", and "orders4." +At runtime, when you publish an `IOrderCommand` within the system, Wolverine will determine the group id of the new message through the `IOrderCommand.OrderId` rule we created +(it does get written to `Envelope.GroupId`). Once Wolverine has that `GroupId`, it needs to determine which of the "orders#" +queues to send the message, and the easiest way to explain this is really just to show the internal code: + + + +```cs +/// +/// Uses a combination of message grouping id rules and a deterministic hash +/// to predictably assign envelopes to a slot to help "shard" message publishing. +/// +/// +/// +/// +/// +public static int SlotForSending(this Envelope envelope, int numberOfSlots, MessagePartitioningRules rules) +{ + // This is where Wolverine determines the GroupId for the message + // Note that you can also explicitly set the GroupId + var groupId = rules.DetermineGroupId(envelope); + + // Pick one at random if we can't determine a group id, and has to be zero based + if (groupId == null) return Random.Shared.Next(1, numberOfSlots) - 1; + + // Deterministically choose a slot based on the GroupId, but try + // to more or less evenly distribute groups to the different + // slots + return Math.Abs(groupId.GetDeterministicHashCode() % numberOfSlots); +} +``` +snippet source | anchor + + +The code above manages publishing between the "orders1", "orders2", "orders3", and "orders4" queues. 
Inside of each of the +local queues Wolverine is also using yet another round of grouped message segregation with a slightly different sorting +mechanism to sort messages by their group id into separate, strictly ordered Channels. The `PartitionSlots` enum controls +the number of parallel channels processing messages within a single listener. + +::: info +From our early testing, we quickly found out that the second level of partitioning within listeners only distributed messages +relatively evenly when you had an odd number of slots within the listener, so we opted for an enum to limit the values here rather than trying to assert +on invalid even numbers. +::: + +The end result is that you do create some parallelism between message processing while guaranteeing that messages from +within a single group id will be executed sequentially. + +In the end, you really need just 2-3 things: + +1. Some way for Wolverine to determine the group id of a message, assuming you aren't explicitly passing that to Wolverine +2. Potentially a publishing rule for partitioned sending +3. 
Potentially a rule on each listening endpoint to use partitioned handling + +## Inferred Grouping for Event Streams or Sagas + +There are some built in message group id rules that you can opt into as shown below: + + + +```cs +// Telling Wolverine how to assign a GroupId to a message, that we'll use +// to predictably sort into "slots" in the processing +opts.MessagePartitioning + + // This tells Wolverine to use the Saga identity as the group id for any message + // that impacts a Saga or the stream id of any command that is part of the "aggregate handler workflow" + // integration with Marten + .UseInferredMessageGrouping() + + .PublishToPartitionedLocalMessaging("letters", 4, topology => + { + topology.MessagesImplementing(); + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + topology.ConfigureQueues(queue => + { + queue.BufferedInMemory(); + }); + }); +``` +snippet source | anchor + + +The built in rules *at this point* include: + +* Using the Saga identity of a message that is handled by a [Stateful Saga](/guide/durability/sagas) +* Using the stream/aggregate id of messages that are part of the [Aggregate Handler Workflow](/guide/durability/marten/event-sourcing) integration with Marten + +## Specifying Grouping Rules + +Internally, Wolverine is using a list of implementations of this interface: + + + +```cs +/// +/// Strategy for determining the GroupId of a message +/// +public interface IGroupingRule +{ + bool TryFindIdentity(Envelope envelope, out string groupId); +} +``` +snippet source | anchor + + +Definitely note that these rules are fall through, and the order you declare the rules 
Also note that when you call into this syntax below it's combinatorial (just meaning that you +don't start over if you call into it multiple times): + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.MessagePartitioning + // Use saga identity or aggregate handler workflow identity + // from messages as the group id + .UseInferredMessageGrouping() + + // First, we're going to tell Wolverine how to determine the + // message group id for any message type that can be + // cast to this interface. Also works for concrete types too + .ByMessage(x => x.OrderId) + + // Use the Envelope.TenantId as the message group id + // this could be valuable to partition work by tenant + .ByTenantId() + + // Use a custom rule implementing IGroupingRULE with explicit code to determine + // the group id + .ByRule(new MySpecialGroupingRule()); +}); +``` +snippet source | anchor + + +## Explicit Group Ids + +::: tip +Any explicitly specified group id will take precedence over the grouping rules in the previous section +::: + +You can also explicitly specify a group id for a message when you send or publish it through +`IMessageBus` like this: + + + +```cs +public static async Task SendMessageToGroup(IMessageBus bus) +{ + await bus.PublishAsync( + new ApproveInvoice("AAA"), + new() { GroupId = "agroup" }); +} +``` +snippet source | anchor + + +If you are using [cascaded messages](/guide/handlers/cascading) from your message handlers, there's an extension method helper +just as a convenience like this: + + + +```cs +public static IEnumerable Handle(ApproveInvoice command) +{ + yield return new PayInvoice(command.Id).WithGroupId("aaa"); +} +``` +snippet source | anchor + + +## Partitioned Publishing Locally + +::: tip +You will also need to set up message grouping rules for the message partitioning to function +::: + +If you need to use the partitioned sequential messaging just within a single process, the 
+`PublishToPartitionedLocalMessaging()` method shown below will set up both a publishing rule for multiple local queues and +partitioned processing for those local queues. + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.MessagePartitioning + // First, we're going to tell Wolverine how to determine the + // message group id + .ByMessage(x => x.OrderId) + + // Next we're setting up a publishing rule to local queues + .PublishToPartitionedLocalMessaging("orders", 4, topology => + { + topology.MessagesImplementing(); + + + // this feature exists + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + // Just showing you how to make additional Wolverine configuration + // for all the local queues built from this usage + topology.ConfigureQueues(queue => + { + queue.TelemetryEnabled(true); + }); + }); +}); +``` +snippet source | anchor + + +## Partitioned Processing at any Endpoint + +You can add partitioned processing to any listening endpoint like this: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.UseRabbitMq(); + + // You still need rules for determining the message group id + // of incoming messages! 
+ opts.MessagePartitioning + .ByMessage(x => x.OrderId); + + // We're going to listen + opts.ListenToRabbitQueue("incoming") + // To really keep our system from processing Order related + // messages for the same order id concurrently, we'll + // make it so that only one node actively processes messages + // from this queue + .ExclusiveNodeWithParallelism() + + // We're going to partition the message processing internally + // based on the message group id while allowing up to 7 parallel + // messages to be executed at once + .PartitionProcessingByGroupId(PartitionSlots.Seven); +}); +``` +snippet source | anchor + + +## Partitioned Publishing to External Transports + +::: info +Wolverine supports the Azure Service Bus concept of [session identifiers](/guide/messaging/transports/azureservicebus/session-identifiers) that effectively provides the same +benefits as this feature. +::: + +::: tip +Even if your system is not messaging to any other systems, using this mechanism will help distribute work across an +application cluster while guaranteeing that messages within a group id are processed sequentially and still allowing for +parallelism between message groups. +::: + +At this point Wolverine has direct support for partitioned routing to Rabbit MQ or Amazon SQS. Note that in both +of the following examples, Wolverine is both setting up publishing rules out to these queues, and also configuring +listeners for the queues. Beyond that, Wolverine is making each queue be "exclusive," meaning that only one node +within a cluster is actively listening and processing messages from each partitioned queue at any one time. + +For Rabbit MQ: + + + +```cs +// opts is the WolverineOptions from within an Add/UseWolverine() call + +// Telling Wolverine how to assign a GroupId to a message, that we'll use +// to predictably sort into "slots" in the processing +opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + +// This is creating Rabbit MQ queues named "letters1" etc. 
+opts.MessagePartitioning.PublishToShardedRabbitQueues("letters", 4, topology => +{ + topology.MessagesImplementing(); + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + topology.ConfigureSender(x => + { + // just to show that you can do this... + x.DeliverWithin(5.Minutes()); + }); + topology.ConfigureListening(x => x.BufferedInMemory()); +}); +``` +snippet source | anchor + + +And for Amazon SQS: + + + +```cs +// Telling Wolverine how to assign a GroupId to a message, that we'll use +// to predictably sort into "slots" in the processing +opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + +opts.MessagePartitioning.PublishToShardedAmazonSqsQueues("letters", 4, topology => +{ + topology.MessagesImplementing(); + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + topology.ConfigureListening(x => x.BufferedInMemory().MessageBatchSize(10)); + +}); +``` +snippet source | anchor + + +## Partitioning Messages Received from External Systems + +::: warning +Brute force, no points for style, explicit coding ahead! +::: + +If you are receiving messages from an external source that will be vulnerable to concurrent access problems when the messages +are executed, but you either do not want to make the external system publish the group ids or have no ability to make the +upstream system care about your own internal group id details, you can simply relay the received messages back out +to a partitioned message topology owned by your system. + +Using Amazon SQS as our transport, let's say that we're receiving messages from the external system at one queue like this: + +Hey folks, more coming soon. Hopefully before Wolverine 5.0. 
+ +Watch this issue: https://github.com/JasperFx/wolverine/issues/1728 + + + diff --git a/docs/guide/messaging/transports/azureservicebus/scheduled.md b/docs/guide/messaging/transports/azureservicebus/scheduled.md index d5c70c677..88966ec8c 100644 --- a/docs/guide/messaging/transports/azureservicebus/scheduled.md +++ b/docs/guide/messaging/transports/azureservicebus/scheduled.md @@ -56,5 +56,5 @@ using var host = Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/azureservicebus/session-identifiers.md b/docs/guide/messaging/transports/azureservicebus/session-identifiers.md index 530330a64..959272b18 100644 --- a/docs/guide/messaging/transports/azureservicebus/session-identifiers.md +++ b/docs/guide/messaging/transports/azureservicebus/session-identifiers.md @@ -81,12 +81,9 @@ public static IEnumerable Handle(IncomingMessage message) yield return new Message3().ScheduleToGroup("one", 5.Minutes()); // Long hand - yield return new Message4().WithDeliveryOptions(new DeliveryOptions - { - GroupId = "one" - }); + yield return new Message4().WithDeliveryOptions(new() { GroupId = "one" }); } ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/guide/messaging/transports/sqs/message-attributes.md b/docs/guide/messaging/transports/sqs/message-attributes.md index 8593764ab..1a292add9 100644 --- a/docs/guide/messaging/transports/sqs/message-attributes.md +++ b/docs/guide/messaging/transports/sqs/message-attributes.md @@ -50,7 +50,6 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` snippet source | anchor - Once you’ve opted in, those attributes are available in the dictionary passed to `ISqsEnvelopeMapper.ReadEnvelopeData`. From there, you can stash them in `Envelope.Headers`, set correlation IDs, or just ignore them. @@ -65,4 +64,4 @@ Once you’ve opted in, those attributes are available in the dictionary passed ::: info That’s it. 
If you’ve already got a custom mapper, you can now wire in SQS attributes directly without having to bend over backwards with the AWS SDK. -::: \ No newline at end of file +::: diff --git a/docs/guide/runtime.md b/docs/guide/runtime.md index d6374380a..eeb797739 100644 --- a/docs/guide/runtime.md +++ b/docs/guide/runtime.md @@ -245,13 +245,36 @@ interface like so: /// Models a constantly running background process within a Wolverine /// node cluster /// -public interface IAgent : IHostedService +public interface IAgent : IHostedService // Standard .NET interface for background services { /// /// Unique identification for this agent within the Wolverine system /// Uri Uri { get; } + // Not really used for anything real *yet*, but + // hopefully becomes something useful for CritterWatch + // health monitoring + AgentStatus Status { get; } +} +``` +snippet source | anchor + +```cs +/// +/// Models a constantly running background process within a Wolverine +/// node cluster +/// +public interface IAgent : IHostedService // Standard .NET interface for background services +{ + /// + /// Unique identification for this agent within the Wolverine system + /// + Uri Uri { get; } + + // Not really used for anything real *yet*, but + // hopefully becomes something useful for CritterWatch + // health monitoring AgentStatus Status { get; } } @@ -296,7 +319,7 @@ public enum AgentStatus Paused } ``` -snippet source | anchor +snippet source | anchor With related groups of agents built and assigned by IoC-registered implementations of this interface: diff --git a/docs/guide/testing.md b/docs/guide/testing.md index 44027c641..88c04b355 100644 --- a/docs/guide/testing.md +++ b/docs/guide/testing.md @@ -8,6 +8,10 @@ See Jeremy's blog post [How Wolverine allows for easier testing](https://jeremyd Also see [Wolverine Best Practices](/tutorials/best-practices) for other helpful tips. 
+And this: + +@[youtube](ODSAGAllsxw) + ## Integration Testing with Tracked Sessions ::: tip @@ -130,6 +134,10 @@ public async Task using_tracked_sessions_advanced(IHost otherWolverineSystem) // This is actually helpful if you are testing for error handling // functionality in your system .DoNotAssertOnExceptionsDetected() + + // Hey, just in case failure acks are getting into your testing session + // and you do not care for the tests, tell Wolverine to ignore them + .IgnoreFailureAcks() // Again, this is testing against processes, with another IHost .WaitForMessageToBeReceivedAt(otherWolverineSystem) @@ -141,7 +149,7 @@ public async Task using_tracked_sessions_advanced(IHost otherWolverineSystem) overdrawn.AccountId.ShouldBe(debitAccount.AccountId); } ``` -snippet source | anchor +snippet source | anchor The samples shown above inlcude `Sent` message records, but there are more properties available in the `TrackedSession` object. @@ -281,7 +289,7 @@ public class When_message_is_sent : IAsyncLifetime public async Task DisposeAsync() => await _host.StopAsync(); } ``` -snippet source | anchor +snippet source | anchor As you can see, we just have to start our application, attach a tracked session to it, and then wait for the message to be published. This way, we can test the whole process of the application, from the file change to the message publication, in a single test. 
diff --git a/docs/public/leader-election-diagram.png b/docs/public/leader-election-diagram.png new file mode 100644 index 000000000..d9b74c000 Binary files /dev/null and b/docs/public/leader-election-diagram.png differ diff --git a/docs/public/leader-election.webp b/docs/public/leader-election.webp new file mode 100644 index 000000000..a75e9fd4e Binary files /dev/null and b/docs/public/leader-election.webp differ diff --git a/docs/public/wolverines-wizard-of-oz.png b/docs/public/wolverines-wizard-of-oz.png new file mode 100644 index 000000000..f335e8c30 Binary files /dev/null and b/docs/public/wolverines-wizard-of-oz.png differ diff --git a/docs/tutorials/concurrency.md b/docs/tutorials/concurrency.md new file mode 100644 index 000000000..65ef46662 --- /dev/null +++ b/docs/tutorials/concurrency.md @@ -0,0 +1,142 @@ +# Dealing with Concurrency + +![Lions and tigers and bears, oh my!](/wolverines-wizard-of-oz.png) + +With a little bit of research today -- and unfortunately my own experience -- here's a list of *some* of the problems that +can be caused by concurrent message processing in your system trying to access or modify the same resources or data: + +* Race conditions +* [Deadlocks](https://en.wikipedia.org/wiki/Deadlock) +* Consistency errors when multiple threads may be overwriting the same data and some changes get lost +* Out of order processing that may lead to erroneous results +* Exceptions from tools like Marten that helpfully try to stop concurrent changes through [optimistic concurrency](https://en.wikipedia.org/wiki/Optimistic_concurrency_control) + +Because these issues are so common in the kind of systems you would want to use a tool like Wolverine on in the first place, +the Wolverine community has invested quite heavily in features to help you manage concurrent access in your system. + +## Error Retries on Concurrency Errors + +If you don't expect many concurrency exceptions, you can probably get away with some kind of optimistic concurrency. 
Using +the [aggregate handler workflow](/guide/durability/marten/event-sourcing) integration with Marten as an example, there is some built in optimistic concurrency +in Marten just to protect your system from simultaneous writes to the same event stream. In the case when Marten determines +that *something* else has written to an event stream between your command handling starting and it trying to commit changes, +Marten will throw the `JasperFx.ConcurrencyException`. + +If we're doing simplistic optimistic checks, we might be perfectly fine with a global error handler that simply [retries +any failure](/guide/handlers/error-handling) due to this exception a few times: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts + // On optimistic concurrency failures from Marten + .OnException() + .RetryWithCooldown(100.Milliseconds(), 250.Milliseconds(), 500.Milliseconds()) + .Then.MoveToErrorQueue(); +}); +``` +snippet source | anchor + + +Of course though, sometimes you are opting into a more stringent form of optimistic concurrency where the handler should +fail fast if an event stream has advanced beyond a specific version number, as in the usage of this command message: + +```csharp +public record MarkItemReady(Guid OrderId, string ItemName, int Version); +``` + +In that case, there's absolutely no value in retrying the message, so we should use a different error handling policy to +move that message off immediately like one of these: + + + +```cs +public static class MarkItemReadyHandler +{ + // This will let us specify error handling policies specific + // to only this message handler + public static void Configure(HandlerChain chain) + { + // Can't ever process this message, so send it directly + // to the DLQ + // Do not pass Go, do not collect $200... + chain.OnException() + .MoveToErrorQueue(); + + // Or instead... 
+ // Can't ever process this message, so just throw it away + // Do not pass Go, do not collect $200... + chain.OnException() + .Discard(); + } + + public static IEnumerable Post( + MarkItemReady command, + + // Wolverine + Marten will assert that the Order stream + // in question has not advanced from command.Version + [WriteAggregate] Order order) + { + // process the message and emit events + yield break; + } +} +``` +snippet source | anchor + + +## Exclusive Locks or Serializable Transactions + +You can try to deal with concurrency problems by utilizing whatever database tooling you're using for +whatever exclusive locks or serializable transaction support they might have. The integration with Marten has +an option for exclusive locks with the "Aggregate Handler Workflow." With EF Core, you should be able to opt into starting +your own serializable transaction. + +The Wolverine team considers these approaches to be maybe a necessary evil, but hopefully only a temporary solution. We would +probably recommend in most cases that you protect your system from concurrent access through selective queueing as much as +possible as discussed in the next section. + +## Using Queueing + +In many cases you can use queueing of some sort to reduce concurrent access to sensitive resources within your system.
+The most draconian way to do this is to say that all messages in a given queue will be executed single file in strict +order on one single node within your application like so: + + + +```cs +var builder = Host.CreateApplicationBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(); + + // Wolverine will *only* listen to this queue + // on one single node and process messages in strict + // order + opts.ListenToRabbitQueue("control").ListenWithStrictOrdering(); + + opts.Publish(x => + { + // Just keying off a made up marker interface + x.MessagesImplementing(); + x.ToRabbitQueue("control"); + }); + }); +``` +snippet source | anchor + + +The strict ordering usage definitely limits the throughput in your system while largely eliminating issues due to concurrency. +This option is useful for fast processing messages where you may be coordinating long running work throughout the rest of +your system. This has proven useful in file ingestion processes or systems that have to manage long running processes +in other nodes. + +More likely though, to both protect resources that are prone to issues with concurrent access +*and* allow for greater throughput, you may want to reach for either: + +* [Session Identifier and FIFO queue support for Azure Service Bus](https://wolverinefx.net/guide/messaging/transports/azureservicebus/session-identifiers.html) +* Wolverine's [Partitioned Sequential Messaging](/guide/messaging/partitioning) feature introduced in Wolverine 5.0 that was designed specifically to alleviate problems with concurrency within + Wolverine systems.
diff --git a/docs/tutorials/cqrs-with-marten.md b/docs/tutorials/cqrs-with-marten.md index b5c404cf4..28dbc5acc 100644 --- a/docs/tutorials/cqrs-with-marten.md +++ b/docs/tutorials/cqrs-with-marten.md @@ -13,6 +13,8 @@ This guide assumes some familiarity with Event Sourcing nomenclature, but if you of development, see [Understanding Event Sourcing with Marten](https://martendb.io/events/learning.html) from the Marten documentation. ::: +@[youtube](U9zTGdo0Ps8) + Let's get the entire "Critter Stack" (Wolverine + [Marten](https://martendb.io)) assembled and build a system using CQRS with Event Sourcing! We'll be using the [IncidentService](https://github.com/jasperfx/wolverine/tree/main/src/Samples/IncidentService) example service to show an example of using Wolverine with Marten in a headless diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md index 4ff44a257..73cf266ec 100644 --- a/docs/tutorials/index.md +++ b/docs/tutorials/index.md @@ -1,11 +1,13 @@ # Wolverine Tutorials -| Tutorial | Description | -|----------------------------------------------------------|------------------------------------------------------------------------------------------------------------| -| [Wolverine as Mediator](/tutorials/mediator) | Learn how to use Wolverine as a mediator tool within an ASP.Net Core or other application | -| [Ping/Pong Messaging with Rabbit MQ](/tutorials/ping-pong) | Basic tutorial on asynchronous messaging with Rabbit MQ | +| Tutorial | Description | +|--------------------------------------------------------------|------------------------------------------------------------------------------------------------------------| +| [Wolverine as Mediator](/tutorials/mediator) | Learn how to use Wolverine as a mediator tool within an ASP.Net Core or other application | +| [Ping/Pong Messaging with Rabbit MQ](/tutorials/ping-pong) | Basic tutorial on asynchronous messaging with Rabbit MQ | | [Vertical Slice Architecture](./vertical-slice-architecture) | How 
Wolverine can be used for more effective vertical slice architecture style development | -| [Modular Monolith Architecture](./modular-monolith) | Learn how best to use Wolverine inside of "Modular Monolith" architectures | -| [CQRS and Event Sourcing with Marten](./cqrs-with-marten) | Utilize the full "Critter Stack" for a very productive development experience | -| [Railway Programming](./railway-programming) | Wolverine builds in some very light weight Railway Programming inspired abilities | -|[Interoperability with Non-Wolverine Systems](./interop) | Everything you need to know to make Wolverine play nicely and exchange messages with non-Wolverine systems | +| [Modular Monolith Architecture](./modular-monolith) | Learn how best to use Wolverine inside of "Modular Monolith" architectures | +| [CQRS and Event Sourcing with Marten](./cqrs-with-marten) | Utilize the full "Critter Stack" for a very productive development experience | +| [Railway Programming](./railway-programming) | Wolverine builds in some very light weight Railway Programming inspired abilities | +| [Interoperability with Non-Wolverine Systems](./interop) | Everything you need to know to make Wolverine play nicely and exchange messages with non-Wolverine systems | +| [Leader Election and Agents](./leader-election) | Learn about Wolverine's internal leader election and how to write your own "sticky" agent family | +| [Dealing with Concurrency](./concurrency) | Dealing with concurrency can be hard, but Wolverine has plenty of tools to help you manage it | diff --git a/docs/tutorials/leader-election.md b/docs/tutorials/leader-election.md new file mode 100644 index 000000000..788826aae --- /dev/null +++ b/docs/tutorials/leader-election.md @@ -0,0 +1,403 @@ +# Leader Election and Agents + +![Who's in charge?](/leader-election.webp) + +Wolverine has a couple important features that enable Wolverine to distribute stateful, background work by assigning +running agents to certain running nodes within an 
application cluster. To do so, Wolverine has a built in [leader election](https://en.wikipedia.org/wiki/Leader_election) +feature so that it can make one single node run a "leadership agent" that continuously ensures that all known and supported +agents are running within the system on a single node. + +Here's an illustration of that work distribution: + +![Work Distribution across Nodes](/leader-election-diagram.png) + +Within Wolverine itself, there are a couple types of "agents" that Wolverine distributes: + +1. The ["durability agents"](/guide/durability/) that poll against message stores for any stranded inbox or outbox messages that might need to + be recovered and pushed along. Wolverine runs exactly one agent for each message store in the system, and distributes these + across the cluster +2. "Exclusive Listeners" within Wolverine when you direct Wolverine to only listen to a queue, topic, or message subscription + on a single node. This happens when you use the [strictly ordered listening](/guide/messaging/listeners.html#strictly-ordered-listeners) option. +3. In conjunction with [Marten](https://martendb.io), the [Wolverine managed projection and subscription distribution](/guide/durability/marten/distribution) uses Wolverine's agent assignment + capability to make sure each projection or subscription is running on exactly one node. + +## Enabling Leader Election + +Leader election is on by default in Wolverine **if** you have any type of message persistence configured for your +application and some mechanism for cross node communication. First though, let's talk about message persistence. 
It could be by PostgreSQL: + + + +```cs +var builder = WebApplication.CreateBuilder(args); +var connectionString = builder.Configuration.GetConnectionString("postgres"); + +builder.Host.UseWolverine(opts => +{ + // Setting up Postgresql-backed message storage + // This requires a reference to Wolverine.Postgresql + opts.PersistMessagesWithPostgresql(connectionString); + + // Other Wolverine configuration +}); + +// This is rebuilding the persistent storage database schema on startup +// and also clearing any persisted envelope state +builder.Host.UseResourceSetupOnStartup(); + +var app = builder.Build(); + +// Other ASP.Net Core configuration... + +// Using JasperFx opens up command line utilities for managing +// the message storage +return await app.RunJasperFxCommands(args); +``` +snippet source | anchor + + +or by SQL Server: + + + +```cs +var builder = WebApplication.CreateBuilder(args); +var connectionString = builder.Configuration.GetConnectionString("sqlserver"); + +builder.Host.UseWolverine(opts => +{ + // Setting up Sql Server-backed message storage + // This requires a reference to Wolverine.SqlServer + opts.PersistMessagesWithSqlServer(connectionString); + + // Other Wolverine configuration +}); + +// This is rebuilding the persistent storage database schema on startup +// and also clearing any persisted envelope state +builder.Host.UseResourceSetupOnStartup(); + +var app = builder.Build(); + +// Other ASP.Net Core configuration... + +// Using JasperFx opens up command line utilities for managing +// the message storage +return await app.RunJasperFxCommands(args); +``` +snippet source | anchor + + +or through the Marten integration: + + + +```cs +// Adding Marten +builder.Services.AddMarten(opts => + { + var connectionString = builder.Configuration.GetConnectionString("Marten"); + opts.Connection(connectionString); + opts.DatabaseSchemaName = "orders"; + }) + + // Adding the Wolverine integration for Marten. 
+ .IntegrateWithWolverine(); +``` +snippet source | anchor + + +or by RavenDb: + + + +```cs +var builder = Host.CreateApplicationBuilder(); + +// You'll need a reference to RavenDB.DependencyInjection +// for this one +builder.Services.AddRavenDbDocStore(raven => +{ + // configure your RavenDb connection here +}); + +builder.UseWolverine(opts => +{ + // That's it, nothing more to see here + opts.UseRavenDbPersistence(); + + // The RavenDb integration supports basic transactional + // middleware just fine + opts.Policies.AutoApplyTransactions(); +}); + +// continue with your bootstrapping... +``` +snippet source | anchor + + +Next, we need to have some kind of mechanism for cross node communication within Wolverine in the form +of control queues for each node. When Wolverine bootstraps, it uses the message persistence to save +information about the new node including a `Uri` for a control endpoint where other Wolverine nodes should +send messages to "control" agent assignments. + +If you're using any of the message persistence options above, there's a fallback mechanism using the associated +databases to act as a simplistic message queue between nodes. For better results though, some of the transports in Wolverine +can instead use a non-durable queue for each node that will probably provide for better results. At the time this guide +was written, the [Rabbit MQ transport](/guide/messaging/transports/rabbitmq/) and the [Azure Service Bus transport](/guide/messaging/transports/azureservicebus/) support this feature. 
+ +## Disabling Leader Election + +If you want to disable leader election and all the cross node traffic, or maybe if you just want to optimize automated +testing scenarios by making a newly launched process automatically start up all possible agents immediately, you can use +the `DurabilityMode.Solo` setting as shown below: + + + +```cs +var builder = Host.CreateApplicationBuilder(); + +builder.UseWolverine(opts => +{ + opts.Services.AddMarten("some connection string") + + // This adds quite a bit of middleware for + // Marten + .IntegrateWithWolverine(); + + // You want this maybe! + opts.Policies.AutoApplyTransactions(); + + if (builder.Environment.IsDevelopment()) + { + // But wait! Optimize Wolverine for usage as + // if there would never be more than one node running + opts.Durability.Mode = DurabilityMode.Solo; + } +}); + +using var host = builder.Build(); +await host.StartAsync(); +``` +snippet source | anchor + + +For testing, you also have this helper: + + + +```cs +// This is bootstrapping the actual application using +// its implied Program.Main() set up +// For non-Alba users, this is using IWebHostBuilder +Host = await AlbaHost.For(x => +{ + x.ConfigureServices(services => + { + // Override the Wolverine configuration in the application + // to run the application in "solo" mode for faster + // testing cold starts + services.RunWolverineInSoloMode(); + + // And just for completion, disable all Wolverine external + // messaging transports + services.DisableAllExternalWolverineTransports(); + }); +}); +``` +snippet source | anchor + + +Likewise, any other `DurabilityMode` setting than `Balanced` (the default) will +disable leader election. 
+ +## Writing Your Own Agent Family + +To write your own family of "sticky" agents and use Wolverine to distribute them across an application cluster, +you'll first need to make implementations of this interface: + + + +```cs +/// +/// Models a constantly running background process within a Wolverine +/// node cluster +/// +public interface IAgent : IHostedService // Standard .NET interface for background services +{ + /// + /// Unique identification for this agent within the Wolverine system + /// + Uri Uri { get; } + + // Not really used for anything real *yet*, but + // hopefully becomes something useful for CritterWatch + // health monitoring + AgentStatus Status { get; } +} +``` +snippet source | anchor + +```cs +/// +/// Models a constantly running background process within a Wolverine +/// node cluster +/// +public interface IAgent : IHostedService // Standard .NET interface for background services +{ + /// + /// Unique identification for this agent within the Wolverine system + /// + Uri Uri { get; } + + // Not really used for anything real *yet*, but + // hopefully becomes something useful for CritterWatch + // health monitoring + AgentStatus Status { get; } +} + +public class CompositeAgent : IAgent +{ + private readonly List _agents; + public Uri Uri { get; } + + public CompositeAgent(Uri uri, IEnumerable agents) + { + Uri = uri; + _agents = agents.ToList(); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + foreach (var agent in _agents) + { + await agent.StartAsync(cancellationToken); + } + + Status = AgentStatus.Started; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + foreach (var agent in _agents) + { + await agent.StopAsync(cancellationToken); + } + + Status = AgentStatus.Started; + } + + public AgentStatus Status { get; private set; } = AgentStatus.Stopped; +} + +public enum AgentStatus +{ + Started, + Stopped, + Paused +} +``` +snippet source | anchor + + +Note that you could use 
[BackgroundService](https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-9.0&tabs=visual-studio) as a base class. + +The `Uri` property just needs to be unique and match up with our next service interface. Wolverine uses that `Uri` as a +unique identifier to track where and whether the known agents are executing. + +The next service is the actual distributor. To plug into Wolverine, you need to build an implementation of this service: + + + +```cs +/// +/// Pluggable model for managing the assignment and execution of stateful, "sticky" +/// background agents on the various nodes of a running Wolverine cluster +/// +public interface IAgentFamily +{ + /// + /// Uri scheme for this family of agents + /// + string Scheme { get; } + + /// + /// List of all the possible agents by their identity for this family of agents + /// + /// + ValueTask> AllKnownAgentsAsync(); + + /// + /// Create or resolve the agent for this family + /// + /// + /// + /// + ValueTask BuildAgentAsync(Uri uri, IWolverineRuntime wolverineRuntime); + + /// + /// All supported agent uris by this node instance + /// + /// + ValueTask> SupportedAgentsAsync(); + + /// + /// Assign agents to the currently running nodes when new nodes are detected or existing + /// nodes are deactivated + /// + /// + /// + ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments); +} +``` +snippet source | anchor + + +In this case, you can plug custom `IAgentFamily` strategies into Wolverine by just registering a concrete service in +your DI container against that `IAgentFamily` interface (`services.AddSingleton();`). +Wolverine does a simple `IServiceProvider.GetServices()` during its bootstrapping to find them. + +As you can probably guess, the `Scheme` should be unique, and the `Uri` structure needs to be unique across all of your agents. 
+`EvaluateAssignmentsAsync()` is your hook to create distribution strategies, with a simple “just distribute these things evenly across my cluster” +strategy possible like this example from Wolverine itself: + +```csharp +public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments) +{ + assignments.DistributeEvenly(Scheme); + return ValueTask.CompletedTask; +} +``` +If you go looking for it, the equivalent in Wolverine’s distribution of Marten projections and subscriptions is a +tiny bit more complicated in that it uses knowledge of node capabilities to support blue/green semantics to +only distribute work to the servers that “know” how to use particular agents +(like version 3 of a projection that doesn’t exist on “blue” nodes): + +```csharp +public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments) +{ + assignments.DistributeEvenlyWithBlueGreenSemantics(SchemeName); + return new ValueTask(); +} +``` + +The `AssignmentGrid` tells you the current state of your application in terms of which node is the leader, what +all the currently running nodes are, and which agents are running on which nodes. Beyond the even distribution, +the `AssignmentGrid` has fine grained API methods to start, stop, or reassign individual agents to specific running nodes. + +To wrap this up, I’m trying to guess at the questions you might have and see if I can cover all the bases: + +* **Is some kind of persistence necessary?** Yes, absolutely. Wolverine has to have some way to “know” what nodes are running and which agents are really running on each node. +* **How does Wolverine do health checks for each node?** If you look in the wolverine_nodes table when using PostgreSQL or Sql Server, you’ll see a heartbeat column with a timestamp. Each Wolverine application is running a polling operation that updates its heartbeat timestamp and also checks that there is a known leader node. 
In normal shutdown, Wolverine tries to gracefully mark the current node as offline and send a message to the current leader node if there is one telling the leader that the node is shutting down. In real world usage though, Kubernetes or who knows what is frequently killing processes without a clean shutdown. In that case, the leader node will be able to detect stale nodes that are offline, eject them from the node persistence, and redistribute agents. +* **Can Wolverine switch over the leadership role?** Yes, and that should be relatively quick. Plus Wolverine would keep trying to start a leader election if none is found. And yet, it's an imperfect world where things can go wrong, so there will 100% be the ability to either kickstart or assign the leader role from the forthcoming CritterWatch user interface. +* **How does the leadership election work?** Crudely and relatively effectively. All of the storage mechanics today have some kind of sequential node number assignment for all newly persisted nodes. In a kind of simplified “Bully Algorithm,” Wolverine will always try to send “try assume leadership” messages to the node with the lowest sequential node number, which will always be the longest running node. When a node does try to take leadership, it uses whatever kind of global, advisory lock function the current persistence uses to get sole access to write the leader node assignment to itself, but will back out if the current node detects from storage that the leadership is already running on another active node.
+ + + + + + + + + + diff --git a/docs/tutorials/modular-monolith.md b/docs/tutorials/modular-monolith.md index 424bea81f..1a8c88727 100644 --- a/docs/tutorials/modular-monolith.md +++ b/docs/tutorials/modular-monolith.md @@ -1,5 +1,7 @@ # Modular Monoliths +@[youtube](JSnBe7n-CNI) + ::: info Wolverine's mantra is "low code ceremony," and the modular monolith approach comes with a mountain of temptation for a certain kind of software architect to try out a world of potentially harmful high ceremony coding techniques. @@ -448,7 +450,7 @@ by using this setting: // for all modules for more efficient usage of resources opts.Durability.MessageStorageSchemaName = "wolverine"; ``` -snippet source | anchor +snippet source | anchor By setting any value for `WolverineOptions.Durability.MessageStorageSchemaName`, Wolverine will use that value for the database schema diff --git a/src/Http/WolverineWebApi/Marten/Orders.cs b/src/Http/WolverineWebApi/Marten/Orders.cs index bba689663..3cd60d712 100644 --- a/src/Http/WolverineWebApi/Marten/Orders.cs +++ b/src/Http/WolverineWebApi/Marten/Orders.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Events; using Marten; using Marten.Events; @@ -5,9 +6,11 @@ using Marten.Pagination; using Microsoft.AspNetCore.Mvc; using Wolverine.Attributes; +using Wolverine.ErrorHandling; using Wolverine.Http; using Wolverine.Http.Marten; using Wolverine.Marten; +using Wolverine.Runtime.Handlers; namespace WolverineWebApi.Marten; @@ -364,4 +367,39 @@ public static Task> Query( } +#endregion + +#region sample_showing_concurrency_exception_moving_directly_to_DLQ + +public static class MarkItemReadyHandler +{ + // This will let us specify error handling policies specific + // to only this message handler + public static void Configure(HandlerChain chain) + { + // Can't ever process this message, so send it directly + // to the DLQ + // Do not pass Go, do not collect $200... + chain.OnException() + .MoveToErrorQueue(); + + // Or instead... 
+ // Can't ever process this message, so just throw it away + // Do not pass Go, do not collect $200... + chain.OnException() + .Discard(); + } + + public static IEnumerable Post( + MarkItemReady command, + + // Wolverine + Marten will assert that the Order stream + // in question has not advanced from command.Version + [WriteAggregate] Order order) + { + // process the message and emit events + yield break; + } +} + #endregion \ No newline at end of file diff --git a/src/Persistence/MartenTests/concurrency_resilient_sharded_processing.cs b/src/Persistence/MartenTests/concurrency_resilient_sharded_processing.cs index 4970fd0f1..93f3e88e6 100644 --- a/src/Persistence/MartenTests/concurrency_resilient_sharded_processing.cs +++ b/src/Persistence/MartenTests/concurrency_resilient_sharded_processing.cs @@ -65,7 +65,7 @@ public async Task hammer_it_with_lots_of_messages_against_buffered() .PublishToPartitionedLocalMessaging("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; topology.ConfigureQueues(queue => { @@ -101,22 +101,31 @@ public async Task hammer_it_with_lots_of_messages_against_buffered_with_inferred m.DatabaseSchemaName = "letters"; m.DisableNpgsqlLogging = true; }).IntegrateWithWolverine(); - - + + + #region sample_inferred_message_group_id + // Telling Wolverine how to assign a GroupId to a message, that we'll use // to predictably sort into "slots" in the processing opts.MessagePartitioning + + // This tells Wolverine to use the Saga identity as the group id for any message + // that impacts a Saga or the stream id of any command that is part of the "aggregate handler workflow" + // integration with Marten .UseInferredMessageGrouping() + .PublishToPartitionedLocalMessaging("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = 
PartitionSlots.Five; topology.ConfigureQueues(queue => { queue.BufferedInMemory(); }); }); + + #endregion }).StartAsync(); @@ -155,7 +164,7 @@ public async Task hammer_it_with_lots_of_messages_against_buffered_and_sharded_m opts.MessagePartitioning.PublishToPartitionedLocalMessaging("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; topology.ConfigureQueues(queue => { @@ -204,10 +213,9 @@ public async Task hammer_it_with_lots_of_messages_against_durable() // This is the magic sauce that shards the processing // by GroupId, which would be the StreamId.ToString() in // most cases in your usage - .ShardListeningByGroupId(ShardSlots.Five) + .PartitionProcessingByGroupId(PartitionSlots.Five) - .UseDurableInbox() - .MaximumParallelMessages(10); + .UseDurableInbox(); }); }).StartAsync(); diff --git a/src/Persistence/OrderEventSourcingSample/Program.cs b/src/Persistence/OrderEventSourcingSample/Program.cs index 6585e9067..eef169afb 100644 --- a/src/Persistence/OrderEventSourcingSample/Program.cs +++ b/src/Persistence/OrderEventSourcingSample/Program.cs @@ -11,6 +11,8 @@ // Not 100% necessary, but enables some extra command line diagnostics builder.Host.ApplyJasperFxExtensions(); +#region sample_using_the_marten_persistence_integration + // Adding Marten builder.Services.AddMarten(opts => { @@ -22,6 +24,8 @@ // Adding the Wolverine integration for Marten. 
.IntegrateWithWolverine(); +#endregion + #region sample_configure_global_exception_rules builder.Host.UseWolverine(opts => diff --git a/src/Persistence/RavenDbTests/Code.cs b/src/Persistence/RavenDbTests/Code.cs deleted file mode 100644 index 8ff1f50cf..000000000 --- a/src/Persistence/RavenDbTests/Code.cs +++ /dev/null @@ -1,53 +0,0 @@ -// -#pragma warning disable -using Raven.Client.Documents; - -namespace Internal.Generated.WolverineHandlers -{ - // START: CreateTodoHandler1536167811 - public class CreateTodoHandler1536167811 : Wolverine.Runtime.Handlers.MessageHandler - { - private readonly Raven.Client.Documents.IDocumentStore _documentStore; - - public CreateTodoHandler1536167811(Raven.Client.Documents.IDocumentStore documentStore) - { - _documentStore = documentStore; - } - - - - public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) - { - // The actual message body - var createTodo = (Wolverine.ComplianceTests.CreateTodo)context.Envelope.Message; - - - // Open a new document session - // message context to support the outbox functionality - using var asyncDocumentSession = _documentStore.OpenAsyncSession(); - context.EnlistInOutbox(new Wolverine.RavenDb.Internals.RavenDbEnvelopeTransaction(asyncDocumentSession, context)); - - // The actual message execution - var outgoing1 = Wolverine.ComplianceTests.TodoHandler.Handle(createTodo); - - if (outgoing1 != null) - { - await asyncDocumentSession.StoreAsync(outgoing1.Entity, cancellation).ConfigureAwait(false); - } - - - // Commit any outstanding RavenDb changes - await asyncDocumentSession.SaveChangesAsync(cancellation).ConfigureAwait(false); - - - // Have to flush outgoing messages just in case Marten did nothing because of https://github.com/JasperFx/wolverine/issues/536 - await context.FlushOutgoingMessagesAsync().ConfigureAwait(false); - - } - - } - - // END: CreateTodoHandler1536167811 - - -} \ No newline at end 
of file diff --git a/src/Samples/DocumentationSamples/ConcurrencyExamples.cs b/src/Samples/DocumentationSamples/ConcurrencyExamples.cs new file mode 100644 index 000000000..5597a5afd --- /dev/null +++ b/src/Samples/DocumentationSamples/ConcurrencyExamples.cs @@ -0,0 +1,35 @@ +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.RabbitMQ; + +namespace DocumentationSamples; + +public interface IControlMessage; + +public class ConcurrencyExamples +{ + public static async Task configure_strict_ordering() + { + #region sample_using_strict_ordering_for_control_queue + + var builder = Host.CreateApplicationBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(); + + // Wolverine will *only* listen to this queue + // on one single node and process messages in strict + // order + opts.ListenToRabbitQueue("control").ListenWithStrictOrdering(); + + opts.Publish(x => + { + // Just keying off a made up marker interface + x.MessagesImplementing(); + x.ToRabbitQueue("control"); + }); + }); + + #endregion + } +} \ No newline at end of file diff --git a/src/Samples/DocumentationSamples/ExceptionHandling.cs b/src/Samples/DocumentationSamples/ExceptionHandling.cs index 4e4a2ef4e..271b6eea3 100644 --- a/src/Samples/DocumentationSamples/ExceptionHandling.cs +++ b/src/Samples/DocumentationSamples/ExceptionHandling.cs @@ -1,3 +1,4 @@ +using JasperFx; using JasperFx.Core; using Microsoft.Extensions.Hosting; using Wolverine; @@ -9,6 +10,23 @@ public class ExceptionHandling; public static class AppWithErrorHandling { + public static async Task concurrency_retries() + { + #region sample_simple_retries_on_concurrency_exception + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts + // On optimistic concurrency failures from Marten + .OnException() + .RetryWithCooldown(100.Milliseconds(), 250.Milliseconds(), 500.Milliseconds()) + .Then.MoveToErrorQueue(); + }); + + #endregion + } + public static async Task sample() { #region 
sample_AppWithErrorHandling diff --git a/src/Samples/DocumentationSamples/PartitioningSamples.cs b/src/Samples/DocumentationSamples/PartitioningSamples.cs new file mode 100644 index 000000000..de54466d3 --- /dev/null +++ b/src/Samples/DocumentationSamples/PartitioningSamples.cs @@ -0,0 +1,150 @@ +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.Configuration; +using Wolverine.RabbitMQ; +using Wolverine.Runtime.Partitioning; + +namespace DocumentationSamples; + +public class PartitioningSamples +{ + public static async Task listening_with_partitioned_processing() + { + #region sample_configuring_partitioned_processing_on_any_listener + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts.UseRabbitMq(); + + // You still need rules for determining the message group id + // of incoming messages! + opts.MessagePartitioning + .ByMessage(x => x.OrderId); + + // We're going to listen + opts.ListenToRabbitQueue("incoming") + // To really keep our system from processing Order related + // messages for the same order id concurrently, we'll + // make it so that only one node actively processes messages + // from this queue + .ExclusiveNodeWithParallelism() + + // We're going to partition the message processing internally + // based on the message group id while allowing up to 7 parallel + // messages to be executed at once + .PartitionProcessingByGroupId(PartitionSlots.Seven); + }); + + #endregion + } + + public static async Task configure_local_partitioning() + { + #region sample_opting_into_local_partitioned_routing + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts.MessagePartitioning + // First, we're going to tell Wolverine how to determine the + // message group id + .ByMessage(x => x.OrderId) + + // Next we're setting up a publishing rule to local queues + .PublishToPartitionedLocalMessaging("orders", 4, topology => + { + 
topology.MessagesImplementing(); + + + // this feature exists + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + + // Just showing you how to make additional Wolverine configuration + // for all the local queues built from this usage + topology.ConfigureQueues(queue => + { + queue.TelemetryEnabled(true); + }); + }); + }); + + #endregion + } + + public class MySpecialGroupingRule : IGroupingRule + { + public bool TryFindIdentity(Envelope envelope, out string groupId) + { + throw new NotImplementedException(); + } + } + + public static async Task configuring_message_grouping_rules() + { + #region sample_configuring_message_grouping_rules + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts.MessagePartitioning + // Use saga identity or aggregate handler workflow identity + // from messages as the group id + .UseInferredMessageGrouping() + + // First, we're going to tell Wolverine how to determine the + // message group id for any message type that can be + // cast to this interface. 
Also works for concrete types too + .ByMessage(x => x.OrderId) + + // Use the Envelope.TenantId as the message group id + // this could be valuable to partition work by tenant + .ByTenantId() + + // Use a custom rule implementing IGroupingRULE with explicit code to determine + // the group id + .ByRule(new MySpecialGroupingRule()); + }); + + #endregion + } + + #region sample_send_message_with_group_id + + public static async Task SendMessageToGroup(IMessageBus bus) + { + await bus.PublishAsync( + new ApproveInvoice("AAA"), + new() { GroupId = "agroup" }); + } + + #endregion +} + +public record PayInvoice(string Id); + +[WolverineIgnore] +public static class ApproveInvoiceHandler +{ + #region sample_using_with_group_id_as_cascading_message + + public static IEnumerable Handle(ApproveInvoice command) + { + yield return new PayInvoice(command.Id).WithGroupId("aaa"); + } + + #endregion +} + +#region sample_order_commands_for_partitioning + +public interface IOrderCommand +{ + public string OrderId { get; } +} + +public record ApproveOrder(string OrderId) : IOrderCommand; +public record CancelOrder(string OrderId) : IOrderCommand; + +#endregion \ No newline at end of file diff --git a/src/Samples/DocumentationSamples/TestingSupportSamples.cs b/src/Samples/DocumentationSamples/TestingSupportSamples.cs index 60bfa3116..b33549fbf 100644 --- a/src/Samples/DocumentationSamples/TestingSupportSamples.cs +++ b/src/Samples/DocumentationSamples/TestingSupportSamples.cs @@ -165,6 +165,10 @@ public async Task using_tracked_sessions_advanced(IHost otherWolverineSystem) // This is actually helpful if you are testing for error handling // functionality in your system .DoNotAssertOnExceptionsDetected() + + // Hey, just in case failure acks are getting into your testing session + // and you do not care for the tests, tell Wolverine to ignore them + .IgnoreFailureAcks() // Again, this is testing against processes, with another IHost .WaitForMessageToBeReceivedAt(otherWolverineSystem) 
diff --git a/src/Samples/DocumentationSamples/using_group_ids.cs b/src/Samples/DocumentationSamples/using_group_ids.cs index 4985e8923..fa81386b4 100644 --- a/src/Samples/DocumentationSamples/using_group_ids.cs +++ b/src/Samples/DocumentationSamples/using_group_ids.cs @@ -16,10 +16,7 @@ public static IEnumerable Handle(IncomingMessage message) yield return new Message3().ScheduleToGroup("one", 5.Minutes()); // Long hand - yield return new Message4().WithDeliveryOptions(new DeliveryOptions - { - GroupId = "one" - }); + yield return new Message4().WithDeliveryOptions(new() { GroupId = "one" }); } #endregion diff --git a/src/Testing/CoreTests/Runtime/Partitioning/EnvelopeShardingExtensionsTests.cs b/src/Testing/CoreTests/Runtime/Partitioning/PartitionedMessagingExtensionsTests.cs similarity index 95% rename from src/Testing/CoreTests/Runtime/Partitioning/EnvelopeShardingExtensionsTests.cs rename to src/Testing/CoreTests/Runtime/Partitioning/PartitionedMessagingExtensionsTests.cs index 1a31e9a02..738b29108 100644 --- a/src/Testing/CoreTests/Runtime/Partitioning/EnvelopeShardingExtensionsTests.cs +++ b/src/Testing/CoreTests/Runtime/Partitioning/PartitionedMessagingExtensionsTests.cs @@ -5,12 +5,12 @@ namespace CoreTests.Runtime.Partitioning; -public class EnvelopeShardingExtensionsTests +public class PartitionedMessagingExtensionsTests { private readonly ITestOutputHelper _output; private readonly MessagePartitioningRules theRules; - public EnvelopeShardingExtensionsTests(ITestOutputHelper output) + public PartitionedMessagingExtensionsTests(ITestOutputHelper output) { _output = output; theRules = new MessagePartitioningRules(new()); diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/concurrency_resilient_sharded_processing.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/concurrency_resilient_sharded_processing.cs index 69827cfc7..2142db4c0 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/concurrency_resilient_sharded_processing.cs +++ 
b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/concurrency_resilient_sharded_processing.cs @@ -59,10 +59,7 @@ public async Task hammer_it_with_lots_of_messages_against_buffered() opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(LetterMessageHandler)); - // Telling Wolverine how to assign a GroupId to a message, that we'll use - // to predictably sort into "slots" in the processing - opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); - + opts.Services.AddMarten(m => { m.Connection(Servers.PostgresConnectionString); @@ -70,14 +67,24 @@ public async Task hammer_it_with_lots_of_messages_against_buffered() m.DisableNpgsqlLogging = true; }).IntegrateWithWolverine(); + opts.ListenToSqsQueue("from_external"); + + #region sample_partitioned_publishing_through_amazon_sqs + + // Telling Wolverine how to assign a GroupId to a message, that we'll use + // to predictably sort into "slots" in the processing + opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + opts.MessagePartitioning.PublishToShardedAmazonSqsQueues("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; topology.ConfigureListening(x => x.BufferedInMemory().MessageBatchSize(10)); }); + + #endregion }).StartAsync(); var tracked = await host diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs index 4a888df2c..d813322b6 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs @@ -208,7 +208,7 @@ public static MessagePartitioningRules PublishToShardedAmazonSqsQueues(this Mess { rules.AddPublishingTopology((opts, _) => { - var topology = new PartitionedMessageTopologyWithQueues(opts, ShardSlots.Five, baseName, numberOfEndpoints); + var topology = new 
PartitionedMessageTopologyWithQueues(opts, PartitionSlots.Five, baseName, numberOfEndpoints); topology.ConfigureListening(x => {}); configure(topology); topology.AssertValidity(); diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/PartitionedMessageTopologyWithQueues.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/PartitionedMessageTopologyWithQueues.cs index 249b3c7a8..57392cb91 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/PartitionedMessageTopologyWithQueues.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/PartitionedMessageTopologyWithQueues.cs @@ -5,9 +5,9 @@ namespace Wolverine.AmazonSqs.Internal; public class PartitionedMessageTopologyWithQueues : PartitionedMessageTopology { - public PartitionedMessageTopologyWithQueues(WolverineOptions options, ShardSlots? listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) + public PartitionedMessageTopologyWithQueues(WolverineOptions options, PartitionSlots? 
listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) { - MaxDegreeOfParallelism = ShardSlots.Five; + MaxDegreeOfParallelism = PartitionSlots.Five; } protected override Endpoint buildEndpoint(WolverineOptions options, string name) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/concurrency_resilient_sharded_processing.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/concurrency_resilient_sharded_processing.cs index 47fb6d2f1..f88ae77d4 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/concurrency_resilient_sharded_processing.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/concurrency_resilient_sharded_processing.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using IntegrationTests; using JasperFx; +using JasperFx.Core; using Marten; using Microsoft.Extensions.Hosting; using Shouldly; @@ -58,9 +59,7 @@ public async Task hammer_it_with_lots_of_messages_against_buffered() opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(LetterMessageHandler)); - // Telling Wolverine how to assign a GroupId to a message, that we'll use - // to predictably sort into "slots" in the processing - opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + opts.Services.AddMarten(m => { @@ -69,14 +68,29 @@ public async Task hammer_it_with_lots_of_messages_against_buffered() m.DisableNpgsqlLogging = true; }).IntegrateWithWolverine(); + #region sample_defining_partitioned_routing_for_rabbitmq + + // opts is the WolverineOptions from within an Add/UseWolverine() call + + // Telling Wolverine how to assign a GroupId to a message, that we'll use + // to predictably sort into "slots" in the processing + opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + + // This is creating Rabbit MQ queues named "letters1" etc. 
opts.MessagePartitioning.PublishToShardedRabbitQueues("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; + topology.ConfigureSender(x => + { + // just to show that you can do this... + x.DeliverWithin(5.Minutes()); + }); topology.ConfigureListening(x => x.BufferedInMemory()); - }); + + #endregion }).StartAsync(); @@ -124,7 +138,7 @@ public async Task hammer_it_with_lots_of_messages_against_buffered_when_resent() .PublishToShardedRabbitQueues("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; topology.ConfigureListening(x => x.BufferedInMemory()); @@ -198,7 +212,7 @@ public async Task hammer_it_with_lots_of_messages_against_durable() opts.MessagePartitioning.PublishToShardedRabbitQueues("letters", 4, topology => { topology.MessagesImplementing(); - topology.MaxDegreeOfParallelism = ShardSlots.Five; + topology.MaxDegreeOfParallelism = PartitionSlots.Five; topology.ConfigureListening(x => x.UseDurableInbox()); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/PartitionedMessageTopologyWithQueues.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/PartitionedMessageTopologyWithQueues.cs index ba5d73ddc..cc39a8ca3 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/PartitionedMessageTopologyWithQueues.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/PartitionedMessageTopologyWithQueues.cs @@ -5,9 +5,9 @@ namespace Wolverine.RabbitMQ.Internal; public class PartitionedMessageTopologyWithQueues : PartitionedMessageTopology { - public PartitionedMessageTopologyWithQueues(WolverineOptions options, ShardSlots? 
listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) + public PartitionedMessageTopologyWithQueues(WolverineOptions options, PartitionSlots? listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) { - MaxDegreeOfParallelism = ShardSlots.Five; + MaxDegreeOfParallelism = PartitionSlots.Five; } protected override Endpoint buildEndpoint(WolverineOptions options, string name) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs index 193c6ea9d..6514402e5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs @@ -480,7 +480,7 @@ public static MessagePartitioningRules PublishToShardedRabbitQueues(this Message { rules.AddPublishingTopology((opts, _) => { - var topology = new PartitionedMessageTopologyWithQueues(opts, ShardSlots.Five, baseName, numberOfEndpoints); + var topology = new PartitionedMessageTopologyWithQueues(opts, PartitionSlots.Five, baseName, numberOfEndpoints); topology.ConfigureListening(x => {}); configure(topology); topology.AssertValidity(); diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index f775d2392..4066c294c 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -19,7 +19,7 @@ namespace Wolverine.Configuration; -public enum ShardSlots +public enum PartitionSlots { Three = 3, Five = 5, @@ -194,7 +194,7 @@ protected Endpoint(Uri uri, EndpointRole role) /// If specified, directs this endpoint to use by GroupId sharding in processing. /// Only impacts Buffered or Durable endpoints though. /// - public ShardSlots? GroupShardingSlotNumber { get; set; } + public PartitionSlots? 
GroupShardingSlotNumber { get; set; } /// /// In the case of using "sticky handlers" diff --git a/src/Wolverine/Configuration/IListenerConfiguration.cs b/src/Wolverine/Configuration/IListenerConfiguration.cs index 20c470c7c..b1751ab06 100644 --- a/src/Wolverine/Configuration/IListenerConfiguration.cs +++ b/src/Wolverine/Configuration/IListenerConfiguration.cs @@ -56,7 +56,7 @@ public interface IListenerConfiguration : IEndpointConfiguration T ListenWithStrictOrdering(string? endpointName = null); /// - /// Creates a policy of sharding the processing of incoming messages by the + /// Creates a policy of partitioning the processing of incoming messages by the /// specified number of slots. Use this to group messages to prevent concurrent /// processing of messages with the same GroupId while allowing parallel work across /// GroupIds. The number of "slots" reflects the maximum number of parallel messages @@ -64,7 +64,7 @@ public interface IListenerConfiguration : IEndpointConfiguration /// /// /// - T ShardListeningByGroupId(ShardSlots numberOfSlots); + T PartitionProcessingByGroupId(PartitionSlots numberOfSlots); /// /// Specify the maximum number of threads that this worker queue diff --git a/src/Wolverine/Configuration/ListenerConfiguration.cs b/src/Wolverine/Configuration/ListenerConfiguration.cs index f91273315..fe251608b 100644 --- a/src/Wolverine/Configuration/ListenerConfiguration.cs +++ b/src/Wolverine/Configuration/ListenerConfiguration.cs @@ -108,7 +108,7 @@ public ListenerConfiguration(Func source) : base(source) /// /// /// - public TSelf ShardListeningByGroupId(ShardSlots numberOfSlots) + public TSelf PartitionProcessingByGroupId(PartitionSlots numberOfSlots) { add(e => e.GroupShardingSlotNumber = numberOfSlots); return this.As(); @@ -408,18 +408,3 @@ public TSelf DefaultIncomingMessage(Type messageType) return this.As(); } } - -public enum ProcessingOrder -{ - /// - /// Should the messages be processed in the strict order in which they - /// were 
received? - /// - StrictOrdered, - - /// - /// Is it okay to allow the local queue to process messages in any order? This - /// may give better throughput - /// - UnOrdered -} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Agents/IAgent.cs b/src/Wolverine/Runtime/Agents/IAgent.cs index 01eb7cd83..30d1fa22b 100644 --- a/src/Wolverine/Runtime/Agents/IAgent.cs +++ b/src/Wolverine/Runtime/Agents/IAgent.cs @@ -5,20 +5,27 @@ namespace Wolverine.Runtime.Agents; #region sample_IAgent +#region sample_IAgent + /// /// Models a constantly running background process within a Wolverine /// node cluster /// -public interface IAgent : IHostedService +public interface IAgent : IHostedService // Standard .NET interface for background services { /// /// Unique identification for this agent within the Wolverine system /// Uri Uri { get; } + // Not really used for anything real *yet*, but + // hopefully becomes something useful for CritterWatch + // health monitoring AgentStatus Status { get; } } +#endregion + public class CompositeAgent : IAgent { private readonly List _agents; diff --git a/src/Wolverine/Runtime/Partitioning/IGroupingRule.cs b/src/Wolverine/Runtime/Partitioning/IGroupingRule.cs index 389a8fcae..428c1eb44 100644 --- a/src/Wolverine/Runtime/Partitioning/IGroupingRule.cs +++ b/src/Wolverine/Runtime/Partitioning/IGroupingRule.cs @@ -1,9 +1,13 @@ namespace Wolverine.Runtime.Partitioning; +#region sample_IGroupingRule + /// /// Strategy for determining the GroupId of a message /// public interface IGroupingRule { bool TryFindIdentity(Envelope envelope, out string groupId); -} \ No newline at end of file +} + +#endregion \ No newline at end of file diff --git a/src/Wolverine/Runtime/Partitioning/LocalPartitionedMessageTopology.cs b/src/Wolverine/Runtime/Partitioning/LocalPartitionedMessageTopology.cs index e4c0566bc..26e128ed0 100644 --- a/src/Wolverine/Runtime/Partitioning/LocalPartitionedMessageTopology.cs +++ 
b/src/Wolverine/Runtime/Partitioning/LocalPartitionedMessageTopology.cs @@ -6,24 +6,24 @@ namespace Wolverine.Runtime.Partitioning; public class LocalPartitionedMessageTopology : PartitionedMessageTopology { - private ShardSlots _listeningSlots; + private PartitionSlots _listeningSlots; - public LocalPartitionedMessageTopology(WolverineOptions options, string baseName, int numberOfEndpoints) : base(options, ShardSlots.Five, baseName, numberOfEndpoints) + public LocalPartitionedMessageTopology(WolverineOptions options, string baseName, int numberOfEndpoints) : base(options, PartitionSlots.Five, baseName, numberOfEndpoints) { - _listeningSlots = ShardSlots.Five; + _listeningSlots = PartitionSlots.Five; } /// /// Override the maximum number of parallel messages that can be executed /// at one time in one of the sharded local queues. Default is 5. /// - public ShardSlots MaxDegreeOfParallelism + public PartitionSlots MaxDegreeOfParallelism { get => _listeningSlots; set { _listeningSlots = value; - ConfigureQueues(x => x.ShardListeningByGroupId(value)); + ConfigureQueues(x => x.PartitionProcessingByGroupId(value)); } } diff --git a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs index 8e95613e3..0e4ce5fbe 100644 --- a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs +++ b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs @@ -66,6 +66,17 @@ public MessagePartitioningRules ByMessage(Func strategy) return this; } + /// + /// Register a custom message grouping rule + /// + /// + /// + public MessagePartitioningRules ByRule(IGroupingRule rule) + { + _rules.Add(rule); + return this; + } + /// /// Add a grouping rule based on a concrete message type and the property /// of the message type that exposes the group id information @@ -113,7 +124,7 @@ internal bool TryFindTopology(Type messageType, out PartitionedMessageTopology? 
/// /// As possible, let Wolverine infer the message grouping based on the message usage. - /// For example, messsages related to a Saga will use any saga id property in the command type. In + /// For example, messages related to a Saga will use any saga id property in the command type. In /// the Marten aggregate handler workflow, Wolverine will use a property on the command type that /// identifies the event stream *if only one event stream is impacted* /// diff --git a/src/Wolverine/Runtime/Partitioning/PartitionedMessageTopology.cs b/src/Wolverine/Runtime/Partitioning/PartitionedMessageTopology.cs index fba852de8..e9e227a34 100644 --- a/src/Wolverine/Runtime/Partitioning/PartitionedMessageTopology.cs +++ b/src/Wolverine/Runtime/Partitioning/PartitionedMessageTopology.cs @@ -8,9 +8,9 @@ public abstract class PartitionedMessageTopology : Parti where TListener : IListenerConfiguration where TSubscriber : ISubscriberConfiguration { - private ShardSlots _listeningSlots; + private PartitionSlots _listeningSlots; - public PartitionedMessageTopology(WolverineOptions options, ShardSlots? listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) + public PartitionedMessageTopology(WolverineOptions options, PartitionSlots? listeningSlots, string baseName, int numberOfEndpoints) : base(options, listeningSlots, baseName, numberOfEndpoints) { if (listeningSlots.HasValue) { @@ -26,13 +26,13 @@ public PartitionedMessageTopology(WolverineOptions options, ShardSlots? listenin /// Override the maximum number of parallel messages that can be executed /// at one time in one of the sharded local queues. Default is 5. 
/// - public ShardSlots MaxDegreeOfParallelism + public PartitionSlots MaxDegreeOfParallelism { get => _listeningSlots; set { _listeningSlots = value; - ConfigureListening(x => x.ShardListeningByGroupId(value)); + ConfigureListening(x => x.PartitionProcessingByGroupId(value)); } } @@ -60,7 +60,7 @@ public abstract class PartitionedMessageTopology { protected readonly WolverineOptions _options; - protected PartitionedMessageTopology(WolverineOptions options, ShardSlots? listeningSlots, string baseName, int numberOfEndpoints) + protected PartitionedMessageTopology(WolverineOptions options, PartitionSlots? listeningSlots, string baseName, int numberOfEndpoints) { if (numberOfEndpoints <= 0) { diff --git a/src/Wolverine/Runtime/Partitioning/EnvelopeShardingExtensions.cs b/src/Wolverine/Runtime/Partitioning/PartitionedMessagingExtensions.cs similarity index 80% rename from src/Wolverine/Runtime/Partitioning/EnvelopeShardingExtensions.cs rename to src/Wolverine/Runtime/Partitioning/PartitionedMessagingExtensions.cs index 089fc44f0..2d1ba35fa 100644 --- a/src/Wolverine/Runtime/Partitioning/EnvelopeShardingExtensions.cs +++ b/src/Wolverine/Runtime/Partitioning/PartitionedMessagingExtensions.cs @@ -2,7 +2,7 @@ namespace Wolverine.Runtime.Partitioning; -internal static class EnvelopeShardingExtensions +internal static class PartitionedMessagingExtensions { internal static void AssertIsValidNumberOfProcessingSlots(this int slots) { @@ -13,7 +13,9 @@ internal static void AssertIsValidNumberOfProcessingSlots(this int slots) $"Invalid number of processing slots. The acceptable values are {validValues.Select(x => x.ToString()).Join(", ")}"); } } - + + #region sample_SlotForSending + /// /// Uses a combination of message grouping id rules and a deterministic hash /// to predictably assign envelopes to a slot to help "shard" message publishing. 
@@ -24,13 +26,20 @@ internal static void AssertIsValidNumberOfProcessingSlots(this int slots) /// public static int SlotForSending(this Envelope envelope, int numberOfSlots, MessagePartitioningRules rules) { + // This is where Wolverine determines the GroupId for the message + // Note that you can also explicitly set the GroupId var groupId = rules.DetermineGroupId(envelope); - // Pick one at random, and has to be zero based + // Pick one at random if we can't determine a group id, and has to be zero based if (groupId == null) return Random.Shared.Next(1, numberOfSlots) - 1; + // Deterministically choose a slot based on the GroupId, but try + // to more or less evenly distribute groups to the different + // slots return Math.Abs(groupId.GetDeterministicHashCode() % numberOfSlots); } + + #endregion /// /// Uses a combination of message grouping id rules and a deterministic hash