From 1e1ed1e12dfcbfc1ce2469445da51a6ef44015f4 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 22 Oct 2025 12:55:45 -0700 Subject: [PATCH 1/2] StateMachineManager: start work loop during activation --- src/Orleans.Journaling/StateMachineManager.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Orleans.Journaling/StateMachineManager.cs b/src/Orleans.Journaling/StateMachineManager.cs index 6f62ab9d936..e5058c14bc1 100644 --- a/src/Orleans.Journaling/StateMachineManager.cs +++ b/src/Orleans.Journaling/StateMachineManager.cs @@ -22,7 +22,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec private readonly Queue _workQueue = new(); private readonly CancellationTokenSource _shutdownCancellation = new(); private readonly StateMachineManagerState _stateMachineIds; - private readonly Task _workLoop; + private Task? _workLoop; private ManagerState _state; private Task? _pendingWrite; private ulong _nextStateMachineId = MinApplicationStateMachineId; @@ -40,8 +40,6 @@ public StateMachineManager( // This allows us to recover the list of state machines ids without having to store it separately. _stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool); _stateMachinesMap[0] = _stateMachineIds; - - _workLoop = Start(); } public void RegisterStateMachine(string name, IDurableStateMachine stateMachine) @@ -65,6 +63,8 @@ public async ValueTask InitializeAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); _shutdownCancellation.Token.ThrowIfCancellationRequested(); + Debug.Assert(_workLoop is null, "InitializeAsync should only be called once."); + _workLoop = Start(); Task task; lock (_lock) @@ -379,7 +379,10 @@ async Task ILifecycleObserver.OnStop(CancellationToken cancellationToken) { _shutdownCancellation.Cancel(); _workSignal.Signal(); - await _workLoop.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing); + if (_workLoop is { } task) + { + await task.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing); + } } void IDisposable.Dispose() From d1c74d76d5314a4811ca487798815c16f06db1de Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 22 Oct 2025 15:39:28 -0700 Subject: [PATCH 2/2] fix test --- test/Orleans.Journaling.Tests/DurableQueueTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Orleans.Journaling.Tests/DurableQueueTests.cs b/test/Orleans.Journaling.Tests/DurableQueueTests.cs index 24b23bdb3b9..f1aeb9a32c7 100644 --- a/test/Orleans.Journaling.Tests/DurableQueueTests.cs +++ b/test/Orleans.Journaling.Tests/DurableQueueTests.cs @@ -187,8 +187,8 @@ public async Task DurableQueue_EmptyQueueOperations_Test() var manager = sut.Manager; var codec = CodecProvider.GetCodec(); var queue = new DurableQueue("emptyQueue", manager, codec, SessionPool); - await manager.WriteStateAsync(CancellationToken.None); await sut.Lifecycle.OnStart(); + await manager.WriteStateAsync(CancellationToken.None); // Assert Assert.Empty(queue);