From 9a967f546f6b09e35ca953c8f2fa9485234a1fc2 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Wed, 17 Sep 2025 23:13:35 +0200 Subject: [PATCH 01/34] retire and purge state machines --- src/Orleans.Journaling/DurableNothing.cs | 14 ++--- src/Orleans.Journaling/HostingExtensions.cs | 2 +- src/Orleans.Journaling/StateMachineManager.cs | 50 ++++++++++++--- .../StateMachineManagerTests.cs | 61 +++++++++++++++++++ 4 files changed, 111 insertions(+), 16 deletions(-) diff --git a/src/Orleans.Journaling/DurableNothing.cs b/src/Orleans.Journaling/DurableNothing.cs index b9336e93d71..624457740bd 100644 --- a/src/Orleans.Journaling/DurableNothing.cs +++ b/src/Orleans.Journaling/DurableNothing.cs @@ -1,4 +1,5 @@ -using System.Buffers; +using System.Buffers; +using System.Diagnostics; using Microsoft.Extensions.DependencyInjection; namespace Orleans.Journaling; @@ -6,19 +7,16 @@ namespace Orleans.Journaling; /// /// A durable object which does nothing, used for retiring other durable types. /// -public interface IDurableNothing +[DebuggerDisplay("DurableNothing")] +internal sealed class DurableNothing : IDurableStateMachine { -} + public string StateMachineKey { get; } -/// -/// A durable object which does nothing, used for retiring other durable types. -/// -internal sealed class DurableNothing : IDurableNothing, IDurableStateMachine -{ public DurableNothing([ServiceKey] string key, IStateMachineManager manager) { ArgumentNullException.ThrowIfNullOrEmpty(key); manager.RegisterStateMachine(key, this); + StateMachineKey = key; } void IDurableStateMachine.Reset(IStateMachineLogWriter storage) { } diff --git a/src/Orleans.Journaling/HostingExtensions.cs b/src/Orleans.Journaling/HostingExtensions.cs index 4eff8b3e93d..6d55a115ee3 100644 --- a/src/Orleans.Journaling/HostingExtensions.cs +++ b/src/Orleans.Journaling/HostingExtensions.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions; namespace Orleans.Journaling; + public static class HostingExtensions { public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder) @@ -15,7 +16,6 @@ public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder) builder.Services.TryAddKeyedScoped(typeof(IDurableValue<>), KeyedService.AnyKey, typeof(DurableValue<>)); builder.Services.TryAddKeyedScoped(typeof(IPersistentState<>), KeyedService.AnyKey, typeof(DurableState<>)); builder.Services.TryAddKeyedScoped(typeof(IDurableTaskCompletionSource<>), KeyedService.AnyKey, typeof(DurableTaskCompletionSource<>)); - builder.Services.TryAddKeyedScoped(typeof(IDurableNothing), KeyedService.AnyKey, typeof(DurableNothing)); return builder; } } diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 6f62ab9d936..ae08e40bd85 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -25,6 +25,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private readonly Task _workLoop; private ManagerState _state; private Task? _pendingWrite; + private bool _hasStateMachineToRetire; private ulong _nextStateMachineId = MinApplicationStateMachineId; private LogExtentBuilder? _currentLogSegment; @@ -125,21 +126,38 @@ private async Task WorkLoop() // If the current log length is greater than the snapshot size, then take a snapshot instead of appending more log entries. var isSnapshot = workItem.Type is WorkItemType.WriteSnapshot; LogExtentBuilder? logSegment; + lock (_lock) { - if (isSnapshot && _currentLogSegment is { } existingSegment) + if (isSnapshot) { // If there are pending writes, reset them since they will be captured by the snapshot instead. // If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes. - existingSegment.Reset(); - } - else - { - _currentLogSegment ??= new(); + _currentLogSegment?.Reset(); + + if (_hasStateMachineToRetire) // We use this flag because the majooority of times, there wont be any state machine to retire. + { + // Since this is a snapshot, we use the opportunity to purge retired state machines. + foreach (var (id, machine) in _stateMachinesMap) + { + if (machine is DurableNothing retired) + { + var name = retired.StateMachineKey; + + LogPurgingRetiredStateMachine(_logger, name, id); + + _stateMachinesMap.Remove(id); + _stateMachineIds.Remove(name); // This will take effect when the snapshot is persisted below. + } + } + } } + _currentLogSegment ??= new(); + // The map of state machine ids is itself stored as a durable state machine with the id 0. // This must be stored first, since it includes the identities of all other state machines, which are needed when replaying the log. + // If we removed retired machines, this snapshot will persist that change. AppendUpdatesOrSnapshotStateMachine(_currentLogSegment, isSnapshot, 0, _stateMachineIds); foreach (var (id, stateMachine) in _stateMachinesMap) @@ -366,7 +384,15 @@ private void OnSetStateMachineId(string name, ulong id) } else { - throw new InvalidOperationException($"State machine \"{name}\" (id: {id}) has not been registered on this state machine manager."); + // When a state machine is not found, we substitute it with a DurableNothing. + // It will be purged on the next snapshot. + + LogRetiredStateMachineDetected(_logger, name, id); + + _stateMachinesMap[id] = new DurableNothing(name, this); + _hasStateMachineToRetire = true; + + // No need to call Reset, as DurableNothing does nothing. } } } @@ -452,4 +478,14 @@ private sealed class StateMachineManagerState( Level = LogLevel.Error, Message = "Error processing work items.")] private static partial void LogErrorProcessingWorkItems(ILogger logger, Exception exception); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "State machine \"{Name}\" (id: {Id}) not found. Substituting a placeholder for graceful retirement.")] + private static partial void LogRetiredStateMachineDetected(ILogger logger, string name, ulong id); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Purging retired state machine \"{Name}\" (id: {Id}).")] + private static partial void LogPurgingRetiredStateMachine(ILogger logger, string name, ulong id); } diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index 451b444a639..54b6ae6858e 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -256,4 +256,65 @@ public async Task StateMachineManager_LargeStateRecovery_Test() Assert.Equal($"Value {i}", recoveredDict[i]); } } + + /// + /// Tests that a "retired" state machine (one that is no longer registered) + /// has its data (and itself) purged when the storage triggers a compaction. + /// + [Fact] + public async Task StateMachineManager_RetiredStateMachine_IsPurgedOnCompaction() + { + const string DictToKeepKey = "dictToKeep"; + const string DictToRetireKey = "dictToRetire"; + + var sut1 = CreateTestSystem(); + + // We beging with 2 dictionaries, one of which we will retire by means of not registering it in the manger. + // This would be in the real-world developers removing it from the grain's ctor as a dependecy. + var dictToKeep = new DurableDictionary(DictToKeepKey, sut1.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + var dictToRetire = new DurableDictionary(DictToRetireKey, sut1.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + + await sut1.Lifecycle.OnStart(); + + dictToKeep.Add("a", 1); + dictToRetire.Add("b", 2); + + await sut1.Manager.WriteStateAsync(CancellationToken.None); + + var sut2 = CreateTestSystem(sut1.Storage); + + // This time, we only register the dictionary we want to keep, this marks dictToRetire as retired. + var recoveredDictToKeep = new DurableDictionary(DictToKeepKey, sut2.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + + await sut2.Lifecycle.OnStart(); + + // The manager should have recovered the state for dictToKeep, and created a DurableNothing placeholder for dictToRetire. + Assert.Equal(1, recoveredDictToKeep["a"]); + + // Now, we trigger the compaction logic in VolatileStateMachineStorage by writing more than 10 times. + for (var i = 0; i < 11; i++) + { + recoveredDictToKeep["a"] = i; + await sut2.Manager.WriteStateAsync(CancellationToken.None); + } + + // At this point, the manager has performed a snapshot, so it should have purged the dictToRetire data. + var sut3 = CreateTestSystem(sut2.Storage); + + // So by registering both dictionaries again, we should see what state remains after the snapshot. + var finalDictToKeep = new DurableDictionary(DictToKeepKey, sut3.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + var finalDictToRetire = new DurableDictionary(DictToRetireKey, sut3.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + + await sut3.Lifecycle.OnStart(); + + // The dictionary we kept should have the last value written to it. + Assert.Equal(10, finalDictToKeep["a"]); // The last value from the i=0..10 loop. + + // The retired dictionary should now be empty because its state was purged during the compaction. + // Note that this is a new version of dictToRetire, since that is removed as a state machine, idea here + // is that if we can register a new dictToRetire with the same key, it means that the machine itself has been removed + // but also the data, otherwise previous machine would have had at least one entry i.e. ["b", 2]. The removal of the machine + // itself has also the nice benefit of being able to reuse old machine names. + Assert.Empty(finalDictToRetire); + } } From 86cea8244b66c073e46e4c7a3530b37195b34a9f Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Thu, 18 Sep 2025 00:02:19 +0200 Subject: [PATCH 02/34] Update src/Orleans.Journaling/StateMachineManager.cs Co-authored-by: Reuben Bond <203839+ReubenBond@users.noreply.github.com> --- src/Orleans.Journaling/StateMachineManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index ae08e40bd85..c8d4373395c 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -135,7 +135,7 @@ private async Task WorkLoop() // If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes. _currentLogSegment?.Reset(); - if (_hasStateMachineToRetire) // We use this flag because the majooority of times, there wont be any state machine to retire. + if (_hasStateMachineToRetire) // We use this flag because the majority of times, there won't be any state machine to retire. { // Since this is a snapshot, we use the opportunity to purge retired state machines. foreach (var (id, machine) in _stateMachinesMap) From 5a480516b967d77eda15427957466e23e9f2a681 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Thu, 18 Sep 2025 00:11:05 +0200 Subject: [PATCH 03/34] move comments up --- src/Orleans.Journaling/StateMachineManager.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index c8d4373395c..cb8b7296489 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -135,7 +135,8 @@ private async Task WorkLoop() // If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes. _currentLogSegment?.Reset(); - if (_hasStateMachineToRetire) // We use this flag because the majority of times, there won't be any state machine to retire. + // We use this flag because the majority of times, there won't be any state machine to retire. + if (_hasStateMachineToRetire) { // Since this is a snapshot, we use the opportunity to purge retired state machines. foreach (var (id, machine) in _stateMachinesMap) @@ -147,7 +148,9 @@ private async Task WorkLoop() LogPurgingRetiredStateMachine(_logger, name, id); _stateMachinesMap.Remove(id); - _stateMachineIds.Remove(name); // This will take effect when the snapshot is persisted below. + + // This will take effect when the snapshot is persisted below. + _stateMachineIds.Remove(name); } } } From b64b9a8e45fa2618a3f59cd06a33ea0aafda6cf5 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Thu, 18 Sep 2025 21:47:50 +0200 Subject: [PATCH 04/34] wip --- src/Orleans.Journaling/DurableNothing.cs | 24 ++++++++++++++----- src/Orleans.Journaling/StateMachineManager.cs | 13 ++++++---- .../StateMachineManagerOptions.cs | 16 +++++++++++++ 3 files changed, 43 insertions(+), 10 deletions(-) create mode 100644 src/Orleans.Journaling/StateMachineManagerOptions.cs diff --git a/src/Orleans.Journaling/DurableNothing.cs b/src/Orleans.Journaling/DurableNothing.cs index 624457740bd..75329698e28 100644 --- a/src/Orleans.Journaling/DurableNothing.cs +++ b/src/Orleans.Journaling/DurableNothing.cs @@ -5,27 +5,39 @@ namespace Orleans.Journaling; /// -/// A durable object which does nothing, used for retiring other durable types. +/// A durable object which does nothing but preserves the raw state of a retired durable type. /// [DebuggerDisplay("DurableNothing")] internal sealed class DurableNothing : IDurableStateMachine { + private readonly List _bufferedData = []; + public string StateMachineKey { get; } + public DateTime RetirementTimestamp { get; } - public DurableNothing([ServiceKey] string key, IStateMachineManager manager) + public DurableNothing([ServiceKey] string key, DateTime retirementTimestamp, IStateMachineManager manager) { ArgumentNullException.ThrowIfNullOrEmpty(key); + manager.RegisterStateMachine(key, this); + StateMachineKey = key; + RetirementTimestamp = retirementTimestamp; } - void IDurableStateMachine.Reset(IStateMachineLogWriter storage) { } - - void IDurableStateMachine.Apply(ReadOnlySequence logEntry) { } + void IDurableStateMachine.Reset(IStateMachineLogWriter storage) => _bufferedData.Clear(); + void IDurableStateMachine.Apply(ReadOnlySequence logEntry) => _bufferedData.Add(logEntry.ToArray()); void IDurableStateMachine.AppendEntries(StateMachineStorageWriter logWriter) { } - void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter) { } + void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter) + { + // When a snapshot is created, we write the preserved raw data back out. + foreach (var data in _bufferedData) + { + snapshotWriter.AppendEntry(data); + } + } public IDurableStateMachine DeepCopy() => this; } diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index cb8b7296489..81f36513afd 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -2,6 +2,7 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Runtime.Internal; using Orleans.Serialization.Codecs; using Orleans.Serialization.Session; @@ -18,6 +19,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private readonly Dictionary _stateMachinesMap = []; private readonly IStateMachineStorage _storage; private readonly ILogger _logger; + private readonly TimeProvider _timeProvider; private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true }; private readonly Queue _workQueue = new(); private readonly CancellationTokenSource _shutdownCancellation = new(); @@ -26,16 +28,21 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private ManagerState _state; private Task? _pendingWrite; private bool _hasStateMachineToRetire; + private TimeSpan _retirementGracePeriod; private ulong _nextStateMachineId = MinApplicationStateMachineId; private LogExtentBuilder? _currentLogSegment; public StateMachineManager( IStateMachineStorage storage, ILogger logger, - SerializerSessionPool serializerSessionPool) + IOptions options, + SerializerSessionPool serializerSessionPool, + TimeProvider timeProvider) { _storage = storage; _logger = logger; + _timeProvider = timeProvider; + _retirementGracePeriod = options.Value.RetirementGracePeriod; // The list of known state machines is itself stored as a durable state machine with the implicit id 0. // This allows us to recover the list of state machines ids without having to store it separately. @@ -392,10 +399,8 @@ private void OnSetStateMachineId(string name, ulong id) LogRetiredStateMachineDetected(_logger, name, id); - _stateMachinesMap[id] = new DurableNothing(name, this); + _stateMachinesMap[id] = new DurableNothing(name, _timeProvider.GetUtcNow().UtcDateTime, this); _hasStateMachineToRetire = true; - - // No need to call Reset, as DurableNothing does nothing. } } } diff --git a/src/Orleans.Journaling/StateMachineManagerOptions.cs b/src/Orleans.Journaling/StateMachineManagerOptions.cs new file mode 100644 index 00000000000..1c609601cbf --- /dev/null +++ b/src/Orleans.Journaling/StateMachineManagerOptions.cs @@ -0,0 +1,16 @@ +namespace Orleans.Journaling; + +public sealed class StateMachineManagerOptions +{ + /// + /// Specifies the period of time to wait until the manager retires + /// a if its not registered in the manager anymore. + /// + /// The act of retirement removes this state machine and its data is purged. + public TimeSpan RetirementGracePeriod { get; set; } + + /// + /// The default value of . + /// + public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromHours(1); +} From 699dca287c855a7133427a7f31cd908e516ede81 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Thu, 18 Sep 2025 21:47:50 +0200 Subject: [PATCH 05/34] time-based retirement logic --- Orleans.slnx | 4 +- src/Orleans.Journaling/DurableDictionary.cs | 4 +- src/Orleans.Journaling/DurableNothing.cs | 12 +- src/Orleans.Journaling/HostingExtensions.cs | 3 + src/Orleans.Journaling/StateMachineManager.cs | 160 +++++++++++++----- .../StateMachineManagerOptions.cs | 26 +++ test/ConsoleApp5/ConsoleApp5.csproj | 15 ++ test/ConsoleApp5/Program.cs | 65 +++++++ .../LogSegmentTests.cs | 7 +- .../Orleans.Journaling.Tests.csproj | 1 + .../StateMachineManagerTests.cs | 148 ++++++++++++---- .../StateMachineTestBase.cs | 11 +- 12 files changed, 370 insertions(+), 86 deletions(-) create mode 100644 src/Orleans.Journaling/StateMachineManagerOptions.cs create mode 100644 test/ConsoleApp5/ConsoleApp5.csproj create mode 100644 test/ConsoleApp5/Program.cs diff --git a/Orleans.slnx b/Orleans.slnx index 2af4d7d21aa..1e1fdaa1730 100644 --- a/Orleans.slnx +++ b/Orleans.slnx @@ -84,7 +84,7 @@ - + @@ -133,7 +133,7 @@ - + diff --git a/src/Orleans.Journaling/DurableDictionary.cs b/src/Orleans.Journaling/DurableDictionary.cs index 1883d1c70fd..2a6c2d8ddc7 100644 --- a/src/Orleans.Journaling/DurableDictionary.cs +++ b/src/Orleans.Journaling/DurableDictionary.cs @@ -211,10 +211,10 @@ private void ApplySet(K key, V value) OnSet(key, value); } - private bool ApplyRemove(K key) => _items.Remove(key); + internal bool ApplyRemove(K key) => _items.Remove(key); private void ApplyClear() => _items.Clear(); - private IStateMachineLogWriter GetStorage() + protected virtual IStateMachineLogWriter GetStorage() { Debug.Assert(_storage is not null); return _storage; diff --git a/src/Orleans.Journaling/DurableNothing.cs b/src/Orleans.Journaling/DurableNothing.cs index 624457740bd..6cf3d096a37 100644 --- a/src/Orleans.Journaling/DurableNothing.cs +++ b/src/Orleans.Journaling/DurableNothing.cs @@ -1,5 +1,4 @@ using System.Buffers; -using System.Diagnostics; using Microsoft.Extensions.DependencyInjection; namespace Orleans.Journaling; @@ -7,16 +6,19 @@ namespace Orleans.Journaling; /// /// A durable object which does nothing, used for retiring other durable types. /// -[DebuggerDisplay("DurableNothing")] -internal sealed class DurableNothing : IDurableStateMachine +public interface IDurableNothing { - public string StateMachineKey { get; } +} +/// +/// A durable object which does nothing, used for retiring other durable types. +/// +internal sealed class DurableNothing : IDurableNothing, IDurableStateMachine +{ public DurableNothing([ServiceKey] string key, IStateMachineManager manager) { ArgumentNullException.ThrowIfNullOrEmpty(key); manager.RegisterStateMachine(key, this); - StateMachineKey = key; } void IDurableStateMachine.Reset(IStateMachineLogWriter storage) { } diff --git a/src/Orleans.Journaling/HostingExtensions.cs b/src/Orleans.Journaling/HostingExtensions.cs index 6d55a115ee3..a6bcb2358ac 100644 --- a/src/Orleans.Journaling/HostingExtensions.cs +++ b/src/Orleans.Journaling/HostingExtensions.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Orleans.Configuration; namespace Orleans.Journaling; @@ -7,6 +8,7 @@ public static class HostingExtensions { public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder) { + builder.Services.AddOptions(); builder.Services.TryAddScoped(sp => sp.GetRequiredService().Create(sp.GetRequiredService())); builder.Services.TryAddScoped(); builder.Services.TryAddKeyedScoped(typeof(IDurableDictionary<,>), KeyedService.AnyKey, typeof(DurableDictionary<,>)); @@ -16,6 +18,7 @@ public static ISiloBuilder AddStateMachineStorage(this ISiloBuilder builder) builder.Services.TryAddKeyedScoped(typeof(IDurableValue<>), KeyedService.AnyKey, typeof(DurableValue<>)); builder.Services.TryAddKeyedScoped(typeof(IPersistentState<>), KeyedService.AnyKey, typeof(DurableState<>)); builder.Services.TryAddKeyedScoped(typeof(IDurableTaskCompletionSource<>), KeyedService.AnyKey, typeof(DurableTaskCompletionSource<>)); + builder.Services.TryAddKeyedScoped(typeof(IDurableNothing), KeyedService.AnyKey, typeof(DurableNothing)); return builder; } } diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index cb8b7296489..54ce87146b3 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -1,7 +1,10 @@ using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Xml.Linq; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Runtime.Internal; using Orleans.Serialization.Codecs; using Orleans.Serialization.Session; @@ -13,43 +16,51 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private const int MinApplicationStateMachineId = 8; private static readonly StringCodec StringCodec = new(); private static readonly UInt64Codec UInt64Codec = new(); + private static readonly DateTimeCodec DateTimeCodec = new(); private readonly object _lock = new(); private readonly Dictionary _stateMachines = new(StringComparer.Ordinal); private readonly Dictionary _stateMachinesMap = []; private readonly IStateMachineStorage _storage; private readonly ILogger _logger; + private readonly TimeProvider _timeProvider; private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true }; private readonly Queue _workQueue = new(); private readonly CancellationTokenSource _shutdownCancellation = new(); private readonly StateMachineManagerState _stateMachineIds; + private readonly StateMachinesRetirementTracker _retirementTracker; + private readonly TimeSpan _retirementGracePeriod; private readonly Task _workLoop; private ManagerState _state; private Task? _pendingWrite; - private bool _hasStateMachineToRetire; private ulong _nextStateMachineId = MinApplicationStateMachineId; private LogExtentBuilder? _currentLogSegment; public StateMachineManager( IStateMachineStorage storage, ILogger logger, - SerializerSessionPool serializerSessionPool) + IOptions options, + SerializerSessionPool serializerSessionPool, + TimeProvider timeProvider) { _storage = storage; _logger = logger; + _timeProvider = timeProvider; + _retirementGracePeriod = options.Value.RetirementGracePeriod; // The list of known state machines is itself stored as a durable state machine with the implicit id 0. // This allows us to recover the list of state machines ids without having to store it separately. _stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool); _stateMachinesMap[0] = _stateMachineIds; - + _retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool); _workLoop = Start(); } public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) { - _shutdownCancellation.Token.ThrowIfCancellationRequested(); ArgumentNullException.ThrowIfNullOrEmpty(name); + _shutdownCancellation.Token.ThrowIfCancellationRequested(); + lock (_lock) { _stateMachines.Add(name, stateMachine); @@ -134,26 +145,7 @@ private async Task WorkLoop() // If there are pending writes, reset them since they will be captured by the snapshot instead. // If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes. _currentLogSegment?.Reset(); - - // We use this flag because the majority of times, there won't be any state machine to retire. - if (_hasStateMachineToRetire) - { - // Since this is a snapshot, we use the opportunity to purge retired state machines. - foreach (var (id, machine) in _stateMachinesMap) - { - if (machine is DurableNothing retired) - { - var name = retired.StateMachineKey; - - LogPurgingRetiredStateMachine(_logger, name, id); - - _stateMachinesMap.Remove(id); - - // This will take effect when the snapshot is persisted below. - _stateMachineIds.Remove(name); - } - } - } + RetiredOrResurectStateMachines(); } _currentLogSegment ??= new(); @@ -167,7 +159,6 @@ private async Task WorkLoop() { if (id is 0 || stateMachine is null) { - // Skip state machines which have been removed. continue; } @@ -246,7 +237,7 @@ private async Task WorkLoop() if (!_stateMachineIds.ContainsKey(name)) { // Doing so will trigger a reset, since _stateMachineIds will call OnSetStateMachineId, which resets the state machine in question. - _stateMachineIds[name] = _nextStateMachineId++; + _stateMachineIds[name] = name == StateMachinesRetirementTracker.Name ? StateMachinesRetirementTracker.Id : _nextStateMachineId++; } } } @@ -277,6 +268,42 @@ private async Task WorkLoop() } } + private void RetiredOrResurectStateMachines() + { + lock (_lock) + { + foreach (var (name, timestamp) in _retirementTracker) + { + var isDuetime = _timeProvider.GetUtcNow().UtcDateTime - timestamp >= _retirementGracePeriod; + if (isDuetime && _stateMachineIds.TryGetValue(name, out var id)) + { + var stateMachine = _stateMachines[name]; + + Debug.Assert(stateMachine is not null); + + if (stateMachine is RetiredMachineVessel) + { + LogRemovingRetiredStateMachine(_logger, name); + + // Since we are permanently removing this state machine, we will clean it up by reseting it. + stateMachine.Reset(new StateMachineLogWriter(this, new(id))); + + _stateMachinesMap.Remove(id); + // We remove these from memory only, since the snapshot will persist these changes. + _stateMachineIds.ApplyRemove(name); + _retirementTracker.ApplyRemove(name); + } + else + { + LogRetiredStateMachineComebackDetected(_logger, name); + // We remove the tracker from memory only, since the snapshot will persist the change. + _retirementTracker.ApplyRemove(name); + } + } + } + } + } + private static void AppendUpdatesOrSnapshotStateMachine(LogExtentBuilder logSegment, bool isSnapshot, ulong id, IDurableStateMachine stateMachine) { var writer = logSegment.CreateLogWriter(new(id)); @@ -306,7 +333,11 @@ public async ValueTask DeleteStateAsync(CancellationToken cancellationToken) private async Task RecoverAsync(CancellationToken cancellationToken) { - _stateMachineIds.ResetVolatileState(); + lock (_lock) + { + _stateMachineIds.ResetVolatileState(); + } + await foreach (var segment in _storage.ReadAsync(cancellationToken)) { cancellationToken.ThrowIfCancellationRequested(); @@ -326,9 +357,17 @@ private async Task RecoverAsync(CancellationToken cancellationToken) lock (_lock) { - foreach (var stateMachine in _stateMachines.Values) + foreach ((var name, var stateMachine) in _stateMachines) { stateMachine.OnRecoveryCompleted(); + + if (stateMachine is RetiredMachineVessel) + { + if (_retirementTracker.TryAdd(name, _timeProvider.GetUtcNow().UtcDateTime)) + { + LogRetiredStateMachineDetected(_logger, name); + } + } } } } @@ -387,15 +426,13 @@ private void OnSetStateMachineId(string name, ulong id) } else { - // When a state machine is not found, we substitute it with a DurableNothing. - // It will be purged on the next snapshot. + var vessel = new RetiredMachineVessel(); - LogRetiredStateMachineDetected(_logger, name, id); + // We must not make the vessel self-register with the manager, since it will + // result in a late-registration after the manger is 'ready'. Instead we add it inline here. - _stateMachinesMap[id] = new DurableNothing(name, this); - _hasStateMachineToRetire = true; - - // No need to call Reset, as DurableNothing does nothing. + _stateMachines.Add(name, vessel); + _stateMachinesMap[id] = vessel; } } } @@ -477,6 +514,45 @@ private sealed class StateMachineManagerState( protected override void OnSet(string key, ulong value) => _manager.OnSetStateMachineId(key, value); } + /// + /// Used to track state machines that are not registered via user-code anymore, until time-based purging has elapsed. + /// + /// Resurrecting of retired machines is supported. + private sealed class StateMachinesRetirementTracker( + StateMachineManager manager, IFieldCodec keyCodec, IFieldCodec valueCodec, SerializerSessionPool sessionPool) + : DurableDictionary(Name, manager, keyCodec, valueCodec, sessionPool) + { + public const int Id = 1; + public const string Name = "orleans_retirement_tracker"; + + private readonly StateMachineLogWriter _logWriter = new(manager, new(Id)); + + protected override IStateMachineLogWriter GetStorage() => _logWriter; + } + + /// + /// Used to keep retired machines into a purgatory state until time-based purging or if a comeback ocurrs. + /// This keeps buffering entries and dumps them back into the log upon compaction. + /// + private sealed class RetiredMachineVessel : IDurableStateMachine + { + private readonly List _bufferedData = []; + + void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter) + { + foreach (var data in _bufferedData) + { + snapshotWriter.AppendEntry(data); + } + } + + void IDurableStateMachine.Reset(IStateMachineLogWriter storage) => _bufferedData.Clear(); + void IDurableStateMachine.Apply(ReadOnlySequence logEntry) => _bufferedData.Add(logEntry.ToArray()); + void IDurableStateMachine.AppendEntries(StateMachineStorageWriter logWriter) { } + IDurableStateMachine IDurableStateMachine.DeepCopy() => throw new NotSupportedException(); + } + + [LoggerMessage( Level = LogLevel.Error, Message = "Error processing work items.")] @@ -484,11 +560,17 @@ private sealed class StateMachineManagerState( [LoggerMessage( Level = LogLevel.Information, - Message = "State machine \"{Name}\" (id: {Id}) not found. Substituting a placeholder for graceful retirement.")] - private static partial void LogRetiredStateMachineDetected(ILogger logger, string name, ulong id); + Message = "State machine \"{Name}\" was not found. I have substituted a placeholder for graceful time-based retirement.")] + private static partial void LogRetiredStateMachineDetected(ILogger logger, string name); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "State machine \"{Name}\" was previously retired (but not removed), and has hence been re-introduced. " + + "There is still time left before its permanent removal, so I will resurrect it.")] + private static partial void LogRetiredStateMachineComebackDetected(ILogger logger, string name); [LoggerMessage( Level = LogLevel.Information, - Message = "Purging retired state machine \"{Name}\" (id: {Id}).")] - private static partial void LogPurgingRetiredStateMachine(ILogger logger, string name, ulong id); + Message = "Removing retired state machine \"{Name}\" and its data. Operation will be durably persisted shortly after compaction has finalized.")] + private static partial void LogRemovingRetiredStateMachine(ILogger logger, string name); } diff --git a/src/Orleans.Journaling/StateMachineManagerOptions.cs b/src/Orleans.Journaling/StateMachineManagerOptions.cs new file mode 100644 index 00000000000..9fa9e7c8914 --- /dev/null +++ b/src/Orleans.Journaling/StateMachineManagerOptions.cs @@ -0,0 +1,26 @@ +namespace Orleans.Journaling; + +/// +/// Options to configure the . +/// +public sealed class StateMachineManagerOptions +{ + /// + /// Specifies the period of time to wait until the manager retires + /// a if its not registered in the manager anymore. + /// + /// + /// The act of retirement removes this state machine from the log. + /// If the state machine is reintroduced (within the grace period), than it will not be removed by the manager. + /// + /// This value represents the minimum time the fate of the state machine will be postponed. + /// The final decision can take longer - usually + [time until next compaction occurs]. + /// + /// + public TimeSpan RetirementGracePeriod { get; set; } = DEFAULT_RETIREMENT_GRACE_PERIOD; + + /// + /// The default value of . + /// + public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromHours(1); +} diff --git a/test/ConsoleApp5/ConsoleApp5.csproj b/test/ConsoleApp5/ConsoleApp5.csproj new file mode 100644 index 00000000000..06bff8cabb6 --- /dev/null +++ b/test/ConsoleApp5/ConsoleApp5.csproj @@ -0,0 +1,15 @@ + + + + Exe + net9.0 + enable + true + + + + + + + + \ No newline at end of file diff --git a/test/ConsoleApp5/Program.cs b/test/ConsoleApp5/Program.cs new file mode 100644 index 00000000000..a0ba0c18746 --- /dev/null +++ b/test/ConsoleApp5/Program.cs @@ -0,0 +1,65 @@ +using Azure.Storage.Blobs; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans.Journaling; + +var host = Host.CreateDefaultBuilder(args) + .ConfigureLogging(builder => + { + builder.AddConsole().SetMinimumLevel(LogLevel.Error); + builder.AddFilter("Orleans.Journaling.StateMachineManager", LogLevel.Debug); + }) + .UseOrleans(builder => + { + builder.UseLocalhostClustering(); +#pragma warning disable ORLEANSEXP005 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. + builder.Configure(o => o.RetirementGracePeriod = TimeSpan.FromSeconds(10)); + builder.AddAzureAppendBlobStateMachineStorage(options => + { + options.ContainerName = "test-grains"; + options.BlobServiceClient = new BlobServiceClient("UseDevelopmentStorage=true"); + options.GetBlobName = (grainId) => $"{grainId.Type}-{grainId.Key}.bin"; + }); + }) + .Build(); + +await host.StartAsync(); + +var grainFactory = host.Services.GetRequiredService(); +var grain = grainFactory.GetGrain("key"); + +int i = 1; +while (i <= 50) +{ + await grain.Ping(); + await Task.Delay(1000); + i++; +} + +Console.ReadKey(); + +public interface ITestGrain : IGrainWithStringKey +{ + Task Ping(); +} + +public class TestGrain( + [FromKeyedServices("dict1")] IDurableValue machine1 + //,[FromKeyedServices("dict2")] IDurableValue machine2 + ) : DurableGrain, ITestGrain +{ + public async Task Ping() + { + Do("machine1", machine1); + //Console.WriteLine("------------"); Do("machine2", machine2); + + await WriteStateAsync(); + } + + void Do(string name, IDurableValue machine) + { + machine.Value = ++machine.Value; + Console.WriteLine($"{name}: " + machine.Value); + } +} diff --git a/test/Orleans.Journaling.Tests/LogSegmentTests.cs b/test/Orleans.Journaling.Tests/LogSegmentTests.cs index 2482e65f32d..d24f2d93412 100644 --- a/test/Orleans.Journaling.Tests/LogSegmentTests.cs +++ b/test/Orleans.Journaling.Tests/LogSegmentTests.cs @@ -48,6 +48,7 @@ public abstract class LogSegmentTests : IAsyncLifetime private IServiceProvider _serviceProvider = null!; private SiloLifecycleSubject? _siloLifecycle; private IStateMachineStorageProvider _storageProvider = null!; + private static readonly IOptions ManagerOptions = Options.Create(new StateMachineManagerOptions()); public virtual async Task InitializeAsync() { @@ -85,7 +86,7 @@ public async Task DisposeAsync() var codecProvider = _serviceProvider.GetRequiredService(); var grainContext = new TestGrainContext(grainId); // Use provided GrainId var storage = _storageProvider.Create(grainContext); - var manager = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool); + var manager = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System); var list = new DurableList(listName, manager, codecProvider.GetCodec(), sessionPool); return (manager, list, storage); } @@ -143,7 +144,7 @@ public async Task DurableList_Persistence_Test() var sessionPool = _serviceProvider.GetRequiredService(); var codecProvider = _serviceProvider.GetRequiredService(); - var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool); + var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System); var list2 = new DurableList(listName, manager2, codecProvider.GetCodec(), sessionPool); await manager2.InitializeAsync(cts.Token); @@ -342,7 +343,7 @@ public async Task DurableList_LargeNumberOfOperations_And_Snapshot_Test() // Test recovery (potentially from snapshot) var sessionPool = _serviceProvider.GetRequiredService(); var codecProvider = _serviceProvider.GetRequiredService(); - var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), sessionPool); // Reuses the storage object linked via grainId + var manager2 = new StateMachineManager(storage, _serviceProvider.GetRequiredService>(), ManagerOptions, sessionPool, TimeProvider.System);// Reuses the storage object linked via grainId var list2 = new DurableList(listName, manager2, codecProvider.GetCodec(), sessionPool); await manager2.InitializeAsync(cts.Token); diff --git a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj index db5f347c593..c6f90490f5e 100644 --- a/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj +++ b/test/Orleans.Journaling.Tests/Orleans.Journaling.Tests.csproj @@ -15,6 +15,7 @@ + diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index 54b6ae6858e..e5ca107a3f5 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Time.Testing; using Xunit; namespace Orleans.Journaling.Tests; @@ -258,63 +259,144 @@ public async Task StateMachineManager_LargeStateRecovery_Test() } /// - /// Tests that a "retired" state machine (one that is no longer registered) - /// has its data (and itself) purged when the storage triggers a compaction. + /// Tests the full lifecycle of a retired state machine. It is preserved and also reintroduced through an + /// early compaction, but purged eventually after its grace period expires on later compactions. /// [Fact] - public async Task StateMachineManager_RetiredStateMachine_IsPurgedOnCompaction() + public async Task StateMachineManager_AutoRetiringStateMachines() { const string DictToKeepKey = "dictToKeep"; const string DictToRetireKey = "dictToRetire"; - var sut1 = CreateTestSystem(); + var period = ManagerOptions.RetirementGracePeriod; + var timeProvider = new FakeTimeProvider(DateTime.UtcNow); + var storage = CreateStorage(); - // We beging with 2 dictionaries, one of which we will retire by means of not registering it in the manger. - // This would be in the real-world developers removing it from the grain's ctor as a dependecy. - var dictToKeep = new DurableDictionary(DictToKeepKey, sut1.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); - var dictToRetire = new DurableDictionary(DictToRetireKey, sut1.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + // -------------- STEP 1 -------------- + + // We begin with 2 dictionaries, one of which we will retire by means of not registering it in the manger. + // This would be in the real-world developers removing it from the grain's ctor as a dependecy. + var sut1 = CreateTestSystem(storage, timeProvider); + var dictToKeep1 = CreateTestMachine(DictToKeepKey, sut1.Manager); + var dictToRetire2 = CreateTestMachine(DictToRetireKey, sut1.Manager); await sut1.Lifecycle.OnStart(); - dictToKeep.Add("a", 1); - dictToRetire.Add("b", 2); + dictToKeep1.Add("a", 1); + dictToRetire2.Add("b", 1); await sut1.Manager.WriteStateAsync(CancellationToken.None); - var sut2 = CreateTestSystem(sut1.Storage); + // -------------- STEP 2 -------------- // This time, we only register the dictionary we want to keep, this marks dictToRetire as retired. - var recoveredDictToKeep = new DurableDictionary(DictToKeepKey, sut2.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + var sut2 = CreateTestSystem(storage, timeProvider); + var dictToKeep2 = CreateTestMachine(DictToKeepKey, sut2.Manager); await sut2.Lifecycle.OnStart(); + + // The manager should have recovered the state for dictToKeep, + // and created a DurableNothing placeholder for dictToRetire (we cant test for it at this point). + await SafeAssertEqual(1, "a", dictToKeep2); - // The manager should have recovered the state for dictToKeep, and created a DurableNothing placeholder for dictToRetire. - Assert.Equal(1, recoveredDictToKeep["a"]); + // We advance time by half the grace period to see if we can save it from purging. + timeProvider.Advance(period / 2); - // Now, we trigger the compaction logic in VolatileStateMachineStorage by writing more than 10 times. - for (var i = 0; i < 11; i++) - { - recoveredDictToKeep["a"] = i; - await sut2.Manager.WriteStateAsync(CancellationToken.None); - } + await TriggerCompaction(sut2.Manager, dictToKeep2); - // At this point, the manager has performed a snapshot, so it should have purged the dictToRetire data. - var sut3 = CreateTestSystem(sut2.Storage); + // -------------- STEP 3 -------------- + + // Verify that the retired dictionary was NOT purged by this compaction, as only half the time has passed. + var sut3 = CreateTestSystem(storage, timeProvider); + var dictToKeep3 = CreateTestMachine(DictToKeepKey, sut3.Manager); + var dictToRetire3 = CreateTestMachine(DictToRetireKey, sut3.Manager); + + await sut3.Lifecycle.OnStart();; + + await SafeAssertEqual(10, "a", dictToKeep3); + // The fact this entry ["b", 1] exists proves that the state of dictToRetire was preserved, even though we did not register it in step 2. + await SafeAssertEqual(1, "b", dictToRetire3); + + // By advancing time by another half-period we cover the full period. But since we have re-introduced dictToRetire, we should have un-retired it. + // This is similar to step 2, but there we avoided purging due to time not being due, whereas here we avoid purging due to re-registration. + timeProvider.Advance(period / 2); + + await TriggerCompaction(sut3.Manager, dictToKeep3); + + // -------------- STEP 4 -------------- + + // Because of re-registration is step 3 (to test it was not purged), this means dictToRetire has been removed from the tracker. + // Again as in step 2, we only register the dictionary we want to keep, this marks dictToRetire as retired. + var sut4 = CreateTestSystem(storage, timeProvider); + var dictToKeep4 = CreateTestMachine(DictToKeepKey, sut4.Manager); + + await sut4.Lifecycle.OnStart(); + + // The manager should have recovered the state for dictToKeep. + // It should have created a DurableNothing placeholder for dictToRetire, but we can not test for that. - // So by registering both dictionaries again, we should see what state remains after the snapshot. - var finalDictToKeep = new DurableDictionary(DictToKeepKey, sut3.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); - var finalDictToRetire = new DurableDictionary(DictToRetireKey, sut3.Manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); - await sut3.Lifecycle.OnStart(); + // This time we advance time to cover the full period. Note that this is necccessary because a side effect of step 3 + // was that dictToRetire was removed from the tracker (since it came back), so just triggering a compaction won't cut it + // as time to retire will essentially be reset to "now". + timeProvider.Advance(period); - // The dictionary we kept should have the last value written to it. - Assert.Equal(10, finalDictToKeep["a"]); // The last value from the i=0..10 loop. + // This compaction should finally purge it. + await TriggerCompaction(sut4.Manager, dictToKeep4); + + // -------------- STEP 5 -------------- + + // At this point, the manager has performed a snapshot, so it should have purged the dictToRetire data. + // By registering both dictionaries again, we should see what state remains after the snapshot. + var sut5 = CreateTestSystem(storage, timeProvider); + var dictToKeep5 = CreateTestMachine(DictToKeepKey, sut5.Manager); + var dictToRetire5 = CreateTestMachine(DictToRetireKey, sut5.Manager); + + await sut5.Lifecycle.OnStart(); + await SafeAssertEqual(10, "a", dictToKeep5); // The retired dictionary should now be empty because its state was purged during the compaction. - // Note that this is a new version of dictToRetire, since that is removed as a state machine, idea here - // is that if we can register a new dictToRetire with the same key, it means that the machine itself has been removed - // but also the data, otherwise previous machine would have had at least one entry i.e. ["b", 2]. The removal of the machine - // itself has also the nice benefit of being able to reuse old machine names. - Assert.Empty(finalDictToRetire); + // Note that this is a new version of dictToRetire, since the original was removed. Idea here is + // that if we can register a new dictToRetire (with the same key), it means that the machine itself + // has been removed but also the data, otherwise a previous machine would have had at least one + // entry i.e. ["b", 1]. + + Assert.Empty(dictToRetire5); + + // Note: The retirement of state machines has the nice benefit of being able to reuse machine names. + + DurableDictionary CreateTestMachine(string key, IStateMachineManager manager) => + new(key, manager, CodecProvider.GetCodec(), CodecProvider.GetCodec(), SessionPool); + + static async Task TriggerCompaction(IStateMachineManager manager, DurableDictionary dict) + { + for (var i = 0; i < 11; i++) + { + dict["a"] = i; + await manager.WriteStateAsync(CancellationToken.None); + } + } + + // Sometimes the KVP is not (yet) available when a check is followed immediately after the lifecycle subject has started. + static async Task SafeAssertEqual(int expected, string key, DurableDictionary dict) + { + const int MaxAttempts = 10; + + var attempt = 0; + + while (true) + { + try + { + Assert.Equal(expected, dict[key]); + break; + } + catch (KeyNotFoundException) when (attempt < MaxAttempts) + { + attempt++; + await Task.Delay(100); + } + } + } } } diff --git a/test/Orleans.Journaling.Tests/StateMachineTestBase.cs b/test/Orleans.Journaling.Tests/StateMachineTestBase.cs index 78adb0435ac..5e94f605be5 100644 --- a/test/Orleans.Journaling.Tests/StateMachineTestBase.cs +++ b/test/Orleans.Journaling.Tests/StateMachineTestBase.cs @@ -1,9 +1,11 @@ using System.Collections.Immutable; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Serialization; using Orleans.Serialization.Serializers; using Orleans.Serialization.Session; +using Microsoft.Extensions.Time.Testing; namespace Orleans.Journaling.Tests; @@ -16,10 +18,12 @@ public abstract class StateMachineTestBase protected readonly SerializerSessionPool SessionPool; protected readonly ICodecProvider CodecProvider; protected readonly ILoggerFactory LoggerFactory; + protected readonly StateMachineManagerOptions ManagerOptions = new(); protected StateMachineTestBase() { var services = new ServiceCollection(); + services.AddSerializer(); services.AddLogging(builder => builder.AddConsole()); @@ -40,11 +44,14 @@ protected virtual IStateMachineStorage CreateStorage() /// /// Creates a state machine manager with in-memory storage /// - internal (IStateMachineManager Manager, IStateMachineStorage Storage, ILifecycleSubject Lifecycle) CreateTestSystem(IStateMachineStorage? storage = null) + internal (IStateMachineManager Manager, IStateMachineStorage Storage, ILifecycleSubject Lifecycle) + CreateTestSystem(IStateMachineStorage? storage = null, TimeProvider? provider = null) { storage ??= CreateStorage(); + provider ??= TimeProvider.System; + var logger = LoggerFactory.CreateLogger(); - var manager = new StateMachineManager(storage, logger, SessionPool); + var manager = new StateMachineManager(storage, logger, Options.Create(ManagerOptions), SessionPool, provider); var lifecycle = new GrainLifecycle(LoggerFactory.CreateLogger()); (manager as ILifecycleParticipant)?.Participate(lifecycle); return (manager, storage, lifecycle); From 15213bd508571318316faec0bd75dc5d7a0f230c Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sun, 28 Sep 2025 23:43:23 +0200 Subject: [PATCH 06/34] last touches --- src/Orleans.Journaling/StateMachineManager.cs | 63 ++++++++++--------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 54ce87146b3..a6a9c1b1ee1 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -51,7 +51,9 @@ public StateMachineManager( // This allows us to recover the list of state machines ids without having to store it separately. _stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool); _stateMachinesMap[0] = _stateMachineIds; + _retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool); + _workLoop = Start(); } @@ -145,7 +147,11 @@ private async Task WorkLoop() // If there are pending writes, reset them since they will be captured by the snapshot instead. // If we did not do this, the log would begin with some writes which would be followed by a snapshot which also included those writes. _currentLogSegment?.Reset(); - RetiredOrResurectStateMachines(); + + if (_retirementTracker.Count > 0) + { + RetiredOrResurectStateMachines(); + } } _currentLogSegment ??= new(); @@ -270,35 +276,32 @@ private async Task WorkLoop() private void RetiredOrResurectStateMachines() { - lock (_lock) + foreach (var (name, timestamp) in _retirementTracker) { - foreach (var (name, timestamp) in _retirementTracker) + var isDuetime = _timeProvider.GetUtcNow().UtcDateTime - timestamp >= _retirementGracePeriod; + if (isDuetime && _stateMachineIds.TryGetValue(name, out var id)) { - var isDuetime = _timeProvider.GetUtcNow().UtcDateTime - timestamp >= _retirementGracePeriod; - if (isDuetime && _stateMachineIds.TryGetValue(name, out var id)) - { - var stateMachine = _stateMachines[name]; + var stateMachine = _stateMachines[name]; - Debug.Assert(stateMachine is not null); + Debug.Assert(stateMachine is not null); - if (stateMachine is RetiredMachineVessel) - { - LogRemovingRetiredStateMachine(_logger, name); + if (stateMachine is RetiredStateMachineVessel) + { + LogRemovingRetiredStateMachine(_logger, name); - // Since we are permanently removing this state machine, we will clean it up by reseting it. - stateMachine.Reset(new StateMachineLogWriter(this, new(id))); + // Since we are permanently removing this state machine, we will clean it up by reseting it. + stateMachine.Reset(new StateMachineLogWriter(this, new(id))); - _stateMachinesMap.Remove(id); - // We remove these from memory only, since the snapshot will persist these changes. - _stateMachineIds.ApplyRemove(name); - _retirementTracker.ApplyRemove(name); - } - else - { - LogRetiredStateMachineComebackDetected(_logger, name); - // We remove the tracker from memory only, since the snapshot will persist the change. - _retirementTracker.ApplyRemove(name); - } + _stateMachinesMap.Remove(id); + // We remove these from memory only, since the snapshot will persist these changes. + _stateMachineIds.ApplyRemove(name); + _retirementTracker.ApplyRemove(name); + } + else + { + LogRetiredStateMachineComebackDetected(_logger, name); + // We remove the tracker from memory only, since the snapshot will persist the change. + _retirementTracker.ApplyRemove(name); } } } @@ -338,7 +341,7 @@ private async Task RecoverAsync(CancellationToken cancellationToken) _stateMachineIds.ResetVolatileState(); } - await foreach (var segment in _storage.ReadAsync(cancellationToken)) + await foreach (var segment in _storage.ReadAsync(cancellationToken).ConfigureAwait(false)) { cancellationToken.ThrowIfCancellationRequested(); try @@ -361,8 +364,9 @@ private async Task RecoverAsync(CancellationToken cancellationToken) { stateMachine.OnRecoveryCompleted(); - if (stateMachine is RetiredMachineVessel) + if (stateMachine is RetiredStateMachineVessel) { + // We can use TryAdd since recovery has finished. if (_retirementTracker.TryAdd(name, _timeProvider.GetUtcNow().UtcDateTime)) { LogRetiredStateMachineDetected(_logger, name); @@ -426,7 +430,7 @@ private void OnSetStateMachineId(string name, ulong id) } else { - var vessel = new RetiredMachineVessel(); + var vessel = new RetiredStateMachineVessel(); // We must not make the vessel self-register with the manager, since it will // result in a late-registration after the manger is 'ready'. Instead we add it inline here. @@ -498,7 +502,7 @@ private enum WorkItemType private enum ManagerState { Unknown, - Ready, + Ready } private sealed class StateMachineManagerState( @@ -534,7 +538,8 @@ private sealed class StateMachinesRetirementTracker( /// Used to keep retired machines into a purgatory state until time-based purging or if a comeback ocurrs. /// This keeps buffering entries and dumps them back into the log upon compaction. /// - private sealed class RetiredMachineVessel : IDurableStateMachine + [DebuggerDisplay(nameof(RetiredStateMachineVessel))] + private sealed class RetiredStateMachineVessel : IDurableStateMachine { private readonly List _bufferedData = []; From 6cc16126450a12b3658411d902a5b8a64b7d0711 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sun, 28 Sep 2025 23:55:42 +0200 Subject: [PATCH 07/34] remove test project --- test/ConsoleApp5/ConsoleApp5.csproj | 15 ------- test/ConsoleApp5/Program.cs | 65 ----------------------------- 2 files changed, 80 deletions(-) delete mode 100644 test/ConsoleApp5/ConsoleApp5.csproj delete mode 100644 test/ConsoleApp5/Program.cs diff --git a/test/ConsoleApp5/ConsoleApp5.csproj b/test/ConsoleApp5/ConsoleApp5.csproj deleted file mode 100644 index 06bff8cabb6..00000000000 --- a/test/ConsoleApp5/ConsoleApp5.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - Exe - net9.0 - enable - true - - - - - - - - \ No newline at end of file diff --git a/test/ConsoleApp5/Program.cs b/test/ConsoleApp5/Program.cs deleted file mode 100644 index a0ba0c18746..00000000000 --- a/test/ConsoleApp5/Program.cs +++ /dev/null @@ -1,65 +0,0 @@ -using Azure.Storage.Blobs; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Orleans.Journaling; - -var host = Host.CreateDefaultBuilder(args) - .ConfigureLogging(builder => - { - builder.AddConsole().SetMinimumLevel(LogLevel.Error); - builder.AddFilter("Orleans.Journaling.StateMachineManager", LogLevel.Debug); - }) - .UseOrleans(builder => - { - builder.UseLocalhostClustering(); -#pragma warning disable ORLEANSEXP005 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. - builder.Configure(o => o.RetirementGracePeriod = TimeSpan.FromSeconds(10)); - builder.AddAzureAppendBlobStateMachineStorage(options => - { - options.ContainerName = "test-grains"; - options.BlobServiceClient = new BlobServiceClient("UseDevelopmentStorage=true"); - options.GetBlobName = (grainId) => $"{grainId.Type}-{grainId.Key}.bin"; - }); - }) - .Build(); - -await host.StartAsync(); - -var grainFactory = host.Services.GetRequiredService(); -var grain = grainFactory.GetGrain("key"); - -int i = 1; -while (i <= 50) -{ - await grain.Ping(); - await Task.Delay(1000); - i++; -} - -Console.ReadKey(); - -public interface ITestGrain : IGrainWithStringKey -{ - Task Ping(); -} - -public class TestGrain( - [FromKeyedServices("dict1")] IDurableValue machine1 - //,[FromKeyedServices("dict2")] IDurableValue machine2 - ) : DurableGrain, ITestGrain -{ - public async Task Ping() - { - Do("machine1", machine1); - //Console.WriteLine("------------"); Do("machine2", machine2); - - await WriteStateAsync(); - } - - void Do(string name, IDurableValue machine) - { - machine.Value = ++machine.Value; - Console.WriteLine($"{name}: " + machine.Value); - } -} From fbbd90a9f9ef9813585f98e478cc56c9d730bef8 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sun, 28 Sep 2025 23:58:44 +0200 Subject: [PATCH 08/34] remove unused usings --- src/Orleans.Journaling/StateMachineManager.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index a6a9c1b1ee1..6d28c73be89 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -1,8 +1,6 @@ using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; -using System.Xml.Linq; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Runtime.Internal; From cbe7fde1292382a19c88179e1eb7ec8a9011093d Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sun, 5 Oct 2025 15:39:35 +0200 Subject: [PATCH 09/34] prevent duplicate key error during state machine resurrection --- src/Orleans.Journaling/StateMachineManager.cs | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 6d28c73be89..ebb56f81792 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -58,12 +58,32 @@ public StateMachineManager( public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) { ArgumentNullException.ThrowIfNullOrEmpty(name); - _shutdownCancellation.Token.ThrowIfCancellationRequested(); lock (_lock) { - _stateMachines.Add(name, stateMachine); + if (_stateMachines.TryGetValue(name, out var machine)) + { + + if (machine is RetiredStateMachineVessel) + { + // If the existing machine is a vessel for a retired one, it means the machine was loaded from a previous + // log during recovery but has not been re-registered. We effectively are "staging" the resurrection of the machine. + // The actual reseting and removal from the tracker is handled within the serialized loop. + // This is to prevent logical race conditions with the recovery process. + _stateMachines[name] = stateMachine; + } + else + { + // A real state machine is already registered with this name, this must be a developer error. + throw new ArgumentException($"A state machine with the key '{name}' has already been registered."); + } + } + else + { + _stateMachines.Add(name, stateMachine); + } + _workQueue.Enqueue(new WorkItem(WorkItemType.RegisterStateMachine, completion: null) { Context = name @@ -148,7 +168,7 @@ private async Task WorkLoop() if (_retirementTracker.Count > 0) { - RetiredOrResurectStateMachines(); + RetireOrResurectStateMachines(); } } @@ -272,7 +292,7 @@ private async Task WorkLoop() } } - private void RetiredOrResurectStateMachines() + private void RetireOrResurectStateMachines() { foreach (var (name, timestamp) in _retirementTracker) { From 14ea39ae3482ca1692a96a7b05a98910148bf17e Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Wed, 8 Oct 2025 00:14:53 +0200 Subject: [PATCH 10/34] ensure buffered data is re-applied if machine comes back --- src/Orleans.Journaling/StateMachineManager.cs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index ebb56f81792..59dfe138a00 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Collections.ObjectModel; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.Logging; @@ -64,13 +65,17 @@ public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) { if (_stateMachines.TryGetValue(name, out var machine)) { - - if (machine is RetiredStateMachineVessel) + if (machine is RetiredStateMachineVessel vessel) { // If the existing machine is a vessel for a retired one, it means the machine was loaded from a previous // log during recovery but has not been re-registered. We effectively are "staging" the resurrection of the machine. - // The actual reseting and removal from the tracker is handled within the serialized loop. - // This is to prevent logical race conditions with the recovery process. + // The removal from the tracker is handled within the serialized loop. This is to prevent logical race conditions with the recovery process. + // We also make sure to apply any buffered data that could have occured while the vessel took this machine's place. + stateMachine.Reset(new StateMachineLogWriter(this, new(_stateMachineIds[name]))); + foreach (var entry in vessel.BufferedData) + { + stateMachine.Apply(new ReadOnlySequence(entry)); + } _stateMachines[name] = stateMachine; } else @@ -561,6 +566,8 @@ private sealed class RetiredStateMachineVessel : IDurableStateMachine { private readonly List _bufferedData = []; + public ReadOnlyCollection BufferedData => _bufferedData.AsReadOnly(); + void IDurableStateMachine.AppendSnapshot(StateMachineStorageWriter snapshotWriter) { foreach (var data in _bufferedData) From c45cc71bb9e31e781fe7f0c2427f07d0e0aa458a Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sat, 11 Oct 2025 00:07:18 +0200 Subject: [PATCH 11/34] remove hacky assertion in tests --- .../StateMachineManagerTests.cs | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index e5ca107a3f5..f86c26b025d 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -297,7 +297,7 @@ public async Task StateMachineManager_AutoRetiringStateMachines() // The manager should have recovered the state for dictToKeep, // and created a DurableNothing placeholder for dictToRetire (we cant test for it at this point). - await SafeAssertEqual(1, "a", dictToKeep2); + Assert.Equal(1, dictToKeep2["a"]); // We advance time by half the grace period to see if we can save it from purging. timeProvider.Advance(period / 2); @@ -311,11 +311,12 @@ public async Task StateMachineManager_AutoRetiringStateMachines() var dictToKeep3 = CreateTestMachine(DictToKeepKey, sut3.Manager); var dictToRetire3 = CreateTestMachine(DictToRetireKey, sut3.Manager); - await sut3.Lifecycle.OnStart();; + await sut3.Lifecycle.OnStart(); + + Assert.Equal(10, dictToKeep3["a"]); - await SafeAssertEqual(10, "a", dictToKeep3); // The fact this entry ["b", 1] exists proves that the state of dictToRetire was preserved, even though we did not register it in step 2. - await SafeAssertEqual(1, "b", dictToRetire3); + Assert.Equal(1, dictToRetire3["b"]); // By advancing time by another half-period we cover the full period. But since we have re-introduced dictToRetire, we should have un-retired it. // This is similar to step 2, but there we avoided purging due to time not being due, whereas here we avoid purging due to re-registration. @@ -353,7 +354,7 @@ public async Task StateMachineManager_AutoRetiringStateMachines() var dictToRetire5 = CreateTestMachine(DictToRetireKey, sut5.Manager); await sut5.Lifecycle.OnStart(); - await SafeAssertEqual(10, "a", dictToKeep5); + Assert.Equal(10, dictToKeep5["a"]); // The retired dictionary should now be empty because its state was purged during the compaction. // Note that this is a new version of dictToRetire, since the original was removed. Idea here is @@ -376,27 +377,5 @@ static async Task TriggerCompaction(IStateMachineManager manager, DurableDiction await manager.WriteStateAsync(CancellationToken.None); } } - - // Sometimes the KVP is not (yet) available when a check is followed immediately after the lifecycle subject has started. - static async Task SafeAssertEqual(int expected, string key, DurableDictionary dict) - { - const int MaxAttempts = 10; - - var attempt = 0; - - while (true) - { - try - { - Assert.Equal(expected, dict[key]); - break; - } - catch (KeyNotFoundException) when (attempt < MaxAttempts) - { - attempt++; - await Task.Delay(100); - } - } - } } } From 8419cdba422b34867f7bd9cc4830b5c07651b6b7 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Sun, 26 Oct 2025 16:28:25 +0100 Subject: [PATCH 12/34] Update src/Orleans.Journaling/StateMachineManager.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Orleans.Journaling/StateMachineManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 59dfe138a00..2699f683205 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -456,7 +456,7 @@ private void OnSetStateMachineId(string name, ulong id) var vessel = new RetiredStateMachineVessel(); // We must not make the vessel self-register with the manager, since it will - // result in a late-registration after the manger is 'ready'. Instead we add it inline here. + // result in a late-registration after the manager is 'ready'. Instead we add it inline here. _stateMachines.Add(name, vessel); _stateMachinesMap[id] = vessel; From dd396e0d4b5d06bd533a01b9f3fd1ccdc432cfd6 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Sun, 26 Oct 2025 16:28:55 +0100 Subject: [PATCH 13/34] Update src/Orleans.Journaling/StateMachineManager.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Orleans.Journaling/StateMachineManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 2699f683205..bb414ca49a3 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -558,7 +558,7 @@ private sealed class StateMachinesRetirementTracker( } /// - /// Used to keep retired machines into a purgatory state until time-based purging or if a comeback ocurrs. + /// Used to keep retired machines into a purgatory state until time-based purging or if a comeback occurs. /// This keeps buffering entries and dumps them back into the log upon compaction. /// [DebuggerDisplay(nameof(RetiredStateMachineVessel))] From 37dab7fd74373ad52d92e7bd9256b1562eeb1560 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Sun, 26 Oct 2025 16:29:06 +0100 Subject: [PATCH 14/34] Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/Orleans.Journaling.Tests/StateMachineManagerTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index f86c26b025d..97854cc1d90 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -274,7 +274,7 @@ public async Task StateMachineManager_AutoRetiringStateMachines() // -------------- STEP 1 -------------- - // We begin with 2 dictionaries, one of which we will retire by means of not registering it in the manger. + // We begin with 2 dictionaries, one of which we will retire by means of not registering it in the manager. // This would be in the real-world developers removing it from the grain's ctor as a dependecy. var sut1 = CreateTestSystem(storage, timeProvider); var dictToKeep1 = CreateTestMachine(DictToKeepKey, sut1.Manager); From fdd14e6b66025428c455b538537432c9dc8bc773 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Sun, 26 Oct 2025 16:29:16 +0100 Subject: [PATCH 15/34] Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/Orleans.Journaling.Tests/StateMachineManagerTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index 97854cc1d90..300adf78c67 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -275,7 +275,7 @@ public async Task StateMachineManager_AutoRetiringStateMachines() // -------------- STEP 1 -------------- // We begin with 2 dictionaries, one of which we will retire by means of not registering it in the manager. - // This would be in the real-world developers removing it from the grain's ctor as a dependecy. + // This would be in the real-world developers removing it from the grain's ctor as a dependency. var sut1 = CreateTestSystem(storage, timeProvider); var dictToKeep1 = CreateTestMachine(DictToKeepKey, sut1.Manager); var dictToRetire2 = CreateTestMachine(DictToRetireKey, sut1.Manager); From 3de46b7aed2820fcc62b3739ff46f92737b14201 Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Sun, 26 Oct 2025 16:29:26 +0100 Subject: [PATCH 16/34] Update test/Orleans.Journaling.Tests/StateMachineManagerTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/Orleans.Journaling.Tests/StateMachineManagerTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs index 300adf78c67..cf8ddae6931 100644 --- a/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs +++ b/test/Orleans.Journaling.Tests/StateMachineManagerTests.cs @@ -337,7 +337,7 @@ public async Task StateMachineManager_AutoRetiringStateMachines() // It should have created a DurableNothing placeholder for dictToRetire, but we can not test for that. - // This time we advance time to cover the full period. Note that this is necccessary because a side effect of step 3 + // This time we advance time to cover the full period. Note that this is necessary because a side effect of step 3 // was that dictToRetire was removed from the tracker (since it came back), so just triggering a compaction won't cut it // as time to retire will essentially be reset to "now". timeProvider.Advance(period); From 40ce19c4ac52b29cc843318b16550d2cf9d22a23 Mon Sep 17 00:00:00 2001 From: Egil Hansen Date: Tue, 30 Sep 2025 16:49:57 +0000 Subject: [PATCH 17/34] Fix table creation logging to reflect existing tables correctly (#9696) The `CreateIfNotExistsAsync` method's response will have a 409-statuscode if the table already exists and will always return the `TableItem` if the call is successful. --- src/Azure/Shared/Storage/AzureTableDataManager.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Azure/Shared/Storage/AzureTableDataManager.cs b/src/Azure/Shared/Storage/AzureTableDataManager.cs index f4a92020c14..212ea9abd7e 100644 --- a/src/Azure/Shared/Storage/AzureTableDataManager.cs +++ b/src/Azure/Shared/Storage/AzureTableDataManager.cs @@ -81,10 +81,10 @@ public async Task InitTableAsync() { TableServiceClient tableCreationClient = await GetCloudTableCreationClientAsync(); var table = tableCreationClient.GetTableClient(TableName); - var tableItem = await table.CreateIfNotExistsAsync(); - var didCreate = tableItem is not null; + var response = await table.CreateIfNotExistsAsync(); + var alreadyExisted = response.GetRawResponse().Status == (int)HttpStatusCode.Conflict; - LogInfoTableCreation(Logger, didCreate ? "Created" : "Attached to", TableName); + LogInfoTableCreation(Logger, alreadyExisted ? "Attached to" : "Created", TableName); Table = table; } catch (TimeoutException te) From 75e791672edcf3ce949e2d3b7aab2c415ec54e8b Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 1 Oct 2025 16:26:42 -0700 Subject: [PATCH 18/34] Fix ZooKeeper CI tests: correct service name and use official Docker image (#9699) * Initial plan * Fix ZooKeeper CI tests: correct service name and use specific image tag Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> * Use official ZooKeeper Docker image instead of bitnami Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --- .github/workflows/ci.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 67da3bdd013..45701df1429 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -390,12 +390,10 @@ jobs: provider: ["ZooKeeper"] framework: [net8.0] services: - consul: - image: bitnami/zookeeper + zookeeper: + image: zookeeper:3.9 ports: - 2181:2181 - env: - ALLOW_ANONYMOUS_LOGIN: "yes" steps: - uses: actions/checkout@v4 - name: Setup .NET From 167d2259ee6c5112f32c31a678e19c2c7ed14677 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Oct 2025 10:22:35 -0700 Subject: [PATCH 19/34] Fix Consul CI tests by using compatible Consul version (#9701) * Fix Consul key format to comply with allowed characters Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> * Use custom encoding for Consul keys instead of URL encoding Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> * Simplify encoding logic to handle both IPv4 and IPv6 Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> * Revert breaking changes and use older Consul version for tests Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --- .github/workflows/ci.yml | 16 +++++++++------- src/Orleans.TestingHost/TestClusterBuilder.cs | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45701df1429..6f5c6db1ca1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -346,15 +346,14 @@ jobs: matrix: provider: ["Consul"] framework: [net8.0] - services: - consul: - image: hashicorp/consul - ports: - - 8500:8500 - - 8600:8600/tcp - - 8600:8600/udp steps: - uses: actions/checkout@v4 + - name: Start Consul + run: | + docker run -d --name consul -p 8500:8500 -p 8600:8600/tcp -p 8600:8600/udp \ + hashicorp/consul:1.19 agent -dev -client=0.0.0.0 + - name: Wait for Consul + run: sleep 5 - name: Setup .NET uses: actions/setup-dotnet@v4 with: @@ -372,6 +371,9 @@ jobs: # [SuppressMessage("Microsoft.Security", "CSCAN0090:ConfigFile", Justification="Not a secret")] # [SuppressMessage("Microsoft.Security", "CSCAN0220:DefaultPasswordContexts", Justification="Not a secret")] ORLEANSCONSULCONNECTIONSTRING: "http://localhost:8500" + - name: Clean up Consul container + if: always() + run: docker rm -f consul - name: Archive Test Results if: always() uses: actions/upload-artifact@v4 diff --git a/src/Orleans.TestingHost/TestClusterBuilder.cs b/src/Orleans.TestingHost/TestClusterBuilder.cs index 439eafcd63f..206222f1944 100644 --- a/src/Orleans.TestingHost/TestClusterBuilder.cs +++ b/src/Orleans.TestingHost/TestClusterBuilder.cs @@ -179,7 +179,7 @@ public static string CreateClusterId() string prefix = "testcluster-"; int randomSuffix = Random.Shared.Next(1000); DateTime now = DateTime.UtcNow; - string DateTimeFormat = @"yyyy-MM-dd\tHH-mm-ss"; + string DateTimeFormat = @"yyyy-MM-dd-HH-mm-ss"; return $"{prefix}{now.ToString(DateTimeFormat, CultureInfo.InvariantCulture)}-{randomSuffix}"; } From e7c922d70caa1265f317561c0ab14469a8991fb7 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Oct 2025 07:20:40 -0700 Subject: [PATCH 20/34] Remove explicit ActivityStatusCode.Ok setting to comply with OpenTelemetry specification (#9703) * Initial plan * Remove explicit ActivityStatusCode.Ok per OpenTelemetry spec Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> --- .../Diagnostics/ActivityPropagationGrainCallFilter.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs b/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs index 29ecebbcaa3..b1e48838fdf 100644 --- a/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs +++ b/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs @@ -49,10 +49,6 @@ protected static async Task Process(IGrainCallContext context, Activity activity try { await context.Invoke(); - if (activity is not null && activity.IsAllDataRequested) - { - activity.SetStatus(ActivityStatusCode.Ok); - } } catch (Exception e) { From b91aeeeb22879e90b181d0e80dec196f2f084002 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:13:23 -0700 Subject: [PATCH 21/34] chore(deps): bump dotnet-sdk from 9.0.303 to 9.0.305 (#9677) Bumps [dotnet-sdk](https://github.com/dotnet/sdk) from 9.0.303 to 9.0.305. - [Release notes](https://github.com/dotnet/sdk/releases) - [Commits](https://github.com/dotnet/sdk/compare/v9.0.303...v9.0.305) --- updated-dependencies: - dependency-name: dotnet-sdk dependency-version: 9.0.305 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- global.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/global.json b/global.json index a377b594304..74e6143ef29 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { "rollForward": "major", - "version": "9.0.303" + "version": "9.0.305" } } From 2a917e05387fa33b01758c6f1e3563d420f30345 Mon Sep 17 00:00:00 2001 From: Bk Date: Wed, 8 Oct 2025 03:59:11 +0900 Subject: [PATCH 22/34] Fix race condition in `TransactionInfo.Fork` `PendingCalls` increment (#9702) --- src/Orleans.Transactions/DistributedTM/TransactionInfo.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Orleans.Transactions/DistributedTM/TransactionInfo.cs b/src/Orleans.Transactions/DistributedTM/TransactionInfo.cs index 7e98e60cc9f..d124a6b8524 100644 --- a/src/Orleans.Transactions/DistributedTM/TransactionInfo.cs +++ b/src/Orleans.Transactions/DistributedTM/TransactionInfo.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using Orleans.Serialization; using Orleans.Transactions.Abstractions; @@ -70,7 +71,7 @@ public TransactionInfo(TransactionInfo other) : this() public TransactionInfo Fork() { - PendingCalls++; + Interlocked.Increment(ref PendingCalls); return new TransactionInfo(this); } From 8ea8f8f23be6bedfbc9886294815575e7ec0d81d Mon Sep 17 00:00:00 2001 From: Ledjon Behluli <46324828+ledjon-behluli@users.noreply.github.com> Date: Wed, 8 Oct 2025 16:28:21 +0200 Subject: [PATCH 23/34] [FIX] Potential NRE in the activation repartitioner (#9713) fix potentiall NRE in the activation repartioner --- .../ActivationRepartitioner.MessageSink.cs | 12 +++++++++--- .../Repartitioning/RepartitionerMessageFilter.cs | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs index 374eb34d557..58311a000b0 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs @@ -1,5 +1,6 @@ #nullable enable using System; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -53,7 +54,8 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken) { foreach (var message in drainBuffer[..count]) { - if (!_messageFilter.IsAcceptable(message, out var isSenderMigratable, out var isTargetMigratable)) + if (!IsFullyAddressed(message) || // The silo addresses (likely the target) is set null some time later (after the message is recorded), this can lead to a NRE + !_messageFilter.IsAcceptable(message, out var isSenderMigratable, out var isTargetMigratable)) { continue; } @@ -118,7 +120,7 @@ private void RecordMessage(Message message) } // Sender and target need to be fully addressable to know where to move to or towards. - if (!message.IsSenderFullyAddressed || !message.IsTargetFullyAddressed) + if (!IsFullyAddressed(message)) { return; } @@ -129,6 +131,10 @@ private void RecordMessage(Message message) } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool IsFullyAddressed(Message message) => + message.IsSenderFullyAddressed && message.IsTargetFullyAddressed; + async ValueTask IActivationRepartitionerSystemTarget.FlushBuffers() { while (_pendingMessages.Count > 0) @@ -148,4 +154,4 @@ async ValueTask IActivationRepartitionerSystemTarget.FlushBuffers() Message = "{Service} has stopped." )] private static partial void LogTraceServiceStopped(ILogger logger, string service); -} \ No newline at end of file +} diff --git a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs index 954af5497e0..9853fc76cc4 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs @@ -30,4 +30,4 @@ public bool IsAcceptable(Message message, out bool isSenderMigratable, out bool // If both are not migratable types we ignore this. But if one of them is not, then we allow passing, as we wish to move grains closer to them, as with any type of grain. return isSenderMigratable || isTargetMigratable; } -} \ No newline at end of file +} From 249509550663c90f7729ff3b3ae5ec811776d773 Mon Sep 17 00:00:00 2001 From: Da-Teach Date: Wed, 15 Oct 2025 23:55:25 +0200 Subject: [PATCH 24/34] Remove infinite timespan reminder checks (#9715) * Timeout.InfiniteTimeSpan is not allowed for reminders * Specifically check for InfiniteTimeSpan to make clear it's usage is not allowed (and to allow for an eventual future change of the InfiniteTimeSpan value) --------- Co-authored-by: Jerremy Koot --- .../ReminderService/ReminderRegistry.cs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/Orleans.Reminders/ReminderService/ReminderRegistry.cs b/src/Orleans.Reminders/ReminderService/ReminderRegistry.cs index 9bb6eb3f43b..0b7e89b7825 100644 --- a/src/Orleans.Reminders/ReminderService/ReminderRegistry.cs +++ b/src/Orleans.Reminders/ReminderService/ReminderRegistry.cs @@ -24,14 +24,19 @@ public ReminderRegistry(IServiceProvider serviceProvider, IOptions RegisterOrUpdateReminder(GrainId callingGrainId, string reminderName, TimeSpan dueTime, TimeSpan period) { - // Perform input volatility checks that are consistent with System.Threading.Timer - // http://referencesource.microsoft.com/#mscorlib/system/threading/timer.cs,c454f2afe745d4d3,references - if (dueTime.Ticks < 0 && dueTime != Timeout.InfiniteTimeSpan) + // Perform input volatility checks + if (dueTime == Timeout.InfiniteTimeSpan) + throw new ArgumentOutOfRangeException(nameof(dueTime), "Cannot use InfiniteTimeSpan dueTime to create a reminder"); + + if (dueTime.Ticks < 0) throw new ArgumentOutOfRangeException(nameof(dueTime), "Cannot use negative dueTime to create a reminder"); - - if (period.Ticks < 0 && period != Timeout.InfiniteTimeSpan) + + if (period == Timeout.InfiniteTimeSpan) + throw new ArgumentOutOfRangeException(nameof(period), "Cannot use InfiniteTimeSpan period to create a reminder"); + + if (period.Ticks < 0) throw new ArgumentOutOfRangeException(nameof(period), "Cannot use negative period to create a reminder"); - + var minReminderPeriod = options.MinimumReminderPeriod; if (period < minReminderPeriod) throw new ArgumentException($"Cannot register reminder {reminderName} as requested period ({period}) is less than minimum allowed reminder period ({minReminderPeriod})"); @@ -92,4 +97,4 @@ private static void ThrowInvalidContext() + " Ensure that you are only accessing grain functionality from within the context of a grain."); } } -} \ No newline at end of file +} From 53b86788b7208614caab907c91e4624a81a78d2b Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:57:42 -0700 Subject: [PATCH 25/34] ResponseCompletionSource: RunContinuationsAsynchronously (#9724) --- .../Invocation/ResponseCompletionSource.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Orleans.Serialization/Invocation/ResponseCompletionSource.cs b/src/Orleans.Serialization/Invocation/ResponseCompletionSource.cs index bdfabec2ff9..f622c701740 100644 --- a/src/Orleans.Serialization/Invocation/ResponseCompletionSource.cs +++ b/src/Orleans.Serialization/Invocation/ResponseCompletionSource.cs @@ -10,7 +10,7 @@ namespace Orleans.Serialization.Invocation /// public sealed class ResponseCompletionSource : IResponseCompletionSource, IValueTaskSource, IValueTaskSource { - private ManualResetValueTaskSourceCore _core; + private ManualResetValueTaskSourceCore _core = new() { RunContinuationsAsynchronously = true }; /// /// Returns this instance as a . @@ -113,7 +113,7 @@ void IValueTaskSource.GetResult(short token) /// The underlying result type. public sealed class ResponseCompletionSource : IResponseCompletionSource, IValueTaskSource, IValueTaskSource { - private ManualResetValueTaskSourceCore _core; + private ManualResetValueTaskSourceCore _core = new() { RunContinuationsAsynchronously = true }; /// /// Returns this instance as a . @@ -255,4 +255,4 @@ void IValueTaskSource.GetResult(short token) } } } -} \ No newline at end of file +} From c0c11fb85c7deaca1d5d89b6f2d1dcac962defe1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:59:09 -0700 Subject: [PATCH 26/34] chore(deps): bump dotnet-sdk from 9.0.305 to 9.0.306 (#9720) Bumps [dotnet-sdk](https://github.com/dotnet/sdk) from 9.0.305 to 9.0.306. - [Release notes](https://github.com/dotnet/sdk/releases) - [Commits](https://github.com/dotnet/sdk/compare/v9.0.305...v9.0.306) --- updated-dependencies: - dependency-name: dotnet-sdk dependency-version: 9.0.306 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- global.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/global.json b/global.json index 74e6143ef29..1db34c99d49 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { "rollForward": "major", - "version": "9.0.305" + "version": "9.0.306" } } From 2f389d8a23e2355c25009b61fc8cb2c6244c53e8 Mon Sep 17 00:00:00 2001 From: Gutemberg Ribeiro Date: Wed, 22 Oct 2025 20:28:26 -0300 Subject: [PATCH 27/34] Add `BigInteger` codec (#9669) --- .../Cloning/IDeepCopier.cs | 1 + .../Codecs/BigIntegerCodec.cs | 84 +++++++++++++ .../BuiltInCodecTests.cs | 110 +++++++++++++++++- 3 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 src/Orleans.Serialization/Codecs/BigIntegerCodec.cs diff --git a/src/Orleans.Serialization/Cloning/IDeepCopier.cs b/src/Orleans.Serialization/Cloning/IDeepCopier.cs index 18f5fa32b36..6c65c0bc134 100644 --- a/src/Orleans.Serialization/Cloning/IDeepCopier.cs +++ b/src/Orleans.Serialization/Cloning/IDeepCopier.cs @@ -289,6 +289,7 @@ internal static class ShallowCopyableTypes [typeof(UInt128)] = true, [typeof(Int128)] = true, #endif + [typeof(System.Numerics.BigInteger)] = true, #if NET5_0_OR_GREATER [typeof(Half)] = true, #endif diff --git a/src/Orleans.Serialization/Codecs/BigIntegerCodec.cs b/src/Orleans.Serialization/Codecs/BigIntegerCodec.cs new file mode 100644 index 00000000000..0d725e4041a --- /dev/null +++ b/src/Orleans.Serialization/Codecs/BigIntegerCodec.cs @@ -0,0 +1,84 @@ +using System; +using System.Buffers; +using System.Numerics; +using System.Runtime.CompilerServices; +using Orleans.Serialization.Buffers; +using Orleans.Serialization.WireProtocol; + +namespace Orleans.Serialization.Codecs; + +/// +/// Serializer for . +/// +[RegisterSerializer] +public sealed class BigIntegerCodec : IFieldCodec +{ + /// + void IFieldCodec.WriteField(ref Writer writer, uint fieldIdDelta, + Type expectedType, BigInteger value) + { + ReferenceCodec.MarkValueField(writer.Session); + writer.WriteFieldHeader(fieldIdDelta, expectedType, typeof(BigInteger), WireType.LengthPrefixed); + + WriteField(ref writer, value); + } + + /// + /// Writes a field without type info (expected type is statically known). + /// + /// The buffer writer type. + /// The writer. + /// The field identifier delta. + /// The value. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void WriteField(ref Writer writer, uint fieldIdDelta, BigInteger value) where TBufferWriter : IBufferWriter + { + ReferenceCodec.MarkValueField(writer.Session); + writer.WriteFieldHeaderExpected(fieldIdDelta, WireType.LengthPrefixed); + + WriteField(ref writer, value); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void WriteField(ref Writer writer, BigInteger value) + where TBufferWriter : IBufferWriter + { + var byteCount = value.GetByteCount(); + writer.WriteVarUInt32((uint)byteCount); + + writer.EnsureContiguous(byteCount); + if (value.TryWriteBytes(writer.WritableSpan, out var bytesWritten)) + { + writer.AdvanceSpan(bytesWritten); + } + else + { + writer.Write(value.ToByteArray()); + } + } + + /// + BigInteger IFieldCodec.ReadValue(ref Reader reader, Field field) => ReadValue(ref reader, field); + + /// + /// Reads a value. + /// + /// The reader input type. + /// The reader. + /// The field. + /// The value. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static BigInteger ReadValue(ref Reader reader, Field field) + { + ReferenceCodec.MarkValueField(reader.Session); + + if (field.WireType != WireType.LengthPrefixed) + { + throw new UnexpectedLengthPrefixValueException(nameof(BigInteger), 0, 0); + } + + var length = reader.ReadVarUInt32(); + var bytes = reader.ReadBytes(length); + return new BigInteger(bytes); + } +} diff --git a/test/Orleans.Serialization.UnitTests/BuiltInCodecTests.cs b/test/Orleans.Serialization.UnitTests/BuiltInCodecTests.cs index db60ad39c79..9de099994ab 100644 --- a/test/Orleans.Serialization.UnitTests/BuiltInCodecTests.cs +++ b/test/Orleans.Serialization.UnitTests/BuiltInCodecTests.cs @@ -15,6 +15,7 @@ using System.Collections.Specialized; using System.Linq; using System.Net; +using System.Numerics; using Xunit; using Microsoft.FSharp.Collections; using Xunit.Abstractions; @@ -50,7 +51,7 @@ public enum MyEnum : short /// - Collection types (List, Dictionary, arrays, etc.) /// - Nullable types /// - Enums - /// + /// /// These codecs are designed for: /// - High performance with minimal allocations /// - Compact binary representation @@ -1339,6 +1340,111 @@ public class UInt128CopierTests(ITestOutputHelper output) : CopierTester> ValueProvider => assert => Gen.ULong.Select(Gen.ULong).Sample(value => assert(new (value.V0, value.V1))); } + + public class BigIntegerCodecTests(ITestOutputHelper output) : FieldCodecTester(output) + { + // New behavior in .NET 9: https://learn.microsoft.com/en-us/dotnet/core/compatibility/core-libraries/9.0/biginteger-limit#new-behavior + protected override int[] MaxSegmentSizes => [(2^31)-1]; + + protected override BigInteger CreateValue() + { + var bytes = new byte[Random.Next(1, 100)]; + Random.NextBytes(bytes); + return new BigInteger(bytes); + } + + protected override BigInteger[] TestValues => + [ + BigInteger.Zero, + BigInteger.One, + BigInteger.MinusOne, + new BigInteger(byte.MaxValue), + new BigInteger(byte.MaxValue) + 1, + new BigInteger(ushort.MaxValue), + new BigInteger(ushort.MaxValue) + 1, + new BigInteger(uint.MaxValue), + new BigInteger(uint.MaxValue) + 1, + new BigInteger(ulong.MaxValue), + new BigInteger(ulong.MaxValue) + 1, + (BigInteger)Int128.MaxValue, + (BigInteger)Int128.MaxValue + 1, + (BigInteger)UInt128.MaxValue, + (BigInteger)UInt128.MaxValue + 1, + -new BigInteger(byte.MaxValue), + -new BigInteger(ushort.MaxValue), + -new BigInteger(uint.MaxValue), + -new BigInteger(ulong.MaxValue), + -(BigInteger)Int128.MaxValue, + -(BigInteger)UInt128.MaxValue, + BigInteger.Parse("123456789012345678901234567890123456789012345678901234567890"), + BigInteger.Parse("-123456789012345678901234567890123456789012345678901234567890"), + ]; + + protected override Action> ValueProvider => assert => + { + Gen.Byte.Array.Sample(bytes => + { + if (bytes.Length > 0) + { + assert(new BigInteger(bytes)); + } + }); + }; + } + + public class BigIntegerCopierTests(ITestOutputHelper output) : CopierTester>(output) + { + protected override bool IsImmutable => true; + + protected override BigInteger CreateValue() + { + var bytes = new byte[Random.Next(1, 100)]; + Random.NextBytes(bytes); + return new BigInteger(bytes); + } + + protected override BigInteger[] TestValues => + [ + BigInteger.Zero, + BigInteger.One, + BigInteger.MinusOne, + new BigInteger(byte.MaxValue), + new BigInteger(byte.MaxValue) + 1, + new BigInteger(ushort.MaxValue), + new BigInteger(ushort.MaxValue) + 1, + new BigInteger(uint.MaxValue), + new BigInteger(uint.MaxValue) + 1, + new BigInteger(ulong.MaxValue), + new BigInteger(ulong.MaxValue) + 1, +#if NET7_0_OR_GREATER + (BigInteger)Int128.MaxValue, + (BigInteger)Int128.MaxValue + 1, + (BigInteger)UInt128.MaxValue, + (BigInteger)UInt128.MaxValue + 1, +#endif + -new BigInteger(byte.MaxValue), + -new BigInteger(ushort.MaxValue), + -new BigInteger(uint.MaxValue), + -new BigInteger(ulong.MaxValue), +#if NET7_0_OR_GREATER + -(BigInteger)Int128.MaxValue, + -(BigInteger)UInt128.MaxValue, +#endif + BigInteger.Parse("123456789012345678901234567890123456789012345678901234567890"), + BigInteger.Parse("-123456789012345678901234567890123456789012345678901234567890"), + ]; + + protected override Action> ValueProvider => assert => + { + Gen.Byte.Array.Sample(bytes => + { + if (bytes.Length > 0) + { + assert(new BigInteger(bytes)); + } + }); + }; + } #endif public class UInt64CodecTests(ITestOutputHelper output) : FieldCodecTester(output) @@ -3913,4 +4019,4 @@ public class CancellationTokenCopierTests(ITestOutputHelper output) : CopierTest new CancellationToken(true) ]; } -} \ No newline at end of file +} From a226ad59b4ebbbbaa07ddfbab398e7974082ee09 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Wed, 22 Oct 2025 20:06:32 -0700 Subject: [PATCH 28/34] StateMachineManager: start work loop during activation (#9725) --- src/Orleans.Journaling/StateMachineManager.cs | 11 +++++++---- test/Orleans.Journaling.Tests/DurableQueueTests.cs | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index bb414ca49a3..8ad565aa376 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -28,7 +28,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private readonly StateMachineManagerState _stateMachineIds; private readonly StateMachinesRetirementTracker _retirementTracker; private readonly TimeSpan _retirementGracePeriod; - private readonly Task _workLoop; + private Task? _workLoop; private ManagerState _state; private Task? _pendingWrite; private ulong _nextStateMachineId = MinApplicationStateMachineId; @@ -52,8 +52,6 @@ public StateMachineManager( _stateMachinesMap[0] = _stateMachineIds; _retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool); - - _workLoop = Start(); } public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) @@ -102,6 +100,8 @@ public async ValueTask InitializeAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); _shutdownCancellation.Token.ThrowIfCancellationRequested(); + Debug.Assert(_workLoop is null, "InitializeAsync should only be called once."); + _workLoop = Start(); Task task; lock (_lock) @@ -472,7 +472,10 @@ async Task ILifecycleObserver.OnStop(CancellationToken cancellationToken) { _shutdownCancellation.Cancel(); _workSignal.Signal(); - await _workLoop.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing); + if (_workLoop is { } task) + { + await task.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing); + } } void IDisposable.Dispose() diff --git a/test/Orleans.Journaling.Tests/DurableQueueTests.cs b/test/Orleans.Journaling.Tests/DurableQueueTests.cs index 24b23bdb3b9..f1aeb9a32c7 100644 --- a/test/Orleans.Journaling.Tests/DurableQueueTests.cs +++ b/test/Orleans.Journaling.Tests/DurableQueueTests.cs @@ -187,8 +187,8 @@ public async Task DurableQueue_EmptyQueueOperations_Test() var manager = sut.Manager; var codec = CodecProvider.GetCodec(); var queue = new DurableQueue("emptyQueue", manager, codec, SessionPool); - await manager.WriteStateAsync(CancellationToken.None); await sut.Lifecycle.OnStart(); + await manager.WriteStateAsync(CancellationToken.None); // Assert Assert.Empty(queue); From e256d97752aa7d9ba1e5a8c6859e975f736b096b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20B=C3=BCy=C3=BCkatl=C4=B1?= Date: Thu, 23 Oct 2025 06:37:53 +0300 Subject: [PATCH 29/34] Flag PostgreSQL as asynchronous (#9705) Changed isSynchronousAdoNetImplementation from true to false for PostgreSQL thanks to updates in Npgsql 8.0. https://github.com/npgsql/npgsql/issues/1130 --- src/AdoNet/Shared/Storage/DbConstantsStore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AdoNet/Shared/Storage/DbConstantsStore.cs b/src/AdoNet/Shared/Storage/DbConstantsStore.cs index 7f676e9658d..e5cba79afdb 100644 --- a/src/AdoNet/Shared/Storage/DbConstantsStore.cs +++ b/src/AdoNet/Shared/Storage/DbConstantsStore.cs @@ -44,7 +44,7 @@ internal static class DbConstantsStore startEscapeIndicator: '"', endEscapeIndicator: '"', unionAllSelectTemplate: " UNION ALL SELECT ", - isSynchronousAdoNetImplementation: true, //there are some intermittent PostgreSQL problems too, see more discussion at https://github.com/dotnet/orleans/pull/2949. + isSynchronousAdoNetImplementation: false, supportsStreamNatively: true, supportsCommandCancellation: true, // See https://dev.mysql.com/doc/connector-net/en/connector-net-ref-mysqlclient-mysqlcommandmembers.html. commandInterceptor: NoOpCommandInterceptor.Instance) From f9eb0c738f05ba68eb657110b4d901bcea9e7146 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Wed, 22 Oct 2025 20:46:49 -0700 Subject: [PATCH 30/34] Enable GitHub merge queue (#9727) Enable merge queue --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f5c6db1ca1..472494e73f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,9 @@ on: pull_request: branches: - main + merge_group: + types: + - checks_requested env: DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1 DOTNET_NOLOGO: true From 862181273a409cfae4e21a2e01f4209371c529c4 Mon Sep 17 00:00:00 2001 From: Gutemberg Ribeiro Date: Thu, 23 Oct 2025 20:24:46 -0300 Subject: [PATCH 31/34] Add missing validation for the NATS stream provider (#9668) Add missing validation for the Nats stream provider --- .../Hosting/NatsStreamConfigurator.cs | 14 +++++++++----- src/Orleans.Streaming.NATS/NatsOptions.cs | 15 ++++++++++++++- .../Providers/NatsConnectionManager.cs | 2 ++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/Orleans.Streaming.NATS/Hosting/NatsStreamConfigurator.cs b/src/Orleans.Streaming.NATS/Hosting/NatsStreamConfigurator.cs index 16406858e3d..7c485924cdc 100644 --- a/src/Orleans.Streaming.NATS/Hosting/NatsStreamConfigurator.cs +++ b/src/Orleans.Streaming.NATS/Hosting/NatsStreamConfigurator.cs @@ -13,9 +13,11 @@ public SiloNatsStreamConfigurator(string name, Action { this.ConfigureDelegate(services => { - services.ConfigureNamedOptionForLogging(name) + services + .ConfigureNamedOptionForLogging(name) .ConfigureNamedOptionForLogging(name) - .ConfigureNamedOptionForLogging(name); + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new NatsStreamOptionsValidator(sp.GetRequiredService>().Get(name), name)); }); } @@ -48,8 +50,10 @@ public ClusterClientNatsStreamConfigurator(string name, IClientBuilder builder) builder .ConfigureServices(services => { - services.ConfigureNamedOptionForLogging(name) - .ConfigureNamedOptionForLogging(name); + services + .ConfigureNamedOptionForLogging(name) + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new NatsStreamOptionsValidator(sp.GetRequiredService>().Get(name), name)); }); } @@ -66,4 +70,4 @@ public ClusterClientNatsStreamConfigurator ConfigurePartitioning( ob.Configure(options => options.TotalQueueCount = numOfparitions)); return this; } -} \ No newline at end of file +} diff --git a/src/Orleans.Streaming.NATS/NatsOptions.cs b/src/Orleans.Streaming.NATS/NatsOptions.cs index c55707774d3..ae84d882d3d 100644 --- a/src/Orleans.Streaming.NATS/NatsOptions.cs +++ b/src/Orleans.Streaming.NATS/NatsOptions.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using Orleans.Runtime; using NATS.Client.Core; namespace Orleans.Streaming.NATS; @@ -49,5 +50,17 @@ public class NatsOptions /// /// System.Text.Json serializer options to be used by the NATS provider. /// - public JsonSerializerOptions JsonSerializerOptions { get; set; } = default!; + public JsonSerializerOptions? JsonSerializerOptions { get; set; } +} + +public class NatsStreamOptionsValidator(NatsOptions options, string? name = null) : IConfigurationValidator +{ + public void ValidateConfiguration() + { + if (string.IsNullOrWhiteSpace(options.StreamName)) + { + throw new OrleansConfigurationException( + $"The {nameof(NatsOptions.StreamName)} is required for the NATS stream provider '{name}'."); + } + } } diff --git a/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs b/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs index 2c4e41fe575..76a19ed7cb0 100644 --- a/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs +++ b/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs @@ -32,7 +32,9 @@ public NatsConnectionManager(string providerName, ILoggerFactory loggerFactory, this._loggerFactory = loggerFactory; this._logger = this._loggerFactory.CreateLogger(); this._options = options; + this._options.JsonSerializerOptions ??= new(); this._options.JsonSerializerOptions.TypeInfoResolverChain.Add(NatsSerializerContext.Default); + if (this._options.NatsClientOptions is null) { this._options.NatsClientOptions = NatsOpts.Default with From 420da385d675d7257857affbe8188d7690c4cd45 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Thu, 23 Oct 2025 19:34:05 -0700 Subject: [PATCH 32/34] `ActivationDataActivatorProvider`: run grain constructor on grain scheduler (#9726) * ActivationDataActivatorProvider: always run grain constructor on ActivationTaskScheduler * ActivationDataActivatorProvider: always run grain constructor on ActivationTaskScheduler --- .../ActivationDataActivatorProvider.cs | 39 ++++++------- src/Orleans.Runtime/Catalog/ActivationData.cs | 38 ++++++++++++- src/Orleans.Runtime/Catalog/Catalog.cs | 57 +++++++++++++------ .../Scheduler/ActivationTaskScheduler.cs | 6 +- .../Scheduler/TaskSchedulerUtils.cs | 2 +- .../Scheduler/WorkItemGroup.cs | 2 +- 6 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs b/src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs index 3dfda5e57f6..204b4974860 100644 --- a/src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs +++ b/src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs @@ -1,9 +1,15 @@ +#nullable enable using System; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.GrainReferences; using Orleans.Metadata; +using Orleans.Runtime.Internal; using Orleans.Runtime.Scheduler; namespace Orleans.Runtime @@ -45,7 +51,7 @@ public ActivationDataActivatorProvider( _grainReferenceActivator = grainReferenceActivator; } - public bool TryGet(GrainType grainType, out IGrainContextActivator activator) + public bool TryGet(GrainType grainType, [NotNullWhen(true)] out IGrainContextActivator? activator) { if (!_grainClassMap.TryGetGrainClass(grainType, out var grainClass) || !typeof(IGrain).IsAssignableFrom(grainClass)) { @@ -91,6 +97,7 @@ private partial class ActivationDataActivator : IGrainContextActivator private readonly GrainTypeSharedContext _sharedComponents; private readonly ILogger _grainLogger; private readonly Func _createWorkItemGroup; + private readonly Action _startActivation; public ActivationDataActivator( IGrainActivator grainActivator, @@ -113,6 +120,7 @@ public ActivationDataActivator( _workItemGroupLogger, _activationTaskSchedulerLogger, _schedulingOptions); + _startActivation = state => ((ActivationData)state!).Start(_grainActivator); } public IGrainContext CreateContext(GrainAddress activationAddress) @@ -123,32 +131,21 @@ public IGrainContext CreateContext(GrainAddress activationAddress) _serviceProvider, _sharedComponents); - RuntimeContext.SetExecutionContext(context, out var originalContext); - - try - { - // Instantiate the grain itself - var instance = _grainActivator.CreateInstance(context); - context.SetGrainInstance(instance); - } - catch (Exception exception) - { - LogErrorFailedToConstructGrain(_grainLogger, exception, activationAddress.GrainId); - throw; - } - finally - { - RuntimeContext.ResetExecutionContext(originalContext); - } - + using var ecSuppressor = ExecutionContext.SuppressFlow(); + _ = Task.Factory.StartNew( + _startActivation, + context, + CancellationToken.None, + TaskCreationOptions.DenyChildAttach, + context.ActivationTaskScheduler); return context; } [LoggerMessage( Level = LogLevel.Error, - Message = "Failed to construct grain '{GrainId}'." + Message = "Failed to dispose grain '{GrainId}'." )] - private static partial void LogErrorFailedToConstructGrain(ILogger logger, Exception exception, GrainId grainId); + private static partial void LogErrorFailedToDisposeGrain(ILogger logger, Exception exception, GrainId grainId); } } diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index 7c436b039ff..bcbe92279f6 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -27,6 +27,7 @@ namespace Orleans.Runtime; /// MUST lock this object for any concurrent access /// Consider: compartmentalize by usage, e.g., using separate interfaces for data for catalog, etc. /// +[DebuggerDisplay("GrainId = {GrainId}, State = {State}, Waiting = {WaitingCount}, Executing = {IsCurrentlyExecuting}")] internal sealed partial class ActivationData : IGrainContext, ICollectibleGrainContext, @@ -62,7 +63,7 @@ internal sealed partial class ActivationData : // The task representing this activation's message loop. // This field is assigned and never read and exists only for debugging purposes (eg, in memory dumps, to associate a loop task with an activation). #pragma warning disable IDE0052 // Remove unread private members - private readonly Task _messageLoopTask; + private Task? _messageLoopTask; #pragma warning restore IDE0052 // Remove unread private members public ActivationData( @@ -81,9 +82,28 @@ public ActivationData( Debug.Assert(_serviceScope != null, "_serviceScope must not be null."); _workItemGroup = createWorkItemGroup(this); Debug.Assert(_workItemGroup != null, "_workItemGroup must not be null."); - _messageLoopTask = this.RunOrQueueTask(RunMessageLoop); } + public void Start(IGrainActivator grainActivator) + { + Debug.Assert(Equals(ActivationTaskScheduler, TaskScheduler.Current)); + lock (this) + { + try + { + var instance = grainActivator.CreateInstance(this); + SetGrainInstance(instance); + } + catch (Exception exception) + { + Deactivate(new(DeactivationReasonCode.ActivationFailed, exception, "Error constructing grain instance."), CancellationToken.None); + } + + _messageLoopTask = RunMessageLoop(); + } + } + + public TaskScheduler ActivationTaskScheduler => _workItemGroup.TaskScheduler; public IGrainRuntime GrainRuntime => _shared.Runtime; public object? GrainInstance { get; private set; } public GrainAddress Address { get; private set; } @@ -914,7 +934,7 @@ public TExtensionInterface GetExtension() } var implementation = ActivationServices.GetKeyedService(typeof(TExtensionInterface)); - if (!(implementation is TExtensionInterface typedResult)) + if (implementation is not TExtensionInterface typedResult) { throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TExtensionInterface)} is installed on this instance and no implementations are registered for automated install"); } @@ -1510,6 +1530,12 @@ public void Activate(Dictionary? requestContext, CancellationTok private async Task ActivateAsync(Dictionary? requestContextData, CancellationToken cancellationToken) { + if (State != ActivationState.Creating) + { + LogIgnoringActivateAttempt(_shared.Logger, this, State); + return; + } + // A chain of promises that will have to complete in order to complete the activation // Register with the grain directory and call the Activate method on the new activation. try @@ -2325,6 +2351,12 @@ private readonly struct ActivationDataLogValue(ActivationData activation, bool i Message = "Failed to register grain {Grain} in grain directory")] private static partial void LogFailedToRegisterGrain(ILogger logger, Exception exception, ActivationData grain); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Ignoring activation request for {Grain} because this grain is in the '{State}' state")] + private static partial void LogIgnoringActivateAttempt(ILogger logger, ActivationData grain, ActivationState state); + [LoggerMessage( EventId = (int)ErrorCode.Catalog_BeforeCallingActivate, Level = LogLevel.Debug, diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 0116846783f..5a864e52b74 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -21,6 +21,11 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic private readonly GrainContextActivator grainActivator; private ISiloStatusOracle _siloStatusOracle; + // Lock striping is used for activation creation to reduce contention + private const int LockCount = 32; // Must be a power of 2 + private const int LockMask = LockCount - 1; + private readonly object[] _locks = new object[LockCount]; + public Catalog( ILocalSiloDetails localSiloDetails, GrainDirectoryResolver grainDirectoryResolver, @@ -40,6 +45,12 @@ public Catalog( this.logger = loggerFactory.CreateLogger(); this.activationCollector = activationCollector; + // Initialize lock striping array + for (var i = 0; i < LockCount; i++) + { + _locks[i] = new object(); + } + GC.GetTotalMemory(true); // need to call once w/true to ensure false returns OK value MessagingProcessingInstruments.RegisterActivationDataAllObserve(() => @@ -58,6 +69,19 @@ public Catalog( shared.ActivationDirectory.RecordNewTarget(this); } + /// + /// Gets the lock for a specific grain ID using consistent hashing. + /// + /// The grain ID to get the lock for. + /// The lock object for the specified grain ID. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private object GetStripedLock(in GrainId grainId) + { + var hash = grainId.GetUniformHashCode(); + var lockIndex = (int)(hash & LockMask); + return _locks[lockIndex]; + } + /// /// Unregister message target and stop delivering messages to it /// @@ -115,8 +139,7 @@ public IGrainContext GetOrCreateActivation( return null; } - // Lock over all activations to try to prevent multiple instances of the same activation being created concurrently. - lock (activations) + lock (GetStripedLock(grainId)) { if (TryGetGrainContext(grainId, out result)) { @@ -298,24 +321,22 @@ internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress upd try { // scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner. - lock (activations) + // Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration + foreach (var activation in activations) { - foreach (var activation in activations) + try + { + var activationData = activation.Value; + var placementStrategy = activationData.GetComponent(); + var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; + if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue; + if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue; + + activationsToShutdown.Add(activationData); + } + catch (Exception exc) { - try - { - var activationData = activation.Value; - var placementStrategy = activationData.GetComponent(); - var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; - if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue; - if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue; - - activationsToShutdown.Add(activationData); - } - catch (Exception exc) - { - LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc); - } + LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc); } } diff --git a/src/Orleans.Runtime/Scheduler/ActivationTaskScheduler.cs b/src/Orleans.Runtime/Scheduler/ActivationTaskScheduler.cs index eaabb33614d..b16cdb7c5a2 100644 --- a/src/Orleans.Runtime/Scheduler/ActivationTaskScheduler.cs +++ b/src/Orleans.Runtime/Scheduler/ActivationTaskScheduler.cs @@ -12,7 +12,7 @@ namespace Orleans.Runtime.Scheduler /// /// A single-concurrency, in-order task scheduler for per-activation work scheduling. /// - [DebuggerDisplay("ActivationTaskScheduler-{myId} RunQueue={workerGroup.WorkItemCount}")] + [DebuggerDisplay("ActivationTaskScheduler RunQueue={workerGroup.ExternalWorkItemCount} GrainContext={workerGroup.GrainContext}")] internal sealed partial class ActivationTaskScheduler : TaskScheduler { private readonly ILogger logger; @@ -98,13 +98,11 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu #endif // Try to run the task. bool done = TryExecuteTask(task); +#if DEBUG if (!done) { -#if DEBUG LogWarnTryExecuteTaskNotDone(task.Id, task.Status); -#endif } -#if DEBUG LogTraceTryExecuteTaskInlineCompleted(myId, task.Id, Thread.CurrentThread.ManagedThreadId, done); #endif diff --git a/src/Orleans.Runtime/Scheduler/TaskSchedulerUtils.cs b/src/Orleans.Runtime/Scheduler/TaskSchedulerUtils.cs index 0655a108f3b..b82d62c27a7 100644 --- a/src/Orleans.Runtime/Scheduler/TaskSchedulerUtils.cs +++ b/src/Orleans.Runtime/Scheduler/TaskSchedulerUtils.cs @@ -10,7 +10,7 @@ internal static class TaskSchedulerUtils [MethodImpl(MethodImplOptions.AggressiveInlining)] public static void QueueAction(this ActivationTaskScheduler taskScheduler, Action action) { - using var suppressExecutionContext = new ExecutionContextSuppressor(); + using var suppressExecutionContext = new ExecutionContextSuppressor(); var task = new Task(action); task.Start(taskScheduler); diff --git a/src/Orleans.Runtime/Scheduler/WorkItemGroup.cs b/src/Orleans.Runtime/Scheduler/WorkItemGroup.cs index 81cc76eae07..fe77c6adb8b 100644 --- a/src/Orleans.Runtime/Scheduler/WorkItemGroup.cs +++ b/src/Orleans.Runtime/Scheduler/WorkItemGroup.cs @@ -13,7 +13,7 @@ namespace Orleans.Runtime.Scheduler; -[DebuggerDisplay("WorkItemGroup Context={GrainContext} State={state}")] +[DebuggerDisplay("WorkItemGroup Context={GrainContext} State={_state}")] internal sealed class WorkItemGroup : IThreadPoolWorkItem, IWorkItemScheduler { private enum WorkGroupStatus : byte From 3847b581d5a35156978314035a03a818da4b03eb Mon Sep 17 00:00:00 2001 From: Ramzi Mourtada Date: Sun, 26 Oct 2025 11:36:44 -0700 Subject: [PATCH 33/34] Modifying Endpoint to use Pascale Casing as EndPoint for consistency (#9671) Co-authored-by: Ramzi Mourtada --- src/Orleans.Core/Messaging/GatewayManager.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Orleans.Core/Messaging/GatewayManager.cs b/src/Orleans.Core/Messaging/GatewayManager.cs index 4d769920758..5ea0ab851b2 100644 --- a/src/Orleans.Core/Messaging/GatewayManager.cs +++ b/src/Orleans.Core/Messaging/GatewayManager.cs @@ -410,9 +410,9 @@ private readonly struct UrisLogValue(IList uris) [LoggerMessage( Level = LogLevel.Information, - Message = "Closing connection to '{Endpoint}' because it has been marked as dead." + Message = "Closing connection to '{EndPoint}' because it has been marked as dead." )] - private static partial void LogClosingConnectionToDeadGateway(ILogger logger, SiloAddress endpoint); + private static partial void LogClosingConnectionToDeadGateway(ILogger logger, SiloAddress endPoint); [LoggerMessage( EventId = (int)ErrorCode.GatewayManager_AllGatewaysDead, From ac665c5db2784d7bb95e7f3ee71eb38691db096b Mon Sep 17 00:00:00 2001 From: Ledjon Behluli Date: Sun, 26 Oct 2025 23:11:07 +0100 Subject: [PATCH 34/34] treat retirement tracker as a first-class (internal) machine --- src/Orleans.Journaling/StateMachineManager.cs | 16 ++++++++++------ .../StateMachineManagerOptions.cs | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 8ad565aa376..288d2f4e44e 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -49,9 +49,12 @@ public StateMachineManager( // The list of known state machines is itself stored as a durable state machine with the implicit id 0. // This allows us to recover the list of state machines ids without having to store it separately. _stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool); - _stateMachinesMap[0] = _stateMachineIds; + _stateMachinesMap[StateMachineManagerState.Id] = _stateMachineIds; + // The retirement tracker is a special internal state machine with a fixed id. + // It is not stored in _stateMachineIds and does not participate in the general name->id mapping. _retirementTracker = new StateMachinesRetirementTracker(this, StringCodec, DateTimeCodec, serializerSessionPool); + _stateMachinesMap[StateMachinesRetirementTracker.Id] = _retirementTracker; } public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) @@ -69,7 +72,7 @@ public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) // log during recovery but has not been re-registered. We effectively are "staging" the resurrection of the machine. // The removal from the tracker is handled within the serialized loop. This is to prevent logical race conditions with the recovery process. // We also make sure to apply any buffered data that could have occured while the vessel took this machine's place. - stateMachine.Reset(new StateMachineLogWriter(this, new(_stateMachineIds[name]))); + stateMachine.Reset(new StateMachineLogWriter(this, new(_stateMachineIds[name]))); foreach (var entry in vessel.BufferedData) { stateMachine.Apply(new ReadOnlySequence(entry)); @@ -266,7 +269,7 @@ private async Task WorkLoop() if (!_stateMachineIds.ContainsKey(name)) { // Doing so will trigger a reset, since _stateMachineIds will call OnSetStateMachineId, which resets the state machine in question. - _stateMachineIds[name] = name == StateMachinesRetirementTracker.Name ? StateMachinesRetirementTracker.Id : _nextStateMachineId++; + _stateMachineIds[name] = _nextStateMachineId++; } } } @@ -537,9 +540,11 @@ private sealed class StateMachineManagerState( IFieldCodec valueCodec, SerializerSessionPool serializerSessionPool) : DurableDictionary(keyCodec, valueCodec, serializerSessionPool) { + public const int Id = 0; + private readonly StateMachineManager _manager = manager; - public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(0))); + public void ResetVolatileState() => ((IDurableStateMachine)this).Reset(new StateMachineLogWriter(_manager, new(Id))); protected override void OnSet(string key, ulong value) => _manager.OnSetStateMachineId(key, value); } @@ -550,10 +555,9 @@ private sealed class StateMachineManagerState( /// Resurrecting of retired machines is supported. private sealed class StateMachinesRetirementTracker( StateMachineManager manager, IFieldCodec keyCodec, IFieldCodec valueCodec, SerializerSessionPool sessionPool) - : DurableDictionary(Name, manager, keyCodec, valueCodec, sessionPool) + : DurableDictionary(keyCodec, valueCodec, sessionPool) { public const int Id = 1; - public const string Name = "orleans_retirement_tracker"; private readonly StateMachineLogWriter _logWriter = new(manager, new(Id)); diff --git a/src/Orleans.Journaling/StateMachineManagerOptions.cs b/src/Orleans.Journaling/StateMachineManagerOptions.cs index 9fa9e7c8914..9a491a502a3 100644 --- a/src/Orleans.Journaling/StateMachineManagerOptions.cs +++ b/src/Orleans.Journaling/StateMachineManagerOptions.cs @@ -22,5 +22,5 @@ public sealed class StateMachineManagerOptions /// /// The default value of . /// - public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromHours(1); + public static readonly TimeSpan DEFAULT_RETIREMENT_GRACE_PERIOD = TimeSpan.FromDays(7); }