diff --git a/src/Wolfgang.Etl.TestKit/FaultyExtractor.cs b/src/Wolfgang.Etl.TestKit/FaultyExtractor.cs
new file mode 100644
index 0000000..cce016f
--- /dev/null
+++ b/src/Wolfgang.Etl.TestKit/FaultyExtractor.cs
@@ -0,0 +1,291 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+
+namespace Wolfgang.Etl.TestKit;
+
+///
+/// An in-memory extractor for testing error-handling paths. It yields items from an
+/// like a normal extractor, but can be configured to inject
+/// deterministic faults — throwing at a given item, throwing after completion, or
+/// duplicating an item — so consumers can exercise mid-stream failure, finalization
+/// failure, and de-duplication logic without hand-rolling broken fakes.
+///
+/// The type of item to extract.
+///
+///
+/// Faults are configured through the fluent ,
+/// , and methods, each of
+/// which returns the same instance so calls can be chained. Multiple faults stack on a
+/// single instance — for example ThrowAt(50, ex) and DuplicateAt(10) both
+/// take effect in the same run.
+///
+///
+/// Fault indices are zero-based and refer to the position in the emitted (post-skip)
+/// sequence. A configured fault fires after
+/// for that item,
+/// so a progress report reflects the item that caused the failure. When a
+/// and a are configured for the same index,
+/// the throw takes precedence and the duplicate is not emitted. Calling
+/// twice for the same index replaces the earlier exception (last-wins).
+///
+///
+///
+///
+/// var items = new List<int> { 1, 2, 3, 4, 5 };
+/// var extractor = new FaultyExtractor<int>(items)
+/// .ThrowAt(index: 3, new System.IO.IOException("disk read failure"))
+/// .DuplicateAt(index: 1);
+///
+/// // Enumerates 1, 2, 2 (duplicate), 3, then throws IOException reaching index 3.
+/// await foreach (var item in extractor.ExtractAsync()) { /* ... */ }
+///
+///
+public class FaultyExtractor : ExtractorBase
+ where T : notnull
+{
+ // ------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------
+
+ private readonly IEnumerable _items;
+ private readonly Dictionary _throwAt = new Dictionary();
+ private readonly HashSet _duplicateAt = new HashSet();
+ private Exception? _throwAfterCompletion;
+ private readonly IProgressTimer? _progressTimer;
+ private bool _progressTimerWired;
+
+
+
+ // ------------------------------------------------------------------
+ // Constructors
+ // ------------------------------------------------------------------
+
+ ///
+ /// Initializes a new that yields items from the
+ /// specified .
+ ///
+ ///
+ /// The sequence of items to extract. The enumerable is evaluated on each extraction
+ /// run, so the same extractor instance can be reused.
+ ///
+ ///
+ /// is .
+ ///
+ public FaultyExtractor(IEnumerable items)
+ {
+ _items = items ?? throw new ArgumentNullException(nameof(items));
+ }
+
+
+
+ ///
+ /// Initializes a new that yields items from the
+ /// specified and uses the supplied
+ /// to drive progress callbacks.
+ ///
+ /// The sequence of items to extract.
+ ///
+ /// The timer used to drive progress callbacks. Inject a
+ /// ManualProgressTimer in tests to fire callbacks on demand.
+ ///
+ ///
+ /// or is .
+ ///
+ protected FaultyExtractor(IEnumerable items, IProgressTimer timer)
+ {
+ _items = items ?? throw new ArgumentNullException(nameof(items));
+ _progressTimer = timer ?? throw new ArgumentNullException(nameof(timer));
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent fault configuration
+ // ------------------------------------------------------------------
+
+ ///
+ /// Configures the extractor to throw when it reaches the
+ /// item at the specified zero-based in the emitted sequence.
+ /// The failing item is counted (its
+ /// runs) before
+ /// the exception is thrown, so progress reflects the item that caused the failure, but
+ /// the item itself is not yielded.
+ ///
+ /// The zero-based index of the item to fail on.
+ /// The exception to throw.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var extractor = new FaultyExtractor<int>(items)
+ /// .ThrowAt(50, new System.IO.IOException("disk read failure"));
+ ///
+ ///
+ public FaultyExtractor ThrowAt(int index, Exception exception)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _throwAt[index] = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the extractor to throw after all items have
+ /// been yielded successfully, simulating a cleanup or finalization failure.
+ ///
+ /// The exception to throw after completion.
+ /// The same instance, to allow chaining.
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var extractor = new FaultyExtractor<int>(items)
+ /// .ThrowAfterCompletion(new System.InvalidOperationException("finalize failed"));
+ ///
+ ///
+ public FaultyExtractor ThrowAfterCompletion(Exception exception)
+ {
+ _throwAfterCompletion = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the extractor to yield the item at the specified zero-based
+ /// twice. The duplicate is a real second emission and is
+ /// counted, so the total number of yielded items grows by one per configured duplicate.
+ ///
+ /// The zero-based index of the item to duplicate.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ ///
+ /// var extractor = new FaultyExtractor<int>(items)
+ /// .DuplicateAt(10);
+ ///
+ ///
+ public FaultyExtractor DuplicateAt(int index)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _duplicateAt.Add(index);
+
+ return this;
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ExtractorBase overrides
+ // ------------------------------------------------------------------
+
+ ///
+ protected override IProgressTimer CreateProgressTimer(IProgress progress)
+ {
+ if (_progressTimer is null)
+ {
+ return base.CreateProgressTimer(progress);
+ }
+
+ if (!_progressTimerWired)
+ {
+ _progressTimerWired = true;
+ _progressTimer.Elapsed += () => progress.Report(CreateProgressReport());
+ }
+
+ return _progressTimer;
+ }
+
+
+
+ ///
+ protected override Report CreateProgressReport() => new(CurrentItemCount);
+
+
+
+ ///
+ protected override async IAsyncEnumerable ExtractWorkerAsync
+ (
+ [EnumeratorCancellation] CancellationToken token
+ )
+ {
+ token.ThrowIfCancellationRequested();
+
+ var enumerator = _items.GetEnumerator();
+
+ try
+ {
+ var index = 0;
+
+ while (enumerator.MoveNext())
+ {
+ token.ThrowIfCancellationRequested();
+
+ if (CurrentSkippedItemCount < SkipItemCount)
+ {
+ IncrementCurrentSkippedItemCount();
+ continue;
+ }
+
+ if (CurrentItemCount >= MaximumItemCount)
+ {
+ yield break;
+ }
+
+ var item = enumerator.Current;
+
+ IncrementCurrentItemCount();
+
+ if (_throwAt.TryGetValue(index, out var exception))
+ {
+ throw exception;
+ }
+
+ yield return item;
+
+ if (_duplicateAt.Contains(index) && CurrentItemCount < MaximumItemCount)
+ {
+ IncrementCurrentItemCount();
+ yield return item;
+ }
+
+ index++;
+ }
+ }
+ finally
+ {
+ _progressTimer?.StopTimer();
+ enumerator.Dispose();
+ }
+
+ if (_throwAfterCompletion is not null)
+ {
+ throw _throwAfterCompletion;
+ }
+
+ await Task.Yield(); // satisfies async method contract without causing extra allocations
+ }
+}
diff --git a/src/Wolfgang.Etl.TestKit/FaultyLoader.cs b/src/Wolfgang.Etl.TestKit/FaultyLoader.cs
new file mode 100644
index 0000000..399e423
--- /dev/null
+++ b/src/Wolfgang.Etl.TestKit/FaultyLoader.cs
@@ -0,0 +1,313 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+
+namespace Wolfgang.Etl.TestKit;
+
+///
+/// An in-memory loader for testing error-handling paths. It consumes the source stream
+/// like a normal loader, but can be configured to inject deterministic faults — throwing
+/// at a given item, throwing after completion, or duplicating an item — so consumers can
+/// exercise mid-stream failure, finalization failure, and idempotency logic without
+/// hand-rolling broken fakes.
+///
+/// The type of item to load.
+///
+///
+/// When constructed with collectItems: true, every loaded item — including any
+/// duplicates — is accumulated and exposed via .
+///
+///
+/// Faults are configured through the fluent ,
+/// , and methods, each of which
+/// returns the same instance so calls can be chained. Multiple faults stack on a single
+/// instance.
+///
+///
+/// Fault indices are zero-based and refer to the position in the loaded (post-skip)
+/// sequence. A configured fault fires after
+/// for that item,
+/// so a progress report reflects the item that caused the failure. When a
+/// and a are configured for the same index,
+/// the throw takes precedence and the duplicate is not loaded. Calling
+/// twice for the same index replaces the earlier exception (last-wins).
+///
+///
+///
+///
+/// var loader = new FaultyLoader<int>(collectItems: true)
+/// .ThrowAt(index: 25, new System.TimeoutException("connection lost"));
+///
+/// await loader.LoadAsync(extractor.ExtractAsync()); // throws reaching index 25
+///
+///
+public class FaultyLoader : LoaderBase
+ where T : notnull
+{
+ // ------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------
+
+ private readonly bool _collectItems;
+ private readonly List _buffer = new List();
+ private readonly Dictionary _throwAt = new Dictionary();
+ private readonly HashSet _duplicateAt = new HashSet();
+ private Exception? _throwAfterCompletion;
+ private readonly IProgressTimer? _progressTimer;
+ private bool _progressTimerWired;
+
+
+
+ // ------------------------------------------------------------------
+ // Constructors
+ // ------------------------------------------------------------------
+
+ ///
+ /// Initializes a new .
+ ///
+ ///
+ /// When , loaded items (including duplicates) are accumulated in
+ /// an internal buffer during each load operation and made available via
+ /// . When , items are consumed but
+ /// not stored — returns .
+ ///
+ public FaultyLoader(bool collectItems)
+ {
+ _collectItems = collectItems;
+ }
+
+
+
+ ///
+ /// Initializes a new with the supplied
+ /// to drive progress callbacks.
+ ///
+ ///
+ /// When , loaded items are accumulated and accessible via
+ /// .
+ ///
+ ///
+ /// The timer used to drive progress callbacks. Inject a
+ /// ManualProgressTimer in tests to fire callbacks on demand.
+ ///
+ ///
+ /// is .
+ ///
+ protected FaultyLoader(bool collectItems, IProgressTimer timer)
+ {
+ _collectItems = collectItems;
+ _progressTimer = timer ?? throw new ArgumentNullException(nameof(timer));
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Public API
+ // ------------------------------------------------------------------
+
+ ///
+ /// Returns a snapshot of the items loaded so far, or if the
+ /// loader was constructed with collectItems: false.
+ ///
+ ///
+ /// A containing a point-in-time copy of the loaded items
+ /// (including any injected duplicates), or when collection is
+ /// disabled.
+ ///
+ public IReadOnlyList? GetCollectedItems() =>
+ _collectItems
+ ? _buffer.ToArray()
+ : null;
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent fault configuration
+ // ------------------------------------------------------------------
+
+ ///
+ /// Configures the loader to throw when it reaches the item
+ /// at the specified zero-based in the loaded sequence. The
+ /// failing item is counted (its
+ /// runs) before
+ /// the exception is thrown, so progress reflects the item that caused the failure, but
+ /// the item itself is not stored.
+ ///
+ /// The zero-based index of the item to fail on.
+ /// The exception to throw.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var loader = new FaultyLoader<int>(collectItems: true)
+ /// .ThrowAt(25, new System.TimeoutException("connection lost"));
+ ///
+ ///
+ public FaultyLoader ThrowAt(int index, Exception exception)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _throwAt[index] = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the loader to throw after all items have been
+ /// loaded successfully, simulating a cleanup or finalization failure.
+ ///
+ /// The exception to throw after completion.
+ /// The same instance, to allow chaining.
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var loader = new FaultyLoader<int>(collectItems: false)
+ /// .ThrowAfterCompletion(new System.InvalidOperationException("commit failed"));
+ ///
+ ///
+ public FaultyLoader ThrowAfterCompletion(Exception exception)
+ {
+ _throwAfterCompletion = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the loader to load the item at the specified zero-based
+ /// twice. The duplicate is a real second load and is counted,
+ /// so the total number of loaded items grows by one per configured duplicate.
+ ///
+ /// The zero-based index of the item to duplicate.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ ///
+ /// var loader = new FaultyLoader<int>(collectItems: true)
+ /// .DuplicateAt(10);
+ ///
+ ///
+ public FaultyLoader DuplicateAt(int index)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _duplicateAt.Add(index);
+
+ return this;
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // LoaderBase overrides
+ // ------------------------------------------------------------------
+
+ ///
+ protected override IProgressTimer CreateProgressTimer(IProgress progress)
+ {
+ if (_progressTimer is null)
+ {
+ return base.CreateProgressTimer(progress);
+ }
+
+ if (!_progressTimerWired)
+ {
+ _progressTimerWired = true;
+ _progressTimer.Elapsed += () => progress.Report(CreateProgressReport());
+ }
+
+ return _progressTimer;
+ }
+
+
+
+ ///
+ protected override Report CreateProgressReport() =>
+ new Report(CurrentItemCount);
+
+
+
+ ///
+ protected override async Task LoadWorkerAsync(
+ IAsyncEnumerable items,
+ CancellationToken token)
+ {
+ token.ThrowIfCancellationRequested();
+
+ _buffer.Clear();
+
+ var index = 0;
+
+ try
+ {
+ await foreach (var item in items.WithCancellation(token).ConfigureAwait(false))
+ {
+ token.ThrowIfCancellationRequested();
+
+ if (CurrentSkippedItemCount < SkipItemCount)
+ {
+ IncrementCurrentSkippedItemCount();
+ continue;
+ }
+
+ if (CurrentItemCount >= MaximumItemCount)
+ {
+ break;
+ }
+
+ IncrementCurrentItemCount();
+
+ if (_throwAt.TryGetValue(index, out var exception))
+ {
+ throw exception;
+ }
+
+ if (_collectItems)
+ {
+ _buffer.Add(item);
+ }
+
+ if (_duplicateAt.Contains(index) && CurrentItemCount < MaximumItemCount)
+ {
+ IncrementCurrentItemCount();
+
+ if (_collectItems)
+ {
+ _buffer.Add(item);
+ }
+ }
+
+ index++;
+ }
+ }
+ finally
+ {
+ _progressTimer?.StopTimer();
+ }
+
+ if (_throwAfterCompletion is not null)
+ {
+ throw _throwAfterCompletion;
+ }
+ }
+}
diff --git a/src/Wolfgang.Etl.TestKit/FaultyTransformer.cs b/src/Wolfgang.Etl.TestKit/FaultyTransformer.cs
new file mode 100644
index 0000000..8de82c1
--- /dev/null
+++ b/src/Wolfgang.Etl.TestKit/FaultyTransformer.cs
@@ -0,0 +1,267 @@
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+
+namespace Wolfgang.Etl.TestKit;
+
+///
+/// A pass-through transformer for testing error-handling paths. It returns each item
+/// unchanged like a normal transformer, but can be configured to inject deterministic
+/// faults — throwing at a given item, throwing after completion, or duplicating an item —
+/// so consumers can exercise mid-stream failure, finalization failure, and de-duplication
+/// logic without hand-rolling broken fakes.
+///
+/// The type of item to transform.
+///
+///
+/// Faults are configured through the fluent ,
+/// , and methods, each of which
+/// returns the same instance so calls can be chained. Multiple faults stack on a single
+/// instance.
+///
+///
+/// Fault indices are zero-based and refer to the position in the emitted (post-skip)
+/// sequence. A configured fault fires after
+///
+/// for that item, so a progress report reflects the item that caused the failure. When a
+/// and a are configured for the same index,
+/// the throw takes precedence and the duplicate is not emitted. Calling
+/// twice for the same index replaces the earlier exception (last-wins).
+///
+///
+///
+///
+/// var transformer = new FaultyTransformer<int>()
+/// .ThrowAt(index: 50, new System.InvalidOperationException("bad record"))
+/// .DuplicateAt(index: 10);
+///
+/// await loader.LoadAsync(transformer.TransformAsync(extractor.ExtractAsync()));
+///
+///
+public class FaultyTransformer : TransformerBase
+ where T : notnull
+{
+ // ------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------
+
+ private readonly Dictionary _throwAt = new Dictionary();
+ private readonly HashSet _duplicateAt = new HashSet();
+ private Exception? _throwAfterCompletion;
+ private readonly IProgressTimer? _progressTimer;
+ private bool _progressTimerWired;
+
+
+
+ // ------------------------------------------------------------------
+ // Constructors
+ // ------------------------------------------------------------------
+
+ ///
+ /// Initializes a new using the default production
+ /// timer.
+ ///
+ public FaultyTransformer() { }
+
+
+
+ ///
+ /// Initializes a new with the supplied
+ /// to drive progress callbacks.
+ ///
+ ///
+ /// The timer used to drive progress callbacks. Inject a
+ /// ManualProgressTimer in tests to fire callbacks on demand.
+ ///
+ ///
+ /// is .
+ ///
+ protected FaultyTransformer(IProgressTimer timer)
+ {
+ _progressTimer = timer ?? throw new ArgumentNullException(nameof(timer));
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent fault configuration
+ // ------------------------------------------------------------------
+
+ ///
+ /// Configures the transformer to throw when it reaches the
+ /// item at the specified zero-based in the emitted sequence.
+ /// The failing item is counted (its
+ ///
+ /// runs) before the exception is thrown, so progress reflects the item that caused the
+ /// failure, but the item itself is not emitted.
+ ///
+ /// The zero-based index of the item to fail on.
+ /// The exception to throw.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var transformer = new FaultyTransformer<int>()
+ /// .ThrowAt(50, new System.InvalidOperationException("bad record"));
+ ///
+ ///
+ public FaultyTransformer ThrowAt(int index, Exception exception)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _throwAt[index] = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the transformer to throw after all items have
+ /// been emitted successfully, simulating a cleanup or finalization failure.
+ ///
+ /// The exception to throw after completion.
+ /// The same instance, to allow chaining.
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ /// var transformer = new FaultyTransformer<int>()
+ /// .ThrowAfterCompletion(new System.InvalidOperationException("flush failed"));
+ ///
+ ///
+ public FaultyTransformer ThrowAfterCompletion(Exception exception)
+ {
+ _throwAfterCompletion = exception ?? throw new ArgumentNullException(nameof(exception));
+
+ return this;
+ }
+
+
+
+ ///
+ /// Configures the transformer to emit the item at the specified zero-based
+ /// twice. The duplicate is a real second emission and is
+ /// counted, so the total number of emitted items grows by one per configured duplicate.
+ ///
+ /// The zero-based index of the item to duplicate.
+ /// The same instance, to allow chaining.
+ ///
+ /// is negative.
+ ///
+ ///
+ ///
+ /// var transformer = new FaultyTransformer<int>()
+ /// .DuplicateAt(10);
+ ///
+ ///
+ public FaultyTransformer DuplicateAt(int index)
+ {
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(index));
+ }
+
+ _duplicateAt.Add(index);
+
+ return this;
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // TransformerBase overrides
+ // ------------------------------------------------------------------
+
+ ///
+ protected override IProgressTimer CreateProgressTimer(IProgress progress)
+ {
+ if (_progressTimer is null)
+ {
+ return base.CreateProgressTimer(progress);
+ }
+
+ if (!_progressTimerWired)
+ {
+ _progressTimerWired = true;
+ _progressTimer.Elapsed += () => progress.Report(CreateProgressReport());
+ }
+
+ return _progressTimer;
+ }
+
+
+
+ ///
+ protected override Report CreateProgressReport() =>
+ new Report(CurrentItemCount);
+
+
+
+ ///
+ protected override async IAsyncEnumerable TransformWorkerAsync(
+ IAsyncEnumerable items,
+ [EnumeratorCancellation] CancellationToken token)
+ {
+ token.ThrowIfCancellationRequested();
+
+ var index = 0;
+
+ try
+ {
+ await foreach (var item in items.WithCancellation(token).ConfigureAwait(false))
+ {
+ token.ThrowIfCancellationRequested();
+
+ if (CurrentSkippedItemCount < SkipItemCount)
+ {
+ IncrementCurrentSkippedItemCount();
+ continue;
+ }
+
+ if (CurrentItemCount >= MaximumItemCount)
+ {
+ yield break;
+ }
+
+ IncrementCurrentItemCount();
+
+ if (_throwAt.TryGetValue(index, out var exception))
+ {
+ throw exception;
+ }
+
+ yield return item;
+
+ if (_duplicateAt.Contains(index) && CurrentItemCount < MaximumItemCount)
+ {
+ IncrementCurrentItemCount();
+ yield return item;
+ }
+
+ index++;
+ }
+ }
+ finally
+ {
+ _progressTimer?.StopTimer();
+ }
+
+ if (_throwAfterCompletion is not null)
+ {
+ throw _throwAfterCompletion;
+ }
+ }
+}
diff --git a/src/Wolfgang.Etl.TestKit/PublicAPI.Unshipped.txt b/src/Wolfgang.Etl.TestKit/PublicAPI.Unshipped.txt
index 968eaf4..2eceee7 100644
--- a/src/Wolfgang.Etl.TestKit/PublicAPI.Unshipped.txt
+++ b/src/Wolfgang.Etl.TestKit/PublicAPI.Unshipped.txt
@@ -1,4 +1,23 @@
#nullable enable
+Wolfgang.Etl.TestKit.FaultyExtractor
+Wolfgang.Etl.TestKit.FaultyExtractor.DuplicateAt(int index) -> Wolfgang.Etl.TestKit.FaultyExtractor!
+Wolfgang.Etl.TestKit.FaultyExtractor.FaultyExtractor(System.Collections.Generic.IEnumerable! items) -> void
+Wolfgang.Etl.TestKit.FaultyExtractor.FaultyExtractor(System.Collections.Generic.IEnumerable! items, Wolfgang.Etl.Abstractions.IProgressTimer! timer) -> void
+Wolfgang.Etl.TestKit.FaultyExtractor.ThrowAfterCompletion(System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyExtractor!
+Wolfgang.Etl.TestKit.FaultyExtractor.ThrowAt(int index, System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyExtractor!
+Wolfgang.Etl.TestKit.FaultyLoader
+Wolfgang.Etl.TestKit.FaultyLoader.DuplicateAt(int index) -> Wolfgang.Etl.TestKit.FaultyLoader!
+Wolfgang.Etl.TestKit.FaultyLoader.FaultyLoader(bool collectItems) -> void
+Wolfgang.Etl.TestKit.FaultyLoader.FaultyLoader(bool collectItems, Wolfgang.Etl.Abstractions.IProgressTimer! timer) -> void
+Wolfgang.Etl.TestKit.FaultyLoader.GetCollectedItems() -> System.Collections.Generic.IReadOnlyList?
+Wolfgang.Etl.TestKit.FaultyLoader.ThrowAfterCompletion(System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyLoader!
+Wolfgang.Etl.TestKit.FaultyLoader.ThrowAt(int index, System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyLoader!
+Wolfgang.Etl.TestKit.FaultyTransformer
+Wolfgang.Etl.TestKit.FaultyTransformer.DuplicateAt(int index) -> Wolfgang.Etl.TestKit.FaultyTransformer!
+Wolfgang.Etl.TestKit.FaultyTransformer.FaultyTransformer() -> void
+Wolfgang.Etl.TestKit.FaultyTransformer.FaultyTransformer(Wolfgang.Etl.Abstractions.IProgressTimer! timer) -> void
+Wolfgang.Etl.TestKit.FaultyTransformer.ThrowAfterCompletion(System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyTransformer!
+Wolfgang.Etl.TestKit.FaultyTransformer.ThrowAt(int index, System.Exception! exception) -> Wolfgang.Etl.TestKit.FaultyTransformer!
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory) -> void
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory, int count) -> void
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory, int count, Wolfgang.Etl.Abstractions.IProgressTimer timer) -> void
@@ -7,3 +26,12 @@ Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory) -> v
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory, int count) -> void
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory, int count, Wolfgang.Etl.Abstractions.IProgressTimer timer) -> void
Wolfgang.Etl.TestKit.TestExtractor.TestExtractor(System.Func factory, Wolfgang.Etl.Abstractions.IProgressTimer timer) -> void
+override Wolfgang.Etl.TestKit.FaultyExtractor.CreateProgressReport() -> Wolfgang.Etl.Abstractions.Report!
+override Wolfgang.Etl.TestKit.FaultyExtractor.CreateProgressTimer(System.IProgress! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
+override Wolfgang.Etl.TestKit.FaultyExtractor.ExtractWorkerAsync(System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable!
+override Wolfgang.Etl.TestKit.FaultyLoader.CreateProgressReport() -> Wolfgang.Etl.Abstractions.Report!
+override Wolfgang.Etl.TestKit.FaultyLoader.CreateProgressTimer(System.IProgress! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
+override Wolfgang.Etl.TestKit.FaultyLoader.LoadWorkerAsync(System.Collections.Generic.IAsyncEnumerable! items, System.Threading.CancellationToken token) -> System.Threading.Tasks.Task!
+override Wolfgang.Etl.TestKit.FaultyTransformer.CreateProgressReport() -> Wolfgang.Etl.Abstractions.Report!
+override Wolfgang.Etl.TestKit.FaultyTransformer.CreateProgressTimer(System.IProgress! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
+override Wolfgang.Etl.TestKit.FaultyTransformer.TransformWorkerAsync(System.Collections.Generic.IAsyncEnumerable! items, System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable!
diff --git a/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyExtractorTests.cs b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyExtractorTests.cs
new file mode 100644
index 0000000..9d40710
--- /dev/null
+++ b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyExtractorTests.cs
@@ -0,0 +1,321 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+using Wolfgang.Etl.TestKit.Xunit;
+using Xunit;
+
+namespace Wolfgang.Etl.TestKit.Tests.Unit;
+
+public class FaultyExtractorTests
+{
+ // ------------------------------------------------------------------
+ // Constructor — argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void Constructor_when_items_is_null_throws_ArgumentNullException()
+ {
+ Assert.Throws
+ (
+ () => new FaultyExtractor(null!)
+ );
+ }
+
+
+
+ [Fact]
+ public void Constructor_with_timer_when_timer_is_null_throws_ArgumentNullException()
+ {
+ Assert.Throws
+ (
+ () => new FaultyExtractorWithTimer(new List { 1 }, null!)
+ );
+ }
+
+
+
+ [Fact]
+ public async Task Constructor_with_timer_creates_extractor_that_yields_items()
+ {
+ using var timer = new ManualProgressTimer();
+ var sut = new FaultyExtractorWithTimer(new List { 1, 2, 3 }, timer);
+
+ var results = await sut.ExtractAsync().ToListAsync();
+
+ Assert.Equal(new[] { 1, 2, 3 }, results);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // No faults — pass-through
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task ExtractAsync_when_no_faults_configured_yields_all_items_in_order()
+ {
+ var sut = new FaultyExtractor(new List { 1, 2, 3 });
+
+ var results = await sut.ExtractAsync().ToListAsync();
+
+ Assert.Equal
+ (
+ new[] { 1, 2, 3 },
+ results
+ );
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task ExtractAsync_when_ThrowAt_configured_throws_that_exception_reaching_the_index()
+ {
+ var expected = new InvalidOperationException("boom");
+ var sut = new FaultyExtractor(new[] { 10, 20, 30, 40, 50 })
+ .ThrowAt(2, expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in sut.ExtractAsync())
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 10, 20 }, collected);
+ }
+
+
+
+ [Fact]
+ public async Task ExtractAsync_when_ThrowAt_configured_counts_the_failing_item()
+ {
+ var sut = new FaultyExtractor(new[] { 10, 20, 30, 40, 50 })
+ .ThrowAt(2, new InvalidOperationException("boom"));
+
+ await Assert.ThrowsAsync
+ (
+ async () => await sut.ExtractAsync().ToListAsync()
+ );
+
+ // Index 2 is the third item; the failing item is counted, so 2 + 1 == 3.
+ Assert.Equal(3, sut.CurrentItemCount);
+ }
+
+
+
+ [Fact]
+ public async Task ExtractAsync_when_ThrowAt_called_twice_for_same_index_last_exception_wins()
+ {
+ var first = new InvalidOperationException("first");
+ var second = new TimeoutException("second");
+ var sut = new FaultyExtractor(new[] { 1, 2, 3 })
+ .ThrowAt(1, first)
+ .ThrowAt(1, second);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await sut.ExtractAsync().ToListAsync()
+ );
+
+ Assert.Same(second, actual);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAfterCompletion
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task ExtractAsync_when_ThrowAfterCompletion_configured_yields_all_items_then_throws()
+ {
+ var expected = new InvalidOperationException("finalize failed");
+ var sut = new FaultyExtractor(new[] { 1, 2, 3 })
+ .ThrowAfterCompletion(expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in sut.ExtractAsync())
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 3 }, collected);
+ Assert.Equal(3, sut.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // DuplicateAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task ExtractAsync_when_DuplicateAt_configured_yields_that_item_twice_consecutively()
+ {
+ var sut = new FaultyExtractor(new[] { 1, 2, 3 })
+ .DuplicateAt(1);
+
+ var results = await sut.ExtractAsync().ToListAsync();
+
+ Assert.Equal(new[] { 1, 2, 2, 3 }, results);
+ Assert.Equal(4, sut.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Composability
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task ExtractAsync_when_DuplicateAt_and_later_ThrowAt_configured_duplicate_emitted_then_throw_fires()
+ {
+ var expected = new InvalidOperationException("boom");
+ var sut = new FaultyExtractor(new[] { 1, 2, 3, 4, 5 })
+ .DuplicateAt(1)
+ .ThrowAt(3, expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in sut.ExtractAsync())
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 2, 3 }, collected);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ var ex = Assert.Throws
+ (
+ () => sut.ThrowAt(-1, new InvalidOperationException())
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAt_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ var ex = Assert.Throws
+ (
+ () => sut.ThrowAt(0, null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ var ex = Assert.Throws
+ (
+ () => sut.ThrowAfterCompletion(null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ var ex = Assert.Throws
+ (
+ () => sut.DuplicateAt(-1)
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent chaining
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_returns_same_instance()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ Assert.Same(sut, sut.ThrowAt(0, new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_returns_same_instance()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ Assert.Same(sut, sut.ThrowAfterCompletion(new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_returns_same_instance()
+ {
+ var sut = new FaultyExtractor(new[] { 1 });
+
+ Assert.Same(sut, sut.DuplicateAt(0));
+ }
+
+
+
+ private sealed class FaultyExtractorWithTimer : FaultyExtractor
+ {
+ public FaultyExtractorWithTimer(IEnumerable items, IProgressTimer timer)
+ : base(items, timer) { }
+ }
+}
diff --git a/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyLoaderTests.cs b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyLoaderTests.cs
new file mode 100644
index 0000000..1095095
--- /dev/null
+++ b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyLoaderTests.cs
@@ -0,0 +1,285 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+using Wolfgang.Etl.TestKit.Xunit;
+using Xunit;
+
+namespace Wolfgang.Etl.TestKit.Tests.Unit;
+
+public class FaultyLoaderTests
+{
+ // ------------------------------------------------------------------
+ // Constructor — argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void Constructor_with_timer_when_timer_is_null_throws_ArgumentNullException()
+ {
+ Assert.Throws
+ (
+ () => new FaultyLoaderWithTimer(collectItems: false, null!)
+ );
+ }
+
+
+
+ [Fact]
+ public async Task Constructor_with_timer_creates_loader_that_collects_items()
+ {
+ using var timer = new ManualProgressTimer();
+ var loader = new FaultyLoaderWithTimer(collectItems: true, timer);
+
+ await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3 }).ExtractAsync());
+
+ Assert.Equal(new[] { 1, 2, 3 }, loader.GetCollectedItems());
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // No faults — pass-through
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task LoadAsync_when_no_faults_configured_loads_all_items_in_order()
+ {
+ var loader = new FaultyLoader(collectItems: true);
+
+ await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3 }).ExtractAsync());
+
+ Assert.Equal
+ (
+ new[] { 1, 2, 3 },
+ loader.GetCollectedItems()
+ );
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task LoadAsync_when_ThrowAt_configured_throws_that_exception_reaching_the_index()
+ {
+ var expected = new TimeoutException("connection lost");
+ var loader = new FaultyLoader(collectItems: true)
+ .ThrowAt(2, expected);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await loader.LoadAsync(new FaultyExtractor(new[] { 10, 20, 30, 40, 50 }).ExtractAsync())
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 10, 20 }, loader.GetCollectedItems());
+ }
+
+
+
+ [Fact]
+ public async Task LoadAsync_when_ThrowAt_configured_counts_the_failing_item()
+ {
+ var loader = new FaultyLoader(collectItems: true)
+ .ThrowAt(2, new TimeoutException("connection lost"));
+
+ await Assert.ThrowsAsync
+ (
+ async () => await loader.LoadAsync(new FaultyExtractor(new[] { 10, 20, 30, 40, 50 }).ExtractAsync())
+ );
+
+ // Index 2 is the third item; the failing item is counted, so 2 + 1 == 3.
+ Assert.Equal(3, loader.CurrentItemCount);
+ }
+
+
+
+ [Fact]
+ public async Task LoadAsync_when_ThrowAt_called_twice_for_same_index_last_exception_wins()
+ {
+ var first = new InvalidOperationException("first");
+ var second = new TimeoutException("second");
+ var loader = new FaultyLoader(collectItems: false)
+ .ThrowAt(1, first)
+ .ThrowAt(1, second);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3 }).ExtractAsync())
+ );
+
+ Assert.Same(second, actual);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAfterCompletion
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task LoadAsync_when_ThrowAfterCompletion_configured_loads_all_items_then_throws()
+ {
+ var expected = new InvalidOperationException("commit failed");
+ var loader = new FaultyLoader(collectItems: true)
+ .ThrowAfterCompletion(expected);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3 }).ExtractAsync())
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 3 }, loader.GetCollectedItems());
+ Assert.Equal(3, loader.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // DuplicateAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task LoadAsync_when_DuplicateAt_configured_loads_that_item_twice_consecutively()
+ {
+ var loader = new FaultyLoader(collectItems: true)
+ .DuplicateAt(1);
+
+ await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3 }).ExtractAsync());
+
+ Assert.Equal(new[] { 1, 2, 2, 3 }, loader.GetCollectedItems());
+ Assert.Equal(4, loader.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Composability
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task LoadAsync_when_DuplicateAt_and_later_ThrowAt_configured_duplicate_loaded_then_throw_fires()
+ {
+ var expected = new TimeoutException("boom");
+ var loader = new FaultyLoader(collectItems: true)
+ .DuplicateAt(1)
+ .ThrowAt(3, expected);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await loader.LoadAsync(new FaultyExtractor(new[] { 1, 2, 3, 4, 5 }).ExtractAsync())
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 2, 3 }, loader.GetCollectedItems());
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ var ex = Assert.Throws
+ (
+ () => loader.ThrowAt(-1, new InvalidOperationException())
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAt_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ var ex = Assert.Throws
+ (
+ () => loader.ThrowAt(0, null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ var ex = Assert.Throws
+ (
+ () => loader.ThrowAfterCompletion(null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ var ex = Assert.Throws
+ (
+ () => loader.DuplicateAt(-1)
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent chaining
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_returns_same_instance()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ Assert.Same(loader, loader.ThrowAt(0, new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_returns_same_instance()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ Assert.Same(loader, loader.ThrowAfterCompletion(new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_returns_same_instance()
+ {
+ var loader = new FaultyLoader(collectItems: false);
+
+ Assert.Same(loader, loader.DuplicateAt(0));
+ }
+
+
+
+ private sealed class FaultyLoaderWithTimer : FaultyLoader
+ {
+ public FaultyLoaderWithTimer(bool collectItems, IProgressTimer timer)
+ : base(collectItems, timer) { }
+ }
+}
diff --git a/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyTransformerTests.cs b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyTransformerTests.cs
new file mode 100644
index 0000000..ee64544
--- /dev/null
+++ b/tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyTransformerTests.cs
@@ -0,0 +1,313 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Wolfgang.Etl.Abstractions;
+using Wolfgang.Etl.TestKit.Xunit;
+using Xunit;
+
+namespace Wolfgang.Etl.TestKit.Tests.Unit;
+
+public class FaultyTransformerTests
+{
+ // ------------------------------------------------------------------
+ // Constructor — argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void Constructor_with_timer_when_timer_is_null_throws_ArgumentNullException()
+ {
+ Assert.Throws
+ (
+ () => new FaultyTransformerWithTimer(null!)
+ );
+ }
+
+
+
+ [Fact]
+ public async Task Constructor_with_timer_creates_transformer_that_yields_items()
+ {
+ using var timer = new ManualProgressTimer();
+ var transformer = new FaultyTransformerWithTimer(timer);
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3 });
+
+ var results = await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync();
+
+ Assert.Equal(new[] { 1, 2, 3 }, results);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // No faults — pass-through
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task TransformAsync_when_no_faults_configured_yields_all_items_in_order()
+ {
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3 });
+ var transformer = new FaultyTransformer();
+
+ var results = await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync();
+
+ Assert.Equal
+ (
+ new[] { 1, 2, 3 },
+ results
+ );
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task TransformAsync_when_ThrowAt_configured_throws_that_exception_reaching_the_index()
+ {
+ var expected = new InvalidOperationException("bad record");
+ var extractor = new FaultyExtractor(new[] { 10, 20, 30, 40, 50 });
+ var transformer = new FaultyTransformer().ThrowAt(2, expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in transformer.TransformAsync(extractor.ExtractAsync()))
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 10, 20 }, collected);
+ }
+
+
+
+ [Fact]
+ public async Task TransformAsync_when_ThrowAt_configured_counts_the_failing_item()
+ {
+ var extractor = new FaultyExtractor(new[] { 10, 20, 30, 40, 50 });
+ var transformer = new FaultyTransformer().ThrowAt(2, new InvalidOperationException("bad record"));
+
+ await Assert.ThrowsAsync
+ (
+ async () => await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync()
+ );
+
+ // Index 2 is the third item; the failing item is counted, so 2 + 1 == 3.
+ Assert.Equal(3, transformer.CurrentItemCount);
+ }
+
+
+
+ [Fact]
+ public async Task TransformAsync_when_ThrowAt_called_twice_for_same_index_last_exception_wins()
+ {
+ var first = new InvalidOperationException("first");
+ var second = new TimeoutException("second");
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3 });
+ var transformer = new FaultyTransformer()
+ .ThrowAt(1, first)
+ .ThrowAt(1, second);
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () => await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync()
+ );
+
+ Assert.Same(second, actual);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // ThrowAfterCompletion
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task TransformAsync_when_ThrowAfterCompletion_configured_yields_all_items_then_throws()
+ {
+ var expected = new InvalidOperationException("flush failed");
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3 });
+ var transformer = new FaultyTransformer().ThrowAfterCompletion(expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in transformer.TransformAsync(extractor.ExtractAsync()))
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 3 }, collected);
+ Assert.Equal(3, transformer.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // DuplicateAt
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task TransformAsync_when_DuplicateAt_configured_yields_that_item_twice_consecutively()
+ {
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3 });
+ var transformer = new FaultyTransformer().DuplicateAt(1);
+
+ var results = await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync();
+
+ Assert.Equal(new[] { 1, 2, 2, 3 }, results);
+ Assert.Equal(4, transformer.CurrentItemCount);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Composability
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public async Task TransformAsync_when_DuplicateAt_and_later_ThrowAt_configured_duplicate_emitted_then_throw_fires()
+ {
+ var expected = new InvalidOperationException("boom");
+ var extractor = new FaultyExtractor(new[] { 1, 2, 3, 4, 5 });
+ var transformer = new FaultyTransformer()
+ .DuplicateAt(1)
+ .ThrowAt(3, expected);
+
+ var collected = new List();
+
+ var actual = await Assert.ThrowsAsync
+ (
+ async () =>
+ {
+ await foreach (var item in transformer.TransformAsync(extractor.ExtractAsync()))
+ {
+ collected.Add(item);
+ }
+ }
+ );
+
+ Assert.Same(expected, actual);
+ Assert.Equal(new[] { 1, 2, 2, 3 }, collected);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Argument validation
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var transformer = new FaultyTransformer();
+
+ var ex = Assert.Throws
+ (
+ () => transformer.ThrowAt(-1, new InvalidOperationException())
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAt_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var transformer = new FaultyTransformer();
+
+ var ex = Assert.Throws
+ (
+ () => transformer.ThrowAt(0, null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_when_exception_is_null_throws_ArgumentNullException()
+ {
+ var transformer = new FaultyTransformer();
+
+ var ex = Assert.Throws
+ (
+ () => transformer.ThrowAfterCompletion(null!)
+ );
+
+ Assert.Equal("exception", ex.ParamName);
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_when_index_is_negative_throws_ArgumentOutOfRangeException()
+ {
+ var transformer = new FaultyTransformer();
+
+ var ex = Assert.Throws
+ (
+ () => transformer.DuplicateAt(-1)
+ );
+
+ Assert.Equal("index", ex.ParamName);
+ }
+
+
+
+ // ------------------------------------------------------------------
+ // Fluent chaining
+ // ------------------------------------------------------------------
+
+ [Fact]
+ public void ThrowAt_returns_same_instance()
+ {
+ var transformer = new FaultyTransformer();
+
+ Assert.Same(transformer, transformer.ThrowAt(0, new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void ThrowAfterCompletion_returns_same_instance()
+ {
+ var transformer = new FaultyTransformer();
+
+ Assert.Same(transformer, transformer.ThrowAfterCompletion(new InvalidOperationException()));
+ }
+
+
+
+ [Fact]
+ public void DuplicateAt_returns_same_instance()
+ {
+ var transformer = new FaultyTransformer();
+
+ Assert.Same(transformer, transformer.DuplicateAt(0));
+ }
+
+
+
+ private sealed class FaultyTransformerWithTimer : FaultyTransformer
+ {
+ public FaultyTransformerWithTimer(IProgressTimer timer) : base(timer) { }
+ }
+}