-
Notifications
You must be signed in to change notification settings - Fork 14
OpenAI: investigation for StreamingClientResult<T> #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,8 @@ | ||
| using System.ClientModel; | ||
| using System; | ||
| using System.ClientModel; | ||
| using System.ClientModel.Primitives; | ||
| using System.Threading; | ||
| using System.Collections.Generic; | ||
| using System; | ||
| using System.Threading; | ||
|
|
||
| namespace OpenAI; | ||
|
|
||
|
|
@@ -12,84 +11,28 @@ namespace OpenAI; | |
| /// is still being received. | ||
| /// </summary> | ||
| /// <typeparam name="T"> The data type representative of distinct, streamable items. </typeparam> | ||
| public class StreamingClientResult<T> | ||
| : IDisposable | ||
| , IAsyncEnumerable<T> | ||
| internal class StreamingEventResult<T> : StreamingClientResult<T> | ||
| { | ||
| private ClientResult _rawResult { get; } | ||
| private IAsyncEnumerable<T> _asyncEnumerableSource { get; } | ||
| private bool _disposedValue { get; set; } | ||
|
|
||
| private StreamingClientResult() { } | ||
|
|
||
| private StreamingClientResult( | ||
| ClientResult rawResult, | ||
| Func<ClientResult, IAsyncEnumerable<T>> asyncEnumerableProcessor) | ||
|
|
||
| private StreamingEventResult(PipelineResponse response, | ||
| Func<PipelineResponse, IAsyncEnumerable<T>> asyncEnumerableProcessor) | ||
| : base(response) | ||
| { | ||
| _rawResult = rawResult; | ||
| _asyncEnumerableSource = asyncEnumerableProcessor.Invoke(rawResult); | ||
| _asyncEnumerableSource = asyncEnumerableProcessor.Invoke(response); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Creates a new instance of <see cref="StreamingClientResult{T}"/> using the provided underlying HTTP response. The | ||
| /// provided function will be used to resolve the response into an asynchronous enumeration of streamed response | ||
| /// items. | ||
| /// </summary> | ||
| /// <param name="result">The HTTP response.</param> | ||
| /// <param name="asyncEnumerableProcessor"> | ||
| /// The function that will resolve the provided response into an IAsyncEnumerable. | ||
| /// </param> | ||
| /// <returns> | ||
| /// A new instance of <see cref="StreamingClientResult{T}"/> that will be capable of asynchronous enumeration of | ||
| /// <typeparamref name="T"/> items from the HTTP response. | ||
| /// </returns> | ||
| internal static StreamingClientResult<T> CreateFromResponse( | ||
| ClientResult result, | ||
| Func<ClientResult, IAsyncEnumerable<T>> asyncEnumerableProcessor) | ||
| internal static StreamingEventResult<T> CreateFromResponse( | ||
| PipelineResponse response, | ||
| Func<PipelineResponse, IAsyncEnumerable<T>> asyncEnumerableProcessor) | ||
| { | ||
| return new(result, asyncEnumerableProcessor); | ||
| return new(response, asyncEnumerableProcessor); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the underlying <see cref="PipelineResponse"/> instance that this <see cref="StreamingClientResult{T}"/> may enumerate | ||
| /// over. | ||
| /// </summary> | ||
| /// <returns> The <see cref="PipelineResponse"/> instance attached to this <see cref="StreamingClientResult{T}"/>. </returns> | ||
| public PipelineResponse GetRawResponse() => _rawResult.GetRawResponse(); | ||
|
|
||
| /// <summary> | ||
| /// Gets the asynchronously enumerable collection of distinct, streamable items in the response. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// <para> The return value of this method may be used with the "await foreach" statement. </para> | ||
| /// <para> | ||
| /// As <see cref="StreamingClientResult{T}"/> explicitly implements <see cref="IAsyncEnumerable{T}"/>, callers may | ||
| /// enumerate a <see cref="StreamingClientResult{T}"/> instance directly instead of calling this method. | ||
| /// </para> | ||
| /// </remarks> | ||
| /// <returns></returns> | ||
| public IAsyncEnumerable<T> EnumerateValues() => this; | ||
|
|
||
| /// <inheritdoc/> | ||
| public void Dispose() | ||
| { | ||
| Dispose(disposing: true); | ||
| GC.SuppressFinalize(this); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| protected virtual void Dispose(bool disposing) | ||
| { | ||
| if (!_disposedValue) | ||
| { | ||
| if (disposing) | ||
| { | ||
| _rawResult?.GetRawResponse()?.Dispose(); | ||
| } | ||
| _disposedValue = true; | ||
| } | ||
| } | ||
| // TODO: Handle disposal via Enumerator? Validate that this will work. | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an open question we'll still need to resolve - whether or not to make StreamingClientResult implement IDisposable.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the earlier-era Toub discussions, I think we'll end up needing |
||
| // If it doesn't, we likely need to implement IDisposable or IAsyncDisposable | ||
| // on StreamingClientResult<T>. | ||
|
|
||
| IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken cancellationToken) | ||
| public override IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) | ||
| => _asyncEnumerableSource.GetAsyncEnumerator(cancellationToken); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type becomes internal.