Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion src/Wolfgang.Etl.Abstractions/ExtractorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;



Expand All @@ -17,14 +18,17 @@ namespace Wolfgang.Etl.Abstractions;
/// <typeparam name="TSource">The type of the object being extracted</typeparam>
/// <typeparam name="TProgress">The type of the progress object</typeparam>
public abstract class ExtractorBase<TSource, TProgress>
: IExtractWithProgressAndCancellationAsync<TSource, TProgress>
: IExtractWithProgressAndCancellationAsync<TSource, TProgress>,
IAsyncDisposable,
IDisposable
where TSource : notnull
where TProgress : notnull
{
private int _currentItemCount;
private int _currentSkippedItemCount;
private long _startTimestamp;
private DateTimeOffset _startedAtUtc;
private bool _disposed;



Expand Down Expand Up @@ -417,4 +421,53 @@ private void EnsureStarted()
_startedAtUtc = now;
}
}



/// <summary>
/// Asynchronously releases the resources held by this extractor. The base implementation is a
/// no-op (the base owns no unmanaged resources); derived classes that hold resources such as
/// streams or connections override <see cref="Dispose(bool)"/> to release them. Enables
/// <c>await using</c> on any extractor.
/// </summary>
/// <returns>A completed <see cref="ValueTask"/> for the default no-op implementation.</returns>
public virtual ValueTask DisposeAsync()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
return default;
}



/// <summary>
/// Releases the resources held by this extractor. The base implementation is a no-op; derived
/// classes that hold resources override <see cref="Dispose(bool)"/>.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}



/// <summary>
/// Releases resources held by this extractor. Override in a derived class to dispose resources
/// it owns (streams, connections, etc.), then call <c>base.Dispose(disposing)</c>. The base
/// implementation only marks the instance disposed and is idempotent.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> when called from <see cref="Dispose()"/> or <see cref="DisposeAsync"/>
/// (dispose managed resources); <see langword="false"/> when called from a finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}

_disposed = true;
}
}
54 changes: 53 additions & 1 deletion src/Wolfgang.Etl.Abstractions/LoaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ namespace Wolfgang.Etl.Abstractions;
/// <typeparam name="TDestination">The type of the destination object being written</typeparam>
/// <typeparam name="TProgress">The type of the progress object</typeparam>
public abstract class LoaderBase<TDestination, TProgress>
: ILoadWithProgressAndCancellationAsync<TDestination, TProgress>
: ILoadWithProgressAndCancellationAsync<TDestination, TProgress>,
IAsyncDisposable,
IDisposable
where TDestination : notnull
where TProgress : notnull
{
private int _currentItemCount;
private int _currentSkippedItemCount;
private long _startTimestamp;
private DateTimeOffset _startedAtUtc;
private bool _disposed;



Expand Down Expand Up @@ -416,4 +419,53 @@ private void EnsureStarted()
_startedAtUtc = now;
}
}



/// <summary>
/// Asynchronously releases the resources held by this loader. The base implementation is a
/// no-op (the base owns no unmanaged resources); derived classes that hold resources such as
/// connections or streams override <see cref="Dispose(bool)"/> to release them. Enables
/// <c>await using</c> on any loader.
/// </summary>
/// <returns>A completed <see cref="ValueTask"/> for the default no-op implementation.</returns>
public virtual ValueTask DisposeAsync()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
return default;
}



/// <summary>
/// Releases the resources held by this loader. The base implementation is a no-op; derived
/// classes that hold resources override <see cref="Dispose(bool)"/>.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}



/// <summary>
/// Releases resources held by this loader. Override in a derived class to dispose resources
/// it owns (connections, streams, etc.), then call <c>base.Dispose(disposing)</c>. The base
/// implementation only marks the instance disposed and is idempotent.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> when called from <see cref="Dispose()"/> or <see cref="DisposeAsync"/>
/// (dispose managed resources); <see langword="false"/> when called from a finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}

