Skip to content

Add fluent Pipeline API for Extract -> Transform -> Load (#83)#134

Merged
Chris-Wolfgang merged 12 commits into
mainfrom
feature/83-pipeline-api
Apr 25, 2026
Merged

Add fluent Pipeline API for Extract -> Transform -> Load (#83)#134
Chris-Wolfgang merged 12 commits into
mainfrom
feature/83-pipeline-api

Conversation

@Chris-Wolfgang
Copy link
Copy Markdown
Owner

Summary

Implements the fluent Pipeline API agreed on in #83 (see design-decisions comment).

await Pipeline
    .Extract(extractor).WithProgress(extractProgress)
    .Transform(t1)
    .Transform(t2).WithProgress(t2Progress)
    .Load(loader).WithProgress(loadProgress)
    .WithName("nightly-import")
    .RunAsync(cancellationToken);

What's included

Public API (all in Wolfgang.Etl.Abstractions namespace):

  • Pipeline — static entry with overloaded Extract<T> / Extract<T, TProgress>
  • IExtractStage<T> / IExtractStageWithProgress<T, TProgress>
  • ITransformStage<T> / ITransformStageWithProgress<T, TProgress>
  • IPipeline / IPipelineWithLoadProgress<TProgress> — runnable terminals with WithName, RunAsync(), RunAsync(CancellationToken)

Key properties:

  • Compile-time stage-to-stage type safety (mismatched stages are red squiggles, not runtime throws)
  • .WithProgress(...) only appears on stage types whose underlying extractor/transformer/loader supports progress
  • One-shot execution: a second RunAsync call throws InvalidOperationException
  • Raw exception propagation (no wrapping)
  • Pipeline owns nothing — caller disposes stages

Out of scope (per design)

Tests

19 new tests covering:

  • Null-argument guards on every factory, stage method, and setter
  • Simple E -> L, E -> T -> L, and multi-transformer E -> T -> T -> T -> L composition (with type-changing transformers)
  • WithProgress forwarding on extractor, transformer, and loader
  • WithName default and assignment
  • One-shot enforcement on both pipeline variants
  • Cancellation token forwarded to stages
  • Raw exception propagation
  • Stage state observable after a mid-stream failure

All 151 unit tests pass in Release on net10.0.

Test plan

  • dotnet build -c Release -f net10.0 clean (0 warnings, 0 errors) on src + tests
  • dotnet test -c Release -f net10.0 — 151/151 pass
  • CI matrix builds across all TFMs

Closes #83.

🤖 Generated with Claude Code

Chris-Wolfgang and others added 5 commits April 23, 2026 20:48
Implements Pipeline.Extract(...).Transform(...).Load(...).RunAsync() with
compile-time stage-to-stage type safety, optional per-stage WithProgress,
one-shot execution, and raw exception propagation. Design decisions are
recorded in the issue thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
TestKit doubles implement the progress-capable interfaces, so overload
resolution on Pipeline.Extract/.Transform/.Load always bound to the
progress-capable overloads — leaving ExtractStage, TransformStage, and
PipelineImpl at 0-50% coverage and failing the 90% module gate.

Add 9 tests that cast the test doubles to the no-progress interfaces to
force overload resolution onto the previously-untested paths. Module
coverage now 94.9% (was 82%).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Chris-Wolfgang Chris-Wolfgang force-pushed the feature/83-pipeline-api branch from 352913e to f2692cc Compare April 24, 2026 00:48
Chris-Wolfgang and others added 4 commits April 23, 2026 20:52
The Pipeline API previously required every stage to support cancellation.
Expand Pipeline.Extract / IExtractStage.Transform / IExtractStage.Load /
ITransformStage.Transform / ITransformStage.Load to 4 overloads each:

  - Base                    : IExtractAsync<T>               etc.
  - With cancellation        : IExtractWithCancellationAsync  etc.
  - With progress            : IExtractWithProgressAsync      etc.
  - With progress + cancel   : IExtractWithProgressAnd...    etc.

Overload resolution picks the most-specific interface a given class
implements, so existing code is unaffected. No-cancellation stages
receive no CancellationToken; the pipeline's RunAsync token is simply
not forwarded to those stages (documented).

Rewrite the test suite around 12 real test-double classes (4 capability
combos x 3 roles) instead of casting TestKit doubles to bind overloads.
Adds BareTests, CancelOnlyTests, ProgressOnlyTests, FullTests,
MixedCapabilityTests, and PipelineBehaviorTests. 191/191 tests pass in
Release on net10.0; module line coverage 96.4%.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add two focused test files to close the last gaps in the Pipeline
internals:

- NullGuardOnChainedStagesTests: 8 null-argument guards on mid-chain
  ITransformStage.Transform/Load overloads (the PipelineBehaviorTests
  guards only hit the IExtractStage overloads, so the ITransformStage
  throws were previously unreachable in tests).

- NoProgressPathTests: 9 tests that exercise the "progress-capable
  stage appended without WithProgress" code paths in
  ExtractStageWithProgress, TransformStageWithProgress, and the
  no-progress lambdas inside ExtractStage/TransformStage.

All 9 Pipeline classes now at 100% line coverage (208/208 tests pass
in Release on net10.0).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spells out in the XML <remarks> that RunAsync does not catch, wrap, or
aggregate — exceptions (including OperationCanceledException) propagate
unchanged and the caller should wrap in try/catch. Includes a short
code example.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
S3246 (Sonar): TProgress should be declared 'out' (covariant) on
IPipelineWithLoadProgress, IExtractStageWithProgress, and
ITransformStageWithProgress. TProgress only appears inside
IProgress<T> (contravariant) parameter positions, which makes it
effectively covariant at the outer interface level.

VSTHRD200 (vs-threading): private Source methods returning
IAsyncEnumerable<T> must have the Async suffix. Rename Source ->
SourceAsync in ExtractStageWithProgress and TransformStageWithProgress.

Both errors only fired on net462/net472/net48/net481 (the Stage 2
Windows matrix); Linux Stage 1 (net5-net10) was green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Chris-Wolfgang and others added 3 commits April 24, 2026 21:11
Addresses findings #6, #7, and #11 from the review.

#6 — Use Volatile.Read when reading the Interlocked-incremented
CurrentItemCount and CurrentSkippedItemCount fields on ExtractorBase,
LoaderBase, and TransformerBase. Writers use Interlocked.Increment;
pairing the reads with Volatile.Read closes a theoretical
weak-memory-model read hazard (relevant primarily on older ARM NetFx
targets) and is free on x86/x64.

#7 — Replace the raw InvalidOperationException throws in the
CancelOnly* test doubles with a sentinel WrongOverloadCalledException.
A future Pipeline refactor that accidentally routes through a bare
overload will now produce a unique, named exception in test output
instead of a generic "Operation is not valid" message.

#11 — Assert Assert.Empty(loader.Loaded) after the pre-cancelled-token
cancellation tests on CancelOnlyTests and FullTests. Previously the
tests only verified that SOME OperationCanceledException was thrown;
a regression where cancellation fires mid-stream after one or two
items would have still passed.

208/208 tests pass in Release on net10.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Chris-Wolfgang Chris-Wolfgang merged commit d05b998 into main Apr 25, 2026
9 checks passed
@Chris-Wolfgang Chris-Wolfgang deleted the feature/83-pipeline-api branch April 25, 2026 22:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add fluent Pipeline API for composing Extract → Transform → Load

1 participant