Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Names of the activity source, activities and tags used by the ingestion pipeline diagnostics.
/// </summary>
internal static class DiagnosticsConstants
{
    /// <summary>Default name of the activity source.</summary>
    internal const string ActivitySourceName = "Experimental.Microsoft.Extensions.DataIngestion";

    /// <summary>Name of the tag that carries the type of a recorded error.</summary>
    internal const string ErrorTypeTagName = "error.type";

    /// <summary>Constants for the activity that covers processing of a directory.</summary>
    internal static class ProcessDirectory
    {
        internal const string ActivityName = "ProcessDirectory";

        internal const string DirectoryPathTagName = "di.directory.path";

        internal const string SearchOptionTagName = "di.directory.search.option";

        internal const string SearchPatternTagName = "di.directory.search.pattern";
    }

    /// <summary>Constants for the activity that covers processing of a collection of files.</summary>
    internal static class ProcessFiles
    {
        internal const string ActivityName = "ProcessFiles";

        internal const string FileCountTagName = "di.file.count";
    }

    /// <summary>Constants for the activity that covers processing of a single file.</summary>
    internal static class ProcessFile
    {
        internal const string ActivityName = "ProcessFile";

        internal const string FilePathTagName = "di.file.path";
    }

    /// <summary>Tag names that describe the document being processed.</summary>
    internal static class ProcessSource
    {
        internal const string DocumentIdTagName = "di.document.id";
    }
}
218 changes: 218 additions & 0 deletions src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;
using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants;

namespace Microsoft.Extensions.DataIngestion;

#pragma warning disable IDE0058 // Expression value is never used
#pragma warning disable IDE0063 // Use simple 'using' statement
#pragma warning disable CA1031 // Do not catch general exception types

