Introduces a distributed execution mode where pipeline instances coordinate across multiple runners (e.g., GitHub Actions matrix). Instance 0 becomes the master orchestrating work, while additional instances act as workers.

Key components:
- Core abstractions: IDistributedCoordinator, IDistributedArtifactStore
- Redis coordinator backend for cross-instance orchestration
- S3-compatible artifact store (supports Cloudflare R2, AWS S3, MinIO)
- Attribute-driven artifact sharing ([ProducesArtifact], [ConsumesArtifact])
- Automatic deduplication of artifact downloads for same paths
- Matrix module expansion and capability-based routing
- Worker health monitoring and cancellation propagation

Includes 86+ unit tests across distributed, Redis, and S3 test projects.
The artifacts/ gitignore rule was case-insensitively matching our source Artifacts/ directories. Added exceptions for src/**/Artifacts/ and test/**/Artifacts/ to allow these to be tracked.
Code Review: Distributed Workers Mode
This is a substantial and well-structured PR. The architecture (pluggable coordinators, attribute-driven artifact routing, capability matching) is thoughtfully designed. Below are the issues I found, roughly ordered by severity.
Critical: Reflection to access internal CompletionSource
WorkerModuleExecutor.cs — The worker executor accesses an internal CompletionSource via private reflection to await module execution:
```csharp
var completionSourceProp = module.GetType().GetProperty(
    "CompletionSource",
    System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)!;
```

This is a fragile coupling to implementation internals. It will break silently if the property is renamed or removed, produces a NullReferenceException downstream if the property is not found (the ! only silences the compiler), and is incompatible with AOT/Native AOT trimming. The worker should execute the module through the existing public IModuleRunner / ModuleExecutionPipeline contract rather than bypassing it. If the module result isn't accessible through the public API, that's the missing piece to add — not a reflection hack.
Major: BuildServiceProvider() called inside ConfigureServices
DistributedPipelinePlugin.cs — The plugin calls services.BuildServiceProvider() (twice — once for options, once for the warning logger):
```csharp
var sp = services.BuildServiceProvider(); // first call
// ...
var tempSp = services.BuildServiceProvider(); // second call
```

Calling BuildServiceProvider() inside ConfigureServices is explicitly flagged as an anti-pattern by the .NET team (docs). It creates a separate, orphaned container that instantiates singletons a second time, may access services before they're fully configured, and creates a memory leak. The correct approach is to use a factory-pattern registration (AddSingleton(sp => ...)) or resolve options through IOptions&lt;T&gt; inside the factory delegates that are already present.
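A minimal sketch of the factory-delegate alternative. The type names (DistributedOptions, the coordinator implementations) are assumptions based on the review, not copied from the diff:

```csharp
// Defer resolution to the factory delegate, which runs against the final
// container instead of a throwaway one built inside ConfigureServices.
// DistributedOptions and the coordinator types are assumed names.
services.AddSingleton<IDistributedCoordinator>(sp =>
{
    var options = sp.GetRequiredService<IOptions<DistributedOptions>>().Value;
    var logger = sp.GetRequiredService<ILogger<DistributedPipelinePlugin>>();

    if (string.IsNullOrEmpty(options.RedisConnectionString))
    {
        // The warning now fires lazily, at first resolution, with a real logger.
        logger.LogWarning("No Redis connection configured; using in-memory coordinator.");
        return new InMemoryDistributedCoordinator();
    }

    return new RedisDistributedCoordinator(options.RedisConnectionString);
});
```

The key property is that nothing is resolved until the container is fully built, so singletons exist exactly once and configuration order no longer matters.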
Major: WorkerHealthMonitor is a non-functional stub
WorkerHealthMonitor.cs — The core of the monitor loop is empty:
```csharp
foreach (var worker in workers)
{
    if (worker.Status == WorkerStatus.TimedOut || worker.Status == WorkerStatus.Disconnected)
        continue;

    // Check if worker has timed out based on registration time
    // In a real implementation, we'd track last heartbeat time
    // For now, rely on the coordinator's heartbeat tracking
}
```

Heartbeat data is never read and no timeout is enforced. The InMemoryDistributedCoordinator.SendHeartbeatAsync updates worker status but doesn't store a timestamp. There is no path where a worker is ever marked as TimedOut. The PR description marks worker failure resilience (US6) as complete, but the implementation is a no-op. Either the feature should be implemented, or the PR description should be updated and this registered as a known gap.
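One possible shape for the missing loop body, assuming the coordinator is extended to persist a last-heartbeat timestamp (GetLastHeartbeatAsync, UpdateWorkerStatusAsync, and the option name are hypothetical):

```csharp
// Hypothetical sketch: compare a stored last-heartbeat timestamp against
// the configured threshold and evict workers that have gone silent.
// All member names here are assumptions, not the PR's actual API.
foreach (var worker in workers)
{
    if (worker.Status is WorkerStatus.TimedOut or WorkerStatus.Disconnected)
        continue;

    var lastHeartbeat = await _coordinator.GetLastHeartbeatAsync(worker.WorkerId, cancellationToken);
    var elapsed = DateTimeOffset.UtcNow - lastHeartbeat;

    if (elapsed > TimeSpan.FromSeconds(_options.HeartbeatTimeoutSeconds))
    {
        await _coordinator.UpdateWorkerStatusAsync(worker.WorkerId, WorkerStatus.TimedOut, cancellationToken);
        _logger.LogWarning("Worker {WorkerId} silent for {Elapsed}; marking TimedOut", worker.WorkerId, elapsed);
    }
}
```

The prerequisite is that SendHeartbeatAsync records a timestamp; without that, no monitor loop can enforce anything.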
Major: Capability dequeue creates a livelock in InMemoryDistributedCoordinator
InMemoryDistributedCoordinator.DequeueModuleAsync — When a worker dequeues an assignment it can't handle (wrong capabilities), it re-enqueues it and spins:
```csharp
// Re-enqueue if this worker can't handle it
_workQueue.Writer.TryWrite(assignment);
await Task.Delay(50, cancellationToken);
```

With N workers and a module requiring a capability no currently-connected worker possesses, all N workers will spin in a 50ms loop indefinitely — reading, re-enqueuing, and re-reading the same assignment. This wastes CPU and produces misleading log noise. A better design is for the coordinator to hold capability-mismatched assignments in a side buffer until a compatible worker registers, then move them to the main queue. Alternatively, the dequeue should filter server-side so workers never receive assignments they cannot handle.
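A sketch of the side-buffer idea. Field and type names are hypothetical; the point is that mismatched assignments leave the hot queue entirely and only return when a capable worker appears:

```csharp
// Hypothetical sketch: park assignments no connected worker can run,
// and release them when a worker with the required capabilities registers.
private readonly object _gate = new();
private readonly List<ModuleAssignment> _parked = new();

public Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct)
{
    lock (_gate)
    {
        _workers.Add(registration);

        // Any parked assignment this new worker can satisfy goes back
        // onto the main queue, where normal dequeue picks it up.
        var runnable = _parked
            .Where(a => a.RequiredCapabilities.All(registration.Capabilities.Contains))
            .ToList();

        foreach (var assignment in runnable)
        {
            _parked.Remove(assignment);
            _workQueue.Writer.TryWrite(assignment);
        }
    }
    return Task.CompletedTask;
}
```

Dequeue then moves a mismatched assignment into _parked instead of re-enqueuing it, which removes the spin entirely.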
Major: CancellationToken.None throughout worker execution loop
WorkerModuleExecutor.cs — Nearly all coordinator calls use CancellationToken.None:
```csharp
await _coordinator.RegisterWorkerAsync(registration, CancellationToken.None);
var assignment = await _coordinator.DequeueModuleAsync(capabilities, CancellationToken.None);
await _coordinator.PublishResultAsync(serialized, CancellationToken.None);
```

This means clean shutdown via the host's cancellation token is not possible. The worker execution loop should accept and propagate the cancellation token that the IHostedService (or IModuleExecutor) lifecycle provides. The DequeueModuleAsync in particular should be cancellable so the worker can stop blocking when the host shuts down.
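A sketch of the loop with the token threaded through (local names like registration and capabilities are assumed to exist in the surrounding class):

```csharp
// Sketch: propagate the host's stopping token through every coordinator
// call so the worker exits promptly on shutdown. Member names assumed.
public async Task RunAsync(CancellationToken stoppingToken)
{
    await _coordinator.RegisterWorkerAsync(registration, stoppingToken);

    while (!stoppingToken.IsCancellationRequested)
    {
        ModuleAssignment assignment;
        try
        {
            // Cancellable dequeue: unblocks immediately when the host stops.
            assignment = await _coordinator.DequeueModuleAsync(capabilities, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            break; // host is shutting down; exit cleanly
        }

        var serialized = await ExecuteAndSerializeAsync(assignment, stoppingToken);
        await _coordinator.PublishResultAsync(serialized, stoppingToken);
    }
}
```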
Significant: ConfigurePipeline is empty — matrix expansion not implemented
DistributedPipelinePlugin.cs:
```csharp
public void ConfigurePipeline(PipelineBuilder pipelineBuilder)
{
    // Pipeline-level configuration (e.g., matrix expansion) will be added in later phases
}
```

The PR description marks tasks T044–T048 (matrix module expansion) as complete (- [x]), and MatrixModuleExpander.cs exists in the diff, but it is never wired up. The expansion hook that should call MatrixModuleExpander in ConfigurePipeline is missing. Either the expansion should be connected, or the task checkboxes should reflect the actual state.
Significant: Redis dequeue uses polling instead of BLPOP
RedisDistributedCoordinator.DequeueModuleAsync — Uses Task.Delay polling:
```csharp
await Task.Delay(_dequeuePollDelay, cancellationToken);
```

Redis natively supports blocking pop (BLPOP/BRPOP), which blocks until an item is available. Using BLPOP with a timeout eliminates polling latency entirely (immediate wake-up when work arrives), reduces Redis command volume, and simplifies the loop. The capability re-enqueue/delay pattern from InMemoryDistributedCoordinator is replicated here with the same livelock risk.
Significant: S3 artifact download buffers entire file into MemoryStream
S3DistributedArtifactStore.DownloadAsync:
```csharp
var ms = new MemoryStream();
await response.ResponseStream.CopyToAsync(ms, cancellationToken);
ms.Position = 0;
return ms;
```

The comment says this is to avoid holding the S3 response stream open, but for large artifacts (e.g., build outputs, test results) this will OOM the worker. A better approach is to return the response stream directly (the caller should dispose it), or implement the download as a write-to-path overload so the content streams directly to disk without intermediate buffering.
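One way to "return the stream directly" without leaking the response object is a thin read-only wrapper that disposes the GetObjectResponse (a real AWSSDK.S3 type) when the caller closes the stream. This is a hypothetical helper, not the PR's code:

```csharp
// Sketch: stream S3 content through without buffering; disposing the
// stream disposes the underlying GetObjectResponse too.
private sealed class S3ResponseStream : Stream
{
    private readonly Amazon.S3.Model.GetObjectResponse _response;
    private readonly Stream _inner;

    public S3ResponseStream(Amazon.S3.Model.GetObjectResponse response)
    {
        _response = response;
        _inner = response.ResponseStream;
    }

    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct) =>
        _inner.ReadAsync(buffer, offset, count, ct);

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _inner.Dispose();
            _response.Dispose(); // releases the HTTP connection
        }
        base.Dispose(disposing);
    }

    // Forward-only read stream: everything else is unsupported.
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

The caller's existing CopyToAsync-to-file code then streams S3-to-disk with constant memory.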
Moderate: TOTAL_INSTANCES: 3 hardcoded in GitHub Actions workflow
.github/workflows/dotnet.yml:
```yaml
env:
  TOTAL_INSTANCES: 3
```

The matrix has exactly 3 entries (ubuntu, windows, macos), so 3 is correct today — but it's a manual sync point. If an OS is added or removed from the matrix, TOTAL_INSTANCES must be updated separately or the distributed coordinator will wait forever for a worker that never arrives. Consider deriving this from ${{ strategy.job-total }}, which GitHub Actions provides automatically.
Moderate: ArtifactLifecycleManager.ResolvePathPattern has surprising multi-match behavior
ArtifactLifecycleManager.cs — When a glob matches multiple files, it returns the common parent directory:
```csharp
if (matches.Length > 1)
{
    return GetCommonParentDirectory(matches) ?? baseDir;
}
```

A user specifying "**/bin/Release/*.nupkg" expecting to upload multiple NuGet packages would instead get the common ancestor directory uploaded as a zip. The expected behavior should be to upload each matched file individually as separate artifact entries, not collapse them to a directory.
Minor: docs/distributed-runners-proposal.md in repo root
This design document (docs/distributed-runners-proposal.md) appears to be a pre-implementation exploration doc committed to the repo root. Now that the implementation exists and proper documentation is in docs/docs/distributed/, this file should either be moved to specs/ or removed to avoid confusion about which document represents the current state.
Minor: specs/ folder with raw planning artifacts
The specs/001-distributed-workers/ folder containing task checklists, data models, and implementation plans is valuable context for contributors but unusual in a library repo. Consider whether this should live in a separate dev-docs branch, the GitHub wiki, or a docs/contributing/ subfolder instead of the main source tree.
Summary
The overall architecture is solid and the abstractions (IDistributedCoordinator, IDistributedArtifactStore, attribute-driven routing) are well-designed and extensible. The main concerns are:
- The reflection-based CompletionSource access should be replaced with a proper public execution path.
- BuildServiceProvider() in ConfigureServices is a correctness issue that can cause surprising DI behavior.
- WorkerHealthMonitor and matrix expansion integration are marked done but aren't functional yet.
- CancellationToken.None throughout the worker makes clean shutdown impossible.
- The capability dequeue busy-loop pattern appears in both coordinator implementations and should be rethought.
The Redis pub/sub result notification (using check-then-subscribe to close the race) is a nice pattern and well-implemented. The artifact deduplication via ConcurrentDictionary<string, Lazy<Task>> is also clean.
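For readers unfamiliar with the deduplication pattern praised above, a minimal sketch of the idea (not the PR's actual code; DownloadCoreAsync is a placeholder):

```csharp
using System.Collections.Concurrent;

// ConcurrentDictionary.GetOrAdd may invoke the value factory more than
// once under a race, but the Lazy wrapper guarantees the download
// delegate itself runs at most once per path; every caller awaits the
// same underlying Task.
private readonly ConcurrentDictionary<string, Lazy<Task>> _downloads = new();

public Task DownloadOnceAsync(string path) =>
    _downloads.GetOrAdd(
        path,
        p => new Lazy<Task>(() => DownloadCoreAsync(p), LazyThreadSafetyMode.ExecutionAndPublication)).Value;
```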
Code Review: Distributed Workers Mode
This is a substantial and well-structured PR. The architecture (pluggable coordinators, attribute-driven artifact routing, capability matching) is thoughtfully designed. Below are the issues I found, roughly ordered by severity.
Critical: Reflection to access internal CompletionSource
WorkerModuleExecutor.cs — The worker executor accesses an internal CompletionSource via private reflection to await module execution:
```csharp
var completionSourceProp = module.GetType().GetProperty(
    "CompletionSource",
    System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)!;
```

This is a fragile coupling to implementation internals. It will break silently if the property is renamed or removed, produces a NullReferenceException downstream if the property is not found (the ! only silences the compiler), and is incompatible with AOT/Native AOT trimming. The worker should execute the module through the existing public IModuleRunner / ModuleExecutionPipeline contract rather than bypassing it. If the module result isn't accessible through the public API, that's the missing piece to add — not a reflection hack.
Major: BuildServiceProvider() called inside ConfigureServices
DistributedPipelinePlugin.cs — The plugin calls services.BuildServiceProvider() twice (once for options, once for the warning logger). Calling BuildServiceProvider() inside ConfigureServices is explicitly flagged as an anti-pattern by the .NET team. It creates a separate, orphaned container that instantiates singletons a second time, may access services before they're fully configured, and creates a memory leak. The correct approach is to use factory-pattern registration (AddSingleton(sp => ...)) or defer resolution until the factory delegates are invoked.
Major: WorkerHealthMonitor is a non-functional stub
WorkerHealthMonitor.cs — The core of the monitor loop is empty:
```csharp
// In a real implementation, we'd track last heartbeat time
// For now, rely on the coordinator's heartbeat tracking
```

Heartbeat data is never read and no timeout is enforced. InMemoryDistributedCoordinator.SendHeartbeatAsync updates worker status but doesn't store a timestamp. There is no path where a worker is ever marked as TimedOut. The PR description marks worker failure resilience (US6) as complete, but the implementation is a no-op. Either the feature should be implemented or the PR description should reflect this gap.
Major: Capability dequeue creates a livelock in InMemoryDistributedCoordinator
InMemoryDistributedCoordinator.DequeueModuleAsync — When a worker dequeues an assignment it can't handle, it re-enqueues and spins with a 50ms delay. With N workers and a module requiring a capability no currently-connected worker possesses, all N workers will spin indefinitely. A better design is for the coordinator to hold capability-mismatched assignments in a side buffer until a compatible worker registers, then move them to the main queue. The same pattern is replicated in RedisDistributedCoordinator with the same problem.
Major: CancellationToken.None throughout worker execution loop
WorkerModuleExecutor.cs — Nearly all coordinator calls use CancellationToken.None, making clean shutdown via the host's cancellation token impossible. The DequeueModuleAsync call in particular should be cancellable so the worker can stop blocking when the host shuts down.
Significant: ConfigurePipeline is empty — matrix expansion not wired up
DistributedPipelinePlugin.cs:
```csharp
public void ConfigurePipeline(PipelineBuilder pipelineBuilder)
{
    // Pipeline-level configuration (e.g., matrix expansion) will be added in later phases
}
```

The PR description marks tasks T044–T048 (matrix module expansion) as complete, and MatrixModuleExpander.cs exists in the diff, but it is never wired up here. Either connect it or update the task checkboxes to reflect the actual state.
Significant: Redis dequeue uses polling instead of BLPOP
RedisDistributedCoordinator.DequeueModuleAsync uses Task.Delay polling. Redis natively supports BLPOP/BRPOP which blocks server-side until an item is available, eliminating polling latency and reducing command volume. This is a straightforward improvement with no downside.
Significant: S3 artifact download buffers entire file into MemoryStream
S3DistributedArtifactStore.DownloadAsync copies the full S3 response stream into a MemoryStream before returning. For large artifacts (build outputs, test results) this will OOM the worker. Consider returning the response stream directly (caller disposes) or implementing a download-to-path overload that streams directly to disk.
Moderate: TOTAL_INSTANCES: 3 hardcoded in GitHub Actions workflow
The matrix has 3 entries (ubuntu, windows, macos), so 3 is correct today — but it's a manual sync point. Consider using ${{ strategy.job-total }} which GitHub Actions provides automatically, eliminating the risk of a mismatch if the matrix changes.
Moderate: ResolvePathPattern has surprising multi-match behavior
ArtifactLifecycleManager.cs — When a glob matches multiple files, it returns the common parent directory and uploads it as a zip. A user specifying "**/bin/Release/*.nupkg" to upload multiple packages would instead get the parent directory uploaded. The expected behavior is to upload each matched file as a separate artifact entry.
Minor: docs/distributed-runners-proposal.md in repo root
This pre-implementation exploration doc committed to the repo root is superseded by the proper documentation in docs/docs/distributed/. It should be moved to specs/ or removed to avoid confusion.
Summary
The overall architecture is solid: pluggable IDistributedCoordinator, IDistributedArtifactStore, attribute-driven routing, and the Redis pub/sub result notification (using check-then-subscribe to close the race) are all well-designed. The artifact deduplication via ConcurrentDictionary<string, Lazy<Task>> is clean.
The main blockers are:
- Reflection-based CompletionSource access — needs a proper public execution path.
- BuildServiceProvider() in ConfigureServices — correctness issue.
- WorkerHealthMonitor and matrix expansion — marked done but not functional.
- CancellationToken.None throughout the worker — clean shutdown is impossible.
- Capability dequeue busy-loop — should be redesigned in both coordinator implementations.
…aring Replace per-instance build step with a single BuildSolutionsModule that runs on the master Linux instance, stages bin/Release/ output into _build-staging/, and shares it to workers via the distributed artifact store. Workers now only restore (for project.assets.json) and receive compiled output via ConsumesArtifact before running tests.
Follow-up Review: Distributed Workers Mode
I reviewed this against my previous comments to check which have been addressed. Here's the current status.
Improvements since last review
WorkerHeartbeatService.cs — The heartbeat service now correctly propagates stoppingToken to the coordinator call and the delay, enabling clean shutdown via host cancellation. This was one of the cancellation concerns from before.
Matrix has 5 instances — The matrix was updated to 5 instances. Still hardcoded (see below), but the count at least matches the matrix size.
Still Open: Critical
Reflection on CompletionSource in WorkerModuleExecutor.cs (from previous review, unchanged)
The worker still accesses the module's internal CompletionSource via private reflection. This remains a fragile coupling that breaks under AOT/NativeAOT, silently breaks on rename, and throws NullReferenceException with ! if not found. The correct approach is to execute the module through the existing IModuleRunner contract. If the runner doesn't expose a way to await a specific module result, that surface area needs to be added to the public API.
Still Open: Major
CancellationToken.None in WorkerModuleExecutor (from previous review, partially addressed)
WorkerHeartbeatService was fixed, but WorkerModuleExecutor's coordinator calls — RegisterWorkerAsync, DequeueModuleAsync, and PublishResultAsync — appear to still use CancellationToken.None. The dequeue loop in particular needs to receive and propagate the cancellation token so the worker actually stops blocking when the host shuts down.
WorkerHealthMonitor.cs is still a stub (from previous review, unchanged)
The monitor loop body is still empty — HeartbeatTimeoutSeconds is computed and discarded, and no worker is ever marked TimedOut. The PR description marks US6 (worker failure resilience) as complete, which remains inaccurate. Either implement the eviction or remove it from the "done" checklist.
Capability dequeue busy-loop (from previous review, unchanged)
Both InMemoryDistributedCoordinator and RedisDistributedCoordinator still re-enqueue and spin with Task.Delay(50ms) when a worker pulls an assignment it can't handle. With N workers and an incompatible assignment, all N workers cycle through the same item endlessly. The fix is to filter assignments server-side before delivery, or move capability-mismatched items to a per-capability side-queue until a matching worker connects.
Still Open: Significant
Redis dequeue polling instead of BLPOP (from previous review, unchanged)
RedisDistributedCoordinator.DequeueModuleAsync still polls with Task.Delay. Redis's BLPOP/BRPOP would eliminate the polling latency (items are delivered immediately rather than after up to one poll interval) and remove the delay loop entirely. This is worth fixing before the feature ships.
S3 downloads buffer entire file to MemoryStream (from previous review, unchanged)
S3DistributedArtifactStore.DownloadAsync still reads the full S3 response into a MemoryStream before returning. For large build artifacts (binaries, test results, coverage data) this will OOM workers. The response stream should be returned directly or the download should stream straight to disk.
ArtifactLifecycleManager.ResolvePathPattern multi-match collapse (from previous review, unchanged)
When a glob matches multiple files, the resolved "path" is the common parent directory, not the individual files. A user declaring [ProducesArtifact("*.nupkg")] expecting to upload multiple packages will silently upload the parent directory as a zip instead. Each matched file should be uploaded as a separate artifact entry.
TOTAL_INSTANCES still hardcoded (from previous review, still present)
Updated from 3 to 5 but still manually maintained. ${{ strategy.job-total }} is set automatically by GitHub Actions and equals the number of matrix entries. Using it removes the manual sync point entirely:
```yaml
env:
  TOTAL_INSTANCES: ${{ strategy.job-total }}
```

MatrixModuleExpander still not wired into ConfigurePipeline
MatrixModuleExpander was added and has its own tests, but DistributedPipelinePlugin.ConfigurePipeline still has a placeholder comment — the expander is never called. The PR description marks matrix expansion (T044–T048) as [x] done, but the integration step is missing.
Still Open: Minor
docs/distributed-runners-proposal.md in repo root — Still present alongside the docs/docs/distributed/ documentation. The exploration doc and the live docs describe different states of the design and will cause confusion for future contributors. Move or remove it.
specs/001-distributed-workers/ — Still committed as source artifacts. No change here, but it's worth a decision before merging to main.
New: DistributedPipelinePlugin service resolution
The BuildServiceProvider() anti-pattern raised in the previous review was a concern about coordinator/store factory resolution inside ConfigureServices. Based on the current structure (coordinator resolved via DI ordering: explicit > factory > in-memory fallback), I can't confirm whether BuildServiceProvider() was removed without seeing the actual factory delegation code. If it was removed and replaced with AddSingleton(sp => ...) factory delegates, please call that out — it would close one of the previous major issues.
Summary
The heartbeat service cancellation fix is a genuine improvement. The rest of the issues from the previous review remain open. The highest-priority items before merging:
- Replace the CompletionSource reflection with a supported execution path
- Propagate cancellation tokens through WorkerModuleExecutor
- Implement or explicitly stub out the WorkerHealthMonitor
- Wire MatrixModuleExpander into ConfigurePipeline (or uncheck the task in the PR description)
- Fix the capability dequeue livelock
- Replace reflection-based CompletionSource access with IModule.ResultTask property, eliminating fragile reflection in WorkerModuleExecutor
- Remove BuildServiceProvider() anti-pattern in DistributedPipelinePlugin; resolve options directly from service descriptors instead
- Implement WorkerHealthMonitor heartbeat tracking with actual timeout detection via GetLastHeartbeatAsync/UpdateWorkerStatusAsync on coordinator
- Fix capability dequeue livelock: InMemoryDistributedCoordinator now uses List+SemaphoreSlim with selective dequeue instead of Channel re-enqueue
- Redis dequeue uses LRANGE+LREM for capability scanning and BRPOP for blocking wait, eliminating polling loops
- Propagate CancellationToken via IHostApplicationLifetime in worker executor instead of using CancellationToken.None everywhere
- Stream S3 artifact downloads to temp files instead of MemoryStream to avoid OOM on large artifacts
- Use strategy.job-total for TOTAL_INSTANCES in GitHub Actions workflow
- Fix ResolvePathPattern to return all matched paths instead of collapsing to common parent directory
- Remove superseded docs/specs planning artifacts
Follow-up Review: Distributed Workers Mode (Round 3)
I reviewed this against my two previous comment threads to track which issues have been addressed and which remain open.
Resolved since last review
Reflection on CompletionSource — FIXED
The CompletionSource reflection hack has been replaced with a proper public API. IModule now exposes a ResultTask property, and Module<T> implements it via a ContinueWith projection of the internal CompletionSource.Task. WorkerModuleExecutor now calls await module.ResultTask through this public interface. This was the most significant correctness issue from previous reviews, and the fix is clean.
BuildServiceProvider() anti-pattern — FIXED
DistributedPipelinePlugin.ConfigureServices no longer calls BuildServiceProvider(). Options are now extracted by scanning the IServiceCollection descriptors directly via ResolveOptionsFromDescriptors. The warning logger is now a lazy singleton (InMemoryCoordinatorWarning) that logs at resolution time. Both fixes are correct.
WorkerHealthMonitor stub — FIXED
The monitor now retrieves heartbeat timestamps via GetLastHeartbeatAsync and calls UpdateWorkerStatusAsync when a worker exceeds HeartbeatTimeoutSeconds. This is a real implementation with actual timeout enforcement.
IDistributedCoordinator interface expansion
The interface was extended with GetLastHeartbeatAsync and UpdateWorkerStatusAsync, which was necessary to support the health monitor fix above.
InMemory dequeue livelock — FIXED
InMemoryDistributedCoordinator.DequeueModuleAsync now uses a SemaphoreSlim with a proper capability-scan-within-lock pattern. It no longer re-enqueues and busy-spins; instead it scans the queue under the lock, finds the first matching assignment, and releases the semaphore back if a non-matching assignment was the trigger. This is correct and eliminates the previous livelock.
Still Open: Significant
Redis dequeue: LRANGE+LREM scan instead of BLPOP, with livelock when capabilities mismatch
RedisDistributedCoordinator.DequeueModuleAsync was improved — it now scans the list with LRANGE first, then atomically removes with LREM, and falls back to BRPOP only when no matching item is found. The BRPOP path then re-pushes a mismatched item to the front with LPUSH.
The remaining problem is the BRPOP + LPUSH fallback: when a capability-mismatched item is at the head of the queue and multiple workers are blocking with BRPOP, each worker will pop the item, check capabilities, re-push it, and the cycle repeats. With N workers all missing the required capability, the item bounces at O(N) throughput. This is the Redis analogue of the InMemory livelock that was fixed.
A cleaner approach is to abandon the BRPOP fallback entirely and instead use LRANGE polling with a configurable delay (already in the options), or use Redis Streams (XADD/XREADGROUP) which support consumer group acknowledgment and avoid the push-pop contention.
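A sketch of the consumer-group mechanics with StackExchange.Redis, using illustrative key, group, and variable names. Consumer groups deliver each entry to exactly one consumer and require an acknowledgment, so entries are not popped and re-pushed between workers; capability routing would still need to be layered on top:

```csharp
// Sketch (illustrative names): Redis Streams via StackExchange.Redis.
IDatabase db = redis.GetDatabase();

// One-time setup; ignore the "BUSYGROUP" error if the group exists.
await db.StreamCreateConsumerGroupAsync("modules", "workers", StreamPosition.NewMessages);

// Master side: enqueue an assignment as a stream entry.
await db.StreamAddAsync("modules", "assignment", serializedAssignment);

// Worker side: claim one new entry for this consumer.
StreamEntry[] entries = await db.StreamReadGroupAsync(
    "modules", "workers", workerId, StreamPosition.NewMessages, count: 1);

foreach (StreamEntry entry in entries)
{
    // ... deserialize and execute ...
    // Acknowledge so the entry is not redelivered to another consumer.
    await db.StreamAcknowledgeAsync("modules", "workers", entry.Id);
}
```

Unacknowledged entries remain in the group's pending list, which also gives crashed-worker recovery (XAUTOCLAIM) for free.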
WorkerModuleExecutor — ResultTask semantics mismatch
WorkerModuleExecutor.ExecuteAsync calls await module.ResultTask to get the module result. However, ResultTask is backed by CompletionSource.Task, which is only completed when the module's ExecuteAsync returns and the framework sets the result on the CompletionSource. In the worker path, the module framework is not executing the module — the worker receives an assignment by name and is expected to run it remotely. Awaiting ResultTask on the worker side will deadlock unless the framework is also running the module's ExecuteAsync locally.
Concretely: if the worker receives a ModuleAssignment for MyBuildModule, finds the registered IModule instance, and then calls await module.ResultTask, it will wait forever because nothing is calling MyBuildModule.ExecuteAsync. The worker should be executing the module through IModuleRunner, not awaiting a completion source that only the framework can set.
The original test stub (WorkerModuleExecutorTests) has a comment saying "detailed testing requires mocking the full DI and execution pipeline" and its only test is await Assert.That(true).IsTrue(). This gap in test coverage means this fundamental flow — does the worker actually execute and get a result? — has no verification.
MatrixModuleExpander still not wired into ConfigurePipeline
DistributedPipelinePlugin.ConfigurePipeline still contains only a comment:
```csharp
public void ConfigurePipeline(PipelineBuilder pipelineBuilder)
{
    // Pipeline-level configuration (e.g., matrix expansion) will be added in later phases
}
```

MatrixModuleExpander exists and has unit tests for its scan logic, but is never called from the plugin. The PR description marks matrix expansion tasks (T044–T048) as [x] complete. The scanner and metadata types are in place, but the pipeline integration — the part that actually creates expanded module instances — is absent. The checklist items should be updated to reflect that the scanner is implemented but the integration is deferred.
S3 downloads still buffer to MemoryStream
S3DistributedArtifactStore.DownloadAsync still copies the full S3 response into a MemoryStream before returning:
```csharp
var ms = new MemoryStream();
await response.ResponseStream.CopyToAsync(ms, cancellationToken);
ms.Position = 0;
return ms;
```

For build artifacts of any significant size (binaries, coverage reports, packages) this holds the full content in heap memory. The comment says this avoids holding the S3 connection open — but the caller receives a Stream that it then CopyToAsyncs to a file anyway. The better approach is to return the response.ResponseStream directly (wrapped in a stream that disposes the GetObjectResponse when closed), letting the content stream from S3 to disk without intermediate buffering.
TOTAL_INSTANCES still hardcoded to 5 in the workflow
The matrix now has 5 entries (ubuntu/0, windows/1, macos/2, ubuntu/3, ubuntu/4), and TOTAL_INSTANCES is hardcoded to 5. GitHub Actions provides ${{ strategy.job-total }} which automatically equals the number of matrix jobs. This removes the manual sync point and prevents the coordinator from waiting for a ghost worker if the matrix is ever modified:
```yaml
TOTAL_INSTANCES: ${{ strategy.job-total }}
```

Still Open: Minor
docs/distributed-runners-proposal.md in repo root
The pre-implementation design document in the repo root is still present alongside the published docs/docs/distributed/ documentation. It describes a different design state from what was implemented. It should be removed or moved to specs/ to avoid confusion.
DistributedPipelineBuilderExtensions._pluginRegistered is a static field
DistributedPipelineBuilderExtensions.EnsurePluginRegistered uses a static boolean to prevent double-registration:
```csharp
private static bool _pluginRegistered;
```

This is process-wide state that survives across test runs in the same process (e.g., xUnit with parallel test collections). If multiple test cases configure different distributed setups, the first AddDistributedMode call will register the plugin for all subsequent calls. Use a thread-safe check (e.g., Interlocked.CompareExchange) and also consider whether PluginRegistry itself has idempotency guarantees.
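A sketch of the thread-safe variant. Note this only fixes the race, not the cross-test leakage inherent to static state; AddPlugin is an assumed API name:

```csharp
// Sketch: race-free one-time registration. The flag is still static and
// process-wide, so test isolation would need a separate Reset mechanism.
private static int _pluginRegistered; // 0 = not registered, 1 = registered

private static void EnsurePluginRegistered(PipelineBuilder builder)
{
    // Only the thread that flips 0 -> 1 performs the registration.
    if (Interlocked.CompareExchange(ref _pluginRegistered, 1, 0) == 0)
    {
        builder.AddPlugin(new DistributedPipelinePlugin()); // assumed API
    }
}
```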
Duplicate AddDistributedMode in PipelineBuilderExtensions
There are now two AddDistributedMode extension methods on PipelineBuilder: one in ModularPipelines.Distributed.Extensions.DistributedPipelineBuilderExtensions and one in ModularPipelines.Extensions.PipelineBuilderExtensions. The latter does not call EnsurePluginRegistered, so calling the core version won't register the plugin. This is a latent API confusion issue — callers of the core-package version of AddDistributedMode will get options registered but no plugin wired up.
New observation: IArtifactContext not registered per-module scope
ArtifactContextImpl is registered as a singleton in DistributedPipelinePlugin.ConfigureServices:
```csharp
services.AddSingleton<IArtifactContext>(sp2 =>
    new ArtifactContextImpl(
        sp2.GetRequiredService<IDistributedArtifactStore>(),
        sp2.GetRequiredService<IOptions<ArtifactOptions>>(),
        string.Empty)); // empty moduleTypeName
```

The moduleTypeName parameter is hardcoded to string.Empty. The ArtifactContextImpl presumably uses this to tag artifacts with the producing module's name. Every module calling context.Artifacts().PublishFileAsync(...) will publish with an empty module type name, defeating the [ConsumesArtifact(producerModule: typeof(X))] lookup that uses the producer's type name as the key.
If IArtifactContext is meant to be per-module, it should be registered as a scoped service and initialized with the current module's type name. If it's meant to be shared, the PublishFileAsync API should accept the module type as a parameter rather than baking it into the constructor.
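A sketch of the per-module option. IModuleScope and CurrentModuleType are hypothetical names for whatever mechanism the framework uses to track the currently executing module:

```csharp
// Sketch: scoped registration so each module execution resolves an
// IArtifactContext stamped with its own type name. IModuleScope and
// CurrentModuleType are assumed names, not the framework's actual API.
services.AddScoped<IArtifactContext>(sp =>
    new ArtifactContextImpl(
        sp.GetRequiredService<IDistributedArtifactStore>(),
        sp.GetRequiredService<IOptions<ArtifactOptions>>(),
        sp.GetRequiredService<IModuleScope>().CurrentModuleType.Name));
```

This requires the framework to create a DI scope per module execution; if it does not, the alternative of passing the module type into PublishFileAsync explicitly is simpler.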
Summary
Solid progress since the last review. The three most important fixes (reflection removal, BuildServiceProvider(), and WorkerHealthMonitor) are all correct. The InMemory dequeue livelock is fixed. The heartbeat timestamp tracking is now real.
The items that need attention before this is production-ready:
- WorkerModuleExecutor.ResultTask semantics — awaiting a CompletionSource that nothing sets on the worker side will deadlock in real execution. This needs either a real execution call through IModuleRunner, or a clear explanation of how the module gets executed and sets its completion source on the worker.
- Redis dequeue BRPOP bounce — the InMemory livelock was fixed but the Redis analogue remains.
- IArtifactContext singleton with empty moduleTypeName — breaks artifact attribution in practice.
- MatrixModuleExpander integration — the checklist items should be unchecked until the ConfigurePipeline wiring exists.
- S3 MemoryStream buffering — lower priority but a real OOM risk for large artifacts.
…ptional Move ModularPipelines.Analyzers.sln into the restore loop and BuildSolutionsModule instead of a separate workflow build step. Make DependsOn<BuildSolutionsModule> optional since it has [RunOnLinuxOnly] and isn't registered on Mac/Windows instances.
Code Review: Distributed Workers Mode
This is an ambitious and well-structured addition. The architecture is sound — clear separation of concerns with coordinator abstractions, capability routing, and artifact lifecycle management. The documentation is thorough and the test coverage is solid. Below are architectural concerns and design patterns worth revisiting before merge.
1. IDistributedCoordinator is a Role-Confused Interface (ISP Violation)
The single IDistributedCoordinator interface is implemented differently for master vs. worker, and both SignalRMasterCoordinator and SignalRWorkerCoordinator throw NotSupportedException on several methods:
// SignalRWorkerCoordinator
public Task EnqueueModuleAsync(...) => throw new NotSupportedException("Workers do not enqueue work.");
public Task<SerializedModuleResult> WaitForResultAsync(...) => throw new NotSupportedException("Workers do not wait for results.");
public Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...) => throw new NotSupportedException("Workers do not query registered workers.");
This is a classic Liskov Substitution Principle violation: callers accepting IDistributedCoordinator cannot trust the contract at compile time. This also makes the interface harder to mock correctly in tests — you have to know which methods are valid for which role.
Why it matters: Any new coordinator implementation must carry the cognitive burden of knowing which half of the interface to implement, and runtime failures from wrong usage are invisible until execution.
Suggested direction: Split into two focused interfaces:
// The part the master needs
public interface IMasterCoordinator
{
Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
Task SignalCompletionAsync(CancellationToken ct);
}
// The part a worker needs
public interface IWorkerCoordinator
{
Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}
The Redis and InMemory implementations can implement both (they share state), while SignalR master/worker coordinators only implement the relevant one.
2. Significant Code Duplication Between Master and Worker Executors
DistributedModuleExecutor and WorkerModuleExecutor share nearly identical implementations for:
- ApplyDependencyResults(...) — both decompress GZip-prefixed JSON and call ModuleCompletionSourceApplicator.TryApply
- Artifact upload/download logic with the same error handling patterns
- PublishResolutionFailureAsync(...) — identical sentinel-value failure publishing
This has already created an inconsistency: DistributedModuleExecutor references DistributedWorkPublisher.GzipPrefix directly, while WorkerModuleExecutor references it as Master.DistributedWorkPublisher.GzipPrefix — a leaky internal namespace dependency.
Why it matters: When one copy is updated (e.g., a bug fix to the GZip decompression path), the other diverges silently.
Suggested direction: Extract shared execution logic into an AssignmentExecutor helper used by both master and worker.
3. Dependency Results Embedded in Work Queue Payloads Will Not Scale
DistributedWorkPublisher.GatherDependencyResults() serializes and embeds all upstream dependency results into each ModuleAssignment. This has compounding problems:
- Payload fan-out: A diamond dependency pattern causes earlier results to be re-transmitted in every downstream assignment.
- Redis transport limits: The comment says "10 MB request cap" but this is not enforced — it will fail at the transport layer, not gracefully.
- The InMemory coordinator never compresses: GZip is only exercised in the Redis path, so unit tests using InMemory may pass while the Redis path fails under load.
Why it matters: For large build outputs, or pipelines with deep dependency chains, this causes silent data loss, transport errors, or OOM conditions.
Suggested direction: Workers should pull dependency results from the coordinator on demand (the same pattern Redis pub/sub already uses for final results):
var depResult = await coordinator.GetResultAsync(depTypeName, cancellationToken);
4. [MatrixTarget] is Public API with Silent Wrong Behavior
The MatrixModuleExpander.ScanForExpansions() is implemented, but the TODO comment in DistributedModuleExecutor shows it is explicitly not wired in:
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.
The [MatrixTarget] attribute is public API and appears in documentation examples. Users who apply it will observe silent wrong behavior — their module runs once, not N times, with no error or warning.
Suggested direction: Either wire ScanForExpansions into DistributedModuleExecutor before merge, or add a runtime warning log when the attribute is detected, so the failure mode is visible rather than silent.
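If the warning route is chosen, a sketch of the detection pass (the attribute name comes from the PR; the scan location and module collection are assumptions):

```csharp
// Sketch: surface the unimplemented [MatrixTarget] attribute at startup
// instead of letting decorated modules silently run once.
foreach (var module in modules)
{
    if (module.GetType().GetCustomAttribute<MatrixTargetAttribute>() is not null)
    {
        _logger.LogWarning(
            "Module {Module} is decorated with [MatrixTarget], but matrix expansion " +
            "is not yet wired up; the module will run once, not N times.",
            module.GetType().Name);
    }
}
```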
5. RunIdentifierResolver is Duplicated Across Three Packages
RunIdentifierResolver is copied verbatim into:
- src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs
Why it matters: If a new CI system needs support, all three must be updated independently. Any drift between them is a latent correctness bug.
Suggested direction: Move it into the core ModularPipelines.Distributed package as an internal static utility shared by all.
6. ArtifactLifecycleManager Has a Stuck-Download Risk
In ArtifactLifecycleManager.DownloadConsumedArtifactsForPathAsync, the shared download uses CancellationToken.None intentionally to prevent one caller's cancellation from aborting other waiters. But if the download hangs (network failure, S3 timeout), every subsequent caller for that key also hangs indefinitely — the failed-entry removal only runs if an exception is thrown, not on timeout.
Why it matters: A stuck download will deadlock the entire pipeline on all workers waiting for that artifact, with no escape valve.
Suggested direction: Apply a configurable download timeout independent of the caller's token, so the shared task itself has a bounded lifetime even when callers are patient.
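A sketch of that escape valve, assuming a new DownloadTimeout option is added (the option name and the surrounding field names are hypothetical):

```csharp
// Sketch: bound the shared download's lifetime with its own timeout token,
// independent of any individual caller's cancellation. DownloadTimeout is a
// hypothetical new option on ArtifactOptions.
using var timeoutCts = new CancellationTokenSource(_options.DownloadTimeout);
try
{
    await _artifactStore.DownloadAsync(reference, targetPath, timeoutCts.Token);
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
    _completedRestores.TryRemove(key, out _); // let a later caller retry
    throw new TimeoutException(
        $"Artifact download for '{key}' did not complete within {_options.DownloadTimeout}.");
}
```

Because cancellation comes from the timeout token rather than a caller's token, other waiters sharing the task still see a deterministic failure instead of hanging.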
7. Redis Lua Script Does a Linear Scan of the Entire Work Queue
ScanAndClaimScript does LRANGE key 0 -1 — fetching the entire list — then iterates to find a matching assignment. For the common case (no capability filtering), this is O(N) on every dequeue attempt by every worker.
Why it matters: With many workers polling simultaneously and a large queue, this creates O(N x W) Redis operations per scheduling cycle. Each LRANGE holds a Redis server lock for the entire iteration.
Suggested direction: Add a fast path for the no-capability case:
-- O(1) fast path when no capability filtering needed
if required == nil or #required == 0 then
return redis.call('RPOP', KEYS[1])
end
-- fall through to O(N) capability-matching scan
Minor Observations
- Heartbeat fields are documented but absent: HeartbeatIntervalSeconds and HeartbeatTimeoutSeconds appear in the docs configuration table but are not present in DistributedOptions in the diff. The heartbeat background services referenced in the architecture docs do not appear to be wired up.
- Cloudflared regex is hardcoded to trycloudflare.com: This will fail silently if Cloudflare changes their quick-tunnel domain or if a user has a named tunnel. The regex should match any HTTPS URL from cloudflared output rather than a specific hostname suffix.
Summary
The overall design — coordinator abstraction, capability routing, artifact lifecycle management, and the Redis pub/sub approach for results — is well-thought-out and the test coverage is comprehensive. The main concerns to address before merge are:
- IDistributedCoordinator violates ISP — split by role
- Master/worker executor duplication should be extracted
- Embedding dependency results in assignments will not scale — prefer pull-on-demand
- [MatrixTarget] ships with documented-but-broken behavior — add a runtime warning at minimum
- RunIdentifierResolver triplication is a maintenance hazard
- ArtifactLifecycleManager needs a timeout escape valve for stuck downloads
- Redis Lua scan is O(N) where the common case could be O(1)
Server's JsonHubProtocol defaults to camelCase naming, but the client uses default STJ options (PascalCase). This causes "Error binding arguments" when deserializing WorkerRegistration records. Configure server to use PascalCase (PropertyNamingPolicy = null) and case-insensitive matching to align with the client.
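The server-side fix described above can be expressed roughly as follows, using SignalR's standard AddJsonProtocol hook (the exact builder location in this codebase is an assumption):

```csharp
// Sketch: make the server's JSON hub protocol emit PascalCase and accept
// either casing, matching the client's default System.Text.Json settings.
builder.Services.AddSignalR().AddJsonProtocol(options =>
{
    options.PayloadSerializerOptions.PropertyNamingPolicy = null;         // keep PascalCase
    options.PayloadSerializerOptions.PropertyNameCaseInsensitive = true; // tolerant binding
});
```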
Distributed Workers Mode — Round 12 Code Review
Reviewing against all previous rounds to track resolution status.
Resolved Since Last Review
Redis upload OOM — FIXED
RedisDistributedArtifactStore.UploadAsync now streams correctly using ReadFullBufferAsync and makes the single-vs-chunked decision within the streaming loop, without ever buffering the entire artifact in memory. The previous critical concern is resolved.
Reflection-based result application — FIXED
IModule now exposes TrySetDistributedResult(IModuleResult result) and ModuleCompletionSourceApplicator.TryApply calls it cleanly. No private reflection.
GetModule<T>() cross-process hang — Resolved by inline dependency results
The DependencyResults field in ModuleAssignment carries serialized results for all direct [DependsOn<T>] dependencies. Both master and worker apply these before execution, so GetModule<T>() works correctly across process boundaries for declared dependencies.
ArtifactLifecycleManager cached-failure in Lazy<Task> — FIXED
Failed entries are removed from _completedRestores in the catch block, allowing retry on subsequent calls.
Still Open
1. [MatrixTarget] is dead public API — ships with no effect
MatrixModuleExpander.ScanForExpansions is not called anywhere in DistributedModuleExecutor. The [MatrixTarget] attribute is public, documented, and mentioned in the PR summary, but silently has no effect. Modules decorated with it run once, not N times.
This was raised in the previous round and remains unaddressed. Shipping public attributes that appear to control behaviour but don't is misleading and hard to un-ship once users adopt it. Either wire up the expansion before this merges, or remove the attribute and expander from this PR entirely and add them in a follow-up.
2. architecture.md describes the wrong interface
The docs still describe an IDistributedCoordinator with 9 methods across four concerns including SendHeartbeatAsync, BroadcastCancellationAsync, and IsCancellationRequestedAsync. The actual interface has 7 methods; those three don't exist.
The architecture doc also says:
"Two background services start:
WorkerHeartbeatService (periodic heartbeats) and WorkerCancellationMonitor (polls for cancellation)."
Neither of these services exists in the current codebase.
This creates a misleading contract description for implementors of custom coordinators. The docs need to be updated to reflect the current interface.
3. Redis chunked download — silent truncation on chunk TTL expiry
RedisDistributedArtifactStore.DownloadAsync reconstructs chunked artifacts by iterating until a chunk key is missing:
while (true)
{
var chunkKey = _keys.ArtifactChunk(reference.ArtifactId, chunkIndex);
var chunk = await _database.StringGetAsync(chunkKey);
if (chunk.IsNull) { break; }
var bytes = (byte[])chunk!;
ms.Write(bytes, 0, bytes.Length);
chunkIndex++;
}
If any chunk key expires before all others (Redis evicts under memory pressure with non-uniform TTL enforcement), the result is silently truncated data — the download succeeds but the reconstructed artifact is corrupt.
The fix: store the chunk count in the artifact metadata during upload (ArtifactReference already has SizeBytes, but chunk count is not there). On download, validate that the number of chunks read matches the expected count and throw if they differ. The metadata JSON is small and won't add meaningful overhead.
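A sketch of the validated download loop, assuming the proposed ChunkCount field is added to ArtifactReference (it is the review's suggestion, not existing API):

```csharp
// Sketch: iterate exactly ChunkCount chunks and fail loudly on any gap,
// instead of treating a missing key as "end of stream".
for (var i = 0; i < reference.ChunkCount; i++)
{
    var chunk = await _database.StringGetAsync(_keys.ArtifactChunk(reference.ArtifactId, i));
    if (chunk.IsNull)
    {
        throw new InvalidOperationException(
            $"Artifact '{reference.ArtifactId}' is missing chunk {i} of {reference.ChunkCount} " +
            "(likely TTL expiry); refusing to return truncated data.");
    }
    var bytes = (byte[])chunk!;
    ms.Write(bytes, 0, bytes.Length);
}
```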
4. WorkerModuleScheduler.ReadyModules throws NotSupportedException
public ChannelReader<ModuleState> ReadyModules =>
throw new NotSupportedException("Worker does not use the ready-modules channel.");
If any framework code (hooks, health monitors, diagnostics) enumerates IModuleScheduler.ReadyModules without checking the concrete type, this propagates an unexpected exception. The null-object pattern is safer here:
private static readonly ChannelReader<ModuleState> _neverCompletingReader =
Channel.CreateUnbounded<ModuleState>().Reader;
public ChannelReader<ModuleState> ReadyModules => _neverCompletingReader;
A never-completing reader behaves correctly: WaitToReadAsync blocks without returning items, and ReadAllAsync never yields. No caller is broken and no exception leaks.
5. RunIdentifierResolver is duplicated across three packages
There are three near-identical implementations of RunIdentifierResolver:
- src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs
All three read the same CI environment variables (GITHUB_RUN_ID, BUILD_BUILDID, etc.) and fall back to git SHA. Any bug fix or new CI system needs to be applied in three places. Move this to the core ModularPipelines.Distributed package (as RunIdentifierResolver in a Configuration namespace) and have the backend packages reference it.
New Findings
6. PortableFilePathJsonConverter — case-insensitive comparison on case-sensitive file systems
if (normalizedAbsolute.StartsWith(normalizedRoot, StringComparison.OrdinalIgnoreCase))
Linux file systems are case-sensitive. A path like /home/runner/Work/Project/src/output.dll will serialise with OrdinalIgnoreCase, but if the git root is /home/runner/work/Project (note different casing), this will match and produce a malformed relative path. The comparison should use StringComparison.Ordinal on Linux and OrdinalIgnoreCase on Windows/macOS:
var comparison = RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
? StringComparison.Ordinal
: StringComparison.OrdinalIgnoreCase;
if (normalizedAbsolute.StartsWith(normalizedRoot, comparison))
This is particularly important for cross-platform distributed runs (e.g., a Windows master serialising paths for a Linux worker).
7. ArtifactLifecycleManager.ResolvePathPattern — ** glob not supported
var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
// Split at first wildcard only
For a pattern like **/bin/*.dll (common in .NET projects), this splits at index 0, takes baseDir = cwd, and searchPattern = Path.GetFileName("**/bin/*.dll") = "*.dll". The result is all .dll files anywhere under cwd, not just those in bin/ subdirectories. The ** directory wildcard is not handled.
While fully implementing glob support from scratch is not trivial, using Microsoft.Extensions.FileSystemGlobbing (already available in the .NET SDK) would handle this correctly:
var matcher = new Microsoft.Extensions.FileSystemGlobbing.Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDir)));
return result.Files.Select(f => Path.Combine(baseDir, f.Path)).ToList();
Summary
The PR has improved significantly across 12 rounds. The upload OOM, reflection coupling, and cached-failure deduplication issues are properly resolved. The remaining blockers before merge are:
- [MatrixTarget] unimplemented public API — remove or wire up
- architecture.md interface description — outdated by 2 methods + 2 background services
- Redis chunked download truncation — correctness risk, needs chunk count validation
The other issues (duplicate RunIdentifierResolver, WorkerModuleScheduler.ReadyModules, path converter case sensitivity, glob handling) are improvements worth addressing but aren't blocking.
Distributed Workers Mode — Round 12 Code Review
Reviewing against all previous rounds to track resolution status.
Resolved Since Last Review
Redis upload OOM — FIXED
RedisDistributedArtifactStore.UploadAsync now streams correctly using ReadFullBufferAsync and makes the single-vs-chunked decision within the streaming loop, without ever buffering the entire artifact in memory.
Reflection-based result application — FIXED
IModule now exposes TrySetDistributedResult(IModuleResult result) and ModuleCompletionSourceApplicator.TryApply calls it cleanly. No private reflection.
GetModule<T>() cross-process hang — Resolved by inline dependency results
DependencyResults in ModuleAssignment carries serialized results for all direct [DependsOn<T>] dependencies. Both master and worker apply these before execution, so GetModule<T>() works correctly across process boundaries.
ArtifactLifecycleManager cached-failure in Lazy<Task> — FIXED
Failed entries are removed from _completedRestores in the catch block, allowing retry.
Still Open
1. [MatrixTarget] is dead public API — ships with no effect
MatrixModuleExpander.ScanForExpansions is not called anywhere in DistributedModuleExecutor. The [MatrixTarget] attribute is public, documented, and mentioned in the PR summary, but silently has no effect — modules decorated with it run once, not N times.
Raised in the previous round and remains unaddressed. Shipping public attributes that appear to control behaviour but don't is misleading and hard to un-ship once users adopt it. Either wire up the expansion before this merges, or remove the attribute, expander, and MatrixModuleInstance from this PR entirely.
2. architecture.md describes the wrong interface
The docs still describe an IDistributedCoordinator with 9 methods across four concerns including SendHeartbeatAsync, BroadcastCancellationAsync, and IsCancellationRequestedAsync. The actual interface has 7 methods; those three don't exist.
The doc also says:
"Two background services start:
WorkerHeartbeatService (periodic heartbeats) and WorkerCancellationMonitor (polls for cancellation)."
Neither of these services exists in the current codebase. This creates a misleading contract description for anyone implementing a custom coordinator.
3. Redis chunked download — silent truncation on chunk TTL expiry
RedisDistributedArtifactStore.DownloadAsync reconstructs chunked artifacts by iterating until a chunk key is missing:
while (true)
{
var chunk = await _database.StringGetAsync(chunkKey);
if (chunk.IsNull) { break; } // assumed to mean "no more chunks"
...
}
If any chunk key expires before all others (Redis eviction under memory pressure does not guarantee uniform TTL enforcement), the download silently succeeds with truncated data — the artifact is corrupt but no exception is thrown.
Fix: store the chunk count in the artifact metadata during upload (add a ChunkCount field to ArtifactReference or the metadata JSON). On download, validate that chunks read equals the expected count and throw if they differ.
New Findings
4. WorkerModuleScheduler.ReadyModules throws NotSupportedException
public ChannelReader<ModuleState> ReadyModules =>
throw new NotSupportedException("Worker does not use the ready-modules channel.");
If any framework code (hooks, diagnostics) enumerates IModuleScheduler.ReadyModules without knowing the concrete type, this throws unexpectedly. The null-object pattern is safer:
private static readonly ChannelReader<ModuleState> _neverCompleting =
Channel.CreateUnbounded<ModuleState>().Reader;
public ChannelReader<ModuleState> ReadyModules => _neverCompleting;
A never-completing reader blocks WaitToReadAsync without yielding items, and ReadAllAsync never iterates — no caller breaks.
5. RunIdentifierResolver duplicated across three packages
Three near-identical implementations read the same CI environment variables:
- src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs
Any bug fix or new CI system needs updating in three places. Move to the core ModularPipelines.Distributed package.
6. PortableFilePathJsonConverter — case-insensitive path comparison on case-sensitive file systems
if (normalizedAbsolute.StartsWith(normalizedRoot, StringComparison.OrdinalIgnoreCase))
Linux file systems are case-sensitive. Using OrdinalIgnoreCase can produce a malformed relative path if the git root casing doesn't exactly match the file path casing. This is particularly relevant for cross-platform runs (Windows master → Linux worker). Use StringComparison.Ordinal on Linux, OrdinalIgnoreCase on Windows/macOS.
7. ArtifactLifecycleManager.ResolvePathPattern — ** glob not supported
For a pattern like **/bin/*.dll, the code splits at index 0, uses cwd as base directory, and uses *.dll as the search pattern — matching all .dll files anywhere, not just those in bin/. The ** directory wildcard is silently ignored.
Microsoft.Extensions.FileSystemGlobbing (available in the .NET SDK) handles this correctly and would be a drop-in replacement for the manual glob logic.
Summary
The critical issues from previous rounds (upload OOM, reflection coupling, cached-failure deduplication) are properly resolved. The remaining blockers before merge:
- [MatrixTarget] unimplemented public API — remove or wire up
- architecture.md interface description — outdated (wrong method count, non-existent background services)
- Redis chunked download truncation — correctness risk; chunk count must be validated
The other items (RunIdentifierResolver duplication, WorkerModuleScheduler.ReadyModules, path converter case sensitivity, glob handling) are improvements worth addressing but not strictly blocking.
Code Review — Round 4
This round reviews the current state of the feat: Distributed workers mode branch after three prior review rounds. The SignalR backend is new since last round; several previously raised issues remain unaddressed.
Previously Raised Issues — Status
| Issue | Prior Rounds | Status |
|---|---|---|
| Reflection-based CompletionSource access | Round 1 | ✅ Fixed — TrySetDistributedResult interface method |
| BuildServiceProvider() in ConfigureServices | Round 1 | ✅ Fixed — descriptor scanning |
| CancellationToken.None in worker loop | Round 1 | ✅ Fixed — ApplicationStopping token threaded through |
| S3 MemoryStream OOM risk | Round 1 | ✅ Fixed — temp file with DeleteOnClose |
| Sync-over-async in factory init | Round 3 | ✅ Fixed — DeferredCoordinator wrapper |
| Redis LRANGE/LREM race | Round 1 | ✅ Fixed — atomic Lua scan-and-claim |
| [MatrixTarget] public but non-functional | All rounds | ❌ Unchanged — TODO comment still present |
| ResultTask cancellation-to-fault promotion | Round 3 | ❌ Unchanged |
| Three divergent RunIdentifierResolver impls | All rounds | ❌ Unchanged |
| WorkerModuleScheduler.ReadyModules throws | Round 1 | ❌ Unchanged |
| S3._ttlSeconds stored but never applied | Prior rounds | ❌ Unchanged |
| ResolveDistributedOptions misses env-var config | Round 1 | ❌ Partially addressed |
Critical Issues
1. [MatrixTarget] ships as public API with docs but silently does nothing
MatrixModuleExpander, [MatrixTargetAttribute], [MatrixExpansionAttribute], and the full docs under docs/docs/distributed/ are all committed. The TODO comment in DistributedModuleExecutor is unchanged:
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.
A user who reads the docs and uses [MatrixTarget] gets silent wrong behavior — the module runs once instead of N times with no warning, no exception, no log message. This is worse than not shipping the feature at all.
Required action: Either wire up MatrixModuleExpander.ScanForExpansions in DistributedModuleExecutor before the scheduler loop, or remove [MatrixTarget], MatrixModuleExpander, and the matrix documentation from this PR. Shipping undocumented stubs as public API with live documentation is a correctness trap.
2. ResultTask cancellation-to-fault promotion (unresolved from Round 3)
In Module.cs:
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);
When CompletionSource is cancelled, t.Result throws TaskCanceledException inside the continuation, so the continuation task ends up faulted (not cancelled). This means task.IsCanceled is false when it should be true. In the distributed path, CollectDistributedResultAsync uses cts.IsCancellationRequested to distinguish timeout from pipeline cancellation — the incorrect task state makes these checks unreliable.
Fix:
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
static t => t.IsCanceled
? Task.FromCanceled<IModuleResult>(new CancellationToken(true))
: t.IsFaulted
? Task.FromException<IModuleResult>(t.Exception!.InnerException ?? t.Exception)
: Task.FromResult<IModuleResult>(t.Result),
TaskContinuationOptions.ExecuteSynchronously).Unwrap();
Or use an async/await wrapper, which handles this correctly by default.
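The async/await form, as a sketch (assuming CompletionSource.Task's result type converts to IModuleResult, as in the ContinueWith version):

```csharp
// Sketch: an async method propagates cancellation and faults correctly by
// construction — awaiting a cancelled task rethrows OperationCanceledException,
// which marks the async method's returned task as Canceled, not Faulted.
Task<IModuleResult> IModule.ResultTask => AwaitResultAsync();

private async Task<IModuleResult> AwaitResultAsync()
    => await CompletionSource.Task.ConfigureAwait(false);
```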
3. SignalR OnDisconnectedAsync does not re-enqueue in-flight work — worker crash is a hang
In DistributedPipelineHub.cs:
public override Task OnDisconnectedAsync(Exception? exception)
{
if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
{
_logger.LogWarning("Worker {Index} disconnected...");
// TODO: Re-enqueue in-flight work for the disconnected worker
}
return Task.CompletedTask;
}
If a worker crashes after receiving an assignment but before publishing a result, CollectDistributedResultAsync on the master blocks until ModuleResultTimeoutSeconds expires, then the whole pipeline fails with a timeout. The TODO acknowledges this but it needs to be resolved before the SignalR backend is usable in production.
The fix requires tracking the current assignment per worker (keyed by ConnectionId) and re-enqueuing to _masterState.PendingWork on disconnect.
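A sketch of the disconnect handler with re-enqueue — CurrentAssignment is a hypothetical slot on the per-worker state, set when work is handed out and cleared when its result is published:

```csharp
// Sketch: return any in-flight assignment to the shared queue so another
// worker can pick it up. CurrentAssignment is assumed, not existing API.
public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected", workerState.Index);

        if (workerState.CurrentAssignment is { } assignment)
        {
            _masterState.PendingWork.Enqueue(assignment); // re-enqueue for another worker
        }
    }
    return base.OnDisconnectedAsync(exception);
}
```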
4. cloudflared tunnel URL regex misses multi-segment subdomains
In CloudflaredTunnel.cs:
[GeneratedRegex(@"https://[a-zA-Z0-9\-]+\.trycloudflare\.com")]
Cloudflare's free tunnel URLs typically have multi-segment subdomains like https://word-word-word-word.trycloudflare.com. The pattern [a-zA-Z0-9\-]+ does not match dots, so multi-segment hostnames never match. The URL task never resolves, the timeout fires, and the master fails with a TimeoutException that looks unrelated to the regex.
Fix:
[GeneratedRegex(@"https://[a-zA-Z0-9][a-zA-Z0-9\-\.]*\.trycloudflare\.com")]
This is a functional defect that will prevent the SignalR backend from working in most real cases.
Significant Issues
5. SignalRWorkerCoordinator.DequeueModuleAsync floods master with redundant RequestWork calls
The worker loop calls DequeueModuleAsync in a while loop, and each call sends a RequestWork message to the master. If the master has no work, it queues the request — but the next loop iteration immediately sends another RequestWork before the previous one is resolved. A worker could accumulate dozens of pending requests in the master's queue while idle.
The worker should use a persistent subscription (listen on a long-lived channel) rather than polling via request/response. A single RequestWork call followed by indefinite waiting on the channel reader matches SignalR's push semantics much better.
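A rough shape of that push-based alternative (channel field, hub method names, and the registration point are assumptions):

```csharp
// Sketch: subscribe once to a push callback, signal readiness once, then
// block on a channel instead of re-sending RequestWork every loop iteration.
private readonly Channel<ModuleAssignment> _incoming = Channel.CreateUnbounded<ModuleAssignment>();

public async Task StartAsync(HubConnection connection, IReadOnlySet<string> capabilities, CancellationToken ct)
{
    // The master pushes assignments to this handler; no polling loop is involved.
    connection.On<ModuleAssignment>("ReceiveAssignment", a => _incoming.Writer.TryWrite(a));

    // One registration carrying this worker's capabilities, sent once.
    await connection.InvokeAsync("RequestWork", capabilities.ToArray(), cancellationToken: ct);
}

public async Task<ModuleAssignment?> DequeueModuleAsync(CancellationToken ct)
    => await _incoming.Reader.ReadAsync(ct); // waits until the master pushes work
```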
6. Three divergent RunIdentifierResolver implementations — coordination failure on fallback
Three separate implementations exist with different fallback strategies:
- ModularPipelines.Distributed.Redis → GUID fallback
- ModularPipelines.Distributed.Artifacts.S3 → GUID fallback (identical copy)
- ModularPipelines.Distributed.Discovery.Redis → CWD SHA256 hash fallback
If Discovery.Redis and Distributed.Redis are used together and neither CI env vars nor git SHA resolves, the Redis coordinator uses a random GUID (different per process) while the Discovery coordinator uses a CWD hash (same per process). The run identifier is the Redis key namespace for all coordination — a mismatch means master and workers never communicate.
This was raised in all previous rounds. Move to a single shared RunIdentifierResolver in ModularPipelines.Distributed core.
7. ResolveDistributedOptions misses environment-variable-bound configuration
The scanner only handles IOptions<T> direct instances and IConfigureOptions<T> with ImplementationInstance. It misses:
- builder.Configuration.Bind("Distributed", opts)
- IConfigureOptions<T> with ImplementationFactory
- Standard .NET env vars like MODULARPIPELINES__DISTRIBUTED__INSTANCEINDEX
A user who uses standard .NET configuration binding will have every instance default to InstanceIndex = 0 (master), causing all workers to also try to act as master. The RoleDetector reads MODULAR_PIPELINES_INSTANCE directly from Environment.GetEnvironmentVariable as a workaround, but this is not the standard options pattern and isn't obvious to users.
Moderate Issues
8. WorkerModuleScheduler.ReadyModules throws NotSupportedException
Any framework code (hooks, diagnostics, progress monitoring) that accesses IModuleScheduler.ReadyModules without being aware of worker-mode context gets an unhandled exception. Return a completed channel reader instead:
private static readonly ChannelReader<ModuleState> _empty = CreateCompletedReader();
private static ChannelReader<ModuleState> CreateCompletedReader()
{
var ch = Channel.CreateUnbounded<ModuleState>();
ch.Writer.Complete();
return ch.Reader;
}
public ChannelReader<ModuleState> ReadyModules => _empty;
9. Dual-container IHubContext<T> ownership in SignalR backend
MasterServerHost starts a WebApplication with its own DI container. SignalRMasterCoordinator is registered in the pipeline's DI container but holds IHubContext<T> resolved from the WebApp's container. These are two separate containers. SignalRMasterState is shared via reference, which works but makes ownership non-obvious.
If the WebApplication is disposed before the coordinator finishes (e.g., on pipeline cancellation), IHubContext<T> becomes invalid mid-flight. Making the coordinator an IHostedService would tie its lifetime to the main host and avoid this ambiguity.
10. S3._ttlSeconds stored but never applied (unresolved from prior rounds)
The _ttlSeconds field is assigned in the constructor but not passed to any S3 API call. S3 does not support per-object TTL in the standard API — expiration requires bucket lifecycle rules. The field creates a false expectation that artifacts auto-expire. Either document that TTL is managed via S3 bucket lifecycle policy and remove the field, or implement it via object tagging + a lifecycle rule.
11. ResolvePathPattern is not a real glob engine
The implementation uses Path.GetFileName to extract the search pattern from a glob like **/bin/Release/*.nupkg, which discards intermediate path segments. Brace expansion (*.{zip,tar.gz}) is not supported at all. Use Microsoft.Extensions.FileSystemGlobbing.Matcher instead:
```csharp
var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDirectory)));
return result.Files.Select(f => Path.Combine(baseDirectory, f.Path)).ToList();
```

Summary
The core distributed execution mechanics (Redis coordination, dependency serialization, fault propagation) are much improved from prior rounds. The main blockers before this can merge are:
- `[MatrixTarget]` — either wire it up or remove it + its docs from this PR
- `ResultTask` fault promotion — affects cancellation semantics in both distributed and non-distributed paths
- SignalR `OnDisconnectedAsync` — worker crash causes master hang; needs in-flight re-enqueue
- Cloudflared regex — functional defect preventing SignalR tunnel from working on real URLs
The RunIdentifierResolver duplication and ResolveDistributedOptions config-binding gap are also worth resolving before shipping given the coordination failure modes they can cause.
…serialization

CreateSlimBuilder in .NET 10 disables reflection-based JSON serialization, which prevents IReadOnlySet&lt;string&gt; deserialization in SignalR hub method binding (RegisterWorker). CreateBuilder restores full JSON support.
Code Review — Round 5
Reviewed against all prior rounds. The most recent review (Round 4) was submitted ~6 hours ago. Code state appears unchanged — all Round 4 TODO comments are still present. One previously-called-out issue (RequestWork flooding) appears to have actually been fixed in the design; the others remain open.
Resolution Status vs Round 4
| Issue | Round 4 Status | Current Status |
|---|---|---|
| Reflection-based `CompletionSource` | ✅ Fixed | ✅ Still fixed |
| `[MatrixTarget]` dead public API | ❌ Open | ❌ Unchanged — TODO in 2 files |
| `ResultTask` cancellation-to-fault | ❌ Open | ❌ Unchanged |
| `OnDisconnectedAsync` re-enqueue | ❌ Open | ❌ TODO still present |
| Cloudflared regex | ❌ Open | ❌ Unchanged |
| `WorkerModuleScheduler.ReadyModules` throws | ❌ Open | ❌ Unchanged |
| Three divergent `RunIdentifierResolver` impls | ❌ Open | ❌ Unchanged |
| `S3._ttlSeconds` stored, never applied | ❌ Open | ❌ Unchanged |
| `ResolvePathPattern` not a real glob engine | ❌ Open | ❌ Unchanged |
| RequestWork flooding | ❌ Open in Round 4 | ✅ Actually resolved — see below |
Correction from Round 4 — RequestWork is NOT flooding
Round 4 flagged SignalRWorkerCoordinator.DequeueModuleAsync as flooding the master. Looking at the current implementation more carefully, this concern is incorrect:
```csharp
await _connection.InvokeAsync(HubMethodNames.RequestWork, workerCapabilities, cancellationToken);

if (await _assignmentChannel.Reader.WaitToReadAsync(cancellationToken))
{
    if (_assignmentChannel.Reader.TryRead(out var assignment))
    {
        return assignment;
    }
}
```

The worker sends one RequestWork, then blocks on WaitToReadAsync until an assignment arrives. The push model in SignalRMasterCoordinator.EnqueueModuleAsync calls TryPushToIdleWorker, which sends ReceiveAssignment directly to idle workers — the channel write unblocks WaitToReadAsync regardless of when the assignment arrives relative to the call. The worker never sends a second RequestWork until the first assignment is processed. This is correct behavior and is not flooding.
Critical Issues (still open)
1. [MatrixTarget] ships as documented public API with no effect
The TODO comment is unchanged in two files:
```csharp
// DistributedModuleExecutor.cs
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.

// DistributedWorkPublisher.cs
MatrixTarget: null, // TODO(matrix): Set by MatrixModuleExpander when wired up
```

The [MatrixTarget] attribute, MatrixModuleExpander, and the documentation under docs/docs/distributed/ are all committed. A user following the documentation will get silently wrong behavior (the module runs once, not N times) with no exception, no log warning, and no indication that the feature is non-functional.
Required action before merge: Either wire up MatrixModuleExpander.ScanForExpansions before the scheduler loop in DistributedModuleExecutor, or remove [MatrixTargetAttribute], MatrixModuleExpander, and the matrix documentation from this PR entirely.
2. ResultTask cancellation-to-fault promotion
Unchanged from all prior rounds:
```csharp
// Module.cs
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);
```

When CompletionSource is cancelled, t.Result throws TaskCanceledException, and the continuation wraps it as a faulted task — not a cancelled one. task.IsCanceled returns false when it should be true. The distributed result collector checks cts.IsCancellationRequested to distinguish cancellation from timeout — this mismatch makes those checks unreliable.
Fix:
```csharp
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t =>
    {
        if (t.IsCanceled) return Task.FromCanceled<IModuleResult>(new CancellationToken(true));
        if (t.IsFaulted) return Task.FromException<IModuleResult>(t.Exception!.InnerException ?? t.Exception);
        return Task.FromResult<IModuleResult>(t.Result);
    },
    TaskContinuationOptions.ExecuteSynchronously).Unwrap();
```

3. OnDisconnectedAsync — worker crash causes master hang (SignalR)
Unchanged:
```csharp
public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning("Worker {Index} disconnected (connection {ConnectionId})", ...);
        // TODO: Re-enqueue in-flight work for the disconnected worker
    }
    return Task.CompletedTask;
}
```

If a worker crashes after receiving an assignment but before calling PublishResult, CollectDistributedResultAsync on the master blocks until the timeout expires and then fails the entire pipeline. For the SignalR backend to be production-usable, the assignment in-flight on disconnect must be re-enqueued to _masterState.PendingAssignments. Track the current assignment per ConnectionId in WorkerState and re-enqueue on disconnect.
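A sketch of that fix, assuming a hypothetical `CurrentAssignment` slot on `WorkerState` (the state and queue member names are taken from the surrounding review text, not verified against the PR):

```csharp
public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
    {
        _logger.LogWarning(
            "Worker {Index} disconnected (connection {ConnectionId})",
            workerState.InstanceIndex, Context.ConnectionId);

        // Hypothetical: WorkerState records the assignment last pushed to this
        // connection. Putting it back on the pending queue lets another worker
        // claim it instead of the master waiting for a result that never arrives.
        if (workerState.CurrentAssignment is { } assignment)
        {
            _masterState.PendingAssignments.Enqueue(assignment);
        }
    }

    return base.OnDisconnectedAsync(exception);
}
```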
4. Cloudflared regex does not match typical tunnel URLs
Unchanged:
```csharp
[GeneratedRegex(@"https://[a-zA-Z0-9\-]+\.trycloudflare\.com")]
private static partial Regex TunnelUrlRegex();
```

Cloudflare free tunnel hostnames use multiple hyphen-separated words, e.g. clever-tooth-piano-mango.trycloudflare.com. Since [a-zA-Z0-9\-]+ includes hyphens, the regex does match these single-level subdomains. However, it cannot match multi-level subdomains (there is no dot in the character class) or hostnames containing any character outside [a-zA-Z0-9-], and the match stops before a port number (e.g., for https://abc.trycloudflare.com:443 it captures the URL with the port truncated). It is also fragile if the output format changes slightly.

The safer pattern anchors to the full URL more precisely:

```csharp
[GeneratedRegex(@"https://[\w\-]+(\.[\w\-]+)*\.trycloudflare\.com(?::\d+)?/?")]
```

This issue is lower severity than previously flagged — typical URLs should match — but the regex is still brittle.
Significant Issues (still open)
5. Three divergent RunIdentifierResolver implementations
Three separate implementations exist with different fallback strategies:
| Package | Fallback when no CI env var / git SHA |
|---|---|
| `Distributed.Redis` | Random GUID (different per process) |
| `Distributed.Artifacts.S3` | Random GUID (identical copy) |
| `Distributed.Discovery.Redis` | SHA256 of Environment.CurrentDirectory (same per process) |
If a user combines Distributed.Redis (coordination) with Distributed.Discovery.Redis (master discovery), and neither CI env vars nor git resolve, the run IDs differ between packages — master and workers never communicate.
The fix is a single RunIdentifierResolver in ModularPipelines.Distributed core that all packages call. This has been raised in every prior round and is a real coordination failure scenario for local/non-CI runs.
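A sketch of what that single resolver could look like, assuming the env-var precedence described elsewhere in this thread (GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git → fallback) and deliberately choosing the deterministic directory-hash fallback over a random GUID; the class placement and helper are illustrative:

```csharp
public static class RunIdentifierResolver
{
    // Precedence: CI-provided commit SHAs, then git, then a stable last-resort fallback.
    private static readonly string[] CandidateEnvVars =
        ["GITHUB_SHA", "BUILD_SOURCEVERSION", "CI_COMMIT_SHA"];

    public static string Resolve()
    {
        foreach (var name in CandidateEnvVars)
        {
            var value = Environment.GetEnvironmentVariable(name);
            if (!string.IsNullOrWhiteSpace(value))
            {
                return value;
            }
        }

        if (TryGitHeadSha(out var sha))
        {
            return sha;
        }

        // Deterministic per working directory, so a master and workers started
        // from the same checkout still agree without CI env vars — unlike a
        // random GUID, which guarantees they never find each other.
        var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(Environment.CurrentDirectory));
        return Convert.ToHexString(bytes);
    }

    private static bool TryGitHeadSha(out string sha) { /* git rev-parse HEAD, elided */ ... }
}
```

The key property is that every package observes the same fallback order, so disagreement is impossible by construction.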
6. WorkerModuleScheduler.ReadyModules throws NotSupportedException
Unchanged — any framework code (hooks, diagnostics, progress tracking) that accesses IModuleScheduler.ReadyModules without knowing about worker mode gets an unhandled exception. Return a completed ChannelReader<ModuleState> instead:
```csharp
public ChannelReader<ModuleState> ReadyModules
{
    get
    {
        var ch = Channel.CreateUnbounded<ModuleState>();
        ch.Writer.Complete();
        return ch.Reader;
    }
}
```

Moderate Issues (still open)
7. S3._ttlSeconds stored but never applied
```csharp
private readonly int _ttlSeconds; // constructor-assigned
```

_ttlSeconds is not used in PutObjectRequest or anywhere else. S3 does not support per-object TTL — expiration requires bucket lifecycle rules. Either document that TTL is managed via bucket lifecycle configuration (and remove the field and constructor parameter), or implement TTL via object tagging + a documented lifecycle rule convention. The current state creates a false expectation.
8. ArtifactLifecycleManager.ResolvePathPattern is not a real glob engine
The current implementation:
```csharp
var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
var baseDir = Path.GetDirectoryName(pathPattern[..wildcardIndex]);
// then: Directory.GetFiles(baseDir, Path.GetFileName(pathPattern))
```

For a pattern like `**/bin/Release/*.nupkg`:

- `wildcardIndex = 0` (the `**` starts at index 0)
- `baseDir = GetDirectoryName("")` → current directory
- `Path.GetFileName("**/bin/Release/*.nupkg")` → `"*.nupkg"`
- Result: all `.nupkg` files in the current directory only — the `bin/Release/` path constraint is silently discarded
Use Microsoft.Extensions.FileSystemGlobbing.Matcher (already available in the .NET SDK):
```csharp
var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(new DirectoryInfoWrapper(new DirectoryInfo(baseDirectory)));
return result.Files.Select(f => Path.Combine(baseDirectory, f.Path)).ToList();
```

Summary
The core distributed mechanics are solid and the architecture is well-designed. The blockers before this can merge remain:
- `[MatrixTarget]` — wire it up or remove it + its docs
- `ResultTask` fault promotion — 4-line fix, affects all cancellation paths
- SignalR `OnDisconnectedAsync` — re-enqueue in-flight work on worker crash
- `RunIdentifierResolver` divergence — coordination failure on fallback for local runs
The ResolvePathPattern glob issue is also worth fixing before shipping since users will hit it as soon as they use glob patterns for artifact paths.
…tocol

Two fixes:
1. Set RunIdentifier to GITHUB_RUN_ID so overlapping CI runs don't share stale master URLs in Redis.
2. Add matching AddJsonProtocol config to the client HubConnectionBuilder (PascalCase + case-insensitive) to match the server-side settings.
Code Review: Distributed Workers Mode

This is an impressive, ambitious feature that adds a fully-fledged distributed execution system to ModularPipelines. The architecture is well-thought-out, the documentation is thorough, and the test coverage for a feature this size is commendable. That said, there are several design issues and correctness concerns worth addressing before merging.

1. IDistributedCoordinator Mixes Master and Worker Concerns (Architectural Issue)

The IDistributedCoordinator interface combines master-only operations (enqueuing work, collecting results) with worker-only operations (dequeuing work, publishing results). This violates the Interface Segregation Principle. When a class implementing an interface must throw NotSupportedException for methods outside its role, the abstraction is drawn in the wrong place. Suggested approach: Split into two interfaces — IMasterCoordinator and IWorkerCoordinator.

2. ResolveDistributedOptions Crawls the IServiceCollection at Build Time (Fragile Design)

The reason this workaround exists — to avoid calling BuildServiceProvider() during registration — is understandable, but walking the service collection for options descriptors is brittle. Better approach: Accept the distributed options (or a distributed-mode flag) through an explicit builder API instead of sniffing registrations.

3. DeferredCoordinator / DeferredArtifactStore Live in PipelineBuilder (Separation of Concerns)

Two non-trivial distributed infrastructure classes (DeferredCoordinator and DeferredArtifactStore) are nested inside PipelineBuilder. Suggestion: Move them to dedicated files under the distributed namespace.

4. Module Lookup by FullName Is O(n) Per Assignment (Performance)

Both master and worker executors locate modules via:

```csharp
var module = modules.FirstOrDefault(m => m.GetType().FullName == assignment.ModuleTypeName);
```

This is a linear scan executed for every module assignment. In a pipeline with many modules this adds up. The same scan is repeated for every dependency result in ApplyDependencyResults. Suggestion: Build a dictionary keyed by type full name once and do O(1) lookups.

5. ApplyDependencyResults Is Duplicated Between Master and Worker Executors

The dependency-result application logic exists in both executors. This creates a maintenance hazard. If the compression strategy changes, both copies must be updated. Suggestion: Extract a shared helper that both sides call.

6. MatrixTarget Feature Is Incomplete but Publicly Documented

The executor still carries:

```csharp
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.
```

The expander exists but is never invoked. Suggestion: Either wire up the expander before merging, or remove the public attribute and its documentation until the feature works.

7. Redis Lua Script Scans the Entire Work Queue (Scalability)

The dequeue script loads the whole queue with LRANGE on every call. For small pipelines this is fine. But for pipelines with many queued modules, this will block Redis for all clients while scanning. Redis scripts run atomically on a single thread. Longer-term improvement: Use separate queues per capability set (e.g., one list per capability group).

8. SignalR Transport Has External Dependency on Cloudflared

The CI workflow downloads cloudflared from the latest release:

```shell
curl -fsSL https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -o /usr/local/bin/cloudflared
```

Using latest makes CI builds non-reproducible. Suggestion: Pin the cloudflared version in the workflow (e.g., a specific tag) and verify the binary's checksum.

9. DeferredCoordinator Has a Double-Checked Locking Race

```csharp
if (_inner is not null) return _inner;
await _lock.WaitAsync(ct);
try { return _inner ??= await factory.CreateAsync(ct); }
finally { _lock.Release(); }
```

The outer unsynchronized read of _inner has no memory-ordering guarantee relative to the write inside the lock. Fix: Declare _inner as volatile (or use Volatile.Read/Volatile.Write).

Minor Issue: DistributedOptions.HeartbeatIntervalSeconds / HeartbeatTimeoutSeconds Documented but Not Used

The configuration docs describe both settings, but nothing in the code reads them.

What Works Well

The feature is architecturally sound. Addressing the incomplete MatrixTarget wiring and the coordinator interface segregation would make this much stronger.

🤖 Generated with Claude Code
…or SignalR serialization System.Text.Json cannot reliably deserialize IReadOnlySet<string> as a record constructor parameter in SignalR's argument binding pipeline. Using the concrete HashSet<string> type eliminates the binding error.
Code Review: Distributed Workers Mode (Latest Pass)

This PR has evolved substantially across many revision rounds. The core architecture is solid: the pluggable coordinator and artifact-store abstractions hold up well. Below is the status of remaining concerns and new findings.

Still Open from Previous Rounds
| # | Issue | Status |
|---|---|---|
| 1 | `[MatrixTarget]` dead public API | Open (TODO in code, raised prior round) |
| 2 | `IDistributedCoordinator` ISP violation | Open (raised prior round) |
| 3 | 50ms busy-poll in SignalR master dequeue | New |
| 4 | Result waiter cancellation race on shutdown | New |
| 5 | CloudflaredTunnel regex hardcodes trycloudflare.com | New |
| 6 | 5 CI runners on every push | New (informational) |
Issues 1 and 2 have been raised in prior rounds and remain unaddressed. Issue 1 in particular is a correctness/API contract problem that should be resolved before merge. The others are design improvements worth addressing.
Start a real SignalR server and client to validate RegisterWorker, PublishResult, and ModuleAssignment serialization. Also resolve AdvertisedUrl from actual bound port (supports port 0 for tests).
Distributed Workers Mode - Code Review
This is an ambitious and well-structured feature. The abstractions are clean, the documentation is thorough, and the test coverage is meaningful. Below are observations across several categories — design concerns, correctness issues, and places where the implementation could be more maintainable.
1. Heartbeat / SendHeartbeat Missing from IDistributedCoordinator
The PR description and architecture docs mention SendHeartbeatAsync and WorkerHeartbeatService / WorkerCancellationMonitor, but the actual IDistributedCoordinator interface only has:
```csharp
Task EnqueueModuleAsync(...)
Task<ModuleAssignment?> DequeueModuleAsync(...)
Task PublishResultAsync(...)
Task<SerializedModuleResult> WaitForResultAsync(...)
Task RegisterWorkerAsync(...)
Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...)
Task SignalCompletionAsync(...)
```

SendHeartbeatAsync, IsCancellationRequestedAsync, and BroadcastCancellationAsync are documented in docs/docs/distributed/architecture.md but are absent from the interface definition. This means the documented cancellation propagation system and health monitoring via heartbeats are not actually wired up — the interface contract does not match the documentation. If these features exist in a coordinator implementation they are not callable through the abstraction, which defeats the purpose of the interface. Either the interface needs to be updated to include these members, or the documentation needs to be corrected to match what was actually shipped.
2. DeferredCoordinator / DeferredArtifactStore Belong in a Dedicated File, Not Nested in PipelineBuilder
PipelineBuilder.cs now contains two private nested classes (DeferredCoordinator, DeferredArtifactStore) and a 150-line static method ActivateDistributedModeIfConfigured plus ResolveDistributedOptions. This violates single responsibility: PipelineBuilder is now also a distributed-mode activation engine.
The deferred proxy pattern is legitimate, but it should live in its own file under src/ModularPipelines/Distributed/. The activation logic (ActivateDistributedModeIfConfigured + ResolveDistributedOptions) should be extracted to a dedicated class such as DistributedModeActivator. This keeps PipelineBuilder focused on pipeline construction.
3. ResolveDistributedOptions Walks the Service Collection Without BuildServiceProvider() — Fragile
The options resolution in PipelineBuilder iterates IServiceCollection looking for IConfigureOptions<DistributedOptions> descriptors with an ImplementationInstance, then manually calls .Configure() and .PostConfigure() on them:
```csharp
foreach (var descriptor in services.Where(d =>
    d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>))
{
    ((IConfigureOptions<DistributedOptions>)descriptor.ImplementationInstance!).Configure(opts);
}
```

This approach is brittle. When Configure&lt;TOptions&gt;(Action&lt;TOptions&gt;) is called, it registers an IConfigureOptions&lt;TOptions&gt; using a lambda factory (not an ImplementationInstance), which means ImplementationInstance is null and the descriptor is silently skipped. The approach works in the current code path only because AddDistributedMode wraps the user's Action&lt;DistributedOptions&gt; in a closure passed directly to Configure, which happens to create a concrete ConfigureNamedOptions with a captured action — but this is an implementation detail of Microsoft.Extensions.Options that could change.
A more robust approach is to provide a separate mechanism for the user to signal distributed mode (e.g., a boolean on PipelineBuilder) rather than sniffing the service collection. Alternatively, the distributed-mode bootstrap could be deferred to IHostedService startup, where a proper IOptions<DistributedOptions> is available.
4. ModuleAssignment.RequiredCapabilities is HashSet<string> on the Record — Mutability Leak
```csharp
public record ModuleAssignment(
    ...
    HashSet<string> RequiredCapabilities,
    ...
);
```
5. Redis Lua Script: LRANGE Scans the Entire Queue on Every Dequeue
The ScanAndClaimScript in RedisDistributedCoordinator:
```lua
local items = redis.call('LRANGE', KEYS[1], 0, -1)
```

This loads the entire work queue into Lua memory on every dequeue call. With a small number of short-running modules this is fine, but for a pipeline with many modules (100+) and many workers calling DequeueModuleAsync concurrently, this can become a bottleneck. The script also iterates items in FIFO order and stops at the first match, which means a module at position 0 that no current worker can satisfy will block the scan from finding a matchable item at position 5 without re-examining the rest.
A more scalable approach would be to segregate the queue by capability set (separate lists per capability group), or to accept the O(n) scan with a configurable limit and re-enqueue unmatched items at the tail instead of skipping them.
6. Capability Matching Inconsistency: IReadOnlySet<string> vs HashSet<string> Custom Equality
CapabilityMatcher.CanExecute uses:
```csharp
return assignment.RequiredCapabilities.All(
    required => workerCapabilities.Contains(required, StringComparer.OrdinalIgnoreCase));
```

But assignment.RequiredCapabilities is a HashSet&lt;string&gt; constructed with StringComparer.OrdinalIgnoreCase, and workerCapabilities is also built with that comparer. The call .Contains(required, StringComparer.OrdinalIgnoreCase) on an IReadOnlySet&lt;string&gt; is actually the IEnumerable&lt;T&gt;.Contains(T, IEqualityComparer&lt;T&gt;) LINQ overload — it bypasses the set's internal hash structure and falls back to a linear scan. If workerCapabilities is large, this is inefficient.
To correctly leverage the set's O(1) lookup, ensure workerCapabilities is declared as HashSet<string> (with the correct comparer) and call workerCapabilities.Contains(required) directly, or use IReadOnlySet<string> with Contains(T) only — trusting that both sets were constructed with the same comparer.
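A self-contained illustration of the comparer point — when both sets are constructed with StringComparer.OrdinalIgnoreCase, the set's own Contains does the case-insensitive O(1) lookup and no per-call comparer is needed (the capability names here are invented for the example):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Both sides agree on the comparer at construction time, so plain Contains
// is both case-insensitive and O(1) — the LINQ overload with an explicit
// comparer would instead scan the set linearly.
var required = new HashSet<string>(new[] { "docker", "GPU" }, StringComparer.OrdinalIgnoreCase);
var workerCapabilities = new HashSet<string>(new[] { "Docker", "gpu", "windows" }, StringComparer.OrdinalIgnoreCase);

bool canExecute = required.All(workerCapabilities.Contains);

Console.WriteLine(canExecute); // True
```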
7. TrySetDistributedResult Cast Can Throw at Runtime
In Module<T>:
```csharp
bool IModule.TrySetDistributedResult(IModuleResult result)
{
    return CompletionSource.TrySetResult((ModuleResult<T?>)result);
}
```

If result is not a ModuleResult&lt;T?&gt;, this throws InvalidCastException without any diagnostic message. Given that this is called from deserialization in a distributed execution context (where type mismatches are possible due to version skew or registry gaps), the cast should be guarded:

```csharp
if (result is not ModuleResult<T?> typed)
{
    return false;
}

return CompletionSource.TrySetResult(typed);
```

8. ArtifactLifecycleManager.DownloadConsumedArtifactsForPathAsync Silences Failure Cases
When an artifact is not found:
```csharp
if (artifact is null)
{
    _logger.LogWarning(...);
    return; // silently succeeds
}
```

If a module has [ConsumesArtifact] and the artifact doesn't exist, execution continues and the module may fail in a confusing way (e.g., FileNotFoundException during execution rather than a clear "required artifact not available" error). This should throw, not log-and-continue, so that the failure is attributed to the missing artifact rather than to arbitrary module code.
9. GzipPrefix String Constant Couples DistributedWorkPublisher and WorkerModuleExecutor
WorkerModuleExecutor.ApplyDependencyResults accesses Master.DistributedWorkPublisher.GzipPrefix and DistributedWorkPublisher.DecompressJson:
```csharp
if (serializedDep.SerializedJson.StartsWith(Master.DistributedWorkPublisher.GzipPrefix, StringComparison.Ordinal))
{
    var decompressed = Master.DistributedWorkPublisher.DecompressJson(serializedDep.SerializedJson);
```

This creates a circular concern: the worker knows about the master's compression implementation. The compression/decompression logic (and its sentinel prefix) should be extracted to a shared utility class (e.g., JsonCompressor) in the Serialization namespace, so both master and worker can use it without cross-namespace coupling.
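A sketch of that extraction — a hypothetical JsonCompressor owning both the sentinel and the gzip round-trip (the prefix value and method names are invented here; only the pattern matches the PR):

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical shared utility: both DistributedWorkPublisher and
// WorkerModuleExecutor would call this instead of reaching across
// namespaces for GzipPrefix / DecompressJson.
public static class JsonCompressor
{
    // Sentinel marking a base64-encoded gzip payload; the actual prefix in the PR may differ.
    public const string GzipPrefix = "gzip:";

    public static string Compress(string json)
    {
        var input = Encoding.UTF8.GetBytes(json);
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Fastest, leaveOpen: true))
        {
            gzip.Write(input, 0, input.Length);
        }
        return GzipPrefix + Convert.ToBase64String(output.ToArray());
    }

    public static string Decompress(string value)
    {
        if (!value.StartsWith(GzipPrefix, StringComparison.Ordinal))
        {
            return value; // not compressed — pass through unchanged
        }

        var compressed = Convert.FromBase64String(value[GzipPrefix.Length..]);
        using var input = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress);
        using var reader = new StreamReader(input, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```

With this in place, neither executor needs to know how the other side encoded the payload — Decompress is a no-op for uncompressed strings.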
10. [MatrixTarget] Is Documented and Partially Implemented, but Silently Broken
The feature is visible in the public [MatrixTarget] attribute, documented in docs, and has a corresponding MatrixModuleExpander class — but the expander is explicitly not connected:
```csharp
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.
```

Shipping a public attribute that does not work as documented is misleading to users who discover it. Either the attribute should be marked [Obsolete("Matrix expansion is not yet implemented")] or marked [EditorBrowsable(EditorBrowsableState.Never)], or the PR should either complete the feature or not add the public API surface until it is ready.
11. InMemoryDistributedCoordinator Has a Missing SendHeartbeatAsync/BroadcastCancellationAsync Concern
If the interface is eventually updated to include heartbeats/cancellation (see point 1), InMemoryDistributedCoordinator will need to implement them. It is worth noting now that the _completed flag in InMemoryDistributedCoordinator relies on volatile bool, which is fine for the read side, but the write side (_completed = true) followed by _workAvailable.Release() does not guarantee ordering on all architectures without a proper memory barrier. Volatile.Write(ref _completed, true) would be clearer and more explicit about intent.
12. PortableFilePathJsonConverter / PortableFolderPathJsonConverter vs [JsonConverter] on File / Folder
Adding [JsonConverter(typeof(FilePathJsonConverter))] to the File and Folder classes is a global change that affects all serialization of these types — not just distributed module result serialization. If any existing code relies on the default serialization behavior of File or Folder, this is a breaking change. The portable path converters (which are added only to the distributed serializer's JsonSerializerOptions) avoid this problem by being scoped. Consider whether the global [JsonConverter] annotation on File/Folder is intentional and whether it may affect non-distributed serialization paths.
Summary
The architectural foundation is sound: the coordinator abstraction, capability-based routing, artifact lifecycle management, and the deferred initialization pattern are all well-designed. The most actionable issues to address before merging are:
- Correctness: The `IDistributedCoordinator` interface is missing members that exist in implementations and are documented (heartbeat, cancellation) — this is a gap between docs and code.
- Safety: The `TrySetDistributedResult` cast should be guarded.
- Correctness: The missing artifact case should throw, not silently continue.
- API design: `[MatrixTarget]` should not be a visible public attribute while the expansion logic is not connected.
- Type design: `HashSet<string>` on `record` types should be `IReadOnlySet<string>`.
Tests validate:
- Capability-based routing across 3 workers
- Result broadcast to non-producing workers (dependency propagation)
- Full workflow: enqueue → assign → execute → publish → collect → complete
macOS CI runners take longer to resolve cloudflared tunnel DNS names. 30 seconds was insufficient — increase to 120s to accommodate slower DNS resolution across different CI platforms.
Code Review: Distributed Workers Mode (Follow-Up)
This is a follow-up to the previous review. Several of the major issues raised earlier have been addressed — well done. Below is the current state.
Resolved from Previous Review
| Issue | Status |
|---|---|
| Reflection to access internal `CompletionSource` | ✅ Fixed — replaced with `IModule.TrySetDistributedResult` |
| `BuildServiceProvider()` inside ConfigureServices | ✅ Fixed — `ResolveDistributedOptions` avoids this |
| `WorkerHealthMonitor` non-functional stub | ✅ Removed from scope |
| `CancellationToken.None` throughout worker loop | ✅ Fixed — uses `_lifetime.ApplicationStopping` |
| `InMemoryDistributedCoordinator` busy-spin livelock | |
| `MatrixModuleExpander` not wired up | TODO(matrix) comment confirms |
Critical: IModule interface polluted with distributed-specific API
IModule.cs now exposes two members that are only relevant in distributed execution:
```csharp
Task<IModuleResult> ResultTask { get; }
bool TrySetDistributedResult(IModuleResult result);
```

This is a breaking change for any user who implements IModule directly rather than inheriting from Module&lt;T&gt;. More importantly, TrySetDistributedResult is a distributed-execution implementation detail that has no meaning in the single-process case. Adding it to the core interface violates the Interface Segregation Principle — every module now carries an obligation it never needs unless distributed mode is enabled.
Why this matters: The original reflection hack was (correctly) identified as bad. But the replacement leaks the distributed concern upward into the public API rather than keeping it internal. The better approach is an internal interface:
```csharp
// Internal to ModularPipelines — never exposed to users
internal interface IDistributedModule
{
    Task<IModuleResult> ResultTask { get; }
    bool TrySetDistributedResult(IModuleResult result);
}
```

Module&lt;T&gt; can implement IDistributedModule in addition to IModule, and WorkerModuleExecutor can cast to IDistributedModule internally. This keeps the public IModule contract clean and non-breaking.
Significant: ResultTask creates a new Task object on every access
```csharp
// Module.cs
Task<IModuleResult> IModule.ResultTask => CompletionSource.Task.ContinueWith(
    static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);
```

This is a property, not a field — so every access to ResultTask allocates a new continuation task. If the worker (or anything else) accesses ResultTask more than once, you get multiple independent continuations on the same CompletionSource. This is a silent performance and correctness risk. It should be a lazily-initialized field:

```csharp
private Task<IModuleResult>? _resultTask;

Task<IModuleResult> IModule.ResultTask =>
    _resultTask ??= CompletionSource.Task.ContinueWith(
        static t => (IModuleResult)t.Result, TaskContinuationOptions.ExecuteSynchronously);
```

Significant: ResolveDistributedOptions is fragile
PipelineBuilder.ResolveDistributedOptions manually re-implements the options resolution pipeline by walking IServiceCollection descriptors and invoking IConfigureOptions<T> instances directly. This works today because services.Configure<T>(Action<T>) internally registers ConfigureNamedOptions<T> as a singleton instance — but it silently breaks for any IConfigureOptions<DistributedOptions> registered via a factory delegate (ImplementationFactory rather than ImplementationInstance), because those are filtered out:
```csharp
// Only processes descriptors with ImplementationInstance — factory-registered options are silently skipped
foreach (var descriptor in services.Where(d =>
    d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>))
```

This is essentially reimplementing BuildServiceProvider() by hand to avoid calling BuildServiceProvider(). A more robust alternative is to defer the distributed activation decision to runtime rather than registration time. Since the executor (IModuleExecutor) is resolved from the container when the pipeline runs, you could use a switching executor that reads IOptions&lt;DistributedOptions&gt; at resolution time:
```csharp
services.AddSingleton<IModuleExecutor>(sp =>
{
    var opts = sp.GetRequiredService<IOptions<DistributedOptions>>().Value;
    if (!opts.Enabled || opts.TotalInstances <= 1)
        return sp.GetRequiredService<DefaultModuleExecutor>();
    return opts.InstanceIndex == 0
        ? sp.GetRequiredService<DistributedModuleExecutor>()
        : sp.GetRequiredService<WorkerModuleExecutor>();
});
```

This is simpler, relies on the standard DI container, and handles all registration patterns correctly.
Significant: SignalR coordinator violates Interface Segregation
Both SignalRMasterCoordinator and SignalRWorkerCoordinator implement IDistributedCoordinator but throw NotSupportedException for the methods not applicable to their role:
- `SignalRWorkerCoordinator.EnqueueModuleAsync` — NotSupportedException
- `SignalRWorkerCoordinator.WaitForResultAsync` — NotSupportedException
- `SignalRWorkerCoordinator.GetRegisteredWorkersAsync` — NotSupportedException
A coordinator that throws NotSupportedException on half its interface is not actually implementing the interface — it's using it as an umbrella type. The Redis implementation doesn't have this problem because Redis is symmetric. The SignalR design is inherently asymmetric (master hosts the Hub, workers connect as clients), so the interface should reflect that:
public interface IMasterCoordinator { ... } // EnqueueModuleAsync, WaitForResultAsync, GetRegisteredWorkersAsync
public interface IWorkerCoordinator { ... } // DequeueModuleAsync, PublishResultAsync, RegisterWorkerAsync
Or alternatively, the IDistributedCoordinator contract should document which methods are called on which side so implementers know what to expect.
Minor: RunIdentifierResolver duplicated across three packages
RunIdentifierResolver is implemented identically (or near-identically) in:
- src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs
- src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs
The resolution logic (GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse → GUID) should live once, likely in the core ModularPipelines.Distributed namespace, and be reused. Three copies will diverge over time.
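The chain described above can be sketched as a single shared helper. This is a hypothetical consolidation (class name `SharedRunIdentifierResolver` and its location are assumptions, not the PR's code), showing only the priority order the review documents:

```csharp
// Hypothetical consolidated resolver, sketching the priority chain:
// GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → GUID.
internal static class SharedRunIdentifierResolver
{
    public static string Resolve()
    {
        foreach (var name in new[] { "GITHUB_SHA", "BUILD_SOURCEVERSION", "CI_COMMIT_SHA" })
        {
            var value = Environment.GetEnvironmentVariable(name);
            if (!string.IsNullOrWhiteSpace(value))
            {
                return value;
            }
        }

        try
        {
            // Local fallback: current commit hash from a git checkout.
            using var git = System.Diagnostics.Process.Start(new System.Diagnostics.ProcessStartInfo
            {
                FileName = "git",
                Arguments = "rev-parse HEAD",
                RedirectStandardOutput = true,
                UseShellExecute = false,
            });
            var sha = git?.StandardOutput.ReadToEnd().Trim();
            git?.WaitForExit();
            if (git?.ExitCode == 0 && !string.IsNullOrEmpty(sha))
            {
                return sha!;
            }
        }
        catch
        {
            // git unavailable; fall through to a random run id.
        }

        return Guid.NewGuid().ToString("N");
    }
}
```

Any package needing the run identifier would then reference this one implementation (or link the file as shared source if an inter-package dependency is undesirable).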
Minor: cloudflared downloaded from "latest" in CI without integrity check
curl -fsSL https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -o /usr/local/bin/cloudflared
Pinning to latest means a new release with breaking changes or a compromised binary can break CI without any indication of what changed. Prefer a pinned version with checksum verification:
CLOUDFLARED_VERSION: "2025.1.0"
CLOUDFLARED_SHA256: "<expected-sha>"
Observation: No cancellation propagation in IDistributedCoordinator
The previous BroadcastCancellationAsync / IsCancellationRequestedAsync methods have been removed from the coordinator interface. This means there is currently no cross-instance cancellation mechanism — if the master cancels, workers will not be notified and will continue dequeuing until SignalCompletionAsync is received (or the queue runs dry). The Redis implementation previously had this, so it appears to have been dropped intentionally, but it's worth confirming this is an acceptable gap for the initial release.
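If the gap is not acceptable for the initial release, one option is a small opt-in contract alongside the coordinator. The method names below mirror the removed ones; the interface and signatures are a sketch, not the shipped API:

```csharp
// Sketch: an opt-in cancellation contract so the gap can be closed without
// changing IDistributedCoordinator. Signatures are assumptions.
public interface IDistributedCancellation
{
    // Master side: announce cancellation to all instances.
    Task BroadcastCancellationAsync(CancellationToken cancellationToken = default);

    // Worker side: poll between dequeues and stop when the run is cancelled.
    Task<bool> IsCancellationRequestedAsync(CancellationToken cancellationToken = default);
}
```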
There was a problem hiding this comment.
Code Review: Distributed Workers Mode — Round 7
Reviewing against the previous round (Round 6, submitted 2026-02-23T14:15:37Z) to track what has been addressed and what remains open.
Resolved Since Round 6
SignalR coordinator introduced — major architecture improvement
The previous round flagged the Redis dequeue ordering inconsistency (LRANGE+LREM with BRPOP fallback creating FIFO violations and capability-mismatch bouncing). This has been superseded by a new backend: ModularPipelines.Distributed.SignalR replaces the poll-based Redis coordinator with a push model. The master hosts a Kestrel/SignalR server; workers connect as clients and receive assignments via hub callbacks. This eliminates the polling and the LRANGE/BRPOP incoherence in one move. The pub/sub design is correct for this use case.
ModuleCompletionSourceApplicator still uses reflection — but TrySetDistributedResult was also added
IModule now exposes TrySetDistributedResult(IModuleResult), implemented in Module<T> via CompletionSource.TrySetResult — this is the clean public API that was requested in previous rounds. However, ModuleCompletionSourceApplicator still uses reflection (GetProperty("CompletionSource", NonPublic)) and is actively called from both DistributedModuleExecutor.CollectDistributedResultAsync and WorkerModuleExecutor.ApplyDependencyResults. The new TrySetDistributedResult method on IModule is not called anywhere in the codebase. This means the fragile reflection path is still the live code path. The fix is trivial: replace ModuleCompletionSourceApplicator.TryApply(module, result) with module.TrySetDistributedResult(result) at both call sites, then delete ModuleCompletionSourceApplicator.
IArtifactContext registration — FIXED
The singleton registration with string.Empty is gone. IArtifactContext is no longer registered in DI; ArtifactContextExtensions.Artifacts() calls context.GetService<IArtifactContext>() which will throw at runtime, but this is now a known TODO gap rather than a silent misattribution. (See open issue below.)
Redis upload OOM — status improved but not fully resolved
The Redis artifact store still reads the full stream into a MemoryStream before deciding whether to chunk:
using var ms = new MemoryStream();
await data.CopyToAsync(ms, cancellationToken);
var bytes = ms.ToArray();
if (bytes.Length <= _maxSingleUpload)
// single key
else
    // chunk bytes[] ← already fully in heap
The chunking logic is correct in structure, but the stream is fully buffered before any decision is made. For large artifacts this remains an OOM risk. The S3 store correctly streams to a temp file; Redis should do the same. This was flagged in Round 6 and is unchanged.
RunIdentifierResolver duplication — PARTIALLY ADDRESSED
There are still three copies: one in ModularPipelines.Distributed.Redis, one in ModularPipelines.Distributed.Artifacts.S3, and one in ModularPipelines.Distributed.Discovery.Redis. The deduplication was noted as a minor concern in Round 6 and has not been addressed.
New Issues Introduced Since Round 6
1. DistributedPipelineHub.MasterState — now correctly constructor-injected (not a bug)
The previous version of DistributedPipelineHub had an internal property setter for MasterState with no injection mechanism. The current version correctly takes SignalRMasterState as a constructor parameter. MasterServerHost registers it via builder.Services.AddSingleton(masterState). This is correct — SignalR hub DI works via constructor injection and the singleton registration is the right approach. No issue here.
2. TryAssignPendingWork race in DistributedPipelineHub — concurrent assignment loss
TryAssignPendingWork dequeues from PendingAssignments (a ConcurrentQueue) and calls workerState.TryMarkBusy(). If TryMarkBusy() fails (worker raced to busy), the assignment is re-enqueued. However, this worker then returns without attempting other idle workers. Combined with the loop structure that iterates over a snapshot count (pendingCount), a scenario where multiple workers complete simultaneously can leave assignments in the queue longer than necessary. This is a throughput concern, not a correctness bug, but given the design goal of maximising parallelism it is worth addressing by continuing to try other workers after a failed TryMarkBusy rather than stopping.
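A sketch of the suggested change, continuing to scan other idle workers instead of returning after one failed claim. All member names here (`PendingAssignments`, `Workers`, `Matches`, `TryMarkBusy`, `SendAssignmentAsync`) are assumptions about the hub's internals, based only on the description above:

```csharp
// Sketch: drain the pending queue, offering each assignment to every idle
// worker rather than giving up after one failed TryMarkBusy.
private async Task TryAssignPendingWork()
{
    while (_masterState.PendingAssignments.TryDequeue(out var assignment))
    {
        var assigned = false;
        foreach (var worker in _masterState.Workers.Values)
        {
            // TryMarkBusy is the atomic claim; losing the race to one worker
            // should not stop us from offering the work to the next.
            if (worker.Matches(assignment.RequiredCapabilities) && worker.TryMarkBusy())
            {
                await SendAssignmentAsync(worker, assignment);
                assigned = true;
                break;
            }
        }

        if (!assigned)
        {
            // No idle compatible worker right now; re-enqueue and stop scanning.
            _masterState.PendingAssignments.Enqueue(assignment);
            break;
        }
    }
}
```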
3. SignalRWorkerCoordinator.DequeueModuleAsync — single-item delivery with no re-request loop
DequeueModuleAsync invokes RequestWork, then waits on the channel for exactly one item, then returns. The WorkerModuleExecutor loop calls DequeueModuleAsync in a while loop, so subsequent calls each trigger a new RequestWork invocation. This creates a round-trip-per-module pattern. More importantly: if the channel completes (via OnSignalCompletion calling TryComplete()) while a worker is waiting for work that the master will never send (e.g., all remaining assignments require capabilities this worker doesn't have), the worker correctly stops. However, if a worker's RequestWork arrives at the hub between the master enqueuing and the hub's TryAssignPendingWork executing, the assignment can be missed and the worker hangs until a future RequestWork or completion signal. This is an inherent race in request-based rather than subscription-based work delivery.
4. WorkerModuleExecutorTests — still asserts nothing
WorkerModuleExecutorTests.Worker_Registers_With_Coordinator remains:
await Assert.That(true).IsTrue();
This is the most execution-critical class in the feature and has zero meaningful test coverage. At minimum the test should verify that a module is dequeued, executed via IModuleRunner, and its result published. The comment says 'Detailed testing requires mocking the full DI and execution pipeline' — this is accurate but not a blocker; DependencyResultPropagationTests demonstrates that the module execution components can be wired with hand-constructed collaborators.
5. Cloudflared tunnel regex is too narrow
CloudflaredTunnel.TunnelUrlRegex matches only https://[a-zA-Z0-9-]+\.trycloudflare\.com. Cloudflare quick tunnels use multi-segment random hostnames (e.g., https://random-words-here.trycloudflare.com) but the pattern is constrained to a single label between https:// and .trycloudflare.com. If the tunnel URL contains a dash-separated multi-word hostname (which is the current Cloudflare format), the regex will fail to match and CloudflaredTunnel.StartAsync will time out, making the master unreachable to workers. The regex should be https://[\w-]+\.trycloudflare\.com or similar to match multi-label subdomain patterns.
6. ArtifactContextExtensions.Artifacts() — not registered, will throw at runtime
As noted above, IArtifactContext has no DI registration. context.GetService<IArtifactContext>() in ArtifactContextExtensions.Artifacts() will throw InvalidOperationException for any module that calls context.Artifacts(). This is a regression from the previous fix attempt. The cleanest resolution: change the extension to construct ArtifactContextImpl directly, taking IDistributedArtifactStore, IOptions<ArtifactOptions>, and the current module type name from context. Since the extension signature is Artifacts(this IPipelineContext context), and IPipelineContext does not expose the current module type, the extension should be changed to Artifacts(this IModuleContext context) where IModuleContext exposes the module type, or an overload accepting the module type explicitly.
7. Worker disconnect does not re-enqueue in-flight work — documented but unresolved
OnDisconnectedAsync in DistributedPipelineHub contains:
// TODO: Re-enqueue in-flight work for the disconnected worker
If a worker disconnects mid-execution, the master's WaitForResultAsync will block indefinitely for that module. This is a correctness issue; the pipeline will hang. Since the hub tracks WorkerState with IsBusy state, it is possible to detect what the worker was executing at disconnect time and re-publish the assignment. If re-execution is not feasible (e.g., side effects already occurred), at minimum the master should be notified and the affected result TCS cancelled with a descriptive error.
Still Open From Previous Rounds
ModuleCompletionSourceApplicator uses reflection despite TrySetDistributedResult existing — as described above, the reflection path is still live. TrySetDistributedResult was added to IModule but is unused. Replace the two ModuleCompletionSourceApplicator.TryApply call sites with module.TrySetDistributedResult(result).
MatrixModuleExpander not wired into DistributedModuleExecutor — the TODO(matrix) comment in DistributedModuleExecutor.ExecuteAsync and DistributedWorkPublisher.CreateAssignment is unchanged. [MatrixTarget] is a public attribute that does nothing. This was flagged in Rounds 3–6. Either connect MatrixModuleExpander.ScanForExpansions in the execution loop or remove the public-facing attribute and expander from this PR.
Redis artifact store uploads full stream to memory — unchanged, as described above.
Summary
The SignalR backend is a substantial improvement over the polling Redis coordinator and correctly addresses the FIFO/livelock concerns from previous rounds. The hub DI wiring, pub/sub completion signaling, and worker lifecycle are well-structured.
The three items that need resolution before merge:
- Replace `ModuleCompletionSourceApplicator` reflection with `module.TrySetDistributedResult(result)` — the public API already exists, just use it.
- `IArtifactContext` is unregistered — `context.Artifacts()` throws at runtime for all users of the feature.
- Worker disconnect hangs the master — the TODO in `OnDisconnectedAsync` needs a concrete cancellation path (even if re-execution is deferred).
The remaining items (Cloudflared regex, WorkerModuleExecutorTests stub, MatrixModuleExpander dead code, Redis OOM) would improve robustness and documentation accuracy, but are lower severity.
…n for WorkerModuleScheduler - ConfigurationTests: update expected ConnectionTimeoutSeconds from 30 to 120 to match the default change in SignalRDistributedOptions - WorkerModuleScheduler: return never-completing Channel reader instead of throwing NotSupportedException from ReadyModules property
If Redis evicts individual chunk keys under memory pressure, the download would silently return truncated data. Now validates the reassembled length against the stored SizeBytes in ArtifactReference.
There was a problem hiding this comment.
Distributed Workers Mode — Current Pass Code Review
After many revision rounds, this PR has matured significantly. The core architecture is solid: the pluggable IDistributedCoordinator/IDistributedArtifactStore abstractions, real IHubContext<T> usage in SignalRMasterCoordinator, Lua-based atomic dequeue in Redis (TryScanAndClaimAsync), Lazy<Task> deduplication in ArtifactLifecycleManager, proper await _app.StartAsync() for Kestrel readiness, and the check-subscribe-recheck pattern in WaitForResultAsync are all correct and well-thought-out. The 86+ tests across three packages demonstrate strong coverage. Below are the remaining issues that should be addressed before merging.
🔴 Critical: Worker disconnection causes master hang
DistributedPipelineHub.cs — OnDisconnectedAsync
public override Task OnDisconnectedAsync(Exception? exception)
{
if (_masterState.Workers.TryRemove(Context.ConnectionId, out var workerState))
{
_logger.LogWarning("Worker {Index} disconnected (connection {ConnectionId})",
workerState.Registration.WorkerIndex, Context.ConnectionId);
// TODO: Re-enqueue in-flight work for the disconnected worker
}
return Task.CompletedTask;
}
The TODO has been present for multiple rounds. When a worker disconnects mid-execution, the master calls WaitForResultAsync on a TaskCompletionSource that is never completed. With ModuleResultTimeoutSeconds = 0 (the default), this is an infinite hang — the pipeline stalls silently.
Suggested approach: Track each worker's currently-executing assignment in WorkerState. In OnDisconnectedAsync, if the worker had an in-flight assignment, either call PublishResultAsync with a failure result, or re-enqueue the assignment so another worker (or the master) can retry it. The failure-result path is simpler and gives a clear error to the caller:
if (workerState.CurrentAssignment is { } assignment)
{
var failure = SerializedModuleResult.Failure(assignment.ModuleTypeName, "Worker disconnected");
if (_masterState.ResultWaiters.TryGetValue(assignment.ModuleTypeName, out var tcs))
tcs.TrySetResult(failure);
}
🔴 Critical: WaitForResultAsync (Redis) silently drops deserialization errors
RedisDistributedCoordinator.cs
subscription.OnMessage(msg =>
{
var result = JsonSerializer.Deserialize<SerializedModuleResult>(msg.Message.ToString(), _jsonOptions)!;
tcs.TrySetResult(result);
});
If JsonSerializer.Deserialize throws (malformed message, schema mismatch), the exception is swallowed by StackExchange.Redis's message handler. The tcs is never resolved and WaitForResultAsync hangs indefinitely (or until the optional timeout fires, if one is configured).
Suggested fix: Wrap the callback body in try/catch and call tcs.TrySetException on failure:
subscription.OnMessage(msg =>
{
try
{
var result = JsonSerializer.Deserialize<SerializedModuleResult>(msg.Message.ToString(), _jsonOptions)!;
tcs.TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
🟡 Design: IDistributedCoordinator violates Interface Segregation
SignalRWorkerCoordinator throws NotSupportedException for three methods that are conceptually master-only:
public Task EnqueueModuleAsync(...) => throw new NotSupportedException("Workers do not enqueue work.");
public Task<SerializedModuleResult> WaitForResultAsync(...) => throw new NotSupportedException("Workers do not wait for results.");
public Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(...) => throw new NotSupportedException("Workers do not query registered workers.");
This means the interface cannot be used safely by code that calls these methods without knowing the underlying role. This has been raised in earlier rounds — splitting into IMasterCoordinator : IDistributedCoordinator and IWorkerCoordinator : IDistributedCoordinator (or separate interfaces entirely) would eliminate the NotSupportedException paths and make the role contract explicit at the type level. That said, since both master and worker use IDistributedCoordinator in DistributedModuleExecutor, this refactor has non-trivial scope — at minimum, consider adding XML doc comments on the throwing methods noting they are master-only, and adding a guard in the factory so the wrong coordinator type cannot be injected for a given role.
🟡 Design: [MatrixTarget] ships as dead public API
MatrixTargetAttribute is public with a clean, usable API:
[MatrixTarget("linux", "windows", "macos")]
public class MyModule : Module<string> { ... }But the expander is explicitly not wired up:
// TODO(matrix): MatrixModuleExpander.ScanForExpansions not yet connected.
// Modules with [MatrixTarget] will run once, not N times.
Shipping a public attribute that silently does nothing when used is a leaky API contract — users will apply it expecting N runs and get 1. Recommended: Either remove [MatrixTarget] and MatrixModuleExpander from this PR (add in a follow-up), or add a runtime warning/exception in DistributedModuleExecutor when it encounters a module decorated with [MatrixTarget] so users get actionable feedback.
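If the attribute stays in this PR, a minimal fail-fast guard might look like the following. The attribute and executor names come from the PR; the check site and message are illustrative, not a prescribed implementation:

```csharp
using System.Reflection;

// Sketch: reject [MatrixTarget] modules until the expander is wired in,
// so users get an actionable error instead of a silent single run.
foreach (var module in modules)
{
    if (module.GetType().GetCustomAttribute<MatrixTargetAttribute>() is not null)
    {
        throw new NotSupportedException(
            $"{module.GetType().Name} is decorated with [MatrixTarget], but matrix " +
            "expansion is not yet supported in distributed mode.");
    }
}
```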
🟡 Maintainability: RunIdentifierResolver duplicated across packages
src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs and src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs are byte-for-byte identical. The same CI environment variable resolution logic (GITHUB_SHA, BUILD_SOURCEVERSION, CI_COMMIT_SHA, git fallback, GUID fallback) is copied in two places.
Suggested approach: Extract to a shared internal helper in ModularPipelines (core) or a ModularPipelines.Distributed package, and reference it from both packages. If inter-package dependency is undesirable, link the file as a shared source.
🟡 Reliability: ModuleResultTimeoutSeconds defaults to 0 (wait forever)
/// <summary>
/// Default timeout in seconds for waiting for a distributed module result.
/// Applied when a module has no explicit Timeout configured. 0 = no timeout (wait forever).
/// </summary>
public int ModuleResultTimeoutSeconds { get; set; }
In a distributed system where workers can crash, a default of "no timeout" combined with the unresolved OnDisconnectedAsync TODO creates a silent, indefinite hang with no signal to the operator. Until the disconnect-recovery is implemented, consider defaulting this to something reasonable (e.g., 3600 seconds) so failures surface in bounded time.
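One way to bound the wait in the meantime, treating 0 as "use a safety-net default". This assumes .NET 6+ for `Task.WaitAsync`; the 3600-second figure is the suggestion above, not a shipped value:

```csharp
// Sketch: apply a safety-net timeout around the result TaskCompletionSource.
var timeoutSeconds = options.ModuleResultTimeoutSeconds > 0
    ? options.ModuleResultTimeoutSeconds
    : 3600;

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(timeoutSeconds));

// Throws OperationCanceledException instead of hanging forever.
var result = await tcs.Task.WaitAsync(cts.Token);
```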
✅ What's in good shape
- Reflection-based `CompletionSource` access is gone — `TrySetDistributedResult` via `IModule` is clean
- Redis `TryScanAndClaimAsync` using a Lua script is correctly atomic (LRANGE + LREM in a single script evaluation)
- `ArtifactLifecycleManager` failure path calls `_completedRestores.TryRemove` so retries work
- `MasterServerHost` uses `await _app.StartAsync()` so there is no race between server readiness and worker connection
- `SignalRMasterCoordinator` uses the real `IHubContext<DistributedPipelineHub>` from the WebApp DI container — the no-op proxy issue from earlier rounds is fully resolved
- Redis pub/sub-based `DequeueModuleAsync` correctly avoids the initial-scan → subscribe race with a re-check after subscription
- Artifact download deduplication via `ConcurrentDictionary<string, Lazy<Task>>` is a correct and efficient pattern
- The `CloudflaredTunnel` disposal calls `Kill(entireProcessTree: true)`, which correctly cleans up the child process
There was a problem hiding this comment.
Round 7 Review: Distributed Workers Mode
I reviewed the current code at SHA 541ffb241d against all previous review rounds. Several issues from the previous review have been resolved; the two critical correctness bugs remain open.
Resolved Since Previous Review
- `specs/` directory — removed ✓
- `WorkerModuleScheduler.ReadyModules` — now returns a static empty `Channel<ModuleState>` instead of throwing `NotSupportedException` ✓
- Redis upload OOM — `RedisDistributedArtifactStore.UploadAsync` does NOT buffer the entire stream; it reads in fixed-size `_chunkSize` chunks and makes the single-vs-chunked decision based on the first read result. The previous review's OOM concern does not apply to the current code ✓
- Redis chunked download integrity — size validation is present on reassembly ✓
- `ArtifactLifecycleManager` `Lazy<Task>` exception caching — on failure, the key is removed from `_completedRestores`, allowing retry. The previous concern does not apply ✓
Still Open: Critical
1. GetModule<T>() cross-process hang — fix reverted with no alternative
The commit 799d23c7 reverts the [PinToMaster] protection for modules that call context.GetModule<T>(). In distributed mode, GetModule<T>() awaits a TaskCompletionSource that is set only in the process that executed the module. If a worker calls GetModule<T>() for a module that ran on the master or another worker, the source is never set in that process and the worker deadlocks permanently.
This silently breaks any existing pipeline that uses GetModule<T>() — the most common module-composition pattern in the framework. Users have no warning.
Options:
- Restore `[PinToMaster]` auto-detection and restore the reverted commit
- Throw a clear `DistributedModeException` from `GetModule<T>()` on non-master processes with a message pointing users to `[PinToMaster]`
- Propagate completed results for ALL modules (not just declared `[DependsOn<T>]` dependencies) through the coordinator so workers can satisfy `GetModule<T>()` calls cross-process
2. IArtifactContext not registered in DI — context.Artifacts() throws at runtime
ArtifactContextExtensions.Artifacts() calls context.GetService<IArtifactContext>(), but IArtifactContext / ArtifactContextImpl has no DI registration anywhere — not in DependencyInjectionSetup.RegisterDistributedServices(), not in DistributedPipelineBuilderExtensions, not in any package extension. Any module that calls context.Artifacts() will throw InvalidOperationException at runtime.
ArtifactContextImpl requires a string currentModuleTypeName constructor parameter, making a plain singleton registration incorrect (all modules would share an empty or wrong type name). The cleanest fix avoids DI registration entirely:
// Change signature from IPipelineContext to IModuleContext
public static IArtifactContext Artifacts(this IModuleContext context)
{
return new ArtifactContextImpl(
context.Services.Get<IDistributedArtifactStore>(),
context.Services.Get<IOptions<ArtifactOptions>>(),
context.CurrentModuleType.FullName!);
}
This constructs the correct instance per-call without any lifetime mismatch.
Still Open: Significant
3. WorkerModuleExecutorTests is a placeholder
[Test]
public async Task Worker_Registers_With_Coordinator()
{
// Detailed testing requires mocking the full DI and execution pipeline.
await Assert.That(true).IsTrue();
}
This is the most execution-critical component in the entire feature. The existing integration test in DistributedPipelineIntegrationTests exercises the coordinator and master path but simulates the worker manually — it does not go through WorkerModuleExecutor. At minimum there should be a test that verifies: a module is dequeued → executed → result published to coordinator. Without this, regressions in the worker path (the GetModule<T>() hang, capability matching, artifact lifecycle) are invisible.
4. MatrixModuleExpander is dead public API
[MatrixTarget] is a public attribute that silently does nothing. MatrixModuleExpander.ScanForExpansions is never called from DistributedModuleExecutor.ExecuteAsync or anywhere else. The PR description marks tasks T044–T048 as [x] complete, which is inaccurate. A TODO(matrix) comment in the executor acknowledges this.
Before merging: either wire the expander into DistributedModuleExecutor.ExecuteAsync, or remove [MatrixTarget], [MatrixExpansionAttribute], and MatrixModuleExpander from this PR and mark the matrix tasks as deferred. Shipping public API surface that appears functional but has no effect is worse than shipping nothing — users will write code against it and discover it does nothing only at runtime.
5. ModuleCompletionSourceApplicator still uses reflection for dependency results
WorkerModuleExecutor.ApplyDependencyResults calls ModuleCompletionSourceApplicator.TryApply(depModule, result), which accesses the internal CompletionSource property of dependency modules via private reflection. While the module's own ResultTask is now properly exposed through IModule.TrySetDistributedResult (fixing the critical issue from Round 3), the dependency-result application path still bypasses the framework's public contract.
The fix pattern is already established — extend IModule with an internal SetDependencyResult(IModuleResult) method (or reuse TrySetDistributedResult if semantics permit), so the entire distributed result-application path goes through controlled API rather than reflection that will break silently on any Module<T> refactor.
6. InMemory dequeue: capability-mismatch spin with no backoff
When a queued item does not match any connected worker's capabilities, each worker that wakes scans the full queue, finds no match, and re-releases the semaphore — immediately waking the next worker to repeat. With N workers and one incompatible item, this becomes an O(N) spin loop with no sleep.
Adding a short delay before re-releasing the semaphore (await Task.Delay(50, cancellationToken) before _workAvailable.Release() in the no-match branch) would break the spin without meaningful latency impact, since the scenario only occurs when no suitable worker exists.
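Sketched against the no-match branch described above (`_workAvailable` and `foundMatch` are assumed names for the coordinator's semaphore and scan result):

```csharp
// Sketch: backoff before re-releasing the semaphore in the no-match branch,
// so N incompatible workers don't spin on an item none of them can claim.
if (!foundMatch)
{
    await Task.Delay(50, cancellationToken);
    _workAvailable.Release();
}
```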
Minor
7. RunIdentifierResolver duplicated across three packages
Identical implementations (same priority chain: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → Guid.NewGuid()) exist in ModularPipelines.Distributed.Redis, ModularPipelines.Distributed.Artifacts.S3, and ModularPipelines.Distributed.Discovery.Redis. Any bug fix or new CI environment must be applied three times. This should be consolidated into a shared location.
8. Redis queue LIFO ordering (undocumented)
EnqueueModuleAsync uses ListLeftPushAsync (head-push), and the Lua scan starts from index 0 (head), so the most recently enqueued module executes first — LIFO, not FIFO. This may be intentional, but it is undocumented and can cause starvation for older items when new work arrives continuously. If FIFO is intended, change to ListRightPushAsync on enqueue.
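With StackExchange.Redis, the FIFO variant is a one-line change on the enqueue side (`db`, `queueKey`, and `serializedAssignment` stand in for the coordinator's locals):

```csharp
// FIFO: push at the tail, keep the head-first Lua scan.
// (The shipped ListLeftPushAsync head-push plus head-first scan yields LIFO.)
await db.ListRightPushAsync(queueKey, serializedAssignment);
```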
Summary Table
| # | Issue | Status |
|---|---|---|
| 1 | `GetModule<T>()` cross-process hang — fix reverted, no alternative | Critical |
| 2 | `IArtifactContext` not in DI — `context.Artifacts()` throws at runtime | Critical |
| 3 | `WorkerModuleExecutorTests` is a placeholder with no real assertions | Significant |
| 4 | `MatrixModuleExpander` not wired — `[MatrixTarget]` silently does nothing | Significant |
| 5 | `ModuleCompletionSourceApplicator` uses reflection for dependency results | Significant |
| 6 | InMemory dequeue: capability-mismatch busy-spin with no backoff | Significant |
| 7 | `RunIdentifierResolver` duplicated in three packages | Minor |
| 8 | Redis LIFO ordering undocumented | Minor |
| 9 | `specs/` directory | Fixed ✓ |
| 10 | `WorkerModuleScheduler.ReadyModules` throws | Fixed ✓ |
| 11 | Redis upload OOM | Fixed ✓ (current code streams chunk-by-chunk) |
| 12 | `ArtifactLifecycleManager` `Lazy<Task>` failure caching | Fixed ✓ |
| 13 | Redis chunked download integrity | Fixed ✓ |
| (prior rounds) | `CompletionSource` reflection, BuildServiceProvider, WorkerHealthMonitor stub, `CancellationToken.None`, InMemory livelock, BRPOP ordering, TOTAL_INSTANCES, S3 MemoryStream, plugin thread safety, duplicate `AddDistributedMode` | All Fixed ✓ |
Issues #1 and #2 are runtime correctness bugs that will affect any user of the feature as shipped. They should be resolved before merge.
…egex - Make MatrixTargetAttribute internal (not yet wired to executor, avoid shipping dead public API) - Replace 50ms busy-poll in SignalRMasterCoordinator.DequeueModuleAsync with SemaphoreSlim signal from EnqueueModuleAsync/SignalCompletionAsync - Generalize cloudflared tunnel URL regex to support custom domains and add warning log when URL detection times out
Response to Code Review (Latest Pass)
Addressed in commit 976e647:
Not addressing in this PR:
…hed www.cloudflare.com The generalized regex `https://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-\.]+` was too broad and matched `https://www.cloudflare.com` from cloudflared's log output, causing workers to connect to the Cloudflare homepage instead of the tunnel URL. Reverted to `*.trycloudflare.com` since we only use quick tunnels.
There was a problem hiding this comment.
Code Review: Distributed Workers Mode (PR #2430)
This is a substantial and well-architected PR that adds distributed pipeline execution. The abstractions are clean and the documentation is thorough. Below are observations ranging from architectural concerns to specific implementation issues.
Architecture and Design
Overall design is sound. The master/worker model with a pluggable coordinator (IDistributedCoordinator) is a well-established pattern. Separating concerns into coordinator, artifact store, capability matcher, and serializer layers is clean and extensible.
The DeferredCoordinator and DeferredArtifactStore in PipelineBuilder.cs are a good pattern, but the double-checked locking has a subtle non-atomicity issue:
private async ValueTask<IDistributedCoordinator> GetAsync(CancellationToken ct)
{
if (_inner is not null) return _inner; // unsynchronized read - fine for reference reads
await _lock.WaitAsync(ct);
try { return _inner ??= await factory.CreateAsync(ct); }
finally { _lock.Release(); }
}
The initial null check outside the lock is fine for reference reads in .NET (references are written atomically), but _inner should be declared volatile to prevent CPU/compiler reordering from returning a partially-initialized object whose constructor writes have not yet been published. Add private volatile IDistributedCoordinator? _inner; to be safe.
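The hardened wrapper might look like this, keeping the same double-checked `SemaphoreSlim` pattern (`factory.CreateAsync` is the assumed creation call from the surrounding class):

```csharp
// Sketch: volatile backing field so the fast-path read is safely published.
private volatile IDistributedCoordinator? _inner;
private readonly SemaphoreSlim _lock = new(1, 1);

private async ValueTask<IDistributedCoordinator> GetAsync(CancellationToken ct)
{
    var existing = _inner; // volatile read: fully initialized or null
    if (existing is not null)
    {
        return existing;
    }

    await _lock.WaitAsync(ct);
    try
    {
        return _inner ??= await factory.CreateAsync(ct);
    }
    finally
    {
        _lock.Release();
    }
}
```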
Significant Issues
1. ModuleAssignment.RequiredCapabilities uses HashSet<string> (mutable) in a public record
public record ModuleAssignment(
string ModuleTypeName,
// ...
HashSet<string> RequiredCapabilities, // mutable collection in a record
    // ...
Using a mutable HashSet<string> in a record breaks value equality semantics (records use reference equality for mutable collections) and makes the assignment payload mutable after creation. Since IReadOnlySet<string> is already used in IDistributedCoordinator.DequeueModuleAsync and capability matching, the record should use IReadOnlySet<string> or FrozenSet<string> instead:
public record ModuleAssignment(
// ...
IReadOnlySet<string> RequiredCapabilities,
    // ...
The same issue applies to WorkerRegistration.Capabilities.
2. ResolveDistributedOptions in PipelineBuilder.cs only processes ImplementationInstance descriptors
foreach (var descriptor in services.Where(d =>
d.ServiceType == typeof(IConfigureOptions<DistributedOptions>) &&
    d.ImplementationInstance is IConfigureOptions<DistributedOptions>)) // <-- only instances
services.Configure<DistributedOptions>(o => { ... }) typically registers ConfigureNamedOptions<T> via a factory (ImplementationFactory), not an ImplementationInstance. This means the ResolveDistributedOptions method can fail silently: it detects hasConfigureOptions as true (because the ServiceType exists), creates an empty DistributedOptions, loops over zero matching descriptors, and returns defaults. This could cause distributed mode to be erroneously inactive or to default to the wrong role.
A more reliable approach would be to use a sentinel value or a simpler detection mechanism — for example, registering a concrete DistributedOptions instance as a singleton when AddDistributedMode is called, rather than relying on the options framework internals.
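A sketch of the sentinel approach, assuming `AddDistributedMode` receives the caller's configure delegate directly; `Options.Create` keeps `IOptions<DistributedOptions>` consumers working:

```csharp
// Sketch: build and register the concrete options object up front,
// so detection is a plain descriptor lookup.
var distributedOptions = new DistributedOptions();
configure(distributedOptions);

services.AddSingleton(distributedOptions); // sentinel: discoverable by type
services.AddSingleton<IOptions<DistributedOptions>>(
    Options.Create(distributedOptions));   // IOptions<T> consumers keep working

// Registration-time detection, no options-framework internals involved:
var resolved = services
    .Select(d => d.ImplementationInstance)
    .OfType<DistributedOptions>()
    .FirstOrDefault();
```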
3. InMemoryDistributedCoordinator missing SendHeartbeatAsync and BroadcastCancellationAsync / IsCancellationRequestedAsync
The documentation (architecture.md) describes these methods as part of the coordinator interface, but the IDistributedCoordinator interface in the diff only has 7 methods (no SendHeartbeatAsync, BroadcastCancellationAsync, or IsCancellationRequestedAsync). The docs appear to describe a different (earlier?) iteration of the interface. This should be reconciled — either the interface is incomplete or the documentation is out of date. If WorkerCancellationMonitor polls via these methods (as the docs state), they need to be in the interface.
4. WorkerModuleScheduler has a shared static EmptyChannel that is never completed
```csharp
private static readonly Channel<ModuleState> EmptyChannel = Channel.CreateUnbounded<ModuleState>();

public ChannelReader<ModuleState> ReadyModules => EmptyChannel.Reader;
```

Since this is static, all `WorkerModuleScheduler` instances share the same channel. If anything calls `ReadAllAsync()` on this reader it will block forever (the channel is never written to or completed), and the channel writer is never accessible. This is mostly harmless because `WorkerModuleExecutor` doesn't consume `ReadyModules`, but if anything in the framework iterates `ReadyModules`, it will deadlock. Each instance should have its own completed channel:
```csharp
private readonly Channel<ModuleState> _emptyChannel;

public WorkerModuleScheduler()
{
    _emptyChannel = Channel.CreateUnbounded<ModuleState>();
    _emptyChannel.Writer.Complete();
}

public ChannelReader<ModuleState> ReadyModules => _emptyChannel.Reader;
```

5. Lua script in RedisDistributedCoordinator scans entire queue on every dequeue
```lua
local items = redis.call('LRANGE', KEYS[1], 0, -1)
```

The ScanAndClaimScript does a full LRANGE over the entire work queue to find a matching assignment. In pipelines with many queued modules, this becomes O(N) per dequeue attempt, executed atomically inside a Lua script (blocking Redis for the duration). For moderate queues this is acceptable, but for large pipelines it can cause Redis latency spikes. Consider adding a comment about this tradeoff, or implementing separate per-capability queues as a future optimization.
6. DistributedModuleExecutor executes assignments sequentially on the master
```csharp
while (!cancellationToken.IsCancellationRequested)
{
    var assignment = await _coordinator.DequeueModuleAsync(capabilities, cancellationToken);
    // ...
    await ExecuteAssignmentAsync(assignment, modules, workerScheduler, cancellationToken);
```

The master worker loop is fully sequential — it executes one module at a time. This means the master never executes multiple modules in parallel, even though the original (non-distributed) executor likely does. The `WorkerModuleExecutor` has the same limitation. For pipelines where modules have no capability constraints (and many run on the master), this reduces parallelism significantly. A `SemaphoreSlim`-bounded parallel loop would be more consistent with the original behavior.
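A hedged sketch of a bounded-parallel version of that loop (`maxParallel` is an assumed option; `_coordinator`, `ExecuteAssignmentAsync`, `modules`, and `workerScheduler` are the members shown above):

```csharp
var gate = new SemaphoreSlim(maxParallel);
var inFlight = new List<Task>();

while (!cancellationToken.IsCancellationRequested)
{
    var assignment = await _coordinator.DequeueModuleAsync(capabilities, cancellationToken);
    if (assignment is null)
    {
        break; // queue drained or completion signalled
    }

    await gate.WaitAsync(cancellationToken); // bound concurrency instead of serializing
    inFlight.Add(Task.Run(async () =>
    {
        try
        {
            await ExecuteAssignmentAsync(assignment, modules, workerScheduler, cancellationToken);
        }
        finally
        {
            gate.Release();
        }
    }, cancellationToken));
}

await Task.WhenAll(inFlight); // surface any module failures before returning
```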
7. Artifact upload failure silently allows result publication to proceed
```csharp
catch (Exception ex)
{
    _logger.LogError(ex, "Failed to upload artifacts for {Module}", assignment.ModuleTypeName);
}
// falls through - result is still published without artifacts
```

If artifact upload fails, the result is published without the artifact references. A consumer module that `[ConsumesArtifact]` from this module will then fail at download time with a more confusing error. Consider treating artifact upload failure as a module failure, or at minimum publishing a result with an explicit "artifacts missing" indicator so downstream modules fail with a clear message.
8. ModuleResultSerializer silently falls back to absolute path when file is outside git root
```csharp
// Path is outside git root — store as-is (will only work on same platform)
return absolutePath;
```

This silent fallback produces a cross-platform serialization bug: an absolute Windows path gets sent to a Linux worker (or vice versa) and fails at deserialization with a cryptic "file not found" error. The serializer should throw or log a warning when it cannot make a path portable, rather than silently storing an OS-specific path.
Minor Issues
9. DistributedOptions lacks XML documentation
DistributedOptions is a public class but its properties have no XML doc comments, unlike the configuration docs (which are excellent). Adding `<summary>` tags would let IDE tooling surface descriptions.
10. MatrixTargetAttribute is internal but documented as a public user-facing feature
The docs (capabilities.md) show [MatrixTarget("windows", "linux", "macos")] as a user-facing attribute, but it is declared internal sealed class MatrixTargetAttribute. Users cannot apply it. This should either be made public or the documentation should be removed/marked as "coming soon."
11. DistributedOptions.HeartbeatIntervalSeconds and HeartbeatTimeoutSeconds are documented but missing from the class
The configuration docs list HeartbeatIntervalSeconds (default 10) and HeartbeatTimeoutSeconds (default 30) as properties of DistributedOptions, but these are not present in the DistributedOptions class in the diff. The WorkerHealthMonitor and WorkerHeartbeatService background services mentioned in the architecture docs are also absent. The docs may be aspirational — if heartbeat monitoring is not yet implemented, the docs should note this.
12. ConsumesArtifactAttribute.RestorePath setter is public but should validate
```csharp
public string? RestorePath { get; set; }
```

The property is a mutable string with no validation. Since it's configured at compile time via an attribute initializer, a `required` property or init-only setter would be more appropriate and would prevent accidental runtime mutation.
13. strategy.job-total GitHub Actions expression may not be available
```yaml
TOTAL_INSTANCES: ${{ strategy.job-total }}
```

`strategy.job-total` is a GitHub Actions context expression that reflects the total number of matrix jobs. It should work for the matrix defined in the workflow, but it is worth validating that the expression is stable across GitHub Actions runner versions and doesn't return incorrect values when `fail-fast: false` is used and some jobs are cancelled.
Positive Observations
- The check-subscribe-recheck pattern in `RedisDistributedCoordinator.WaitForResultAsync` correctly handles the race condition and is well-documented.
- Compressing large dependency results before sending them through Redis (`GzipPrefix` + base64) is a thoughtful optimization that prevents payload limit issues.
- Run identifier auto-detection (GitHub SHA → Azure DevOps SHA → GitLab SHA → git → GUID) is pragmatic and correctly isolates concurrent runs.
- The `IDistributedCoordinator` interface is clean and narrow enough to be implementable with diverse backends (HTTP, shared filesystem, message queues, etc.).
- Worker registration polling with a configurable `CapabilityTimeoutSeconds` gracefully handles staggered matrix runner startup.
- The test coverage across coordinator, serialization, capability matching, and integration scenarios is thorough.
Summary: The core design is solid and the implementation handles most edge cases (race conditions, master hanging, worker crash). The most impactful issues to address before merge are: (1) the ResolveDistributedOptions reliability concern, (2) the mutable HashSet in ModuleAssignment, (3) making MatrixTargetAttribute public or removing it from docs, and (4) clarifying which features documented in architecture.md (heartbeats, cancellation broadcast, BroadcastCancellationAsync) are actually implemented in this PR.
🤖 Generated with Claude Code
Code Review: feat: Distributed Workers Mode (Round 13)
The fixes from commit 976e647 are confirmed. Three of the six items from the last round are resolved.
Fixed Since Last Review ✓
| # | Issue | Status |
|---|-------|--------|
| 1 | `[MatrixTarget]` dead public API | Fixed — `internal sealed` with XML docs noting "Not yet wired... Reserved for future use" |
| 3 | `SignalRMasterCoordinator` 50ms busy-poll | Fixed — `SemaphoreSlim.WaitAsync` replaces the polling loop |
| 5 | `CloudflaredTunnel` URL regex hardcoded domain | Fixed — generalised to `https://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-\.]+` with `api.cloudflare.com` exclusion |
Not Addressed (Author's Decision — Noted)
| # | Issue |
|---|-------|
| 2 | `IDistributedCoordinator` ISP violation — `NotSupportedException` guards are present; all implementations are internal so external callers cannot trigger them. Acceptable for now. |
| 4 | `SignalCompletionAsync` cancellation race — narrow window, `TrySetCanceled` is idempotent on resolved waiters. Acceptable. |
| 6 | 5 CI runners on every push — deferred. |
Remaining Open Issues
These were raised in earlier rounds and are still present in the current head commit.
1. docs/docs/distributed/architecture.md describes a different interface than exists
The architecture doc lists a Cancellation concern with BroadcastCancellationAsync and IsCancellationRequestedAsync, and describes background services WorkerHealthMonitor, WorkerHeartbeatService, and WorkerCancellationMonitor. None of these exist in the codebase or in IDistributedCoordinator (which has 7 methods). The docs imply 9+ methods.
Why this matters: users implementing a custom coordinator will write stub implementations for methods that don't exist, or look for services that aren't there. This is a straightforward documentation fix — update the interface table and remove the non-existent services section.
2. RunIdentifierResolver triplicated with inconsistent fallback logic — could cause runtime failures
Three independent copies exist:
- `src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs` ← git SHA / CI env vars
- `src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs` ← byte-for-byte identical to Redis
- `src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs` ← SHA256 hash of CWD (different!)
The inconsistency is the real concern: the discovery package derives a run identifier using SHA256 of Environment.CurrentDirectory, while the coordinator and artifact store use git SHA or CI environment variables. If these produce different strings, the discovery package and coordinator will look up different Redis keys and workers will fail to locate the master.
Fix: move RunIdentifierResolver into ModularPipelines.Distributed core (it was already suggested in round 1) and agree on a single fallback order. The two identical copies can simply reference the shared one.
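A sketch of what the single canonical resolver in the core package could look like (class and method names are assumptions; the priority chain mirrors the one described above):

```csharp
internal static class RunIdentifierResolver
{
    public static string Resolve(string? explicitRunId) =>
        explicitRunId
        ?? Environment.GetEnvironmentVariable("GITHUB_SHA")          // GitHub Actions
        ?? Environment.GetEnvironmentVariable("BUILD_SOURCEVERSION") // Azure DevOps
        ?? Environment.GetEnvironmentVariable("CI_COMMIT_SHA")       // GitLab CI
        ?? TryGitRevParseHead()                                      // local git checkout
        ?? Guid.NewGuid().ToString("N");                             // last-resort fallback

    private static string? TryGitRevParseHead()
    {
        try
        {
            using var process = System.Diagnostics.Process.Start(
                new System.Diagnostics.ProcessStartInfo
                {
                    FileName = "git",
                    Arguments = "rev-parse HEAD",
                    RedirectStandardOutput = true,
                });
            var sha = process?.StandardOutput.ReadToEnd().Trim();
            process?.WaitForExit();
            return string.IsNullOrEmpty(sha) ? null : sha;
        }
        catch
        {
            return null; // git not installed or not a repository
        }
    }
}
```

The two coordinator/artifact packages would then reference this shared type, and the discovery package would drop its directory-hash variant.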
3. ArtifactLifecycleManager.ResolvePathPattern silently returns wrong results for ** glob patterns
```csharp
var wildcardIndex = pathPattern.IndexOfAny(['*', '?']);
var baseDir = Path.GetDirectoryName(pathPattern[..wildcardIndex]); // "**/bin/*.dll" → "" → cwd
var searchPattern = Path.GetFileName(pathPattern);                 // → "*.dll" (loses the "/bin/" constraint)
var matches = Directory.GetFiles(baseDir, searchPattern, SearchOption.AllDirectories);
// Returns ALL *.dll files from cwd, not only those under bin/ directories
```

For `**/bin/*.dll` the `bin/` segment is silently dropped; for `src/tests/**/*.cs` the `tests/` sub-path is also lost. No error is raised — callers get plausible-looking results that may include far more files than intended. Published artifacts can become unexpectedly large.
Suggested fix using Microsoft.Extensions.FileSystemGlobbing (no new NuGet dependency — it ships in-box with the SDK):
```csharp
var matcher = new Matcher();
matcher.AddInclude(pathPattern);
var result = matcher.Execute(
    new DirectoryInfoWrapper(new DirectoryInfo(Directory.GetCurrentDirectory())));
return result.Files
    .Select(f => Path.GetFullPath(Path.Combine(Directory.GetCurrentDirectory(), f.Path)))
    .ToList();
```

4. InMemoryDistributedCoordinator.WaitForResultAsync — shared TCS can be permanently poisoned by cancellation
```csharp
// Current code
var tcs = _results.GetOrAdd(moduleTypeName,
    _ => new TaskCompletionSource<SerializedModuleResult>()); // shared across all callers

using var reg = cancellationToken.Register(
    () => tcs.TrySetCanceled(cancellationToken)); // permanently sets state to Cancelled

return await tcs.Task;
```

If any caller's token fires, the shared TCS moves to the Cancelled terminal state. Every subsequent caller for the same module immediately throws `OperationCanceledException`, and `PublishResultAsync`'s `TrySetResult` silently no-ops. Note also that `TaskCreationOptions.RunContinuationsAsynchronously` is absent here (it is present in the `SignalRMasterCoordinator` equivalent), so cancellation callbacks run synchronously on the cancelling thread.
Fix — use Task.WaitAsync to propagate cancellation without touching the shared TCS:
```csharp
public Task<SerializedModuleResult> WaitForResultAsync(
    string moduleTypeName, CancellationToken cancellationToken)
{
    var tcs = _results.GetOrAdd(moduleTypeName,
        _ => new TaskCompletionSource<SerializedModuleResult>(
            TaskCreationOptions.RunContinuationsAsynchronously));

    return tcs.Task.WaitAsync(cancellationToken);
}
```

`Task.WaitAsync` cancels only the awaiting call, leaving the underlying TCS untouched for other waiters and for `PublishResultAsync`.
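A self-contained illustration of why this is safe: cancelling one waiter does not poison the shared `TaskCompletionSource` (simplified to `string` results):

```csharp
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

using var cts = new CancellationTokenSource();
var waiterA = tcs.Task.WaitAsync(cts.Token);              // observes cancellation
var waiterB = tcs.Task.WaitAsync(CancellationToken.None); // unaffected

cts.Cancel();
// waiterA now faults with TaskCanceledException, but the shared TCS is untouched,
// so a later PublishResultAsync-style TrySetResult still succeeds:
tcs.TrySetResult("result");
// waiterB completes with "result"
```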
5. Worker disconnect silently loses in-flight work — open TODO
DistributedPipelineHub.OnDisconnectedAsync contains:
```csharp
// TODO: Re-enqueue in-flight work for the disconnected worker
return Task.CompletedTask;
```

A worker that crashes after `DequeueModuleAsync` but before `PublishResultAsync` leaves the master's `WaitForResultAsync` blocked indefinitely (`ModuleResultTimeoutSeconds` defaults to 0, meaning no timeout). The pipeline hangs with no observable error.
This is worse than either of the two acceptable outcomes: (a) re-enqueue the assignment for another worker to pick up, or (b) fail fast with a clear error message. The current behaviour is an invisible hang. At minimum the TODO should be replaced with a LogCritical and a TrySetException on the result waiter so the pipeline fails loudly instead of stalling forever.
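A minimal sketch of the fail-fast option (`_inFlightAssignments` and `_resultWaiters` are hypothetical names for state the hub would need to track):

```csharp
public override Task OnDisconnectedAsync(Exception? exception)
{
    if (_inFlightAssignments.TryRemove(Context.ConnectionId, out var assignment))
    {
        _logger.LogCritical(
            "Worker {ConnectionId} disconnected with {Module} in flight",
            Context.ConnectionId, assignment.ModuleTypeName);

        // Fail the master's waiter loudly instead of leaving it blocked forever.
        if (_resultWaiters.TryGetValue(assignment.ModuleTypeName, out var waiter))
        {
            waiter.TrySetException(new InvalidOperationException(
                $"Worker disconnected while executing {assignment.ModuleTypeName}."));
        }
    }

    return base.OnDisconnectedAsync(exception);
}
```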
Summary
The three fixes are clean and correct. The five remaining items are in descending order of urgency:
1. Architecture docs — misleads `IDistributedCoordinator` implementors with non-existent methods and services
2. `RunIdentifierResolver` inconsistency — discovery package uses a different algorithm; could cause workers to fail to locate the master
3. Worker disconnect TODO — silent infinite hang on worker crash; needs resolution or explicit documentation
4. TCS poisoning in `InMemoryDistributedCoordinator` — correctness bug in less-common paths; one-line fix available
5. `ResolvePathPattern` `**` glob — silently uploads wrong file sets; `FileSystemGlobbing` is the right fix
Items 1–3 are the strongest candidates to address before merge.
…cate shared code, pin cloudflared

- Add `volatile` to DeferredCoordinator/DeferredArtifactStore `_inner` fields for correct double-checked locking memory visibility (#9)
- Replace O(n) `FirstOrDefault` module lookups with a `Dictionary<string, IModule>` built once per execution in both executors (#4)
- Extract `DependencyResultApplicator` with shared `ApplyDependencyResults` and `PublishResolutionFailureAsync`, eliminating duplication between `DistributedModuleExecutor` and `WorkerModuleExecutor` (#5)
- Pin cloudflared to version 2026.2.0 in CI workflow instead of using latest release (#8)
Response to 9-Item Code Review

Addressed in commit bc84860:
Previously addressed:
Not addressing in this PR:
Minor (HeartbeatIntervalSeconds/HeartbeatTimeoutSeconds): These properties don't exist in the codebase — false positive from the reviewer.
Code Review: Distributed Execution Mode (#2430)
This is a substantial and well-structured addition. The architecture is clearly thought through, the abstractions are clean, and the test coverage is extensive. Comments below focus on architectural concerns rather than style.
1. Interface Segregation Violation in IDistributedCoordinator ⚠️
IDistributedCoordinator is a unified interface implemented by both master and worker coordinators. In the SignalR implementation:
- `SignalRWorkerCoordinator.EnqueueModuleAsync` → throws `NotSupportedException`
- `SignalRWorkerCoordinator.WaitForResultAsync` → throws `NotSupportedException`
- `SignalRWorkerCoordinator.GetRegisteredWorkersAsync` → returns empty
- `SignalRWorkerCoordinator.SignalCompletionAsync` → no-op
This violates the Interface Segregation Principle — clients are forced to depend on methods they can't implement. It also makes future implementations error-prone (new implementors won't know which methods are safe to throw on).
Better approach: Split the interface into role-specific contracts:
```csharp
public interface IMasterCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

public interface IWorkerCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}
```

This is especially important because `IDistributedCoordinator` is public — it's part of the extension point API that third-party users will implement.
2. RunIdentifierResolver Duplicated Across Three Packages ⚠️
src/ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs and src/ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs are identical (73 lines each), both implementing: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → GUID fallback.
src/ModularPipelines.Distributed.Discovery.Redis/RunIdentifierResolver.cs uses a different strategy (directory hash).
The shared logic belongs in ModularPipelines.Distributed (the core package) as a single canonical resolver. Duplicated code in separate packages will diverge — e.g., if a new CI platform's SHA env var needs to be added, it'll need to be updated in 2+ places.
3. Architecture Docs Don't Match Implementation
docs/docs/distributed/architecture.md documents:
- `BroadcastCancellationAsync()`
- `IsCancellationRequestedAsync()`
The actual IDistributedCoordinator interface has SignalCompletionAsync() instead. The docs appear to be from an earlier design iteration. This will confuse users trying to implement custom coordinators.
4. SignalR Transport Requires Redis Anyway
The ModularPipelines.Distributed.Discovery.Redis package exists to solve "how do workers find the master's SignalR URL?" The answer is: store the URL in Redis.
This means the SignalR transport path requires:
- Redis (for service discovery via `Discovery.Redis`)
- cloudflared tunnel (to expose the master's SignalR hub to external workers)
- A running ASP.NET Core web server on the master
Whereas the pure Redis transport path requires only Redis. For most CI use cases, the Redis-only path is simpler, cheaper, and more reliable. It's worth documenting why SignalR is preferred over Redis in scenarios where Redis is already available — the current docs don't make this case.
5. Lua Script Performance on Large Work Queues
In RedisDistributedCoordinator.TryScanAndClaimAsync, the Lua script does:
1. `LRANGE 0 -1` — fetch the entire work queue
2. Iterate in Lua to find the first capability-matching item
3. `LREM` that specific item
Lua scripts in Redis block all other operations for their duration. On a large work queue with many items, this scan could block the Redis server. Since Redis is single-threaded for command execution, other workers are blocked while one worker scans.
Better approach: Use a sorted set (ZSET) with capability-based bucket keys, or limit the scan range (LRANGE 0 N where N is configurable). This gives predictable O(N) with a bounded N.
6. Hardcoded 64KB Compression Threshold
In DistributedWorkPublisher.GatherDependencyResults:
```csharp
// payloads over 64 KB compressed with GZip
```

This threshold is hardcoded. Modules with large dependency results (e.g., returning large file lists) may frequently cross this threshold, while others never will. Expose this as a configuration option on `DistributedOptions` so operators can tune it.
7. CI Matrix: 5 Runners vs Previous 3
The workflow goes from 3 matrix runners (ubuntu, windows, macos) to 5 (+ 2 extra ubuntu instances 3 and 4). This is a 67% increase in GitHub Actions compute costs.
The net benefit depends on whether the wall-clock time savings from distributed execution exceed the coordination overhead (Redis round-trips, heartbeats, serialization, artifact transfers). For a project that already parallelizes work within each runner using the module system, the marginal gain from cross-machine distribution may be small.
It would be useful to document the expected speedup rationale (e.g., "modules X, Y, Z are pinned to specific OS runners, so they need to run in parallel on different machines") to justify the cost.
8. DisablePayloadSigning = true in S3 Store
S3DistributedArtifactStore sets DisablePayloadSigning = true (needed for R2 compatibility). This means the AWS SDK won't compute a SHA-256 of the request body for signing, which removes one layer of upload integrity verification. HTTPS still protects the transfer, but silent corruption that occurs server-side between reception and storage won't be detected.
Consider using S3Config.ChecksumValidationEnabled or EnableHttpChecksumValidation (available in recent AWSSDK versions) as an alternative that works with R2 while still validating content integrity.
9. Test Project Naming Mismatch
Several integration test files (in Integration/ sub-folders under *.UnitTests projects) spin up full pipeline instances:
- `DistributedPipelineIntegrationTests.cs`
- `CapabilityRoutingIntegrationTests.cs`
- `MatrixExpansionIntegrationTests.cs`
These are integration tests by nature but live in unit test projects. This creates confusion about test isolation requirements and may cause slowness in unit test runs. Consider moving them to separate *.IntegrationTests projects, or at minimum applying a [Category("Integration")] trait so they can be filtered.
Minor: Architecture.md Links
The architecture doc references WorkerHealthMonitor and WorkerHeartbeatService as background services, but these files don't appear in the PR diff — either they're missing from the implementation or the doc references stale class names. Worth verifying the class names match the shipped code.
Summary
| Issue | Severity |
|-------|----------|
| `IDistributedCoordinator` ISP violation | High — public API surface affects all custom implementations |
| `RunIdentifierResolver` duplication | Medium — will diverge, hard to maintain |
| Docs/code mismatch | Medium — misleads custom coordinator authors |
| SignalR requires Redis anyway | Medium — architectural clarity |
| Lua script unbounded scan | Low-Medium — performance at scale |
| Hardcoded 64KB threshold | Low |
| CI cost justification | Low (documentation gap) |
| S3 payload signing | Low |
| Integration tests in unit test projects | Low |
The core distributed execution model is sound — the capability routing, artifact lifecycle management, result serialization with portable paths, and the check-subscribe-recheck pattern for WaitForResultAsync are all well-implemented. The main ask is splitting IDistributedCoordinator before it becomes a public contract that's hard to change.
Code Review: Distributed Execution Mode (#2430)
This is a substantial and well-structured addition. The architecture is clearly thought through, the abstractions are clean, and the test coverage is extensive. Comments below focus on architectural concerns rather than style.
1. Interface Segregation Violation in IDistributedCoordinator
IDistributedCoordinator is a unified interface implemented by both master and worker coordinators. In the SignalR implementation:
- `SignalRWorkerCoordinator.EnqueueModuleAsync` throws `NotSupportedException`
- `SignalRWorkerCoordinator.WaitForResultAsync` throws `NotSupportedException`
- `SignalRWorkerCoordinator.GetRegisteredWorkersAsync` returns empty
- `SignalRWorkerCoordinator.SignalCompletionAsync` is a no-op
This violates the Interface Segregation Principle — clients are forced to depend on methods they cannot meaningfully implement. Since IDistributedCoordinator is public and is the primary extension point for custom transports, this will mislead third-party implementors who don't know which methods they can safely throw on.
Better approach: Split into role-specific contracts:
```csharp
public interface IMasterCoordinator
{
    Task EnqueueModuleAsync(ModuleAssignment assignment, CancellationToken ct);
    Task<SerializedModuleResult> WaitForResultAsync(string moduleTypeName, CancellationToken ct);
    Task<IReadOnlyList<WorkerRegistration>> GetRegisteredWorkersAsync(CancellationToken ct);
    Task SignalCompletionAsync(CancellationToken ct);
}

public interface IWorkerCoordinator
{
    Task<ModuleAssignment?> DequeueModuleAsync(IReadOnlySet<string> capabilities, CancellationToken ct);
    Task PublishResultAsync(SerializedModuleResult result, CancellationToken ct);
    Task RegisterWorkerAsync(WorkerRegistration registration, CancellationToken ct);
}
```

2. RunIdentifierResolver Duplicated Across Three Packages
ModularPipelines.Distributed.Artifacts.S3/Configuration/RunIdentifierResolver.cs and ModularPipelines.Distributed.Redis/Configuration/RunIdentifierResolver.cs are byte-for-byte identical (73 lines each), both implementing the same priority chain: explicit config → GITHUB_SHA → BUILD_SOURCEVERSION → CI_COMMIT_SHA → git rev-parse HEAD → GUID fallback.
This shared logic belongs in ModularPipelines.Distributed (the core package). Keeping it duplicated means that adding support for a new CI platform's SHA env var (e.g., GitLab's CI_COMMIT_SHA is there, but Bitbucket's BITBUCKET_COMMIT is missing) requires changes in multiple packages.
3. Architecture Docs Don't Match Implementation
docs/docs/distributed/architecture.md documents these coordinator methods:
- `BroadcastCancellationAsync()`
- `IsCancellationRequestedAsync()`
The actual IDistributedCoordinator interface has SignalCompletionAsync() instead. The docs appear to be from an earlier design iteration. This will mislead users trying to implement custom coordinators.
4. SignalR Transport Requires Redis Anyway
The ModularPipelines.Distributed.Discovery.Redis package exists to answer "how do workers find the master's SignalR URL?" — by storing the URL in Redis.
This means the SignalR transport path requires both Redis (for service discovery) and a cloudflared tunnel (to expose the master's ASP.NET Core hub to external workers). The Redis-only path requires only Redis. For most CI use cases, the Redis-only path is simpler and more reliable.
The current docs don't explain what scenarios justify choosing SignalR over pure Redis. If the intent is that SignalR handles push-vs-pull efficiency at scale, that case is worth spelling out explicitly.
5. Lua Script Unbounded Scan on Work Queue
In RedisDistributedCoordinator.TryScanAndClaimAsync, the Lua script does:
1. `LRANGE 0 -1` — fetches the entire work queue
2. Iterates in Lua to find the first capability-matching item
3. `LREM` that item
Redis Lua scripts block all other operations for their duration. On a large work queue with heterogeneous capability requirements, this scan grows linearly with queue depth. Since all workers compete on the same Redis server, one slow scan blocks all other workers.
Better approach: Bound the scan range with a configurable limit (LRANGE 0 N) or use capability-bucketed queues (one list per capability set) to avoid scanning.
6. Hardcoded 64KB Compression Threshold
In DistributedWorkPublisher.GatherDependencyResults, payloads over 64KB are GZip-compressed. This threshold is a hardcoded constant. Modules with large dependency results (e.g., returning large file manifests) may frequently cross this threshold while others never will. Expose it as a property on DistributedOptions so operators can tune it for their workload.
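A sketch of the configurable option, showing only the new property (the property name is an assumption; `DistributedOptions` has other members not shown):

```csharp
public class DistributedOptions
{
    // Replaces the hardcoded 64 KB constant; operators can raise or lower it
    // to match their typical dependency-result payload sizes.
    public int DependencyResultCompressionThresholdBytes { get; set; } = 64 * 1024;
}
```

The publisher would then compare against `options.DependencyResultCompressionThresholdBytes` instead of the inline constant.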
7. CI Cost: 5 Runners vs Previous 3
The workflow goes from 3 matrix runners to 5 (adding 2 extra ubuntu instances). This is a 67% increase in GitHub Actions compute. The net benefit depends on whether wall-clock time savings from distributed execution exceed the coordination overhead (Redis round-trips, heartbeats, serialization, artifact transfers) — particularly for a project that already parallelizes work within each runner.
It would be useful to document the expected speedup rationale to justify the cost (e.g., "modules pinned to specific OS runners can now run concurrently across machines").
8. DisablePayloadSigning = true in S3 Store
S3DistributedArtifactStore sets DisablePayloadSigning = true for R2 compatibility. This removes the AWS SDK's pre-upload SHA-256 body signing, which is one layer of upload integrity verification. HTTPS still protects the transfer, but this is worth a code comment explaining the trade-off, and it's worth evaluating whether newer AWSSDK checksum validation options work with R2 as an alternative.
9. Integration Tests in Unit Test Projects
Several tests that spin up full pipeline instances live in *.UnitTests projects under Integration/ sub-folders:
- `DistributedPipelineIntegrationTests.cs`
- `CapabilityRoutingIntegrationTests.cs`
- `MatrixExpansionIntegrationTests.cs`
These are integration tests by nature. Mixing them into unit test projects makes it harder to run fast unit-only feedback loops. Consider moving them to *.IntegrationTests projects or applying a test category trait so they can be filtered separately.
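For the trait option, a sketch assuming xUnit conventions (adjust to whatever test framework the repo actually uses):

```csharp
[Trait("Category", "Integration")]
public class DistributedPipelineIntegrationTests
{
    // full-pipeline tests that spin up real coordinator instances...
}
```

Unit-only feedback loops can then exclude them with `dotnet test --filter "Category!=Integration"`.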
Minor: Architecture.md References Possibly Stale Class Names
The architecture doc mentions WorkerHealthMonitor and WorkerHeartbeatService as background services, but these files don't appear in the PR diff. Either they're referenced by different names in the code or are missing from the implementation. Worth a quick check to ensure the documented class names match the shipped ones.
Summary
| Issue | Severity |
|-------|----------|
| `IDistributedCoordinator` ISP violation (public API) | High |
| `RunIdentifierResolver` duplication | Medium |
| Docs/code mismatch on coordinator methods | Medium |
| SignalR transport requires Redis anyway (clarity) | Medium |
| Lua script unbounded LRANGE scan | Low-Medium |
| Hardcoded 64KB compression threshold | Low |
| CI cost increase without documented justification | Low |
| S3 payload signing trade-off undocumented | Low |
| Integration tests in unit test projects | Low |
The core distributed execution model is sound. The capability routing, artifact lifecycle management with deduplication, result serialization with portable cross-platform paths, and the check-subscribe-recheck pattern in WaitForResultAsync are all well-implemented. The main ask before merging is splitting IDistributedCoordinator — it is the primary public extension point and will be difficult to change once users depend on it.
Summary
- `IDistributedCoordinator` and `IDistributedArtifactStore` abstractions with Redis and S3-compatible (Cloudflare R2, AWS S3, MinIO) backends
- Attribute-driven artifact sharing (`[ProducesArtifact]`, `[ConsumesArtifact]`) with automatic deduplication of downloads to the same path

New packages
Core abstractions (in ModularPipelines)
- `IDistributedCoordinator` / `IDistributedCoordinatorFactory` — cross-instance orchestration
- `IDistributedArtifactStore` / `IDistributedArtifactStoreFactory` — file artifact storage
- `IArtifactContext` — module-facing API (`context.Artifacts().PublishFileAsync` / `DownloadAsync`)
- `[ProducesArtifact]`, `[ConsumesArtifact]`, `[PinToMaster]`, `[RequiresCapability]`, `[MatrixTarget]` attributes

Build pipeline integration
Test plan