diff --git a/src/Orleans.Runtime/Catalog/ActivationCollector.cs b/src/Orleans.Runtime/Catalog/ActivationCollector.cs index 4d2d89a6968..eca61178be2 100644 --- a/src/Orleans.Runtime/Catalog/ActivationCollector.cs +++ b/src/Orleans.Runtime/Catalog/ActivationCollector.cs @@ -362,7 +362,9 @@ internal async Task DeactivateInDueTimeOrder(int count, CancellationToken cancel var candidates = new List(count); - foreach (var bucket in buckets.OrderBy(b => b.Key)) + // snapshot to avoid concurrency collection modification issues + var bucketSnapshot = buckets.ToArray(); + foreach (var bucket in bucketSnapshot.OrderBy(b => b.Key)) { foreach (var item in bucket.Value.Items) { diff --git a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs index 8aa2faa8c15..424c5939f05 100644 --- a/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs +++ b/test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Time.Testing; using NSubstitute; using Orleans.Configuration; +using Orleans.Runtime; using Orleans.Statistics; using TestGrains; using Xunit; @@ -141,39 +142,147 @@ public async Task DeactivateInDueTimeOrder_OnlyOldestAndEligibleAreDeactivated() var wsLogger = NullLogger.Instance; var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector }); - var now = DateTime.UtcNow; - var activation1 = Substitute.For(); - activation1.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation1.IsValid.Returns(true); - activation1.IsExemptFromCollection.Returns(false); - activation1.IsInactive.Returns(true); - activation1.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - var activation2 = Substitute.For(); - activation2.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation2.IsValid.Returns(true); - activation2.IsExemptFromCollection.Returns(false); - activation2.IsInactive.Returns(true); - activation2.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - var activation3 = Substitute.For(); - activation3.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1)); - activation3.IsValid.Returns(true); - activation3.IsExemptFromCollection.Returns(false); - activation3.IsInactive.Returns(true); - activation3.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); - - ((IActivationWorkingSetMember)activation1).IsCandidateForRemoval(Arg.Any()).Returns(true); - ((IActivationWorkingSetMember)activation2).IsCandidateForRemoval(Arg.Any()).Returns(true); - ((IActivationWorkingSetMember)activation3).IsCandidateForRemoval(Arg.Any()).Returns(true); - - workingSet.OnActivated(activation1 as IActivationWorkingSetMember); - workingSet.OnActivated(activation2 as IActivationWorkingSetMember); - workingSet.OnActivated(activation3 as IActivationWorkingSetMember); + var activation1 = PrepareActivation(1, collector); + var activation2 = PrepareActivation(1, collector); + var activation3 = PrepareActivation(1, collector); + + activation1.IsCandidateForRemoval(Arg.Any()).Returns(true); + activation2.IsCandidateForRemoval(Arg.Any()).Returns(true); + activation3.IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation1); + workingSet.OnActivated(activation2); + workingSet.OnActivated(activation3); await collector.DeactivateInDueTimeOrder(2, CancellationToken.None); Assert.Equal(1, collector._activationCount); } + + [Fact] + public async Task DeactivateInDueTimeOrder_ConcurrentModification_ShouldNotThrow() + { + var grainCollectionOptions = Options.Create(new GrainCollectionOptions()); + + var logger = NullLogger.Instance; + var statsProvider = Substitute.For(); + var timeProvider = new FakeTimeProvider(DateTimeOffset.UtcNow); + + var collector = new ActivationCollector(timeProvider, grainCollectionOptions, logger, statsProvider); + var timer = Substitute.For(); + timer.NextTick().Returns(Task.FromResult(false)); + var timerFactory = Substitute.For(); + timerFactory.Create(Arg.Any(), Arg.Any()).Returns(timer); + + var wsLogger = NullLogger.Instance; + var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector }); + + var totalActivations = 500; + var activations = new List(); + + for (var i = 0; i < totalActivations; i++) + { + var collectionAgeLimit = TimeSpan.FromMinutes(1) + TimeSpan.FromMinutes(i * 1); + + var activation = PrepareActivation(collectionAgeLimit, collector); + + activation.IsCandidateForRemoval(Arg.Any()).Returns(true); + var activationMember = activation; + activations.Add(activationMember); + workingSet.OnActivated(activationMember); + } + + // Now we have 500 buckets. Let's trigger the race condition. + var exceptions = new ConcurrentBag(); + var cts = new CancellationTokenSource(); + + // Task 1: Aggressively ADD new activations (creates NEW buckets in the dictionary) + var addTask = Task.Run(async () => + { + int addCount = 0; + while (!cts.Token.IsCancellationRequested && addCount < 200) + { + // Add 10 activations at a time with random collection ages + for (int i = 0; i < 10; i++) + { + var activation = PrepareActivation(501 + Random.Shared.Next(200), collector); + activation.IsCandidateForRemoval(Arg.Any()).Returns(true); + + workingSet.OnActivated(activation); + addCount++; + } + + await Task.Yield(); + } + }); + + // Task 2: Aggressively REMOVE activations (empties buckets, causing REMOVAL from dictionary) + var removeTask = Task.Run(async () => + { + int removeCount = 0; + while (!cts.Token.IsCancellationRequested && removeCount < 200) + { + // Remove 10 activations at a time + for (int i = 0; i < 10 && activations.Count > 100; i++) + { + var activation = activations[Random.Shared.Next(activations.Count)] as ICollectibleGrainContext; + + // TryCancelCollection removes the activation from its bucket + // If the bucket becomes empty, it gets removed from the dictionary! + if (collector.TryCancelCollection(activation)) + { + removeCount++; + } + } + + await Task.Yield(); + } + }); + + // Task 3: Run DeactivateInDueTimeOrder MANY times concurrently + // This is where OrderBy enumerates buckets and can race with add/remove + var deactivateTasks = Enumerable.Range(0, 20).Select(_ => Task.Run(async () => + { + for (int i = 0; i < 100; i++) + { + try + { + // Deactivation iterates through the buckets, and if code is not resilient for concurrent modification, + // it will blow up with some form of collection modification exception. + await collector.DeactivateInDueTimeOrder(50, CancellationToken.None); + await Task.Delay(1); + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + })).ToArray(); + + // Wait for all deactivation attempts + await Task.WhenAll(deactivateTasks); + + // Stop background modifications + cts.Cancel(); + await Task.WhenAll(addTask, removeTask); + + // Verify no exceptions occurred during deactivation + Assert.Empty(exceptions); + } + + private IActivationWorkingSetMember PrepareActivation(int collectionAgeLimitMinutes, ActivationCollector collector) + => PrepareActivation(TimeSpan.FromMinutes(collectionAgeLimitMinutes), collector); + + private IActivationWorkingSetMember PrepareActivation(TimeSpan collectionAgeLimit, ActivationCollector collector) + { + var activation = Substitute.For(); + activation.CollectionAgeLimit.Returns(collectionAgeLimit); + activation.IsValid.Returns(true); + activation.IsExemptFromCollection.Returns(false); + activation.IsInactive.Returns(true); + activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); }); + + return (IActivationWorkingSetMember)activation; + } } }