Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,13 @@ jobs:
Codacy__ApiKey: ${{ secrets.CODACY_APIKEY }}
CodeCov__Token: ${{ secrets.CODECOV_TOKEN }}
EMAIL_PASSWORD: ${{ secrets.EMAIL_PASSWORD }}

- name: Upload Hang Dumps
if: always()
uses: actions/upload-artifact@v4
with:
name: hang-dumps-${{ matrix.os }}
path: |
**/*.dmp
if-no-files-found: ignore
retention-days: 7
2 changes: 1 addition & 1 deletion src/ModularPipelines.Build/Modules/RunUnitTestsModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected override ModuleConfiguration Configure() => ModuleConfiguration.Create
Project = unitTestProjectFile.Path,
NoBuild = true,
Framework = _pipelineSettings.Value.TestFramework,
Arguments = ["--coverage", "--coverage-output-format", "cobertura"],
Arguments = ["--coverage", "--coverage-output-format", "cobertura", "--hangdump", "--hangdump-timeout", "20m"],
Configuration = "Release",
Properties =
[
Expand Down
5 changes: 3 additions & 2 deletions src/ModularPipelines/Console/ConsoleCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@ public void Install()
_outputLogger = _loggerFactory.CreateLogger("ModularPipelines.Output");

// Install our intercepting writers
// Buffer output when progress is active AND we're not in the middle of flushing
_coordinatedOut = new CoordinatedTextWriter(
this,
_originalConsoleOut,
() => _isProgressActive,
() => _isProgressActive && !_outputCoordinator.IsFlushing,
_secretObfuscator);

_coordinatedError = new CoordinatedTextWriter(
this,
_originalConsoleError,
() => _isProgressActive,
() => _isProgressActive && !_outputCoordinator.IsFlushing,
_secretObfuscator);

System.Console.SetOut(_coordinatedOut);
Expand Down
14 changes: 7 additions & 7 deletions src/ModularPipelines/Console/CoordinatedTextWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class CoordinatedTextWriter : TextWriter
{
private readonly IConsoleCoordinator _coordinator;
private readonly TextWriter _realConsole;
private readonly Func<bool> _isProgressActive;
private readonly Func<bool> _shouldBuffer;
private readonly ISecretObfuscator _secretObfuscator;
private readonly StringBuilder _lineBuffer = new();
private readonly object _lineBufferLock = new();
Expand All @@ -39,17 +39,17 @@ internal class CoordinatedTextWriter : TextWriter
/// </summary>
/// <param name="coordinator">The console coordinator.</param>
/// <param name="realConsole">The real console to write to when not buffering.</param>
/// <param name="isProgressActive">Function that returns whether progress is active.</param>
/// <param name="shouldBuffer">Function that returns whether output should be buffered.</param>
/// <param name="secretObfuscator">Obfuscator for secrets in output.</param>
public CoordinatedTextWriter(
IConsoleCoordinator coordinator,
TextWriter realConsole,
Func<bool> isProgressActive,
Func<bool> shouldBuffer,
ISecretObfuscator secretObfuscator)
{
_coordinator = coordinator;
_realConsole = realConsole;
_isProgressActive = isProgressActive;
_shouldBuffer = shouldBuffer;
_secretObfuscator = secretObfuscator;
}

Expand All @@ -62,7 +62,7 @@ public override void WriteLine(string? value)
var message = value ?? string.Empty;
var obfuscated = _secretObfuscator.Obfuscate(message, null);

if (!_isProgressActive())
if (!_shouldBuffer())
{
// Progress not running - write directly
_realConsole.WriteLine(obfuscated);
Expand All @@ -89,7 +89,7 @@ public override void Write(string? value)

var obfuscated = _secretObfuscator.Obfuscate(value, null);

if (!_isProgressActive())
if (!_shouldBuffer())
{
// Progress not running - write directly
_realConsole.Write(obfuscated);
Expand Down Expand Up @@ -159,7 +159,7 @@ public override void Flush()
var line = _lineBuffer.ToString();
_lineBuffer.Clear();

if (_isProgressActive())
if (_shouldBuffer())
{
RouteToBuffer(line);
}
Expand Down
6 changes: 6 additions & 0 deletions src/ModularPipelines/Console/IOutputCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ namespace ModularPipelines.Console;
/// </summary>
internal interface IOutputCoordinator
{
/// <summary>
/// Gets whether output flushing is currently in progress.
/// When true, console writes should bypass buffering and go directly to the real console.
/// </summary>
bool IsFlushing { get; }

/// <summary>
/// Sets the progress controller for pause/resume coordination.
/// Must be called before any modules complete.
Expand Down
20 changes: 13 additions & 7 deletions src/ModularPipelines/Console/OutputCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ internal sealed class OutputCoordinator : IOutputCoordinator
private readonly SemaphoreSlim _writeLock = new(1, 1);

private IProgressController _progressController = NoOpProgressController.Instance;
private bool _isFlushing;
private bool _isProcessingQueue;
private volatile bool _isFlushingOutput;

public OutputCoordinator(
IBuildSystemFormatterProvider formatterProvider,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider)
{
_formatterProvider = formatterProvider;
_loggerFactory = loggerFactory;
_serviceProvider = serviceProvider;
_formatterProvider = formatterProvider ?? throw new ArgumentNullException(nameof(formatterProvider));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_logger = loggerFactory.CreateLogger<OutputCoordinator>();
_console = System.Console.Out;
}

/// <inheritdoc />
public bool IsFlushing => _isFlushingOutput;

/// <inheritdoc />
public void SetProgressController(IProgressController controller)
{
Expand All @@ -55,10 +59,10 @@ public async Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationT
lock (_queueLock)
{
_pendingQueue.Enqueue(pending);
shouldProcess = !_isFlushing;
shouldProcess = !_isProcessingQueue;
if (shouldProcess)
{
_isFlushing = true;
_isProcessingQueue = true;
}
}

Expand All @@ -85,7 +89,7 @@ private async Task ProcessQueueAsync(CancellationToken cancellationToken)
{
if (_pendingQueue.Count == 0)
{
_isFlushing = false;
_isProcessingQueue = false;
return;
}

Expand All @@ -104,12 +108,14 @@ private async Task ProcessQueueAsync(CancellationToken cancellationToken)
try
{
await _progressController.PauseAsync().ConfigureAwait(false);
_isFlushingOutput = true;
try
{
FlushBuffer(pending.Buffer, formatter);
}
finally
{
_isFlushingOutput = false;
await _progressController.ResumeAsync().ConfigureAwait(false);
}
}
Expand Down
111 changes: 69 additions & 42 deletions src/ModularPipelines/Console/ProgressSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ internal class ProgressSession : IProgressSession, IProgressController
private int _totalModuleCount;
private int _completedModuleCount;

private readonly SemaphoreSlim _pauseLock = new(1, 1);
// Pause coordination - uses signals instead of locks held during slow operations
private readonly object _pauseStateLock = new();
private bool _isPaused;
private bool _inRefresh; // True while ctx.Refresh() is executing
private TaskCompletionSource? _resumeSignal;
private TaskCompletionSource? _refreshCompleted; // Signaled when in-flight refresh completes

public ProgressSession(
ConsoleCoordinator coordinator,
Expand Down Expand Up @@ -102,57 +105,67 @@ await AnsiConsole.Progress()
// Keep alive until all modules complete or cancellation
while (!ctx.IsFinished && !_cancellationToken.IsCancellationRequested)
{
// Check if we should pause for module output
bool shouldRefresh = true;
// Check pause state and prepare for refresh
bool shouldRefresh;
TaskCompletionSource? resumeSignal = null;

await _pauseLock.WaitAsync().ConfigureAwait(false);
try
lock (_pauseStateLock)
{
if (_isPaused)
{
// We're paused - signal that any in-flight refresh is done
_refreshCompleted?.TrySetResult();
resumeSignal = _resumeSignal;
shouldRefresh = false;
}
}
finally
{
_pauseLock.Release();
else
{
// Mark that we're about to refresh
_inRefresh = true;
shouldRefresh = true;
}
}

if (resumeSignal != null)
{
// Wait for resume signal with timeout to prevent stuck UI
// If output takes longer than 60 seconds, auto-resume to prevent indefinite pause
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(60), CancellationToken.None);
var completedTask = await Task.WhenAny(resumeSignal.Task, timeoutTask).ConfigureAwait(false);

if (completedTask == timeoutTask)
{
// Timeout - force resume to prevent stuck state
// Log warning as this indicates a potential issue with output flushing
_logger.LogWarning(
"Progress pause timed out after 60 seconds. Forcing resume to prevent stuck UI. " +
"This may indicate slow I/O or a deadlock in output coordination.");
"Progress pause timed out after 60 seconds. Forcing resume to prevent stuck UI.");

await _pauseLock.WaitAsync().ConfigureAwait(false);
try
lock (_pauseStateLock)
{
_isPaused = false;
_inRefresh = false; // Reset for state consistency
_resumeSignal?.TrySetResult();
_resumeSignal = null;
}
finally
{
_pauseLock.Release();
_refreshCompleted = null;
}
}
}
else if (shouldRefresh)
{
// Manually refresh when not paused - this prevents rendering glitches
// when module output is being written to the console
ctx.Refresh();
try
{
ctx.Refresh();
}
finally
{
// Mark refresh complete and check if pause was requested during refresh
lock (_pauseStateLock)
{
_inRefresh = false;
if (_isPaused)
{
// Pause was requested while we were refreshing - signal completion
_refreshCompleted?.TrySetResult();
}
}
}
}

await Task.Delay(100, CancellationToken.None).ConfigureAwait(false);
Expand Down Expand Up @@ -374,55 +387,69 @@ private void StartProgressTicker(ProgressTask task, TimeSpan estimatedDuration)
/// <inheritdoc />
public async Task PauseAsync()
{
await _pauseLock.WaitAsync().ConfigureAwait(false);
try
TaskCompletionSource? waitForRefresh = null;

lock (_pauseStateLock)
{
if (_isPaused)
{
// Already paused - nothing to do. We don't wait on _refreshCompleted here
// because it could be stale (e.g., from a previous pause that timed out),
// which would cause this caller to wait forever.
return;
}

_isPaused = true;
_resumeSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Signal the progress loop to pause
// The loop will clear progress and wait for resume
if (_inRefresh)
{
// Progress loop is currently in ctx.Refresh() - need to wait for it
_refreshCompleted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
waitForRefresh = _refreshCompleted;
}
// else: Progress loop is not refreshing, pause takes effect immediately
}
finally

// Wait for any in-flight refresh to complete (outside the lock)
if (waitForRefresh != null)
{
_pauseLock.Release();
await waitForRefresh.Task.ConfigureAwait(false);
}

// Wait a moment for progress to clear
// Small delay to allow terminal to process any buffered escape sequences
await Task.Delay(50).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task ResumeAsync()
public Task ResumeAsync()
{
await _pauseLock.WaitAsync().ConfigureAwait(false);
try
lock (_pauseStateLock)
{
if (!_isPaused)
{
return;
}

_isPaused = false;
_resumeSignal?.TrySetResult();
_resumeSignal = null;
_refreshCompleted = null;
}
finally
{
_pauseLock.Release();
}

return Task.CompletedTask;
}

#endregion

/// <inheritdoc />
public async ValueTask DisposeAsync()
{
// Signal resume in case we're disposed while paused
// Also clear the signals to prevent any concurrent PauseAsync from hanging
lock (_pauseStateLock)
{
_isPaused = false;
_resumeSignal?.TrySetResult();
_resumeSignal = null;
_refreshCompleted = null;
}

// Signal all tasks to stop
lock (_progressLock)
{
Expand Down
Loading
Loading