-
Notifications
You must be signed in to change notification settings - Fork 849
[MEDI] Pipeline #6993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
[MEDI] Pipeline #6993
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
111f823
move code as is
adamsitnik e7e1eb4
fix the compile errors and warnings
adamsitnik dff80b6
Merge remote-tracking branch 'upstream/main' into pipelines
adamsitnik f2042c7
move writer tests to dedicated folder
adamsitnik 6e87cf6
refactor the tests so they don't require any files to be included in …
adamsitnik 0746ade
change the error handling
adamsitnik 028e755
fixes
adamsitnik b41733c
address code review feedback
adamsitnik cc8bd46
Merge remote-tracking branch 'upstream/main' into pipelines
adamsitnik a70b6e1
fix the tests after recent merge
adamsitnik 23535e3
address code review feedback
adamsitnik 3a07b68
introduce IngestionResult and use it represent the result of ingestin…
adamsitnik File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
35 changes: 35 additions & 0 deletions
35
src/Libraries/Microsoft.Extensions.DataIngestion/DiagnosticsConstants.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| internal static class DiagnosticsConstants | ||
| { | ||
| internal const string ActivitySourceName = "Experimental.Microsoft.Extensions.DataIngestion"; | ||
| internal const string ErrorTypeTagName = "error.type"; | ||
|
|
||
| internal static class ProcessDirectory | ||
| { | ||
| internal const string ActivityName = "ProcessDirectory"; | ||
| internal const string DirectoryPathTagName = "rag.directory.path"; | ||
| internal const string SearchPatternTagName = "rag.directory.search.pattern"; | ||
| internal const string SearchOptionTagName = "rag.directory.search.option"; | ||
| } | ||
|
|
||
| internal static class ProcessFiles | ||
| { | ||
| internal const string ActivityName = "ProcessFiles"; | ||
| internal const string FileCountTagName = "rag.file.count"; | ||
| } | ||
|
|
||
| internal static class ProcessSource | ||
| { | ||
| internal const string DocumentIdTagName = "rag.document.id"; | ||
| } | ||
|
|
||
| internal static class ProcessFile | ||
| { | ||
| internal const string ActivityName = "ProcessFile"; | ||
| internal const string FilePathTagName = "rag.file.path"; | ||
| } | ||
| } |
192 changes: 192 additions & 0 deletions
192
src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.IO; | ||
| using System.Runtime.CompilerServices; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Shared.Diagnostics; | ||
| using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| #pragma warning disable IDE0058 // Expression value is never used | ||
| #pragma warning disable IDE0063 // Use simple 'using' statement | ||
| #pragma warning disable CA1031 // Do not catch general exception types | ||
|
|
||
| /// <summary> | ||
| /// Represents a pipeline for ingesting data from documents and processing it into chunks. | ||
| /// </summary> | ||
| /// <typeparam name="T">The type of the chunk content.</typeparam> | ||
| public sealed class IngestionPipeline<T> : IDisposable | ||
| { | ||
| private readonly IngestionDocumentReader _reader; | ||
| private readonly IngestionChunker<T> _chunker; | ||
| private readonly IngestionChunkWriter<T> _writer; | ||
| private readonly ActivitySource _activitySource; | ||
| private readonly ILogger? _logger; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class. | ||
| /// </summary> | ||
| /// <param name="reader">The reader for ingestion documents.</param> | ||
| /// <param name="chunker">The chunker to split documents into chunks.</param> | ||
| /// <param name="writer">The writer for processing chunks.</param> | ||
| /// <param name="options">The options for the ingestion pipeline.</param> | ||
| /// <param name="loggerFactory">The logger factory for creating loggers.</param> | ||
| public IngestionPipeline( | ||
| IngestionDocumentReader reader, | ||
| IngestionChunker<T> chunker, | ||
| IngestionChunkWriter<T> writer, | ||
| IngestionPipelineOptions? options = default, | ||
| ILoggerFactory? loggerFactory = default) | ||
| { | ||
| _reader = Throw.IfNull(reader); | ||
| _chunker = Throw.IfNull(chunker); | ||
| _writer = Throw.IfNull(writer); | ||
| _activitySource = new((options ?? new()).ActivitySourceName); | ||
| _logger = loggerFactory?.CreateLogger<IngestionPipeline<T>>(); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public void Dispose() | ||
| { | ||
| _writer.Dispose(); | ||
| _activitySource.Dispose(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the document processors in the pipeline. | ||
| /// </summary> | ||
| public IList<IngestionDocumentProcessor> DocumentProcessors { get; } = []; | ||
|
|
||
| /// <summary> | ||
| /// Gets the chunk processors in the pipeline. | ||
| /// </summary> | ||
| public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = []; | ||
|
|
||
| /// <summary> | ||
| /// Processes all files in the specified directory that match the given search pattern and option. | ||
| /// </summary> | ||
| /// <param name="directory">The directory to process.</param> | ||
| /// <param name="searchPattern">The search pattern for file selection.</param> | ||
| /// <param name="searchOption">The search option for directory traversal.</param> | ||
| /// <param name="cancellationToken">The cancellation token for the operation.</param> | ||
| /// <returns>A task representing the asynchronous operation.</returns> | ||
| public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*", | ||
| SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default) | ||
| { | ||
| Throw.IfNull(directory); | ||
| Throw.IfNullOrEmpty(searchPattern); | ||
| Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories); | ||
|
|
||
| using (Activity? rootActivity = _activitySource.StartActivity(ProcessDirectory.ActivityName)) | ||
| { | ||
| rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName) | ||
| .SetTag(ProcessDirectory.SearchPatternTagName, searchPattern) | ||
| .SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString()); | ||
| _logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption); | ||
|
|
||
| await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| yield return ingestionResult; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Processes the specified files. | ||
| /// </summary> | ||
| /// <param name="files">The collection of files to process.</param> | ||
| /// <param name="cancellationToken">The cancellation token for the operation.</param> | ||
| /// <returns>A task representing the asynchronous operation.</returns> | ||
| public async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default) | ||
| { | ||
| Throw.IfNull(files); | ||
|
|
||
| using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName)) | ||
| { | ||
| await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| yield return ingestionResult; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static string GetShortName(object any) => any.GetType().Name; | ||
|
|
||
| private static void TraceException(Activity? activity, Exception ex) | ||
| { | ||
| activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName) | ||
| .SetStatus(ActivityStatusCode.Error, ex.Message); | ||
| } | ||
|
|
||
| private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity, | ||
| [EnumeratorCancellation] CancellationToken cancellationToken) | ||
| { | ||
| #if NET | ||
| if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count)) | ||
| #else | ||
| if (files is IReadOnlyCollection<FileInfo> { Count: int count }) | ||
| #endif | ||
| { | ||
| rootActivity?.SetTag(ProcessFiles.FileCountTagName, count); | ||
| _logger?.LogFileCount(count); | ||
| } | ||
|
|
||
| foreach (FileInfo fileInfo in files) | ||
| { | ||
| using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default)) | ||
| { | ||
| processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName); | ||
| _logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader)); | ||
|
|
||
| IngestionDocument? document = null; | ||
| Exception? failure = null; | ||
| try | ||
| { | ||
| document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier); | ||
| _logger?.ReadDocument(document.Identifier); | ||
|
|
||
| await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| TraceException(processFileActivity, ex); | ||
| _logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName); | ||
|
|
||
| failure = ex; | ||
| } | ||
|
|
||
| yield return new IngestionResult(fileInfo, document, failure); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private async Task IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken) | ||
| { | ||
| foreach (IngestionDocumentProcessor processor in DocumentProcessors) | ||
| { | ||
| document = await processor.ProcessAsync(document, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // A DocumentProcessor might change the document identifier (for example by extracting it from its content), so update the ID tag. | ||
| parentActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier); | ||
| } | ||
|
|
||
| IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken); | ||
| foreach (var processor in ChunkProcessors) | ||
| { | ||
| chunks = processor.ProcessAsync(chunks, cancellationToken); | ||
| } | ||
|
|
||
| _logger?.WritingChunks(GetShortName(_writer)); | ||
| await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false); | ||
| _logger?.WroteChunks(document.Identifier); | ||
| } | ||
| } | ||
30 changes: 30 additions & 0 deletions
30
src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipelineOptions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System.Diagnostics; | ||
| using Microsoft.Shared.Diagnostics; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| #pragma warning disable SA1500 // Braces for multi-line statements should not share line | ||
| #pragma warning disable SA1513 // Closing brace should be followed by blank line | ||
|
|
||
| /// <summary> | ||
| /// Options for configuring the ingestion pipeline. | ||
| /// </summary> | ||
| public sealed class IngestionPipelineOptions | ||
| { | ||
| /// <summary> | ||
| /// Gets or sets the name of the <see cref="ActivitySource"/> used for diagnostics. | ||
| /// </summary> | ||
| public string ActivitySourceName | ||
| { | ||
| get; | ||
| set => field = Throw.IfNullOrEmpty(value); | ||
| } = DiagnosticsConstants.ActivitySourceName; | ||
|
|
||
| internal IngestionPipelineOptions Clone() => new() | ||
| { | ||
| ActivitySourceName = ActivitySourceName, | ||
| }; | ||
| } |
41 changes: 41 additions & 0 deletions
41
src/Libraries/Microsoft.Extensions.DataIngestion/IngestionResult.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.IO; | ||
| using Microsoft.Shared.Diagnostics; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| /// <summary> | ||
| /// Represents the result of an ingestion operation. | ||
| /// </summary> | ||
| public sealed class IngestionResult | ||
| { | ||
| /// <summary> | ||
| /// Gets the source file that was ingested. | ||
| /// </summary> | ||
| public FileInfo Source { get; } | ||
|
|
||
| /// <summary> | ||
| /// Gets the ingestion document created from the source file, if reading the document has succeeded. | ||
| /// </summary> | ||
| public IngestionDocument? Document { get; } | ||
|
|
||
| /// <summary> | ||
| /// Gets the exception that occurred during ingestion, if any. | ||
| /// </summary> | ||
| public Exception? Exception { get; } | ||
|
|
||
| /// <summary> | ||
| /// Gets a value indicating whether the ingestion succeeded. | ||
| /// </summary> | ||
| public bool Succeeded => Exception is null; | ||
adamsitnik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| internal IngestionResult(FileInfo source, IngestionDocument? document, Exception? exception) | ||
| { | ||
| Source = Throw.IfNull(source); | ||
| Document = document; | ||
| Exception = exception; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.IO; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| #pragma warning disable S109 // Magic numbers should not be used | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion | ||
| { | ||
| internal static partial class Log | ||
| { | ||
| [LoggerMessage(0, LogLevel.Information, "Starting to process files in directory '{directory}' with search pattern '{searchPattern}' and search option '{searchOption}'.")] | ||
| internal static partial void ProcessingDirectory(this ILogger logger, string directory, string searchPattern, System.IO.SearchOption searchOption); | ||
|
|
||
| [LoggerMessage(1, LogLevel.Information, "Processing {fileCount} files.")] | ||
| internal static partial void LogFileCount(this ILogger logger, int fileCount); | ||
|
|
||
| [LoggerMessage(2, LogLevel.Information, "Reading file '{filePath}' using '{reader}'.")] | ||
| internal static partial void ReadingFile(this ILogger logger, string filePath, string reader); | ||
|
|
||
| [LoggerMessage(3, LogLevel.Information, "Read document '{documentId}'.")] | ||
| internal static partial void ReadDocument(this ILogger logger, string documentId); | ||
|
|
||
| [LoggerMessage(4, LogLevel.Information, "Writing chunks using {writer}.")] | ||
| internal static partial void WritingChunks(this ILogger logger, string writer); | ||
|
|
||
| [LoggerMessage(5, LogLevel.Information, "Wrote chunks for document '{documentId}'.")] | ||
| internal static partial void WroteChunks(this ILogger logger, string documentId); | ||
|
|
||
| [LoggerMessage(6, LogLevel.Error, "An error occurred while ingesting document '{identifier}'.")] | ||
| internal static partial void IngestingFailed(this ILogger logger, Exception exception, string identifier); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.