Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9a967f5
retire and purge state machines
ledjon-behluli Sep 17, 2025
86cea82
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Sep 17, 2025
5a48051
move comments up
ledjon-behluli Sep 17, 2025
b64b9a8
wip
ledjon-behluli Sep 18, 2025
699dca2
time-based retirement logic
ledjon-behluli Sep 18, 2025
35e531b
merge
ledjon-behluli Sep 28, 2025
15213bd
last touches
ledjon-behluli Sep 28, 2025
6cc1612
remove test project
ledjon-behluli Sep 28, 2025
fbbd90a
remove unused usings
ledjon-behluli Sep 28, 2025
cbe7fde
prevent duplicate key error during state machine resurrection
ledjon-behluli Oct 5, 2025
14ea39a
ensure buffered data is re-applied if machine comes back
ledjon-behluli Oct 7, 2025
c45cc71
remove hacky assertion in tests
ledjon-behluli Oct 10, 2025
8419cdb
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Oct 26, 2025
dd396e0
Update src/Orleans.Journaling/StateMachineManager.cs
ledjon-behluli Oct 26, 2025
37dab7f
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
fdd14e6
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
3de46b7
Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
ledjon-behluli Oct 26, 2025
40ce19c
Fix table creation logging to reflect existing tables correctly (#9696)
egil Sep 30, 2025
75e7916
Fix ZooKeeper CI tests: correct service name and use official Docker …
Copilot Oct 1, 2025
167d225
Fix Consul CI tests by using compatible Consul version (#9701)
Copilot Oct 2, 2025
e7c922d
Remove explicit ActivityStatusCode.Ok setting to comply with OpenTele…
Copilot Oct 3, 2025
b91aeee
chore(deps): bump dotnet-sdk from 9.0.303 to 9.0.305 (#9677)
dependabot[bot] Oct 3, 2025
2a917e0
Fix race condition in `TransactionInfo.Fork` `PendingCalls` increment…
bknote71 Oct 7, 2025
8ea8f8f
[FIX] Potential NRE in the activation repartitioner (#9713)
ledjon-behluli Oct 8, 2025
2495095
Remove infinite timespan reminder checks (#9715)
Da-Teach Oct 15, 2025
53b8678
ResponseCompletionSource: RunContinuationsAsynchronously (#9724)
ReubenBond Oct 22, 2025
c0c11fb
chore(deps): bump dotnet-sdk from 9.0.305 to 9.0.306 (#9720)
dependabot[bot] Oct 22, 2025
2f389d8
Add `BigInteger` codec (#9669)
galvesribeiro Oct 22, 2025
a226ad5
StateMachineManager: start work loop during activation (#9725)
ReubenBond Oct 23, 2025
e256d97
Flag PostgreSQL as asynchronous (#9705)
hiyelbaz Oct 23, 2025
f9eb0c7
Enable GitHub merge queue (#9727)
ReubenBond Oct 23, 2025
8621812
Add missing validation for the NATS stream provider (#9668)
galvesribeiro Oct 23, 2025
420da38
`ActivationDataActivatorProvider`: run grain constructor on grain sch…
ReubenBond Oct 24, 2025
3847b58
Modifying Endpoint to use Pascale Casing as EndPoint for consistency …
ramzimort Oct 26, 2025
ac665c5
treat retirement tracker as a first-class (internal) machine
ledjon-behluli Oct 26, 2025
2cc6ffd
Merge branch 'main' into retire-state-machines
ledjon-behluli Oct 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/Orleans.Journaling/DurableNothing.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
using System.Buffers;
using System.Buffers;
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Journaling;

/// <summary>
/// A durable object which does nothing, used for retiring other durable types.
/// </summary>
public interface IDurableNothing
[DebuggerDisplay("DurableNothing")]
internal sealed class DurableNothing : IDurableStateMachine
{
}
public string StateMachineKey { get; }

/// <summary>
/// A durable object which does nothing, used for retiring other durable types.
/// </summary>
internal sealed class DurableNothing : IDurableNothing, IDurableStateMachine
{
public DurableNothing([ServiceKey] string key, IStateMachineManager manager)
{
ArgumentNullException.ThrowIfNullOrEmpty(key);
manager.RegisterStateMachine(key, this);
StateMachineKey = key;
}

void IDurableStateMachine.Reset(IStateMachineLogWriter storage) { }
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Journaling/HostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Orleans.Journaling;

public static class HostingExtensions
{
public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder)
Expand All @@ -15,7 +16,6 @@ public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder)
builder.Services.TryAddKeyedScoped(typeof(IDurableValue<>), KeyedService.AnyKey, typeof(DurableValue<>));
builder.Services.TryAddKeyedScoped(typeof(IPersistentState<>), KeyedService.AnyKey, typeof(DurableState<>));
builder.Services.TryAddKeyedScoped(typeof(IDurableTaskCompletionSource<>), KeyedService.AnyKey, typeof(DurableTaskCompletionSource<>));
builder.Services.TryAddKeyedScoped(typeof(IDurableNothing), KeyedService.AnyKey, typeof(DurableNothing));
return builder;
}
}
50 changes: 43 additions & 7 deletions src/Orleans.Journaling/StateMachineManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
private readonly Task _workLoop;
private ManagerState _state;
private Task? _pendingWrite;
private bool _hasStateMachineToRetire;
private ulong _nextStateMachineId = MinApplicationStateMachineId;
private LogExtentBuilder? _currentLogSegment;

Expand Down Expand Up @@ -125,21 +126,38 @@ private async Task WorkLoop()
// If the current log length is greater than the snapshot size, then take a snapshot instead of appending more log entries.
var isSnapshot = workItem.Type is WorkItemType.WriteSnapshot;
LogExtentBuilder? logSegment;

lock (_lock)
{
if (isSnapshot && _currentLogSegment is { } existingSegment)
if (isSnapshot)
{
// If there are pending writes, reset them since they will be captured by the snapshot instead.
// If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes.
existingSegment.Reset();
}
else
{
_currentLogSegment ??= new();
_currentLogSegment?.Reset();

if (_hasStateMachineToRetire) // We use this flag because the majooority of times, there wont be any state machine to retire.
Comment thread
ledjon-behluli marked this conversation as resolved.
Outdated
{
// Since this is a snapshot, we use the opportunity to purge retired state machines.
foreach (var (id, machine) in _stateMachinesMap)
Comment thread
ledjon-behluli marked this conversation as resolved.
Outdated
{
if (machine is DurableNothing retired)
{
var name = retired.StateMachineKey;

LogPurgingRetiredStateMachine(_logger, name, id);

_stateMachinesMap.Remove(id);
_stateMachineIds.Remove(name); // This will take effect when the snapshot is persisted below.
}
}
}
}

_currentLogSegment ??= new();

// The map of state machine ids is itself stored as a durable state machine with the id 0.
// This must be stored first, since it includes the identities of all other state machines, which are needed when replaying the log.
// If we removed retired machines, this snapshot will persist that change.
AppendUpdatesOrSnapshotStateMachine(_currentLogSegment, isSnapshot, 0, _stateMachineIds);

foreach (var (id, stateMachine) in _stateMachinesMap)
Expand Down Expand Up @@ -366,7 +384,15 @@ private void OnSetStateMachineId(string name, ulong id)
}
else
{
throw new InvalidOperationException($"State machine \"{name}\" (id: {id}) has not been registered on this state machine manager.");
// When a state machine is not found, we substitute it with a DurableNothing.
// It will be purged on the next snapshot.

LogRetiredStateMachineDetected(_logger, name, id);

_stateMachinesMap[id] = new DurableNothing(name, this);
_hasStateMachineToRetire = true;

// No need to call Reset, as DurableNothing does nothing.
}
}
}
Expand Down Expand Up @@ -452,4 +478,14 @@ private sealed class StateMachineManagerState(
Level = LogLevel.Error,
Message = "Error processing work items.")]
private static partial void LogErrorProcessingWorkItems(ILogger logger, Exception exception);

[LoggerMessage(
Level = LogLevel.Information,
Message = "State machine \"{Name}\" (id: {Id}) not found. Substituting a placeholder for graceful retirement.")]
private static partial void LogRetiredStateMachineDetected(ILogger logger, string name, ulong id);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Purging retired state machine \"{Name}\" (id: {Id}).")]
private static partial void LogPurgingRetiredStateMachine(ILogger logger, string name, ulong id);
}
61 changes: 61 additions & 0 deletions test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,65 @@ public async Task StateMachineManager_LargeStateRecovery_Test()
Assert.Equal($"Value {i}", recoveredDict[i]);
}
}

/// <summary>
/// Tests that a "retired" state machine (one that is no longer registered)
/// has its data (and itself) purged when the storage triggers a compaction.
/// </summary>
[Fact]
public async Task StateMachineManager_RetiredStateMachine_IsPurgedOnCompaction()
{
const string DictToKeepKey = "dictToKeep";
const string DictToRetireKey = "dictToRetire";

var sut1 = CreateTestSystem();

// We beging with 2 dictionaries, one of which we will retire by means of not registering it in the manger.
// This would be in the real-world developers removing it from the grain's ctor as a dependecy.
var dictToKeep = new DurableDictionary<string, int>(DictToKeepKey, sut1.Manager, CodecProvider.GetCodec<string>(), CodecProvider.GetCodec<int>(), SessionPool);
var dictToRetire = new DurableDictionary<string, int>(DictToRetireKey, sut1.Manager, CodecProvider.GetCodec<string>(), CodecProvider.GetCodec<int>(), SessionPool);

await sut1.Lifecycle.OnStart();

dictToKeep.Add("a", 1);
dictToRetire.Add("b", 2);

await sut1.Manager.WriteStateAsync(CancellationToken.None);

var sut2 = CreateTestSystem(sut1.Storage);

// This time, we only register the dictionary we want to keep, this marks dictToRetire as retired.
var recoveredDictToKeep = new DurableDictionary<string, int>(DictToKeepKey, sut2.Manager, CodecProvider.GetCodec<string>(), CodecProvider.GetCodec<int>(), SessionPool);

await sut2.Lifecycle.OnStart();

// The manager should have recovered the state for dictToKeep, and created a DurableNothing placeholder for dictToRetire.
Assert.Equal(1, recoveredDictToKeep["a"]);

// Now, we trigger the compaction logic in VolatileStateMachineStorage by writing more than 10 times.
for (var i = 0; i < 11; i++)
{
recoveredDictToKeep["a"] = i;
await sut2.Manager.WriteStateAsync(CancellationToken.None);
}

// At this point, the manager has performed a snapshot, so it should have purged the dictToRetire data.
var sut3 = CreateTestSystem(sut2.Storage);

// So by registering both dictionaries again, we should see what state remains after the snapshot.
var finalDictToKeep = new DurableDictionary<string, int>(DictToKeepKey, sut3.Manager, CodecProvider.GetCodec<string>(), CodecProvider.GetCodec<int>(), SessionPool);
var finalDictToRetire = new DurableDictionary<string, int>(DictToRetireKey, sut3.Manager, CodecProvider.GetCodec<string>(), CodecProvider.GetCodec<int>(), SessionPool);

await sut3.Lifecycle.OnStart();

// The dictionary we kept should have the last value written to it.
Assert.Equal(10, finalDictToKeep["a"]); // The last value from the i=0..10 loop.

// The retired dictionary should now be empty because its state was purged during the compaction.
// Note that this is a new version of dictToRetire, since that is removed as a state machine, idea here
// is that if we can register a new dictToRetire with the same key, it means that the machine itself has been removed
// but also the data, otherwise previous machine would have had at least one entry i.e. ["b", 2]. The removal of the machine
// itself has also the nice benefit of being able to reuse old machine names.
Assert.Empty(finalDictToRetire);
}
}
Loading