Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions src/Orleans.TestingHost/TestCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,8 @@ public async ValueTask DisposeAsync()

await Task.Run(async () =>
{
await DisposeAsync(ClientHost).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

foreach (var handle in SecondarySilos)
{
await DisposeAsync(handle).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
Expand All @@ -816,8 +818,6 @@ await Task.Run(async () =>
{
await DisposeAsync(Primary).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}

await DisposeAsync(ClientHost).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
ClientHost = null;

PortAllocator?.Dispose();
Expand All @@ -834,23 +834,14 @@ public void Dispose()
return;
}

foreach (var handle in this.SecondarySilos)
{
handle.Dispose();
}

this.Primary?.Dispose();
this.ClientHost?.Dispose();
this.PortAllocator?.Dispose();

_disposed = true;
DisposeAsync().AsTask().Wait();
}

private static async Task DisposeAsync(IDisposable value)
{
if (value is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
await asyncDisposable.DisposeAsync().AsTask().ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
else if (value is IDisposable disposable)
{
Expand Down
120 changes: 59 additions & 61 deletions test/Grains/TestInternalGrains/PersistenceTestGrains.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ internal static class TestRuntimeEnvironmentUtility
public static string CaptureRuntimeEnvironment()
{
var callStack = Utils.GetStackTrace(1); // Don't include this method in stack trace
return string.Format(
" TaskScheduler={0}" + Environment.NewLine
+ " RuntimeContext={1}" + Environment.NewLine
+ " WorkerPoolThread={2}" + Environment.NewLine
+ " Thread.CurrentThread.ManagedThreadId={4}" + Environment.NewLine
+ " StackTrace=" + Environment.NewLine
+ " {5}",
TaskScheduler.Current,
RuntimeContext.Current,
Thread.CurrentThread.Name,
Thread.CurrentThread.ManagedThreadId,
callStack);
return
$"""
TaskScheduler={TaskScheduler.Current}
RuntimeContext={RuntimeContext.Current}
WorkerPoolThread={Thread.CurrentThread.Name}
Thread.CurrentThread.ManagedThreadId={Thread.CurrentThread.ManagedThreadId}
StackTrace={callStack}
""";
}
}

Expand Down Expand Up @@ -900,111 +896,112 @@ public override Task OnActivateAsync(CancellationToken cancellationToken)
logger = loggerFactory.CreateLogger($"NonReentrantStressGrainWithoutState-{_id}");

executing = false;
logger.LogDebug("--> OnActivateAsync");
logger.LogDebug("<-- OnActivateAsync");
logger.LogTrace("--> OnActivateAsync");
logger.LogTrace("<-- OnActivateAsync");
return base.OnActivateAsync(cancellationToken);
}

private async Task SetOne(int iter, int level)
{
if (logger.IsEnabled(LogLevel.Debug))
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("---> SetOne {Iteration}-{Level}_0", iter, level);
logger.LogTrace("---> SetOne {Iteration}-{Level}_0", iter, level);
}

CheckRuntimeEnvironment("SetOne");
CheckRuntimeEnvironment();
if (level > 0)
{
if (logger.IsEnabled(LogLevel.Debug))
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("SetOne {Iteration}-{Level}_1. Before await Task.Done.", iter, level);
logger.LogTrace("SetOne {Iteration}-{Level}_1. Before await Task.CompletedTask.", iter, level);
}

await Task.CompletedTask;
if (logger.IsEnabled(LogLevel.Debug))
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("SetOne {Iteration}-{Level}_2. After await Task.Done.", iter, level);
logger.LogTrace("SetOne {Iteration}-{Level}_2. After await Task.CompletedTask.", iter, level);
}

CheckRuntimeEnvironment($"SetOne {iter}-{level}_3");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("SetOne {Iteration}-{Level}_4. Before await Task.Delay.", iter, level);
logger.LogTrace("SetOne {Iteration}-{Level}_4. Before yield.", iter, level);
}

await Task.Delay(TimeSpan.FromMilliseconds(10));
if (logger.IsEnabled(LogLevel.Debug))
await Task.Yield();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("SetOne {Iteration}-{Level}_5. After await Task.Delay.", iter, level);
logger.LogTrace("SetOne {Iteration}-{Level}_5. After yield.", iter, level);
}

CheckRuntimeEnvironment($"SetOne {iter}-{level}_6");
CheckRuntimeEnvironment();
var nextLevel = level - 1;
await SetOne(iter, nextLevel);
if (logger.IsEnabled(LogLevel.Debug))
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug(
logger.LogTrace(
"SetOne {Iteration}-{Level}_7 => {NextLevel}. After await SetOne call.",
iter,
level,
nextLevel);
}

