Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyExtractorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,101 @@ public void DuplicateAt_returns_same_instance()



// ------------------------------------------------------------------
// Progress + injected timer
// ------------------------------------------------------------------

[Fact]
public async Task ExtractAsync_with_progress_and_injected_timer_reports_progress_when_timer_fires()
{
using var timer = new ManualProgressTimer();
var sut = new FaultyExtractorWithTimer(new List<int> { 1, 2, 3 }, timer);
Report? captured = null;
var progress = new SynchronousProgress<Report>(r => captured = r);

await using var enumerator = sut.ExtractAsync(progress).GetAsyncEnumerator();
await enumerator.MoveNextAsync();
timer.Fire();
while (await enumerator.MoveNextAsync()) { }

Assert.NotNull(captured);
Assert.True(captured!.CurrentItemCount >= 1);
}



[Fact]
public async Task ExtractAsync_with_progress_and_no_injected_timer_yields_all_items()
{
var sut = new FaultyExtractor<int>(new List<int> { 1, 2, 3 });
var progress = new SynchronousProgress<Report>(_ => { });

var results = await sut.ExtractAsync(progress).ToListAsync();

Assert.Equal(new[] { 1, 2, 3 }, results);
}



// ------------------------------------------------------------------
// Dispose
// ------------------------------------------------------------------

[Fact]
public async Task Dispose_after_timer_wired_unsubscribes_so_firing_timer_does_not_report()
{
using var timer = new ManualProgressTimer();
var reportCount = 0;
var progress = new SynchronousProgress<Report>(_ => reportCount++);

var sut = new FaultyExtractorWithTimer(new List<int> { 1, 2, 3 }, timer);

// Pull one item so the timer is created and the Elapsed handler is wired,
// but leave the enumerator undrained so the base class does not dispose the
// injected timer in its finally. Disposing the SUT now runs the unsubscribe
// branch of Dispose(bool); firing afterwards must produce no report.
var enumerator = sut.ExtractAsync(progress).GetAsyncEnumerator();
await enumerator.MoveNextAsync();

sut.Dispose();
timer.Fire();

Assert.Equal(0, reportCount);
}



// ------------------------------------------------------------------
// SkipItemCount / MaximumItemCount
// ------------------------------------------------------------------

[Fact]
public async Task ExtractAsync_skips_items_up_to_SkipItemCount()
{
var sut = new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 });
sut.SkipItemCount = 2;

var results = await sut.ExtractAsync().ToListAsync();

Assert.Equal(new[] { 3, 4, 5 }, results);
Assert.Equal(2, sut.CurrentSkippedItemCount);
}



[Fact]
public async Task ExtractAsync_stops_at_MaximumItemCount()
{
var sut = new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 });
sut.MaximumItemCount = 2;

var results = await sut.ExtractAsync().ToListAsync();

Assert.Equal(new[] { 1, 2 }, results);
}



private sealed class FaultyExtractorWithTimer : FaultyExtractor<int>
{
public FaultyExtractorWithTimer(IEnumerable<int> items, IProgressTimer timer)
Expand Down
106 changes: 106 additions & 0 deletions tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyLoaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,112 @@ public void DuplicateAt_returns_same_instance()



// ------------------------------------------------------------------
// Progress + injected timer
// ------------------------------------------------------------------

[Fact]
public async Task LoadAsync_with_progress_and_injected_timer_reports_progress_when_timer_fires()
{
using var timer = new ManualProgressTimer();
var loader = new FaultyLoaderWithTimer(collectItems: true, timer);
Report? captured = null;
var progress = new SynchronousProgress<Report>(r => captured = r);
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var task = loader.LoadAsync(GatedSourceAsync(gate), progress);
timer.Fire();
gate.SetResult(true);
await task;

Assert.NotNull(captured);
Assert.True(captured!.CurrentItemCount >= 1);
}



[Fact]
public async Task LoadAsync_with_progress_and_no_injected_timer_loads_all_items()
{
var loader = new FaultyLoader<int>(collectItems: true);
var progress = new SynchronousProgress<Report>(_ => { });

await loader.LoadAsync(new FaultyExtractor<int>(new[] { 1, 2, 3 }).ExtractAsync(), progress);

Assert.Equal(new[] { 1, 2, 3 }, loader.GetCollectedItems());
}



// ------------------------------------------------------------------
// Dispose
// ------------------------------------------------------------------

[Fact]
public async Task Dispose_after_timer_wired_runs_unsubscribe_branch_without_error()
{
using var timer = new ManualProgressTimer();
var progress = new SynchronousProgress<Report>(_ => { });

var loader = new FaultyLoaderWithTimer(collectItems: false, timer);

// A completed load wires (and leaves wired) the Elapsed handler. Disposing the
// loader then exercises the unsubscribe branch of Dispose(bool). The injected
// timer is owned by the caller, so disposing the loader must not dispose it.
await loader.LoadAsync(new FaultyExtractor<int>(new[] { 1, 2, 3 }).ExtractAsync(), progress);

var exception = Record.Exception(() => loader.Dispose());

Assert.Null(exception);
}



// ------------------------------------------------------------------
// SkipItemCount / MaximumItemCount
// ------------------------------------------------------------------

[Fact]
public async Task LoadAsync_skips_items_up_to_SkipItemCount()
{
var loader = new FaultyLoader<int>(collectItems: true);
loader.SkipItemCount = 2;

await loader.LoadAsync(new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 }).ExtractAsync());

