diff --git a/sdk/core/Azure.Core/api/Azure.Core.net461.cs b/sdk/core/Azure.Core/api/Azure.Core.net461.cs index 679dd6fac145..740bcb0f0db8 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net461.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net461.cs @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } + public override System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); public static Azure.Response FromValue(T value, Azure.Response response) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; } + public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public override System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected sealed override void SetIsErrorCore(bool isError) { } public override string ToString() { throw null; } diff --git a/sdk/core/Azure.Core/api/Azure.Core.net472.cs b/sdk/core/Azure.Core/api/Azure.Core.net472.cs index 679dd6fac145..740bcb0f0db8 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net472.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net472.cs @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } + public override System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); public static Azure.Response FromValue(T value, Azure.Response response) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; } + public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public override System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected sealed override void SetIsErrorCore(bool isError) { } public override string ToString() { throw null; } diff --git a/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs b/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs index 03e42fa65b7e..7d279514ae41 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.net6.0.cs @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } + public override System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); public static Azure.Response FromValue(T value, Azure.Response response) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; } + public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public override System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected sealed override void SetIsErrorCore(bool isError) { } public override string ToString() { throw null; } diff --git a/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs b/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs index 679dd6fac145..740bcb0f0db8 100644 --- a/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs +++ b/sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR { protected Response() { } public abstract string ClientRequestId { get; set; } - public virtual new System.BinaryData Content { get { throw null; } } + public override System.BinaryData Content { get { throw null; } } public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } } protected internal abstract bool ContainsHeader(string name); protected internal abstract System.Collections.Generic.IEnumerable EnumerateHeaders(); public static Azure.Response FromValue(T value, Azure.Response response) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; } + public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public override System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] protected sealed override void SetIsErrorCore(bool isError) { } public override string ToString() { throw null; } diff --git a/sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs b/sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs index 50087757232c..6fba08bab455 100644 --- a/sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs +++ b/sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs @@ -13,6 +13,9 @@ namespace Azure.Core.Buffers { internal static class AzureBaseBuffersExtensions { + // Same value as Stream.CopyTo uses by default + private const int DefaultCopyBufferSize = 81920; + public static async Task WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellation = default) { Argument.AssertNotNull(stream, nameof(stream)); @@ -87,5 +90,82 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence b ArrayPool.Shared.Return(array); } } + + public static async Task CopyToAsync(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken) + { + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + // If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then + // register callback to dispose the stream on cancellation. + if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled) + { + cts.Token.Register(state => ((Stream?)state)?.Dispose(), source); + } + + byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); + + try + { + while (true) + { +#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets + int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cts.Token).ConfigureAwait(false); +#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets + if (bytesRead == 0) + break; + await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cts.Token).ConfigureAwait(false); + } + } + catch (Exception ex) when (ex is ObjectDisposedException + or IOException + or OperationCanceledException + or NotSupportedException) + { + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout); + throw; + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public static void CopyTo(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken) + { + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + // If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then + // register callback to dispose the stream on cancellation. + if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled) + { + cts.Token.Register(state => ((Stream?)state)?.Dispose(), source); + } + + byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); + + try + { + int read; + while ((read = source.Read(buffer, 0, buffer.Length)) != 0) + { + cts.Token.ThrowIfCancellationRequested(); + destination.Write(buffer, 0, read); + } + } + catch (Exception ex) when (ex is ObjectDisposedException + or IOException + or OperationCanceledException + or NotSupportedException) + { + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout); + throw; + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } } } diff --git a/sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs b/sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs index 604df526b3e3..2f957332ce5c 100644 --- a/sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs +++ b/sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs @@ -1,11 +1,14 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; using System.ClientModel.Primitives; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; namespace Azure.Core.Pipeline { @@ -41,6 +44,14 @@ public override Stream? ContentStream set => _pipelineResponse.ContentStream = value; } + public override BinaryData Content => _pipelineResponse.Content; + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + => _pipelineResponse.ReadContent(cancellationToken); + + public override async ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + => await base.ReadContentAsync(cancellationToken).ConfigureAwait(false); + protected internal override bool ContainsHeader(string name) => _pipelineResponse.Headers.TryGetValue(name, out _); diff --git a/sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs b/sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs index f3153bbe9011..77b0501fc8fb 100644 --- a/sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs +++ b/sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs @@ -328,6 +328,8 @@ public override Stream? ContentStream } } + // TODO: Implement Content and ReadContent + public override string ClientRequestId { get; set; } public override void Dispose() diff --git a/sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs b/sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs index 8dd5b5ed8cb3..1b798a7bd803 100644 --- a/sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs +++ b/sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs @@ -50,6 +50,8 @@ private async ValueTask ProcessSyncOrAsync(HttpMessage message, ReadOnlyMemory()); + /// /// Gets the client request id that was sent to the server as x-ms-client-request-id headers. /// @@ -28,14 +34,6 @@ public abstract class Response : PipelineResponse /// public new virtual ResponseHeaders Headers => new ResponseHeaders(this); - /// - /// Gets the contents of HTTP response, if it is available. - /// - /// - /// Throws when is not a . - /// - public new virtual BinaryData Content => base.Content; - /// /// TBD. /// @@ -48,6 +46,44 @@ protected override PipelineResponseHeaders GetHeadersCore() throw new NotImplementedException(); } + /// + /// Gets the contents of HTTP response, if it is available. + /// + /// + /// Throws when content is not buffered. + /// + public override BinaryData Content + { + get + { + if (ContentStream == null) + { + return s_EmptyBinaryData; + } + + // Base implementation must create the buffer on-demand + // because it cannot know whether the ContentStream has + // been replaced since _bufferedContent was set. Subtypes + // should override this implementation to be able to use + // cached content. + if (ContentStream is not MemoryStream memoryContent) + { + throw new InvalidOperationException($"The response is not buffered."); + } + + if (memoryContent.TryGetBuffer(out ArraySegment segment)) + { + return new BinaryData(segment.AsMemory()); + } + else + { + return new BinaryData(memoryContent.ToArray()); + } + } + } + + internal TimeSpan NetworkTimeout { get; set; } + internal HttpMessageSanitizer Sanitizer { get; set; } = HttpMessageSanitizer.Default; internal RequestFailedDetailsParser? RequestFailedDetailsParser { get; set; } @@ -124,6 +160,62 @@ internal static void DisposeStreamIfNotBuffered(ref Stream? stream) } } + /// + /// TBD. + /// + /// + /// + /// + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + // Derived types should provide an implementation that allows caching + // to improve performance. + if (ContentStream is null) + { + return s_EmptyBinaryData; + } + + MemoryStream bufferStream = new(); + + Stream? contentStream = ContentStream; + contentStream.CopyTo(bufferStream, NetworkTimeout, cancellationToken); + contentStream.Dispose(); + + bufferStream.Position = 0; + ContentStream = bufferStream; + + return BinaryData.FromStream(bufferStream); + } + + /// + /// TBD. + /// + /// + /// + /// + public override async ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + // Derived types should provide an implementation that allows caching + // to improve performance. + if (ContentStream is null) + { + return s_EmptyBinaryData; + } + + MemoryStream bufferStream = new(); + + Stream? contentStream = ContentStream; + await contentStream.CopyToAsync(bufferStream, NetworkTimeout, cancellationToken).ConfigureAwait(false); + contentStream.Dispose(); + + bufferStream.Position = 0; + ContentStream = bufferStream; + + return BinaryData.FromStream(bufferStream); + } + + private class BufferedContentStream : MemoryStream { } + #region Private implementation subtypes of abstract Response types private class AzureCoreResponse : Response { @@ -180,6 +272,16 @@ protected internal override bool TryGetHeaderValues(string name, [NotNullWhen(tr { throw new NotSupportedException(DefaultMessage); } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + throw new NotSupportedException(DefaultMessage); + } + + public override ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + throw new NotSupportedException(DefaultMessage); + } } #endregion } diff --git a/sdk/core/Azure.Core/src/Shared/CancellationHelper.cs b/sdk/core/Azure.Core/src/Shared/CancellationHelper.cs index 73773b9a3fcd..43019e0c5274 100644 --- a/sdk/core/Azure.Core/src/Shared/CancellationHelper.cs +++ b/sdk/core/Azure.Core/src/Shared/CancellationHelper.cs @@ -53,5 +53,25 @@ internal static void ThrowIfCancellationRequested(CancellationToken cancellation ThrowOperationCanceledException(innerException: null, cancellationToken); } } + + /// Throws a cancellation exception if cancellation has been requested via or . + /// The customer provided token. + /// The linked token that is cancelled on timeout provided token. + /// The inner exception to use. + /// The timeout used for the operation. +#pragma warning disable CA1068 // Cancellation token has to be the last parameter + internal static void ThrowIfCancellationRequestedOrTimeout(CancellationToken cancellationToken, CancellationToken timeoutToken, Exception? innerException, TimeSpan timeout) +#pragma warning restore CA1068 + { + ThrowIfCancellationRequested(cancellationToken); + + if (timeoutToken.IsCancellationRequested) + { + throw CreateOperationCanceledException( + innerException, + timeoutToken, + $"The operation was cancelled because it exceeded the configured timeout of {timeout:g}. "); + } + } } -} \ No newline at end of file +} diff --git a/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs b/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs index 3af3e84a6abd..2e77c2960315 100644 --- a/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs +++ b/sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs @@ -205,7 +205,7 @@ protected PipelineRequestHeaders() { } public abstract partial class PipelineResponse : System.IDisposable { protected PipelineResponse() { } - public virtual System.BinaryData Content { get { throw null; } } + public abstract System.BinaryData Content { get; } public abstract System.IO.Stream? ContentStream { get; set; } public System.ClientModel.Primitives.PipelineResponseHeaders Headers { get { throw null; } } public virtual bool IsError { get { throw null; } } @@ -213,6 +213,8 @@ protected PipelineResponse() { } public abstract int Status { get; } public abstract void Dispose(); protected abstract System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore(); + public abstract System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public abstract System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected virtual void SetIsErrorCore(bool isError) { } } public abstract partial class PipelineResponseHeaders : System.Collections.Generic.IEnumerable>, System.Collections.IEnumerable diff --git a/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs b/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs index 0e44a8d8acec..d7b017a5c400 100644 --- a/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs +++ b/sdk/core/System.ClientModel/api/System.ClientModel.netstandard2.0.cs @@ -204,7 +204,7 @@ protected PipelineRequestHeaders() { } public abstract partial class PipelineResponse : System.IDisposable { protected PipelineResponse() { } - public virtual System.BinaryData Content { get { throw null; } } + public abstract System.BinaryData Content { get; } public abstract System.IO.Stream? ContentStream { get; set; } public System.ClientModel.Primitives.PipelineResponseHeaders Headers { get { throw null; } } public virtual bool IsError { get { throw null; } } @@ -212,6 +212,8 @@ protected PipelineResponse() { } public abstract int Status { get; } public abstract void Dispose(); protected abstract System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore(); + public abstract System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public abstract System.Threading.Tasks.ValueTask ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected virtual void SetIsErrorCore(bool isError) { } } public abstract partial class PipelineResponseHeaders : System.Collections.Generic.IEnumerable>, System.Collections.IEnumerable diff --git a/sdk/core/System.ClientModel/src/Convenience/ClientResultException.cs b/sdk/core/System.ClientModel/src/Convenience/ClientResultException.cs index bf86832f1dba..095ac80cfd1a 100644 --- a/sdk/core/System.ClientModel/src/Convenience/ClientResultException.cs +++ b/sdk/core/System.ClientModel/src/Convenience/ClientResultException.cs @@ -67,7 +67,7 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont private static string CreateMessage(PipelineResponse response) { - response.BufferContent(); + response.ReadContent(); StringBuilder messageBuilder = new(); diff --git a/sdk/core/System.ClientModel/src/Internal/CancellationHelper.cs b/sdk/core/System.ClientModel/src/Internal/CancellationHelper.cs index 869516e461bf..0f11babcbe41 100644 --- a/sdk/core/System.ClientModel/src/Internal/CancellationHelper.cs +++ b/sdk/core/System.ClientModel/src/Internal/CancellationHelper.cs @@ -48,4 +48,24 @@ internal static void ThrowIfCancellationRequested(CancellationToken cancellation ThrowOperationCanceledException(innerException: null, cancellationToken); } } -} \ No newline at end of file + + /// Throws a cancellation exception if cancellation has been requested via or . + /// The customer provided token. + /// The linked token that is cancelled on timeout provided token. + /// The inner exception to use. + /// The timeout used for the operation. +#pragma warning disable CA1068 // Cancellation token has to be the last parameter + internal static void ThrowIfCancellationRequestedOrTimeout(CancellationToken cancellationToken, CancellationToken timeoutToken, Exception? innerException, TimeSpan timeout) +#pragma warning restore CA1068 + { + ThrowIfCancellationRequested(cancellationToken); + + if (timeoutToken.IsCancellationRequested) + { + throw CreateOperationCanceledException( + innerException, + timeoutToken, + $"The operation was cancelled because it exceeded the configured timeout of {timeout:g}. "); + } + } +} diff --git a/sdk/core/System.ClientModel/src/Internal/ReadTimeoutStream.cs b/sdk/core/System.ClientModel/src/Internal/ReadTimeoutStream.cs index fd8c3bdd96d7..ee7f045ec535 100644 --- a/sdk/core/System.ClientModel/src/Internal/ReadTimeoutStream.cs +++ b/sdk/core/System.ClientModel/src/Internal/ReadTimeoutStream.cs @@ -68,18 +68,18 @@ public override int Read(byte[] buffer, int offset, int count) // We dispose stream on timeout so catch and check if cancellation token was cancelled catch (IOException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); throw; } // We dispose stream on timeout so catch and check if cancellation token was cancelled catch (ObjectDisposedException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); throw; } catch (OperationCanceledException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout); throw; } finally @@ -100,18 +100,18 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, // We dispose stream on timeout so catch and check if cancellation token was cancelled catch (IOException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); throw; } // We dispose stream on timeout so catch and check if cancellation token was cancelled catch (ObjectDisposedException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); throw; } catch (OperationCanceledException ex) { - ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout); throw; } finally diff --git a/sdk/core/System.ClientModel/src/Internal/StreamExtensions.cs b/sdk/core/System.ClientModel/src/Internal/StreamExtensions.cs index 21fdef014f56..dbc25ac4cf4b 100644 --- a/sdk/core/System.ClientModel/src/Internal/StreamExtensions.cs +++ b/sdk/core/System.ClientModel/src/Internal/StreamExtensions.cs @@ -12,6 +12,9 @@ namespace System.ClientModel.Internal; internal static class StreamExtensions { + // Same value as Stream.CopyTo uses by default + private const int DefaultCopyBufferSize = 81920; + public static async Task WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellation = default) { Argument.AssertNotNull(stream, nameof(stream)); @@ -86,4 +89,81 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence b ArrayPool.Shared.Return(array); } } + + public static async Task CopyToAsync(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken) + { + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + // If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then + // register callback to dispose the stream on cancellation. + if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled) + { + cts.Token.Register(state => ((Stream?)state)?.Dispose(), source); + } + + byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); + + try + { + while (true) + { +#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets + int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cts.Token).ConfigureAwait(false); +#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets + if (bytesRead == 0) + break; + await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cts.Token).ConfigureAwait(false); + } + } + catch (Exception ex) when (ex is ObjectDisposedException + or IOException + or OperationCanceledException + or NotSupportedException) + { + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout); + throw; + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public static void CopyTo(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken) + { + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout); + + // If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then + // register callback to dispose the stream on cancellation. + if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled) + { + cts.Token.Register(state => ((Stream?)state)?.Dispose(), source); + } + + byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); + + try + { + int read; + while ((read = source.Read(buffer, 0, buffer.Length)) != 0) + { + cts.Token.ThrowIfCancellationRequested(); + destination.Write(buffer, 0, read); + } + } + catch (Exception ex) when (ex is ObjectDisposedException + or IOException + or OperationCanceledException + or NotSupportedException) + { + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout); + throw; + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } } diff --git a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs index 8e13b24ff2d0..121827d1af3e 100644 --- a/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs +++ b/sdk/core/System.ClientModel/src/Message/PipelineResponse.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System.Buffers; -using System.ClientModel.Internal; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -12,7 +10,7 @@ namespace System.ClientModel.Primitives; public abstract class PipelineResponse : IDisposable { // TODO(matell): The .NET Framework team plans to add BinaryData.Empty in dotnet/runtime#49670, and we can use it then. - private static readonly BinaryData s_emptyBinaryData = new(Array.Empty()); + internal static readonly BinaryData s_EmptyBinaryData = new(Array.Empty()); private bool _isError = false; @@ -35,30 +33,11 @@ public abstract class PipelineResponse : IDisposable /// public abstract Stream? ContentStream { get; set; } - public virtual BinaryData Content - { - get - { - if (ContentStream == null) - { - return s_emptyBinaryData; - } - - if (!TryGetBufferedContent(out MemoryStream bufferedContent)) - { - throw new InvalidOperationException($"The response is not buffered."); - } - - if (bufferedContent.TryGetBuffer(out ArraySegment segment)) - { - return new BinaryData(segment.AsMemory()); - } - else - { - return new BinaryData(bufferedContent.ToArray()); - } - } - } + public abstract BinaryData Content { get; } + + public abstract BinaryData ReadContent(CancellationToken cancellationToken = default); + + public abstract ValueTask ReadContentAsync(CancellationToken cancellationToken = default); /// /// Indicates whether the status code of the returned response is considered @@ -77,96 +56,35 @@ public virtual BinaryData Content public abstract void Dispose(); - #region Response Buffering - - // Same value as Stream.CopyTo uses by default - private const int DefaultCopyBufferSize = 81920; - - internal bool TryGetBufferedContent(out MemoryStream bufferedContent) - { - if (ContentStream is MemoryStream content) - { - bufferedContent = content; - return true; - } - - bufferedContent = default!; - return false; - } - - internal void BufferContent(TimeSpan? timeout = default, CancellationTokenSource? cts = default) - { - Stream? responseContentStream = ContentStream; - if (responseContentStream == null || TryGetBufferedContent(out _)) - { - // No need to buffer content. - return; - } - - MemoryStream bufferStream = new(); - CopyTo(responseContentStream, bufferStream, timeout ?? NetworkTimeout, cts ?? new CancellationTokenSource()); - responseContentStream.Dispose(); - bufferStream.Position = 0; - ContentStream = bufferStream; - } - - internal async Task BufferContentAsync(TimeSpan? timeout = default, CancellationTokenSource? cts = default) - { - Stream? responseContentStream = ContentStream; - if (responseContentStream == null || TryGetBufferedContent(out _)) - { - // No need to buffer content. - return; - } - - MemoryStream bufferStream = new(); - await CopyToAsync(responseContentStream, bufferStream, timeout ?? NetworkTimeout, cts ?? new CancellationTokenSource()).ConfigureAwait(false); - responseContentStream.Dispose(); - bufferStream.Position = 0; - ContentStream = bufferStream; - } - - private static async Task CopyToAsync(Stream source, Stream destination, TimeSpan timeout, CancellationTokenSource cancellationTokenSource) - { - byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); - try - { - while (true) - { - cancellationTokenSource.CancelAfter(timeout); -#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets - int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationTokenSource.Token).ConfigureAwait(false); -#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets - if (bytesRead == 0) break; - await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationTokenSource.Token).ConfigureAwait(false); - } - } - finally - { - cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); - ArrayPool.Shared.Return(buffer); - } - } - - private static void CopyTo(Stream source, Stream destination, TimeSpan timeout, CancellationTokenSource cancellationTokenSource) - { - byte[] buffer = ArrayPool.Shared.Rent(DefaultCopyBufferSize); - try - { - int read; - while ((read = source.Read(buffer, 0, buffer.Length)) != 0) - { - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - cancellationTokenSource.CancelAfter(timeout); - destination.Write(buffer, 0, read); - } - } - finally - { - cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan); - ArrayPool.Shared.Return(buffer); - } - } - - #endregion + //internal void BufferContent(TimeSpan? timeout = default, CancellationTokenSource? cts = default) + //{ + // Stream? responseContentStream = ContentStream; + // if (responseContentStream == null || TryGetBufferedContent(out _)) + // { + // // No need to buffer content. + // return; + // } + + // MemoryStream bufferStream = new(); + // CopyTo(responseContentStream, bufferStream, timeout ?? NetworkTimeout, cts ?? new CancellationTokenSource()); + // responseContentStream.Dispose(); + // bufferStream.Position = 0; + // ContentStream = bufferStream; + //} + + //internal async Task BufferContentAsync(TimeSpan? timeout = default, CancellationTokenSource? cts = default) + //{ + // Stream? responseContentStream = ContentStream; + // if (responseContentStream == null || TryGetBufferedContent(out _)) + // { + // // No need to buffer content. + // return; + // } + + // MemoryStream bufferStream = new(); + // await CopyToAsync(responseContentStream, bufferStream, timeout ?? NetworkTimeout, cts ?? new CancellationTokenSource()).ConfigureAwait(false); + // responseContentStream.Dispose(); + // bufferStream.Position = 0; + // ContentStream = bufferStream; + //} } diff --git a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs index dc7c8e2f5228..409e31b5631f 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/HttpClientPipelineTransport.Response.cs @@ -1,8 +1,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System.ClientModel.Internal; using System.IO; using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; namespace System.ClientModel.Primitives; @@ -20,6 +23,7 @@ private class HttpPipelineResponse : PipelineResponse private readonly HttpContent _httpResponseContent; private Stream? _contentStream; + private BinaryData? _bufferedContent; private bool _disposed; @@ -39,17 +43,114 @@ protected override PipelineResponseHeaders GetHeadersCore() public override Stream? ContentStream { - get => _contentStream; + get + { + if (_contentStream is not null) + { + return _contentStream; + } + + if (_bufferedContent is not null) + { + return _bufferedContent.ToStream(); + } + + return null; + } set { - // Make sure we don't dispose the content if the stream was replaced + // Don't dispose the content if the stream is replaced. + // This means the content remains available for reading headers. _httpResponse.Content = null; _contentStream = value; + + // Invalidate the cache since the source-stream has been replaced. + _bufferedContent = null; + } + } + + public override BinaryData Content + { + get + { + // TODO: Consolidate with part of ReadContent implementation? + if (_bufferedContent is not null) + { + return _bufferedContent; + } + + if (_contentStream == null) + { + _bufferedContent = s_EmptyBinaryData; + return _bufferedContent; + } + + if (_contentStream is not MemoryStream memoryStream) + { + throw new InvalidOperationException($"The response is not buffered."); + } + + // Support mock responses that don't use the transport to buffer. + if (memoryStream.TryGetBuffer(out ArraySegment segment)) + { + _bufferedContent = new BinaryData(segment.AsMemory()); + } + else + { + _bufferedContent = new BinaryData(memoryStream.ToArray()); + } + + return _bufferedContent; } } - #region IDisposable + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + => ReadContentSyncOrAsync(cancellationToken, async: false).EnsureCompleted(); + + public override async ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + => await ReadContentSyncOrAsync(cancellationToken, async: true).ConfigureAwait(false); + + private async ValueTask ReadContentSyncOrAsync(CancellationToken cancellationToken, bool async) + { + if (_bufferedContent is not null) + { + // Content has already been buffered. + return _bufferedContent; + } + + if (_contentStream == null) + { + // Content is not buffered but there is no source stream. + // Our contract from Azure.Core is to return empty BinaryData in this case. + _bufferedContent = s_EmptyBinaryData; + return _bufferedContent; + } + + // ContentStream still holds the source stream. Buffer the content + // and dispose the source stream. + BufferedContentStream bufferStream = new(); + + if (async) + { + await _contentStream.CopyToAsync(bufferStream, NetworkTimeout, cancellationToken).ConfigureAwait(false); + } + else + { + _contentStream.CopyTo(bufferStream, NetworkTimeout, cancellationToken); + } + + _contentStream.Dispose(); + _contentStream = null; + + bufferStream.Position = 0; + + _bufferedContent = bufferStream.TryGetBuffer(out ArraySegment segment) ? + new BinaryData(segment.AsMemory()) : + new BinaryData(bufferStream.ToArray()); + + return _bufferedContent; + } public override void Dispose() { @@ -62,41 +163,25 @@ protected virtual void Dispose(bool disposing) { if (disposing && !_disposed) { - var httpResponse = _httpResponse; + HttpResponseMessage httpResponse = _httpResponse; httpResponse?.Dispose(); - // Some notes on this: - // - // 1. If the content is buffered, we want it to remain available to the - // client for model deserialization and in case the end user of the - // client calls OutputMessage.GetRawResponse. So, we don't dispose it. + // This response type has two states: + // 1. _contentStream holds a "source stream" which has not + // been buffered; _bufferedContent is null. + // 2. _bufferedContent is set and _contentStream is null. // - // If the content is buffered, we assume that the entity that did the - // buffering took responsibility for disposing the network stream. - // - // 2. If the content is not buffered, we dispose it so that we don't leave - // a network connection open. - // - // One tricky piece here is that in some cases, we may not have buffered - // the content because we wanted to pass the live network stream out of - // the client method and back to the end-user caller of the client e.g. - // for a streaming API. If the latter is the case, the client should have - // called the HttpMessage.ExtractResponseContent method to obtain a reference - // to the network stream, and the response content was replaced by a stream - // that we are ok to dispose here. In this case, the network stream is - // not disposed, because the entity that replaced the response content - // intentionally left the network stream undisposed. - - var contentStream = _contentStream; - if (contentStream is not null && !TryGetBufferedContent(out _)) - { - contentStream?.Dispose(); - _contentStream = null; - } + // Given this, if _contentStream is not null, we are holding + // a source stream and will dispose it. + + Stream? contentStream = _contentStream; + contentStream?.Dispose(); + _contentStream = null; _disposed = true; } } - #endregion + + private class BufferedContentStream : MemoryStream { } } -} \ No newline at end of file +} diff --git a/sdk/core/System.ClientModel/src/Pipeline/ResponseBufferingPolicy.cs b/sdk/core/System.ClientModel/src/Pipeline/ResponseBufferingPolicy.cs index 9bec762e3cd2..00a8ca2cf638 100644 --- a/sdk/core/System.ClientModel/src/Pipeline/ResponseBufferingPolicy.cs +++ b/sdk/core/System.ClientModel/src/Pipeline/ResponseBufferingPolicy.cs @@ -52,7 +52,7 @@ private async ValueTask ProcessSyncOrAsync(PipelineMessage message, IReadOnlyLis } catch (OperationCanceledException ex) { - ThrowIfCancellationRequestedOrTimeout(oldToken, cts.Token, ex, invocationNetworkTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(oldToken, cts.Token, ex, invocationNetworkTimeout); throw; } finally @@ -65,11 +65,9 @@ private async ValueTask ProcessSyncOrAsync(PipelineMessage message, IReadOnlyLis message.Response!.NetworkTimeout = invocationNetworkTimeout; Stream? responseContentStream = message.Response!.ContentStream; - if (responseContentStream is null || - message.Response.TryGetBufferedContent(out var _)) + if (responseContentStream is null) { - // There is either no content on the response, or the content has already - // been buffered. + // There is no content to buffer. return; } @@ -87,22 +85,15 @@ private async ValueTask ProcessSyncOrAsync(PipelineMessage message, IReadOnlyLis // If we got this far, buffer the response. - // If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then - // register callback to dispose the stream on cancellation. - if (invocationNetworkTimeout != Timeout.InfiniteTimeSpan || oldToken.CanBeCanceled) - { - cts.Token.Register(state => ((Stream?)state)?.Dispose(), responseContentStream); - } - try { if (async) { - await message.Response.BufferContentAsync(invocationNetworkTimeout, cts).ConfigureAwait(false); + await message.Response.ReadContentAsync(cts.Token).ConfigureAwait(false); } else { - message.Response.BufferContent(invocationNetworkTimeout, cts); + message.Response.ReadContent(cts.Token); } } // We dispose stream on timeout or user cancellation so catch and check if cancellation token was cancelled @@ -112,28 +103,8 @@ or IOException or OperationCanceledException or NotSupportedException) { - ThrowIfCancellationRequestedOrTimeout(oldToken, cts.Token, ex, invocationNetworkTimeout); + CancellationHelper.ThrowIfCancellationRequestedOrTimeout(oldToken, cts.Token, ex, invocationNetworkTimeout); throw; } } - - /// Throws a cancellation exception if cancellation has been requested via or . - /// The customer provided token. - /// The linked token that is cancelled on timeout provided token. - /// The inner exception to use. - /// The timeout used for the operation. -#pragma warning disable CA1068 // Cancellation token has to be the last parameter - internal static void ThrowIfCancellationRequestedOrTimeout(CancellationToken originalToken, CancellationToken timeoutToken, Exception? inner, TimeSpan timeout) -#pragma warning restore CA1068 - { - CancellationHelper.ThrowIfCancellationRequested(originalToken); - - if (timeoutToken.IsCancellationRequested) - { - throw CancellationHelper.CreateOperationCanceledException( - inner, - timeoutToken, - $"The operation was cancelled because it exceeded the configured timeout of {timeout:g}. "); - } - } -} \ No newline at end of file +} diff --git a/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs b/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs index 86e3bc1d831f..d44a225c3831 100644 --- a/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs +++ b/sdk/core/System.ClientModel/tests/Pipeline/ClientPipelineFunctionalTests.cs @@ -216,6 +216,7 @@ public void TimesOutBodyBuffering() NetworkTimeout = TimeSpan.FromMilliseconds(500), RetryPolicy = new MockRetryPolicy(maxRetries: 0, i => TimeSpan.FromMilliseconds(10)), }; + ClientPipeline pipeline = ClientPipeline.Create(options); using TestServer testServer = new TestServer( diff --git a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineResponse.cs b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineResponse.cs index 1bc236899fcd..8b407e9cfa37 100644 --- a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineResponse.cs +++ b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineResponse.cs @@ -5,6 +5,8 @@ using System.ClientModel.Primitives; using System.IO; using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace ClientModel.Tests.Mocks; @@ -13,6 +15,7 @@ public class MockPipelineResponse : PipelineResponse private int _status; private string _reasonPhrase; private Stream? _contentStream; + private BinaryData? _bufferedContent; private readonly PipelineResponseHeaders _headers; @@ -50,6 +53,31 @@ public override Stream? ContentStream set => _contentStream = value; } + public override BinaryData Content + { + get + { + if (_contentStream is null) + { + return BinaryData.FromString(""); + } + + if (ContentStream is not MemoryStream memoryContent) + { + throw new InvalidOperationException($"The response is not buffered."); + } + + if (memoryContent.TryGetBuffer(out ArraySegment segment)) + { + return new BinaryData(segment.AsMemory()); + } + else + { + return new BinaryData(memoryContent.ToArray()); + } + } + } + protected override PipelineResponseHeaders GetHeadersCore() => _headers; public sealed override void Dispose() @@ -73,4 +101,48 @@ protected void Dispose(bool disposing) _disposed = true; } } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + if (_bufferedContent is not null) + { + return _bufferedContent; + } + + if (_contentStream is null) + { + _bufferedContent = BinaryData.FromString(string.Empty); + return _bufferedContent; + } + + MemoryStream bufferStream = new(); + _contentStream.CopyTo(bufferStream); + _contentStream.Dispose(); + _contentStream = bufferStream; + + _bufferedContent = BinaryData.FromStream(bufferStream); + return _bufferedContent; + } + + public override async ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + if (_bufferedContent is not null) + { + return _bufferedContent; + } + + if (_contentStream is null) + { + _bufferedContent = BinaryData.FromString(string.Empty); + return _bufferedContent; + } + + MemoryStream bufferStream = new(); + await _contentStream.CopyToAsync(bufferStream).ConfigureAwait(false); + _contentStream.Dispose(); + _contentStream = bufferStream; + + _bufferedContent = BinaryData.FromStream(bufferStream); + return _bufferedContent; + } } diff --git a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineTransport.cs b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineTransport.cs index 17cb7968dd7d..c811ae130446 100644 --- a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineTransport.cs +++ b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/MockPipelineTransport.cs @@ -6,6 +6,7 @@ using System.ClientModel.Primitives; using System.Collections.Generic; using System.IO; +using System.Threading; using System.Threading.Tasks; namespace ClientModel.Tests.Mocks; @@ -176,11 +177,23 @@ public override Stream? ContentStream set => throw new NotImplementedException(); } + public override BinaryData Content => throw new NotImplementedException(); + protected override PipelineResponseHeaders GetHeadersCore() { throw new NotImplementedException(); } public override void Dispose() { } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } } diff --git a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/ObservableTransport.cs b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/ObservableTransport.cs index 7bb84c2bf79f..b9cc98db95d9 100644 --- a/sdk/core/System.ClientModel/tests/TestFramework/Mocks/ObservableTransport.cs +++ b/sdk/core/System.ClientModel/tests/TestFramework/Mocks/ObservableTransport.cs @@ -6,6 +6,7 @@ using System.ClientModel.Primitives; using System.Collections.Generic; using System.IO; +using System.Threading; using System.Threading.Tasks; namespace ClientModel.Tests.Mocks; @@ -137,6 +138,8 @@ public override Stream? ContentStream set => throw new NotImplementedException(); } + public override BinaryData Content => throw new NotImplementedException(); + protected override PipelineResponseHeaders GetHeadersCore() { throw new NotImplementedException(); @@ -146,5 +149,15 @@ public override void Dispose() { throw new NotImplementedException(); } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } } diff --git a/sdk/core/System.ClientModel/tests/client/MapsClientTests.cs b/sdk/core/System.ClientModel/tests/client/MapsClientTests.cs index 20aea50d5044..3cc23bd2e443 100644 --- a/sdk/core/System.ClientModel/tests/client/MapsClientTests.cs +++ b/sdk/core/System.ClientModel/tests/client/MapsClientTests.cs @@ -375,6 +375,8 @@ public override Stream ContentStream set => _stream = value; } + public override BinaryData Content => BinaryData.FromStream(_stream); + public override void Dispose() { _stream?.Dispose(); @@ -384,6 +386,16 @@ protected override PipelineResponseHeaders GetHeadersCore() { throw new NotImplementedException(); } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } private class CustomHeaders : PipelineRequestHeaders diff --git a/sdk/core/System.ClientModel/tests/nullableenabledclient/MapsClientTests.cs b/sdk/core/System.ClientModel/tests/nullableenabledclient/MapsClientTests.cs index e400e916fafe..4ab39c69547c 100644 --- a/sdk/core/System.ClientModel/tests/nullableenabledclient/MapsClientTests.cs +++ b/sdk/core/System.ClientModel/tests/nullableenabledclient/MapsClientTests.cs @@ -407,6 +407,8 @@ public override Stream? ContentStream set => _stream = value; } + public override BinaryData Content => BinaryData.FromStream(_stream!); + public override void Dispose() { _stream?.Dispose(); @@ -416,6 +418,16 @@ protected override PipelineResponseHeaders GetHeadersCore() { throw new NotImplementedException(); } + + public override BinaryData ReadContent(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadContentAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } } private class CustomHeaders : PipelineRequestHeaders