diff --git a/docs/guide/durability/sagas.md b/docs/guide/durability/sagas.md index 6e17ab94e..c6965e56f 100644 --- a/docs/guide/durability/sagas.md +++ b/docs/guide/durability/sagas.md @@ -622,3 +622,78 @@ using var host = await Host.CreateDefaultBuilder() ``` snippet source | anchor + +## Multiple Sagas Handling the Same Message Type + +By default, Wolverine does not allow multiple saga types to handle the same message type and will throw an `InvalidSagaException` at startup if this is detected. However, there are valid architectural reasons to have multiple, independent saga workflows react to the same event — for example, an `OrderPlaced` event might start both a `ShippingSaga` and a `BillingSaga`. + +To enable this, set `MultipleHandlerBehavior` to `Separated`: + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + + // Your persistence configuration here (Marten, EF Core, etc.) + }).StartAsync(); +``` + +When `Separated` mode is active, Wolverine creates an independent handler chain for each saga type, routed to its own local queue. Each saga independently manages its own lifecycle — loading, creating, updating, and deleting state — without interfering with the other. + +Here is an example with two sagas that both start from an `OrderPlaced` message but complete independently: + +```cs +// Shared message that both sagas react to +public record OrderPlaced(Guid OrderPlacedId, string ProductName); + +// Messages specific to each saga +public record OrderShipped(Guid ShippingSagaId); +public record PaymentReceived(Guid BillingSagaId); + +public class ShippingSaga : Saga +{ + public Guid Id { get; set; } + public string ProductName { get; set; } = string.Empty; + + public static ShippingSaga Start(OrderPlaced message) + { + return new ShippingSaga + { + Id = message.OrderPlacedId, + ProductName = message.ProductName + }; + } + + public void Handle(OrderShipped message) + { + MarkCompleted(); + } +} + +public class BillingSaga : Saga +{ + public Guid Id { get; set; } + public string ProductName { get; set; } = string.Empty; + + public static BillingSaga Start(OrderPlaced message) + { + return new BillingSaga + { + Id = message.OrderPlacedId, + ProductName = message.ProductName + }; + } + + public void Handle(PaymentReceived message) + { + MarkCompleted(); + } +} +``` + +When an `OrderPlaced` message is published, both sagas will be started independently. Completing one saga (e.g., by sending `OrderShipped`) does not affect the other. + +::: warning +In `Separated` mode, messages routed to multiple sagas must be **published** (via `SendAsync` or `PublishAsync`), not **invoked** (via `InvokeAsync`). `InvokeAsync` bypasses message routing and will not reach the separated saga endpoints. +::: diff --git a/src/Persistence/MartenTests/Saga/multiple_sagas_for_same_message.cs b/src/Persistence/MartenTests/Saga/multiple_sagas_for_same_message.cs new file mode 100644 index 000000000..9ced6860d --- /dev/null +++ b/src/Persistence/MartenTests/Saga/multiple_sagas_for_same_message.cs @@ -0,0 +1,141 @@ +using IntegrationTests; +using Marten; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Attributes; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.Saga; + +public class multiple_sagas_for_same_message : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + + opts.Discovery.IncludeType(); + opts.Discovery.IncludeType(); + + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "multi_saga"; + }).IntegrateWithWolverine(); + + opts.Policies.AutoApplyTransactions(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task two_sagas_start_from_same_message() + { + var id = Guid.NewGuid(); + + await _host.SendMessageAndWaitAsync(new OrderPlaced(id, "Widget")); + + await using var session = _host.DocumentStore().QuerySession(); + + var shipping = await session.LoadAsync(id); + shipping.ShouldNotBeNull(); + shipping.ProductName.ShouldBe("Widget"); + + var billing = await session.LoadAsync(id); + billing.ShouldNotBeNull(); + billing.ProductName.ShouldBe("Widget"); + } + + [Fact] + public async Task two_sagas_handle_subsequent_messages_independently() + { + var id = Guid.NewGuid(); + + await using var session = _host.DocumentStore().QuerySession(); + + await _host.SendMessageAndWaitAsync(new OrderPlaced(id, "Gadget")); + (await session.LoadAsync(id)).ShouldNotBeNull(); + (await session.LoadAsync(id)).ShouldNotBeNull(); + + + // Complete only the shipping saga + await _host.SendMessageAndWaitAsync(new OrderShipped(id)); + + + + // Shipping saga should be deleted (completed) + var shipping = await session.LoadAsync(id); + shipping.ShouldBeNull(); + + // Billing saga should still exist + var billing = await session.LoadAsync(id); + billing.ShouldNotBeNull(); + billing.ProductName.ShouldBe("Gadget"); + + // Now complete the billing saga + await _host.SendMessageAndWaitAsync(new PaymentReceived(id)); + + await using var session2 = _host.DocumentStore().QuerySession(); + (await session2.LoadAsync(id)).ShouldBeNull(); + } +} + +// Shared message that both sagas react to +public record OrderPlaced(Guid OrderPlacedId, string ProductName); + +// Messages specific to each saga +public record OrderShipped(Guid ShippingSagaId); +public record PaymentReceived(Guid BillingSagaId); + +[WolverineIgnore] +public class ShippingSaga : Wolverine.Saga +{ + public Guid Id { get; set; } + public string ProductName { get; set; } = string.Empty; + + public static ShippingSaga Start(OrderPlaced message) + { + return new ShippingSaga + { + Id = message.OrderPlacedId, + ProductName = message.ProductName + }; + } + + public void Handle(OrderShipped message) + { + MarkCompleted(); + } +} + +[WolverineIgnore] +public class BillingSaga : Wolverine.Saga +{ + public Guid Id { get; set; } + public string ProductName { get; set; } = string.Empty; + + public static BillingSaga Start(OrderPlaced message) + { + return new BillingSaga + { + Id = message.OrderPlacedId, + ProductName = message.ProductName + }; + } + + public void Handle(PaymentReceived message) + { + MarkCompleted(); + } +} diff --git a/src/Wolverine/Persistence/Sagas/SagaChain.cs b/src/Wolverine/Persistence/Sagas/SagaChain.cs index c25c702fc..0f5f0de2f 100644 --- a/src/Wolverine/Persistence/Sagas/SagaChain.cs +++ b/src/Wolverine/Persistence/Sagas/SagaChain.cs @@ -9,6 +9,7 @@ using Wolverine.Configuration; using Wolverine.Logging; using Wolverine.Runtime.Handlers; +using Wolverine.Transports.Local; namespace Wolverine.Persistence.Sagas; @@ -28,9 +29,22 @@ public class SagaChain : HandlerChain public SagaChain(WolverineOptions options, IGrouping grouping, HandlerGraph parent) : base(options, grouping, parent) { + // After base constructor, saga handlers may have been moved to ByEndpoint (Separated mode). + // Check what's left in Handlers (not the original grouping). + var remainingSagaCalls = Handlers.Where(x => x.HandlerType.CanBeCastTo()) + .DistinctBy(x => x.HandlerType).ToArray(); + + if (remainingSagaCalls.Length == 0) + { + // All sagas were separated into ByEndpoint chains — this parent is routing-only. + var anySaga = grouping.First(x => x.HandlerType.CanBeCastTo()); + SagaType = anySaga.HandlerType; + return; + } + try { - var saga = grouping.Where(x => x.HandlerType.CanBeCastTo()).DistinctBy(x => x.HandlerType).Single(); + var saga = remainingSagaCalls.Single(); SagaType = saga.HandlerType; SagaMethodInfo = saga.Method; @@ -44,7 +58,7 @@ public SagaChain(WolverineOptions options, IGrouping grouping } catch (Exception e) { - var handlerTypes = grouping.Where(x => x.HandlerType.CanBeCastTo()) + var handlerTypes = remainingSagaCalls .Select(x => x.HandlerType).Select(x => x.FullNameInCode()).Join(", "); throw new InvalidSagaException( @@ -61,16 +75,38 @@ protected override void maybeAssignStickyHandlers(WolverineOptions options, IGro tryAssignStickyEndpoints(handlerCall, options); } - // You just know *somebody* is going to try to handle the same message type - // by different sagas because our users hate me var groupedSagas = grouping.Where(x => x.HandlerType.CanBeCastTo()) .GroupBy(x => x.HandlerType).ToArray(); if (groupedSagas.Length > 1) - throw new NotSupportedException( - "Wolverine does not (yet) support having multiple Saga type respond to the same message."); + { + if (options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + var sagaTypes = groupedSagas.Select(x => x.Key.FullNameInCode()).Join(", "); + throw new InvalidSagaException( + $"Multiple saga types ({sagaTypes}) handle message {MessageType.FullNameInCode()}. " + + $"Set MultipleHandlerBehavior to Separated to allow this."); + } + + // In Separated mode, create a separate SagaChain per saga type + foreach (var sagaGroup in groupedSagas) + { + var sagaCalls = sagaGroup.ToArray(); + var sagaType = sagaGroup.Key; + + var endpoint = options.Transports.GetOrCreate() + .QueueFor(sagaType.FullNameInCode().ToLowerInvariant()); + + var chain = new SagaChain(sagaCalls, options.HandlerGraph, [endpoint]); + + foreach (var call in sagaCalls) + { + Handlers.Remove(call); + } - // TODO -- MORE HERE!!!!! + _byEndpoint.Add(chain); + } + } } public SagaChain(HandlerCall handlerCall, HandlerGraph handlerGraph, Endpoint[] endpoints) : base(handlerCall, handlerGraph) @@ -90,6 +126,26 @@ public SagaChain(HandlerCall handlerCall, HandlerGraph handlerGraph, Endpoint[] } } + internal SagaChain(HandlerCall[] sagaCalls, HandlerGraph handlerGraph, Endpoint[] endpoints) + : base(sagaCalls[0].Method.MessageType()!, handlerGraph) + { + foreach (var endpoint in endpoints) RegisterEndpoint(endpoint); + foreach (var call in sagaCalls) Handlers.Add(call); + + var saga = sagaCalls.First(); + SagaType = saga.HandlerType; + SagaMethodInfo = saga.Method; + + SagaIdMember = DetermineSagaIdMember(MessageType, SagaType, saga.Method); + + if (SagaIdMember != null && AuditedMembers.All(x => x.Member != SagaIdMember)) + { + AuditedMembers.Add(new AuditedMember(SagaIdMember, SagaIdMember.Name, SagaIdMember.Name)); + } + + TypeName = saga.HandlerType.ToSuffixedTypeName(HandlerSuffix).Replace("[]", "Array"); + } + public override bool TryInferMessageIdentity(out PropertyInfo? property) { property = SagaIdMember as PropertyInfo; @@ -236,8 +292,7 @@ private void generateForOnlyStartingSaga(IServiceContainer container, IPersisten frames.Add(ifNotCompleted); } - // Always true! - internal override bool HasDefaultNonStickyHandlers() => true; + internal override bool HasDefaultNonStickyHandlers() => Handlers.Any(); internal IEnumerable DetermineSagaDoesNotExistSteps(Variable sagaId, Variable saga, IPersistenceFrameProvider frameProvider, IServiceContainer container) diff --git a/src/Wolverine/Runtime/Handlers/HandlerChain.cs b/src/Wolverine/Runtime/Handlers/HandlerChain.cs index afc3bfb64..e30d16e8f 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerChain.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerChain.cs @@ -46,7 +46,7 @@ public class HandlerChain : Chain, IW public const string Consume = "Consume"; public const string Consumes = "Consumes"; - private readonly List _byEndpoint = []; + protected readonly List _byEndpoint = []; private readonly List _endpoints = [];