From d889070a25063d65fc6483e2a1b92e043be30fd8 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:33:27 +0000
Subject: [PATCH 01/10] feat(console): add deferred flush methods to
IOutputCoordinator interface
---
.../Console/IOutputCoordinator.cs | 22 +++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/src/ModularPipelines/Console/IOutputCoordinator.cs b/src/ModularPipelines/Console/IOutputCoordinator.cs
index 116b6a61e2..0c5911678c 100644
--- a/src/ModularPipelines/Console/IOutputCoordinator.cs
+++ b/src/ModularPipelines/Console/IOutputCoordinator.cs
@@ -27,4 +27,26 @@ internal interface IOutputCoordinator
/// Cancellation token.
/// A task that completes when the buffer has been flushed.
Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default);
+
+ ///
+ /// Sets whether live progress is currently active.
+ /// When active, module output is deferred until pipeline end.
+ ///
+ /// True when progress display is running.
+ void SetProgressActive(bool isActive);
+
+ ///
+ /// Called when a module completes. Decides whether to flush immediately or defer.
+ ///
+ /// The module's output buffer.
+ /// The type of the completed module.
+ /// Cancellation token.
+ Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default);
+
+ ///
+ /// Flushes all deferred output in module completion order.
+ /// Called after progress ends, before results are printed.
+ ///
+ /// Cancellation token.
+ Task FlushDeferredAsync(CancellationToken cancellationToken = default);
}
From 5ee2800fb18f9b18a0d67ed32e235756bbd07f4b Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:36:42 +0000
Subject: [PATCH 02/10] feat(console): implement deferred output storage and
flush in OutputCoordinator
---
.../Console/OutputCoordinator.cs | 88 +++++++++++++++++++
1 file changed, 88 insertions(+)
diff --git a/src/ModularPipelines/Console/OutputCoordinator.cs b/src/ModularPipelines/Console/OutputCoordinator.cs
index 78860832eb..66102d3282 100644
--- a/src/ModularPipelines/Console/OutputCoordinator.cs
+++ b/src/ModularPipelines/Console/OutputCoordinator.cs
@@ -23,6 +23,14 @@ internal sealed class OutputCoordinator : IOutputCoordinator
private IProgressController _progressController = NoOpProgressController.Instance;
private bool _isProcessingQueue;
private volatile bool _isFlushingOutput;
+ private volatile bool _isProgressActive;
+ private readonly List _deferredOutputs = new();
+
+ private readonly record struct DeferredModuleOutput(
+ IModuleOutputBuffer Buffer,
+ Type ModuleType,
+ DateTimeOffset CompletedAt
+ );
public OutputCoordinator(
IBuildSystemFormatterProvider formatterProvider,
@@ -45,6 +53,86 @@ public void SetProgressController(IProgressController controller)
_progressController = controller;
}
+ ///
+ public void SetProgressActive(bool isActive)
+ {
+ _isProgressActive = isActive;
+ }
+
+ ///
+ public async Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default)
+ {
+ if (!buffer.HasOutput)
+ {
+ return;
+ }
+
+ if (_isProgressActive)
+ {
+ // Progress is active - defer output until pipeline end
+ lock (_queueLock)
+ {
+ _deferredOutputs.Add(new DeferredModuleOutput(
+ buffer,
+ moduleType,
+ DateTimeOffset.UtcNow
+ ));
+ }
+ }
+ else
+ {
+ // No progress - flush immediately (existing behavior)
+ await EnqueueAndFlushAsync(buffer, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ ///
+ public async Task FlushDeferredAsync(CancellationToken cancellationToken = default)
+ {
+ List toFlush;
+
+ lock (_queueLock)
+ {
+ if (_deferredOutputs.Count == 0)
+ {
+ return;
+ }
+
+ toFlush = _deferredOutputs
+ .OrderBy(x => x.CompletedAt)
+ .ToList();
+ _deferredOutputs.Clear();
+ }
+
+ var formatter = _formatterProvider.GetFormatter();
+
+ await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ _isFlushingOutput = true;
+ try
+ {
+ foreach (var output in toFlush)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+
+ FlushBuffer(output.Buffer, formatter);
+ }
+ }
+ finally
+ {
+ _isFlushingOutput = false;
+ }
+ }
+ finally
+ {
+ _writeLock.Release();
+ }
+ }
+
///
public async Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default)
{
From 9af5e21901cf92d545ffb9407aa1d1486b0349cf Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:42:50 +0000
Subject: [PATCH 03/10] feat(logging): use OnModuleCompletedAsync instead of
EnqueueAndFlushAsync
---
src/ModularPipelines/Logging/ModuleLogger.cs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/ModularPipelines/Logging/ModuleLogger.cs b/src/ModularPipelines/Logging/ModuleLogger.cs
index f9eadfa7ba..44da8fb9f9 100644
--- a/src/ModularPipelines/Logging/ModuleLogger.cs
+++ b/src/ModularPipelines/Logging/ModuleLogger.cs
@@ -173,13 +173,13 @@ public override async ValueTask DisposeAsync()
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
- await _outputCoordinator.EnqueueAndFlushAsync(_buffer, cts.Token).ConfigureAwait(false);
+ await _outputCoordinator.OnModuleCompletedAsync(_buffer, typeof(T), cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Timeout occurred - log warning, output may be lost
_defaultLogger.LogWarning(
- "Module output flush timed out after 30 seconds for {ModuleType}. Some output may be lost.",
+ "Module output handling timed out after 30 seconds for {ModuleType}.",
typeof(T).Name);
}
catch (Exception ex)
From 2e020662899a7f46cb63b18050b7784bc9d73d6a Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:45:25 +0000
Subject: [PATCH 04/10] feat(console): wire up progress state to
OutputCoordinator
---
src/ModularPipelines/Console/ConsoleCoordinator.cs | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/ModularPipelines/Console/ConsoleCoordinator.cs b/src/ModularPipelines/Console/ConsoleCoordinator.cs
index 8aa28e9693..a938d1f387 100644
--- a/src/ModularPipelines/Console/ConsoleCoordinator.cs
+++ b/src/ModularPipelines/Console/ConsoleCoordinator.cs
@@ -230,6 +230,7 @@ public async Task BeginProgressAsync(
// Wire up the progress controller for output coordination
_outputCoordinator.SetProgressController(session);
+ _outputCoordinator.SetProgressActive(true);
_activeSession = session;
@@ -247,6 +248,7 @@ internal void EndProgressPhase()
lock (_phaseLock)
{
_outputCoordinator.SetProgressController(NoOpProgressController.Instance);
+ _outputCoordinator.SetProgressActive(false);
_isProgressActive = false;
_activeSession = null;
}
From d2ab0a792b1c3d3e3c086412f235d03c3719b592 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:48:14 +0000
Subject: [PATCH 05/10] feat(executors): call FlushDeferredAsync after progress
ends
---
.../Executors/PipelineOutputCoordinator.cs | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs b/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs
index 3ce55fdd74..0cd665d06e 100644
--- a/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs
+++ b/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs
@@ -26,19 +26,22 @@ internal class PipelineOutputCoordinator : IPipelineOutputCoordinator
private readonly IInternalSummaryLogger _summaryLogger;
private readonly IExceptionBuffer _exceptionBuffer;
private readonly IConsoleCoordinator _consoleCoordinator;
+ private readonly IOutputCoordinator _outputCoordinator;
public PipelineOutputCoordinator(
IPrintProgressExecutor printProgressExecutor,
IConsolePrinter consolePrinter,
IInternalSummaryLogger summaryLogger,
IExceptionBuffer exceptionBuffer,
- IConsoleCoordinator consoleCoordinator)
+ IConsoleCoordinator consoleCoordinator,
+ IOutputCoordinator outputCoordinator)
{
_printProgressExecutor = printProgressExecutor;
_consolePrinter = consolePrinter;
_summaryLogger = summaryLogger;
_exceptionBuffer = exceptionBuffer;
_consoleCoordinator = consoleCoordinator;
+ _outputCoordinator = outputCoordinator;
}
///
@@ -48,7 +51,7 @@ public async Task InitializeAsync()
_consoleCoordinator.Install();
var printProgressExecutor = await _printProgressExecutor.InitializeAsync().ConfigureAwait(false);
- return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator);
+ return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator, _outputCoordinator);
}
///
@@ -77,13 +80,16 @@ private sealed class PipelineOutputScope : IPipelineOutputScope
{
private readonly IPrintProgressExecutor _printProgressExecutor;
private readonly IConsoleCoordinator _consoleCoordinator;
+ private readonly IOutputCoordinator _outputCoordinator;
public PipelineOutputScope(
IPrintProgressExecutor printProgressExecutor,
- IConsoleCoordinator consoleCoordinator)
+ IConsoleCoordinator consoleCoordinator,
+ IOutputCoordinator outputCoordinator)
{
_printProgressExecutor = printProgressExecutor;
_consoleCoordinator = consoleCoordinator;
+ _outputCoordinator = outputCoordinator;
}
public async ValueTask DisposeAsync()
@@ -92,7 +98,10 @@ public async ValueTask DisposeAsync()
// 1. Stop progress display FIRST (ends buffering phase)
await _printProgressExecutor.DisposeAsync().ConfigureAwait(false);
- // 2. Flush buffered module output from coordinator
+ // 2. Flush deferred module output (in completion order)
+ await _outputCoordinator.FlushDeferredAsync().ConfigureAwait(false);
+
+ // 3. Flush any unattributed output from coordinator
_consoleCoordinator.FlushModuleOutput();
}
}
From 2c25be4c0d1c2b55237fee62d453c4bc02c8fc96 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:51:33 +0000
Subject: [PATCH 06/10] feat(console): add IAsyncDisposable safety net to
OutputCoordinator
---
.../Console/OutputCoordinator.cs | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/src/ModularPipelines/Console/OutputCoordinator.cs b/src/ModularPipelines/Console/OutputCoordinator.cs
index 66102d3282..0d79d08324 100644
--- a/src/ModularPipelines/Console/OutputCoordinator.cs
+++ b/src/ModularPipelines/Console/OutputCoordinator.cs
@@ -8,7 +8,7 @@ namespace ModularPipelines.Console;
/// Coordinates immediate flushing of module output with FIFO ordering and synchronization.
///
[ExcludeFromCodeCoverage]
-internal sealed class OutputCoordinator : IOutputCoordinator
+internal sealed class OutputCoordinator : IOutputCoordinator, IAsyncDisposable
{
private readonly IBuildSystemFormatterProvider _formatterProvider;
private readonly ILoggerFactory _loggerFactory;
@@ -246,4 +246,19 @@ public PendingFlush(IModuleOutputBuffer buffer)
Buffer = buffer;
}
}
+
+ ///
+ /// Safety net: flushes any remaining deferred output on disposal.
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ try
+ {
+ await FlushDeferredAsync(CancellationToken.None).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Failed to flush deferred output during disposal");
+ }
+ }
}
From c523f622958ce822a69341fb4753f7308ef29ea3 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 22:01:38 +0000
Subject: [PATCH 07/10] test(console): add unit tests for deferred output flush
---
.../OutputCoordinatorDeferredFlushTests.cs | 69 +++++++++++++++++++
1 file changed, 69 insertions(+)
create mode 100644 test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs
diff --git a/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs b/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs
new file mode 100644
index 0000000000..39b3c96df2
--- /dev/null
+++ b/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs
@@ -0,0 +1,69 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using ModularPipelines.Context;
+using ModularPipelines.Extensions;
+using ModularPipelines.Modules;
+using ModularPipelines.Options;
+using ModularPipelines.TestHelpers;
+
+namespace ModularPipelines.UnitTests.Console;
+
+public class OutputCoordinatorDeferredFlushTests : TestBase
+{
+ private class Module1 : Module
+ {
+ protected internal override async Task ExecuteAsync(IModuleContext context, CancellationToken cancellationToken)
+ {
+ context.Logger.LogInformation("Module1 output");
+ await Task.Delay(50, cancellationToken);
+ return true;
+ }
+ }
+
+ private class Module2 : Module
+ {
+ protected internal override async Task ExecuteAsync(IModuleContext context, CancellationToken cancellationToken)
+ {
+ context.Logger.LogInformation("Module2 output");
+ await Task.Yield();
+ return true;
+ }
+ }
+
+ [Test]
+ public async Task Pipeline_Completes_When_Progress_Disabled()
+ {
+ var host = await TestPipelineHostBuilder.Create()
+ .ConfigureServices((_, services) =>
+ {
+ services.Configure(opt => opt.ShowProgressInConsole = false);
+ })
+ .AddModule()
+ .BuildHostAsync();
+
+ await using (host)
+ {
+ var result = await host.ExecutePipelineAsync();
+ await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful);
+ }
+ }
+
+ [Test]
+ public async Task Pipeline_With_Multiple_Modules_Completes_Successfully()
+ {
+ var host = await TestPipelineHostBuilder.Create()
+ .ConfigureServices((_, services) =>
+ {
+ services.Configure(opt => opt.ShowProgressInConsole = false);
+ })
+ .AddModule()
+ .AddModule()
+ .BuildHostAsync();
+
+ await using (host)
+ {
+ var result = await host.ExecutePipelineAsync();
+ await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful);
+ }
+ }
+}
From 2cff4d684899aae1ebbf1d960c5dbd38ce44ae65 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 22:40:09 +0000
Subject: [PATCH 08/10] fix(console): address PR review feedback for deferred
output
- Fix race condition: move SetProgressActive(true) inside _phaseLock
to prevent modules from completing before OutputCoordinator knows
progress is active
- Add explicit SetProgressActive(false) when creating NoOpProgressSession
for state consistency
- Restore "Some output may be lost" warning in timeout message
---
.../Console/ConsoleCoordinator.cs | 30 ++++++++++++-------
src/ModularPipelines/Logging/ModuleLogger.cs | 2 +-
2 files changed, 20 insertions(+), 12 deletions(-)
diff --git a/src/ModularPipelines/Console/ConsoleCoordinator.cs b/src/ModularPipelines/Console/ConsoleCoordinator.cs
index a938d1f387..51d26a1ca0 100644
--- a/src/ModularPipelines/Console/ConsoleCoordinator.cs
+++ b/src/ModularPipelines/Console/ConsoleCoordinator.cs
@@ -207,10 +207,14 @@ public async Task BeginProgressAsync(
if (!_options.Value.ShowProgressInConsole)
{
// Return a no-op session if progress is disabled
+ // Explicitly ensure deferred output is disabled for consistency
+ _outputCoordinator.SetProgressActive(false);
_activeSession = new NoOpProgressSession();
return _activeSession;
}
+ ProgressSession session;
+
lock (_phaseLock)
{
if (_isProgressActive)
@@ -219,20 +223,24 @@ public async Task BeginProgressAsync(
}
_isProgressActive = true;
- }
- var session = new ProgressSession(
- this,
- modules,
- _options,
- _loggerFactory,
- cancellationToken);
+ // CRITICAL: Set OutputCoordinator's progress state inside the lock
+ // to prevent race conditions where a module completes between
+ // _isProgressActive = true and OutputCoordinator being notified
+ _outputCoordinator.SetProgressActive(true);
- // Wire up the progress controller for output coordination
- _outputCoordinator.SetProgressController(session);
- _outputCoordinator.SetProgressActive(true);
+ session = new ProgressSession(
+ this,
+ modules,
+ _options,
+ _loggerFactory,
+ cancellationToken);
- _activeSession = session;
+ // Wire up the progress controller for output coordination
+ _outputCoordinator.SetProgressController(session);
+
+ _activeSession = session;
+ }
// Start the progress display
session.Start();
diff --git a/src/ModularPipelines/Logging/ModuleLogger.cs b/src/ModularPipelines/Logging/ModuleLogger.cs
index 44da8fb9f9..419eab50f0 100644
--- a/src/ModularPipelines/Logging/ModuleLogger.cs
+++ b/src/ModularPipelines/Logging/ModuleLogger.cs
@@ -179,7 +179,7 @@ public override async ValueTask DisposeAsync()
{
// Timeout occurred - log warning, output may be lost
_defaultLogger.LogWarning(
- "Module output handling timed out after 30 seconds for {ModuleType}.",
+ "Module output handling timed out after 30 seconds for {ModuleType}. Some output may be lost.",
typeof(T).Name);
}
catch (Exception ex)
From 4dddd7115612404573aa4cf14d777724e5dfd323 Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 22:45:43 +0000
Subject: [PATCH 09/10] fix(console): address second round of PR review
feedback
- Add separate _deferredLock for deferred operations to reduce lock
contention with immediate flush operations
- Clear stale deferred outputs when SetProgressActive(true) is called
to prevent memory leaks from previous crashed pipeline runs
- Improve DisposeAsync documentation to clarify it's a safety net and
the primary flush happens in PipelineOutputScope.DisposeAsync
---
.../Console/OutputCoordinator.cs | 40 ++++++++++++++++++-
1 file changed, 38 insertions(+), 2 deletions(-)
diff --git a/src/ModularPipelines/Console/OutputCoordinator.cs b/src/ModularPipelines/Console/OutputCoordinator.cs
index 0d79d08324..1f25c9bc6c 100644
--- a/src/ModularPipelines/Console/OutputCoordinator.cs
+++ b/src/ModularPipelines/Console/OutputCoordinator.cs
@@ -20,6 +20,10 @@ internal sealed class OutputCoordinator : IOutputCoordinator, IAsyncDisposable
private readonly Queue _pendingQueue = new();
private readonly SemaphoreSlim _writeLock = new(1, 1);
+ // Separate lock for deferred output operations to reduce contention
+ // with immediate flush operations that use _queueLock
+ private readonly object _deferredLock = new();
+
private IProgressController _progressController = NoOpProgressController.Instance;
private bool _isProcessingQueue;
private volatile bool _isFlushingOutput;
@@ -56,6 +60,23 @@ public void SetProgressController(IProgressController controller)
///
public void SetProgressActive(bool isActive)
{
+ if (isActive)
+ {
+ // Starting a new progress session - check for stale deferred outputs
+ // from a previous run that crashed before FlushDeferredAsync was called
+ lock (_deferredLock)
+ {
+ if (_deferredOutputs.Count > 0)
+ {
+ _logger.LogWarning(
+ "Found {Count} stale deferred outputs from a previous pipeline run. " +
+ "This indicates FlushDeferredAsync was not called. Clearing to prevent memory leak.",
+ _deferredOutputs.Count);
+ _deferredOutputs.Clear();
+ }
+ }
+ }
+
_isProgressActive = isActive;
}
@@ -70,7 +91,8 @@ public async Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type module
if (_isProgressActive)
{
// Progress is active - defer output until pipeline end
- lock (_queueLock)
+ // Uses separate lock to avoid contention with immediate flush operations
+ lock (_deferredLock)
{
_deferredOutputs.Add(new DeferredModuleOutput(
buffer,
@@ -91,13 +113,14 @@ public async Task FlushDeferredAsync(CancellationToken cancellationToken = defau
{
List toFlush;
- lock (_queueLock)
+ lock (_deferredLock)
{
if (_deferredOutputs.Count == 0)
{
return;
}
+ // Order by completion time to maintain consistent output ordering
toFlush = _deferredOutputs
.OrderBy(x => x.CompletedAt)
.ToList();
@@ -250,6 +273,19 @@ public PendingFlush(IModuleOutputBuffer buffer)
///
/// Safety net: flushes any remaining deferred output on disposal.
///
+ ///
+ ///
+ /// This is a defensive measure - the primary flush happens in
+ ///
+ /// which explicitly calls after progress ends.
+ ///
+ ///
+ /// This safety net handles edge cases where the normal flush sequence is interrupted
+ /// (e.g., due to exceptions or abnormal termination). Note that this requires the
+ /// DI container to be disposed via DisposeAsync() - synchronous disposal
+ /// may not trigger this method.
+ ///
+ ///
public async ValueTask DisposeAsync()
{
try
From c9be15af607b63d9666a985107629c4e211948be Mon Sep 17 00:00:00 2001
From: Tom Longhurst <30480171+thomhurst@users.noreply.github.com>
Date: Sun, 18 Jan 2026 22:50:19 +0000
Subject: [PATCH 10/10] fix(console): address third round of PR review feedback
- Remove IAsyncDisposable implementation (dead code - DI container
doesn't automatically dispose async disposables for singletons)
- Clear deferred outputs when SetProgressActive(false) is called to
handle cancellation scenarios where FlushDeferredAsync wasn't called
- This ensures memory cleanup happens whenever progress ends, regardless
of whether it was cancelled or completed normally
---
.../Console/OutputCoordinator.cs | 52 +++++++------------
1 file changed, 19 insertions(+), 33 deletions(-)
diff --git a/src/ModularPipelines/Console/OutputCoordinator.cs b/src/ModularPipelines/Console/OutputCoordinator.cs
index 1f25c9bc6c..2700ecc49d 100644
--- a/src/ModularPipelines/Console/OutputCoordinator.cs
+++ b/src/ModularPipelines/Console/OutputCoordinator.cs
@@ -8,7 +8,7 @@ namespace ModularPipelines.Console;
/// Coordinates immediate flushing of module output with FIFO ordering and synchronization.
///
[ExcludeFromCodeCoverage]
-internal sealed class OutputCoordinator : IOutputCoordinator, IAsyncDisposable
+internal sealed class OutputCoordinator : IOutputCoordinator
{
private readonly IBuildSystemFormatterProvider _formatterProvider;
private readonly ILoggerFactory _loggerFactory;
@@ -60,12 +60,12 @@ public void SetProgressController(IProgressController controller)
///
public void SetProgressActive(bool isActive)
{
- if (isActive)
+ lock (_deferredLock)
{
- // Starting a new progress session - check for stale deferred outputs
- // from a previous run that crashed before FlushDeferredAsync was called
- lock (_deferredLock)
+ if (isActive)
{
+ // Starting a new progress session - check for stale deferred outputs
+ // from a previous run that crashed before FlushDeferredAsync was called
if (_deferredOutputs.Count > 0)
{
_logger.LogWarning(
@@ -75,6 +75,20 @@ public void SetProgressActive(bool isActive)
_deferredOutputs.Clear();
}
}
+ else
+ {
+ // Progress ending - clean up any remaining deferred outputs that weren't flushed
+ // This handles cancellation scenarios where FlushDeferredAsync wasn't called
+ if (_deferredOutputs.Count > 0)
+ {
+ _logger.LogWarning(
+ "Progress ended with {Count} unflushed deferred outputs. " +
+ "This indicates FlushDeferredAsync was not called (possibly due to cancellation). " +
+ "Clearing to prevent memory leak.",
+ _deferredOutputs.Count);
+ _deferredOutputs.Clear();
+ }
+ }
}
_isProgressActive = isActive;
@@ -269,32 +283,4 @@ public PendingFlush(IModuleOutputBuffer buffer)
Buffer = buffer;
}
}
-
- ///
- /// Safety net: flushes any remaining deferred output on disposal.
- ///
- ///
- ///
- /// This is a defensive measure - the primary flush happens in
- ///
- /// which explicitly calls after progress ends.
- ///
- ///
- /// This safety net handles edge cases where the normal flush sequence is interrupted
- /// (e.g., due to exceptions or abnormal termination). Note that this requires the
- /// DI container to be disposed via DisposeAsync() - synchronous disposal
- /// may not trigger this method.
- ///
- ///
- public async ValueTask DisposeAsync()
- {
- try
- {
- await FlushDeferredAsync(CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- _logger.LogWarning(ex, "Failed to flush deferred output during disposal");
- }
- }
}