Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions TUnit.Engine/Framework/TUnitServiceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,16 @@ public TUnitServiceProvider(IExtension extension,
isFailFastEnabled,
FailFastCancellationSource,
Logger,
testStateManager));
testStateManager,
ParallelLimitLockProvider));

// Create scheduler configuration from command line options
var testGroupingService = Register<ITestGroupingService>(new TestGroupingService(Logger));
var circularDependencyDetector = Register(new CircularDependencyDetector());

var constraintKeyScheduler = Register<IConstraintKeyScheduler>(new ConstraintKeyScheduler(
testRunner,
Logger,
ParallelLimitLockProvider));
Logger));

var staticPropertyHandler = Register(new StaticPropertyHandler(Logger, objectTracker, trackableObjectGraphProvider, disposer, lazyPropertyInjector, objectGraphDiscoveryService));

Expand All @@ -266,7 +266,6 @@ public TUnitServiceProvider(IExtension extension,
testGroupingService,
MessageBus,
CommandLineOptions,
ParallelLimitLockProvider,
testStateManager,
testRunner,
circularDependencyDetector,
Expand Down
19 changes: 1 addition & 18 deletions TUnit.Engine/Scheduling/ConstraintKeyScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ internal sealed class ConstraintKeyScheduler : IConstraintKeyScheduler
{
private readonly TestRunner _testRunner;
private readonly TUnitFrameworkLogger _logger;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;

public ConstraintKeyScheduler(
TestRunner testRunner,
TUnitFrameworkLogger logger,
ParallelLimitLockProvider parallelLimitLockProvider)
TUnitFrameworkLogger logger)
{
_testRunner = testRunner;
_logger = logger;
_parallelLimitLockProvider = parallelLimitLockProvider;
}

#if NET8_0_OR_GREATER
Expand Down Expand Up @@ -146,26 +143,12 @@ private async Task ExecuteTestAndReleaseKeysAsync(
WaitingTestIndex waitingTestIndex,
CancellationToken cancellationToken)
{
SemaphoreSlim? parallelLimiterSemaphore = null;

try
{
// Two-phase acquisition: Acquire ParallelLimiter BEFORE executing
// This ensures constrained resources are acquired before holding constraint keys
if (test.Context.ParallelLimiter != null)
{
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await parallelLimiterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}

// Execute the test (constraint keys are already held by caller)
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
finally
{
// Release ParallelLimiter if we acquired it
parallelLimiterSemaphore?.Release();

// Release the constraint keys and check if any waiting tests can now run
var testsToStart = new List<WaitingTest>();

Expand Down
47 changes: 34 additions & 13 deletions TUnit.Engine/Scheduling/TestRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ public sealed class TestRunner
private readonly CancellationTokenSource _failFastCancellationSource;
private readonly TUnitFrameworkLogger _logger;
private readonly TestStateManager _testStateManager;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;

internal TestRunner(
ITestCoordinator testCoordinator,
ITUnitMessageBus tunitMessageBus,
bool isFailFastEnabled,
CancellationTokenSource failFastCancellationSource,
TUnitFrameworkLogger logger,
TestStateManager testStateManager)
TestStateManager testStateManager,
ParallelLimitLockProvider parallelLimitLockProvider)
{
_testCoordinator = testCoordinator;
_tunitMessageBus = tunitMessageBus;
_isFailFastEnabled = isFailFastEnabled;
_failFastCancellationSource = failFastCancellationSource;
_logger = logger;
_testStateManager = testStateManager;
_parallelLimitLockProvider = parallelLimitLockProvider;
}

private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _executingTests = new();
Expand Down Expand Up @@ -74,7 +77,8 @@ private async ValueTask ExecuteTestInternalAsync(AbstractExecutableTest test, Ca
{
try
{
// First, execute all dependencies recursively
// First, execute all dependencies recursively (without holding the limiter
// semaphore — avoids deadlocks in dependency chains sharing a limiter).
foreach (var dependency in test.Dependencies)
{
await ExecuteTestAsync(dependency.Test, cancellationToken).ConfigureAwait(false);
Expand All @@ -87,21 +91,38 @@ private async ValueTask ExecuteTestInternalAsync(AbstractExecutableTest test, Ca
}
}

test.State = TestState.Running;
test.StartTime = DateTimeOffset.UtcNow;
// Acquired here (not in the scheduler) so the limit is enforced
// regardless of entry point — scheduler or dependency recursion.
SemaphoreSlim? acquiredLimiter = null;
try
{
if (test.Context.ParallelLimiter is { } parallelLimiter)
{
var limiter = _parallelLimitLockProvider.GetLock(parallelLimiter);
await limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
acquiredLimiter = limiter;
}

// TestCoordinator handles sending InProgress message
await _testCoordinator.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
test.State = TestState.Running;
test.StartTime = DateTimeOffset.UtcNow;

if ((_isFailFastEnabled || TUnitSettings.Default.Execution.FailFast) && test.Result?.State == TestState.Failed)
{
// Capture the first failure exception before triggering cancellation
if (test.Result.Exception != null)
// TestCoordinator handles sending InProgress message
await _testCoordinator.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);

if ((_isFailFastEnabled || TUnitSettings.Default.Execution.FailFast) && test.Result?.State == TestState.Failed)
{
Interlocked.CompareExchange(ref _firstFailFastException, test.Result.Exception, null);
// Capture the first failure exception before triggering cancellation
if (test.Result.Exception != null)
{
Interlocked.CompareExchange(ref _firstFailFastException, test.Result.Exception, null);
}
await _logger.LogErrorAsync($"Test {test.TestId} failed. Triggering fail-fast cancellation.").ConfigureAwait(false);
_failFastCancellationSource.Cancel();
}
await _logger.LogErrorAsync($"Test {test.TestId} failed. Triggering fail-fast cancellation.").ConfigureAwait(false);
_failFastCancellationSource.Cancel();
}
finally
{
acquiredLimiter?.Release();
}
}
catch (Exception ex)
Expand Down
143 changes: 15 additions & 128 deletions TUnit.Engine/Scheduling/TestScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Buffers;
using Microsoft.Testing.Platform.CommandLine;
using TUnit.Core;
using TUnit.Core.Exceptions;
Expand All @@ -19,7 +18,6 @@ internal sealed class TestScheduler : ITestScheduler
private readonly TUnitFrameworkLogger _logger;
private readonly ITestGroupingService _groupingService;
private readonly ITUnitMessageBus _messageBus;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;
private readonly TestStateManager _testStateManager;
private readonly TestRunner _testRunner;
private readonly CircularDependencyDetector _circularDependencyDetector;
Expand All @@ -36,7 +34,6 @@ public TestScheduler(
ITestGroupingService groupingService,
ITUnitMessageBus messageBus,
ICommandLineOptions commandLineOptions,
ParallelLimitLockProvider parallelLimitLockProvider,
TestStateManager testStateManager,
TestRunner testRunner,
CircularDependencyDetector circularDependencyDetector,
Expand All @@ -49,7 +46,6 @@ public TestScheduler(
_logger = logger;
_groupingService = groupingService;
_messageBus = messageBus;
_parallelLimitLockProvider = parallelLimitLockProvider;
_testStateManager = testStateManager;
_testRunner = testRunner;
_circularDependencyDetector = circularDependencyDetector;
Expand Down Expand Up @@ -330,7 +326,7 @@ await Parallel.ForEachAsync(
new ParallelOptions { CancellationToken = cancellationToken },
async (test, ct) =>
{
test.ExecutionTask ??= ExecuteSingleTestAsync(test, ct);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
).ConfigureAwait(false);
Expand All @@ -340,39 +336,13 @@ await Parallel.ForEachAsync(
for (var i = 0; i < tests.Length; i++)
{
var test = tests[i];
tasks[i] = test.ExecutionTask ??= ExecuteSingleTestAsync(test, cancellationToken);
tasks[i] = test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
#endif
}
}

#if NET8_0_OR_GREATER
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Test execution involves reflection for hooks and initialization")]
#endif
private async Task ExecuteSingleTestAsync(
AbstractExecutableTest test,
CancellationToken cancellationToken)
{
if (test.Context.ParallelLimiter != null)
{
var semaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}
else
{
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
}

#if NET8_0_OR_GREATER
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Test execution involves reflection for hooks and initialization")]
#endif
Expand All @@ -382,7 +352,7 @@ private async Task ExecuteSequentiallyAsync(
{
foreach (var test in tests)
{
test.ExecutionTask ??= ExecuteSingleTestAsync(test, cancellationToken);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
}
Expand All @@ -395,33 +365,19 @@ private async Task ExecuteWithGlobalLimitAsync(
var maxParallelism = _maxParallelism.Value;

#if NET8_0_OR_GREATER
// PERFORMANCE OPTIMIZATION: Partition tests by whether they have parallel limiters
// Tests without limiters can run with unlimited parallelism (avoiding global semaphore overhead)
var testsWithLimiters = new List<AbstractExecutableTest>();
var testsWithoutLimiters = new List<AbstractExecutableTest>();

foreach (var test in tests)
{
if (test.Context.ParallelLimiter != null)
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
testsWithLimiters.Add(test);
}
else
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
testsWithoutLimiters.Add(test);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
}

// Execute both groups concurrently
var limitedTask = testsWithLimiters.Count > 0
? ExecuteWithLimitAsync(testsWithLimiters, maxParallelism, cancellationToken)
: Task.CompletedTask;

var unlimitedTask = testsWithoutLimiters.Count > 0
? ExecuteUnlimitedAsync(testsWithoutLimiters, maxParallelism, cancellationToken)
: Task.CompletedTask;

await Task.WhenAll(limitedTask, unlimitedTask).ConfigureAwait(false);
).ConfigureAwait(false);
#else
// Fallback for netstandard2.0: Manual bounded concurrency using existing semaphore
var tasks = new Task[tests.Length];
Expand All @@ -430,26 +386,11 @@ private async Task ExecuteWithGlobalLimitAsync(
var test = tests[i];
tasks[i] = Task.Run(async () =>
{
SemaphoreSlim? parallelLimiterSemaphore = null;

await globalSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (test.Context.ParallelLimiter != null)
{
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await parallelLimiterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}

try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore?.Release();
}
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
Expand All @@ -461,60 +402,6 @@ private async Task ExecuteWithGlobalLimitAsync(
#endif
}

#if NET8_0_OR_GREATER
private async Task ExecuteWithLimitAsync(
List<AbstractExecutableTest> tests,
int maxParallelism,
CancellationToken cancellationToken)
{
// Execute tests with parallel limiters using the global limit
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
var parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter!);
await parallelLimiterSemaphore.WaitAsync(ct).ConfigureAwait(false);

try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore.Release();
}
}
).ConfigureAwait(false);
}

private async Task ExecuteUnlimitedAsync(
List<AbstractExecutableTest> tests,
int maxParallelism,
CancellationToken cancellationToken)
{
// Execute tests without per-test limiters, but still apply global parallelism limit
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
).ConfigureAwait(false);
}
#endif

private async Task WaitForTasksWithFailFastHandling(IEnumerable<Task> tasks, CancellationToken cancellationToken)
{
try
Expand Down
Loading
Loading