Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9a967f5
retire and purge state machines
ledjon-behluli Sep 17, 2025
86cea82
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Sep 17, 2025
5a48051
move comments up
ledjon-behluli Sep 17, 2025
b64b9a8
wip
ledjon-behluli Sep 18, 2025
699dca2
time-based retirement logic
ledjon-behluli Sep 18, 2025
35e531b
merge
ledjon-behluli Sep 28, 2025
15213bd
last touches
ledjon-behluli Sep 28, 2025
6cc1612
remove test project
ledjon-behluli Sep 28, 2025
fbbd90a
remove unused usings
ledjon-behluli Sep 28, 2025
cbe7fde
prevent duplicate key error during state machine resurrection
ledjon-behluli Oct 5, 2025
14ea39a
ensure buffered data is re-applied if machine comes back
ledjon-behluli Oct 7, 2025
c45cc71
remove hacky assertion in tests
ledjon-behluli Oct 10, 2025
8419cdb
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Oct 26, 2025
dd396e0
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Oct 26, 2025
37dab7f
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
fdd14e6
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
3de46b7
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
40ce19c
Fix table creation logging to reflect existing tables correctly (#9696)
egil Sep 30, 2025
75e7916
Fix ZooKeeper CI tests: correct service name and use official Docker …
Copilot Oct 1, 2025
167d225
Fix Consul CI tests by using compatible Consul version (#9701)
Copilot Oct 2, 2025
e7c922d
Remove explicit ActivityStatusCode.Ok setting to comply with OpenTele…
Copilot Oct 3, 2025
b91aeee
chore(deps): bump dotnet-sdk from 9.0.303 to 9.0.305 (#9677)
dependabot[bot] Oct 3, 2025
2a917e0
Fix race condition in `TransactionInfo.Fork` `PendingCalls` increment…
bknote71 Oct 7, 2025
8ea8f8f
[FIX] Potential NRE in the activation repartitioner (#9713)
ledjon-behluli Oct 8, 2025
2495095
Remove infinite timespan reminder checks (#9715)
Da-Teach Oct 15, 2025
53b8678
ResponseCompletionSource: RunContinuationsAsynchronously (#9724)
ReubenBond Oct 22, 2025
c0c11fb
chore(deps): bump dotnet-sdk from 9.0.305 to 9.0.306 (#9720)
dependabot[bot] Oct 22, 2025
2f389d8
Add `BigInteger` codec (#9669)
galvesribeiro Oct 22, 2025
a226ad5
StateMachineManager: start work loop during activation (#9725)
ReubenBond Oct 23, 2025
e256d97
Flag PostgreSQL as asynchronous (#9705)
hiyelbaz Oct 23, 2025
f9eb0c7
Enable GitHub merge queue (#9727)
ReubenBond Oct 23, 2025
8621812
Add missing validation for the NATS stream provider (#9668)
galvesribeiro Oct 23, 2025
420da38
`ActivationDataActivatorProvider`: run grain constructor on grain sch…
ReubenBond Oct 24, 2025
3847b58
Modifying Endpoint to use Pascale Casing as EndPoint for consistency …
ramzimort Oct 26, 2025
ac665c5
treat retirement tracker as a first-class (internal) machine
ledjon-behluli Oct 26, 2025
2cc6ffd
Merge branch 'main' into retire-state-machines
ledjon-behluli Oct 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Orleans.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<Project Path="src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj" />
</Folder>
<Folder Name="/src/Extensions/NATS/">
<Project Path="src\Orleans.Streaming.NATS\Orleans.Streaming.NATS.csproj" Type="Classic C#" />
<Project Path="src\Orleans.Streaming.NATS\Orleans.Streaming.NATS.csproj" />
</Folder>
<Folder Name="/src/Extensions/Redis/">
<Project Path="src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj" />
Expand Down Expand Up @@ -133,7 +133,7 @@
<Project Path="test/Extensions/TesterAdoNet/Tester.AdoNet.csproj" />
<Project Path="test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj" />
<Project Path="test/Extensions/TesterZooKeeperUtils/Tester.ZooKeeperUtils.csproj" />
<Project Path="test\Extensions\NATS.Tests\NATS.Tests.csproj" Type="Classic C#" />
<Project Path="test\Extensions\NATS.Tests\NATS.Tests.csproj" />
</Folder>
<Folder Name="/test/Grains/">
<Project Path="test/Grains/TestFSharp/TestFSharp.fsproj" />
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Journaling/DurableDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ private void ApplySet(K key, V value)
OnSet(key, value);
}

private bool ApplyRemove(K key) => _items.Remove(key);
internal bool ApplyRemove(K key) => _items.Remove(key);
private void ApplyClear() => _items.Clear();

private IStateMachineLogWriter GetStorage()
protected virtual IStateMachineLogWriter GetStorage()
{
Debug.Assert(_storage is not null);
return _storage;
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Journaling/DurableNothing.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Journaling;
Expand Down
3 changes: 3 additions & 0 deletions src/Orleans.Journaling/HostingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Orleans.Configuration;

namespace Orleans.Journaling;

public static class HostingExtensions
{
public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder)
{
builder.Services.AddOptions<StateMachineManagerOptions>();
builder.Services.TryAddScoped<IStateMachineStorage>(sp => sp.GetRequiredService<IStateMachineStorageProvider>().Create(sp.GetRequiredService<IGrainContext>()));
builder.Services.TryAddScoped<IStateMachineManager, StateMachineManager>();
builder.Services.TryAddKeyedScoped(typeof(IDurableDictionary<,>), KeyedService.AnyKey, typeof(DurableDictionary<,>));
Expand Down
189 changes: 172 additions & 17 deletions src/Orleans.Journaling/StateMachineManager.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System.Buffers;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Runtime.Internal;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Session;
Expand All @@ -13,15 +15,19 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
private const int MinApplicationStateMachineId = 8;
private static readonly StringCodec StringCodec = new();
private static readonly UInt64Codec UInt64Codec = new();
private static readonly DateTimeCodec DateTimeCodec = new();
private readonly object _lock = new();
private readonly Dictionary<string, IDurableStateMachine> _stateMachines = new(StringComparer.Ordinal);
private readonly Dictionary<ulong, IDurableStateMachine> _stateMachinesMap = [];
private readonly IStateMachineStorage _storage;
private readonly ILogger<StateMachineManager> _logger;
private readonly TimeProvider _timeProvider;
private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true };
private readonly Queue<WorkItem> _workQueue = new();
private readonly CancellationTokenSource _shutdownCancellation = new();
private readonly StateMachineManagerState _stateMachineIds;
private readonly StateMachinesRetirementTracker _retirementTracker;
private readonly TimeSpan _retirementGracePeriod;
private Task? _workLoop;
private ManagerState _state;
private Task? _pendingWrite;
Expand All @@ -31,25 +37,59 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
public StateMachineManager(
IStateMachineStorage storage,
ILogger<StateMachineManager> logger,
SerializerSessionPool serializerSessionPool)
IOptions<StateMachineManagerOptions> options,
SerializerSessionPool serializerSessionPool,
TimeProvider timeProvider)
{
_storage = storage;
_logger = logger;
_timeProvider = timeProvider;
_retirementGracePeriod = options.Value.RetirementGracePeriod;

// The list of known state machines is itself stored as a durable state machine with the implicit id 0.
// This allows us to recover the list of state machines ids without having to store it separately.
_stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool);
_stateMachinesMap[0] = _stateMachineIds;
_stateMachinesMap[StateMachineManagerState.Id] = _stateMachineIds;

// The retirement tracker is a special internal state machine with a fixed id.
// It is not stored in _stateMachineIds and does not participate in the general name->id mapping.
_retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool);
Comment thread
ledjon-behluli marked this conversation as resolved.
_stateMachinesMap[StateMachinesRetirementTracker.Id] = _retirementTracker;
}

public void RegisterStateMachine(string name, IDurableStateMachine stateMachine)
{
_shutdownCancellation.Token.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNullOrEmpty(name);
_shutdownCancellation.Token.ThrowIfCancellationRequested();

lock (_lock)
{
_stateMachines.Add(name, stateMachine);
if (_stateMachines.TryGetValue(name, out var machine))
{
if (machine is RetiredStateMachineVessel vessel)
{
// If the existing machine is a vessel for a retired one, it means the machine was loaded from a previous
// log during recovery but has not been re-registered. We effectively are "staging" the resurrection of the machine.
// The removal from the tracker is handled within the serialized loop. This is to prevent logical race conditions with the recovery process.
// We also make sure to apply any buffered data that could have occured while the vessel took this machine's place.
stateMachine.Reset(new StateMachineLogWriter(this, new(_stateMachineIds[name])));
foreach (var entry in vessel.BufferedData)
{
stateMachine.Apply(new ReadOnlySequence<byte>(entry));
}
_stateMachines[name] = stateMachine;
}
else
{
// A real state machine is already registered with this name, this must be a developer error.
throw new ArgumentException($"A state machine with the key '{name}' has already been registered.");
}
}
else
{
_stateMachines.Add(name, stateMachine);
}

_workQueue.Enqueue(new WorkItem(WorkItemType.RegisterStateMachine, completion: null)
{
Context = name
Expand Down Expand Up @@ -125,28 +165,32 @@ private async Task WorkLoop()
// If the current log length is greater than the snapshot size, then take a snapshot instead of appending more log entries.
var isSnapshot = workItem.Type is WorkItemType.WriteSnapshot;
LogExtentBuilder? logSegment;

lock (_lock)
{
if (isSnapshot && _currentLogSegment is { } existingSegment)
if (isSnapshot)
{
// If there are pending writes, reset them since they will be captured by the snapshot instead.
// If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes.
existingSegment.Reset();
}
else
{
_currentLogSegment ??= new();
_currentLogSegment?.Reset();

if (_retirementTracker.Count > 0)
{
RetireOrResurectStateMachines();
}
}

_currentLogSegment ??= new();

// The map of state machine ids is itself stored as a durable state machine with the id 0.
// This must be stored first, since it includes the identities of all other state machines, which are needed when replaying the log.
// If we removed retired machines, this snapshot will persist that change.
AppendUpdatesOrSnapshotStateMachine(_currentLogSegment, isSnapshot, 0, _stateMachineIds);

foreach (var (id, stateMachine) in _stateMachinesMap)
{
if (id is 0 || stateMachine is null)
{
// Skip state machines which have been removed.
continue;
}

Expand Down Expand Up @@ -256,6 +300,39 @@ private async Task WorkLoop()
}
}

private void RetireOrResurectStateMachines()
{
foreach (var (name, timestamp) in _retirementTracker)
{
var isDuetime = _timeProvider.GetUtcNow().UtcDateTime - timestamp >= _retirementGracePeriod;
if (isDuetime && _stateMachineIds.TryGetValue(name, out var id))
{
var stateMachine = _stateMachines[name];

Debug.Assert(stateMachine is not null);

if (stateMachine is RetiredStateMachineVessel)
{
LogRemovingRetiredStateMachine(_logger, name);

// Since we are permanently removing this state machine, we will clean it up by reseting it.
stateMachine.Reset(new StateMachineLogWriter(this, new(id)));

_stateMachinesMap.Remove(id);
// We remove these from memory only, since the snapshot will persist these changes.
_stateMachineIds.ApplyRemove(name);
_retirementTracker.ApplyRemove(name);
}
else
{
LogRetiredStateMachineComebackDetected(_logger, name);
// We remove the tracker from memory only, since the snapshot will persist the change.
_retirementTracker.ApplyRemove(name);
}
}
}
}

private static void AppendUpdatesOrSnapshotStateMachine(LogExtentBuilder logSegment, bool isSnapshot, ulong id, IDurableStateMachine stateMachine)
{
var writer = logSegment.CreateLogWriter(new(id));
Expand Down Expand Up @@ -285,8 +362,12 @@ public async ValueTask DeleteStateAsync(CancellationToken cancellationToken)

private async Task RecoverAsync(CancellationToken cancellationToken)
{
_stateMachineIds.ResetVolatileState();
await foreach (var segment in _storage.ReadAsync(cancellationToken))
lock (_lock)
{
_stateMachineIds.ResetVolatileState();
}

await foreach (var segment in _storage.ReadAsync(cancellationToken).ConfigureAwait(false))
{
cancellationToken.ThrowIfCancellationRequested();
try
Expand All @@ -305,9 +386,18 @@ private async Task RecoverAsync(CancellationToken cancellationToken)

lock (_lock)
{
foreach (var stateMachine in _stateMachines.Values)
foreach ((var name, var stateMachine) in _stateMachines)
{
stateMachine.OnRecoveryCompleted();

if (stateMachine is RetiredStateMachineVessel)
{
// We can use TryAdd since recovery has finished.
if (_retirementTracker.TryAdd(name, _timeProvider.GetUtcNow().UtcDateTime))
{
LogRetiredStateMachineDetected(_logger, name);
}
}
}
}
}
Expand Down Expand Up @@ -366,7 +456,13 @@ private void OnSetStateMachineId(string name, ulong id)
}
else
{
throw new InvalidOperationException($"State machine \"{name}\" (id: {id}) has not been registered on this state machine manager.");
var vessel = new RetiredStateMachineVessel();

// We must not make the vessel self-register with the manager, since it will
// result in a late-registration after the manager is 'ready'. Instead we add it inline here.

_stateMachines.Add(name, vessel);
_stateMachinesMap[id] = vessel;
}
}
}
Expand Down Expand Up @@ -435,7 +531,7 @@ private enum WorkItemType
private enum ManagerState
{
Unknown,
Ready,
Ready
}

private sealed class StateMachineManagerState(
Expand All @@ -444,15 +540,74 @@ private sealed class StateMachineManagerState(
IFieldCodec<ulong> valueCodec,
SerializerSessionPool serializerSessionPool) : DurableDictionary<string, ulong>(keyCodec, valueCodec, serializerSessionPool)
{
public const int Id = 0;

private readonly StateMachineManager _manager = manager;

public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(0)));
public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(Id)));

protected override void OnSet(string key, ulong value) => _manager.OnSetStateMachineId(key, value);
}

/// <summary>
/// Used to track state machines that are not registered via user-code anymore, until time-based purging has elapsed.
/// </summary>
/// <remarks>Resurrecting of retired machines is supported.</remarks>
private sealed class StateMachinesRetirementTracker(
StateMachineManager manager, IFieldCodec<string> keyCodec, IFieldCodec<DateTime> valueCodec, SerializerSessionPool sessionPool)
: DurableDictionary<string, DateTime>(keyCodec, valueCodec, sessionPool)
{
public const int Id = 1;

private readonly StateMachineLogWriter _logWriter = new(manager, new(Id));

protected override IStateMachineLogWriter GetStorage() => _logWriter;
}

/// <summary>
/// Used to keep retired machines into a purgatory state until time-based purging or if a comeback occurs.
/// This keeps buffering entries and dumps them back into the log upon compaction.
/// </summary>
[DebuggerDisplay(nameof(RetiredStateMachineVessel))]
private sealed class RetiredStateMachineVessel : IDurableStateMachine
{
private readonly List<byte[]> _bufferedData = [];

public ReadOnlyCollection<byte[]> BufferedData => _bufferedData.AsReadOnly();

void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter)
{
foreach (var data in _bufferedData)
{
snapshotWriter.AppendEntry(data);
}
}

void IDurableStateMachine.Reset(IStateMachineLogWriter storage) => _bufferedData.Clear();
void IDurableStateMachine.Apply(ReadOnlySequence<byte> logEntry) => _bufferedData.Add(logEntry.ToArray());
void IDurableStateMachine.AppendEntries(StateMachineStorageWriter logWriter) { }
IDurableStateMachine IDurableStateMachine.DeepCopy() => throw new NotSupportedException();
}


[LoggerMessage(
Level = LogLevel.Error,
Message = "Error processing work items.")]
private static partial void LogErrorProcessingWorkItems(ILogger logger, Exception exception);

[LoggerMessage(
Level = LogLevel.Information,
Message = "State machine \"{Name}\" was not found. I have substituted a placeholder for graceful time-based retirement.")]
private static partial void LogRetiredStateMachineDetected(ILogger logger, string name);

[LoggerMessage(
Level = LogLevel.Information,
Message = "State machine \"{Name}\" was previously retired (but not removed), and has hence been re-introduced. " +
"There is still time left before its permanent removal, so I will resurrect it.")]
private static partial void LogRetiredStateMachineComebackDetected(ILogger logger, string name);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Removing retired state machine \"{Name}\" and its data. Operation will be durably persisted shortly after compaction has finalized.")]
private static partial void LogRemovingRetiredStateMachine(ILogger logger, string name);
}
26 changes: 26 additions & 0 deletions src/Orleans.Journaling/StateMachineManagerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Orleans.Journaling;

/// <summary>
/// Options to configure the <see cref="IStateMachineManager"/>.
/// </summary>
public sealed class StateMachineManagerOptions
{
/// <summary>
/// Specifies the period of time to wait until the manager retires
/// a <see cref="IDurableStateMachine"/> if its not registered in the manager anymore.
/// </summary>
/// <remarks>
/// <para>The act of retirement removes this state machine from the log.</para>
/// <para>If the state machine is reintroduced (within the grace period), than it will not be removed by the manager.</para>
/// <para>
/// This value represents the <b>minimum</b> time the fate of the state machine will be postponed.
/// The final decision can take longer - usually <see cref="RetirementGracePeriod"/> + [time until next compaction occurs].
/// </para>
/// </remarks>
public TimeSpan RetirementGracePeriod { get; set; } = DEFAULT_RETIREMENT_GRACE_PERIOD;

/// <summary>
/// The default value of <see cref="RetirementGracePeriod"/>.
/// </summary>
public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromDays(7);
}
Loading