Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions docs/guide/durability/sagas.md
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,78 @@ using var host = await Host.CreateDefaultBuilder()
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PersistenceTests/Samples/SagaChainPolicies.cs#L15-L23' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_chain_policy_on_sagas' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Multiple Sagas Handling the Same Message Type

By default, Wolverine does not allow multiple saga types to handle the same message type and will throw an `InvalidSagaException` at startup if this is detected. However, there are valid architectural reasons to have multiple, independent saga workflows react to the same event — for example, an `OrderPlaced` event might start both a `ShippingSaga` and a `BillingSaga`.

To enable this, set `MultipleHandlerBehavior` to `Separated`:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;

// Your persistence configuration here (Marten, EF Core, etc.)
}).StartAsync();
```

When `Separated` mode is active, Wolverine creates an independent handler chain for each saga type, routed to its own local queue. Each saga independently manages its own lifecycle — loading, creating, updating, and deleting state — without interfering with the other.

Here is an example with two sagas that both start from an `OrderPlaced` message but complete independently:

```cs
// Shared message that both sagas react to
public record OrderPlaced(Guid OrderPlacedId, string ProductName);

// Messages specific to each saga
public record OrderShipped(Guid ShippingSagaId);
public record PaymentReceived(Guid BillingSagaId);

public class ShippingSaga : Saga
{
public Guid Id { get; set; }
public string ProductName { get; set; } = string.Empty;

public static ShippingSaga Start(OrderPlaced message)
{
return new ShippingSaga
{
Id = message.OrderPlacedId,
ProductName = message.ProductName
};
}

public void Handle(OrderShipped message)
{
MarkCompleted();
}
}