/// <summary>
/// Represents a pipeline for ingesting data from documents and processing it into chunks.
/// </summary>
/// <remarks>
/// For every file the pipeline reads a document with the reader, runs it through <see cref="DocumentProcessors"/>,
/// splits it with the chunker, runs the chunks through <see cref="ChunkProcessors"/> and hands them to the writer.
/// </remarks>
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class IngestionPipeline<T> : IDisposable
{
    private readonly IngestionDocumentReader _reader;
    private readonly IngestionChunker<T> _chunker;
    private readonly IngestionChunkWriter<T> _writer;
    private readonly IngestionPipelineOptions _options;
    private readonly ActivitySource _activitySource;
    private readonly ILogger? _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
    /// </summary>
    /// <param name="reader">The reader for ingestion documents.</param>
    /// <param name="chunker">The chunker to split documents into chunks.</param>
    /// <param name="writer">The writer for processing chunks. It is disposed together with the pipeline.</param>
    /// <param name="options">
    /// The options for the ingestion pipeline. The pipeline works on a copy, so later changes to the provided
    /// instance are not observed. When <see langword="null"/>, default options are used.
    /// </param>
    /// <param name="loggerFactory">The logger factory for creating loggers. When <see langword="null"/>, nothing is logged.</param>
    public IngestionPipeline(
        IngestionDocumentReader reader,
        IngestionChunker<T> chunker,
        IngestionChunkWriter<T> writer,
        IngestionPipelineOptions? options = default,
        ILoggerFactory? loggerFactory = default)
    {
        _reader = Throw.IfNull(reader);
        _chunker = Throw.IfNull(chunker);
        _writer = Throw.IfNull(writer);
        _options = options?.Clone() ?? new();
        _activitySource = new(_options.ActivitySourceName);
        _logger = loggerFactory?.CreateLogger<IngestionPipeline<T>>();
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        try
        {
            _writer.Dispose();
        }
        finally
        {
            // The activity source must be released even when disposing the writer throws.
            _activitySource.Dispose();
        }
    }

    /// <summary>
    /// Gets the document processors in the pipeline.
    /// </summary>
    /// <remarks>The processors are applied to each document in list order, before chunking.</remarks>
    public IList<IngestionDocumentProcessor> DocumentProcessors { get; } = [];

    /// <summary>
    /// Gets the chunk processors in the pipeline.
    /// </summary>
    /// <remarks>The processors are chained in list order between the chunker and the writer.</remarks>
    public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];

    /// <summary>
    /// Processes all files in the specified directory that match the given search pattern and option.
    /// </summary>
    /// <param name="directory">The directory to process.</param>
    /// <param name="searchPattern">The search pattern for file selection.</param>
    /// <param name="searchOption">The search option for directory traversal.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    /// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
    /// <exception cref="AggregateException">
    /// More ingestions failed than <see cref="IngestionPipelineOptions.MaximumErrorsPerProcessing"/> allows, or every ingestion failed.
    /// When processing stops on the very first failure, that exception is rethrown as is.
    /// </exception>
    public async Task ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*", SearchOption searchOption = SearchOption.TopDirectoryOnly, CancellationToken cancellationToken = default)
    {
        Throw.IfNull(directory);
        Throw.IfNullOrEmpty(searchPattern);
        Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);

        using (Activity? rootActivity = StartActivity(ProcessDirectory.ActivityName, ActivityKind.Internal))
        {
            rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName);
            rootActivity?.SetTag(ProcessDirectory.SearchPatternTagName, searchPattern);
            rootActivity?.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
            _logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

            // EnumerateFiles is lazy, so files are discovered while they are being processed.
            await ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Processes the specified files.
    /// </summary>
    /// <param name="files">The collection of files to process.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    /// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
    /// <exception cref="AggregateException">
    /// More ingestions failed than <see cref="IngestionPipelineOptions.MaximumErrorsPerProcessing"/> allows, or every ingestion failed.
    /// When processing stops on the very first failure, that exception is rethrown as is.
    /// </exception>
    public async Task ProcessAsync(IEnumerable<FileInfo> files, CancellationToken cancellationToken = default)
    {
        Throw.IfNull(files);

        using (Activity? rootActivity = StartActivity(ProcessFiles.ActivityName, ActivityKind.Internal))
        {
            await ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>Gets the simple type name of <paramref name="any"/>, used to identify components in log messages.</summary>
    private static string GetShortName(object any) => any.GetType().Name;

    /// <summary>Marks <paramref name="activity"/> as failed and records the exception type on it.</summary>
    private static void TraceException(Activity? activity, Exception ex)
    {
        activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName);
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
    }

    /// <summary>
    /// Determines whether <paramref name="ex"/> reports cancellation that was requested through <paramref name="cancellationToken"/>.
    /// </summary>
    private static bool IsRequestedCancellation(Exception ex, CancellationToken cancellationToken)
        => ex is OperationCanceledException && cancellationToken.IsCancellationRequested;

    /// <summary>
    /// Ingests the files one by one, collecting failures until the configured error limit is exceeded.
    /// </summary>
    /// <param name="files">The files to ingest.</param>
    /// <param name="rootActivity">The activity that parents the per-file activities, if any.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    private async Task ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity, CancellationToken cancellationToken)
    {
        // The count is reported only when it is known up front; a lazy sequence is not enumerated just to count it.
        if (files is IReadOnlyCollection<FileInfo> materialized)
        {
            rootActivity?.SetTag(ProcessFiles.FileCountTagName, materialized.Count);
            _logger?.LogFileCount(materialized.Count);
        }

        List<Exception>? exceptions = null;
        int fileCount = 0;
        foreach (FileInfo fileInfo in files)
        {
            // Stop before starting another file once cancellation was requested.
            cancellationToken.ThrowIfCancellationRequested();

            fileCount++;
            using (Activity? processFileActivity = StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parent: rootActivity))
            {
                processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
                _logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));

                IngestionDocument? document = null;
                try
                {
                    document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);

                    processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
                    _logger?.ReadDocument(document.Identifier);

                    await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception ex) when (!IsRequestedCancellation(ex, cancellationToken))
                {
                    // Requested cancellation is excluded by the filter above: it is not an ingestion failure
                    // and has to reach the caller instead of being counted against the error limit.
                    TraceException(processFileActivity, ex);

                    // The document is null when reading itself failed, so fall back to the file path.
                    _logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName);

                    exceptions ??= [];
                    exceptions.Add(ex);

                    if (exceptions.Count > _options.MaximumErrorsPerProcessing)
                    {
                        TraceException(rootActivity, ex);

                        if (exceptions.Count == 1)
                        {
                            // Nothing to aggregate; rethrow to preserve the original exception and its stack trace.
                            throw;
                        }

                        throw new AggregateException(exceptions);
                    }
                }
            }
        }

        // When all ingestions throw, the method has to throw no matter how MaximumErrorsPerProcessing was configured.
        if (exceptions is not null && exceptions.Count == fileCount)
        {
            TraceException(rootActivity, exceptions[exceptions.Count - 1]);

            throw new AggregateException(exceptions);
        }
    }

    /// <summary>
    /// Runs a single document through the document processors, the chunker, the chunk processors and the writer.
    /// </summary>
    /// <param name="document">The document to ingest.</param>
    /// <param name="parentActivity">The per-file activity whose document ID tag is kept up to date, if any.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    private async Task IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
    {
        foreach (IngestionDocumentProcessor processor in DocumentProcessors)
        {
            document = await processor.ProcessAsync(document, cancellationToken).ConfigureAwait(false);

            // A DocumentProcessor might change the document identifier (for example by extracting it from its content), so update the ID tag.
            parentActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
        }

        // Each chunk processor wraps the sequence produced by the previous stage.
        IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken);
        foreach (var processor in ChunkProcessors)
        {
            chunks = processor.ProcessAsync(chunks, cancellationToken);
        }

        _logger?.WritingChunks(GetShortName(_writer));
        await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false);
        _logger?.WroteChunks(document.Identifier);
    }

    /// <summary>
    /// Starts an activity on the pipeline's activity source, optionally parented to <paramref name="parent"/>.
    /// </summary>
    /// <returns>The started activity, or <see langword="null"/> when the activity source has no listeners.</returns>
    private Activity? StartActivity(string name, ActivityKind activityKind, Activity? parent = default)
    {
        if (!_activitySource.HasListeners())
        {
            return null;
        }
        else if (parent is null)
        {
            return _activitySource.StartActivity(name, activityKind);
        }

        return _activitySource.StartActivity(name, activityKind, parent.Context);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Diagnostics;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;

#pragma warning disable SA1500 // Braces for multi-line statements should not share line
#pragma warning disable SA1513 // Closing brace should be followed by blank line

/// <summary>
/// Options for configuring the ingestion pipeline.
/// </summary>
public sealed class IngestionPipelineOptions
{
    private string _activitySourceName = DiagnosticsConstants.ActivitySourceName;
    private int _maximumErrorsPerProcessing = 3;

    /// <summary>
    /// Gets or sets the name of the <see cref="ActivitySource"/> used for diagnostics.
    /// </summary>
    /// <value>
    /// The name of the activity source. The value is validated to be neither <see langword="null"/> nor empty.
    /// </value>
    public string ActivitySourceName
    {
        get => _activitySourceName;
        set => _activitySourceName = Throw.IfNullOrEmpty(value);
    }

    /// <summary>
    /// Gets or sets the maximum number of ingestions that are allowed to fail with an error.
    /// </summary>
    /// <value>
    /// The maximum number of ingestions that are allowed to fail with an error.
    /// The default value is 3. Negative values are rejected.
    /// </value>
    /// <remarks>
    /// <para>
    /// When document processing fails with an exception, the <see cref="IngestionPipeline{T}"/>
    /// records the failure and continues with the next file.
    /// </para>
    /// <para>
    /// Once the number of failed ingestions exceeds this value, processing stops and the collected exceptions
    /// are thrown to the caller as an <see cref="AggregateException"/>. If processing stops on the very first
    /// failure, that exception is rethrown as is instead of being wrapped.
    /// </para>
    /// <para>
    /// If the value is set to zero, the first exception immediately terminates the processing.
    /// </para>
    /// <para>
    /// When all ingestions fail, the processing always terminates with an exception, regardless of this value.
    /// </para>
    /// </remarks>
    public int MaximumErrorsPerProcessing
    {
        get => _maximumErrorsPerProcessing;
        set => _maximumErrorsPerProcessing = Throw.IfLessThan(value, 0);
    }

    /// <summary>
    /// Creates an independent copy of these options.
    /// </summary>
    /// <returns>A new instance carrying the same property values.</returns>
    internal IngestionPipelineOptions Clone()
    {
        IngestionPipelineOptions copy = new();
        copy.ActivitySourceName = ActivitySourceName;
        copy.MaximumErrorsPerProcessing = MaximumErrorsPerProcessing;
        return copy;
    }
}
35 changes: 35 additions & 0 deletions src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using Microsoft.Extensions.Logging;

#pragma warning disable S109 // Magic numbers should not be used

namespace Microsoft.Extensions.DataIngestion
{
/// <summary>
/// Log messages emitted by the ingestion pipeline; the method bodies are produced by the logging source generator.
/// </summary>
internal static partial class Log
{
    /// <summary>Logs that processing of a directory is starting.</summary>
    [LoggerMessage(
        EventId = 0,
        Level = LogLevel.Information,
        Message = "Starting to process files in directory '{directory}' with search pattern '{searchPattern}' and search option '{searchOption}'.")]
    internal static partial void ProcessingDirectory(this ILogger logger, string directory, string searchPattern, SearchOption searchOption);

    /// <summary>Logs the number of files that are about to be processed.</summary>
    [LoggerMessage(
        EventId = 1,
        Level = LogLevel.Information,
        Message = "Processing {fileCount} files.")]
    internal static partial void LogFileCount(this ILogger logger, int fileCount);

    /// <summary>Logs that a file is being read and which reader is used.</summary>
    [LoggerMessage(
        EventId = 2,
        Level = LogLevel.Information,
        Message = "Reading file '{filePath}' using '{reader}'.")]
    internal static partial void ReadingFile(this ILogger logger, string filePath, string reader);

    /// <summary>Logs that a document was read.</summary>
    [LoggerMessage(
        EventId = 3,
        Level = LogLevel.Information,
        Message = "Read document '{documentId}'.")]
    internal static partial void ReadDocument(this ILogger logger, string documentId);

    /// <summary>Logs that chunks are being written and which writer is used.</summary>
    [LoggerMessage(
        EventId = 4,
        Level = LogLevel.Information,
        Message = "Writing chunks using {writer}.")]
    internal static partial void WritingChunks(this ILogger logger, string writer);

    /// <summary>Logs that the chunks of a document were written.</summary>
    [LoggerMessage(
        EventId = 5,
        Level = LogLevel.Information,
        Message = "Wrote chunks for document '{documentId}'.")]
    internal static partial void WroteChunks(this ILogger logger, string documentId);

    /// <summary>Logs that ingesting a document failed with <paramref name="exception"/>.</summary>
    [LoggerMessage(
        EventId = 6,
        Level = LogLevel.Error,
        Message = "An error occurred while ingesting document '{identifier}'.")]
    internal static partial void IngestingFailed(this ILogger logger, Exception exception, string identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<TargetFrameworks>$(TargetFrameworks);netstandard2.0</TargetFrameworks>
<RootNamespace>Microsoft.Extensions.DataIngestion</RootNamespace>

<!-- Project reference can be removed -->
<NoWarn>$(NoWarn);RT0002</NoWarn>
<UseLoggingGenerator>true</UseLoggingGenerator>
<DisableMicrosoftExtensionsLoggingSourceGenerator>false</DisableMicrosoftExtensionsLoggingSourceGenerator>

<!-- we are not ready to publish yet -->
<IsPackable>false</IsPackable>
Expand All @@ -18,9 +18,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Collections.Immutable" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.VectorData.Abstractions" />
<PackageReference Include="Microsoft.ML.Tokenizers" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="System.Collections.Immutable" />
</ItemGroup>

</Project>
Loading
Loading