Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ internal async Task DeactivateInDueTimeOrder(int count, CancellationToken cancel

var candidates = new List<ICollectibleGrainContext>(count);

foreach (var bucket in buckets.OrderBy(b => b.Key))
// snapshot to avoid concurrency collection modification issues
var bucketSnapshot = buckets.ToArray();
foreach (var bucket in bucketSnapshot.OrderBy(b => b.Key))
{
foreach (var item in bucket.Value.Items)
{
Expand Down
167 changes: 138 additions & 29 deletions test/NonSilo.Tests/Runtime/ActivationCollectorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Time.Testing;
using NSubstitute;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Statistics;
using TestGrains;
using Xunit;
Expand Down Expand Up @@ -141,39 +142,147 @@ public async Task DeactivateInDueTimeOrder_OnlyOldestAndEligibleAreDeactivated()
var wsLogger = NullLogger<ActivationWorkingSet>.Instance;
var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector });

var now = DateTime.UtcNow;
var activation1 = Substitute.For<ICollectibleGrainContext, IActivationWorkingSetMember>();
activation1.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1));
activation1.IsValid.Returns(true);
activation1.IsExemptFromCollection.Returns(false);
activation1.IsInactive.Returns(true);
activation1.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); });

var activation2 = Substitute.For<ICollectibleGrainContext, IActivationWorkingSetMember>();
activation2.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1));
activation2.IsValid.Returns(true);
activation2.IsExemptFromCollection.Returns(false);
activation2.IsInactive.Returns(true);
activation2.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); });

var activation3 = Substitute.For<ICollectibleGrainContext, IActivationWorkingSetMember>();
activation3.CollectionAgeLimit.Returns(TimeSpan.FromMinutes(1));
activation3.IsValid.Returns(true);
activation3.IsExemptFromCollection.Returns(false);
activation3.IsInactive.Returns(true);
activation3.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); });

((IActivationWorkingSetMember)activation1).IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);
((IActivationWorkingSetMember)activation2).IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);
((IActivationWorkingSetMember)activation3).IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);

workingSet.OnActivated(activation1 as IActivationWorkingSetMember);
workingSet.OnActivated(activation2 as IActivationWorkingSetMember);
workingSet.OnActivated(activation3 as IActivationWorkingSetMember);
var activation1 = PrepareActivation(1, collector);
var activation2 = PrepareActivation(1, collector);
var activation3 = PrepareActivation(1, collector);

activation1.IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);
activation2.IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);
activation3.IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);

workingSet.OnActivated(activation1);
workingSet.OnActivated(activation2);
workingSet.OnActivated(activation3);

await collector.DeactivateInDueTimeOrder(2, CancellationToken.None);

Assert.Equal(1, collector._activationCount);
}

[Fact]
public async Task DeactivateInDueTimeOrder_ConcurrentModification_ShouldNotThrow()
{
var grainCollectionOptions = Options.Create(new GrainCollectionOptions());

var logger = NullLogger<ActivationCollector>.Instance;
var statsProvider = Substitute.For<IEnvironmentStatisticsProvider>();
var timeProvider = new FakeTimeProvider(DateTimeOffset.UtcNow);

var collector = new ActivationCollector(timeProvider, grainCollectionOptions, logger, statsProvider);
var timer = Substitute.For<IAsyncTimer>();
timer.NextTick().Returns(Task.FromResult(false));
var timerFactory = Substitute.For<IAsyncTimerFactory>();
timerFactory.Create(Arg.Any<TimeSpan>(), Arg.Any<string>()).Returns(timer);

var wsLogger = NullLogger<ActivationWorkingSet>.Instance;
var workingSet = new ActivationWorkingSet(timerFactory, wsLogger, new[] { collector });

var totalActivations = 500;
var activations = new List<IActivationWorkingSetMember>();

for (var i = 0; i < totalActivations; i++)
{
var collectionAgeLimit = TimeSpan.FromMinutes(1) + TimeSpan.FromMinutes(i * 1);

var activation = PrepareActivation(collectionAgeLimit, collector);

activation.IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);
var activationMember = activation;
activations.Add(activationMember);
workingSet.OnActivated(activationMember);
}

// Now we have 500 buckets. Let's trigger the race condition.
var exceptions = new ConcurrentBag<Exception>();
var cts = new CancellationTokenSource();

// Task 1: Aggressively ADD new activations (creates NEW buckets in the dictionary)
var addTask = Task.Run(async () =>
{
int addCount = 0;
while (!cts.Token.IsCancellationRequested && addCount < 200)
{
// Add 10 activations at a time with random collection ages
for (int i = 0; i < 10; i++)
{
var activation = PrepareActivation(501 + Random.Shared.Next(200), collector);
activation.IsCandidateForRemoval(Arg.Any<bool>()).Returns(true);

workingSet.OnActivated(activation);
addCount++;
}

await Task.Yield();
}
});

// Task 2: Aggressively REMOVE activations (empties buckets, causing REMOVAL from dictionary)
var removeTask = Task.Run(async () =>
{
int removeCount = 0;
while (!cts.Token.IsCancellationRequested && removeCount < 200)
{
// Remove 10 activations at a time
for (int i = 0; i < 10 && activations.Count > 100; i++)
{
var activation = activations[Random.Shared.Next(activations.Count)] as ICollectibleGrainContext;

// TryCancelCollection removes the activation from its bucket
// If the bucket becomes empty, it gets removed from the dictionary!
if (collector.TryCancelCollection(activation))
{
removeCount++;
}
}

await Task.Yield();
}
});

// Task 3: Run DeactivateInDueTimeOrder MANY times concurrently
// This is where OrderBy enumerates buckets and can race with add/remove
var deactivateTasks = Enumerable.Range(0, 20).Select(_ => Task.Run(async () =>
{
for (int i = 0; i < 100; i++)
{
try
{
// Deactivation iterates through the buckets, and if code is not resilient for concurrent modification,
// it will blow up with some form of collection modification exception.
Comment thread
DeagleGross marked this conversation as resolved.
await collector.DeactivateInDueTimeOrder(50, CancellationToken.None);
await Task.Delay(1);
Comment thread
DeagleGross marked this conversation as resolved.
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}
})).ToArray();

// Wait for all deactivation attempts
await Task.WhenAll(deactivateTasks);

// Stop background modifications
cts.Cancel();
await Task.WhenAll(addTask, removeTask);

// Verify no exceptions occurred during deactivation
Assert.Empty(exceptions);
}

private IActivationWorkingSetMember PrepareActivation(int collectionAgeLimitMinutes, ActivationCollector collector)
=> PrepareActivation(TimeSpan.FromMinutes(collectionAgeLimitMinutes), collector);

private IActivationWorkingSetMember PrepareActivation(TimeSpan collectionAgeLimit, ActivationCollector collector)
{
var activation = Substitute.For<ICollectibleGrainContext, IActivationWorkingSetMember>();
activation.CollectionAgeLimit.Returns(collectionAgeLimit);
activation.IsValid.Returns(true);
activation.IsExemptFromCollection.Returns(false);
activation.IsInactive.Returns(true);
activation.Deactivated.Returns(Task.CompletedTask).AndDoes(_ => { Interlocked.Decrement(ref collector._activationCount); });

return (IActivationWorkingSetMember)activation;
}
}
}
Loading