Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/Wolfgang.Etl.Abstractions/ExtractorBase.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -22,6 +23,43 @@ public abstract class ExtractorBase<TSource, TProgress>
{
private int _currentItemCount;
private int _currentSkippedItemCount;
private long _startTimestamp;
private DateTimeOffset _startedAtUtc;



/// <summary>
/// Gets the UTC wall-clock time recorded when the first item was counted (extracted
/// or skipped), or <c>null</c> while no item has been counted yet.
/// </summary>
/// <remarks>
/// The value is captured the first time <see cref="IncrementCurrentItemCount"/> or
/// <see cref="IncrementCurrentSkippedItemCount"/> runs. Derived classes can copy it
/// onto their progress report (see <see cref="Report.StartedAt"/>).
/// </remarks>
protected DateTimeOffset? StartedAt
{
    get
    {
        // A zero start timestamp is the "not started yet" sentinel.
        var hasStarted = Volatile.Read(ref _startTimestamp) != 0;
        if (!hasStarted)
        {
            return null;
        }

        return _startedAtUtc;
    }
}



/// <summary>
/// Gets the time elapsed since the first item was processed, measured with the
/// monotonic <see cref="Stopwatch"/> clock, or <see cref="TimeSpan.Zero"/> if
/// extraction has not produced any items yet.
/// </summary>
/// <remarks>
/// Read this when building a progress report (see <see cref="Report.Elapsed"/>) to
/// snapshot how long extraction has been running. Each read re-samples the clock,
/// so successive reads return increasing values.
/// </remarks>
protected TimeSpan Elapsed
{
    get
    {
        var start = Volatile.Read(ref _startTimestamp);
        if (start == 0)
        {
            // Zero is the "not started yet" sentinel.
            return TimeSpan.Zero;
        }

        var stopwatchTicks = Stopwatch.GetTimestamp() - start;

        // Convert Stopwatch ticks straight to TimeSpan ticks (100 ns units).
        // TimeSpan.FromSeconds rounds to the nearest millisecond on .NET Framework
        // and early .NET Core, which would discard the Stopwatch's resolution.
        var timeSpanTicksPerStopwatchTick = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
        return TimeSpan.FromTicks((long)(stopwatchTicks * timeSpanTicksPerStopwatchTick));
    }
}



Expand Down Expand Up @@ -304,6 +342,7 @@ private void ReportProgress(object? state)
Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentItemCount()
{
// Record the start time before counting; EnsureStarted returns immediately once a
// start timestamp has been stored, so only the first counted item pays for it.
EnsureStarted();
// Atomic increment; the returned new value is deliberately discarded.
_ = Interlocked.Increment(ref _currentItemCount);
}

Expand All @@ -320,6 +359,27 @@ protected void IncrementCurrentItemCount()
Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentSkippedItemCount()
{
// A skipped item also counts as "processing has started": record the start time
// first. EnsureStarted returns immediately once a start timestamp has been stored.
EnsureStarted();
// Atomic increment; the returned new value is deliberately discarded.
_ = Interlocked.Increment(ref _currentSkippedItemCount);
}



// Serializes the one-time capture of the start time in EnsureStarted.
private readonly object _startGate = new object();



// Captures the wall-clock StartedAt and the monotonic start timestamp the first
// time any item is processed. Idempotent and thread-safe.
//
// Publication order matters: _startedAtUtc is written BEFORE _startTimestamp is
// published with a release write. Readers test _startTimestamp with Volatile.Read
// (acquire) and only then read _startedAtUtc, so a reader that observes a non-zero
// timestamp is guaranteed to observe the fully written DateTimeOffset as well.
// Publishing the timestamp first would let a concurrent reader see "started" while
// _startedAtUtc still holds its default (or a half-written) value.
private void EnsureStarted()
{
    // Fast path: after the first item this is a single volatile read.
    if (Volatile.Read(ref _startTimestamp) != 0)
    {
        return;
    }

    lock (_startGate)
    {
        // Re-check under the lock; another thread may have won the race.
        if (_startTimestamp != 0)
        {
            return;
        }

        _startedAtUtc = DateTimeOffset.UtcNow;
        Volatile.Write(ref _startTimestamp, Stopwatch.GetTimestamp());
    }
}
}
19 changes: 19 additions & 0 deletions src/Wolfgang.Etl.Abstractions/IsExternalInit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#if !NET5_0_OR_GREATER

using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;

namespace System.Runtime.CompilerServices;

