Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions docs/guide/durability/sagas.md
Original file line number Diff line number Diff line change
Expand Up @@ -697,3 +697,75 @@ When an `OrderPlaced` message is published, both sagas will be started independe
::: warning
In `Separated` mode, messages routed to multiple sagas must be **published** (via `SendAsync` or `PublishAsync`), not **invoked** (via `InvokeAsync`). `InvokeAsync` bypasses message routing and will not reach the separated saga endpoints.
:::

## Resequencer Saga

Wolverine supports the [Resequencer](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Resequencer.html) pattern
out of the box through the `ResequencerSaga<T>` base class. This is useful when you need to process messages in a specific order,
but they may arrive out of sequence.

Messages must implement the `SequencedMessage` interface:

```cs
public interface SequencedMessage
{
int? Order { get; }
}
```

Then subclass `ResequencerSaga<T>` instead of `Saga`:

```cs
public record StartMyWorkflow(Guid Id);

public record MySequencedCommand(Guid SagaId, int? Order) : SequencedMessage;

public class MyWorkflowSaga : ResequencerSaga<MySequencedCommand>
{
public Guid Id { get; set; }

public static MyWorkflowSaga Start(StartMyWorkflow cmd)
{
return new MyWorkflowSaga { Id = cmd.Id };
}

public void Handle(MySequencedCommand cmd)
{
// This will only be called when messages arrive in the correct order,
// or when out-of-order messages are replayed after gaps are filled
}
}
```

### How It Works

Wolverine generates a `ShouldProceed()` guard around your `Handle`/`Orchestrate` methods:

- If `Order` is `null` or `0`, the message bypasses the guard and is handled immediately
- If `Order` equals `LastSequence + 1` (the next expected sequence), the handler executes normally and `LastSequence` advances
- If `Order` is greater than `LastSequence + 1` (a gap exists), the message is added to the `Pending` list and the handler is **not** called
- When a gap-filling message arrives, any consecutive pending messages are automatically re-published to be handled in order

The saga state is **always** persisted regardless of whether the handler was called, because the `Pending` list and `LastSequence` may have changed.

### Key Properties

| Property | Description |
|----------|-------------|
| `LastSequence` | The highest sequence number that has been processed in order |
| `Pending` | Messages received out of order, waiting for earlier messages to arrive |

### Concurrency Considerations