Assert.Equal(new[] { 3, 4, 5 }, loader.GetCollectedItems());
Assert.Equal(2, loader.CurrentSkippedItemCount);
}



[Fact]
public async Task LoadAsync_stops_at_MaximumItemCount()
{
var loader = new FaultyLoader<int>(collectItems: true);
loader.MaximumItemCount = 2;

await loader.LoadAsync(new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 }).ExtractAsync());

Assert.Equal(new[] { 1, 2 }, loader.GetCollectedItems());
}



// ------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------

private static async IAsyncEnumerable<int> GatedSourceAsync(TaskCompletionSource<bool> gate)
{
yield return 1;
await gate.Task.ConfigureAwait(false);
yield return 2;
yield return 3;
}



private sealed class FaultyLoaderWithTimer : FaultyLoader<int>
{
public FaultyLoaderWithTimer(bool collectItems, IProgressTimer timer)
Expand Down
101 changes: 101 additions & 0 deletions tests/Wolfgang.Etl.TestKit.Tests.Unit/FaultyTransformerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,107 @@ public void DuplicateAt_returns_same_instance()



// ------------------------------------------------------------------
// Progress + injected timer
// ------------------------------------------------------------------

[Fact]
public async Task TransformAsync_with_progress_and_injected_timer_reports_progress_when_timer_fires()
{
using var timer = new ManualProgressTimer();
var transformer = new FaultyTransformerWithTimer(timer);
var extractor = new FaultyExtractor<int>(new[] { 1, 2, 3 });
Report? captured = null;
var progress = new SynchronousProgress<Report>(r => captured = r);

await using var enumerator =
transformer.TransformAsync(extractor.ExtractAsync(), progress).GetAsyncEnumerator();
await enumerator.MoveNextAsync();
timer.Fire();
while (await enumerator.MoveNextAsync()) { }

Assert.NotNull(captured);
Assert.True(captured!.CurrentItemCount >= 1);
}



[Fact]
public async Task TransformAsync_with_progress_and_no_injected_timer_yields_all_items()
{
var transformer = new FaultyTransformer<int>();
var extractor = new FaultyExtractor<int>(new[] { 1, 2, 3 });
var progress = new SynchronousProgress<Report>(_ => { });

var results = await transformer.TransformAsync(extractor.ExtractAsync(), progress).ToListAsync();

Assert.Equal(new[] { 1, 2, 3 }, results);
}



// ------------------------------------------------------------------
// Dispose
// ------------------------------------------------------------------

[Fact]
public async Task Dispose_after_timer_wired_unsubscribes_so_firing_timer_does_not_report()
{
using var timer = new ManualProgressTimer();
var reportCount = 0;
var progress = new SynchronousProgress<Report>(_ => reportCount++);

var transformer = new FaultyTransformerWithTimer(timer);
var extractor = new FaultyExtractor<int>(new[] { 1, 2, 3 });

// Pull one item so the timer is wired, but leave the enumerator undrained so
// the base class does not dispose the injected timer in its finally. Disposing
// the SUT runs the unsubscribe branch; firing afterwards must produce no report.
var enumerator =
transformer.TransformAsync(extractor.ExtractAsync(), progress).GetAsyncEnumerator();
await enumerator.MoveNextAsync();

transformer.Dispose();
timer.Fire();

Assert.Equal(0, reportCount);
}



// ------------------------------------------------------------------
// SkipItemCount / MaximumItemCount
// ------------------------------------------------------------------

[Fact]
public async Task TransformAsync_skips_items_up_to_SkipItemCount()
{
var extractor = new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 });
var transformer = new FaultyTransformer<int>();
transformer.SkipItemCount = 2;

var results = await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync();

Assert.Equal(new[] { 3, 4, 5 }, results);
Assert.Equal(2, transformer.CurrentSkippedItemCount);
}



[Fact]
public async Task TransformAsync_stops_at_MaximumItemCount()
{
var extractor = new FaultyExtractor<int>(new[] { 1, 2, 3, 4, 5 });
var transformer = new FaultyTransformer<int>();
transformer.MaximumItemCount = 2;

var results = await transformer.TransformAsync(extractor.ExtractAsync()).ToListAsync();

Assert.Equal(new[] { 1, 2 }, results);
}



private sealed class FaultyTransformerWithTimer : FaultyTransformer<int>
{
public FaultyTransformerWithTimer(IProgressTimer timer) : base(timer) { }
Expand Down
Loading