/// <summary>
/// Polyfill for the compiler-required <c>IsExternalInit</c> marker type, which enables
/// <see langword="init"/>-only property setters on target frameworks (.NET Framework,
/// .NET Standard 2.0) whose reference assemblies do not ship it. Built-in on .NET 5.0+.
/// </summary>
/// <remarks>
/// The type has no members; it is declared in the
/// <c>System.Runtime.CompilerServices</c> namespace and is only compiled when
/// <c>NET5_0_OR_GREATER</c> is not defined. It is <see langword="internal"/> and
/// hidden from editor completion because it is not meant to be used directly.
/// </remarks>
[EditorBrowsable(EditorBrowsableState.Never)]
[ExcludeFromCodeCoverage]
internal static class IsExternalInit
{
}

#endif
60 changes: 60 additions & 0 deletions src/Wolfgang.Etl.Abstractions/LoaderBase.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,6 +23,43 @@ public abstract class LoaderBase<TDestination, TProgress>
{
private int _currentItemCount;
private int _currentSkippedItemCount;
private long _startTimestamp;
private DateTimeOffset _startedAtUtc;



/// <summary>
/// Gets the UTC wall-clock time recorded when the first item was counted (loaded
/// or skipped), or <c>null</c> while no item has been counted yet.
/// </summary>
/// <remarks>
/// The value is captured the first time <see cref="IncrementCurrentItemCount"/> or
/// <see cref="IncrementCurrentSkippedItemCount"/> runs. Derived classes can copy it
/// onto their progress report (see <see cref="Report.StartedAt"/>).
/// </remarks>
protected DateTimeOffset? StartedAt
{
    get
    {
        // A zero start timestamp is the "not started yet" sentinel.
        var hasStarted = Volatile.Read(ref _startTimestamp) != 0;
        if (!hasStarted)
        {
            return null;
        }

        return _startedAtUtc;
    }
}



/// <summary>
/// Gets the time elapsed since the first item was processed, measured with the
/// monotonic <see cref="Stopwatch"/> clock, or <see cref="TimeSpan.Zero"/> if
/// loading has not produced any items yet.
/// </summary>
/// <remarks>
/// Read this when building a progress report (see <see cref="Report.Elapsed"/>) to
/// snapshot how long loading has been running. Each read re-samples the clock,
/// so successive reads return increasing values.
/// </remarks>
protected TimeSpan Elapsed
{
    get
    {
        var start = Volatile.Read(ref _startTimestamp);
        if (start == 0)
        {
            // Zero is the "not started yet" sentinel.
            return TimeSpan.Zero;
        }

        var stopwatchTicks = Stopwatch.GetTimestamp() - start;

        // Convert Stopwatch ticks straight to TimeSpan ticks (100 ns units).
        // TimeSpan.FromSeconds rounds to the nearest millisecond on .NET Framework
        // and early .NET Core, which would discard the Stopwatch's resolution.
        var timeSpanTicksPerStopwatchTick = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
        return TimeSpan.FromTicks((long)(stopwatchTicks * timeSpanTicksPerStopwatchTick));
    }
}



Expand Down Expand Up @@ -307,6 +345,7 @@ private void ReportProgress(object? state)
Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentItemCount()
{
// Record the start time before counting; EnsureStarted returns immediately once a
// start timestamp has been stored, so only the first counted item pays for it.
EnsureStarted();
// Atomic increment; the returned new value is deliberately discarded.
_ = Interlocked.Increment(ref _currentItemCount);
}

Expand All @@ -323,6 +362,27 @@ protected void IncrementCurrentItemCount()
Justification = "Interlocked.Increment return value intentionally discarded; only the side-effect matters.")]
protected void IncrementCurrentSkippedItemCount()
{
// A skipped item also counts as "processing has started": record the start time
// first. EnsureStarted returns immediately once a start timestamp has been stored.
EnsureStarted();
// Atomic increment; the returned new value is deliberately discarded.
_ = Interlocked.Increment(ref _currentSkippedItemCount);
}



// Serializes the one-time capture of the start time in EnsureStarted.
private readonly object _startGate = new object();



