diff --git a/Orleans.slnx b/Orleans.slnx
index 2af4d7d21aa..1e1fdaa1730 100644
--- a/Orleans.slnx
+++ b/Orleans.slnx
@@ -84,7 +84,7 @@
-
+
@@ -133,7 +133,7 @@
-
+
diff --git a/src/Orleans.Journaling/DurableDictionary.cs b/src/Orleans.Journaling/DurableDictionary.cs
index 1883d1c70fd..2a6c2d8ddc7 100644
--- a/src/Orleans.Journaling/DurableDictionary.cs
+++ b/src/Orleans.Journaling/DurableDictionary.cs
@@ -211,10 +211,10 @@ private void ApplySet(K key, V value)
OnSet(key, value);
}
- private bool ApplyRemove(K key) => _items.Remove(key);
+ internal bool ApplyRemove(K key) => _items.Remove(key);
private void ApplyClear() => _items.Clear();
- private IStateMachineLogWriter GetStorage()
+ protected virtual IStateMachineLogWriter GetStorage()
{
Debug.Assert(_storage is not null);
return _storage;
diff --git a/src/Orleans.Journaling/DurableNothing.cs b/src/Orleans.Journaling/DurableNothing.cs
index b9336e93d71..6cf3d096a37 100644
--- a/src/Orleans.Journaling/DurableNothing.cs
+++ b/src/Orleans.Journaling/DurableNothing.cs
@@ -1,4 +1,4 @@
-using System.Buffers;
+using System.Buffers;
using Microsoft.Extensions.DependencyInjection;
namespace Orleans.Journaling;
diff --git a/src/Orleans.Journaling/HostingExtensions.cs b/src/Orleans.Journaling/HostingExtensions.cs
index 4eff8b3e93d..a6bcb2358ac 100644
--- a/src/Orleans.Journaling/HostingExtensions.cs
+++ b/src/Orleans.Journaling/HostingExtensions.cs
@@ -1,11 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
+using Orleans.Configuration;
namespace Orleans.Journaling;
+
public static class HostingExtensions
{
public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder)
{
+ builder.Services.AddOptions();
builder.Services.TryAddScoped(sp => sp.GetRequiredService().Create(sp.GetRequiredService()));
builder.Services.TryAddScoped();
builder.Services.TryAddKeyedScoped(typeof(IDurableDictionary<,>), KeyedService.AnyKey, typeof(DurableDictionary<,>));
diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs
index e5058c14bc1..288d2f4e44e 100644
--- a/src/Orleans.Journaling/StateMachineManager.cs
+++ b/src/Orleans.Journaling/StateMachineManager.cs
@@ -1,7 +1,9 @@
using System.Buffers;
+using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Orleans.Runtime.Internal;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Session;
@@ -13,15 +15,19 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
private const int MinApplicationStateMachineId = 8;
private static readonly StringCodec StringCodec = new();
private static readonly UInt64Codec UInt64Codec = new();
+ private static readonly DateTimeCodec DateTimeCodec = new();
private readonly object _lock = new();
private readonly Dictionary _stateMachines = new(StringComparer.Ordinal);
private readonly Dictionary _stateMachinesMap = [];
private readonly IStateMachineStorage _storage;
private readonly ILogger _logger;
+ private readonly TimeProvider _timeProvider;
private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true };
private readonly Queue _workQueue = new();
private readonly CancellationTokenSource _shutdownCancellation = new();
private readonly StateMachineManagerState _stateMachineIds;
+ private readonly StateMachinesRetirementTracker _retirementTracker;
+ private readonly TimeSpan _retirementGracePeriod;
private Task? _workLoop;
private ManagerState _state;
private Task? _pendingWrite;
@@ -31,25 +37,59 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
public StateMachineManager(
IStateMachineStorage storage,
ILogger logger,
- SerializerSessionPool serializerSessionPool)
+ IOptions options,
+ SerializerSessionPool serializerSessionPool,
+ TimeProvider timeProvider)
{
_storage = storage;
_logger = logger;
+ _timeProvider = timeProvider;
+ _retirementGracePeriod = options.Value.RetirementGracePeriod;
// The list of known state machines is itself stored as a durable state machine with the implicit id 0.
// This allows us to recover the list of state machines ids without having to store it separately.
_stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool);
- _stateMachinesMap[0] = _stateMachineIds;
+ _stateMachinesMap[StateMachineManagerState.Id] = _stateMachineIds;
+
+ // The retirement tracker is a special internal state machine with a fixed id.
+ // It is not stored in _stateMachineIds and does not participate in the general name->id mapping.
+ _retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool);
+ _stateMachinesMap[StateMachinesRetirementTracker.Id] = _retirementTracker;
}
public void RegisterStateMachine(string name, IDurableStateMachine stateMachine)
{
- _shutdownCancellation.Token.ThrowIfCancellationRequested();
ArgumentNullException.ThrowIfNullOrEmpty(name);
+ _shutdownCancellation.Token.ThrowIfCancellationRequested();
lock (_lock)
{
- _stateMachines.Add(name, stateMachine);
+ if (_stateMachines.TryGetValue(name, out var machine))
+ {
+ if (machine is RetiredStateMachineVessel vessel)
+ {
+ // If the existing machine is a vessel for a retired one, it means the machine was loaded from a previous
+ // log during recovery but has not been re-registered. We effectively are "staging" the resurrection of the machine.
+ // The removal from the tracker is handled within the serialized loop. This is to prevent logical race conditions with the recovery process.
+ // We also make sure to apply any buffered data that could have occured while the vessel took this machine's place.
+ stateMachine.Reset(new StateMachineLogWriter(this, new(_stateMachineIds[name])));
+ foreach (var entry in vessel.BufferedData)
+ {
+ stateMachine.Apply(new ReadOnlySequence(entry));
+ }
+ _stateMachines[name] = stateMachine;
+ }
+ else
+ {
+ // A real state machine is already registered with this name, this must be a developer error.
+ throw new ArgumentException($"A state machine with the key '{name}' has already been registered.");
+ }
+ }
+ else
+ {
+ _stateMachines.Add(name, stateMachine);
+ }
+
_workQueue.Enqueue(new WorkItem(WorkItemType.RegisterStateMachine, completion: null)
{
Context = name
@@ -125,28 +165,32 @@ private async Task WorkLoop()
// If the current log length is greater than the snapshot size, then take a snapshot instead of appending more log entries.
var isSnapshot = workItem.Type is WorkItemType.WriteSnapshot;
LogExtentBuilder? logSegment;
+
lock (_lock)
{
- if (isSnapshot && _currentLogSegment is { } existingSegment)
+ if (isSnapshot)
{
// If there are pending writes, reset them since they will be captured by the snapshot instead.
// If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes.
- existingSegment.Reset();
- }
- else
- {
- _currentLogSegment ??= new();
+ _currentLogSegment?.Reset();
+
+ if (_retirementTracker.Count > 0)
+ {
+ RetireOrResurectStateMachines();
+ }
}
+ _currentLogSegment ??= new();
+
// The map of state machine ids is itself stored as a durable state machine with the id 0.
// This must be stored first, since it includes the identities of all other state machines, which are needed when replaying the log.
+ // If we removed retired machines, this snapshot will persist that change.
AppendUpdatesOrSnapshotStateMachine(_currentLogSegment, isSnapshot, 0, _stateMachineIds);
foreach (var (id, stateMachine) in _stateMachinesMap)
{
if (id is 0 || stateMachine is null)
{
- // Skip state machines which have been removed.
continue;
}
@@ -256,6 +300,39 @@ private async Task WorkLoop()
}
}
+ private void RetireOrResurectStateMachines()
+ {
+ foreach (var (name, timestamp) in _retirementTracker)
+ {
+ var isDuetime = _timeProvider.GetUtcNow().UtcDateTime - timestamp >= _retirementGracePeriod;
+ if (isDuetime && _stateMachineIds.TryGetValue(name, out var id))
+ {
+ var stateMachine = _stateMachines[name];
+
+ Debug.Assert(stateMachine is not null);
+
+ if (stateMachine is RetiredStateMachineVessel)
+ {
+ LogRemovingRetiredStateMachine(_logger, name);
+
+ // Since we are permanently removing this state machine, we will clean it up by reseting it.
+ stateMachine.Reset(new StateMachineLogWriter(this, new(id)));
+
+ _stateMachinesMap.Remove(id);
+ // We remove these from memory only, since the snapshot will persist these changes.
+ _stateMachineIds.ApplyRemove(name);
+ _retirementTracker.ApplyRemove(name);
+ }
+ else
+ {
+ LogRetiredStateMachineComebackDetected(_logger, name);
+ // We remove the tracker from memory only, since the snapshot will persist the change.
+ _retirementTracker.ApplyRemove(name);
+ }
+ }
+ }
+ }
+
private static void AppendUpdatesOrSnapshotStateMachine(LogExtentBuilder logSegment, bool isSnapshot, ulong id, IDurableStateMachine stateMachine)
{
var writer = logSegment.CreateLogWriter(new(id));
@@ -285,8 +362,12 @@ public async ValueTask DeleteStateAsync(CancellationToken cancellationToken)
private async Task RecoverAsync(CancellationToken cancellationToken)
{
- _stateMachineIds.ResetVolatileState();
- await foreach (var segment in _storage.ReadAsync(cancellationToken))
+ lock (_lock)
+ {
+ _stateMachineIds.ResetVolatileState();
+ }
+
+ await foreach (var segment in _storage.ReadAsync(cancellationToken).ConfigureAwait(false))
{
cancellationToken.ThrowIfCancellationRequested();
try
@@ -305,9 +386,18 @@ private async Task RecoverAsync(CancellationToken cancellationToken)
lock (_lock)
{
- foreach (var stateMachine in _stateMachines.Values)
+ foreach ((var name, var stateMachine) in _stateMachines)
{
stateMachine.OnRecoveryCompleted();
+
+ if (stateMachine is RetiredStateMachineVessel)
+ {
+ // We can use TryAdd since recovery has finished.
+ if (_retirementTracker.TryAdd(name, _timeProvider.GetUtcNow().UtcDateTime))
+ {
+ LogRetiredStateMachineDetected(_logger, name);
+ }
+ }
}
}
}
@@ -366,7 +456,13 @@ private void OnSetStateMachineId(string name, ulong id)
}
else
{
- throw new InvalidOperationException($"State machine \"{name}\" (id: {id}) has not been registered on this state machine manager.");
+ var vessel = new RetiredStateMachineVessel();
+
+ // We must not make the vessel self-register with the manager, since it will
+ // result in a late-registration after the manager is 'ready'. Instead we add it inline here.
+
+ _stateMachines.Add(name, vessel);
+ _stateMachinesMap[id] = vessel;
}
}
}
@@ -435,7 +531,7 @@ private enum WorkItemType
private enum ManagerState
{
Unknown,
- Ready,
+ Ready
}
private sealed class StateMachineManagerState(
@@ -444,15 +540,74 @@ private sealed class StateMachineManagerState(
IFieldCodec valueCodec,
SerializerSessionPool serializerSessionPool) : DurableDictionary(keyCodec, valueCodec, serializerSessionPool)
{
+ public const int Id = 0;
+
private readonly StateMachineManager _manager = manager;
- public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(0)));
+ public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(Id)));
protected override void OnSet(string key, ulong value) => _manager.OnSetStateMachineId(key, value);
}
+ ///
+ /// Used to track state machines that are not registered via user-code anymore, until time-based purging has elapsed.
+ ///
+ /// Resurrecting of retired machines is supported.
+ private sealed class StateMachinesRetirementTracker(
+ StateMachineManager manager, IFieldCodec keyCodec, IFieldCodec valueCodec, SerializerSessionPool sessionPool)
+ : DurableDictionary(keyCodec, valueCodec, sessionPool)
+ {
+ public const int Id = 1;
+
+ private readonly StateMachineLogWriter _logWriter = new(manager, new(Id));
+
+ protected override IStateMachineLogWriter GetStorage() => _logWriter;
+ }
+
+ ///
+ /// Used to keep retired machines into a purgatory state until time-based purging or if a comeback occurs.
+ /// This keeps buffering entries and dumps them back into the log upon compaction.
+ ///
+ [DebuggerDisplay(nameof(RetiredStateMachineVessel))]
+ private sealed class RetiredStateMachineVessel : IDurableStateMachine
+ {
+ private readonly List _bufferedData = [];
+
+ public ReadOnlyCollection BufferedData => _bufferedData.AsReadOnly();
+
+ void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter)
+ {
+ foreach (var data in _bufferedData)
+ {
+ snapshotWriter.AppendEntry(data);
+ }
+ }
+
+ void IDurableStateMachine.Reset(IStateMachineLogWriter storage) => _bufferedData.Clear();
+ void IDurableStateMachine.Apply(ReadOnlySequence logEntry) => _bufferedData.Add(logEntry.ToArray());
+ void IDurableStateMachine.AppendEntries(StateMachineStorageWriter logWriter) { }
+ IDurableStateMachine IDurableStateMachine.DeepCopy() => throw new NotSupportedException();
+ }
+
+
[LoggerMessage(
Level = LogLevel.Error,
Message = "Error processing work items.")]
private static partial void LogErrorProcessingWorkItems(ILogger logger, Exception exception);
+
+ [LoggerMessage(
+ Level = LogLevel.Information,
+ Message = "State machine \"{Name}\" was not found. I have substituted a placeholder for graceful time-based retirement.")]
+ private static partial void LogRetiredStateMachineDetected(ILogger logger, string name);
+
+ [LoggerMessage(
+ Level = LogLevel.Information,
+ Message = "State machine \"{Name}\" was previously retired (but not removed), and has hence been re-introduced. " +
+ "There is still time left before its permanent removal, so I will resurrect it.")]
+ private static partial void LogRetiredStateMachineComebackDetected(ILogger logger, string name);
+
+ [LoggerMessage(
+ Level = LogLevel.Information,
+ Message = "Removing retired state machine \"{Name}\" and its data. Operation will be durably persisted shortly after compaction has finalized.")]
+ private static partial void LogRemovingRetiredStateMachine(ILogger logger, string name);
}
diff --git a/src/Orleans.Journaling/StateMachineManagerOptions.cs b/src/Orleans.Journaling/StateMachineManagerOptions.cs
new file mode 100644
index 00000000000..9a491a502a3
--- /dev/null
+++ b/src/Orleans.Journaling/StateMachineManagerOptions.cs
@@ -0,0 +1,26 @@
+namespace Orleans.Journaling;
+
+///
+/// Options to configure the .
+///
+public sealed class StateMachineManagerOptions
+{
+ ///
+ /// Specifies the period of time to wait until the manager retires
+ /// a if its not registered in the manager anymore.
+ ///
+ ///
+ /// The act of retirement removes this state machine from the log.
+ /// If the state machine is reintroduced (within the grace period), than it will not be removed by the manager.
+ ///
+ /// This value represents the minimum time the fate of the state machine will be postponed.
+ /// The final decision can take longer - usually + [time until next compaction occurs].
+ ///
+ ///
+ public TimeSpan RetirementGracePeriod { get; set; } = DEFAULT_RETIREMENT_GRACE_PERIOD;
+
+ ///
+ /// The default value of .
+ ///
+ public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromDays(7);
+}
diff --git a/test/Orleans.Journaling.Tests/LogSegmentTests.cs b/test/Orleans.Journaling.Tests/LogSegmentTests.cs
index 2482e65f32d..d24f2d93412 100644
--- a/test/Orleans.Journaling.Tests/LogSegmentTests.cs
+++ b/test/Orleans.Journaling.Tests/LogSegmentTests.cs
@@ -48,6 +48,7 @@ public abstract class LogSegmentTests : IAsyncLifetime
private IServiceProvider _serviceProvider = null!;
private SiloLifecycleSubject? _siloLifecycle;
private IStateMachineStorageProvider _storageProvider = null!;
+ private static readonly IOptions ManagerOptions = Options.Create(new StateMachineManagerOptions());
public virtual async Task InitializeAsync()
{
@@ -85,7 +86,7 @@ public async Task DisposeAsync()
var codecProvider = _serviceProvider.GetRequiredService();
var grainContext = new TestGrainContext(grainId); // Use provided GrainId
var storage = _storageProvider.Create(grainContext);
- var manager = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool);
+ var manager = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System);
var list = new DurableList(listName, manager, codecProvider.GetCodec(), sessionPool);
return (manager, list, storage);
}
@@ -143,7 +144,7 @@ public async Task DurableList_Persistence_Test()
var sessionPool = _serviceProvider.GetRequiredService();
var codecProvider = _serviceProvider.GetRequiredService();
- var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool);
+ var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System);
var list2 = new DurableList(listName, manager2, codecProvider.GetCodec(), sessionPool);
await manager2.InitializeAsync(cts.Token);
@@ -342,7 +343,7 @@ public async Task DurableList_LargeNumberOfOperations_And_Snapshot_Test()
// Test recovery (potentially from snapshot)
var sessionPool = _serviceProvider.GetRequiredService();
var codecProvider = _serviceProvider.GetRequiredService();
- var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool); // Reuses the storage object linked via grainId
+ var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System);// Reuses the storage object linked via grainId
var list2 = new DurableList(listName, manager2, codecProvider.GetCodec(), sessionPool);
await manager2.InitializeAsync(cts.Token);
diff --git a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj
index db5f347c593..c6f90490f5e 100644
--- a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj
+++ b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj
@@ -15,6 +15,7 @@
+
diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
index 451b444a639..cf8ddae6931 100644
--- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
+++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Time.Testing;
using Xunit;
namespace Orleans.Journaling.Tests;
@@ -256,4 +257,125 @@ public async Task StateMachineManager_LargeStateRecovery_Test()
Assert.Equal($"Value {i}", recoveredDict[i]);
}
}
+
+ ///
+ /// Tests the full lifecycle of a retired state machine. It is preserved and also reintroduced through an
+ /// early compaction, but purged eventually after its grace period expires on later compactions.
+ ///
+ [Fact]
+ public async Task StateMachineManager_AutoRetiringStateMachines()
+ {
+ const string DictToKeepKey = "dictToKeep";
+ const string DictToRetireKey = "dictToRetire";
+
+ var period = ManagerOptions.RetirementGracePeriod;
+ var timeProvider = new FakeTimeProvider(DateTime.UtcNow);
+ var storage = CreateStorage();
+
+ // -------------- STEP 1 --------------
+
+ // We begin with 2 dictionaries, one of which we will retire by means of not registering it in the manager.
+ // This would be in the real-world developers removing it from the grain's ctor as a dependency.
+ var sut1 = CreateTestSystem(storage, timeProvider);
+ var dictToKeep1 = CreateTestMachine(DictToKeepKey, sut1.Manager);
+ var dictToRetire2 = CreateTestMachine(DictToRetireKey, sut1.Manager);
+
+ await sut1.Lifecycle.OnStart();
+
+ dictToKeep1.Add("a", 1);
+ dictToRetire2.Add("b", 1);
+
+ await sut1.Manager.WriteStateAsync(CancellationToken.None);
+
+ // -------------- STEP 2 --------------
+
+ // This time, we only register the dictionary we want to keep, this marks dictToRetire as retired.
+ var sut2 = CreateTestSystem(storage, timeProvider);
+ var dictToKeep2 = CreateTestMachine(DictToKeepKey, sut2.Manager);
+
+ await sut2.Lifecycle.OnStart();
+
+ // The manager should have recovered the state for dictToKeep,
+ // and created a DurableNothing placeholder for dictToRetire (we cant test for it at this point).
+ Assert.Equal(1, dictToKeep2["a"]);
+
+ // We advance time by half the grace period to see if we can save it from purging.
+ timeProvider.Advance(period / 2);
+
+ await TriggerCompaction(sut2.Manager, dictToKeep2);
+
+ // -------------- STEP 3 --------------
+
+ // Verify that the retired dictionary was NOT purged by this compaction, as only half the time has passed.
+ var sut3 = CreateTestSystem(storage, timeProvider);
+ var dictToKeep3 = CreateTestMachine(DictToKeepKey, sut3.Manager);
+ var dictToRetire3 = CreateTestMachine(DictToRetireKey, sut3.Manager);
+
+ await sut3.Lifecycle.OnStart();
+
+ Assert.Equal(10, dictToKeep3["a"]);
+
+ // The fact this entry ["b", 1] exists proves that the state of dictToRetire was preserved, even though we did not register it in step 2.
+ Assert.Equal(1, dictToRetire3["b"]);
+
+ // By advancing time by another half-period we cover the full period. But since we have re-introduced dictToRetire, we should have un-retired it.
+ // This is similar to step 2, but there we avoided purging due to time not being due, whereas here we avoid purging due to re-registration.
+ timeProvider.Advance(period / 2);
+
+ await TriggerCompaction(sut3.Manager, dictToKeep3);
+
+ // -------------- STEP 4 --------------
+
+ // Because of re-registration is step 3 (to test it was not purged), this means dictToRetire has been removed from the tracker.
+ // Again as in step 2, we only register the dictionary we want to keep, this marks dictToRetire as retired.
+ var sut4 = CreateTestSystem(storage, timeProvider);
+ var dictToKeep4 = CreateTestMachine(DictToKeepKey, sut4.Manager);
+
+ await sut4.Lifecycle.OnStart();
+
+ // The manager should have recovered the state for dictToKeep.
+ // It should have created a DurableNothing placeholder for dictToRetire, but we can not test for that.
+
+
+ // This time we advance time to cover the full period. Note that this is necessary because a side effect of step 3
+ // was that dictToRetire was removed from the tracker (since it came back), so just triggering a compaction won't cut it
+ // as time to retire will essentially be reset to "now".
+ timeProvider.Advance(period);
+
+ // This compaction should finally purge it.
+ await TriggerCompaction(sut4.Manager, dictToKeep4);
+
+ // -------------- STEP 5 --------------
+
+ // At this point, the manager has performed a snapshot, so it should have purged the dictToRetire data.
+ // By registering both dictionaries again, we should see what state remains after the snapshot.
+ var sut5 = CreateTestSystem(storage, timeProvider);
+ var dictToKeep5 = CreateTestMachine(DictToKeepKey, sut5.Manager);
+ var dictToRetire5 = CreateTestMachine(DictToRetireKey, sut5.Manager);
+
+ await sut5.Lifecycle.OnStart();
+ Assert.Equal(10, dictToKeep5["a"]);
+
+ // The retired dictionary should now be empty because its state was purged during the compaction.
+ // Note that this is a new version of dictToRetire, since the original was removed. Idea here is
+ // that if we can register a new dictToRetire (with the same key), it means that the machine itself
+ // has been removed but also the data, otherwise a previous machine would have had at least one
+ // entry i.e. ["b", 1].
+
+ Assert.Empty(dictToRetire5);
+
+ // Note: The retirement of state machines has the nice benefit of being able to reuse machine names.
+
+ DurableDictionary CreateTestMachine(string key, IStateMachineManager manager) =>
+ new(key, manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool);
+
+ static async Task TriggerCompaction(IStateMachineManager manager, DurableDictionary dict)
+ {
+ for (var i = 0; i < 11; i++)
+ {
+ dict["a"] = i;
+ await manager.WriteStateAsync(CancellationToken.None);
+ }
+ }
+ }
}
diff --git a/test/Orleans.Journaling.Tests/StateMachineTestBase.cs b/test/Orleans.Journaling.Tests/StateMachineTestBase.cs
index 78adb0435ac..5e94f605be5 100644
--- a/test/Orleans.Journaling.Tests/StateMachineTestBase.cs
+++ b/test/Orleans.Journaling.Tests/StateMachineTestBase.cs
@@ -1,9 +1,11 @@
using System.Collections.Immutable;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Orleans.Serialization;
using Orleans.Serialization.Serializers;
using Orleans.Serialization.Session;
+using Microsoft.Extensions.Time.Testing;
namespace Orleans.Journaling.Tests;
@@ -16,10 +18,12 @@ public abstract class StateMachineTestBase
protected readonly SerializerSessionPool SessionPool;
protected readonly ICodecProvider CodecProvider;
protected readonly ILoggerFactory LoggerFactory;
+ protected readonly StateMachineManagerOptions ManagerOptions = new();
protected StateMachineTestBase()
{
var services = new ServiceCollection();
+
services.AddSerializer();
services.AddLogging(builder => builder.AddConsole());
@@ -40,11 +44,14 @@ protected virtual IStateMachineStorage CreateStorage()
///
/// Creates a state machine manager with in-memory storage
///
- internal (IStateMachineManager Manager, IStateMachineStorage Storage, ILifecycleSubject Lifecycle) CreateTestSystem(IStateMachineStorage? storage = null)
+ internal (IStateMachineManager Manager, IStateMachineStorage Storage, ILifecycleSubject Lifecycle)
+ CreateTestSystem(IStateMachineStorage? storage = null, TimeProvider? provider = null)
{
storage ??= CreateStorage();
+ provider ??= TimeProvider.System;
+
var logger = LoggerFactory.CreateLogger();
- var manager = new StateMachineManager(storage, logger, SessionPool);
+ var manager = new StateMachineManager(storage, logger, Options.Create(ManagerOptions), SessionPool, provider);
var lifecycle = new GrainLifecycle(LoggerFactory.CreateLogger());
(manager as ILifecycleParticipant)?.Participate(lifecycle);
return (manager, storage, lifecycle);