CheckRuntimeEnvironment($"SetOne {iter}-{level}_8");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("SetOne {Iteration}-{Level}_9. Finished SetOne.", iter, level);
logger.LogTrace("SetOne {Iteration}-{Level}_9. Finished SetOne.", iter, level);
}
}

CheckRuntimeEnvironment($"SetOne {iter}-{level}_10");
logger.LogDebug("<--- SetOne {Iteration}-{Level}_11", iter, level);
CheckRuntimeEnvironment();
logger.LogTrace("<--- SetOne {Iteration}-{Level}_11", iter, level);
}

public async Task Test1()
{
CheckRuntimeEnvironment("Test1.BeforeLoop");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("Test1.Start");
logger.LogTrace("Test1.Start");
}

var tasks = new List<Task>();
for (var i = 0; i < Multiple; i++)
{
CheckRuntimeEnvironment($"Test1_{i}_0");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("Test1_ ------> {CallNum}", i);
logger.LogTrace("Test1_ ------> {CallNum}", i);
}

var task = SetOne(i, LEVEL);
CheckRuntimeEnvironment($"Test1_{i}_1");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("After SetOne call {CallNum}", i);
logger.LogTrace("After SetOne call {CallNum}", i);
}

tasks.Add(task);
CheckRuntimeEnvironment($"Test1_{i}_2");
if (logger.IsEnabled(LogLevel.Debug))
CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("Test1_ <------ {CallNum}", i);
logger.LogTrace("Test1_ <------ {CallNum}", i);
}
}
CheckRuntimeEnvironment("Test1.AfterLoop");
if (logger.IsEnabled(LogLevel.Debug))

CheckRuntimeEnvironment();
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("Test1_About to WhenAll");
logger.LogTrace("Test1_About to WhenAll");
}

await Task.WhenAll(tasks);
if (logger.IsEnabled(LogLevel.Debug))
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogDebug("Test1.Finish");
logger.LogTrace("Test1.Finish");
}

