-
Notifications
You must be signed in to change notification settings - Fork 5.1k
ClientModel: Add SSE result collection #43821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
54f822b
d239e17
115295a
3b12f1f
dbf18ad
a798fd9
72951b9
63e1351
f604598
d3bd6b8
3fca028
4ea146d
262c78d
984499d
f26a89b
647f1a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System.ClientModel.Internal; | ||
| using System.ClientModel.Primitives; | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
|
|
||
| namespace System.ClientModel; | ||
|
|
||
| #pragma warning disable CS1591 // public XML comments | ||
| public abstract class AsyncClientResultCollection<T> : ClientResult, IAsyncEnumerable<T> | ||
| { | ||
| protected internal AsyncClientResultCollection(PipelineResponse response) : base(response) | ||
| { | ||
| } | ||
|
|
||
| public abstract IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default); | ||
|
|
||
| // TODO: take CancellationToken? | ||
| //public static ClientResultCollection<T> Create<TValue>(PipelineResponse response) where TValue : IJsonModel<T> | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: could we accomplish this method signature? |
||
| public static AsyncClientResultCollection<TValue> Create<TValue>(PipelineResponse response) where TValue : IJsonModel<TValue> | ||
| { | ||
| return StreamingClientResult<TValue>.Create<TValue>(response); | ||
| } | ||
| } | ||
| #pragma warning restore CS1591 // public XML comments | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,18 +11,16 @@ namespace System.ClientModel; | |
| /// </summary> | ||
| public class ClientResult | ||
| { | ||
| private readonly PipelineResponse _response; | ||
| private PipelineResponse? _response; | ||
|
|
||
| /// <summary> | ||
| /// Create a new instance of <see cref="ClientResult"/> from a service | ||
| /// response. | ||
| /// </summary> | ||
| /// <param name="response">The <see cref="PipelineResponse"/> received | ||
| /// from the service.</param> | ||
| protected ClientResult(PipelineResponse response) | ||
| protected ClientResult(PipelineResponse? response) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We would only include this change if we have good reason to believe we'll want it for pageable/operation types in the future. |
||
| { | ||
| Argument.AssertNotNull(response, nameof(response)); | ||
|
|
||
| _response = response; | ||
| } | ||
|
|
||
|
|
@@ -31,7 +29,37 @@ protected ClientResult(PipelineResponse response) | |
| /// </summary> | ||
| /// <returns>the <see cref="PipelineResponse"/> received from the service. | ||
| /// </returns> | ||
| public PipelineResponse GetRawResponse() => _response; | ||
| /// <exception cref="InvalidOperationException">No | ||
| /// <see cref="PipelineResponse"/> value is currently available for this | ||
| /// <see cref="ClientResult"/> instance. This can happen when the instance | ||
| /// is a collection type like <see cref="AsyncClientResultCollection{T}"/> | ||
| /// that has not yet been enumerated.</exception> | ||
| public PipelineResponse GetRawResponse() | ||
| { | ||
| if (_response is null) | ||
| { | ||
| throw new InvalidOperationException("No response is associated " + | ||
| "with this result. If the result is a collection result " + | ||
| "type, this may be because no request has been sent to the " + | ||
| "server yet."); | ||
| } | ||
|
|
||
| return _response; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Update the value returned from <see cref="GetRawResponse"/>. | ||
| /// </summary> | ||
| /// <remarks>This method may be called from types derived from | ||
| /// <see cref="ClientResult"/> that poll the service for status updates | ||
| /// or to retrieve additional collection values to update the raw response | ||
| /// to the response most recently returned from the service.</remarks> | ||
| /// <param name="response">The <see cref="PipelineResponse"/> to return | ||
| /// from <see cref="GetRawResponse"/>.</param> | ||
| protected void SetRawResponse(PipelineResponse response) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: Assert not null per nullability annotation |
||
| { | ||
| _response = response; | ||
| } | ||
|
|
||
| #region Factory methods for ClientResult and subtypes | ||
|
|
||
|
|
@@ -44,7 +72,11 @@ protected ClientResult(PipelineResponse response) | |
| /// provided <paramref name="response"/>. | ||
| /// </returns> | ||
| public static ClientResult FromResponse(PipelineResponse response) | ||
| => new ClientResult(response); | ||
| { | ||
| Argument.AssertNotNull(response, nameof(response)); | ||
|
|
||
| return new ClientResult(response); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Creates a new instance of <see cref="ClientResult{T}"/> that holds the | ||
|
|
@@ -60,6 +92,8 @@ public static ClientResult FromResponse(PipelineResponse response) | |
| /// </returns> | ||
| public static ClientResult<T> FromValue<T>(T value, PipelineResponse response) | ||
| { | ||
| Argument.AssertNotNull(response, nameof(response)); | ||
|
|
||
| if (value is null) | ||
| { | ||
| string message = "ClientResult<T> contract guarantees that ClientResult<T>.Value is non-null. " + | ||
|
|
@@ -90,7 +124,11 @@ public static ClientResult<T> FromValue<T>(T value, PipelineResponse response) | |
| /// provided <paramref name="value"/> and <paramref name="response"/>. | ||
| /// </returns> | ||
| public static ClientResult<T?> FromOptionalValue<T>(T? value, PipelineResponse response) | ||
| => new ClientResult<T?>(value, response); | ||
| { | ||
| Argument.AssertNotNull(response, nameof(response)); | ||
|
|
||
| return new ClientResult<T?>(value, response); | ||
| } | ||
|
|
||
| #endregion | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System.ClientModel.Internal; | ||
| using System.ClientModel.Primitives; | ||
| using System.Collections; | ||
| using System.Collections.Generic; | ||
|
|
||
| namespace System.ClientModel; | ||
|
|
||
| // TODO: Re-enable sync version | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: sync result collection |
||
|
|
||
| //#pragma warning disable CS1591 // public XML comments | ||
| //public abstract class ClientResultCollection<T> : ClientResult, IEnumerable<T> | ||
| //{ | ||
| // protected internal ClientResultCollection(PipelineResponse response) : base(response) | ||
| // { | ||
| // } | ||
|
|
||
| // public abstract IEnumerator<T> GetEnumerator(); | ||
|
|
||
| // IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); | ||
|
|
||
| // // TODO: take CancellationToken? | ||
| // //public static ClientResultCollection<T> Create<TValue>(PipelineResponse response) where TValue : IJsonModel<T> | ||
| // public static ClientResultCollection<TValue> Create<TValue>(PipelineResponse response) where TValue : IJsonModel<TValue> | ||
| // { | ||
| // return StreamingClientResult<TValue>.Create<TValue>(response); | ||
| // } | ||
| //} | ||
| //#pragma warning restore CS1591 // public XML comments | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace System.ClientModel.Internal; | ||
|
|
||
| internal sealed class AsyncServerSentEventEnumerator : IAsyncEnumerator<ServerSentEvent>, IDisposable, IAsyncDisposable | ||
| { | ||
| // TODO: make this configurable per coming from TypeSpec | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: parameterize terminal event |
||
| private static readonly ReadOnlyMemory<char> _doneToken = "[DONE]".AsMemory(); | ||
|
|
||
| private readonly ServerSentEventReader _reader; | ||
| private readonly CancellationToken _cancellationToken; | ||
| private bool _disposedValue; | ||
|
|
||
| public ServerSentEvent Current { get; private set; } | ||
|
|
||
| public AsyncServerSentEventEnumerator(ServerSentEventReader reader, CancellationToken cancellationToken = default) | ||
| { | ||
| _reader = reader; | ||
| _cancellationToken = cancellationToken; | ||
| } | ||
|
|
||
| public async ValueTask<bool> MoveNextAsync() | ||
| { | ||
| ServerSentEvent? nextEvent = await _reader.TryGetNextEventAsync(_cancellationToken).ConfigureAwait(false); | ||
| if (nextEvent.HasValue) | ||
| { | ||
| if (nextEvent.Value.Data.Span.SequenceEqual(_doneToken.Span)) | ||
| { | ||
| return false; | ||
| } | ||
| Current = nextEvent.Value; | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private void Dispose(bool disposing) | ||
| { | ||
| if (!_disposedValue) | ||
| { | ||
| if (disposing) | ||
| { | ||
| _reader.Dispose(); | ||
| } | ||
|
|
||
| _disposedValue = true; | ||
| } | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method | ||
| Dispose(disposing: true); | ||
| GC.SuppressFinalize(this); | ||
| } | ||
|
|
||
| public ValueTask DisposeAsync() | ||
| { | ||
| Dispose(); | ||
| return new ValueTask(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System.ClientModel.Primitives; | ||
| using System.Collections.Generic; | ||
| using System.Text.Json; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace System.ClientModel.Internal; | ||
|
|
||
| internal class AsyncServerSentEventJsonDataEnumerator<T> : IAsyncEnumerator<T>, IDisposable, IAsyncDisposable | ||
| where T : IJsonModel<T> | ||
| { | ||
| private readonly AsyncServerSentEventEnumerator _eventEnumerator; | ||
|
|
||
| private T? _current; | ||
|
|
||
| // TODO: is null supression the correct pattern here? | ||
| public T Current { get => _current!; } | ||
|
|
||
| public AsyncServerSentEventJsonDataEnumerator(AsyncServerSentEventEnumerator eventEnumerator) | ||
| { | ||
| Argument.AssertNotNull(eventEnumerator, nameof(eventEnumerator)); | ||
|
|
||
| _eventEnumerator = eventEnumerator; | ||
| } | ||
|
|
||
| public async ValueTask<bool> MoveNextAsync() | ||
| { | ||
| if (await _eventEnumerator.MoveNextAsync().ConfigureAwait(false)) | ||
| { | ||
| using JsonDocument eventDocument = JsonDocument.Parse(_eventEnumerator.Current.Data); | ||
| BinaryData eventData = BinaryData.FromObjectAsJson(eventDocument.RootElement); | ||
| T? jsonData = ModelReaderWriter.Read<T>(eventData); | ||
|
|
||
| // TODO: should we stop iterating if we can't deserialize? | ||
| if (jsonData is null) | ||
| { | ||
| _current = default; | ||
| return false; | ||
| } | ||
|
|
||
| if (jsonData is T singleInstanceData) | ||
| { | ||
| _current = singleInstanceData; | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| public async ValueTask DisposeAsync() | ||
| { | ||
| await _eventEnumerator.DisposeAsync().ConfigureAwait(false); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| _eventEnumerator.Dispose(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Take cancellation token in factory method?