diff --git a/src/Mocha/src/Mocha/Sagas/Definitions/SagaConfiguration.cs b/src/Mocha/src/Mocha/Sagas/Definitions/SagaConfiguration.cs index 921178c1a4a..78cb1ce6579 100644 --- a/src/Mocha/src/Mocha/Sagas/Definitions/SagaConfiguration.cs +++ b/src/Mocha/src/Mocha/Sagas/Definitions/SagaConfiguration.cs @@ -20,6 +20,12 @@ public sealed class SagaConfiguration : MessagingConfiguration /// public SagaStateConfiguration? DuringAny { get; set; } + /// + /// Gets or sets the timeout duration for the saga. When set, the saga will automatically + /// transition to the timed-out final state after this duration. + /// + public TimeSpan? Timeout { get; set; } + /// /// Gets or sets a factory for creating a custom saga state serializer. /// diff --git a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaDescriptorExtensions.cs b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaDescriptorExtensions.cs index 935896acd4b..744f423dae3 100644 --- a/src/Mocha/src/Mocha/Sagas/Descriptors/SagaDescriptorExtensions.cs +++ b/src/Mocha/src/Mocha/Sagas/Descriptors/SagaDescriptorExtensions.cs @@ -18,7 +18,8 @@ public static ISagaFinalStateDescriptor Timeout( TimeSpan timeout) where TState : SagaStateBase { - // TODO for this we need scheduling - throw new NotImplementedException(); + descriptor.Extend().Configuration.Timeout = timeout; + + return descriptor.Finally(StateNames.TimedOut); } } diff --git a/src/Mocha/src/Mocha/Sagas/Saga.cs b/src/Mocha/src/Mocha/Sagas/Saga.cs index 8657dd0a6b4..03eb8b016b3 100644 --- a/src/Mocha/src/Mocha/Sagas/Saga.cs +++ b/src/Mocha/src/Mocha/Sagas/Saga.cs @@ -1,4 +1,5 @@ using System.Collections.Immutable; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Mocha.Features; @@ -213,8 +214,7 @@ protected Saga() /// /// Thrown when the saga has not been initialized. - public override IReadOnlyDictionary States - => _states ?? throw ThrowHelper.SagaNotInitialized(); + public override IReadOnlyDictionary States => _states ?? throw ThrowHelper.SagaNotInitialized(); /// public override Type StateType => typeof(TState); @@ -257,6 +257,23 @@ public override async Task HandleEvent(IConsumeContext context) _logger!.CreatedSagaState(Name, state.Id); await OnEnterStateAsync(state, _initialState, context); + + // Schedule timeout if configured + if (Configuration.Timeout is { } timeout) + { + var bus = context.GetBus(); + + var timeProvider = context.Services.GetService() ?? TimeProvider.System; + + var scheduledTime = timeProvider.GetUtcNow().Add(timeout); + + var result = await bus.ScheduleSendAsync(new SagaTimedOutEvent(state.Id), scheduledTime, ct); + + if (result.Token is not null) + { + state.TimeoutToken = result.Token; + } + } } await OnHandleTransitionAsync(state, @event, context); @@ -397,6 +414,18 @@ protected virtual async Task OnEnterStateAsync(TState state, SagaState nextState if (nextState.IsFinal) { + if (state.TimeoutToken is { } timeoutToken) + { + try + { + await context.GetBus().CancelScheduledMessageAsync(timeoutToken, ct); + } + catch (Exception ex) + { + _logger?.FailedToCancelScheduledMessage(ex, timeoutToken, Name!, state.Id); + } + } + if (nextState.Response is not null && state.Metadata.TryGet(SagaContextData.ReplyAddress, out var replyTo) && state.Metadata.TryGet(SagaContextData.CorrelationId, out var correlationId) @@ -602,4 +631,12 @@ public static partial void ReplyingToSaga( [LoggerMessage(LogLevel.Information, "Saga completed {SagaName} {SagaId}")] public static partial void SagaCompleted(this ILogger logger, string sagaName, Guid sagaId); + + [LoggerMessage(LogLevel.Warning, "Failed to cancel scheduled message {Token} for saga {SagaName} ({SagaId})")] + public static partial void FailedToCancelScheduledMessage( + this ILogger logger, + Exception ex, + string token, + string sagaName, + Guid sagaId); } diff --git a/src/Mocha/src/Mocha/Sagas/State/SagaStateBase.cs b/src/Mocha/src/Mocha/Sagas/State/SagaStateBase.cs index 6f8bd5ccb46..a0abfbbee6d 100644 --- a/src/Mocha/src/Mocha/Sagas/State/SagaStateBase.cs +++ b/src/Mocha/src/Mocha/Sagas/State/SagaStateBase.cs @@ -30,6 +30,12 @@ public SagaStateBase() : this(Guid.NewGuid(), StateNames.Initial) { } /// public List Errors { get; set; } = []; + /// + /// Gets or sets the cancellation token for a scheduled timeout, used to cancel the timeout + /// when the saga completes or transitions before the timeout fires. + /// + public string? TimeoutToken { get; set; } + /// /// Gets or sets custom metadata associated with this saga instance. /// diff --git a/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs b/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs index 32da210583c..4bbe9ef60d3 100644 --- a/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs +++ b/src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs @@ -5,6 +5,10 @@ namespace Mocha.Sagas.Tests; /// public sealed class TestMessageBus(TestMessageOutbox outbox) : IMessageBus { + private int _scheduleCounter; + + public List CancelledTokens { get; } = []; + public ValueTask PublishAsync(T message, CancellationToken cancellationToken) { outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message!, null)); @@ -68,22 +72,36 @@ public ValueTask ReplyAsync( public ValueTask SchedulePublishAsync( T message, DateTimeOffset scheduledTime, - CancellationToken cancellationToken) where T : notnull + CancellationToken cancellationToken) + where T : notnull { - outbox.Messages.Add( - new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, null)); - return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime }); + var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}"; + outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, null)); + return ValueTask.FromResult( + new SchedulingResult + { + Token = token, + ScheduledTime = scheduledTime, + IsCancellable = true + }); } public ValueTask SchedulePublishAsync( T message, DateTimeOffset scheduledTime, PublishOptions options, - CancellationToken cancellationToken) where T : notnull + CancellationToken cancellationToken) + where T : notnull { - outbox.Messages.Add( - new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, options)); - return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime }); + var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}"; + outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, options)); + return ValueTask.FromResult( + new SchedulingResult + { + Token = token, + ScheduledTime = scheduledTime, + IsCancellable = true + }); } public ValueTask ScheduleSendAsync( @@ -91,9 +109,15 @@ public ValueTask ScheduleSendAsync( DateTimeOffset scheduledTime, CancellationToken cancellationToken) { - outbox.Messages.Add( - new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, null)); - return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime }); + var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}"; + outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, null)); + return ValueTask.FromResult( + new SchedulingResult + { + Token = token, + ScheduledTime = scheduledTime, + IsCancellable = true + }); } public ValueTask ScheduleSendAsync( @@ -102,13 +126,20 @@ public ValueTask ScheduleSendAsync( SendOptions options, CancellationToken cancellationToken) { - outbox.Messages.Add( - new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, options)); - return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime }); + var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}"; + outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, options)); + return ValueTask.FromResult( + new SchedulingResult + { + Token = token, + ScheduledTime = scheduledTime, + IsCancellable = true + }); } public ValueTask CancelScheduledMessageAsync(string token, CancellationToken cancellationToken) { - return ValueTask.FromResult(false); + CancelledTokens.Add(token); + return ValueTask.FromResult(true); } } diff --git a/src/Mocha/test/Mocha.Sagas.Tests/IntegrationTests.cs b/src/Mocha/test/Mocha.Sagas.Tests/IntegrationTests.cs index f18100e51f0..d2746abb021 100644 --- a/src/Mocha/test/Mocha.Sagas.Tests/IntegrationTests.cs +++ b/src/Mocha/test/Mocha.Sagas.Tests/IntegrationTests.cs @@ -98,10 +98,39 @@ public async Task Saga_Should_Timeout() Assert.Equal(0, storage.Count); } - [Fact(Skip = "Timeout(TimeSpan) auto-scheduling throws NotImplementedException")] + [Fact] public async Task Saga_Should_TimeoutWithCustomResponse() { - await Task.CompletedTask; + // arrange + var recorder = new MessageRecorder(); + await using var provider = await CreateBusAsync(b => + { + b.Services.AddSingleton(recorder); + b.AddEventHandler(); + b.AddSaga(); + }); + + using var scope = provider.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + var storage = provider.GetRequiredService(); + + // act - publish StartTimeoutEvent to start the saga + await bus.PublishAsync(new StartTimeoutEvent(), CancellationToken.None); + + // wait for the saga to publish a TriggerEvent so we can observe it started + Assert.True(await recorder.WaitAsync(s_timeout), "Saga did not publish event within timeout"); + + var recorded = Assert.Single(recorder.Messages); + var sagaId = Assert.IsType(recorded).CorrelationId!.Value; + + // simulate timeout by sending SagaTimedOutEvent with the saga ID + await bus.SendAsync(new SagaTimedOutEvent(sagaId), CancellationToken.None); + + // allow time for final state processing + await Task.Delay(500, default); + + // assert - saga should be deleted from store after reaching final state + Assert.Equal(0, storage.Count); } [Fact(Skip = "OnAnyReply saga reply routing requires full reply pipeline investigation")] @@ -183,6 +212,31 @@ protected override void Configure(ISagaDescriptor descriptor) } } + /// + /// Timeout saga with Timeout() API: StartTimeoutEvent -> Active (publishes TriggerEvent) -> + /// SagaTimedOutEvent -> TimedOut (final, with custom response possible) + /// + public sealed class TimeoutWithResponseSaga : Saga + { + protected override void Configure(ISagaDescriptor descriptor) + { + descriptor.Timeout(TimeSpan.FromMinutes(30)); + + descriptor + .Initially() + .OnEvent() + .StateFactory(_ => new TimeoutState()) + .Publish((_, state) => new TriggerEvent(state.Id)) + .TransitionTo("Active"); + + descriptor.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut); + + descriptor.During("Active").OnEvent().TransitionTo("Completed"); + + descriptor.Finally("Completed"); + } + } + /// /// Request-response saga: StartRequestEvent -> AwaitingResponse (sends TriggerRequest) -> /// any reply -> Completed (final) @@ -212,6 +266,8 @@ public sealed class InitEvent; public sealed class StartTimeoutEvent; + public sealed class EndTimeoutEvent; + public sealed class StartRequestEvent; public sealed record TriggerEvent(Guid? CorrelationId) : ICorrelatable; diff --git a/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs b/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs index 9d5af9693d8..098e4674493 100644 --- a/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs +++ b/src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs @@ -109,14 +109,9 @@ public async Task ScheduledPublish_Lifecycle_Should_ProducePublishOptionsWithSch var saga = Saga.Create(x => { - x.Initially() - .OnEvent() - .StateFactory(_ => new TestState()) - .TransitionTo("Started"); + x.Initially().OnEvent().StateFactory(_ => new TestState()).TransitionTo("Started"); - x.During("Started") - .OnEntry() - .ScheduledPublish(delay, _ => new ScheduledNotification()); + x.During("Started").OnEntry().ScheduledPublish(delay, _ => new ScheduledNotification()); x.During("Started").OnEvent().TransitionTo("Ended"); @@ -144,14 +139,9 @@ public async Task ScheduledSend_Lifecycle_Should_ProduceSendOptionsWithScheduled var saga = Saga.Create(x => { - x.Initially() - .OnEvent() - .StateFactory(_ => new TestState()) - .TransitionTo("Started"); + x.Initially().OnEvent().StateFactory(_ => new TestState()).TransitionTo("Started"); - x.During("Started") - .OnEntry() - .ScheduledSend(delay, _ => new ScheduledCommand()); + x.During("Started").OnEntry().ScheduledSend(delay, _ => new ScheduledCommand()); x.During("Started").OnEvent().TransitionTo("Ended"); @@ -182,8 +172,8 @@ private TestConsumeContext CreateContext(Saga saga, object message) Runtime = s_runtime }; - context.Features.Configure(f => f.Message = message); - context.Features.Configure(f => f.Store = _store); + context.Features.GetOrSet().Message = message; + context.Features.GetOrSet().Store = _store; return context; } diff --git a/src/Mocha/test/Mocha.Sagas.Tests/SagaTimeoutTests.cs b/src/Mocha/test/Mocha.Sagas.Tests/SagaTimeoutTests.cs new file mode 100644 index 00000000000..9759e799088 --- /dev/null +++ b/src/Mocha/test/Mocha.Sagas.Tests/SagaTimeoutTests.cs @@ -0,0 +1,245 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Time.Testing; +using Mocha.Features; +using Mocha.Transport.InMemory; + +namespace Mocha.Sagas.Tests; + +public sealed class SagaTimeoutTests +{ + private static readonly IMessagingRuntime s_runtime = CreateRuntime(); + + private readonly TestMessageOutbox _outbox; + private readonly TestSagaStore _store; + private readonly TestSagaCleanup _cleanup; + private readonly FakeTimeProvider _timeProvider; + private readonly TestMessageBus _bus; + private readonly IServiceProvider _services; + + private static IMessagingRuntime CreateRuntime() + { + var services = new ServiceCollection(); + var builder = services.AddMessageBus(); + builder.AddInMemory(); + var provider = services.BuildServiceProvider(); + return provider.GetRequiredService(); + } + + public SagaTimeoutTests() + { + _store = new TestSagaStore(); + _cleanup = new TestSagaCleanup(); + _outbox = new TestMessageOutbox(); + _timeProvider = new FakeTimeProvider(new DateTimeOffset(2026, 6, 1, 12, 0, 0, TimeSpan.Zero)); + _bus = new TestMessageBus(_outbox); + _services = new ServiceCollection() + .AddSingleton(_cleanup) + .AddSingleton(_bus) + .AddSingleton(_timeProvider) + .BuildServiceProvider(); + } + + [Fact] + public async Task Timeout_Should_ScheduleTimedOutEvent_When_SagaCreated() + { + // Arrange + var timeout = TimeSpan.FromMinutes(5); + var saga = + Saga.Create(x => + { + x.Timeout(timeout); + + x.Initially().OnEvent().StateFactory(_ => new TimeoutState()).TransitionTo("Active"); + + x.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut); + + x.During("Active").OnEvent().TransitionTo("Completed"); + + x.Finally("Completed"); + }); + + Initialize(saga); + + // Act + var context = CreateContext(saga, new StartEvent()); + await saga.HandleEvent(context); + + // Assert - ScheduleSendAsync should have been called with a SagaTimedOutEvent + var scheduledOp = _outbox.Messages.FirstOrDefault(m => m.Message is SagaTimedOutEvent); + Assert.NotNull(scheduledOp); + + var timedOutEvent = Assert.IsType(scheduledOp.Message); + var state = Assert.Single(_store.States); + Assert.Equal(state.Id, timedOutEvent.SagaId); + + // Verify the scheduled time matches now + timeout + Assert.Equal(TestMessageOutbox.OperationKind.Send, scheduledOp.Kind); + } + + [Fact] + public async Task Timeout_Should_StoreToken_When_SagaCreated() + { + // Arrange + var saga = + Saga.Create(x => + { + x.Timeout(TimeSpan.FromMinutes(5)); + + x.Initially().OnEvent().StateFactory(_ => new TimeoutState()).TransitionTo("Active"); + + x.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut); + + x.During("Active").OnEvent().TransitionTo("Completed"); + + x.Finally("Completed"); + }); + + Initialize(saga); + + // Act + var context = CreateContext(saga, new StartEvent()); + await saga.HandleEvent(context); + + // Assert + var state = Assert.Single(_store.States); + Assert.NotNull(state.TimeoutToken); + Assert.StartsWith("test:", state.TimeoutToken); + } + + [Fact] + public async Task Timeout_Should_CancelToken_When_SagaReachesFinalState() + { + // Arrange + var saga = + Saga.Create(x => + { + x.Timeout(TimeSpan.FromMinutes(5)); + + x.Initially().OnEvent().StateFactory(_ => new TimeoutState()).TransitionTo("Active"); + + x.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut); + + x.During("Active").OnEvent().TransitionTo("Completed"); + + x.Finally("Completed"); + }); + + Initialize(saga); + + // Act - create saga + var createContext = CreateContext(saga, new StartEvent()); + await saga.HandleEvent(createContext); + + var state = Assert.Single(_store.States); + var token = state.TimeoutToken; + Assert.NotNull(token); + + // Act - send event that transitions to final state + var endContext = CreateContextWithSagaId(saga, new EndEvent(), state.Id); + await saga.HandleEvent(endContext); + + // Assert - token should have been cancelled + Assert.Contains(token, _bus.CancelledTokens); + // Saga should be deleted from store + Assert.Empty(_store.States); + } + + [Fact] + public async Task Timeout_Should_TransitionToTimedOut_When_TimeoutEventReceived() + { + // Arrange + var saga = + Saga.Create(x => + { + x.Timeout(TimeSpan.FromMinutes(5)); + + x.Initially().OnEvent().StateFactory(_ => new TimeoutState()).TransitionTo("Active"); + + x.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut); + + x.During("Active").OnEvent().TransitionTo("Completed"); + + x.Finally("Completed"); + }); + + Initialize(saga); + + // Act - create saga + var createContext = CreateContext(saga, new StartEvent()); + await saga.HandleEvent(createContext); + + var state = Assert.Single(_store.States); + var sagaId = state.Id; + + // Act - simulate timeout by sending SagaTimedOutEvent + var timeoutContext = CreateContextWithSagaId(saga, new SagaTimedOutEvent(sagaId), sagaId); + await saga.HandleEvent(timeoutContext); + + // Assert - saga should be deleted (final state reached) + Assert.Empty(_store.States); + } + + [Fact] + public async Task NoTimeout_Should_NotSchedule_When_TimeoutNotConfigured() + { + // Arrange - no Timeout() call + var saga = + Saga.Create(x => + { + x.Initially().OnEvent().StateFactory(_ => new TimeoutState()).TransitionTo("Active"); + + x.During("Active").OnEvent().TransitionTo("Completed"); + + x.Finally("Completed"); + }); + + Initialize(saga); + + // Act + var context = CreateContext(saga, new StartEvent()); + await saga.HandleEvent(context); + + // Assert - no SagaTimedOutEvent should be scheduled + var scheduledOp = _outbox.Messages.FirstOrDefault(m => m.Message is SagaTimedOutEvent); + Assert.Null(scheduledOp); + + // State should not have a timeout token + var state = Assert.Single(_store.States); + Assert.Null(state.TimeoutToken); + } + + private TestConsumeContext CreateContext(Saga saga, object message) + { + var context = new TestConsumeContext + { + CancellationToken = CancellationToken.None, + CorrelationId = Guid.NewGuid().ToString(), + MessageId = Guid.NewGuid().ToString(), + Services = _services, + Runtime = s_runtime + }; + + context.Features.GetOrSet().Message = message; + context.Features.GetOrSet().Store = _store; + + return context; + } + + private TestConsumeContext CreateContextWithSagaId(Saga saga, object message, Guid sagaId) + { + var context = CreateContext(saga, message); + context.MutableHeaders.Set(SagaContextData.SagaId, sagaId.ToString("D")); + return context; + } + + private static void Initialize(Saga saga) + { + saga.Initialize(TestMessagingSetupContext.Instance); + } + + private sealed class TimeoutState : SagaStateBase; + + private sealed class StartEvent; + + private sealed class EndEvent; +} diff --git a/website/src/docs/mocha/v1/sagas.md b/website/src/docs/mocha/v1/sagas.md index 7d2b2981999..a5259152c8e 100644 --- a/website/src/docs/mocha/v1/sagas.md +++ b/website/src/docs/mocha/v1/sagas.md @@ -617,6 +617,139 @@ The correlation lookup order is: 2. Check the message headers for a `saga-id` header. 3. If neither is found, treat the message as an initiating event and create a new saga instance. +# Timeouts + +A saga that waits for a message that never arrives will stay in its current state forever. Timeouts ensure every saga eventually completes — either through normal processing or by timing out. + +Mocha provides a saga-level `Timeout()` API that sets a single deadline for the entire saga instance. The timeout is scheduled when the saga is created and automatically cancelled when the saga reaches any final state. + +> **Prerequisites:** Durable, cancellable timeouts require a scheduling store. Configure `UsePostgresScheduling()` before using `Timeout()` — see [Scheduling: Set up store-based scheduling](/docs/mocha/v1/scheduling#set-up-store-based-scheduling-for-rabbitmq) for setup instructions. Native transport scheduling (InMemory, PostgreSQL) also works but does not support automatic cancellation. + +## Configure a saga-level timeout + +Call `Timeout()` on the saga descriptor to set a deadline that applies to the entire saga instance: + +```csharp +protected override void Configure(ISagaDescriptor descriptor) +{ + // 30-minute timeout — saga will time out if it doesn't reach a final state + descriptor.Timeout(TimeSpan.FromMinutes(30)) + .Respond(state => new OrderTimedOutResponse(state.Id)); + + descriptor.Initially() + .OnEvent() + .StateFactory(e => new OrderState { OrderId = e.OrderId }) + .TransitionTo("AwaitingPayment"); + + // Handle timeout in any non-final state + descriptor.DuringAny() + .OnTimeout() + .TransitionTo(StateNames.TimedOut); + + descriptor.During("AwaitingPayment") + .OnEvent() + .TransitionTo("Completed"); + + descriptor.Finally("Completed"); +} +``` + +`Timeout(TimeSpan)` does three things: + +1. Creates a "Timed Out" final state (named `StateNames.TimedOut`). +2. Returns an `ISagaFinalStateDescriptor` so you can chain `.Respond()` to send a response when the saga times out. +3. Tells the saga framework to schedule a timeout event when a new saga instance is created. + +The timeout clock starts when the saga is created — that is, when the first event arrives and a new instance is provisioned. If the saga reaches any final state before the deadline, the pending timeout is automatically cancelled. If the timeout fires, a `SagaTimedOutEvent` is delivered to the saga instance. + +Use `OnTimeout()` on a state descriptor to define what happens when the timeout arrives. Handle it the same way you handle any other event — run `.Then()` actions, dispatch commands, or transition to a different state. + +## Key behaviors + +| Behavior | Detail | +| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| Scope | Per-saga instance, not per-state. The deadline covers the entire saga lifetime. | +| Duration | Fixed at configuration time via `TimeSpan`. | +| Auto-cancellation | When the saga reaches any final state (normal completion, error final state, etc.), the pending timeout is cancelled. | +| Late delivery | If the timeout fires after the saga was already deleted, the event is silently dropped. | +| Missing handler | If no `OnTimeout()` handler is configured for the current state, the saga throws an execution error. See [troubleshooting](#timeout-troubleshooting) below. | +| Recommended pattern | `DuringAny().OnTimeout()` handles the timeout regardless of which state the saga is in. | +| Response on timeout | Chain `.Respond()` on the timed-out final state to send a response back to the original requester. | +| Scheduling store | Requires a scheduling store for durable timeouts. Configure `UsePostgresScheduling()` — see [Scheduling](/docs/mocha/v1/scheduling#set-up-store-based-scheduling-for-rabbitmq) for setup. Native transport scheduling (InMemory, PostgreSQL) also works but does not support cancellation. | + +## Timeout troubleshooting + +**Timeout never fires.** +Verify that a scheduling provider is configured. For durable, cancellable timeouts, register `UsePostgresScheduling()` with an EF Core DbContext. Native transport scheduling (InMemory, PostgreSQL) works without extra setup but does not cancel timeouts when the saga completes normally. + +**"SagaExecutionException: No transition defined for SagaTimedOutEvent."** +You configured `Timeout()` but did not add an `OnTimeout()` handler. Add a catch-all handler: + +```csharp +descriptor.DuringAny() + .OnTimeout() + .TransitionTo(StateNames.TimedOut); +``` + +# API reference + +## Saga descriptor + +| Method | Available on | Parameters | Description | +| ----------- | ------------------------- | ------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `Initially` | `ISagaDescriptor` | — | Returns the initial state descriptor for defining transitions that create new saga instances. | +| `During` | `ISagaDescriptor` | `string stateName` | Returns a state descriptor for defining transitions in the named state. | +| `DuringAny` | `ISagaDescriptor` | — | Returns a catch-all state descriptor whose transitions apply to all non-initial, non-final states. | +| `Finally` | `ISagaDescriptor` | `string stateName` | Declares a final state. When the saga enters this state, persisted state is deleted and an optional response is sent. | +| `Timeout` | `ISagaDescriptor` | `TimeSpan timeout` | Creates a timed-out final state and schedules automatic timeout on saga creation. Returns `ISagaFinalStateDescriptor` for chaining `.Respond()`. | + +## State transitions + +| Method | Available on | Parameters | Description | +| -------------- | ------------------------------ | ---------- | ------------------------------------------------------------------------------------------- | +| `OnEvent` | `ISagaStateDescriptor` | — | Registers a transition triggered by a published event. | +| `OnRequest` | `ISagaStateDescriptor` | — | Registers a transition triggered by a request message (captures reply address). | +| `OnReply` | `ISagaStateDescriptor` | — | Registers a transition triggered by a reply to a previously sent command. | +| `OnFault` | `ISagaStateDescriptor` | — | Registers a transition triggered by a `NotAcknowledgedEvent`. | +| `OnTimeout` | `ISagaStateDescriptor` | — | Registers a transition for `SagaTimedOutEvent`. Sugar for `OnRequest()`. | + +## Transition actions + +| Method | Available on | Parameters | Description | +| -------------- | ------------------------------------------- | ------------------------------ | ---------------------------------------------------------------------------------- | +| `StateFactory` | `ISagaTransitionDescriptor` | `Func` | Creates new saga state from the initiating event. Required on initial transitions. | +| `Then` | `ISagaTransitionDescriptor` | `Action` | Runs a synchronous action to update saga state. | +| `Send` | `ISagaTransitionDescriptor` | `Func` | Sends a command as a side-effect of the transition. | +| `Publish` | `ISagaTransitionDescriptor` | `Func` | Publishes an event as a side-effect of the transition. | +| `TransitionTo` | `ISagaTransitionDescriptor` | `string stateName` | Moves the saga to the named state. | + +## Final state + +| Method | Available on | Parameters | Description | +| --------- | ----------------------------------- | ---------------------- | --------------------------------------------------------------------------------- | +| `Respond` | `ISagaFinalStateDescriptor` | `Func` | Sends a response to the original requester when the saga enters this final state. | + +## Lifecycle + +| Method | Available on | Parameters | Description | +| --------------- | ------------------------------ | ---------- | ------------------------------------------------------------------------------------------ | +| `OnEntry` | `ISagaStateDescriptor` | — | Returns a lifecycle descriptor for actions that run every time the saga enters the state. | +| `WhenCompleted` | `ISagaDescriptor` | — | Returns a lifecycle descriptor for actions that run when the saga reaches any final state. | + +# Troubleshooting + +**Saga state is lost on restart.** +Saga state is stored in memory by default. For production, configure a persistent store. See [Configure saga persistence with Postgres](#configure-saga-persistence-with-postgres). + +**"No transition defined" exception.** +The saga received a message in a state that has no matching transition. Verify that every state has transitions for all expected message types. Use `DuringAny()` for catch-all transitions that apply across all non-final states. + +**Saga never completes.** +Check that the downstream service is running and sending replies. If the saga is waiting for a reply that never arrives, consider adding a [timeout](#timeouts) to ensure the saga eventually reaches a final state. + +**Two messages arrive for the same saga instance simultaneously.** +The Postgres saga store uses optimistic concurrency. The second writer detects the version mismatch, reloads state, and retries automatically. See [Concurrency](#concurrency). + # Next steps Understand how transports work in [Transports](/docs/mocha/v1/transports). diff --git a/website/src/docs/mocha/v1/scheduling.md b/website/src/docs/mocha/v1/scheduling.md index 38fc77c9475..040d68c0062 100644 --- a/website/src/docs/mocha/v1/scheduling.md +++ b/website/src/docs/mocha/v1/scheduling.md @@ -296,6 +296,8 @@ x.WhenCompleted() Both `ScheduledPublish` and `ScheduledSend` are available on `ISagaTransitionDescriptor` and `ISagaLifeCycleDescriptor`. The factory receives the current saga state and returns the message to schedule. +For automatic saga timeouts that cancel themselves on completion, see [Timeouts](/docs/mocha/v1/sagas#timeouts) in the Sagas guide. + See [Sagas](/docs/mocha/v1/sagas) for the full saga configuration guide. # API reference