Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net461.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
4 changes: 3 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net472.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
4 changes: 3 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
4 changes: 3 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ public abstract partial class Response : System.ClientModel.Primitives.PipelineR
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
80 changes: 80 additions & 0 deletions sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Azure.Core.Buffers
{
internal static class AzureBaseBuffersExtensions
{
// Same value as Stream.CopyTo uses by default
private const int DefaultCopyBufferSize = 81920;

public static async Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
{
Argument.AssertNotNull(stream, nameof(stream));
Expand Down Expand Up @@ -87,5 +90,82 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence<byte> b
ArrayPool<byte>.Shared.Return(array);
}
}

public static async Task CopyToAsync(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken)
{
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);

// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
// register callback to dispose the stream on cancellation.
if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
{
cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
while (true)
{
#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cts.Token).ConfigureAwait(false);
#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets
if (bytesRead == 0)
break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cts.Token).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is ObjectDisposedException
or IOException
or OperationCanceledException
or NotSupportedException)
{
CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
throw;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void CopyTo(this Stream source, Stream destination, TimeSpan timeout, CancellationToken cancellationToken)
{
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);

// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
// register callback to dispose the stream on cancellation.
if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
{
cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cts.Token.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
catch (Exception ex) when (ex is ObjectDisposedException
or IOException
or OperationCanceledException
or NotSupportedException)
{
CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
throw;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
11 changes: 11 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
Expand Down Expand Up @@ -41,6 +44,14 @@ public override Stream? ContentStream
set => _pipelineResponse.ContentStream = value;
}

public override BinaryData Content => _pipelineResponse.Content;

public override BinaryData ReadContent(CancellationToken cancellationToken = default)
=> _pipelineResponse.ReadContent(cancellationToken);

public override async ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
=> await base.ReadContentAsync(cancellationToken).ConfigureAwait(false);

protected internal override bool ContainsHeader(string name)
=> _pipelineResponse.Headers.TryGetValue(name, out _);

Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/HttpWebRequestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public override Stream? ContentStream
}
}

// TODO: Implement Content and ReadContent

public override string ClientRequestId { get; set; }

public override void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private async ValueTask ProcessSyncOrAsync(HttpMessage message, ReadOnlyMemory<H
{
_policy.Process(message, processor, -1);
}

message.Response.NetworkTimeout = invocationNetworkTimeout;
}
catch (TaskCanceledException e)
{
Expand Down
118 changes: 110 additions & 8 deletions sdk/core/Azure.Core/src/Response.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Buffers;

namespace Azure
{
Expand All @@ -18,6 +21,9 @@ namespace Azure
public abstract class Response : PipelineResponse
#pragma warning restore AZC0012 // Avoid single word type names
{
// TODO(matell): The .NET Framework team plans to add BinaryData.Empty in dotnet/runtime#49670, and we can use it then.
private static readonly BinaryData s_EmptyBinaryData = new(Array.Empty<byte>());

/// <summary>
/// Gets the client request id that was sent to the server as <c>x-ms-client-request-id</c> headers.
/// </summary>
Expand All @@ -28,14 +34,6 @@ public abstract class Response : PipelineResponse
/// </summary>
public new virtual ResponseHeaders Headers => new ResponseHeaders(this);

/// <summary>
/// Gets the contents of HTTP response, if it is available.
/// </summary>
/// <remarks>
/// Throws <see cref="InvalidOperationException"/> when <see cref="PipelineResponse.ContentStream"/> is not a <see cref="MemoryStream"/>.
/// </remarks>
public new virtual BinaryData Content => base.Content;

/// <summary>
/// TBD.
/// </summary>
Expand All @@ -48,6 +46,44 @@ protected override PipelineResponseHeaders GetHeadersCore()
throw new NotImplementedException();
}

/// <summary>
/// Gets the contents of HTTP response, if it is available.
/// </summary>
/// <remarks>
/// Throws <see cref="InvalidOperationException"/> when content is not buffered.
/// </remarks>
public override BinaryData Content
{
get
{
if (ContentStream == null)
{
return s_EmptyBinaryData;
}

// Base implementation must create the buffer on-demand
// because it cannot know whether the ContentStream has
// been replaced since _bufferedContent was set. Subtypes
// should override this implementation to be able to use
// cached content.
if (ContentStream is not MemoryStream memoryContent)
{
throw new InvalidOperationException($"The response is not buffered.");
}

if (memoryContent.TryGetBuffer(out ArraySegment<byte> segment))
{
return new BinaryData(segment.AsMemory());
}
else
{
return new BinaryData(memoryContent.ToArray());
}
}
}

internal TimeSpan NetworkTimeout { get; set; }

internal HttpMessageSanitizer Sanitizer { get; set; } = HttpMessageSanitizer.Default;

internal RequestFailedDetailsParser? RequestFailedDetailsParser { get; set; }
Expand Down Expand Up @@ -124,6 +160,62 @@ internal static void DisposeStreamIfNotBuffered(ref Stream? stream)
}
}

/// <summary>
/// TBD.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public override BinaryData ReadContent(CancellationToken cancellationToken = default)
{
// Derived types should provide an implementation that allows caching
// to improve performance.
if (ContentStream is null)
{
return s_EmptyBinaryData;
}

MemoryStream bufferStream = new();

Stream? contentStream = ContentStream;
contentStream.CopyTo(bufferStream, NetworkTimeout, cancellationToken);
contentStream.Dispose();

bufferStream.Position = 0;
ContentStream = bufferStream;

return BinaryData.FromStream(bufferStream);
}

/// <summary>
/// TBD.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public override async ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
{
// Derived types should provide an implementation that allows caching
// to improve performance.
if (ContentStream is null)
{
return s_EmptyBinaryData;
}

MemoryStream bufferStream = new();

Stream? contentStream = ContentStream;
await contentStream.CopyToAsync(bufferStream, NetworkTimeout, cancellationToken).ConfigureAwait(false);
contentStream.Dispose();

bufferStream.Position = 0;
ContentStream = bufferStream;

return BinaryData.FromStream(bufferStream);
}

private class BufferedContentStream : MemoryStream { }

#region Private implementation subtypes of abstract Response types
private class AzureCoreResponse<T> : Response<T>
{
Expand Down Expand Up @@ -180,6 +272,16 @@ protected internal override bool TryGetHeaderValues(string name, [NotNullWhen(tr
{
throw new NotSupportedException(DefaultMessage);
}

public override BinaryData ReadContent(CancellationToken cancellationToken = default)
{
throw new NotSupportedException(DefaultMessage);
}

public override ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
{
throw new NotSupportedException(DefaultMessage);
}
}
#endregion
}
Expand Down
22 changes: 21 additions & 1 deletion sdk/core/Azure.Core/src/Shared/CancellationHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,25 @@ internal static void ThrowIfCancellationRequested(CancellationToken cancellation
ThrowOperationCanceledException(innerException: null, cancellationToken);
}
}

/// <summary>Throws a cancellation exception if cancellation has been requested via <paramref name="cancellationToken"/> or <paramref name="timeoutToken"/>.</summary>
/// <param name="cancellationToken">The customer provided token.</param>
/// <param name="timeoutToken">The linked token that is cancelled on timeout provided token.</param>
/// <param name="innerException">The inner exception to use.</param>
/// <param name="timeout">The timeout used for the operation.</param>
#pragma warning disable CA1068 // Cancellation token has to be the last parameter
internal static void ThrowIfCancellationRequestedOrTimeout(CancellationToken cancellationToken, CancellationToken timeoutToken, Exception? innerException, TimeSpan timeout)
#pragma warning restore CA1068
{
ThrowIfCancellationRequested(cancellationToken);

if (timeoutToken.IsCancellationRequested)
{
throw CreateOperationCanceledException(
innerException,
timeoutToken,
$"The operation was cancelled because it exceeded the configured timeout of {timeout:g}. ");
}
}
}
}
}
Loading