Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ const config: UserConfig<DefaultTheme.Config> = {
{text: 'Event Subscriptions', link: '/guide/durability/marten/subscriptions'},
{text: 'Subscription/Projection Distribution', link: '/guide/durability/marten/distribution'},
{text: 'Sagas', link: '/guide/durability/marten/sagas'},
{text: 'Process Manager via Handlers', link: '/guide/durability/marten/process-manager-via-handlers'},
{text: 'Multi-Tenancy and Marten', link: '/guide/durability/marten/multi-tenancy'},
{text: 'Ancillary Marten Stores', link: '/guide/durability/marten/ancillary-stores'},
]},
Expand Down
973 changes: 973 additions & 0 deletions docs/guide/durability/marten/process-manager-via-handlers.md

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions src/Samples/ProcessManagerSample/DISCOVERIES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Discoveries: Process Manager via Existing Wolverine + Marten Features

This log records what the implementation of `ProcessManagerSample` taught us beyond the initial research notes. It is the feedback loop for the recipe in `docs/guide/durability/marten/process-manager-via-handlers.md` and any future `ProcessManager<TState>` framework proposal.

Entries are grouped by kind: confirmed expectations, corrected expectations, new gotchas, recipe adjustments, and open questions.

## Confirmed expectations

1. **`public Guid Id { get; set; }` is required on the state type.** Without it, `ResetAllMartenDataAsync()` / `CleanAllDataAsync()` throws `InvalidDocumentException` because Marten registers the aggregate as a document type.
2. **Correlation by convention works with no attribute.** A command/event property named `{AggregateTypeName}Id` (for us, `OrderFulfillmentStateId`) resolves the stream id automatically. This held across every continue handler in the sample with no per-handler override.
3. **Inline snapshot projection is enough.** Registering `opts.Projections.Snapshot<OrderFulfillmentState>(SnapshotLifecycle.Inline)` makes the next `FetchForWriting` see the latest state without running a daemon. Between two back-to-back `InvokeMessageAndWaitAsync` calls, the second call sees the first call's effects.
4. **`InvokeMessageAndWaitAsync` commits before returning.** The next `LightweightSession().Events.FetchStreamAsync(id)` on the same thread sees the appended event without any sleep or poll.
5. **`FetchForWriting` on a non-existent stream does return `Aggregate == null`.** This matches the research notes verbatim. However, see the corrected expectation below for why this is not actually reachable from the start handler by default.
6. **Pure-function handler tests are as simple as advertised.** `new OrderFulfillmentState { ... }` constructed directly, passed with a plain event into a static `Handle`, assertions run on the returned `Events` list. No host, no Marten, no DI, no async.
7. **Idempotency via per-step flag check works.** Checking `if (state.ThisStepAlreadyHappened) return new Events();` inside each continue handler makes a duplicate integration event a no-op, reliably, because the inline projection has already reflected the first occurrence by the time the duplicate is dispatched.
8. **`(IStartStream, OutgoingMessages)` tuple return composes cleanly.** Both values unpack correctly from the same handler: the `IStartStream` side effect creates the stream and appends the initial events; the `OutgoingMessages` items are dispatched through Wolverine's scheduler. `IStartStream` inherits from `IMartenOp` which inherits from `ISideEffect` (`IMartenOp.cs:23`), which is what makes the unpacker accept it alongside an `OutgoingMessages`.
9. **The timeout handler stays pure.** `PaymentTimeoutHandler` is a standard `[AggregateHandler]` with a `(PaymentTimeout, OrderFulfillmentState) -> Events` signature. No `IMessageBus` injection. The terminal-state and payment-confirmed guards handle both "process already done" and "payment arrived before the timer fired" cases; no explicit cancellation of the scheduled message is needed. This is the strongest argument for the "let state decide" idiom over "cancel the timer" idiom: one fewer API to manage, one fewer failure mode.
10. **Scheduler in solo mode fires within a few seconds of the requested delay.** With `TimeSpan.FromSeconds(1)`, the timeout handler actually runs within roughly 2 to 4 seconds in our test environment. A 10-second observation window is comfortably above that. The scheduler polls on an interval, so the actual fire time is "requested delay plus one poll cycle."