// Captures the wall-clock StartedAt and the monotonic start timestamp the first
// time any item is processed. Idempotent and thread-safe.
//
// Publication order matters: _startedAtUtc is written BEFORE _startTimestamp is
// published with a release write. Readers test _startTimestamp with Volatile.Read
// (acquire) and only then read _startedAtUtc, so a reader that observes a non-zero
// timestamp is guaranteed to observe the fully written DateTimeOffset as well.
// Publishing the timestamp first would let a concurrent reader see "started" while
// _startedAtUtc still holds its default (or a half-written) value.
private void EnsureStarted()
{
    // Fast path: after the first item this is a single volatile read.
    if (Volatile.Read(ref _startTimestamp) != 0)
    {
        return;
    }

    lock (_startGate)
    {
        // Re-check under the lock; another thread may have won the race.
        if (_startTimestamp != 0)
        {
            return;
        }

        _startedAtUtc = DateTimeOffset.UtcNow;
        Volatile.Write(ref _startTimestamp, Stopwatch.GetTimestamp());
    }
}
}
15 changes: 15 additions & 0 deletions src/Wolfgang.Etl.Abstractions/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractorBase() -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.IncrementCurrentItemCount() -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.IncrementCurrentSkippedItemCount() -> void
Expand All @@ -11,6 +12,7 @@ Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ReportingInterval.ge
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ReportingInterval.set -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.SkipItemCount.get -> int
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.SkipItemCount.set -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.StartedAt.get -> System.DateTimeOffset?
Wolfgang.Etl.Abstractions.IExtractAsync<TSource>
Wolfgang.Etl.Abstractions.IExtractAsync<TSource>.ExtractAsync() -> System.Collections.Generic.IAsyncEnumerable<TSource>!
Wolfgang.Etl.Abstractions.IExtractStage<TSource>
Expand Down Expand Up @@ -71,6 +73,7 @@ Wolfgang.Etl.Abstractions.ITransformWithProgressAsync<TSource, TDestination, TPr
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.IncrementCurrentItemCount() -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.IncrementCurrentSkippedItemCount() -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.LoaderBase() -> void
Expand All @@ -80,13 +83,24 @@ Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.ReportingInterval.
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.ReportingInterval.set -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.SkipItemCount.get -> int
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.SkipItemCount.set -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.StartedAt.get -> System.DateTimeOffset?
Wolfgang.Etl.Abstractions.Pipeline
Wolfgang.Etl.Abstractions.Report
Wolfgang.Etl.Abstractions.Report.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.Report.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.Report.Elapsed.init -> void
Wolfgang.Etl.Abstractions.Report.EstimatedRemaining.get -> System.TimeSpan?
Wolfgang.Etl.Abstractions.Report.ItemsPerSecond.get -> double
Wolfgang.Etl.Abstractions.Report.PercentComplete.get -> double?
Wolfgang.Etl.Abstractions.Report.Report(int currentItemCount) -> void
Wolfgang.Etl.Abstractions.Report.StartedAt.get -> System.DateTimeOffset?
Wolfgang.Etl.Abstractions.Report.StartedAt.init -> void
Wolfgang.Etl.Abstractions.Report.TotalItemCount.get -> int?
Wolfgang.Etl.Abstractions.Report.TotalItemCount.init -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.IncrementCurrentItemCount() -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.IncrementCurrentSkippedItemCount() -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.MaximumItemCount.get -> int
Expand All @@ -95,6 +109,7 @@ Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.Repo
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.ReportingInterval.set -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.SkipItemCount.get -> int
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.SkipItemCount.set -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.StartedAt.get -> System.DateTimeOffset?
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.TransformerBase() -> void
abstract Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CreateProgressReport() -> TProgress
abstract Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractWorkerAsync(System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable<TSource>!
Expand Down
119 changes: 117 additions & 2 deletions src/Wolfgang.Etl.Abstractions/Report.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@
namespace Wolfgang.Etl.Abstractions;

/// <summary>
/// Provides a report of the current item count in an ETL process.
/// Provides a point-in-time snapshot of the progress of an ETL process — how many
/// items have been processed, how long it has been running, and (when the total is
/// known) how far along it is and how much longer it is expected to take.
/// </summary>
/// <remarks>
/// <para>
/// This class can be used as a base class for other progress reports and expanded
/// with additional information such as total count, count remaining, etc.
/// with additional information specific to a particular extractor, transformer, or loader.
/// </para>
/// <para>
/// All values are <em>snapshot</em> values captured at the moment the report is
/// constructed. <see cref="Elapsed"/> does not advance after construction, mirroring
/// <see cref="CurrentItemCount"/>. The throughput and completion estimates
/// (<see cref="ItemsPerSecond"/>, <see cref="PercentComplete"/>,
/// <see cref="EstimatedRemaining"/>) are derived from those snapshot values, so a
/// given report is internally consistent.
/// </para>
/// </remarks>
public record Report
{
Expand All @@ -32,4 +44,107 @@ public Report(int currentItemCount)
/// The number of items that have been processed so far in the ETL process.
/// </summary>
public int CurrentItemCount { get; }



/// <summary>
/// The wall-clock time (UTC) at which processing started, or <c>null</c> if it
/// has not started yet (no items processed) or the producer does not track it.
/// </summary>
/// <remarks>
/// Init-only: the value can be supplied in an object initializer and is
/// <c>null</c> when it is not supplied.
/// </remarks>
public DateTimeOffset? StartedAt { get; init; }



/// <summary>
/// The wall-clock time that had elapsed since processing started, captured at the
/// moment this report was constructed. <see cref="TimeSpan.Zero"/> when timing is
/// not tracked or processing has not started.
/// </summary>
/// <remarks>
/// Init-only. <see cref="ItemsPerSecond"/> is computed from this value and
/// reports 0 unless it is greater than zero.
/// </remarks>
public TimeSpan Elapsed { get; init; }



/// <summary>
/// The expected total number of items, when it is known up front (for example a
/// file line count or a SQL <c>COUNT(*)</c>); <c>null</c> when the source has an
/// unknown or unbounded size. A supplied value must be 0 or greater.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The specified value is less than 0.</exception>
public int? TotalItemCount
{
    get;

    // Accept null (unknown total) and any non-negative total; reject the rest.
    init => field = value is null or >= 0
        ? value
        : throw new ArgumentOutOfRangeException(nameof(value), "Total item count cannot be less than 0.");
}



/// <summary>
/// The throughput in items per second: <see cref="CurrentItemCount"/> divided by
/// the seconds in <see cref="Elapsed"/>. Returns 0 while <see cref="Elapsed"/> is
/// not greater than zero.
/// </summary>
public double ItemsPerSecond
{
    get
    {
        var elapsedSeconds = Elapsed.TotalSeconds;
        if (elapsedSeconds > 0)
        {
            return CurrentItemCount / elapsedSeconds;
        }

        // No measurable time yet, so no meaningful rate.
        return 0d;
    }
}



/// <summary>
/// The share of the work that is done, expressed as a percentage, or <c>null</c>
/// when <see cref="TotalItemCount"/> is unknown. The result is capped at 100 when
/// <see cref="CurrentItemCount"/> exceeds <see cref="TotalItemCount"/>, and a total
/// of 0 is reported as 100.
/// </summary>
public double? PercentComplete
{
    get
    {
        var knownTotal = TotalItemCount;
        if (!knownTotal.HasValue)
        {
            return null;
        }

        if (knownTotal.Value == 0)
        {
            // Nothing to do means everything is done; also avoids dividing by zero.
            return 100d;
        }

        var percent = 100d * CurrentItemCount / knownTotal.Value;
        return Math.Min(100d, percent);
    }
}



/// <summary>
/// The estimated time remaining until completion, when both
/// <see cref="TotalItemCount"/> is known and throughput can be measured;
/// otherwise <c>null</c>. Returns <see cref="TimeSpan.Zero"/> once the current
/// count has reached the total.
/// </summary>
/// <remarks>
/// The estimate is the remaining item count divided by <see cref="ItemsPerSecond"/>.
/// When throughput is so low that the estimate would exceed the range of
/// <see cref="TimeSpan"/>, <see cref="TimeSpan.MaxValue"/> is returned instead of
/// throwing, so reading the property (including through the record's generated
/// <c>ToString</c>) never fails.
/// </remarks>
public TimeSpan? EstimatedRemaining
{
    get
    {
        if (TotalItemCount is not { } total)
        {
            return null;
        }

        // Widen before subtracting so the difference cannot wrap around.
        var remaining = (long)total - CurrentItemCount;
        if (remaining <= 0)
        {
            return TimeSpan.Zero;
        }

        var rate = ItemsPerSecond;
        if (!(rate > 0))
        {
            // No measurable throughput yet, so no estimate.
            return null;
        }

        // Work in TimeSpan ticks and clamp: TimeSpan.FromSeconds would throw
        // OverflowException for estimates beyond TimeSpan.MaxValue.
        var estimatedTicks = remaining / rate * TimeSpan.TicksPerSecond;
        return estimatedTicks >= long.MaxValue
            ? TimeSpan.MaxValue
            : TimeSpan.FromTicks((long)estimatedTicks);
    }
}
}
Loading
Loading