From ee6cc8354274ad0d2e4fd3c33f854da713f5bd39 Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Fri, 31 Oct 2025 20:25:04 +0100 Subject: [PATCH 1/2] init --- .../Runtime/ActivationCollectorTests.cs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs index 8aa2faa8c15..c73c66689ed 100644 --- a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs +++ b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs @@ -175,5 +175,66 @@ public async Task DeactivateInDueTimeOrder_OnlyOldestAndEligibleAreDeactivated() Assert.Equal(1, collector._activationCount); } + + [Fact] + public async Task DeactivateInDueTimeOrder_HandlesRaceDuringEnumeration() + { + var grainCollectionOptions = Options.Create(new GrainCollectionOptions()); + + var logger = NullLogger.Instance; + var statsProvider = Substitute.For(); + var timeProvider = new FakeTimeProvider(DateTimeOffset.UtcNow); + + var collector = new ActivationCollector(timeProvider, grainCollectionOptions, logger, statsProvider); + var timer = Substitute.For(); + timer.NextTick().Returns(Task.FromResult(false)); + var timerFactory = Substitute.For(); + timerFactory.Create(Arg.Any(), Arg.Any()).Returns(timer); + + var wsLogger = NullLogger.Instance; + var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector }); + + var activations = new List<(ICollectibleGrainContext, IActivationWorkingSetMember)>(); + + for (int i = 0; i < 100; i++) + { + var activation = Substitute.For(); + activation.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); + activation.IsValid.Returns(true); + activation.IsExemptFromCollection.Returns(false); + activation.IsInactive.Returns(true); + activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); + ((IActivationWorkingSetMember)activation).IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation as IActivationWorkingSetMember); + activations.Add((activation, activation as IActivationWorkingSetMember)); + } + + var deactivateTask = Task.Run(async () => + { + await collector.DeactivateInDueTimeOrder(50, CancellationToken.None); + }); + + var addRemoveTask = Task.Run(() => + { + for (int i = 0; i < 50; i++) + { + var activation = Substitute.For(); + activation.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); + activation.IsValid.Returns(true); + activation.IsExemptFromCollection.Returns(false); + activation.IsInactive.Returns(true); + activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); + ((IActivationWorkingSetMember)activation).IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation as IActivationWorkingSetMember); + Thread.Sleep(1); + } + }); + + await Task.WhenAll(deactivateTask, addRemoveTask); + + Assert.True(collector._activationCount >= 0); + } } } From 511e082a2df2f9226c9d2912049330ded05224bd Mon Sep 17 00:00:00 2001 From: Korolev Dmitry Date: Mon, 3 Nov 2025 15:09:21 +0100 Subject: [PATCH 2/2] fix concurrency issue --- .../Catalog/ActivationCollector.cs | 4 +- .../Runtime/ActivationCollectorTests.cs | 164 +++++++++++------- 2 files changed, 109 insertions(+), 59 deletions(-) diff --git a/src/Orleans.Runtime/Catalog/ActivationCollector.cs b/src/Orleans.Runtime/Catalog/ActivationCollector.cs index 4d2d89a6968..eca61178be2 100644 --- a/src/Orleans.Runtime/Catalog/ActivationCollector.cs +++ b/src/Orleans.Runtime/Catalog/ActivationCollector.cs @@ -362,7 +362,9 @@ internal async Task DeactivateInDueTimeOrder(int count, CancellationToken cancel var candidates = new List(count); - foreach (var bucket in buckets.OrderBy(b => b.Key)) + // snapshot to avoid concurrency collection modification issues + var bucketSnapshot = buckets.ToArray(); + foreach (var bucket in bucketSnapshot.OrderBy(b => b.Key)) { foreach (var item in bucket.Value.Items) { diff --git a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs index c73c66689ed..424c5939f05 100644 --- a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs +++ b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Time.Testing; using NSubstitute; using Orleans.Configuration; +using Orleans.Runtime; using Orleans.Statistics; using TestGrains; using Xunit; @@ -141,35 +142,17 @@ public async Task DeactivateInDueTimeOrder_OnlyOldestAndEligibleAreDeactivated() var wsLogger = NullLogger.Instance; var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector }); - var now = DateTime.UtcNow; - var activation1 = Substitute.For(); - activation1.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation1.IsValid.Returns(true); - activation1.IsExemptFromCollection.Returns(false); - activation1.IsInactive.Returns(true); - activation1.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - var activation2 = Substitute.For(); - activation2.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation2.IsValid.Returns(true); - activation2.IsExemptFromCollection.Returns(false); - activation2.IsInactive.Returns(true); - activation2.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - var activation3 = Substitute.For(); - activation3.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation3.IsValid.Returns(true); - activation3.IsExemptFromCollection.Returns(false); - activation3.IsInactive.Returns(true); - activation3.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - ((IActivationWorkingSetMember)activation1).IsCandidateForRemoval(Arg.Any()).Returns(true); - ((IActivationWorkingSetMember)activation2).IsCandidateForRemoval(Arg.Any()).Returns(true); - ((IActivationWorkingSetMember)activation3).IsCandidateForRemoval(Arg.Any()).Returns(true); - - workingSet.OnActivated(activation1 as IActivationWorkingSetMember); - workingSet.OnActivated(activation2 as IActivationWorkingSetMember); - workingSet.OnActivated(activation3 as IActivationWorkingSetMember); + var activation1 = PrepareActivation(1, collector); + var activation2 = PrepareActivation(1, collector); + var activation3 = PrepareActivation(1, collector); + + activation1.IsCandidateForRemoval(Arg.Any()).Returns(true); + activation2.IsCandidateForRemoval(Arg.Any()).Returns(true); + activation3.IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation1); + workingSet.OnActivated(activation2); + workingSet.OnActivated(activation3); await collector.DeactivateInDueTimeOrder(2, CancellationToken.None); @@ -177,7 +160,7 @@ public async Task DeactivateInDueTimeOrder_OnlyOldestAndEligibleAreDeactivated() } [Fact] - public async Task DeactivateInDueTimeOrder_HandlesRaceDuringEnumeration() + public async Task DeactivateInDueTimeOrder_ConcurrentModification_ShouldNotThrow() { var grainCollectionOptions = Options.Create(new GrainCollectionOptions()); @@ -194,47 +177,112 @@ public async Task DeactivateInDueTimeOrder_HandlesRaceDuringEnumeration() var wsLogger = NullLogger.Instance; var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector }); - var activations = new List<(ICollectibleGrainContext, IActivationWorkingSetMember)>(); + var totalActivations = 500; + var activations = new List(); - for (int i = 0; i < 100; i++) + for (var i = 0; i < totalActivations; i++) { - var activation = Substitute.For(); - activation.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation.IsValid.Returns(true); - activation.IsExemptFromCollection.Returns(false); - activation.IsInactive.Returns(true); - activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - ((IActivationWorkingSetMember)activation).IsCandidateForRemoval(Arg.Any()).Returns(true); - - workingSet.OnActivated(activation as IActivationWorkingSetMember); - activations.Add((activation, activation as IActivationWorkingSetMember)); + var collectionAgeLimit = TimeSpan.FromMinutes(1) + TimeSpan.FromMinutes(i * 1); + + var activation = PrepareActivation(collectionAgeLimit, collector); + + activation.IsCandidateForRemoval(Arg.Any()).Returns(true); + var activationMember = activation; + activations.Add(activationMember); + workingSet.OnActivated(activationMember); } - var deactivateTask = Task.Run(async () => + // Now we have 500 buckets. Let's trigger the race condition. + var exceptions = new ConcurrentBag(); + var cts = new CancellationTokenSource(); + + // Task 1: Aggressively ADD new activations (creates NEW buckets in the dictionary) + var addTask = Task.Run(async () => { - await collector.DeactivateInDueTimeOrder(50, CancellationToken.None); + int addCount = 0; + while (!cts.Token.IsCancellationRequested && addCount < 200) + { + // Add 10 activations at a time with random collection ages + for (int i = 0; i < 10; i++) + { + var activation = PrepareActivation(501 + Random.Shared.Next(200), collector); + activation.IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation); + addCount++; + } + + await Task.Yield(); + } }); - var addRemoveTask = Task.Run(() => + // Task 2: Aggressively REMOVE activations (empties buckets, causing REMOVAL from dictionary) + var removeTask = Task.Run(async () => { - for (int i = 0; i < 50; i++) + int removeCount = 0; + while (!cts.Token.IsCancellationRequested && removeCount < 200) { - var activation = Substitute.For(); - activation.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation.IsValid.Returns(true); - activation.IsExemptFromCollection.Returns(false); - activation.IsInactive.Returns(true); - activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - ((IActivationWorkingSetMember)activation).IsCandidateForRemoval(Arg.Any()).Returns(true); - - workingSet.OnActivated(activation as IActivationWorkingSetMember); - Thread.Sleep(1); + // Remove 10 activations at a time + for (int i = 0; i < 10 && activations.Count > 100; i++) + { + var activation = activations[Random.Shared.Next(activations.Count)] as ICollectibleGrainContext; + + // TryCancelCollection removes the activation from its bucket + // If the bucket becomes empty, it gets removed from the dictionary! + if (collector.TryCancelCollection(activation)) + { + removeCount++; + } + } + + await Task.Yield(); } }); - await Task.WhenAll(deactivateTask, addRemoveTask); + // Task 3: Run DeactivateInDueTimeOrder MANY times concurrently + // This is where OrderBy enumerates buckets and can race with add/remove + var deactivateTasks = Enumerable.Range(0, 20).Select(_ => Task.Run(async () => + { + for (int i = 0; i < 100; i++) + { + try + { + // Deactivation iterates through the buckets, and if code is not resilient for concurrent modification, + // it will blow up with some form of collection modification exception. + await collector.DeactivateInDueTimeOrder(50, CancellationToken.None); + await Task.Delay(1); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + })).ToArray(); + + // Wait for all deactivation attempts + await Task.WhenAll(deactivateTasks); + + // Stop background modifications + cts.Cancel(); + await Task.WhenAll(addTask, removeTask); - Assert.True(collector._activationCount >= 0); + // Verify no exceptions occurred during deactivation + Assert.Empty(exceptions); + } + + private IActivationWorkingSetMember PrepareActivation(int collectionAgeLimitMinutes, ActivationCollector collector) + => PrepareActivation(TimeSpan.FromMinutes(collectionAgeLimitMinutes), collector); + + private IActivationWorkingSetMember PrepareActivation(TimeSpan collectionAgeLimit, ActivationCollector collector) + { + var activation = Substitute.For(); + activation.CollectionAgeLimit.Returns(collectionAgeLimit); + activation.IsValid.Returns(true); + activation.IsExemptFromCollection.Returns(false); + activation.IsInactive.Returns(true); + activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); + + return (IActivationWorkingSetMember)activation; } } }