public class BillingSaga : Saga
{
public Guid Id { get; set; }
public string ProductName { get; set; } = string.Empty;

public static BillingSaga Start(OrderPlaced message)
{
return new BillingSaga
{
Id = message.OrderPlacedId,
ProductName = message.ProductName
};
}

public void Handle(PaymentReceived message)
{
MarkCompleted();
}
}
```

When an `OrderPlaced` message is published, both sagas will be started independently. Completing one saga (e.g., by sending `OrderShipped`) does not affect the other.

::: warning
In `Separated` mode, messages routed to multiple sagas must be **published** (via `SendAsync` or `PublishAsync`), not **invoked** (via `InvokeAsync`). `InvokeAsync` bypasses message routing and will not reach the separated saga endpoints.
:::
141 changes: 141 additions & 0 deletions src/Persistence/MartenTests/Saga/multiple_sagas_for_same_message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using IntegrationTests;
using Marten;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.Saga;

public class multiple_sagas_for_same_message : IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;

opts.Discovery.IncludeType<ShippingSaga>();
opts.Discovery.IncludeType<BillingSaga>();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "multi_saga";
}).IntegrateWithWolverine();

opts.Policies.AutoApplyTransactions();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
}

[Fact]
public async Task two_sagas_start_from_same_message()
{
var id = Guid.NewGuid();

await _host.SendMessageAndWaitAsync(new OrderPlaced(id, "Widget"));

await using var session = _host.DocumentStore().QuerySession();

var shipping = await session.LoadAsync<ShippingSaga>(id);
shipping.ShouldNotBeNull();
shipping.ProductName.ShouldBe("Widget");

var billing = await session.LoadAsync<BillingSaga>(id);
billing.ShouldNotBeNull();
billing.ProductName.ShouldBe("Widget");
}

[Fact]
public async Task two_sagas_handle_subsequent_messages_independently()
{
var id = Guid.NewGuid();

await using var session = _host.DocumentStore().QuerySession();

await _host.SendMessageAndWaitAsync(new OrderPlaced(id, "Gadget"));
(await session.LoadAsync<ShippingSaga>(id)).ShouldNotBeNull();
(await session.LoadAsync<BillingSaga>(id)).ShouldNotBeNull();


// Complete only the shipping saga
await _host.SendMessageAndWaitAsync(new OrderShipped(id));



// Shipping saga should be deleted (completed)
var shipping = await session.LoadAsync<ShippingSaga>(id);
shipping.ShouldBeNull();

// Billing saga should still exist
var billing = await session.LoadAsync<BillingSaga>(id);
billing.ShouldNotBeNull();
billing.ProductName.ShouldBe("Gadget");

// Now complete the billing saga
await _host.SendMessageAndWaitAsync(new PaymentReceived(id));

await using var session2 = _host.DocumentStore().QuerySession();
(await session2.LoadAsync<BillingSaga>(id)).ShouldBeNull();
}
}

// Shared message that both sagas react to
public record OrderPlaced(Guid OrderPlacedId, string ProductName);

// Messages specific to each saga
public record OrderShipped(Guid ShippingSagaId);
public record PaymentReceived(Guid BillingSagaId);

[WolverineIgnore]
public class ShippingSaga : Wolverine.Saga
{
public Guid Id { get; set; }
public string ProductName { get; set; } = string.Empty;

public static ShippingSaga Start(OrderPlaced message)
{
return new ShippingSaga
{
Id = message.OrderPlacedId,
ProductName = message.ProductName
};
}

public void Handle(OrderShipped message)
{
MarkCompleted();
}
}

[WolverineIgnore]
public class BillingSaga : Wolverine.Saga
{
public Guid Id { get; set; }
public string ProductName { get; set; } = string.Empty;

public static BillingSaga Start(OrderPlaced message)
{
return new BillingSaga
{
Id = message.OrderPlacedId,
ProductName = message.ProductName
};
}

public void Handle(PaymentReceived message)
{
MarkCompleted();
}
}
73 changes: 64 additions & 9 deletions src/Wolverine/Persistence/Sagas/SagaChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Wolverine.Configuration;
using Wolverine.Logging;
using Wolverine.Runtime.Handlers;
using Wolverine.Transports.Local;

namespace Wolverine.Persistence.Sagas;

Expand All @@ -28,9 +29,22 @@ public class SagaChain : HandlerChain

public SagaChain(WolverineOptions options, IGrouping<Type, HandlerCall> grouping, HandlerGraph parent) : base(options, grouping, parent)
{
// After base constructor, saga handlers may have been moved to ByEndpoint (Separated mode).
// Check what's left in Handlers (not the original grouping).
var remainingSagaCalls = Handlers.Where(x => x.HandlerType.CanBeCastTo<Saga>())
.DistinctBy(x => x.HandlerType).ToArray();

if (remainingSagaCalls.Length == 0)
{
// All sagas were separated into ByEndpoint chains — this parent is routing-only.
var anySaga = grouping.First(x => x.HandlerType.CanBeCastTo<Saga>());
SagaType = anySaga.HandlerType;
return;
}

try
{
var saga = grouping.Where(x => x.HandlerType.CanBeCastTo<Saga>()).DistinctBy(x => x.HandlerType).Single();
var saga = remainingSagaCalls.Single();
SagaType = saga.HandlerType;
SagaMethodInfo = saga.Method;

Expand All @@ -44,7 +58,7 @@ public SagaChain(WolverineOptions options, IGrouping<Type, HandlerCall> grouping
}
catch (Exception e)
{
var handlerTypes = grouping.Where(x => x.HandlerType.CanBeCastTo<Saga>())
var handlerTypes = remainingSagaCalls
.Select(x => x.HandlerType).Select(x => x.FullNameInCode()).Join(", ");

throw new InvalidSagaException(
Expand All @@ -61,16 +75,38 @@ protected override void maybeAssignStickyHandlers(WolverineOptions options, IGro
tryAssignStickyEndpoints(handlerCall, options);
}

// You just know *somebody* is going to try to handle the same message type
// by different sagas because our users hate me
var groupedSagas = grouping.Where(x => x.HandlerType.CanBeCastTo<Saga>())
.GroupBy(x => x.HandlerType).ToArray();

if (groupedSagas.Length > 1)
throw new NotSupportedException(
"Wolverine does not (yet) support having multiple Saga type respond to the same message.");
{
if (options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated)
{
var sagaTypes = groupedSagas.Select(x => x.Key.FullNameInCode()).Join(", ");
throw new InvalidSagaException(
$"Multiple saga types ({sagaTypes}) handle message {MessageType.FullNameInCode()}. " +
$"Set MultipleHandlerBehavior to Separated to allow this.");
}

// In Separated mode, create a separate SagaChain per saga type
foreach (var sagaGroup in groupedSagas)
{
var sagaCalls = sagaGroup.ToArray();
var sagaType = sagaGroup.Key;

var endpoint = options.Transports.GetOrCreate<LocalTransport>()
.QueueFor(sagaType.FullNameInCode().ToLowerInvariant());

var chain = new SagaChain(sagaCalls, options.HandlerGraph, [endpoint]);

foreach (var call in sagaCalls)
{
Handlers.Remove(call);
}

// TODO -- MORE HERE!!!!!
_byEndpoint.Add(chain);
}
}
}

public SagaChain(HandlerCall handlerCall, HandlerGraph handlerGraph, Endpoint[] endpoints) : base(handlerCall, handlerGraph)
Expand All @@ -90,6 +126,26 @@ public SagaChain(HandlerCall handlerCall, HandlerGraph handlerGraph, Endpoint[]
}
}

internal SagaChain(HandlerCall[] sagaCalls, HandlerGraph handlerGraph, Endpoint[] endpoints)
: base(sagaCalls[0].Method.MessageType()!, handlerGraph)
{
foreach (var endpoint in endpoints) RegisterEndpoint(endpoint);
foreach (var call in sagaCalls) Handlers.Add(call);

var saga = sagaCalls.First();
SagaType = saga.HandlerType;
SagaMethodInfo = saga.Method;

SagaIdMember = DetermineSagaIdMember(MessageType, SagaType, saga.Method);

if (SagaIdMember != null && AuditedMembers.All(x => x.Member != SagaIdMember))
{
AuditedMembers.Add(new AuditedMember(SagaIdMember, SagaIdMember.Name, SagaIdMember.Name));
}

TypeName = saga.HandlerType.ToSuffixedTypeName(HandlerSuffix).Replace("[]", "Array");
}

public override bool TryInferMessageIdentity(out PropertyInfo? property)
{
property = SagaIdMember as PropertyInfo;
Expand Down Expand Up @@ -236,8 +292,7 @@ private void generateForOnlyStartingSaga(IServiceContainer container, IPersisten
frames.Add(ifNotCompleted);
}

// Always true!
internal override bool HasDefaultNonStickyHandlers() => true;
internal override bool HasDefaultNonStickyHandlers() => Handlers.Any();

internal IEnumerable<Frame> DetermineSagaDoesNotExistSteps(Variable sagaId, Variable saga,
IPersistenceFrameProvider frameProvider, IServiceContainer container)
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/Handlers/HandlerChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class HandlerChain : Chain<HandlerChain, ModifyHandlerChainAttribute>, IW
public const string Consume = "Consume";
public const string Consumes = "Consumes";

private readonly List<HandlerChain> _byEndpoint = [];
protected readonly List<HandlerChain> _byEndpoint = [];

private readonly List<Endpoint> _endpoints = [];

Expand Down
Loading