## Corrected expectations

1. **`[AggregateHandler]` is not suitable for the "start a new stream" case out of the box.** The research notes suggested a uniform `[AggregateHandler]` pattern for all handlers in the process. In practice, `AggregateHandlerAttribute.OnMissing` defaults to `OnMissing.Simple404` (`AggregateHandlerAttribute.cs:144`). When the aggregate does not yet exist, the handler body is short-circuited: zero events are appended and no exception is thrown, which makes the failure silent and easy to miss.

The idiomatic fix matches what the Wolverine test suite already does for start cases: a plain handler that returns `IStartStream` via `MartenOps.StartStream<TState>(id, events...)`. See `src/Persistence/MartenTests/AggregateHandlerWorkflow/aggregate_handler_workflow_with_ievent.cs:144` for the reference pattern.

Consequence for the recipe: the start handler has a different shape from continue handlers. The continue handlers stay on `[AggregateHandler]` because the stream exists by the time they run.

## New gotchas

1. **Silent no-op on missing aggregate.** Tied to the corrected expectation above. If you accidentally put `[AggregateHandler]` on a would-be start handler, the integration test will not throw; it will just report zero events on the stream. First-time failure mode is confusing. The doc should call this out explicitly.
2. **Nullable single-event returns are unsafe.** Returning `TEvent?` (for example `OrderFulfillmentCancelled?`) from an `[AggregateHandler]` handler looks ergonomic but is a trap: `EventCaptureActionSource` generates an unconditional `stream.AppendOne(variable)` with no null check (`AggregateHandlerAttribute.cs:225`). A `return null` would call `AppendOne(null)`. Always return `Events` (possibly empty) for no-op paths. This is the idiom the recipe should teach for any "sometimes no event" shape.
3. **Completion guard and idempotency guard are two different checks.** `if (state.IsTerminal) return;` handles "the process is closed." `if (state.ThisStepAlreadyHappened) return;` handles "at-least-once redelivery of a message that already landed." Merging them would lose the ability to tell a post-terminal late delivery from a normal retry, and both occur in practice. Continue handlers therefore carry two guard lines at the top, not one.
4. **No first-class "wait for scheduled message to fire" test helper.** Wolverine's `InvokeMessageAndWaitAsync` waits for the cascading work of a single dispatch, not for a delayed message that the scheduler has yet to pick up. For the timeout integration test we wrote a plain polling helper (`WaitForCondition(id, predicate)`) that `FetchLatest<T>`s the state every 250ms until the condition is met or a deadline passes. This works but is unremarkable prose; a more idiomatic test helper on the Wolverine side would be welcome. `Host.TrackActivity()` is likely the right primitive but was not required to get the test green.

## Recipe adjustments (applied)

Each adjustment below describes a change that was applied to the recipe in `docs/guide/durability/marten/process-manager-via-handlers.md` during the iterative build. They are listed here as a historical record of why the recipe looks the way it does, for the benefit of future maintainers and the downstream `ProcessManager<TState>` proposal.

1. **Step "Write your handlers" needs to split into two sub-steps.** One for the start handler (plain, returns `IStartStream`), one for the continue handlers (`[AggregateHandler]`, returns events / `Events` / `OutgoingMessages`). Treating them uniformly is the trap.
2. **Recommend `Events` as the default continue-handler return shape.** Single-event returns work for the happy path but force you into nullable workarounds on the no-op paths. `Events` handles both cases with the same return type (empty list for no-op, one item for a record, two items when the event also trips completion).
3. **The completion guard section in the Recipe should be two sub-sections, not one.** First sub-section: the terminal-state guard (`IsTerminal`). Second sub-section: the step-already-happened idempotency guard. They solve different problems and readers will conflate them if we don't separate them.
4. **Remove the "to be validated" banner from Step 6 (Schedule timeouts) in Phase 6.** The tuple-return pattern `(IStartStream, OutgoingMessages)` is confirmed. The pure timeout handler shape is confirmed. The "let state decide, don't cancel the timer" idiom is confirmed.
5. **Step 6 should explicitly teach the "let state decide" idiom.** The timeout handler does not attempt to cancel the scheduled message when payment arrives early. Instead, the handler's guards turn a late-firing timer into a silent no-op. This is simpler and more robust than an explicit cancellation API, and it only works because the process state is authoritative. Emphasize this when the reader is likely reaching for "how do I cancel the scheduled message."
6. **Step 8 (Test it) should show a polling test helper for scheduler assertions.** Integration tests that rely on the scheduler firing need to poll for the terminal condition; `InvokeMessageAndWaitAsync` alone is not enough because it does not wait for delayed messages. Show the helper shape so readers do not try to solve the same problem from scratch.

