Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Mocha/src/Mocha/Sagas/Definitions/SagaConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public sealed class SagaConfiguration : MessagingConfiguration
/// </summary>
public SagaStateConfiguration? DuringAny { get; set; }

/// <summary>
/// Gets or sets the timeout duration for the saga. When set, the saga will automatically
/// transition to the timed-out final state after this duration.
/// </summary>
public TimeSpan? Timeout { get; set; }

/// <summary>
/// Gets or sets a factory for creating a custom saga state serializer.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public static ISagaFinalStateDescriptor<TState> Timeout<TState>(
TimeSpan timeout)
where TState : SagaStateBase
{
// TODO for this we need scheduling
throw new NotImplementedException();
descriptor.Extend().Configuration.Timeout = timeout;

return descriptor.Finally(StateNames.TimedOut);
}
}
41 changes: 39 additions & 2 deletions src/Mocha/src/Mocha/Sagas/Saga.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Immutable;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Mocha.Features;

Expand Down Expand Up @@ -213,8 +214,7 @@ protected Saga()

/// <inheritdoc />
/// <exception cref="InvalidOperationException">Thrown when the saga has not been initialized.</exception>
public override IReadOnlyDictionary<string, SagaState> States
=> _states ?? throw ThrowHelper.SagaNotInitialized();
public override IReadOnlyDictionary<string, SagaState> States => _states ?? throw ThrowHelper.SagaNotInitialized();

/// <inheritdoc />
public override Type StateType => typeof(TState);
Expand Down Expand Up @@ -257,6 +257,23 @@ public override async Task<bool> HandleEvent(IConsumeContext context)
_logger!.CreatedSagaState(Name, state.Id);

await OnEnterStateAsync(state, _initialState, context);

// Schedule timeout if configured
if (Configuration.Timeout is { } timeout)
{
var bus = context.GetBus();

var timeProvider = context.Services.GetService<TimeProvider>() ?? TimeProvider.System;

var scheduledTime = timeProvider.GetUtcNow().Add(timeout);

var result = await bus.ScheduleSendAsync(new SagaTimedOutEvent(state.Id), scheduledTime, ct);

if (result.Token is not null)
{
state.TimeoutToken = result.Token;
}
}
}

await OnHandleTransitionAsync(state, @event, context);
Expand Down Expand Up @@ -397,6 +414,18 @@ protected virtual async Task OnEnterStateAsync(TState state, SagaState nextState

if (nextState.IsFinal)
{
if (state.TimeoutToken is { } timeoutToken)
{
try
{
await context.GetBus().CancelScheduledMessageAsync(timeoutToken, ct);
}
catch (Exception ex)
{
_logger?.FailedToCancelScheduledMessage(ex, timeoutToken, Name!, state.Id);
}
}

if (nextState.Response is not null
&& state.Metadata.TryGet(SagaContextData.ReplyAddress, out var replyTo)
&& state.Metadata.TryGet(SagaContextData.CorrelationId, out var correlationId)
Expand Down Expand Up @@ -602,4 +631,12 @@ public static partial void ReplyingToSaga(

[LoggerMessage(LogLevel.Information, "Saga completed {SagaName} {SagaId}")]
public static partial void SagaCompleted(this ILogger logger, string sagaName, Guid sagaId);

[LoggerMessage(LogLevel.Warning, "Failed to cancel scheduled message {Token} for saga {SagaName} ({SagaId})")]
public static partial void FailedToCancelScheduledMessage(
this ILogger logger,
Exception ex,
string token,
string sagaName,
Guid sagaId);
}
6 changes: 6 additions & 0 deletions src/Mocha/src/Mocha/Sagas/State/SagaStateBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public SagaStateBase() : this(Guid.NewGuid(), StateNames.Initial) { }
/// </summary>
public List<SagaError> Errors { get; set; } = [];

/// <summary>
/// Gets or sets the cancellation token for a scheduled timeout, used to cancel the timeout
/// when the saga completes or transitions before the timeout fires.
/// </summary>
public string? TimeoutToken { get; set; }

/// <summary>
/// Gets or sets custom metadata associated with this saga instance.
/// </summary>
Expand Down
61 changes: 46 additions & 15 deletions src/Mocha/test/Mocha.Sagas.TestHelpers/TestMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ namespace Mocha.Sagas.Tests;
/// </summary>
public sealed class TestMessageBus(TestMessageOutbox outbox) : IMessageBus
{
private int _scheduleCounter;

public List<string> CancelledTokens { get; } = [];

public ValueTask PublishAsync<T>(T message, CancellationToken cancellationToken)
{
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message!, null));
Expand Down Expand Up @@ -68,32 +72,52 @@ public ValueTask ReplyAsync<TResponse>(
public ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken) where T : notnull
CancellationToken cancellationToken)
where T : notnull
{
outbox.Messages.Add(
new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, null));
return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}";
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, null));
return ValueTask.FromResult(
new SchedulingResult
{
Token = token,
ScheduledTime = scheduledTime,
IsCancellable = true
});
}

