Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Microsoft.Testing.Platform.CommandLine;
using Microsoft.Testing.Platform.Extensions;
using Microsoft.Testing.Platform.Extensions.CommandLine;

namespace TUnit.Engine.CommandLineProviders;

internal class AdaptiveMetricsCommandProvider(IExtension extension) : ICommandLineOptionsProvider
{
public const string AdaptiveMetrics = "adaptive-metrics";

public Task<bool> IsEnabledAsync()
{
return extension.IsEnabledAsync();
}

public string Uid => extension.Uid;

public string Version => extension.Version;

public string DisplayName => extension.DisplayName;

public string Description => extension.Description;

public IReadOnlyCollection<CommandLineOption> GetCommandLineOptions()
{
return
[
new CommandLineOption(AdaptiveMetrics, "Enable detailed metrics logging for adaptive parallelism", ArgumentArity.Zero, false)
];
}

public Task<ValidationResult> ValidateOptionArgumentsAsync(CommandLineOption commandOption, string[] arguments)
{
return ValidationResult.ValidTask;
}

public Task<ValidationResult> ValidateCommandLineOptionsAsync(ICommandLineOptions commandLineOptions)
{
return ValidationResult.ValidTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Testing.Platform.CommandLine;
using Microsoft.Testing.Platform.Extensions;
using Microsoft.Testing.Platform.Extensions.CommandLine;

namespace TUnit.Engine.CommandLineProviders;

internal class ParallelismStrategyCommandProvider(IExtension extension) : ICommandLineOptionsProvider
{
public const string ParallelismStrategy = "parallelism-strategy";

public Task<bool> IsEnabledAsync()
{
return extension.IsEnabledAsync();
}

public string Uid => extension.Uid;

public string Version => extension.Version;

public string DisplayName => extension.DisplayName;

public string Description => extension.Description;

public IReadOnlyCollection<CommandLineOption> GetCommandLineOptions()
{
return
[
new CommandLineOption(ParallelismStrategy, "Parallelism strategy: fixed or adaptive (default: adaptive)", ArgumentArity.ExactlyOne, false)
];
}

public Task<ValidationResult> ValidateOptionArgumentsAsync(CommandLineOption commandOption, string[] arguments)
{
if (commandOption.Name == ParallelismStrategy && arguments.Length != 1)
{
return ValidationResult.InvalidTask("A single value must be provided for parallelism strategy");
}

if (commandOption.Name == ParallelismStrategy)
{
var strategy = arguments[0].ToLowerInvariant();
if (strategy != "fixed" && strategy != "adaptive")
{
return ValidationResult.InvalidTask("Parallelism strategy must be 'fixed' or 'adaptive'");
}
}

return ValidationResult.ValidTask;
}

public Task<ValidationResult> ValidateCommandLineOptionsAsync(ICommandLineOptions commandLineOptions)
{
return ValidationResult.ValidTask;
}
}
4 changes: 4 additions & 0 deletions TUnit.Engine/Extensions/TestApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public static void AddTUnit(this ITestApplicationBuilder testApplicationBuilder)
testApplicationBuilder.CommandLine.AddProvider(() => new ReflectionModeCommandProvider(extension));
testApplicationBuilder.CommandLine.AddProvider(() => new DisableLogoCommandProvider(extension));

// Adaptive parallelism command providers
testApplicationBuilder.CommandLine.AddProvider(() => new ParallelismStrategyCommandProvider(extension));
testApplicationBuilder.CommandLine.AddProvider(() => new AdaptiveMetricsCommandProvider(extension));

// Unified verbosity control (replaces HideTestOutput, DisableLogo, DetailedStacktrace)
testApplicationBuilder.CommandLine.AddProvider(() => new VerbosityCommandProvider(extension));

Expand Down
243 changes: 243 additions & 0 deletions TUnit.Engine/Scheduling/AdaptiveParallelismController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
using TUnit.Core.Logging;
using TUnit.Engine.Logging;
using TUnit.Engine.Services;

namespace TUnit.Engine.Scheduling;

/// <summary>
/// Controller that runs in the background and adjusts parallelism based on system metrics
/// </summary>
internal sealed class AdaptiveParallelismController : IDisposable
{
private readonly AdaptiveSemaphore _semaphore;
private readonly SystemMetricsCollector _metricsCollector;
private readonly ParallelismAdjustmentStrategy _adjustmentStrategy;
private readonly TUnitFrameworkLogger _logger;
private readonly bool _enableMetricsLogging;
private readonly CancellationTokenSource _cancellationSource;
private readonly Task _adjustmentTask;
private readonly Task? _metricsLoggingTask;
private int _currentParallelism;
private bool _disposed;

public AdaptiveParallelismController(
AdaptiveSemaphore semaphore,
TUnitFrameworkLogger logger,
int minParallelism,
int maxParallelism,
int initialParallelism,
bool enableMetricsLogging)
{
_semaphore = semaphore ?? throw new ArgumentNullException(nameof(semaphore));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_enableMetricsLogging = enableMetricsLogging;
_currentParallelism = initialParallelism;

_metricsCollector = new SystemMetricsCollector();
_adjustmentStrategy = new ParallelismAdjustmentStrategy(minParallelism, maxParallelism);
_cancellationSource = new CancellationTokenSource();

// Start background tasks
_adjustmentTask = RunAdjustmentLoopAsync(_cancellationSource.Token);

if (_enableMetricsLogging)
{
_metricsLoggingTask = RunMetricsLoggingLoopAsync(_cancellationSource.Token);
}
}

/// <summary>
/// Gets the current parallelism level
/// </summary>
public int CurrentParallelism => _currentParallelism;

/// <summary>
/// Records a test completion for metrics
/// </summary>
public void RecordTestCompletion(TimeSpan executionTime)
{
_adjustmentStrategy.RecordTestCompletion(executionTime);
}

private async Task RunAdjustmentLoopAsync(CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
// Use PeriodicTimer for cleaner async timing (500ms intervals)
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(500));

while (!cancellationToken.IsCancellationRequested)
{
try
{
await timer.WaitForNextTickAsync(cancellationToken);
await AdjustParallelismAsync();
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
break;
}
catch (Exception ex)
{
// Log error but don't crash the adjustment loop
await _logger.LogErrorAsync($"Error in adaptive parallelism adjustment: {ex.Message}");
}
}
#else
// Fallback for netstandard2.0
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(500, cancellationToken);
await AdjustParallelismAsync();
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
break;
}
catch (Exception ex)
{
// Log error but don't crash the adjustment loop
await _logger.LogErrorAsync($"Error in adaptive parallelism adjustment: {ex.Message}");
}
}
#endif
}