## Open questions

1. Can we configure `[AggregateHandler]` with `OnMissing.ProceedAnyway` (or equivalent) to support a uniform pattern, and should the doc show that as an option? Phase 3 did not surface a need for continue handlers to override this, so the question stays specific to start handlers. Deferred.
2. ~~Does `MartenOps.StartStream<T>` play cleanly with `OutgoingMessages` cascading~~? **Answered in Phase 5: yes.** `(IStartStream, OutgoingMessages)` tuple return works because `IStartStream` is an `IMartenOp : ISideEffect` and the unpacker applies the side effect while also dispatching the `OutgoingMessages`.
3. ~~What happens if `StartOrderFulfillment` is dispatched twice for the same id?~~ **Answered in Phase 7.** A second start for an existing id throws `Marten.Exceptions.ExistingStreamIdCollisionException` ("Stream #{id} already exists in the database"). The exception inherits from `MartenException`, not from `ConcurrencyException`. Wolverine's `InvokeMessageAndWaitAsync` propagates it with its original type intact; there is no wrapping exception to unwrap. The transaction rolls back cleanly, so the first start's data remains the authoritative state. The exception carries the duplicate id and the aggregate type as properties, which makes it trivial to log or convert to an idempotent "already started, ignoring" response. Test: `ProcessManagerSample.Tests.OrderFulfillment.when_starting_a_fulfillment.starting_the_same_process_twice_throws_and_first_start_wins`.
4. ~~Concurrency check firing~~. **Decided in Phase 7: docs-only.** The sample does not simulate two handlers racing on the same stream. A deterministic race test in the sample would require either bypassing the transport (testing something else) or racing real dispatches (non-deterministic and flaky); neither strengthens the pattern-specific signal. The mechanic is already covered by Wolverine's own test suite. The exception-hierarchy trap that readers need to know (`DcbConcurrencyException` and `ExistingStreamIdCollisionException` not inheriting from `ConcurrencyException`) is documented in Section 5 Friction Points and Section 7 DCB Enhancement of the guide.
5. Is there a supported `Host.TrackActivity()`-style helper that waits specifically for delayed/scheduled messages to fire? Our integration test used a polling helper which works but feels like rolling our own. Worth a follow-up look.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Alba;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Wolverine;
using Wolverine.Runtime;
using Xunit;

namespace ProcessManagerSample.Tests;

public class AppFixture : IAsyncLifetime
{
public IAlbaHost? Host { get; private set; }

public async Task InitializeAsync()
{
Host = await AlbaHost.For<Program>(x =>
{
x.ConfigureServices(services =>
{
// Strip any external transports; this sample only exercises local handlers.
services.DisableAllExternalWolverineTransports();

// Required when running Marten + Wolverine in a single test process.
services.MartenDaemonModeIsSolo();
services.RunWolverineInSoloMode();
});
});
}

public async Task DisposeAsync()
{
await Host!.StopAsync();
Host.Dispose();
}
}

[CollectionDefinition("integration")]
public class IntegrationCollection : ICollectionFixture<AppFixture>;