public ValueTask<SchedulingResult> SchedulePublishAsync<T>(
T message,
DateTimeOffset scheduledTime,
PublishOptions options,
CancellationToken cancellationToken) where T : notnull
CancellationToken cancellationToken)
where T : notnull
{
outbox.Messages.Add(
new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, options));
return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}";
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Publish, message, options));
return ValueTask.FromResult(
new SchedulingResult
{
Token = token,
ScheduledTime = scheduledTime,
IsCancellable = true
});
}

public ValueTask<SchedulingResult> ScheduleSendAsync(
object message,
DateTimeOffset scheduledTime,
CancellationToken cancellationToken)
{
outbox.Messages.Add(
new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, null));
return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}";
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, null));
return ValueTask.FromResult(
new SchedulingResult
{
Token = token,
ScheduledTime = scheduledTime,
IsCancellable = true
});
}

public ValueTask<SchedulingResult> ScheduleSendAsync(
Expand All @@ -102,13 +126,20 @@ public ValueTask<SchedulingResult> ScheduleSendAsync(
SendOptions options,
CancellationToken cancellationToken)
{
outbox.Messages.Add(
new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, options));
return ValueTask.FromResult(new SchedulingResult { ScheduledTime = scheduledTime });
var token = $"test:{Interlocked.Increment(ref _scheduleCounter)}";
outbox.Messages.Add(new TestMessageOutbox.Operation(TestMessageOutbox.OperationKind.Send, message, options));
return ValueTask.FromResult(
new SchedulingResult
{
Token = token,
ScheduledTime = scheduledTime,
IsCancellable = true
});
}

public ValueTask<bool> CancelScheduledMessageAsync(string token, CancellationToken cancellationToken)
{
return ValueTask.FromResult(false);
CancelledTokens.Add(token);
return ValueTask.FromResult(true);
}
}
60 changes: 58 additions & 2 deletions src/Mocha/test/Mocha.Sagas.Tests/IntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,39 @@ public async Task Saga_Should_Timeout()
Assert.Equal(0, storage.Count);
}

[Fact(Skip = "Timeout(TimeSpan) auto-scheduling throws NotImplementedException")]
[Fact]
public async Task Saga_Should_TimeoutWithCustomResponse()
{
await Task.CompletedTask;
// arrange
var recorder = new MessageRecorder();
await using var provider = await CreateBusAsync(b =>
{
b.Services.AddSingleton(recorder);
b.AddEventHandler<TriggerEventRecorder>();
b.AddSaga<TimeoutWithResponseSaga>();
});

using var scope = provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var storage = provider.GetRequiredService<InMemorySagaStateStorage>();

// act - publish StartTimeoutEvent to start the saga
await bus.PublishAsync(new StartTimeoutEvent(), CancellationToken.None);

// wait for the saga to publish a TriggerEvent so we can observe it started
Assert.True(await recorder.WaitAsync(s_timeout), "Saga did not publish event within timeout");

var recorded = Assert.Single(recorder.Messages);
var sagaId = Assert.IsType<TriggerEvent>(recorded).CorrelationId!.Value;

// simulate timeout by sending SagaTimedOutEvent with the saga ID
await bus.SendAsync(new SagaTimedOutEvent(sagaId), CancellationToken.None);

// allow time for final state processing
await Task.Delay(500, default);

// assert - saga should be deleted from store after reaching final state
Assert.Equal(0, storage.Count);
}