CheckRuntimeEnvironment("Test1.Finish-CheckRuntimeEnvironment");
CheckRuntimeEnvironment();
//#if DEBUG
// // HACK for testing
// Logger.SetTraceLevelOverrides(overridesOff.ToList());
Expand All @@ -1015,11 +1012,11 @@ public async Task Task_Delay(bool doStart)
{
var wrapper = new Task<Task>(async () =>
{
logger.LogDebug("Before Task.Delay #1 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
logger.LogTrace("Before Task.Delay #1 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
await DoDelay(1);
logger.LogDebug("After Task.Delay #1 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
logger.LogTrace("After Task.Delay #1 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
await DoDelay(2);
logger.LogDebug("After Task.Delay #2 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
logger.LogTrace("After Task.Delay #2 TaskScheduler.Current={TaskScheduler}", TaskScheduler.Current);
});

if (doStart)
Expand All @@ -1032,16 +1029,16 @@ public async Task Task_Delay(bool doStart)

private async Task DoDelay(int i)
{
logger.LogDebug("Before Task.Delay #{Num} TaskScheduler.Current={TaskScheduler}", i, TaskScheduler.Current);
logger.LogTrace("Before Task.Delay #{Num} TaskScheduler.Current={TaskScheduler}", i, TaskScheduler.Current);
await Task.Delay(1);
logger.LogDebug("After Task.Delay #{Num} TaskScheduler.Current={TaskScheduler}", i, TaskScheduler.Current);
logger.LogTrace("After Task.Delay #{Num} TaskScheduler.Current={TaskScheduler}", i, TaskScheduler.Current);
}

private void CheckRuntimeEnvironment(string str)
private void CheckRuntimeEnvironment()
{
var callStack = new StackTrace();
if (executing)
{
var callStack = new StackTrace();
var errorMsg = string.Format(
"Found out that grain {0} is already in the middle of execution."
+ "\n Single threaded-ness violation!"
Expand All @@ -1054,7 +1051,8 @@ private void CheckRuntimeEnvironment(string str)
}

executing = true;
Thread.Sleep(10);
var stopwatch = ValueStopwatch.StartNew();
SpinWait.SpinUntil(() => stopwatch.Elapsed > TimeSpan.FromMicroseconds(1));
executing = false;
}
}
Expand Down
22 changes: 14 additions & 8 deletions test/NonSilo.Tests/Membership/MembershipTableManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ public async Task MembershipTableManager_ExistingCluster()
private async Task BasicScenarioTest(InMemoryMembershipTable membershipTable, bool gracefulShutdown = true)
{
var timers = new List<DelegateAsyncTimer>();
var timerCalls = new ConcurrentQueue<(TimeSpan? DelayOverride, TaskCompletionSource<bool> Completion)>();
var timerCalls = new BlockingCollection<(TimeSpan? DelayOverride, TaskCompletionSource<bool> Completion)>();

var timerFactory = new DelegateAsyncTimerFactory(
(period, name) =>
{
var timer = new DelegateAsyncTimer(
overridePeriod =>
{
var task = new TaskCompletionSource<bool>();
timerCalls.Enqueue((overridePeriod, task));
timerCalls.Add((overridePeriod, task));
return task.Task;
});
timers.Add(timer);
Expand Down Expand Up @@ -166,20 +167,25 @@ private async Task BasicScenarioTest(InMemoryMembershipTable membershipTable, bo
{
// Check that a timer is being requested and that after it expires a call to
// refresh the membership table is made.
Assert.True(timerCalls.TryDequeue(out var timer));
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));
var (_, completion) = timerCalls.Take(cts.Token);
membershipTable.ClearCalls();
timer.Completion.TrySetResult(true);
completion.TrySetResult(true);
while (membershipTable.Calls.Count == 0) await Task.Delay(10);
Assert.Contains(membershipTable.Calls, c => c.Method.Equals(nameof(IMembershipTable.ReadAll)));
}

var cts = new CancellationTokenSource();
if (!gracefulShutdown) cts.Cancel();
var shutdownCts = new CancellationTokenSource();
if (!gracefulShutdown) shutdownCts.Cancel();
Assert.Equal(0, timers.First().DisposedCounter);
var stopped = this.lifecycle.OnStop(cts.Token);
var stopped = this.lifecycle.OnStop(shutdownCts.Token);

// Complete any timers that were waiting.
while (timerCalls.TryDequeue(out var t)) t.Completion.TrySetResult(false);
while (timerCalls.TryTake(out var t))
{
t.Completion.TrySetResult(false);
}

await stopped;
Assert.Equal(1, timers.First().DisposedCounter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Orleans.Internal;
using Xunit;
using Xunit.Abstractions;
using System.Threading.Tasks;

namespace UnitTests.SchedulerTests
{
Expand Down Expand Up @@ -34,38 +35,42 @@ public void Dispose()
}

[Fact, TestCategory("Functional"), TestCategory("Scheduler")]
public void Sched_AC_Test()
public async Task Sched_AC_Test()
{
int n = 0;
bool insideTask = false;
var context = UnitTestSchedulingContext.Create(loggerFactory);

this.output.WriteLine("Running Main in Context=" + RuntimeContext.Current);
var tasksTask = new TaskCompletionSource<List<Task>>();
context.Scheduler.QueueAction(() =>
{
var tasks = new List<Task>(10);
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
var taskNum = i;
tasks.Add(Task.Factory.StartNew(() =>
{
// ReSharper disable AccessToModifiedClosure
this.output.WriteLine("Starting " + i + " in Context=" + RuntimeContext.Current);
this.output.WriteLine("Starting " + taskNum + " in Context=" + RuntimeContext.Current);
Assert.False(insideTask, $"Starting new task when I am already inside task of iteration {n}");
insideTask = true;

// Exacerbate the chance of a data race in the event that two of these tasks run concurrently.
int k = n;
Thread.Sleep(100);
n = k + 1;

insideTask = false;
// ReSharper restore AccessToModifiedClosure
}).Ignore();
}));
}
tasksTask.SetResult(tasks);
});

// Pause to let things run
Thread.Sleep(1500);
await Task.WhenAll(await tasksTask.Task);

// N should be 10, because all tasks should execute serially
Assert.True(n != 0, "Work items did not get executed");
Assert.Equal(10, n); // "Work items executed concurrently"
Assert.Equal(10, n); // "Work items executed concurrently"
}

[Fact, TestCategory("Functional"), TestCategory("Scheduler")]
Expand All @@ -85,7 +90,8 @@ public async Task Sched_AC_WaitTest()
Assert.False(insideTask, $"Starting new task when I am already inside task of iteration {n}");
insideTask = true;
this.output.WriteLine("===> 1a");
Thread.Sleep(1000); n = n + 3;
Thread.Sleep(1000);
n = n + 3;
this.output.WriteLine("===> 1b");
insideTask = false;
});
Expand Down
Loading
Loading