_disposed = true;
}
}
9 changes: 9 additions & 0 deletions src/Wolfgang.Etl.Abstractions/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.Dispose() -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractorBase() -> void
Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.IncrementCurrentItemCount() -> void
Expand Down Expand Up @@ -73,6 +74,7 @@ Wolfgang.Etl.Abstractions.ITransformWithProgressAsync<TSource, TDestination, TPr
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.Dispose() -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.IncrementCurrentItemCount() -> void
Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.IncrementCurrentSkippedItemCount() -> void
Expand Down Expand Up @@ -100,6 +102,7 @@ Wolfgang.Etl.Abstractions.Report.TotalItemCount.init -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.CurrentItemCount.get -> int
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.CurrentSkippedItemCount.get -> int
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.Dispose() -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.Elapsed.get -> System.TimeSpan
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.IncrementCurrentItemCount() -> void
Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.IncrementCurrentSkippedItemCount() -> void
Expand All @@ -122,16 +125,22 @@ static Wolfgang.Etl.Abstractions.Pipeline.Extract<TSource, TProgress>(Wolfgang.E
static Wolfgang.Etl.Abstractions.Pipeline.Extract<TSource>(Wolfgang.Etl.Abstractions.IExtractAsync<TSource>! extractor) -> Wolfgang.Etl.Abstractions.IExtractStage<TSource>!
static Wolfgang.Etl.Abstractions.Pipeline.Extract<TSource>(Wolfgang.Etl.Abstractions.IExtractWithCancellationAsync<TSource>! extractor) -> Wolfgang.Etl.Abstractions.IExtractStage<TSource>!
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.CreateProgressTimer(System.IProgress<TProgress>! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.Dispose(bool disposing) -> void
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.DisposeAsync() -> System.Threading.Tasks.ValueTask
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractAsync() -> System.Collections.Generic.IAsyncEnumerable<TSource>!
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractAsync(System.IProgress<TProgress>! progress) -> System.Collections.Generic.IAsyncEnumerable<TSource>!
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractAsync(System.IProgress<TProgress>! progress, System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable<TSource>!
virtual Wolfgang.Etl.Abstractions.ExtractorBase<TSource, TProgress>.ExtractAsync(System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable<TSource>!
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.CreateProgressTimer(System.IProgress<TProgress>! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.Dispose(bool disposing) -> void
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.DisposeAsync() -> System.Threading.Tasks.ValueTask
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.LoadAsync(System.Collections.Generic.IAsyncEnumerable<TDestination>! items) -> System.Threading.Tasks.Task!
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.LoadAsync(System.Collections.Generic.IAsyncEnumerable<TDestination>! items, System.IProgress<TProgress>! progress) -> System.Threading.Tasks.Task!
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.LoadAsync(System.Collections.Generic.IAsyncEnumerable<TDestination>! items, System.IProgress<TProgress>! progress, System.Threading.CancellationToken token) -> System.Threading.Tasks.Task!
virtual Wolfgang.Etl.Abstractions.LoaderBase<TDestination, TProgress>.LoadAsync(System.Collections.Generic.IAsyncEnumerable<TDestination>! items, System.Threading.CancellationToken token) -> System.Threading.Tasks.Task!
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.CreateProgressTimer(System.IProgress<TProgress>! progress) -> Wolfgang.Etl.Abstractions.IProgressTimer!
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.Dispose(bool disposing) -> void
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.DisposeAsync() -> System.Threading.Tasks.ValueTask
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.TransformAsync(System.Collections.Generic.IAsyncEnumerable<TSource>! items) -> System.Collections.Generic.IAsyncEnumerable<TDestination>!
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.TransformAsync(System.Collections.Generic.IAsyncEnumerable<TSource>! items, System.IProgress<TProgress>! progress) -> System.Collections.Generic.IAsyncEnumerable<TDestination>!
virtual Wolfgang.Etl.Abstractions.TransformerBase<TSource, TDestination, TProgress>.TransformAsync(System.Collections.Generic.IAsyncEnumerable<TSource>! items, System.IProgress<TProgress>! progress, System.Threading.CancellationToken token) -> System.Collections.Generic.IAsyncEnumerable<TDestination>!
Expand Down
54 changes: 53 additions & 1 deletion src/Wolfgang.Etl.Abstractions/TransformerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;



Expand All @@ -18,7 +19,9 @@ namespace Wolfgang.Etl.Abstractions;
/// <typeparam name="TDestination">The type of the destination object</typeparam>
/// <typeparam name="TProgress">The type of the progress object</typeparam>
public abstract class TransformerBase<TSource, TDestination, TProgress>
: ITransformWithProgressAndCancellationAsync<TSource, TDestination, TProgress>
: ITransformWithProgressAndCancellationAsync<TSource, TDestination, TProgress>,
IAsyncDisposable,
IDisposable
where TSource : notnull
where TDestination : notnull
where TProgress : notnull
Expand All @@ -27,6 +30,7 @@ public abstract class TransformerBase<TSource, TDestination, TProgress>
private int _currentSkippedItemCount;
private long _startTimestamp;
private DateTimeOffset _startedAtUtc;
private bool _disposed;



Expand Down Expand Up @@ -425,4 +429,52 @@ private void EnsureStarted()
_startedAtUtc = now;
}
}



/// <summary>
/// Asynchronously releases the resources held by this transformer. The base implementation is a
/// no-op (the base owns no unmanaged resources); derived classes that hold resources override
/// <see cref="Dispose(bool)"/> to release them. Enables <c>await using</c> on any transformer.
/// </summary>
/// <returns>A completed <see cref="ValueTask"/> for the default no-op implementation.</returns>
public virtual ValueTask DisposeAsync()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
return default;
}



/// <summary>
/// Releases the resources held by this transformer. The base implementation is a no-op; derived
/// classes that hold resources override <see cref="Dispose(bool)"/>.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}



/// <summary>
/// Releases resources held by this transformer. Override in a derived class to dispose resources
/// it owns, then call <c>base.Dispose(disposing)</c>. The base implementation only marks the
/// instance disposed and is idempotent.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> when called from <see cref="Dispose()"/> or <see cref="DisposeAsync"/>
/// (dispose managed resources); <see langword="false"/> when called from a finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}

_disposed = true;
}
}
Loading
Loading