Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,4 @@ docfx_project/obj/
docs/*
!docs/.gitkeep
!docs/RELEASE-WORKFLOW-SETUP.md
.nuget/nuget.exe
11 changes: 11 additions & 0 deletions examples/Net8.0/Example6-ReducingDuplicateCode/ConsoleColors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Example6_ReducingDuplicateCode
{
internal class ConsoleColors
{
public const string Green = "\u001b[32m";
public const string Yellow = "\u001b[33m";
public const string Reset = "\u001b[0m";
public const string Red = "\u001b[31m";
public const string Cyan = "\u001b[36m";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ public int ProgressInterval

public Task LoadAsync(IAsyncEnumerable<string> items)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

return WorkerAsync(items, null, CancellationToken.None);
return WorkerAsync(items, progress: null, CancellationToken.None);
}



public Task LoadAsync(IAsyncEnumerable<string> items, CancellationToken token)
{

ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

return WorkerAsync(items, null, token);
return WorkerAsync(items, progress: null, token);
}



public Task LoadAsync(IAsyncEnumerable<string> items, IProgress<EtlProgress> progress)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(items);
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(items, progress, CancellationToken.None);
}
Expand All @@ -57,8 +57,8 @@ public Task LoadAsync(IAsyncEnumerable<string> items, IProgress<EtlProgress> pro

public Task LoadAsync(IAsyncEnumerable<string> items, IProgress<EtlProgress> progress, CancellationToken token)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(items);
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(items, progress, token);
}
Expand All @@ -72,26 +72,26 @@ private async Task WorkerAsync
CancellationToken token
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

Console.WriteLine($"{ConsoleColors.Green}Loading{ConsoleColors.Reset} data to console asynchronously...\n");

var count = 0;
await using var timer = new Timer
(
_ => progress?.Report(new EtlProgress(Volatile.Read(ref count))),
null,
state: null,
TimeSpan.Zero,
TimeSpan.FromMilliseconds(_progressInterval) // Use the configured progress interval
);
).ConfigureAwait(false);
Comment on lines 82 to +86
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.Threading.Timer is IDisposable and the constructor doesn’t return a task, so await using and .ConfigureAwait(false) on the timer creation won’t compile. Use a normal using var (or PeriodicTimer) and keep it within the async method scope so it’s disposed after the load completes.

Copilot uses AI. Check for mistakes.


await foreach (var item in items.WithCancellation(token))
{
token.ThrowIfCancellationRequested();

Console.WriteLine($"Loading item: {item}\n");
await Task.Delay(50); // Simulate some delay for loading
await Task.Delay(50, token).ConfigureAwait(false); // Simulate some delay for loading
count = Interlocked.Increment(ref count);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ public int ProgressInterval

public IAsyncEnumerable<int> ExtractAsync()
{
return WorkerAsync(null, CancellationToken.None);
return WorkerAsync(progress: null, CancellationToken.None);
}



public IAsyncEnumerable<int> ExtractAsync(CancellationToken token)
{
return WorkerAsync(null, token);
return WorkerAsync(progress: null, token);
}



public IAsyncEnumerable<int> ExtractAsync(IProgress<EtlProgress> progress)
{
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(progress, CancellationToken.None);
}
Expand All @@ -55,7 +55,7 @@ public IAsyncEnumerable<int> ExtractAsync
CancellationToken token
)
{
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(progress, token);
}
Expand All @@ -74,10 +74,10 @@ [EnumeratorCancellation] CancellationToken token
await using var timer = new Timer
(
_ => progress?.Report(new EtlProgress(Volatile.Read(ref count))),
null,
state: null,
TimeSpan.Zero,
TimeSpan.FromMilliseconds(_progressInterval) // Use the configured progress interval
);
).ConfigureAwait(false);
Comment on lines 76 to +80
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.Threading.Timer is IDisposable and the constructor doesn’t return a task, so await using and .ConfigureAwait(false) on the timer creation won’t compile. Use using var (or PeriodicTimer) and keep the timer scoped to the async iterator so it lives for the duration of enumeration.

Copilot uses AI. Check for mistakes.

var current = 1;
var previous = 0;
Expand All @@ -100,7 +100,7 @@ [EnumeratorCancellation] CancellationToken token
var temp = current;
current += previous;
previous = temp;
await Task.Delay(100); // Simulate asynchronous operation
await Task.Delay(100, CancellationToken.None).ConfigureAwait(false); // Simulate asynchronous operation
}
Comment on lines 100 to 104
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This delay uses CancellationToken.None, which prevents timely cancellation during the simulated work. Pass the iterator’s token to Task.Delay to ensure cancellation stops the extractor promptly.

Copilot uses AI. Check for mistakes.

progress?.Report(new EtlProgress(Volatile.Read(ref count))); // Report final count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public IAsyncEnumerable<string> TransformAsync
IAsyncEnumerable<int> items
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

return WorkerAsync(items, null, CancellationToken.None);
return WorkerAsync(items, progress: null, CancellationToken.None);
}


Expand All @@ -44,9 +44,9 @@ public IAsyncEnumerable<string> TransformAsync
CancellationToken token
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

return WorkerAsync(items, null, token);
return WorkerAsync(items, progress: null, token);
}


Expand All @@ -57,8 +57,8 @@ public IAsyncEnumerable<string> TransformAsync
IProgress<EtlProgress> progress
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(items);
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(items, progress, CancellationToken.None);
}
Expand All @@ -72,8 +72,8 @@ public IAsyncEnumerable<string> TransformAsync
CancellationToken token
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(progress, nameof(progress));
ArgumentNullException.ThrowIfNull(items);
ArgumentNullException.ThrowIfNull(progress);

return WorkerAsync(items, progress, token);

Expand All @@ -88,18 +88,18 @@ private async IAsyncEnumerable<string> WorkerAsync
[EnumeratorCancellation] CancellationToken token
)
{
ArgumentNullException.ThrowIfNull(items, nameof(items));
ArgumentNullException.ThrowIfNull(items);

Console.WriteLine($"{ConsoleColors.Green}Transforming{ConsoleColors.Reset} integers to strings asynchronously...\n");

var count = 0;
await using var timer = new Timer
(
_ => progress?.Report(new EtlProgress(Volatile.Read(ref count))),
null,
state: null,
TimeSpan.Zero,
TimeSpan.FromMilliseconds(_progressInterval) // Use the configured progress interval
);
).ConfigureAwait(false);
Comment on lines 98 to +102
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.Threading.Timer is IDisposable (not IAsyncDisposable) and the constructor doesn’t return a task, so await using and the trailing .ConfigureAwait(false) on the timer creation won’t compile. Use a normal using var for the timer (or switch to PeriodicTimer with an async loop) and keep it scoped to the async iterator so it’s disposed when enumeration completes.

Copilot uses AI. Check for mistakes.


await foreach (var item in items.WithCancellation(token))
Expand All @@ -115,7 +115,7 @@ [EnumeratorCancellation] CancellationToken token
}

Console.WriteLine($"Transforming integer {item} to string.");
await Task.Delay(50); // Simulate some delay for transformation
await Task.Delay(50, CancellationToken.None).ConfigureAwait(false); // Simulate some delay for transformation
yield return item.ToString();
Comment on lines 117 to 119
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This delay is using CancellationToken.None, which makes cancellation less responsive (the iterator won’t observe cancellation until after the delay completes). Pass the method’s token to Task.Delay so cancellation can interrupt the delay promptly.

Copilot uses AI. Check for mistakes.
count = Interlocked.Increment(ref count);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Example6_ReducingDuplicateCode
{
internal record EtlProgress(int CurrentCount);
}
31 changes: 8 additions & 23 deletions examples/Net8.0/Example6-ReducingDuplicateCode/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ private static async Task Main()
Console.WriteLine($"{ConsoleColors.Green}.NET Version: {frameworkVersion}{ConsoleColors.Reset}\n");


await EtlWithNoProgressOrCancellation();
await EtlWithNoProgressOrCancellation().ConfigureAwait(false);


await EtlWithCancellationToken();
await EtlWithCancellationToken().ConfigureAwait(false);


await EtlWithProgress();
await EtlWithProgress().ConfigureAwait(false);


await EtlWithProgressAndCancellationToken();
await EtlWithProgressAndCancellationToken().ConfigureAwait(false);


Console.WriteLine($"\n\n{ConsoleColors.Yellow}All ETLs completed.{ConsoleColors.Reset}");
Expand All @@ -43,7 +43,7 @@ private static async Task EtlWithNoProgressOrCancellation()

var sourceItems = extractor.ExtractAsync();
var transformedItems = transformer.TransformAsync(sourceItems);
await loader.LoadAsync(transformedItems);
await loader.LoadAsync(transformedItems).ConfigureAwait(false);

Console.WriteLine($"\n\n{ConsoleColors.Yellow}ETL process completed.{ConsoleColors.Reset}");

Expand All @@ -67,7 +67,7 @@ private static async Task EtlWithCancellationToken()

var sourceItems = extractor.ExtractAsync(token);
var transformedItems = transformer.TransformAsync(sourceItems, token);
await loader.LoadAsync(transformedItems, token);
await loader.LoadAsync(transformedItems, token).ConfigureAwait(false);

Console.WriteLine($"\n\n{ConsoleColors.Yellow}ETL process completed.{ConsoleColors.Reset}");

Expand Down Expand Up @@ -95,7 +95,7 @@ private static async Task EtlWithProgress()
// the loader, but you could also pass it to the extractor or transformer depending on your needs.
var sourceItems = extractor.ExtractAsync();
var transformedItems = transformer.TransformAsync(sourceItems);
await loader.LoadAsync(transformedItems, progress);
await loader.LoadAsync(transformedItems, progress).ConfigureAwait(false);

Console.WriteLine($"\n\n{ConsoleColors.Yellow}ETL process completed.{ConsoleColors.Reset}");

Expand Down Expand Up @@ -126,25 +126,10 @@ private static async Task EtlWithProgressAndCancellationToken()
// the loader, but you could also pass it to the extractor or transformer depending on your needs.
var sourceItems = extractor.ExtractAsync(token);
var transformedItems = transformer.TransformAsync(sourceItems, token);
await loader.LoadAsync(transformedItems, progress, token);
await loader.LoadAsync(transformedItems, progress, token).ConfigureAwait(false);

Console.WriteLine($"\n\n{ConsoleColors.Yellow}ETL process completed.{ConsoleColors.Reset}");

}
}



internal record EtlProgress(int CurrentCount);



internal class ConsoleColors
{
public const string Green = "\u001b[32m";
public const string Yellow = "\u001b[33m";
public const string Reset = "\u001b[0m";
public const string Red = "\u001b[31m";
public const string Cyan = "\u001b[36m";
}
}
13 changes: 7 additions & 6 deletions src/Wolfgang.Etl.Abstractions/ExtractorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class ExtractorBase<TSource, TProgress>
/// <summary>
/// The number of milliseconds between progress updates.
/// </summary>
/// <exception cref="ArgumentException">Value cannot be less than 1</exception>
/// <exception cref="ArgumentOutOfRangeException">Value cannot be less than 1</exception>
public int ReportingInterval
{
get => _reportingInterval;
Expand All @@ -50,7 +50,7 @@ public int ReportingInterval
/// It is the responsibility of the derived class to keep this value up to date as the
/// base class will have no way of knowing the correct value
/// </remarks>

/// <exception cref="ArgumentOutOfRangeException">Value cannot be less than 0</exception>
[Range(0, int.MaxValue, ErrorMessage = "Current item count cannot be less than 0.")]
public int CurrentItemCount
{
Expand All @@ -70,6 +70,7 @@ protected set
/// <summary>
/// Gets the current number of records skipped
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">Value cannot be less than 0</exception>
public int CurrentSkippedItemCount
{
get => _currentSkippedItemCount;
Expand All @@ -92,7 +93,7 @@ protected set
/// This is useful for partially extracting data from a source, especially when the source is large
/// or infinite or during development.
/// </remarks>
/// <exception cref="ArgumentException">The specified value is less than 0</exception>
/// <exception cref="ArgumentOutOfRangeException">The specified value is less than 0</exception>
/// <example>
/// <code>
/// var count = 0;
Expand Down Expand Up @@ -136,7 +137,7 @@ public int MaximumItemCount
/// This is useful for partially extracting data from a source during development, or to skip
/// items that were already processed or are not relevant for the current extraction.
/// </remarks>
/// <exception cref="ArgumentException">The specified value is less than 0</exception>
/// <exception cref="ArgumentOutOfRangeException">The specified value is less than 0</exception>
/// <example>
/// <code>
/// using (var reader = new StreamReader(filePath))
Expand Down Expand Up @@ -231,7 +232,7 @@ public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progr
using var timer = new Timer
(
_ => progress.Report(CreateProgressReport()),
null,
state: null,
TimeSpan.Zero,
TimeSpan.FromMilliseconds(ReportingInterval)
);
Comment on lines 232 to 238
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExtractAsync(progress) returns a deferred IAsyncEnumerable, but the progress Timer is in a using var, so it will be disposed as soon as the method returns (before enumeration starts). Wrap the worker enumeration in an async iterator that owns the timer (or otherwise keep the timer alive for the duration of enumeration) so progress reports are actually emitted.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -265,7 +266,7 @@ public virtual IAsyncEnumerable<TSource> ExtractAsync(IProgress<TProgress> progr
using var timer = new Timer
(
_ => progress.Report(CreateProgressReport()),
null,
state: null,
TimeSpan.Zero,

TimeSpan.FromMilliseconds(ReportingInterval)
Expand Down
Loading
Loading