diff --git a/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs b/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs index 3edacd35c1d4..38f6b92d7e3a 100644 --- a/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs +++ b/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs @@ -7,6 +7,12 @@ public ApiKeyCredential(string key) { } public static implicit operator System.ClientModel.ApiKeyCredential (string key) { throw null; } public void Update(string key) { } } + public abstract partial class AsyncClientResultCollection : System.ClientModel.ClientResult, System.Collections.Generic.IAsyncEnumerable + { + protected internal AsyncClientResultCollection(System.ClientModel.Primitives.PipelineResponse response) : base (default(System.ClientModel.Primitives.PipelineResponse)) { } + public static System.ClientModel.AsyncClientResultCollection Create(System.ClientModel.Primitives.PipelineResponse response) where TValue : System.ClientModel.Primitives.IJsonModel { throw null; } + public abstract System.Collections.Generic.IAsyncEnumerator GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + } public abstract partial class BinaryContent : System.IDisposable { protected BinaryContent() { } @@ -20,11 +26,12 @@ protected BinaryContent() { } } public partial class ClientResult { - protected ClientResult(System.ClientModel.Primitives.PipelineResponse response) { } + protected ClientResult(System.ClientModel.Primitives.PipelineResponse? response) { } public static System.ClientModel.ClientResult FromOptionalValue(T? value, System.ClientModel.Primitives.PipelineResponse response) { throw null; } public static System.ClientModel.ClientResult FromResponse(System.ClientModel.Primitives.PipelineResponse response) { throw null; } public static System.ClientModel.ClientResult FromValue(T value, System.ClientModel.Primitives.PipelineResponse response) { throw null; } public System.ClientModel.Primitives.PipelineResponse GetRawResponse() { throw null; } + protected void SetRawResponse(System.ClientModel.Primitives.PipelineResponse response) { } } public partial class ClientResultException : System.Exception { diff --git a/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs b/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs index 2367ba7f518d..8b08908c8590 100644 --- a/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs +++ b/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs @@ -7,6 +7,12 @@ public ApiKeyCredential(string key) { } public static implicit operator System.ClientModel.ApiKeyCredential (string key) { throw null; } public void Update(string key) { } } + public abstract partial class AsyncClientResultCollection : System.ClientModel.ClientResult, System.Collections.Generic.IAsyncEnumerable + { + protected internal AsyncClientResultCollection(System.ClientModel.Primitives.PipelineResponse response) : base (default(System.ClientModel.Primitives.PipelineResponse)) { } + public static System.ClientModel.AsyncClientResultCollection Create(System.ClientModel.Primitives.PipelineResponse response) where TValue : System.ClientModel.Primitives.IJsonModel { throw null; } + public abstract System.Collections.Generic.IAsyncEnumerator GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + } public abstract partial class BinaryContent : System.IDisposable { protected BinaryContent() { } @@ -20,11 +26,12 @@ protected BinaryContent() { } } public partial class ClientResult { - protected ClientResult(System.ClientModel.Primitives.PipelineResponse response) { } + protected ClientResult(System.ClientModel.Primitives.PipelineResponse? response) { } public static System.ClientModel.ClientResult FromOptionalValue(T? value, System.ClientModel.Primitives.PipelineResponse response) { throw null; } public static System.ClientModel.ClientResult FromResponse(System.ClientModel.Primitives.PipelineResponse response) { throw null; } public static System.ClientModel.ClientResult FromValue(T value, System.ClientModel.Primitives.PipelineResponse response) { throw null; } public System.ClientModel.Primitives.PipelineResponse GetRawResponse() { throw null; } + protected void SetRawResponse(System.ClientModel.Primitives.PipelineResponse response) { } } public partial class ClientResultException : System.Exception { diff --git a/sdk/core/System.ClientModel/src/Convenience/AsyncClientResultCollectionOfT.cs b/sdk/core/System.ClientModel/src/Convenience/AsyncClientResultCollectionOfT.cs new file mode 100644 index 000000000000..53845f3b9898 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Convenience/AsyncClientResultCollectionOfT.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.ClientModel.Primitives; +using System.Collections.Generic; +using System.Threading; + +namespace System.ClientModel; + +#pragma warning disable CS1591 // public XML comments +public abstract class AsyncClientResultCollection : ClientResult, IAsyncEnumerable +{ + protected internal AsyncClientResultCollection(PipelineResponse response) : base(response) + { + } + + public abstract IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default); + + // TODO: take CancellationToken? + //public static ClientResultCollection Create(PipelineResponse response) where TValue : IJsonModel + public static AsyncClientResultCollection Create(PipelineResponse response) where TValue : IJsonModel + { + return StreamingClientResult.Create(response); + } +} +#pragma warning restore CS1591 // public XML comments diff --git a/sdk/core/System.ClientModel/src/Convenience/ClientResult.cs b/sdk/core/System.ClientModel/src/Convenience/ClientResult.cs index 5f60f040fa3f..4943f7e4ade9 100644 --- a/sdk/core/System.ClientModel/src/Convenience/ClientResult.cs +++ b/sdk/core/System.ClientModel/src/Convenience/ClientResult.cs @@ -11,7 +11,7 @@ namespace System.ClientModel; /// public class ClientResult { - private readonly PipelineResponse _response; + private PipelineResponse? _response; /// /// Create a new instance of from a service @@ -19,10 +19,8 @@ public class ClientResult /// /// The received /// from the service. - protected ClientResult(PipelineResponse response) + protected ClientResult(PipelineResponse? response) { - Argument.AssertNotNull(response, nameof(response)); - _response = response; } @@ -31,7 +29,37 @@ protected ClientResult(PipelineResponse response) /// /// the received from the service. /// - public PipelineResponse GetRawResponse() => _response; + /// No + /// value is currently available for this + /// instance. This can happen when the instance + /// is a collection type like + /// that has not yet been enumerated. + public PipelineResponse GetRawResponse() + { + if (_response is null) + { + throw new InvalidOperationException("No response is associated " + + "with this result. If the result is a collection result " + + "type, this may be because no request has been sent to the " + + "server yet."); + } + + return _response; + } + + /// + /// Update the value returned from . + /// + /// This method may be called from types derived from + /// that poll the service for status updates + /// or to retrieve additional collection values to update the raw response + /// to the response most recently returned from the service. + /// The to return + /// from . + protected void SetRawResponse(PipelineResponse response) + { + _response = response; + } #region Factory methods for ClientResult and subtypes @@ -44,7 +72,11 @@ protected ClientResult(PipelineResponse response) /// provided . /// public static ClientResult FromResponse(PipelineResponse response) - => new ClientResult(response); + { + Argument.AssertNotNull(response, nameof(response)); + + return new ClientResult(response); + } /// /// Creates a new instance of that holds the @@ -60,6 +92,8 @@ public static ClientResult FromResponse(PipelineResponse response) /// public static ClientResult FromValue(T value, PipelineResponse response) { + Argument.AssertNotNull(response, nameof(response)); + if (value is null) { string message = "ClientResult contract guarantees that ClientResult.Value is non-null. " + @@ -90,7 +124,11 @@ public static ClientResult FromValue(T value, PipelineResponse response) /// provided and . /// public static ClientResult FromOptionalValue(T? value, PipelineResponse response) - => new ClientResult(value, response); + { + Argument.AssertNotNull(response, nameof(response)); + + return new ClientResult(value, response); + } #endregion } diff --git a/sdk/core/System.ClientModel/src/Convenience/ClientResultCollectionOfT.cs b/sdk/core/System.ClientModel/src/Convenience/ClientResultCollectionOfT.cs new file mode 100644 index 000000000000..b416813377c9 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Convenience/ClientResultCollectionOfT.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.ClientModel.Primitives; +using System.Collections; +using System.Collections.Generic; + +namespace System.ClientModel; + +// TODO: Re-enable sync version + +//#pragma warning disable CS1591 // public XML comments +//public abstract class ClientResultCollection : ClientResult, IEnumerable +//{ +// protected internal ClientResultCollection(PipelineResponse response) : base(response) +// { +// } + +// public abstract IEnumerator GetEnumerator(); + +// IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + +// // TODO: take CancellationToken? +// //public static ClientResultCollection Create(PipelineResponse response) where TValue : IJsonModel +// public static ClientResultCollection Create(PipelineResponse response) where TValue : IJsonModel +// { +// return StreamingClientResult.Create(response); +// } +//} +//#pragma warning restore CS1591 // public XML comments diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs new file mode 100644 index 000000000000..d2011b876854 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventEnumerator.cs @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace System.ClientModel.Internal; + +internal sealed class AsyncServerSentEventEnumerator : IAsyncEnumerator, IDisposable, IAsyncDisposable +{ + // TODO: make this configurable per coming from TypeSpec + private static readonly ReadOnlyMemory _doneToken = "[DONE]".AsMemory(); + + private readonly ServerSentEventReader _reader; + private readonly CancellationToken _cancellationToken; + private bool _disposedValue; + + public ServerSentEvent Current { get; private set; } + + public AsyncServerSentEventEnumerator(ServerSentEventReader reader, CancellationToken cancellationToken = default) + { + _reader = reader; + _cancellationToken = cancellationToken; + } + + public async ValueTask MoveNextAsync() + { + ServerSentEvent? nextEvent = await _reader.TryGetNextEventAsync(_cancellationToken).ConfigureAwait(false); + if (nextEvent.HasValue) + { + if (nextEvent.Value.Data.Span.SequenceEqual(_doneToken.Span)) + { + return false; + } + Current = nextEvent.Value; + return true; + } + return false; + } + + private void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _reader.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventJsonDataEnumerator.cs b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventJsonDataEnumerator.cs new file mode 100644 index 000000000000..3dcac7ecec03 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/AsyncServerSentEventJsonDataEnumerator.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Primitives; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading.Tasks; + +namespace System.ClientModel.Internal; + +internal class AsyncServerSentEventJsonDataEnumerator : IAsyncEnumerator, IDisposable, IAsyncDisposable + where T : IJsonModel +{ + private readonly AsyncServerSentEventEnumerator _eventEnumerator; + + private T? _current; + + // TODO: is null supression the correct pattern here? + public T Current { get => _current!; } + + public AsyncServerSentEventJsonDataEnumerator(AsyncServerSentEventEnumerator eventEnumerator) + { + Argument.AssertNotNull(eventEnumerator, nameof(eventEnumerator)); + + _eventEnumerator = eventEnumerator; + } + + public async ValueTask MoveNextAsync() + { + if (await _eventEnumerator.MoveNextAsync().ConfigureAwait(false)) + { + using JsonDocument eventDocument = JsonDocument.Parse(_eventEnumerator.Current.Data); + BinaryData eventData = BinaryData.FromObjectAsJson(eventDocument.RootElement); + T? jsonData = ModelReaderWriter.Read(eventData); + + // TODO: should we stop iterating if we can't deserialize? + if (jsonData is null) + { + _current = default; + return false; + } + + if (jsonData is T singleInstanceData) + { + _current = singleInstanceData; + return true; + } + } + return false; + } + + public async ValueTask DisposeAsync() + { + await _eventEnumerator.DisposeAsync().ConfigureAwait(false); + } + + public void Dispose() + { + _eventEnumerator.Dispose(); + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs new file mode 100644 index 000000000000..91e355c1b279 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEvent.cs @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Text; + +namespace System.ClientModel.Internal; + +// SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream +internal readonly struct ServerSentEvent +{ + // Gets the value of the SSE "event type" buffer, used to distinguish between event kinds. + public ReadOnlyMemory EventName { get; } + // Gets the value of the SSE "data" buffer, which holds the payload of the server-sent event. + public ReadOnlyMemory Data { get; } + // Gets the value of the "last event ID" buffer, with which a user agent can reestablish a session. + public ReadOnlyMemory LastEventId { get; } + // If present, gets the defined "retry" value for the event, which represents the delay before reconnecting. + public TimeSpan? ReconnectionTime { get; } + + private readonly IReadOnlyList _fields; + private readonly string? _multiLineData; + + internal ServerSentEvent(IReadOnlyList fields) + { + _fields = fields; + StringBuilder? multiLineDataBuilder = null; + for (int i = 0; i < _fields.Count; i++) + { + ReadOnlyMemory fieldValue = _fields[i].Value; + switch (_fields[i].FieldType) + { + case ServerSentEventFieldKind.Event: + EventName = fieldValue; + break; + case ServerSentEventFieldKind.Data: + { + if (multiLineDataBuilder != null) + { + multiLineDataBuilder.Append(fieldValue); + } + else if (Data.IsEmpty) + { + Data = fieldValue; + } + else + { + multiLineDataBuilder ??= new(); + multiLineDataBuilder.Append(fieldValue); + Data = null; + } + break; + } + case ServerSentEventFieldKind.Id: + LastEventId = fieldValue; + break; + case ServerSentEventFieldKind.Retry: + ReconnectionTime = int.TryParse(fieldValue.ToString(), out int retry) ? TimeSpan.FromMilliseconds(retry) : null; + break; + default: + break; + } + if (multiLineDataBuilder != null) + { + _multiLineData = multiLineDataBuilder.ToString(); + Data = _multiLineData.AsMemory(); + } + } + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs new file mode 100644 index 000000000000..c484946fa37f --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventField.cs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace System.ClientModel.Internal; + +// SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream +internal readonly struct ServerSentEventField +{ + public ServerSentEventFieldKind FieldType { get; } + + // TODO: we should not expose UTF16 publicly + public ReadOnlyMemory Value + { + get + { + if (_valueStartIndex >= _original.Length) + { + return ReadOnlyMemory.Empty; + } + else + { + return _original.AsMemory(_valueStartIndex); + } + } + } + + private readonly string _original; + private readonly int _valueStartIndex; + + internal ServerSentEventField(string line) + { + _original = line; + int colonIndex = _original.AsSpan().IndexOf(':'); + + ReadOnlyMemory fieldName = colonIndex < 0 ? _original.AsMemory(): _original.AsMemory(0, colonIndex); + FieldType = fieldName.Span switch + { + var x when x.SequenceEqual(s_eventFieldName.Span) => ServerSentEventFieldKind.Event, + var x when x.SequenceEqual(s_dataFieldName.Span) => ServerSentEventFieldKind.Data, + var x when x.SequenceEqual(s_lastEventIdFieldName.Span) => ServerSentEventFieldKind.Id, + var x when x.SequenceEqual(s_retryFieldName.Span) => ServerSentEventFieldKind.Retry, + _ => ServerSentEventFieldKind.Ignored, + }; + + if (colonIndex < 0) + { + _valueStartIndex = _original.Length; + } + else if (colonIndex + 1 < _original.Length && _original[colonIndex + 1] == ' ') + { + _valueStartIndex = colonIndex + 2; + } + else + { + _valueStartIndex = colonIndex + 1; + } + } + + public override string ToString() => _original; + + private static readonly ReadOnlyMemory s_eventFieldName = "event".AsMemory(); + private static readonly ReadOnlyMemory s_dataFieldName = "data".AsMemory(); + private static readonly ReadOnlyMemory s_lastEventIdFieldName = "id".AsMemory(); + private static readonly ReadOnlyMemory s_retryFieldName = "retry".AsMemory(); +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs new file mode 100644 index 000000000000..48d1884a2a7a --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventFieldKind.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace System.ClientModel.Internal; + +// SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream +internal enum ServerSentEventFieldKind +{ + // TODO: zero value? + Event, + Data, + Id, + Retry, + Ignored +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs new file mode 100644 index 000000000000..b423a3747a7f --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/ServerSentEventReader.cs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.ClientModel.Internal; + +internal sealed class ServerSentEventReader : IDisposable +{ + private readonly Stream _stream; + private readonly StreamReader _reader; + private bool _disposedValue; + + public ServerSentEventReader(Stream stream) + { + _stream = stream; + _reader = new StreamReader(stream); + } + + /// + /// Synchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is + /// available and returning null once no further data is present on the stream. + /// + /// An optional cancellation token that can abort subsequent reads. + /// + /// The next in the stream, or null once no more data can be read from the stream. + /// + // TODO: Would we rather use standard .NET TryGet semantics? + public ServerSentEvent? TryGetNextEvent(CancellationToken cancellationToken = default) + { + List fields = new(); + + while (!cancellationToken.IsCancellationRequested) + { + string? line = _reader.ReadLine(); + if (line == null) + { + // A null line indicates end of input + return null; + } + else if (line.Length == 0) + { + // An empty line should dispatch an event for pending accumulated fields + ServerSentEvent nextEvent = new(fields); + fields = new(); + return nextEvent; + } + else if (line[0] == ':') + { + // A line beginning with a colon is a comment and should be ignored + continue; + } + else + { + // Otherwise, process the the field + value and accumulate it for the next dispatched event + fields.Add(new ServerSentEventField(line)); + } + } + + return null; + } + + /// + /// Asynchronously retrieves the next server-sent event from the underlying stream, blocking until a new event is + /// available and returning null once no further data is present on the stream. + /// + /// An optional cancellation token that can abort subsequent reads. + /// + /// The next in the stream, or null once no more data can be read from the stream. + /// + public async Task TryGetNextEventAsync(CancellationToken cancellationToken = default) + { + List fields = new(); + + while (!cancellationToken.IsCancellationRequested) + { + string? line = await _reader.ReadLineAsync().ConfigureAwait(false); + if (line == null) + { + // A null line indicates end of input + return null; + } + else if (line.Length == 0) + { + // An empty line should dispatch an event for pending accumulated fields + ServerSentEvent nextEvent = new(fields); + return nextEvent; + } + else if (line[0] == ':') + { + // A line beginning with a colon is a comment and should be ignored + continue; + } + else + { + // Otherwise, process the the field + value and accumulate it for the next dispatched event + fields.Add(new ServerSentEventField(line)); + } + } + + return null; + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _reader.Dispose(); + _stream.Dispose(); + } + + _disposedValue = true; + } + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/SSE/StreamingClientResult.cs b/sdk/core/System.ClientModel/src/Internal/SSE/StreamingClientResult.cs new file mode 100644 index 000000000000..08787dacc6b0 --- /dev/null +++ b/sdk/core/System.ClientModel/src/Internal/SSE/StreamingClientResult.cs @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Primitives; +using System.Collections.Generic; +using System.IO; +using System.Threading; + +namespace System.ClientModel.Internal; + +/// +/// Represents an operation response with streaming content that can be deserialized and enumerated while the response +/// is still being received. +/// +/// The data type representative of distinct, streamable items. +internal class StreamingClientResult : AsyncClientResultCollection +{ + private readonly Func> _asyncEnumeratorSourceDelegate; + + // TODO: use? + //private bool _disposedValue; + + private StreamingClientResult(PipelineResponse response, Func> asyncEnumeratorSourceDelegate) + : base(response) + { + Argument.AssertNotNull(response, nameof(response)); + + if (response.ContentStream is null) + { + throw new ArgumentException("Unable to create result from response with null ContentStream", nameof(response)); + } + + _asyncEnumeratorSourceDelegate = asyncEnumeratorSourceDelegate; + } + + /// + /// Creates a new instance of that will yield items of the specified type + /// as they become available via server-sent event JSON data on the available + /// . This overload uses via the + /// interface and only supports single-item deserialization per server-sent event data + /// payload. + /// + /// The base for this result instance. + /// + /// The optional cancellation token used to control the enumeration. + /// + /// A new instance of . + public static StreamingClientResult Create(PipelineResponse response, CancellationToken cancellationToken = default) + where U : IJsonModel + { + return new(response, GetServerSentEventDeserializationEnumerator); + } + + private static IAsyncEnumerator GetServerSentEventDeserializationEnumerator(Stream stream, CancellationToken cancellationToken = default) + where U : IJsonModel + { + ServerSentEventReader? sseReader = null; + AsyncServerSentEventEnumerator? sseEnumerator = null; + try + { + sseReader = new(stream); + sseEnumerator = new(sseReader, cancellationToken); + AsyncServerSentEventJsonDataEnumerator instanceEnumerator = new(sseEnumerator); + sseEnumerator = null; + sseReader = null; + return instanceEnumerator; + } + finally + { + sseEnumerator?.Dispose(); + sseReader?.Dispose(); + } + } + + public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) + { + return _asyncEnumeratorSourceDelegate.Invoke(GetRawResponse().ContentStream!, cancellationToken); + } +} diff --git a/sdk/core/System.ClientModel/tests/Convenience/ClientResultCollectionTests.cs b/sdk/core/System.ClientModel/tests/Convenience/ClientResultCollectionTests.cs new file mode 100644 index 000000000000..fc35af69b0d5 --- /dev/null +++ b/sdk/core/System.ClientModel/tests/Convenience/ClientResultCollectionTests.cs @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using ClientModel.Tests.Mocks; +using NUnit.Framework; +using SyncAsyncTestBase = ClientModel.Tests.SyncAsyncTestBase; + +namespace System.ClientModel.Tests.Results; + +public class ClientResultCollectionTests : SyncAsyncTestBase +{ + public ClientResultCollectionTests(bool isAsync) : base(isAsync) + { + } + + [Test] + public async Task CreatesAsyncResultCollection() + { + MockPipelineResponse response = new(); + response.SetContent("[DONE]"); + + var results = AsyncClientResultCollection.Create(response); + + bool empty = true; + await foreach (MockJsonModel result in results) + { + empty = false; + } + + Assert.IsNotNull(results); + Assert.AreEqual(results.GetRawResponse(), response); + Assert.IsTrue(empty); + } + + [Test] + public async Task EnumeratesModelValues() + { + MockPipelineResponse response = new(); + response.SetContent(_mockContent); + var results = AsyncClientResultCollection.Create(response); + + int i = 0; + await foreach (MockJsonModel model in results) + { + Assert.AreEqual(model.IntValue, i); + Assert.AreEqual(model.StringValue, i.ToString()); + + i++; + } + + Assert.AreEqual(i, 3); + } + + #region Helpers + + private readonly string _mockContent = """ + event: event.0 + data: { "IntValue": 0, "StringValue": "0" } + + event: event.1 + data: { "IntValue": 1, "StringValue": "1" } + + event: event.2 + data: { "IntValue": 2, "StringValue": "2" } + + event: done + data: [DONE] + + """; + + #endregion +} diff --git a/sdk/core/System.ClientModel/tests/Convenience/ClientResultTests.cs b/sdk/core/System.ClientModel/tests/Convenience/ClientResultTests.cs index ccad66170e57..629f1ec27f2b 100644 --- a/sdk/core/System.ClientModel/tests/Convenience/ClientResultTests.cs +++ b/sdk/core/System.ClientModel/tests/Convenience/ClientResultTests.cs @@ -15,7 +15,6 @@ public class PipelineResponseTests [Test] public void CannotCreateClientResultFromNullResponse() { - Assert.Throws(() => new MockClientResult(null!)); Assert.Throws(() => { ClientResult result = ClientResult.FromResponse(null!); @@ -98,7 +97,6 @@ public void CannotCreateClientResultOfTFromNullResponse() { object value = new(); - Assert.Throws(() => new MockClientResult(value, null!)); Assert.Throws(() => { ClientResult result = ClientResult.FromValue(value, null!); diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs new file mode 100644 index 000000000000..c8eb562feabe --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventEnumeratorTests.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.IO; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class AsyncServerSentEventEnumeratorTests +{ + [Test] + public async Task EnumeratesSingleEvents() + { + Stream contentStream = BinaryData.FromString(_mockContent).ToStream(); + using ServerSentEventReader reader = new(contentStream); + using AsyncServerSentEventEnumerator enumerator = new(reader); + + int i = 0; + while (await enumerator.MoveNextAsync()) + { + ServerSentEvent sse = enumerator.Current; + + Assert.IsTrue(sse.EventName.Span.SequenceEqual($"event.{i}".AsSpan())); + Assert.IsTrue(sse.Data.Span.SequenceEqual($"{{ \"id\": \"{i}\", \"object\": {i} }}".AsSpan())); + + i++; + } + + Assert.AreEqual(i, 3); + } + + // TODO: Add tests for dispose and handling cancellation token + // TODO: later, add tests for varying the _doneToken value. + + #region Helpers + + private string _mockContent = """ + event: event.0 + data: { "id": "0", "object": 0 } + + event: event.1 + data: { "id": "1", "object": 1 } + + event: event.2 + data: { "id": "2", "object": 2 } + + event: done + data: [DONE] + + """; + + #endregion +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventJsonDataEnumeratorTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventJsonDataEnumeratorTests.cs new file mode 100644 index 000000000000..4cbccc0c32de --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/AsyncServerSentEventJsonDataEnumeratorTests.cs @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.IO; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class AsyncServerSentEventJsonDataEnumeratorTests +{ + [Test] + public async Task EnumeratesSingleEvents() + { + Stream contentStream = BinaryData.FromString(_mockContent).ToStream(); + using ServerSentEventReader reader = new(contentStream); + using AsyncServerSentEventEnumerator eventEnumerator = new(reader); + using AsyncServerSentEventJsonDataEnumerator modelEnumerator = new(eventEnumerator); + + int i = 0; + while (await modelEnumerator.MoveNextAsync()) + { + MockJsonModel model = modelEnumerator.Current; + + Assert.AreEqual(model.IntValue, i); + Assert.AreEqual(model.StringValue, i.ToString()); + + i++; + } + + Assert.AreEqual(i, 3); + } + + // TODO: Add tests for dispose and handling cancellation token + // TODO: later, add tests for varying the _doneToken value. + // TODO: tests for infinite stream -- no terminal event; how to show it won't stop? + + #region Helpers + + private readonly string _mockContent = """ + event: event.0 + data: { "IntValue": 0, "StringValue": "0" } + + event: event.1 + data: { "IntValue": 1, "StringValue": "1" } + + event: event.2 + data: { "IntValue": 2, "StringValue": "2" } + + event: done + data: [DONE] + + """; + + #endregion +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventFieldTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventFieldTests.cs new file mode 100644 index 000000000000..acac0f6056f9 --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventFieldTests.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class ServerSentEventFieldTests +{ + [Test] + public void ParsesEventField() + { + string line = "event: event.name"; + ServerSentEventField field = new(line); + + Assert.AreEqual(field.ToString(), line); + Assert.AreEqual(field.FieldType, ServerSentEventFieldKind.Event); + Assert.IsTrue(field.Value.Span.SequenceEqual("event.name".AsSpan())); + } +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventReaderTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventReaderTests.cs new file mode 100644 index 000000000000..173afbfba46b --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventReaderTests.cs @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class ServerSentEventReaderTests +{ + // TODO: Test both sync and async + + [Test] + public async Task GetsEventsFromStream() + { + Stream contentStream = BinaryData.FromString(_mockContent).ToStream(); + using ServerSentEventReader reader = new(contentStream); + + List events = new(); + ServerSentEvent? ssEvent = await reader.TryGetNextEventAsync(); + while (ssEvent is not null) + { + events.Add(ssEvent.Value); + ssEvent = await reader.TryGetNextEventAsync(); + } + + Assert.AreEqual(events.Count, 3); + + for (int i = 0; i < events.Count; i++) + { + ServerSentEvent sse = events[i]; + Assert.IsTrue(sse.EventName.Span.SequenceEqual($"event.{i}".AsSpan())); + Assert.IsTrue(sse.Data.Span.SequenceEqual($"{{ \"id\": \"{i}\", \"object\": {i} }}".AsSpan())); + } + + // TODO: Question - should this include the "done" event? Probably yes? + } + + #region Helpers + + private string _mockContent = """ + event: event.0 + data: { "id": "0", "object": 0 } + + event: event.1 + data: { "id": "1", "object": 1 } + + event: event.2 + data: { "id": "2", "object": 2 } + + event: done + data: [DONE] + + """; + + #endregion +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventTests.cs new file mode 100644 index 000000000000..63ecc7c6bb82 --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/ServerSentEventTests.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.Collections.Generic; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class ServerSentEventTests +{ + [Test] + public void SetsPropertiesFromFields() + { + string eventLine = "event: event.name"; + string dataLine = """data: {"id":"a","object":"value"}"""; + + List fields = new() { + new ServerSentEventField(eventLine), + new ServerSentEventField(dataLine) + }; + + ServerSentEvent ssEvent = new(fields); + + Assert.IsNull(ssEvent.ReconnectionTime); + Assert.IsTrue(ssEvent.EventName.Span.SequenceEqual("event.name".AsSpan())); + Assert.IsTrue(ssEvent.Data.Span.SequenceEqual("""{"id":"a","object":"value"}""".AsSpan())); + Assert.AreEqual(ssEvent.LastEventId.Length, 0); + } +} diff --git a/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/StreamingClientResultTests.cs b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/StreamingClientResultTests.cs new file mode 100644 index 000000000000..1b3cff691faa --- /dev/null +++ b/sdk/core/System.ClientModel/tests/internal/Convenience/SSE/StreamingClientResultTests.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.ClientModel.Internal; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using ClientModel.Tests.Mocks; +using NUnit.Framework; + +namespace System.ClientModel.Tests.Convenience; + +public class StreamingClientResultTests +{ + [Test] + public async Task EnumeratesModelValues() + { + MockPipelineResponse response = new(); + response.SetContent(_mockContent); + var results = StreamingClientResult.Create(response); + + int i = 0; + await foreach (MockJsonModel model in results) + { + Assert.AreEqual(model.IntValue, i); + Assert.AreEqual(model.StringValue, i.ToString()); + + i++; + } + + Assert.AreEqual(i, 3); + } + + // TODO: Add tests for dispose and handling cancellation token + // TODO: later, add tests for varying the _doneToken value. + + #region Helpers + + private readonly string _mockContent = """ + event: event.0 + data: { "IntValue": 0, "StringValue": "0" } + + event: event.1 + data: { "IntValue": 1, "StringValue": "1" } + + event: event.2 + data: { "IntValue": 2, "StringValue": "2" } + + event: done + data: [DONE] + + """; + + #endregion +}