Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions TUnit.Engine/Framework/TUnitServiceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,16 @@ public TUnitServiceProvider(IExtension extension,
isFailFastEnabled,
FailFastCancellationSource,
Logger,
testStateManager));
testStateManager,
ParallelLimitLockProvider));

// Create scheduler configuration from command line options
var testGroupingService = Register<ITestGroupingService>(new TestGroupingService(Logger));
var circularDependencyDetector = Register(new CircularDependencyDetector());

var constraintKeyScheduler = Register<IConstraintKeyScheduler>(new ConstraintKeyScheduler(
testRunner,
Logger,
ParallelLimitLockProvider));
Logger));

var staticPropertyHandler = Register(new StaticPropertyHandler(Logger, objectTracker, trackableObjectGraphProvider, disposer, lazyPropertyInjector, objectGraphDiscoveryService));

Expand All @@ -266,7 +266,6 @@ public TUnitServiceProvider(IExtension extension,
testGroupingService,
MessageBus,
CommandLineOptions,
ParallelLimitLockProvider,
testStateManager,
testRunner,
circularDependencyDetector,
Expand Down
19 changes: 1 addition & 18 deletions TUnit.Engine/Scheduling/ConstraintKeyScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ internal sealed class ConstraintKeyScheduler : IConstraintKeyScheduler
{
private readonly TestRunner _testRunner;
private readonly TUnitFrameworkLogger _logger;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;

public ConstraintKeyScheduler(
TestRunner testRunner,
TUnitFrameworkLogger logger,
ParallelLimitLockProvider parallelLimitLockProvider)
TUnitFrameworkLogger logger)
{
_testRunner = testRunner;
_logger = logger;
_parallelLimitLockProvider = parallelLimitLockProvider;
}

#if NET8_0_OR_GREATER
Expand Down Expand Up @@ -146,26 +143,12 @@ private async Task ExecuteTestAndReleaseKeysAsync(
WaitingTestIndex waitingTestIndex,
CancellationToken cancellationToken)
{
SemaphoreSlim? parallelLimiterSemaphore = null;

try
{
// Two-phase acquisition: Acquire ParallelLimiter BEFORE executing
// This ensures constrained resources are acquired before holding constraint keys
if (test.Context.ParallelLimiter != null)
{
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await parallelLimiterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}

// Execute the test (constraint keys are already held by caller)
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
finally
{
// Release ParallelLimiter if we acquired it
parallelLimiterSemaphore?.Release();

// Release the constraint keys and check if any waiting tests can now run
var testsToStart = new List<WaitingTest>();

Expand Down
47 changes: 34 additions & 13 deletions TUnit.Engine/Scheduling/TestRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ public sealed class TestRunner
private readonly CancellationTokenSource _failFastCancellationSource;
private readonly TUnitFrameworkLogger _logger;
private readonly TestStateManager _testStateManager;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;

internal TestRunner(
ITestCoordinator testCoordinator,
ITUnitMessageBus tunitMessageBus,
bool isFailFastEnabled,
CancellationTokenSource failFastCancellationSource,
TUnitFrameworkLogger logger,
TestStateManager testStateManager)
TestStateManager testStateManager,
ParallelLimitLockProvider parallelLimitLockProvider)
{
_testCoordinator = testCoordinator;
_tunitMessageBus = tunitMessageBus;
_isFailFastEnabled = isFailFastEnabled;
_failFastCancellationSource = failFastCancellationSource;
_logger = logger;
_testStateManager = testStateManager;
_parallelLimitLockProvider = parallelLimitLockProvider;
}

private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _executingTests = new();
Expand Down Expand Up @@ -74,7 +77,8 @@ private async ValueTask ExecuteTestInternalAsync(AbstractExecutableTest test, Ca
{
try
{
// First, execute all dependencies recursively
// First, execute all dependencies recursively (without holding the limiter
// semaphore — avoids deadlocks in dependency chains sharing a limiter).
foreach (var dependency in test.Dependencies)
{
await ExecuteTestAsync(dependency.Test, cancellationToken).ConfigureAwait(false);
Expand All @@ -87,21 +91,38 @@ private async ValueTask ExecuteTestInternalAsync(AbstractExecutableTest test, Ca
}
}

test.State = TestState.Running;
test.StartTime = DateTimeOffset.UtcNow;
// Acquired here (not in the scheduler) so the limit is enforced
// regardless of entry point — scheduler or dependency recursion.
SemaphoreSlim? acquiredLimiter = null;
try
{
if (test.Context.ParallelLimiter is { } parallelLimiter)
{
var limiter = _parallelLimitLockProvider.GetLock(parallelLimiter);
await limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
acquiredLimiter = limiter;
}

// TestCoordinator handles sending InProgress message
await _testCoordinator.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
test.State = TestState.Running;
test.StartTime = DateTimeOffset.UtcNow;

if ((_isFailFastEnabled || TUnitSettings.Default.Execution.FailFast) && test.Result?.State == TestState.Failed)
{
// Capture the first failure exception before triggering cancellation
if (test.Result.Exception != null)
// TestCoordinator handles sending InProgress message
await _testCoordinator.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);

if ((_isFailFastEnabled || TUnitSettings.Default.Execution.FailFast) && test.Result?.State == TestState.Failed)
{
Interlocked.CompareExchange(ref _firstFailFastException, test.Result.Exception, null);
// Capture the first failure exception before triggering cancellation
if (test.Result.Exception != null)
{
Interlocked.CompareExchange(ref _firstFailFastException, test.Result.Exception, null);
}
await _logger.LogErrorAsync($"Test {test.TestId} failed. Triggering fail-fast cancellation.").ConfigureAwait(false);
_failFastCancellationSource.Cancel();
}
await _logger.LogErrorAsync($"Test {test.TestId} failed. Triggering fail-fast cancellation.").ConfigureAwait(false);
_failFastCancellationSource.Cancel();
}
finally
{
acquiredLimiter?.Release();
}
}
catch (Exception ex)
Expand Down
143 changes: 15 additions & 128 deletions TUnit.Engine/Scheduling/TestScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Buffers;
using Microsoft.Testing.Platform.CommandLine;
using TUnit.Core;
using TUnit.Core.Exceptions;
Expand All @@ -19,7 +18,6 @@ internal sealed class TestScheduler : ITestScheduler
private readonly TUnitFrameworkLogger _logger;
private readonly ITestGroupingService _groupingService;
private readonly ITUnitMessageBus _messageBus;
private readonly ParallelLimitLockProvider _parallelLimitLockProvider;
private readonly TestStateManager _testStateManager;
private readonly TestRunner _testRunner;
private readonly CircularDependencyDetector _circularDependencyDetector;
Expand All @@ -36,7 +34,6 @@ public TestScheduler(
ITestGroupingService groupingService,
ITUnitMessageBus messageBus,
ICommandLineOptions commandLineOptions,
ParallelLimitLockProvider parallelLimitLockProvider,
TestStateManager testStateManager,
TestRunner testRunner,
CircularDependencyDetector circularDependencyDetector,
Expand All @@ -49,7 +46,6 @@ public TestScheduler(
_logger = logger;
_groupingService = groupingService;
_messageBus = messageBus;
_parallelLimitLockProvider = parallelLimitLockProvider;
_testStateManager = testStateManager;
_testRunner = testRunner;
_circularDependencyDetector = circularDependencyDetector;
Expand Down Expand Up @@ -330,7 +326,7 @@ await Parallel.ForEachAsync(
new ParallelOptions { CancellationToken = cancellationToken },
async (test, ct) =>
{
test.ExecutionTask ??= ExecuteSingleTestAsync(test, ct);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
).ConfigureAwait(false);
Expand All @@ -340,39 +336,13 @@ await Parallel.ForEachAsync(
for (var i = 0; i < tests.Length; i++)
{
var test = tests[i];
tasks[i] = test.ExecutionTask ??= ExecuteSingleTestAsync(test, cancellationToken);
tasks[i] = test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
#endif
}
}

#if NET8_0_OR_GREATER
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Test execution involves reflection for hooks and initialization")]
#endif
private async Task ExecuteSingleTestAsync(
AbstractExecutableTest test,
CancellationToken cancellationToken)
{
if (test.Context.ParallelLimiter != null)
{
var semaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}
else
{
await _testRunner.ExecuteTestAsync(test, cancellationToken).ConfigureAwait(false);
}
}

#if NET8_0_OR_GREATER
[System.Diagnostics.CodeAnalysis.RequiresUnreferencedCode("Test execution involves reflection for hooks and initialization")]
#endif
Expand All @@ -382,7 +352,7 @@ private async Task ExecuteSequentiallyAsync(
{
foreach (var test in tests)
{
test.ExecutionTask ??= ExecuteSingleTestAsync(test, cancellationToken);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
}
Expand All @@ -395,33 +365,19 @@ private async Task ExecuteWithGlobalLimitAsync(
var maxParallelism = _maxParallelism.Value;

#if NET8_0_OR_GREATER
// PERFORMANCE OPTIMIZATION: Partition tests by whether they have parallel limiters
// Tests without limiters can run with unlimited parallelism (avoiding global semaphore overhead)
var testsWithLimiters = new List<AbstractExecutableTest>();
var testsWithoutLimiters = new List<AbstractExecutableTest>();

foreach (var test in tests)
{
if (test.Context.ParallelLimiter != null)
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
testsWithLimiters.Add(test);
}
else
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
testsWithoutLimiters.Add(test);
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
}

// Execute both groups concurrently
var limitedTask = testsWithLimiters.Count > 0
? ExecuteWithLimitAsync(testsWithLimiters, maxParallelism, cancellationToken)
: Task.CompletedTask;

var unlimitedTask = testsWithoutLimiters.Count > 0
? ExecuteUnlimitedAsync(testsWithoutLimiters, maxParallelism, cancellationToken)
: Task.CompletedTask;

await Task.WhenAll(limitedTask, unlimitedTask).ConfigureAwait(false);
).ConfigureAwait(false);
#else
// Fallback for netstandard2.0: Manual bounded concurrency using existing semaphore
var tasks = new Task[tests.Length];
Expand All @@ -430,26 +386,11 @@ private async Task ExecuteWithGlobalLimitAsync(
var test = tests[i];
tasks[i] = Task.Run(async () =>
{
SemaphoreSlim? parallelLimiterSemaphore = null;

await globalSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (test.Context.ParallelLimiter != null)
{
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await parallelLimiterSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}

try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore?.Release();
}
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
Expand All @@ -461,60 +402,6 @@ private async Task ExecuteWithGlobalLimitAsync(
#endif
}

#if NET8_0_OR_GREATER
private async Task ExecuteWithLimitAsync(
List<AbstractExecutableTest> tests,
int maxParallelism,
CancellationToken cancellationToken)
{
// Execute tests with parallel limiters using the global limit
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
var parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter!);
await parallelLimiterSemaphore.WaitAsync(ct).ConfigureAwait(false);

try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore.Release();
}
}
).ConfigureAwait(false);
}

private async Task ExecuteUnlimitedAsync(
List<AbstractExecutableTest> tests,
int maxParallelism,
CancellationToken cancellationToken)
{
// Execute tests without per-test limiters, but still apply global parallelism limit
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
MaxDegreeOfParallelism = maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct).AsTask();
await test.ExecutionTask.ConfigureAwait(false);
}
).ConfigureAwait(false);
}
#endif

private async Task WaitForTasksWithFailFastHandling(IEnumerable<Task> tasks, CancellationToken cancellationToken)
{
try
Expand Down
Loading
Loading