When using `ResequencerSaga`, we recommend also using [Partitioned Sequential Messaging](/guide/messaging/partitioning) to manage potential concurrency conflicts. When `UseInferredMessageGrouping()` is enabled, Wolverine automatically detects `SequencedMessage` types and uses the `Order` property as the group id for partitioning. Messages with `null` order values receive a random group id so they are distributed independently.

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.MessagePartitioning
// Automatically infers grouping from saga identity AND
// SequencedMessage.Order for resequencer sagas
.UseInferredMessageGrouping();
}).StartAsync();
```
3 changes: 2 additions & 1 deletion docs/guide/messaging/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ opts.MessagePartitioning

The built in rules *at this point* include:

* Using the Sage identity of a message that is handled by a [Stateful Saga](/guide/durability/sagas)
* Using the saga identity of a message that is handled by a [Stateful Saga](/guide/durability/sagas)
* Using the stream/aggregate id of messages that are part of the [Aggregate Handler Workflow](/guide/durability/marten/event-sourcing) integration with Marten
* Using the `Order` property of messages that implement the `SequencedMessage` interface (used by [ResequencerSaga](/guide/durability/sagas#resequencer-saga)). Messages with a `null` order value receive a random group id so they are distributed independently

## Specifying Grouping Rules

Expand Down
138 changes: 138 additions & 0 deletions src/Persistence/MartenTests/Saga/resequencer_saga_end_to_end.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using IntegrationTests;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit;

namespace MartenTests.Saga;

public record StartMartenSequencedSaga(Guid Id);

public record MartenSequencedCommand(Guid SagaId, int? Order) : SequencedMessage;

public class MartenTestResequencerSaga : ResequencerSaga<MartenSequencedCommand>
{
public Guid Id { get; set; }
public List<int?> ProcessedOrders { get; set; } = new();

public static MartenTestResequencerSaga Start(StartMartenSequencedSaga cmd)
{
return new MartenTestResequencerSaga { Id = cmd.Id };
}

public void Handle(MartenSequencedCommand cmd)
{
ProcessedOrders.Add(cmd.Order);
}
}

public class resequencer_saga_end_to_end : PostgresqlContext, IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "resequencer_sagas";
}).IntegrateWithWolverine();

opts.Services.AddResourceSetupOnStartup();

opts.Discovery.DisableConventionalDiscovery()
.IncludeType<MartenTestResequencerSaga>();
})
.StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

private async Task<MartenTestResequencerSaga?> LoadState(Guid id)
{
using var session = _host.DocumentStore().QuerySession();
return await session.LoadAsync<MartenTestResequencerSaga>(id);
}

[Fact]
public async Task messages_in_order_are_all_handled()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 1));
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 2));
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 3));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBe([1, 2, 3]);
state.LastSequence.ShouldBe(3);
state.Pending.ShouldBeEmpty();
}

[Fact]
public async Task out_of_order_message_is_queued_not_handled()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 3));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBeEmpty();
state.LastSequence.ShouldBe(0);
state.Pending.Count.ShouldBe(1);
}

[Fact]
public async Task out_of_order_messages_replayed_when_gap_fills()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId));
// Send message 2 first, it will be queued in Pending
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 2));

// Send message 1 which fills the gap - ShouldProceed will republish 2
await _host.ExecuteAndWaitAsync(async () =>
{
await _host.Services.GetRequiredService<IMessageBus>()
.PublishAsync(new MartenSequencedCommand(sagaId, 1));
}, timeoutInMilliseconds: 30000);

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.LastSequence.ShouldBe(2);
state.Pending.ShouldBeEmpty();
state.ProcessedOrders.ShouldContain(1);
state.ProcessedOrders.ShouldContain(2);
}

[Fact]
public async Task null_order_bypasses_guard()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, null));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBe([null]);
}
}
144 changes: 144 additions & 0 deletions src/Testing/CoreTests/Persistence/Sagas/resequencer_saga_in_memory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Persistence.Sagas;
using Wolverine.Tracking;
using Wolverine.Transports;
using Xunit;

namespace CoreTests.Persistence.Sagas;

public record StartSequencedSaga(Guid Id);

public record SequencedCommand(Guid SagaId, int? Order) : SequencedMessage;

public class TestResequencerSaga : ResequencerSaga<SequencedCommand>
{
public Guid Id { get; set; }
public List<int?> ProcessedOrders { get; set; } = new();

public static TestResequencerSaga Start(StartSequencedSaga cmd)
{
return new TestResequencerSaga { Id = cmd.Id };
}

public void Handle(SequencedCommand cmd)
{
ProcessedOrders.Add(cmd.Order);
}
}

public class resequencer_saga_in_memory : IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType<TestResequencerSaga>();

opts.PublishAllMessages().To(TransportConstants.LocalUri);
})
.StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

private async Task<TestResequencerSaga?> LoadState(Guid id)
{
return _host.Services.GetRequiredService<InMemorySagaPersistor>()
.Load<TestResequencerSaga>(id);
}

[Fact]
public async Task messages_in_order_are_all_handled()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 1));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 2));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBe([1, 2, 3]);
state.LastSequence.ShouldBe(3);
state.Pending.ShouldBeEmpty();
}

[Fact]
public async Task out_of_order_message_is_queued_not_handled()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBeEmpty();
state.LastSequence.ShouldBe(0);
state.Pending.Count.ShouldBe(1);
}

[Fact]
public async Task out_of_order_messages_replayed_when_gap_fills()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 2));

// Send message 1 which fills the gap - ShouldProceed will republish 2 and 3
// The tracked session will wait for all cascading messages to complete
await _host.ExecuteAndWaitAsync(async () =>
{
await _host.Services.GetRequiredService<IMessageBus>()
.PublishAsync(new SequencedCommand(sagaId, 1));
}, timeoutInMilliseconds: 30000);

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.LastSequence.ShouldBe(3);
state.Pending.ShouldBeEmpty();
state.ProcessedOrders.ShouldContain(1);
state.ProcessedOrders.ShouldContain(2);
state.ProcessedOrders.ShouldContain(3);
}

[Fact]
public async Task null_order_bypasses_guard()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, null));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBe([null]);
}

[Fact]
public async Task zero_order_bypasses_guard()
{
var sagaId = Guid.NewGuid();

await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId));
await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 0));

var state = await LoadState(sagaId);
state.ShouldNotBeNull();
state.ProcessedOrders.ShouldBe([0]);
}
}
Loading
Loading