diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index c1b040093..12540515c 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -309,6 +309,7 @@ const config: UserConfig = { {text: 'Event Subscriptions', link: '/guide/durability/marten/subscriptions'}, {text: 'Subscription/Projection Distribution', link: '/guide/durability/marten/distribution'}, {text: 'Sagas', link: '/guide/durability/marten/sagas'}, + {text: 'Process Manager via Handlers', link: '/guide/durability/marten/process-manager-via-handlers'}, {text: 'Multi-Tenancy and Marten', link: '/guide/durability/marten/multi-tenancy'}, {text: 'Ancillary Marten Stores', link: '/guide/durability/marten/ancillary-stores'}, ]}, diff --git a/docs/guide/durability/marten/process-manager-via-handlers.md b/docs/guide/durability/marten/process-manager-via-handlers.md new file mode 100644 index 000000000..ef89f7a1b --- /dev/null +++ b/docs/guide/durability/marten/process-manager-via-handlers.md @@ -0,0 +1,973 @@ +# Process Manager via Handlers + +You can build an event-sourced Process Manager with the Wolverine and Marten features that ship today. No new base class, no new package. This guide shows you how, with a worked sample you can clone. + +The pattern is complementary to Wolverine's [Saga](/guide/durability/sagas) support, not a replacement. Saga stays the right tool when you want a single class per process, a framework-managed lifecycle, and simple document-backed state. The pattern described here trades that for event-sourced state, a full audit trail, and handlers you can test as pure functions. + +## 1. Introduction + +A Process Manager coordinates a long-running business operation that unfolds across multiple steps and multiple messages. Place order, confirm payment, reserve items, schedule shipment, handle a timeout if payment never arrives. Each step is triggered by a different message. Each step needs to see where the process is so it can decide what happens next. + +Wolverine's [Saga](/guide/durability/sagas) support (and its [Marten-backed integration](/guide/durability/marten/sagas)) is the first-class way to do this: a single stateful class, persisted as a document, with a framework-managed `MarkCompleted()` lifecycle. If you have not read those pages yet, read them first. This guide assumes you have. + +This page describes a complementary pattern for the same class of problem: carrying the process as an **event stream** instead of a document, and coordinating the steps through ordinary `[AggregateHandler]` methods rather than a single Saga class. No new base class, no new package; every mechanism used here ships in the current versions of Wolverine and Marten. + +Pick Saga when: + +- You want one class per process and the discoverability that comes with it. +- Simple coordination with timeouts is the primary concern. +- You do not need an audit trail of how the process arrived at its current state. +- `MarkCompleted()` and a framework-managed lifecycle matter to your team. + +Pick Process Manager via handlers when: + +- You want every state change on the process recorded as an event, replayable and queryable. +- You want to test each step as a pure function: `Handle(command, state)` returns events, no host, no database. +- The process state is part of the domain model, not just internal plumbing. +- You are already event-sourcing nearby aggregates and want the process to fit the same mental model. + +Both options live happily side by side in the same application. The choice is per-process, not per-repository. + +## 2. The Building Blocks + +This pattern composes features you probably already know. What is new is the combination. + +### `IEventStream` and `FetchForWriting` + +[`FetchForWriting`](https://martendb.io/events/optimistic_concurrency.html) is the Marten entry point. It loads an event stream, replays its events through your aggregate type's `Apply` methods, and hands you back an `IEventStream` whose `Aggregate` property is the projected state. You append new events to that stream; on `SaveChangesAsync`, Marten runs an optimistic concurrency check and persists them atomically. + +```csharp +public interface IEventStream where T : notnull +{ + T? Aggregate { get; } + long? StartingVersion { get; } + long? CurrentVersion { get; } + Guid Id { get; } + string Key { get; } + IReadOnlyList Events { get; } + void AppendOne(object @event); + void AppendMany(params object[] events); +} +``` + +`Aggregate` is null and `StartingVersion` is null when the stream does not exist yet. That is important and returns in [Section 3](#the-recipe) when you look at the start handler. + +### `[AggregateHandler]` + +`[AggregateHandler]` is a class-level attribute from `Wolverine.Marten` that wires the full `FetchForWriting` plus `SaveChangesAsync` plus concurrency-check middleware around every handler method on the class. You get four behaviors for free: + +1. The stream id is extracted from the incoming message using convention-based resolution. +2. The stream is loaded and the aggregate projected through its `Apply` methods. +3. The handler receives the projected aggregate as a parameter. +4. Any events returned from the handler are appended to the stream, and the session is saved. + +As a naming-based alternative, any static class whose name ends with `AggregateHandler` is treated as if it carried the attribute. This guide uses the explicit attribute form throughout for clarity. + +### `[WriteAggregate]` + +`[WriteAggregate]` is the parameter-level version of the same idea. It decorates a single handler parameter instead of the whole class. You reach for `[WriteAggregate]` when one of the following is true: + +- You want to override the convention-based stream-id resolution on a per-handler basis, for example when an external integration event names the id differently: `[WriteAggregate("OrderId")] OrderFulfillmentState state`. +- You want only one handler method on a class to participate in the aggregate workflow, while the others do something else. +- You want to override concurrency style (`ConcurrencyStyle.Exclusive` for an advisory lock) for a specific handler. + +If `[AggregateHandler]` works for your handler, prefer it. `[WriteAggregate]` is the escape hatch. + +### `MartenOps.StartStream` + +`FetchForWriting` is how you attach handlers to an existing stream. To **create** the stream in the first place, you return a result built by `MartenOps.StartStream(id, events...)` from a plain handler. Wolverine recognizes the return value, writes the stream-start and the initial events in one transaction, and you are off to the races. The start handler has a different shape from continue handlers, and [Section 3](#the-recipe) is specific about why. + +### `Apply` methods on the state type + +The projected state is a plain C# class. For each event the process cares about, you write a method named `Apply(TheEvent e)` that mutates the instance. Marten's [single stream projection](https://martendb.io/events/projections/aggregate-projections.html) machinery finds these methods by convention, plays events through them, and hands back the reduced state. No base class, no interface, no framework type required. The only non-negotiable is a `public Guid Id { get; set; }` property, because Marten registers the type as a document type behind the scenes. + +### `OutgoingMessages` for cascading work and timeouts + +`OutgoingMessages` is a `List` with intent. Return one from a handler and Wolverine dispatches every message in it through the outbox. The important part for a Process Manager is the scheduling overload: + +```csharp +public class OutgoingMessages : List, IWolverineReturnType +{ + void Delay(T message, TimeSpan delay); + void Schedule(T message, DateTimeOffset time); + void RespondToSender(object response); + void ToEndpoint(T message, string endpointName, DeliveryOptions? options); +} +``` + +`Delay` and `Schedule` are how you arm a payment timeout from the start handler without injecting `IMessageBus`. That keeps continue handlers testable as pure functions; the scheduled message is just an item in the returned list, and the test can assert on it directly. (The start handler itself is asymmetric, for reasons [Section 3 Step 4](#step-4-write-your-handlers) covers.) + +### `Events` for fluent appending + +`Events` (from the `Wolverine.Marten` namespace) is also a `List`, and it is recognized by the aggregate-handler codegen as "append each of these to the stream." It exists so you can return multiple events from one handler without having to build a tuple: + +```csharp +public static Events Handle(ConfirmPayment cmd, OrderFulfillmentState state) +{ + var events = new Events(); + events += new PaymentConfirmed(cmd.OrderFulfillmentStateId, cmd.Amount); + if (state.ItemsReserved) events += new OrderFulfillmentCompleted(cmd.OrderFulfillmentStateId); + return events; +} +``` + +For the common case of one event per handler, just return the event directly and Wolverine treats it as a single-item append. Use `Events` when the count is conditional. Use a tuple `(Events, OutgoingMessages)` when you want to append events **and** schedule follow-up messages in the same handler. + +### How these pieces combine + +Taken together, the ingredients look like this: + +- A state type with `Apply` methods and a `Guid Id` property. +- A small catalogue of event records, past tense. +- A small catalogue of command records, imperative. +- One plain start handler that returns `IStartStream` via `MartenOps.StartStream`. +- One `[AggregateHandler]` class per continue message type. Each handler returns events, possibly an `Events` list, possibly an `(Events, OutgoingMessages)` tuple. +- A terminal event that every continue handler guards against, so late-arriving messages are no-ops instead of corruption. +- A Marten configuration that registers the state type as an inline snapshot projection. + +[Section 3](#the-recipe) walks through each of those in order, using the order fulfillment sample as the worked example. + +## 3. The Recipe + +A Process Manager built with this pattern is a handful of small files, arranged in a predictable shape. The steps below map one-for-one to files in [the ProcessManagerSample](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample). Open that alongside the recipe; it compiles, runs, and has a full test suite behind it. + +### Step 1: Define the process state type + +```csharp +public class OrderFulfillmentState +{ + public Guid Id { get; set; } + + public Guid CustomerId { get; set; } + public decimal TotalAmount { get; set; } + + public bool PaymentConfirmed { get; set; } + public bool ItemsReserved { get; set; } + public bool ShipmentConfirmed { get; set; } + + public bool IsCompleted { get; set; } + public bool IsCancelled { get; set; } + + public bool IsTerminal => IsCompleted || IsCancelled; + + public void Apply(OrderFulfillmentStarted e) + { + Id = e.OrderFulfillmentStateId; + CustomerId = e.CustomerId; + TotalAmount = e.TotalAmount; + } + + public void Apply(PaymentConfirmed _) => PaymentConfirmed = true; + public void Apply(ItemsReserved _) => ItemsReserved = true; + public void Apply(ShipmentConfirmed _) => ShipmentConfirmed = true; + public void Apply(OrderFulfillmentCompleted _) => IsCompleted = true; + public void Apply(OrderFulfillmentCancelled _) => IsCancelled = true; +} +``` + +Three rules drive the shape. + +A `public Guid Id { get; set; }` is non-negotiable. Marten registers this type as a document type when you snapshot it, and document types need a settable identity. Omit it and `CleanAllDataAsync` (which you will run in test setup) throws `InvalidDocumentException`. + +Every event the process cares about gets an `Apply` method. Marten's aggregation machinery discovers them by convention. Keep them boring: parameter name of `_` is fine when all you need is the event's existence, not its payload. + +An `IsTerminal` helper is not required by the framework, but every continue handler will check it, so computing it once on the state type avoids drift. + +### Step 2: Define your events + +```csharp +public record OrderFulfillmentStarted( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount); + +public record PaymentConfirmed(Guid OrderFulfillmentStateId, decimal Amount); +public record ItemsReserved(Guid OrderFulfillmentStateId, Guid ReservationId); +public record ShipmentConfirmed(Guid OrderFulfillmentStateId, string TrackingNumber); + +public record OrderFulfillmentCompleted(Guid OrderFulfillmentStateId); +public record OrderFulfillmentCancelled(Guid OrderFulfillmentStateId, string Reason); +``` + +Past tense. Records. Each carries the stream id as its first property. + +Two rules make this easier than it looks. First, the id property is named `{AggregateTypeName}Id`, which is `OrderFulfillmentStateId` here. Wolverine's convention-based resolution finds it without any attribute. Second, the sample uses the same event type for both the incoming integration event and the stream event it records; the process "accepts" the external fact by writing it to its own stream. That is not the only valid modelling choice (you can keep them separate), but it is the tightest for a sample and mirrors how many real process managers work in practice. + +Include **at least one terminal event**. The sample has two: a happy-path `OrderFulfillmentCompleted` and a compensating `OrderFulfillmentCancelled`. The compensating event will matter in Step 5 and again when you add a payment timeout in Step 6. + +### Step 3: Define your commands + +```csharp +public record StartOrderFulfillment( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount); + +public record CancelOrderFulfillment( + Guid OrderFulfillmentStateId, + string Reason); +``` + +Imperative, records, and carrying the stream id on the same property name as the events. Keep commands and integration events in separate files so the reading order is clear (imperatives in one, facts in the other). The sample uses `Commands.cs` and `Events.cs`. + +If the messages that trigger your process come from an external bounded context and already have their own id property name (for example, `OrderId` rather than `OrderFulfillmentStateId`), do not rename them. Use the `[WriteAggregate("OrderId")]` escape hatch from Step 4 instead. + +### Step 4: Write your handlers + +The start handler and the continue handlers are **different shapes**. Treating them uniformly is the most common early mistake with this pattern. The two shapes are covered in Steps 4a and 4b. + +#### Step 4a: The start handler + +```csharp +public static class StartOrderFulfillmentHandler +{ + public static IStartStream Handle(StartOrderFulfillment command) + { + var started = new OrderFulfillmentStarted( + command.OrderFulfillmentStateId, + command.CustomerId, + command.TotalAmount); + + return MartenOps.StartStream( + command.OrderFulfillmentStateId, started); + } +} +``` + +Simplified here for focus; [Step 6](#step-6-schedule-timeouts) shows the full shape with the scheduled payment timeout, which is what the actual sample file carries. + +A plain static class. No `[AggregateHandler]` attribute. The handler returns an `IStartStream` built by `MartenOps.StartStream(id, events...)`, and Wolverine takes care of creating the stream, appending the initial events, and calling `SaveChangesAsync`. + +The reason this is not an `[AggregateHandler]`: `AggregateHandlerAttribute` defaults `OnMissing` to `OnMissing.Simple404`. When you apply it to a handler whose aggregate does not yet exist, the middleware short-circuits before your method runs. No events are appended, no exception is thrown, and the failure is silent. `MartenOps.StartStream` is the idiomatic way to express "this command creates the stream" and it matches what the Wolverine test suite does in `src/Persistence/MartenTests/AggregateHandlerWorkflow/`. + +One corollary: a duplicated `StartOrderFulfillment` for the same id fails with a `Marten.Exceptions.ExistingStreamIdCollisionException` ("Stream #{id} already exists in the database"). Wolverine propagates this exception through `InvokeMessageAndWaitAsync` with its original type intact; the transaction rolls back cleanly, so the first start's data remains authoritative. If your trigger source may deliver the start command at least once, catch this exception and convert it to an idempotent "already started, ignoring" response; otherwise let the caller guarantee a unique process id at dispatch time. The sample's [`starting_the_same_process_twice_throws_and_first_start_wins`](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_starting_a_fulfillment.cs) test covers this exact scenario. + +#### Step 4b: Continue handlers + +```csharp +[AggregateHandler] +public static class PaymentConfirmedHandler +{ + public static Events Handle(PaymentConfirmed @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += @event; + + if (state.ItemsReserved && state.ShipmentConfirmed) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} +``` + +One static class per trigger message. `[AggregateHandler]` at the class level wires `FetchForWriting` plus optimistic concurrency plus `SaveChangesAsync` around every handler method on the class. The method receives the projected `OrderFulfillmentState` already loaded from the stream. + +Prefer `Events` (from the `Wolverine.Marten` namespace) as the return type. You get three benefits: appending a single event, appending two events when a step also trips completion, and returning an empty `Events` for no-op paths are all the same shape. Single-event returns work too, but they force you into nullable workarounds on the no-op paths, and nullable event returns are unsafe: the aggregate-handler code generator emits an unconditional `stream.AppendOne(variable)` with no null check, so a `return null;` will call `AppendOne(null)`. + +If the incoming message's id property does not match the `{AggregateTypeName}Id` convention, use `[WriteAggregate("CustomName")] OrderFulfillmentState state` on the parameter and drop the class-level `[AggregateHandler]`. + +### Step 5: Handle completion + +Completion means two different things. Readers and reviewers conflate them. The sample keeps them as two separate guards at the top of every continue handler. + +#### Step 5a: The terminal-state guard + +```csharp +if (state.IsTerminal) return new Events(); +``` + +This guard prevents a late-arriving integration event from corrupting a finished process. If the customer cancelled five minutes ago and the warehouse has not heard yet, an `ItemsReserved` message is going to arrive after `OrderFulfillmentCancelled`. Without the guard, you would append an `ItemsReserved` event to a cancelled stream and the projection would report `ItemsReserved == true` on a cancelled order. With the guard, the message is a silent no-op. + +Every continue handler carries this line. For N continue handlers, that is N guard lines. There is no framework-level `MarkCompleted()`; discipline holds the invariant. This is one of the friction points in Section 5. + +#### Step 5b: The step-level idempotency guard + +```csharp +if (state.PaymentConfirmed) return new Events(); +``` + +This guard prevents at-least-once redelivery of the same integration event from being recorded twice. Your transport will occasionally re-deliver the same `PaymentConfirmed` message. Without the guard, you would append two identical events. With it, the second delivery is a no-op. + +The completion guard and the idempotency guard look similar but handle different failure modes. Keep them as two separate lines. Merging them would lose the distinction between "the process is closed" and "this specific fact is already recorded," and both happen in real systems. + +#### The terminal event append + +Any continue handler can be the one that trips completion. Whichever handler observes that the other two gates are already satisfied appends the terminal event along with its own fact: + +```csharp +if (state.ItemsReserved && state.ShipmentConfirmed) +{ + events += new OrderFulfillmentCompleted(state.Id); +} +``` + +This keeps the terminal event in the hands of whichever step actually closes the process, rather than funnelling every step through a central "maybe complete" handler. The tradeoff is that the condition appears in every handler, with the two "other" flags named each time. For three steps this is fine; for a ten-step process this starts to ache. + +The compensating handler is simpler: + +```csharp +[AggregateHandler] +public static class CancelOrderFulfillmentHandler +{ + public static Events Handle(CancelOrderFulfillment command, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, command.Reason); + return events; + } +} +``` + +No step-level idempotency guard; cancellation is terminal by its first occurrence, so the terminal-state guard alone is enough. + +### Step 6: Schedule timeouts + +You schedule a timeout without injecting `IMessageBus`. Return an `OutgoingMessages` alongside whatever the handler produces, and Wolverine dispatches each item through the outbox: + +```csharp +public class OutgoingMessages : List, IWolverineReturnType +{ + void Delay(T message, TimeSpan delay); + void Schedule(T message, DateTimeOffset time); + // ... +} +``` + +The start handler returns a tuple. The first element creates the stream and appends the initial event; the second element schedules the timeout: + +```csharp +public static (IStartStream, OutgoingMessages) Handle(StartOrderFulfillment command) +{ + var started = new OrderFulfillmentStarted( + command.OrderFulfillmentStateId, + command.CustomerId, + command.TotalAmount); + + var outgoing = new OutgoingMessages(); + outgoing.Delay( + new PaymentTimeout(command.OrderFulfillmentStateId), + command.PaymentTimeoutWindow ?? DefaultPaymentTimeoutWindow); + + return ( + MartenOps.StartStream(command.OrderFulfillmentStateId, started), + outgoing); +} +``` + +The tuple return is Wolverine's multi-result convention. `IStartStream` is an `IMartenOp : ISideEffect`, so Wolverine's return-value unpacker applies it alongside the `OutgoingMessages` without either fighting the other. This is what keeps the start handler a single method instead of splitting stream creation and timeout scheduling into separate handlers. + +#### Let state decide. Do not cancel the timer. + +The timeout handler is a standard `[AggregateHandler]` that uses the same guards you wrote for every other continue handler: + +```csharp +[AggregateHandler] +public static class PaymentTimeoutHandler +{ + public static Events Handle(PaymentTimeout _, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, "Payment timed out"); + return events; + } +} +``` + +Notice what is not here: there is no API to "cancel" the scheduled message when payment arrives early. You do not need one. When the timer fires, the handler loads the current state, sees that payment already confirmed, and returns an empty `Events`. The scheduled message becomes a silent no-op. + +This is the cleanest ergonomic win this pattern has over a Saga plus explicit-cancel approach. A cancel-the-timer design has to race the cancel against the timer firing, needs a cancellation API, and breaks if the cancel message is lost. A let-state-decide design relies on the state being authoritative and always current, which is exactly what event sourcing gives you. The timeout handler stays pure. The start handler stays pure. No `IMessageBus` injection anywhere in the process. + +One consequence worth flagging: a long timeout window leaves the scheduler holding a message the process no longer cares about. If your transport or durability store has per-message cost concerns at scale, you may want shorter windows or explicit cancellation anyway. The sample's 15-minute default is fine for most workloads. + +### Step 7: Wire it up + +```csharp +builder.Services.AddMarten(opts => + { + var connectionString = builder.Configuration.GetConnectionString("Marten"); + opts.Connection(connectionString!); + opts.DatabaseSchemaName = "process_manager"; + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }) + .IntegrateWithWolverine(); + +builder.Host.UseWolverine(opts => +{ + opts.Policies.AutoApplyTransactions(); +}); +``` + +Two pieces worth highlighting. + +`SnapshotLifecycle.Inline` is what makes the next `FetchForWriting` call see the previous one's effects without running an async daemon. If you already run projections in the background, you can keep them there for reads and still use inline for the process state; the two settings are independent. + +`opts.Policies.AutoApplyTransactions()` ensures the Marten session is wrapped around every handler, which is what makes `SaveChangesAsync` actually run. Without it, the start handler's `IStartStream` return would not be persisted. + +### Step 8: Test it + +Two styles. Both belong in the test project, and both are cheap enough that you should write both. + +**Unit test (pure function).** Construct the state directly, call the handler, assert on the returned events: + +```csharp +[Fact] +public void payment_confirmed_also_completes_when_other_two_gates_are_already_satisfied() +{ + var state = new OrderFulfillmentState + { + Id = Guid.NewGuid(), + ItemsReserved = true, + ShipmentConfirmed = true + }; + + var result = PaymentConfirmedHandler.Handle( + new PaymentConfirmed(state.Id, 249m), state); + + result.Count.ShouldBe(2); + result[0].ShouldBeOfType(); + result[1].ShouldBeOfType(); +} +``` + +No Wolverine host. No Marten. No async. The handler is a static method over plain inputs, and the state type has no base class, so constructing it by object initializer is trivial. This is one of the strongest arguments this pattern has over Saga. + +**Integration test.** Use Alba plus Wolverine's tracking support to run a full `InvokeMessageAndWaitAsync` sequence: + +```csharp +[Fact] +public async Task happy_path_ends_with_OrderFulfillmentCompleted() +{ + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 249m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 249m)); + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-ABC")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(5); + events[4].Data.ShouldBeOfType(); +} +``` + +`InvokeMessageAndWaitAsync` blocks until the transaction commits, so you can open a read session on the next line and see the appended events. + +**Testing scheduled messages.** `InvokeMessageAndWaitAsync` does not wait for a delayed message that the scheduler has yet to pick up. For timeout assertions you need a small polling helper with a generous deadline, because the scheduler fires "around" the requested delay plus one poll cycle: + +```csharp +private async Task WaitForCondition(Guid id, Func predicate) +{ + var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(10); + while (DateTime.UtcNow < deadline) + { + await using var session = Store.LightweightSession(); + var state = await session.Events.FetchLatest(id); + if (state is not null && predicate(state)) return; + + await Task.Delay(TimeSpan.FromMilliseconds(250)); + } + + throw new TimeoutException($"Condition on state {id} not met within the observation window."); +} +``` + +Tests that rely on the scheduler firing then look like: + +```csharp +await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment( + id, Guid.NewGuid(), 10m, + PaymentTimeoutWindow: TimeSpan.FromSeconds(1))); + +await WaitForCondition(id, state => state.IsTerminal); +``` + +Keep the requested delay small in tests (1 to 2 seconds) and the observation window comfortably larger than one scheduler poll cycle. Do not use a bare `Task.Delay` as an observation window; you will get a flaky test that sometimes passes because the scheduler was fast and sometimes fails because it was slow. + +The sample project's [IntegrationContext.cs](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/IntegrationContext.cs) shows the Alba bootstrap used here. Two settings matter for test reliability: `services.MartenDaemonModeIsSolo()` and `services.RunWolverineInSoloMode()`. Without them you will fight the distributed durability machinery on every test run. + +## 4. Worked Example + +This section is the reference: every file in the `OrderFulfillment` folder of [`ProcessManagerSample`](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample), in the order you would read them, plus the Marten plus Wolverine wiring and one test of each style. The scenario is end-to-end order fulfillment with a payment timeout and a compensating cancellation path. 20 tests back the sample; two of them appear below. + +### `OrderFulfillment/OrderFulfillmentState.cs` + +```csharp +namespace ProcessManagerSample.OrderFulfillment; + +/// +/// Event-sourced state for the order fulfillment process. Projected inline from the event stream +/// via Apply methods. Serves as the correlation surface for the handlers that coordinate payment, +/// warehouse, and shipping steps. +/// +public class OrderFulfillmentState +{ + // Required by Marten: FetchForWriting registers the aggregate type as a document type. + // Without a public Guid Id { get; set; }, CleanAllDataAsync throws InvalidDocumentException. + public Guid Id { get; set; } + + public Guid CustomerId { get; set; } + public decimal TotalAmount { get; set; } + + public bool PaymentConfirmed { get; set; } + public bool ItemsReserved { get; set; } + public bool ShipmentConfirmed { get; set; } + + public bool IsCompleted { get; set; } + public bool IsCancelled { get; set; } + + /// + /// True once the process has reached a terminal state. Every continue handler must guard on this + /// to stay idempotent against late-arriving messages after completion or cancellation. + /// + public bool IsTerminal => IsCompleted || IsCancelled; + + public void Apply(OrderFulfillmentStarted e) + { + Id = e.OrderFulfillmentStateId; + CustomerId = e.CustomerId; + TotalAmount = e.TotalAmount; + } + + public void Apply(PaymentConfirmed _) => PaymentConfirmed = true; + + public void Apply(ItemsReserved _) => ItemsReserved = true; + + public void Apply(ShipmentConfirmed _) => ShipmentConfirmed = true; + + public void Apply(OrderFulfillmentCompleted _) => IsCompleted = true; + + public void Apply(OrderFulfillmentCancelled _) => IsCancelled = true; +} +``` + +### `OrderFulfillment/Events.cs` + +```csharp +namespace ProcessManagerSample.OrderFulfillment; + +public record OrderFulfillmentStarted( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount); + +public record PaymentConfirmed( + Guid OrderFulfillmentStateId, + decimal Amount); + +public record ItemsReserved( + Guid OrderFulfillmentStateId, + Guid ReservationId); + +public record ShipmentConfirmed( + Guid OrderFulfillmentStateId, + string TrackingNumber); + +public record OrderFulfillmentCompleted(Guid OrderFulfillmentStateId); + +public record OrderFulfillmentCancelled( + Guid OrderFulfillmentStateId, + string Reason); +``` + +### `OrderFulfillment/Commands.cs` + +```csharp +namespace ProcessManagerSample.OrderFulfillment; + +public record StartOrderFulfillment( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount, + TimeSpan? PaymentTimeoutWindow = null); + +public record CancelOrderFulfillment( + Guid OrderFulfillmentStateId, + string Reason); + +public record PaymentTimeout(Guid OrderFulfillmentStateId); +``` + +### `OrderFulfillment/Handlers/StartOrderFulfillmentHandler.cs` + +```csharp +using Wolverine; +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +public static class StartOrderFulfillmentHandler +{ + public static readonly TimeSpan DefaultPaymentTimeoutWindow = TimeSpan.FromMinutes(15); + + public static (IStartStream, OutgoingMessages) Handle(StartOrderFulfillment command) + { + var started = new OrderFulfillmentStarted( + command.OrderFulfillmentStateId, + command.CustomerId, + command.TotalAmount); + + var outgoing = new OutgoingMessages(); + outgoing.Delay( + new PaymentTimeout(command.OrderFulfillmentStateId), + command.PaymentTimeoutWindow ?? DefaultPaymentTimeoutWindow); + + return ( + MartenOps.StartStream(command.OrderFulfillmentStateId, started), + outgoing); + } +} +``` + +### `OrderFulfillment/Handlers/PaymentConfirmedHandler.cs` + +```csharp +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +[AggregateHandler] +public static class PaymentConfirmedHandler +{ + public static Events Handle(PaymentConfirmed @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += @event; + + if (state.ItemsReserved && state.ShipmentConfirmed) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} +``` + +### `OrderFulfillment/Handlers/ItemsReservedHandler.cs` + +```csharp +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +[AggregateHandler] +public static class ItemsReservedHandler +{ + public static Events Handle(ItemsReserved @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.ItemsReserved) return new Events(); + + var events = new Events(); + events += @event; + + if (state.PaymentConfirmed && state.ShipmentConfirmed) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} +``` + +### `OrderFulfillment/Handlers/ShipmentConfirmedHandler.cs` + +```csharp +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +[AggregateHandler] +public static class ShipmentConfirmedHandler +{ + public static Events Handle(ShipmentConfirmed @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.ShipmentConfirmed) return new Events(); + + var events = new Events(); + events += @event; + + if (state.PaymentConfirmed && state.ItemsReserved) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} +``` + +### `OrderFulfillment/Handlers/CancelOrderFulfillmentHandler.cs` + +```csharp +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +[AggregateHandler] +public static class CancelOrderFulfillmentHandler +{ + public static Events Handle(CancelOrderFulfillment command, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, command.Reason); + return events; + } +} +``` + +### `OrderFulfillment/Handlers/PaymentTimeoutHandler.cs` + +```csharp +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +[AggregateHandler] +public static class PaymentTimeoutHandler +{ + public static Events Handle(PaymentTimeout _, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, "Payment timed out"); + return events; + } +} +``` + +### `Program.cs` (Marten plus Wolverine wiring) + +```csharp +using JasperFx; +using Marten; +using Marten.Events.Projections; +using ProcessManagerSample.OrderFulfillment; +using Wolverine; +using Wolverine.Marten; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services.AddMarten(opts => + { + var connectionString = builder.Configuration.GetConnectionString("Marten"); + opts.Connection(connectionString!); + opts.DatabaseSchemaName = "process_manager"; + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }) + .IntegrateWithWolverine(); + +builder.Host.UseWolverine(opts => +{ + opts.Policies.AutoApplyTransactions(); +}); + +var app = builder.Build(); + +app.MapPost("/orders/start", + (StartOrderFulfillment command, IMessageBus bus) => bus.InvokeAsync(command)); + +return await app.RunJasperFxCommands(args); + +public partial class Program; +``` + +### Unit test (pure function) + +From [`HandlerUnitTests.cs`](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/HandlerUnitTests.cs): + +```csharp +[Fact] +public void payment_confirmed_also_completes_when_other_two_gates_are_already_satisfied() +{ + var state = new OrderFulfillmentState + { + Id = Guid.NewGuid(), + CustomerId = Guid.NewGuid(), + TotalAmount = 100m, + ItemsReserved = true, + ShipmentConfirmed = true + }; + var @event = new PaymentConfirmed(state.Id, state.TotalAmount); + + var result = PaymentConfirmedHandler.Handle(@event, state); + + result.Count.ShouldBe(2); + result[0].ShouldBeOfType(); + var completed = result[1].ShouldBeOfType(); + completed.OrderFulfillmentStateId.ShouldBe(state.Id); +} +``` + +### Integration test (happy path, end to end) + +From [`when_completing_a_fulfillment.cs`](https://github.com/JasperFx/wolverine/tree/main/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_completing_a_fulfillment.cs): + +```csharp +[Fact] +public async Task happy_path_ends_with_OrderFulfillmentCompleted() +{ + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 249.00m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 249.00m)); + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-ABC")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(5); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + events[2].Data.ShouldBeOfType(); + events[3].Data.ShouldBeOfType(); + events[4].Data.ShouldBeOfType(); + + var state = await session.Events.FetchLatest(id); + state.ShouldNotBeNull(); + state.IsCompleted.ShouldBeTrue(); + state.IsCancelled.ShouldBeFalse(); + state.PaymentConfirmed.ShouldBeTrue(); + state.ItemsReserved.ShouldBeTrue(); + state.ShipmentConfirmed.ShouldBeTrue(); +} +``` + +The full integration and unit test suites cover the out-of-order, idempotency, cancellation, and timeout paths; they live in the sample's `OrderFulfillment/` test folder and are worth reading alongside this page. + +## 5. The Friction Points + +None of these are deal-breakers. They are the honest accounting of what is harder with this pattern than with a Saga. Read them before you commit, so the first one does not come as a surprise two weeks in. + +### No single home for the process + +A process with five trigger message types means five handler files. Nothing in the framework ties them together. If a reviewer asks "what runs on `PaymentConfirmed`," you grep. There is no `OrderFulfillmentProcess.cs` to open. The sample's `Handlers/` folder is a convention, not an enforcement; a future maintainer can add a sixth handler elsewhere and the discoverability drops further. + +Saga gets this for free. One class per process, one file to read. + +### Completion logic is distributed across handlers + +The "maybe-complete" check lives in every continue handler, with the two "other" flags named each time: + +```csharp +if (state.ItemsReserved && state.ShipmentConfirmed) + events += new OrderFulfillmentCompleted(state.Id); +``` + +For three steps this is fine. For ten steps, the condition appears ten times with nine-flag expressions, and the first maintainer who adds an eleventh step will miss at least one. You can factor the predicate onto the state type (`state.ReadyToCompleteAfter(typeof(PaymentConfirmed))`) to centralize it, but that is hand-written and not something the framework will nudge you toward. + +Saga centralizes this in one `checkForCompletion()` method on the saga class. + +### Every continue handler carries two guard lines + +```csharp +if (state.IsTerminal) return new Events(); +if (state.PaymentConfirmed) return new Events(); +``` + +For N continue handlers, that is 2N guard lines. They are mechanical but they are not optional, and a missed guard produces data corruption (an `ItemsReserved` event appended to a cancelled stream) that your tests may not catch because the next read of state still looks "right." + +Saga's `MarkCompleted()` plus framework-managed lifecycle means Wolverine itself short-circuits handlers on a completed saga. You write the `MarkCompleted()` call once; the framework enforces it everywhere. + +### The start handler has a different shape from the continue handlers + +Start: plain static class, returns `IStartStream` via `MartenOps.StartStream`. Continue: `[AggregateHandler]` static class, returns `Events`. Start has no `OrderFulfillmentState` parameter; continue handlers always do. The two shapes are small but they are different, and new readers will ask why. + +This is a hard consequence of `AggregateHandlerAttribute.OnMissing` defaulting to `OnMissing.Simple404`. The attribute is designed around "the aggregate exists, load it, enforce concurrency." It does not naturally model "this command creates the aggregate." `MartenOps.StartStream` is the idiomatic workaround and it is fine once you know it, but you cannot hide the asymmetry from the reader. + +One related trap worth flagging: the `ExistingStreamIdCollisionException` raised on a duplicate start inherits from `MartenException`, **not** from `ConcurrencyException`, so a Wolverine retry or error policy scoped to `ConcurrencyException` will not cover duplicate-start failures. Scope the policy to `MartenException` (or to `ExistingStreamIdCollisionException` specifically) if you want to catch them. + +### Silent failure mode if you misapply `[AggregateHandler]` to a start handler + +If you forget and put `[AggregateHandler]` on a start handler, the middleware short-circuits before your handler runs. No events are appended. No exception is thrown. The test "passes" the build and the handler signature, then fails your assertion on event count with no useful diagnostic. The first time you hit this, expect to spend an hour before you realize the handler body never ran. + +### Nullable single-event returns are unsafe + +Returning `TEvent?` from a continue handler is ergonomic for the "sometimes no event" case but the aggregate-handler codegen emits `stream.AppendOne(variable)` unconditionally with no null check. A `return null;` will call `AppendOne(null)`. Use `Events` (possibly empty) for the no-op path instead. This is documented above but worth calling out as a sharp edge. + +### Inline snapshot projection is a silent correctness dependency + +The per-step idempotency guard (`if (state.PaymentConfirmed) return new Events();`) depends on the inline projection having committed the previous step's effects before the next handler loads state. Register the projection as `SnapshotLifecycle.Inline` and this works. Forget, and duplicate deliveries will be double-written without any other test failure telling you why. + +### No first-class test helper for "wait for scheduled message to fire" + +`InvokeMessageAndWaitAsync` waits for the cascading work of a single dispatch. A delayed message held by the scheduler is not tracked by that call. The sample's timeout tests use a polling helper (`WaitForCondition` in the test project) which works fine but is extra code every sample project will reinvent. If you write several timeout tests, consider lifting the helper into a shared test utility. + +## 6. When to Use Saga Instead + +Saga is the right tool when any of the following hold: + +- **You want one class per process.** Discoverability matters more to your team than the audit trail does. Open one file and see the whole state machine. +- **You do not need event history on the process itself.** A document showing "where the saga is now" is enough; a replayable log of "how it got here" would be dead weight. +- **Framework-managed completion matters.** You want to call `MarkCompleted()` in one place and have the framework stop dispatching to that instance. You do not want to maintain a completion guard on every handler. +- **Simple coordination with timeouts is the primary concern.** Kick off, run a few handlers, time out if one does not arrive. Saga has dedicated lifecycle support for this; the Process Manager via handlers recipe above is heavier for the same outcome. +- **Your team is already fluent with Saga.** Adding a second pattern for the same class of problems has a real cost in reviewability and onboarding. That cost is worth paying when the event-sourced benefits are load-bearing, not when they are merely nice-to-have. +- **The process state is not part of the domain model.** If the state is internal coordination plumbing rather than something the domain asks questions about ("show me the fulfillment history for order 1234"), there is no value in making it a first-class event stream. + +Nothing stops you from mixing the two in the same application. Sagas for the short, internal coordination processes; Process Manager via handlers for the long-running, externally visible, auditable ones. The choice is per-process, not per-repository. + +For the Saga-side mechanics see the [Saga documentation](/guide/durability/sagas) and the [Marten-backed Saga integration](/guide/durability/marten/sagas). + +## 7. Optional: DCB Enhancement + +Everything above keeps the Process Manager's decisions bounded by its own stream. Sometimes you need more. If a step has to take into account facts that live on **other** streams (a different aggregate's history, a cross-cutting event stream), a single-stream `[AggregateHandler]` is not enough. Marten's Dynamic Consistency Boundary (DCB) support, exposed in Wolverine as the `[BoundaryModel]` parameter attribute, is the right reach. + +The shape is small and worth knowing even if you do not use it today: + +```csharp +public static class BoundaryModelSubscribeStudentHandler +{ + public static EventTagQuery Load(BoundaryModelSubscribeStudentToCourse command) + => EventTagQuery + .For(command.CourseId) + .AndEventsOfType() + .Or(command.StudentId) + .AndEventsOfType(); + + public static StudentSubscribedToCourse Handle( + BoundaryModelSubscribeStudentToCourse command, + [BoundaryModel] SubscriptionState state) + { + // guard checks against state projected from events across both streams + return new StudentSubscribedToCourse(...); + } +} +``` + +The handler has two methods: `Load`, which returns an `EventTagQuery` describing the events (across one or more streams) that should feed the projection, and `Handle`, which receives the projected state built from those events. Marten's `FetchForWritingByTags(query)` loads and projects; Wolverine's `[BoundaryModel]` parameter attribute wires the middleware. + +Two sharp edges worth flagging up front: + +- `EventTagQuery.For(tag)` without a following `AndEventsOfType<...>()` call produces zero conditions and throws at runtime. Always pair `For` with `AndEventsOfType`. +- `DcbConcurrencyException` does **not** inherit from `ConcurrencyException`; it inherits from `MartenException` directly. If you have retry policies configured for optimistic concurrency violations, they will not cover DCB violations. Add a separate `opts.OnException().RetryWithCooldown(...)` policy. + +Rather than repeat a DCB sample here, we recommend reading the canonical reference that already exists in the Wolverine test suite: the University domain at [`src/Persistence/MartenTests/Dcb/University/`](https://github.com/JasperFx/wolverine/tree/main/src/Persistence/MartenTests/Dcb/University). It models student-to-course enrollment with cross-stream invariants (a student cannot subscribe to more than three courses, a course cannot exceed its capacity) and is the reference for `[BoundaryModel]` usage in the codebase. + +DCB is still an evolving area. If your Process Manager lives entirely within its own stream, stay with the single-stream recipe above. Reach for DCB when the invariants you need to enforce span streams that are not yours to co-own. diff --git a/src/Samples/ProcessManagerSample/DISCOVERIES.md b/src/Samples/ProcessManagerSample/DISCOVERIES.md new file mode 100644 index 000000000..f8cd41621 --- /dev/null +++ b/src/Samples/ProcessManagerSample/DISCOVERIES.md @@ -0,0 +1,52 @@ +# Discoveries: Process Manager via Existing Wolverine + Marten Features + +This log records what the implementation of `ProcessManagerSample` taught us beyond the initial research notes. It is the feedback loop for the recipe in `docs/guide/durability/marten/process-manager-via-handlers.md` and any future `ProcessManager` framework proposal. + +Entries are grouped by kind: confirmed expectations, corrected expectations, new gotchas, recipe adjustments, and open questions. + +## Confirmed expectations + +1. **`public Guid Id { get; set; }` is required on the state type.** Without it, `ResetAllMartenDataAsync()` / `CleanAllDataAsync()` throws `InvalidDocumentException` because Marten registers the aggregate as a document type. +2. **Correlation by convention works with no attribute.** A command/event property named `{AggregateTypeName}Id` (for us, `OrderFulfillmentStateId`) resolves the stream id automatically. This held across every continue handler in the sample with no per-handler override. +3. **Inline snapshot projection is enough.** Registering `opts.Projections.Snapshot(SnapshotLifecycle.Inline)` makes the next `FetchForWriting` see the latest state without running a daemon. Between two back-to-back `InvokeMessageAndWaitAsync` calls, the second call sees the first call's effects. +4. **`InvokeMessageAndWaitAsync` commits before returning.** The next `LightweightSession().Events.FetchStreamAsync(id)` on the same thread sees the appended event without any sleep or poll. +5. **`FetchForWriting` on a non-existent stream does return `Aggregate == null`.** This matches the research notes verbatim. However, see the corrected expectation below for why this is not actually reachable from the start handler by default. +6. **Pure-function handler tests are as simple as advertised.** `new OrderFulfillmentState { ... }` constructed directly, passed with a plain event into a static `Handle`, assertions run on the returned `Events` list. No host, no Marten, no DI, no async. +7. **Idempotency via per-step flag check works.** Checking `if (state.ThisStepAlreadyHappened) return new Events();` inside each continue handler makes a duplicate integration event a no-op, reliably, because the inline projection has already reflected the first occurrence by the time the duplicate is dispatched. +8. **`(IStartStream, OutgoingMessages)` tuple return composes cleanly.** Both values unpack correctly from the same handler: the `IStartStream` side effect creates the stream and appends the initial events; the `OutgoingMessages` items are dispatched through Wolverine's scheduler. `IStartStream` inherits from `IMartenOp` which inherits from `ISideEffect` (`IMartenOp.cs:23`), which is what makes the unpacker accept it alongside an `OutgoingMessages`. +9. **The timeout handler stays pure.** `PaymentTimeoutHandler` is a standard `[AggregateHandler]` with a `(PaymentTimeout, OrderFulfillmentState) -> Events` signature. No `IMessageBus` injection. The terminal-state and payment-confirmed guards handle both "process already done" and "payment arrived before the timer fired" cases; no explicit cancellation of the scheduled message is needed. This is the strongest argument for the "let state decide" idiom over "cancel the timer" idiom: one fewer API to manage, one fewer failure mode. +10. **Scheduler in solo mode fires within a few seconds of the requested delay.** With `TimeSpan.FromSeconds(1)`, the timeout handler actually runs within roughly 2 to 4 seconds in our test environment. A 10-second observation window is comfortably above that. The scheduler polls on an interval, so the actual fire time is "requested delay plus one poll cycle." + +## Corrected expectations + +1. **`[AggregateHandler]` is not suitable for the "start a new stream" case out of the box.** The research notes suggested a uniform `[AggregateHandler]` pattern for all handlers in the process. In practice, `AggregateHandlerAttribute.OnMissing` defaults to `OnMissing.Simple404` (`AggregateHandlerAttribute.cs:144`). When the aggregate does not yet exist, the handler body is short-circuited: zero events are appended and no exception is thrown, which makes the failure silent and easy to miss. + + The idiomatic fix matches what the Wolverine test suite already does for start cases: a plain handler that returns `IStartStream` via `MartenOps.StartStream(id, events...)`. See `src/Persistence/MartenTests/AggregateHandlerWorkflow/aggregate_handler_workflow_with_ievent.cs:144` for the reference pattern. + + Consequence for the recipe: the start handler has a different shape from continue handlers. The continue handlers stay on `[AggregateHandler]` because the stream exists by the time they run. + +## New gotchas + +1. **Silent no-op on missing aggregate.** Tied to the corrected expectation above. If you accidentally put `[AggregateHandler]` on a would-be start handler, the integration test will not throw; it will just report zero events on the stream. First-time failure mode is confusing. The doc should call this out explicitly. +2. **Nullable single-event returns are unsafe.** Returning `TEvent?` (for example `OrderFulfillmentCancelled?`) from an `[AggregateHandler]` handler looks ergonomic but is a trap: `EventCaptureActionSource` generates an unconditional `stream.AppendOne(variable)` with no null check (`AggregateHandlerAttribute.cs:225`). A `return null` would call `AppendOne(null)`. Always return `Events` (possibly empty) for no-op paths. This is the idiom the recipe should teach for any "sometimes no event" shape. +3. **Completion guard and idempotency guard are two different checks.** `if (state.IsTerminal) return;` handles "the process is closed." `if (state.ThisStepAlreadyHappened) return;` handles "at-least-once redelivery of a message that already landed." Merging them would lose the ability to tell a post-terminal late delivery from a normal retry, and both occur in practice. Continue handlers therefore carry two guard lines at the top, not one. +4. **No first-class "wait for scheduled message to fire" test helper.** Wolverine's `InvokeMessageAndWaitAsync` waits for the cascading work of a single dispatch, not for a delayed message that the scheduler has yet to pick up. For the timeout integration test we wrote a plain polling helper (`WaitForCondition(id, predicate)`) that `FetchLatest`s the state every 250ms until the condition is met or a deadline passes. This works but is unremarkable prose; a more idiomatic test helper on the Wolverine side would be welcome. `Host.TrackActivity()` is likely the right primitive but was not required to get the test green. + +## Recipe adjustments (applied) + +Each adjustment below describes a change that was applied to the recipe in `docs/guide/durability/marten/process-manager-via-handlers.md` during the iterative build. They are listed here as a historical record of why the recipe looks the way it does, for the benefit of future maintainers and the downstream `ProcessManager` proposal. + +1. **Step "Write your handlers" needs to split into two sub-steps.** One for the start handler (plain, returns `IStartStream`), one for the continue handlers (`[AggregateHandler]`, returns events / `Events` / `OutgoingMessages`). Treating them uniformly is the trap. +2. **Recommend `Events` as the default continue-handler return shape.** Single-event returns work for the happy path but force you into nullable workarounds on the no-op paths. `Events` handles both cases with the same return type (empty list for no-op, one item for a record, two items when the event also trips completion). +3. **The completion guard section in the Recipe should be two sub-sections, not one.** First sub-section: the terminal-state guard (`IsTerminal`). Second sub-section: the step-already-happened idempotency guard. They solve different problems and readers will conflate them if we don't separate them. +4. **Remove the "to be validated" banner from Step 6 (Schedule timeouts) in Phase 6.** The tuple-return pattern `(IStartStream, OutgoingMessages)` is confirmed. The pure timeout handler shape is confirmed. The "let state decide, don't cancel the timer" idiom is confirmed. +5. **Step 6 should explicitly teach the "let state decide" idiom.** The timeout handler does not attempt to cancel the scheduled message when payment arrives early. Instead, the handler's guards turn a late-firing timer into a silent no-op. This is simpler and more robust than an explicit cancellation API, and it only works because the process state is authoritative. Emphasize this when the reader is likely reaching for "how do I cancel the scheduled message." +6. **Step 8 (Test it) should show a polling test helper for scheduler assertions.** Integration tests that rely on the scheduler firing need to poll for the terminal condition; `InvokeMessageAndWaitAsync` alone is not enough because it does not wait for delayed messages. Show the helper shape so readers do not try to solve the same problem from scratch. + +## Open questions + +1. Can we configure `[AggregateHandler]` with `OnMissing.ProceedAnyway` (or equivalent) to support a uniform pattern, and should the doc show that as an option? Phase 3 did not surface a need for continue handlers to override this, so the question stays specific to start handlers. Deferred. +2. ~~Does `MartenOps.StartStream` play cleanly with `OutgoingMessages` cascading~~? **Answered in Phase 5: yes.** `(IStartStream, OutgoingMessages)` tuple return works because `IStartStream` is an `IMartenOp : ISideEffect` and the unpacker applies the side effect while also dispatching the `OutgoingMessages`. +3. ~~What happens if `StartOrderFulfillment` is dispatched twice for the same id?~~ **Answered in Phase 7.** A second start for an existing id throws `Marten.Exceptions.ExistingStreamIdCollisionException` ("Stream #{id} already exists in the database"). The exception inherits from `MartenException`, not from `ConcurrencyException`. Wolverine's `InvokeMessageAndWaitAsync` propagates it with its original type intact; there is no wrapping exception to unwrap. The transaction rolls back cleanly, so the first start's data remains the authoritative state. The exception carries the duplicate id and the aggregate type as properties, which makes it trivial to log or convert to an idempotent "already started, ignoring" response. Test: `ProcessManagerSample.Tests.OrderFulfillment.when_starting_a_fulfillment.starting_the_same_process_twice_throws_and_first_start_wins`. +4. ~~Concurrency check firing~~. **Decided in Phase 7: docs-only.** The sample does not simulate two handlers racing on the same stream. A deterministic race test in the sample would require either bypassing the transport (testing something else) or racing real dispatches (non-deterministic and flaky); neither strengthens the pattern-specific signal. The mechanic is already covered by Wolverine's own test suite. The exception-hierarchy trap that readers need to know (`DcbConcurrencyException` and `ExistingStreamIdCollisionException` not inheriting from `ConcurrencyException`) is documented in Section 5 Friction Points and Section 7 DCB Enhancement of the guide. +5. Is there a supported `Host.TrackActivity()`-style helper that waits specifically for delayed/scheduled messages to fire? Our integration test used a polling helper which works but feels like rolling our own. Worth a follow-up look. diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/IntegrationContext.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/IntegrationContext.cs new file mode 100644 index 000000000..acb898bb5 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/IntegrationContext.cs @@ -0,0 +1,61 @@ +using Alba; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Wolverine; +using Wolverine.Runtime; +using Xunit; + +namespace ProcessManagerSample.Tests; + +public class AppFixture : IAsyncLifetime +{ + public IAlbaHost? Host { get; private set; } + + public async Task InitializeAsync() + { + Host = await AlbaHost.For(x => + { + x.ConfigureServices(services => + { + // Strip any external transports; this sample only exercises local handlers. + services.DisableAllExternalWolverineTransports(); + + // Required when running Marten + Wolverine in a single test process. + services.MartenDaemonModeIsSolo(); + services.RunWolverineInSoloMode(); + }); + }); + } + + public async Task DisposeAsync() + { + await Host!.StopAsync(); + Host.Dispose(); + } +} + +[CollectionDefinition("integration")] +public class IntegrationCollection : ICollectionFixture; + +[Collection("integration")] +public abstract class IntegrationContext : IAsyncLifetime +{ + private readonly AppFixture _fixture; + + protected IntegrationContext(AppFixture fixture) + { + _fixture = fixture; + Runtime = (WolverineRuntime)fixture.Host!.Services.GetRequiredService(); + } + + public WolverineRuntime Runtime { get; } + public IAlbaHost Host => _fixture.Host!; + public IDocumentStore Store => _fixture.Host!.Services.GetRequiredService(); + + async Task IAsyncLifetime.InitializeAsync() + { + await Host.ResetAllMartenDataAsync(); + } + + public Task DisposeAsync() => Task.CompletedTask; +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/HandlerUnitTests.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/HandlerUnitTests.cs new file mode 100644 index 000000000..14cfeecfc --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/HandlerUnitTests.cs @@ -0,0 +1,119 @@ +using ProcessManagerSample.OrderFulfillment; +using ProcessManagerSample.OrderFulfillment.Handlers; +using Shouldly; +using Xunit; + +namespace ProcessManagerSample.Tests.OrderFulfillment; + +/// +/// Pure function tests. No Wolverine host, no Marten session. The handlers are static methods +/// over plain inputs, so we can construct state in memory and assert directly on the returned events. +/// +public class HandlerUnitTests +{ + private static OrderFulfillmentState InProgressState( + bool paymentConfirmed = false, + bool itemsReserved = false, + bool shipmentConfirmed = false) + { + return new OrderFulfillmentState + { + Id = Guid.NewGuid(), + CustomerId = Guid.NewGuid(), + TotalAmount = 100m, + PaymentConfirmed = paymentConfirmed, + ItemsReserved = itemsReserved, + ShipmentConfirmed = shipmentConfirmed + }; + } + + [Fact] + public void payment_confirmed_records_event_and_stays_in_progress_when_gates_remain() + { + var state = InProgressState(); + var @event = new PaymentConfirmed(state.Id, state.TotalAmount); + + var result = PaymentConfirmedHandler.Handle(@event, state); + + result.Count.ShouldBe(1); + result[0].ShouldBeOfType(); + } + + [Fact] + public void payment_confirmed_also_completes_when_other_two_gates_are_already_satisfied() + { + var state = InProgressState(itemsReserved: true, shipmentConfirmed: true); + var @event = new PaymentConfirmed(state.Id, state.TotalAmount); + + var result = PaymentConfirmedHandler.Handle(@event, state); + + result.Count.ShouldBe(2); + result[0].ShouldBeOfType(); + var completed = result[1].ShouldBeOfType(); + completed.OrderFulfillmentStateId.ShouldBe(state.Id); + } + + [Fact] + public void payment_confirmed_is_a_no_op_when_payment_already_recorded() + { + var state = InProgressState(paymentConfirmed: true); + var @event = new PaymentConfirmed(state.Id, state.TotalAmount); + + var result = PaymentConfirmedHandler.Handle(@event, state); + + result.ShouldBeEmpty(); + } + + [Fact] + public void completion_guard_rejects_a_continue_message_after_terminal_state() + { + var state = InProgressState(); + state.IsCompleted = true; + + var result = PaymentConfirmedHandler.Handle( + new PaymentConfirmed(state.Id, state.TotalAmount), + state); + + result.ShouldBeEmpty(); + } + + [Fact] + public void completion_guard_rejects_a_continue_message_after_cancellation() + { + var state = InProgressState(); + state.IsCancelled = true; + + var result = ShipmentConfirmedHandler.Handle( + new ShipmentConfirmed(state.Id, "TRACK-1"), + state); + + result.ShouldBeEmpty(); + } + + [Fact] + public void cancel_emits_terminal_event_with_reason() + { + var state = InProgressState(paymentConfirmed: true); + var command = new CancelOrderFulfillment(state.Id, "Customer requested"); + + var result = CancelOrderFulfillmentHandler.Handle(command, state); + + result.Count.ShouldBe(1); + var cancelled = result[0].ShouldBeOfType(); + cancelled.OrderFulfillmentStateId.ShouldBe(state.Id); + cancelled.Reason.ShouldBe("Customer requested"); + } + + [Fact] + public void cancel_on_already_terminal_state_is_a_no_op() + { + var state = InProgressState(); + state.IsCompleted = true; + + var result = CancelOrderFulfillmentHandler.Handle( + new CancelOrderFulfillment(state.Id, "Too late"), + state); + + result.ShouldBeEmpty(); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_cancelling_a_fulfillment.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_cancelling_a_fulfillment.cs new file mode 100644 index 000000000..6cd3de89e --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_cancelling_a_fulfillment.cs @@ -0,0 +1,72 @@ +using Marten; +using ProcessManagerSample.OrderFulfillment; +using Shouldly; +using Wolverine.Tracking; +using Xunit; + +namespace ProcessManagerSample.Tests.OrderFulfillment; + +public class when_cancelling_a_fulfillment : IntegrationContext +{ + public when_cancelling_a_fulfillment(AppFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task cancel_mid_process_marks_the_stream_cancelled() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 42m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 42m)); + await Host.InvokeMessageAndWaitAsync(new CancelOrderFulfillment(id, "Fraud suspected")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(3); + var cancelled = events[2].Data.ShouldBeOfType(); + cancelled.Reason.ShouldBe("Fraud suspected"); + + var state = await session.Events.FetchLatest(id); + state.ShouldNotBeNull(); + state.IsCancelled.ShouldBeTrue(); + state.IsCompleted.ShouldBeFalse(); + } + + [Fact] + public async Task integration_events_after_cancellation_are_ignored() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 99m)); + await Host.InvokeMessageAndWaitAsync(new CancelOrderFulfillment(id, "Customer changed mind")); + + // Warehouse did not know about the cancellation in time and still sends a reservation event. + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 99m)); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } + + [Fact] + public async Task second_cancel_is_a_no_op() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 5m)); + await Host.InvokeMessageAndWaitAsync(new CancelOrderFulfillment(id, "First reason")); + await Host.InvokeMessageAndWaitAsync(new CancelOrderFulfillment(id, "Second reason")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(2); + events[1].Data.ShouldBeOfType().Reason.ShouldBe("First reason"); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_completing_a_fulfillment.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_completing_a_fulfillment.cs new file mode 100644 index 000000000..e90765d7f --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_completing_a_fulfillment.cs @@ -0,0 +1,96 @@ +using Marten; +using ProcessManagerSample.OrderFulfillment; +using Shouldly; +using Wolverine.Tracking; +using Xunit; + +namespace ProcessManagerSample.Tests.OrderFulfillment; + +public class when_completing_a_fulfillment : IntegrationContext +{ + public when_completing_a_fulfillment(AppFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task happy_path_ends_with_OrderFulfillmentCompleted() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 249.00m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 249.00m)); + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-ABC")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(5); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + events[2].Data.ShouldBeOfType(); + events[3].Data.ShouldBeOfType(); + events[4].Data.ShouldBeOfType(); + + var state = await session.Events.FetchLatest(id); + state.ShouldNotBeNull(); + state.IsCompleted.ShouldBeTrue(); + state.IsCancelled.ShouldBeFalse(); + state.PaymentConfirmed.ShouldBeTrue(); + state.ItemsReserved.ShouldBeTrue(); + state.ShipmentConfirmed.ShouldBeTrue(); + } + + [Fact] + public async Task messages_arriving_out_of_order_still_complete_the_process() + { + var id = Guid.NewGuid(); + + // Payment is deliberately last here. Any permutation of the three gates should complete. + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 50m)); + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-1")); + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 50m)); + + await using var session = Store.LightweightSession(); + var state = await session.Events.FetchLatest(id); + + state.ShouldNotBeNull(); + state.IsCompleted.ShouldBeTrue(); + } + + [Fact] + public async Task duplicate_integration_event_is_a_no_op() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 75m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 75m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 75m)); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + } + + [Fact] + public async Task integration_events_after_completion_are_ignored() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 10m)); + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 10m)); + await Host.InvokeMessageAndWaitAsync(new ItemsReserved(id, Guid.NewGuid())); + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-1")); + + // Duplicate ShipmentConfirmed after terminal state. The completion guard must ignore it. + await Host.InvokeMessageAndWaitAsync(new ShipmentConfirmed(id, "TRACK-2")); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + events.Count.ShouldBe(5); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_payment_times_out.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_payment_times_out.cs new file mode 100644 index 000000000..4441f3fff --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_payment_times_out.cs @@ -0,0 +1,117 @@ +using Marten; +using ProcessManagerSample.OrderFulfillment; +using ProcessManagerSample.OrderFulfillment.Handlers; +using Shouldly; +using Wolverine.Tracking; +using Xunit; + +namespace ProcessManagerSample.Tests.OrderFulfillment; + +public class when_payment_times_out : IntegrationContext +{ + // How long to wait for the scheduler to pick up and dispatch the PaymentTimeout. + // The scheduler polls on an interval, so the actual fire time is "around" the requested delay + // plus one poll cycle. Keep the observation window comfortably larger than both combined. + private static readonly TimeSpan SchedulerObservationWindow = TimeSpan.FromSeconds(10); + + public when_payment_times_out(AppFixture fixture) : base(fixture) + { + } + + [Fact] + public void unit_timeout_cancels_when_payment_not_yet_confirmed() + { + var state = new OrderFulfillmentState { Id = Guid.NewGuid() }; + + var result = PaymentTimeoutHandler.Handle(new PaymentTimeout(state.Id), state); + + result.Count.ShouldBe(1); + var cancelled = result[0].ShouldBeOfType(); + cancelled.Reason.ShouldBe("Payment timed out"); + } + + [Fact] + public void unit_timeout_is_no_op_when_payment_already_confirmed() + { + var state = new OrderFulfillmentState + { + Id = Guid.NewGuid(), + PaymentConfirmed = true + }; + + var result = PaymentTimeoutHandler.Handle(new PaymentTimeout(state.Id), state); + + result.ShouldBeEmpty(); + } + + [Fact] + public async Task scheduler_fires_timeout_and_cancels_the_process() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment( + id, Guid.NewGuid(), 10m, + PaymentTimeoutWindow: TimeSpan.FromSeconds(1))); + + // Give the scheduler a window to pick up the delayed message and for the + // PaymentTimeoutHandler to run through FetchForWriting -> Apply -> SaveChangesAsync. + await WaitForCondition(id, state => state.IsTerminal); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + var cancelled = events[1].Data.ShouldBeOfType(); + cancelled.Reason.ShouldBe("Payment timed out"); + + var state = await session.Events.FetchLatest(id); + state.ShouldNotBeNull(); + state.IsCancelled.ShouldBeTrue(); + } + + [Fact] + public async Task payment_before_timeout_silences_the_timer() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment( + id, Guid.NewGuid(), 10m, + PaymentTimeoutWindow: TimeSpan.FromSeconds(1))); + + // Payment arrives before the scheduled timeout fires. + await Host.InvokeMessageAndWaitAsync(new PaymentConfirmed(id, 10m)); + + // Wait out the scheduler window. The timeout handler will run, observe + // state.PaymentConfirmed == true, and return empty Events. + await Task.Delay(SchedulerObservationWindow); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(2); + events[0].Data.ShouldBeOfType(); + events[1].Data.ShouldBeOfType(); + + var state = await session.Events.FetchLatest(id); + state.ShouldNotBeNull(); + state.IsCancelled.ShouldBeFalse(); + state.PaymentConfirmed.ShouldBeTrue(); + } + + private async Task WaitForCondition(Guid id, Func predicate) + { + var deadline = DateTime.UtcNow + SchedulerObservationWindow; + while (DateTime.UtcNow < deadline) + { + await using var session = Store.LightweightSession(); + var state = await session.Events.FetchLatest(id); + if (state is not null && predicate(state)) return; + + await Task.Delay(TimeSpan.FromMilliseconds(250)); + } + + throw new TimeoutException( + $"Condition on OrderFulfillmentState {id} not met within {SchedulerObservationWindow}."); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_starting_a_fulfillment.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_starting_a_fulfillment.cs new file mode 100644 index 000000000..32b45954c --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/OrderFulfillment/when_starting_a_fulfillment.cs @@ -0,0 +1,72 @@ +using Marten; +using Marten.Exceptions; +using ProcessManagerSample.OrderFulfillment; +using Shouldly; +using Wolverine.Tracking; +using Xunit; + +namespace ProcessManagerSample.Tests.OrderFulfillment; + +public class when_starting_a_fulfillment : IntegrationContext +{ + public when_starting_a_fulfillment(AppFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task creates_the_stream_with_the_started_event() + { + var command = new StartOrderFulfillment( + OrderFulfillmentStateId: Guid.NewGuid(), + CustomerId: Guid.NewGuid(), + TotalAmount: 129.99m); + + // InvokeMessageAndWaitAsync drives the handler through Wolverine, including the + // Marten transaction. When it returns, the first AppendOne has been committed. + await Host.InvokeMessageAndWaitAsync(command); + + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(command.OrderFulfillmentStateId); + + events.Count.ShouldBe(1); + var started = events[0].Data.ShouldBeOfType(); + started.OrderFulfillmentStateId.ShouldBe(command.OrderFulfillmentStateId); + started.CustomerId.ShouldBe(command.CustomerId); + started.TotalAmount.ShouldBe(command.TotalAmount); + + // Inline snapshot must have projected the event into the aggregate document. + var state = await session.Events.FetchLatest(command.OrderFulfillmentStateId); + state.ShouldNotBeNull(); + state.Id.ShouldBe(command.OrderFulfillmentStateId); + state.CustomerId.ShouldBe(command.CustomerId); + state.TotalAmount.ShouldBe(command.TotalAmount); + state.IsTerminal.ShouldBeFalse(); + } + + [Fact] + public async Task starting_the_same_process_twice_throws_and_first_start_wins() + { + var id = Guid.NewGuid(); + + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 100m)); + + // MartenOps.StartStream forbids duplicate stream creation. On a second start with the same + // id, Marten throws ExistingStreamIdCollisionException and Wolverine propagates it through + // InvokeMessageAndWaitAsync. No swallowing, no silent drop. + var thrown = await Should.ThrowAsync(async () => + { + await Host.InvokeMessageAndWaitAsync(new StartOrderFulfillment(id, Guid.NewGuid(), 200m)); + }); + + thrown.Id.ShouldBe(id); + thrown.AggregateType.ShouldBe(typeof(OrderFulfillmentState)); + + // The first start's data must still be intact; the second start's transaction rolled back. + await using var session = Store.LightweightSession(); + var events = await session.Events.FetchStreamAsync(id); + + events.Count.ShouldBe(1); + var started = events[0].Data.ShouldBeOfType(); + started.TotalAmount.ShouldBe(100m); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/ProcessManagerSample.Tests.csproj b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/ProcessManagerSample.Tests.csproj new file mode 100644 index 000000000..663a1dcbc --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample.Tests/ProcessManagerSample.Tests.csproj @@ -0,0 +1,26 @@ + + + + false + net9.0 + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Commands.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Commands.cs new file mode 100644 index 000000000..785d15ec5 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Commands.cs @@ -0,0 +1,30 @@ +namespace ProcessManagerSample.OrderFulfillment; + +/// +/// Starts the order fulfillment process. The caller assigns the identity, which +/// then becomes both the stream id and the correlation id for the remaining steps. +/// Name matches the OrderFulfillmentState convention so Wolverine resolves the stream id +/// without needing a [WriteAggregate("...")] override. +/// The optional overrides the default 15-minute window; +/// tests set it to a small value so the scheduler fires within the test window. +/// +public record StartOrderFulfillment( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount, + TimeSpan? PaymentTimeoutWindow = null); + +/// +/// Compensating command. Cancels an in-flight process and marks the stream terminated +/// so any subsequent integration events are ignored idempotently. +/// +public record CancelOrderFulfillment( + Guid OrderFulfillmentStateId, + string Reason); + +/// +/// Scheduled self-message fired by the start handler via OutgoingMessages.Delay. +/// Handled by , which cancels the process +/// if payment has not arrived by the time the scheduler dispatches this message. +/// +public record PaymentTimeout(Guid OrderFulfillmentStateId); diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Events.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Events.cs new file mode 100644 index 000000000..3ccdffc6d --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Events.cs @@ -0,0 +1,45 @@ +namespace ProcessManagerSample.OrderFulfillment; + +/// +/// Emitted when the fulfillment process kicks off for a newly placed order. +/// Creates the stream and sets the correlation identity for all downstream handlers. +/// +public record OrderFulfillmentStarted( + Guid OrderFulfillmentStateId, + Guid CustomerId, + decimal TotalAmount); + +/// +/// Integration event from the payment service. The process manager treats this as both the +/// trigger message and the fact it records on its own stream. +/// +public record PaymentConfirmed( + Guid OrderFulfillmentStateId, + decimal Amount); + +/// +/// Integration event from the warehouse service. +/// +public record ItemsReserved( + Guid OrderFulfillmentStateId, + Guid ReservationId); + +/// +/// Integration event from the shipping service. +/// +public record ShipmentConfirmed( + Guid OrderFulfillmentStateId, + string TrackingNumber); + +/// +/// Terminal event for the happy path. Appended by whichever continue handler observes +/// that all three prerequisite steps are now complete. +/// +public record OrderFulfillmentCompleted(Guid OrderFulfillmentStateId); + +/// +/// Terminal event for the compensating path. Appended by the cancel handler or a timeout. +/// +public record OrderFulfillmentCancelled( + Guid OrderFulfillmentStateId, + string Reason); diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/CancelOrderFulfillmentHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/CancelOrderFulfillmentHandler.cs new file mode 100644 index 000000000..4efe7a2f2 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/CancelOrderFulfillmentHandler.cs @@ -0,0 +1,20 @@ +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Compensating path. Marks the process cancelled with a reason; subsequent integration events +/// are ignored by the completion guard on each continue handler. +/// +[AggregateHandler] +public static class CancelOrderFulfillmentHandler +{ + public static Events Handle(CancelOrderFulfillment command, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, command.Reason); + return events; + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ItemsReservedHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ItemsReservedHandler.cs new file mode 100644 index 000000000..e6d84dcf0 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ItemsReservedHandler.cs @@ -0,0 +1,28 @@ +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Reacts to an integration event from the warehouse service. +/// Records the event on the process stream and, if this was the last gate, appends the +/// terminal event. +/// +[AggregateHandler] +public static class ItemsReservedHandler +{ + public static Events Handle(ItemsReserved @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.ItemsReserved) return new Events(); + + var events = new Events(); + events += @event; + + if (state.PaymentConfirmed && state.ShipmentConfirmed) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentConfirmedHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentConfirmedHandler.cs new file mode 100644 index 000000000..d664a104a --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentConfirmedHandler.cs @@ -0,0 +1,31 @@ +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Reacts to a integration event from the payment service. +/// Records the event on the process stream and, if this was the last gate, appends the +/// terminal event. +/// +[AggregateHandler] +public static class PaymentConfirmedHandler +{ + public static Events Handle(PaymentConfirmed @event, OrderFulfillmentState state) + { + // Completion guard: every continue handler must check this first. + if (state.IsTerminal) return new Events(); + + // Idempotency guard against a duplicate delivery of the same integration event. + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += @event; + + if (state.ItemsReserved && state.ShipmentConfirmed) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentTimeoutHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentTimeoutHandler.cs new file mode 100644 index 000000000..f57bdf26a --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/PaymentTimeoutHandler.cs @@ -0,0 +1,23 @@ +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Fires when the scheduled message is dispatched by Wolverine's scheduler. +/// Cancels the process if payment never confirmed. Idempotent by construction: both guards below make +/// this a silent no-op if payment arrived before the timer fired, or if the process is already terminal. +/// No explicit cancellation of the scheduled message is needed; state decides whether the timeout acts. +/// +[AggregateHandler] +public static class PaymentTimeoutHandler +{ + public static Events Handle(PaymentTimeout _, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.PaymentConfirmed) return new Events(); + + var events = new Events(); + events += new OrderFulfillmentCancelled(state.Id, "Payment timed out"); + return events; + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ShipmentConfirmedHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ShipmentConfirmedHandler.cs new file mode 100644 index 000000000..3578703e3 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/ShipmentConfirmedHandler.cs @@ -0,0 +1,28 @@ +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Reacts to a integration event from the shipping service. +/// Records the event on the process stream and, if this was the last gate, appends the +/// terminal event. +/// +[AggregateHandler] +public static class ShipmentConfirmedHandler +{ + public static Events Handle(ShipmentConfirmed @event, OrderFulfillmentState state) + { + if (state.IsTerminal) return new Events(); + if (state.ShipmentConfirmed) return new Events(); + + var events = new Events(); + events += @event; + + if (state.PaymentConfirmed && state.ItemsReserved) + { + events += new OrderFulfillmentCompleted(state.Id); + } + + return events; + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/StartOrderFulfillmentHandler.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/StartOrderFulfillmentHandler.cs new file mode 100644 index 000000000..bf906310c --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/Handlers/StartOrderFulfillmentHandler.cs @@ -0,0 +1,36 @@ +using Wolverine; +using Wolverine.Marten; + +namespace ProcessManagerSample.OrderFulfillment.Handlers; + +/// +/// Bootstraps a new order fulfillment stream via +/// and schedules a via . +/// +/// The "start" case is intentionally a plain handler rather than an [AggregateHandler]: the default +/// OnMissing behavior on [AggregateHandler] short-circuits when the aggregate does not yet exist, so it +/// cannot be used to create a new stream without overriding that behavior. Continue handlers, which operate on +/// an already-existing stream, use [AggregateHandler] freely. +/// +public static class StartOrderFulfillmentHandler +{ + // Production default. Tests override via StartOrderFulfillment.PaymentTimeoutWindow. + public static readonly TimeSpan DefaultPaymentTimeoutWindow = TimeSpan.FromMinutes(15); + + public static (IStartStream, OutgoingMessages) Handle(StartOrderFulfillment command) + { + var started = new OrderFulfillmentStarted( + command.OrderFulfillmentStateId, + command.CustomerId, + command.TotalAmount); + + var outgoing = new OutgoingMessages(); + outgoing.Delay( + new PaymentTimeout(command.OrderFulfillmentStateId), + command.PaymentTimeoutWindow ?? DefaultPaymentTimeoutWindow); + + return ( + MartenOps.StartStream(command.OrderFulfillmentStateId, started), + outgoing); + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/OrderFulfillmentState.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/OrderFulfillmentState.cs new file mode 100644 index 000000000..4b0057d41 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/OrderFulfillmentState.cs @@ -0,0 +1,46 @@ +namespace ProcessManagerSample.OrderFulfillment; + +/// +/// Event-sourced state for the order fulfillment process. Projected inline from the event stream +/// via Apply methods. Serves as the correlation surface for the handlers that coordinate payment, +/// warehouse, and shipping steps. +/// +public class OrderFulfillmentState +{ + // Required by Marten: FetchForWriting registers the aggregate type as a document type. + // Without a public Guid Id { get; set; }, CleanAllDataAsync throws InvalidDocumentException. + public Guid Id { get; set; } + + public Guid CustomerId { get; set; } + public decimal TotalAmount { get; set; } + + public bool PaymentConfirmed { get; set; } + public bool ItemsReserved { get; set; } + public bool ShipmentConfirmed { get; set; } + + public bool IsCompleted { get; set; } + public bool IsCancelled { get; set; } + + /// + /// True once the process has reached a terminal state. Every continue handler must guard on this + /// to stay idempotent against late-arriving messages after completion or cancellation. + /// + public bool IsTerminal => IsCompleted || IsCancelled; + + public void Apply(OrderFulfillmentStarted e) + { + Id = e.OrderFulfillmentStateId; + CustomerId = e.CustomerId; + TotalAmount = e.TotalAmount; + } + + public void Apply(PaymentConfirmed _) => PaymentConfirmed = true; + + public void Apply(ItemsReserved _) => ItemsReserved = true; + + public void Apply(ShipmentConfirmed _) => ShipmentConfirmed = true; + + public void Apply(OrderFulfillmentCompleted _) => IsCompleted = true; + + public void Apply(OrderFulfillmentCancelled _) => IsCancelled = true; +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/README.md b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/README.md new file mode 100644 index 000000000..7afaafc89 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/OrderFulfillment/README.md @@ -0,0 +1,15 @@ +# Order Fulfillment Process + +A realistic Process Manager built on existing Wolverine + Marten features: + +- `OrderFulfillmentState` is an event-sourced aggregate projected inline from the process's own stream. +- Six static handler classes, one per trigger message: start, payment confirmed, items reserved, shipment confirmed, cancel, and payment timeout. All keyed off the same stream. +- `[AggregateHandler]` on each continue handler class wires `FetchForWriting` + optimistic concurrency automatically. The start handler is a plain static class that returns `IStartStream` via `MartenOps.StartStream`; see the Process Manager via Handlers guide in the Wolverine docs for why. +- The process reaches a terminal state by appending either `OrderFulfillmentCompleted` or `OrderFulfillmentCancelled`. + +Start here: +- `OrderFulfillmentState.cs` — the state type and its `Apply` methods +- `Events.cs` / `Commands.cs` — the event and command records +- `Handlers/` — one handler per trigger message + +This folder is the whole sample. The `Program.cs` at the project root is a thin hosting wrapper for demos. diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/ProcessManagerSample.csproj b/src/Samples/ProcessManagerSample/ProcessManagerSample/ProcessManagerSample.csproj new file mode 100644 index 000000000..1548b7ffa --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/ProcessManagerSample.csproj @@ -0,0 +1,16 @@ + + + + net9.0 + enable + enable + + + + + + + + + + diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/Program.cs b/src/Samples/ProcessManagerSample/ProcessManagerSample/Program.cs new file mode 100644 index 000000000..cd6c4e597 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/Program.cs @@ -0,0 +1,38 @@ +using JasperFx; +using Marten; +using Marten.Events.Projections; +using ProcessManagerSample.OrderFulfillment; +using Wolverine; +using Wolverine.Marten; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services.AddMarten(opts => + { + var connectionString = builder.Configuration.GetConnectionString("Marten"); + opts.Connection(connectionString!); + opts.DatabaseSchemaName = "process_manager"; + + // Inline snapshot: OrderFulfillmentState is projected on every SaveChangesAsync + // so that subsequent FetchForWriting calls see the latest state without a daemon. + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }) + .IntegrateWithWolverine(); + +builder.Host.UseWolverine(opts => +{ + // Wraps handler execution in a Marten session + SaveChangesAsync and wires the outbox. + opts.Policies.AutoApplyTransactions(); +}); + +var app = builder.Build(); + +// Thin HTTP surface so you can curl the sample or demo the process externally. +// The documentation focuses on the handlers, not the transport. +app.MapPost("/orders/start", + (StartOrderFulfillment command, IMessageBus bus) => bus.InvokeAsync(command)); + +return await app.RunJasperFxCommands(args); + +// Exposed as partial so Alba can bootstrap the host in the test project. +public partial class Program; diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.Development.json b/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.Development.json new file mode 100644 index 000000000..0c208ae91 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.json b/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.json new file mode 100644 index 000000000..84c262a27 --- /dev/null +++ b/src/Samples/ProcessManagerSample/ProcessManagerSample/appsettings.json @@ -0,0 +1,12 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Marten": "Host=localhost;Port=5433;Database=postgres;Username=postgres;Password=postgres" + } +} diff --git a/wolverine.slnx b/wolverine.slnx index 384ce7c12..2b8f601a9 100644 --- a/wolverine.slnx +++ b/wolverine.slnx @@ -174,6 +174,10 @@ + + + +