[Collection("integration")]
public abstract class IntegrationContext : IAsyncLifetime
{
private readonly AppFixture _fixture;

protected IntegrationContext(AppFixture fixture)
{
_fixture = fixture;
Runtime = (WolverineRuntime)fixture.Host!.Services.GetRequiredService<IWolverineRuntime>();
}

public WolverineRuntime Runtime { get; }
public IAlbaHost Host => _fixture.Host!;
public IDocumentStore Store => _fixture.Host!.Services.GetRequiredService<IDocumentStore>();

async Task IAsyncLifetime.InitializeAsync()
{
await Host.ResetAllMartenDataAsync();
}

public Task DisposeAsync() => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using ProcessManagerSample.OrderFulfillment;
using ProcessManagerSample.OrderFulfillment.Handlers;
using Shouldly;
using Xunit;

namespace ProcessManagerSample.Tests.OrderFulfillment;

/// <summary>
/// Pure function tests. No Wolverine host, no Marten session. The handlers are static methods
/// over plain inputs, so we can construct state in memory and assert directly on the returned events.
/// </summary>
public class HandlerUnitTests
{
private static OrderFulfillmentState InProgressState(
bool paymentConfirmed = false,
bool itemsReserved = false,
bool shipmentConfirmed = false)
{
return new OrderFulfillmentState
{
Id = Guid.NewGuid(),
CustomerId = Guid.NewGuid(),
TotalAmount = 100m,
PaymentConfirmed = paymentConfirmed,
ItemsReserved = itemsReserved,
ShipmentConfirmed = shipmentConfirmed
};
}

[Fact]
public void payment_confirmed_records_event_and_stays_in_progress_when_gates_remain()
{
var state = InProgressState();
var @event = new PaymentConfirmed(state.Id, state.TotalAmount);

var result = PaymentConfirmedHandler.Handle(@event, state);

result.Count.ShouldBe(1);
result[0].ShouldBeOfType<PaymentConfirmed>();
}

[Fact]
public void payment_confirmed_also_completes_when_other_two_gates_are_already_satisfied()
{
var state = InProgressState(itemsReserved: true, shipmentConfirmed: true);
var @event = new PaymentConfirmed(state.Id, state.TotalAmount);

var result = PaymentConfirmedHandler.Handle(@event, state);

result.Count.ShouldBe(2);
result[0].ShouldBeOfType<PaymentConfirmed>();
var completed = result[1].ShouldBeOfType<OrderFulfillmentCompleted>();
completed.OrderFulfillmentStateId.ShouldBe(state.Id);
}

[Fact]
public void payment_confirmed_is_a_no_op_when_payment_already_recorded()
{
var state = InProgressState(paymentConfirmed: true);
var @event = new PaymentConfirmed(state.Id, state.TotalAmount);

var result = PaymentConfirmedHandler.Handle(@event, state);

result.ShouldBeEmpty();
}

[Fact]
public void completion_guard_rejects_a_continue_message_after_terminal_state()
{
var state = InProgressState();
state.IsCompleted = true;

var result = PaymentConfirmedHandler.Handle(
new PaymentConfirmed(state.Id, state.TotalAmount),
state);

result.ShouldBeEmpty();
}

[Fact]
public void completion_guard_rejects_a_continue_message_after_cancellation()
{
var state = InProgressState();
state.IsCancelled = true;

var result = ShipmentConfirmedHandler.Handle(
new ShipmentConfirmed(state.Id, "TRACK-1"),
state);

result.ShouldBeEmpty();
}

[Fact]
public void cancel_emits_terminal_event_with_reason()
{
var state = InProgressState(paymentConfirmed: true);
var command = new CancelOrderFulfillment(state.Id, "Customer requested");

var result = CancelOrderFulfillmentHandler.Handle(command, state);

result.Count.ShouldBe(1);
var cancelled = result[0].ShouldBeOfType<OrderFulfillmentCancelled>();
cancelled.OrderFulfillmentStateId.ShouldBe(state.Id);
cancelled.Reason.ShouldBe("Customer requested");
}

[Fact]
public void cancel_on_already_terminal_state_is_a_no_op()
{
var state = InProgressState();
state.IsCompleted = true;

var result = CancelOrderFulfillmentHandler.Handle(
new CancelOrderFulfillment(state.Id, "Too late"),
state);

result.ShouldBeEmpty();
}
}
Loading
Loading