[Fact(Skip = "OnAnyReply saga reply routing requires full reply pipeline investigation")]
Expand Down Expand Up @@ -183,6 +212,31 @@ protected override void Configure(ISagaDescriptor<TimeoutState> descriptor)
}
}

/// <summary>
/// Timeout saga with Timeout() API: StartTimeoutEvent -> Active (publishes TriggerEvent) ->
/// SagaTimedOutEvent -> TimedOut (final, with custom response possible)
/// </summary>
public sealed class TimeoutWithResponseSaga : Saga<TimeoutState>
{
protected override void Configure(ISagaDescriptor<TimeoutState> descriptor)
{
descriptor.Timeout(TimeSpan.FromMinutes(30));

descriptor
.Initially()
.OnEvent<StartTimeoutEvent>()
.StateFactory(_ => new TimeoutState())
.Publish((_, state) => new TriggerEvent(state.Id))
.TransitionTo("Active");

descriptor.DuringAny().OnTimeout().TransitionTo(StateNames.TimedOut);

descriptor.During("Active").OnEvent<EndTimeoutEvent>().TransitionTo("Completed");

descriptor.Finally("Completed");
}
}

/// <summary>
/// Request-response saga: StartRequestEvent -> AwaitingResponse (sends TriggerRequest) ->
/// any reply -> Completed (final)
Expand Down Expand Up @@ -212,6 +266,8 @@ public sealed class InitEvent;

public sealed class StartTimeoutEvent;

public sealed class EndTimeoutEvent;

public sealed class StartRequestEvent;

public sealed record TriggerEvent(Guid? CorrelationId) : ICorrelatable;
Expand Down
22 changes: 6 additions & 16 deletions src/Mocha/test/Mocha.Sagas.Tests/SagaSchedulingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,9 @@ public async Task ScheduledPublish_Lifecycle_Should_ProducePublishOptionsWithSch
var saga =
Saga.Create<TestState>(x =>
{
x.Initially()
.OnEvent<StartEvent>()
.StateFactory(_ => new TestState())
.TransitionTo("Started");
x.Initially().OnEvent<StartEvent>().StateFactory(_ => new TestState()).TransitionTo("Started");

x.During("Started")
.OnEntry()
.ScheduledPublish(delay, _ => new ScheduledNotification());
x.During("Started").OnEntry().ScheduledPublish(delay, _ => new ScheduledNotification());

x.During("Started").OnEvent<EndEvent>().TransitionTo("Ended");

Expand Down Expand Up @@ -144,14 +139,9 @@ public async Task ScheduledSend_Lifecycle_Should_ProduceSendOptionsWithScheduled
var saga =
Saga.Create<TestState>(x =>
{
x.Initially()
.OnEvent<StartEvent>()
.StateFactory(_ => new TestState())
.TransitionTo("Started");
x.Initially().OnEvent<StartEvent>().StateFactory(_ => new TestState()).TransitionTo("Started");

x.During("Started")
.OnEntry()
.ScheduledSend(delay, _ => new ScheduledCommand());
x.During("Started").OnEntry().ScheduledSend(delay, _ => new ScheduledCommand());

x.During("Started").OnEvent<EndEvent>().TransitionTo("Ended");

Expand Down Expand Up @@ -182,8 +172,8 @@ private TestConsumeContext CreateContext(Saga saga, object message)
Runtime = s_runtime
};

context.Features.Configure<MessageParsingFeature>(f => f.Message = message);
context.Features.Configure<SagaFeature>(f => f.Store = _store);
context.Features.GetOrSet<MessageParsingFeature>().Message = message;
context.Features.GetOrSet<SagaFeature>().Store = _store;

return context;
}
Expand Down
Loading
Loading