private async Task RunMetricsLoggingLoopAsync(CancellationToken cancellationToken)
{
// Initial delay to let tests start
await Task.Delay(1000, cancellationToken);

#if NET6_0_OR_GREATER
// Use PeriodicTimer for metrics logging (3 second intervals)
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(3));

while (!cancellationToken.IsCancellationRequested)
{
try
{
await timer.WaitForNextTickAsync(cancellationToken);

// Get current metrics
var metrics = _metricsCollector.GetMetrics();
await LogMetrics(metrics);
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
break;
}
catch (Exception ex)
{
// Log error but continue
await _logger.LogErrorAsync($"Error logging adaptive metrics: {ex.Message}");
}
}
#else
// Fallback for netstandard2.0
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(3000, cancellationToken);

// Get current metrics
var metrics = _metricsCollector.GetMetrics();
await LogMetrics(metrics);
}
catch (OperationCanceledException)
{
// Expected when cancellation is requested
break;
}
catch (Exception ex)
{
// Log error but continue
await _logger.LogErrorAsync($"Error logging adaptive metrics: {ex.Message}");
}
}
#endif
}

private async Task AdjustParallelismAsync()
{
// Collect metrics
var metrics = _metricsCollector.GetMetrics();

// Calculate adjustment
var recommendation = _adjustmentStrategy.CalculateAdjustment(metrics, _currentParallelism);

// Apply adjustment if needed
if (recommendation.NewParallelism != _currentParallelism)
{
_semaphore.AdjustMaxCount(recommendation.NewParallelism);
var oldParallelism = _currentParallelism;
_currentParallelism = recommendation.NewParallelism;

if (_enableMetricsLogging)
{
await LogAdjustment(oldParallelism, recommendation, metrics);
}
}
}

private async Task LogAdjustment(int oldParallelism, AdjustmentRecommendation recommendation, SystemMetrics metrics)
{
var direction = recommendation.Direction == AdjustmentDirection.Increase ? "↑" : "↓";
await _logger.LogDebugAsync(
$"[Adaptive] Parallelism adjusted: {oldParallelism} {direction} {recommendation.NewParallelism} | " +
$"Reason: {recommendation.Reason} | " +
$"CPU: {metrics.SystemCpuUsagePercent:F1}% | " +
$"Threads: {metrics.AvailableWorkerThreads}/{metrics.MaxWorkerThreads} | " +
$"Memory: {metrics.TotalMemoryBytes / 1_000_000}MB");
}

private async Task LogMetrics(SystemMetrics metrics)
{
var semaphoreAvailable = _semaphore.CurrentCount;
var activeTests = _currentParallelism - semaphoreAvailable;

await _logger.LogDebugAsync(
$"[Adaptive] Metrics | " +
$"Parallelism: {_currentParallelism} (Active: {activeTests}, Available: {semaphoreAvailable}) | " +
$"CPU: {metrics.SystemCpuUsagePercent:F1}% | " +
$"Threads: {metrics.AvailableWorkerThreads}/{metrics.MaxWorkerThreads} | " +
$"Pending: {metrics.PendingWorkItems} | " +
$"Memory: {metrics.TotalMemoryBytes / 1_000_000}MB");
}

public void Dispose()
{
if (_disposed)
return;

_disposed = true;

// Cancel background tasks
_cancellationSource.Cancel();

// Wait for tasks to complete (with timeout)
try
{
var tasksToWait = new[] { _adjustmentTask, _metricsLoggingTask }
.Where(t => t != null)
.Cast<Task>()
.ToArray();

if (tasksToWait.Length > 0)
{
Task.WaitAll(tasksToWait, TimeSpan.FromSeconds(5));
}
}
catch (AggregateException)
{
// Tasks may have been cancelled, which is expected
}

_cancellationSource.Dispose();
_metricsCollector?.Dispose();
}
}
Loading
Loading