Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Refactored HTTP Out (#2171)
Browse files Browse the repository at this point in the history
  • Loading branch information
KrzysztofCwalina authored Mar 20, 2018
1 parent 0236c48 commit ea28ffe
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 117 deletions.
21 changes: 12 additions & 9 deletions samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<!--
<RuntimeFrameworkVersion>2.1.0-preview2-26209-04</RuntimeFrameworkVersion>
-->
<CoreFxPackageVersion>4.5.0-preview2-26213-06</CoreFxPackageVersion>
<CoreFxLabPackageVersion>0.1.0-preview2-180213-4</CoreFxLabPackageVersion>
</PropertyGroup>
Expand All @@ -19,18 +21,19 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.DotNet.ILCompiler" Version="1.0.0-alpha-26223-02" />
<PackageReference Include="System.Memory" Version="$(CoreFxPackageVersion)" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(CoreFxPackageVersion)" />
<PackageReference Include="Microsoft.DotNet.ILCompiler" Version="1.0.0-alpha-26320-02" />
<PackageReference Include="System.Buffers.ReaderWriter" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.Memory" Version="4.5.0-preview3-26319-04" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.0-preview3-26319-04" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Azure.Experimental" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.Buffers.Experimental" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.Text.Http" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.Text.Http.Parser" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.Text.Utf8String" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.IO.Pipelines" Version="$(CoreFxLabPackageVersion)" />
<PackageReference Include="System.Azure.Experimental" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.Buffers.Experimental" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.Text.Http" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.Text.Http.Parser" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.Text.Utf8String" Version="0.1.0-preview2-180320-2" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.0-preview3-26319-04" />
</ItemGroup>

</Project>
6 changes: 3 additions & 3 deletions samples/AzCopyCore/AzCopyCore/Helpers/CommandLine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public bool Contains(ReadOnlySpan<char> optionName)
{
for (int i = 0; i < _options.Length; i++)
{
if (_options[i].AsReadOnlySpan().StartsWith(optionName)) return true;
if (_options[i].AsSpan().StartsWith(optionName)) return true;
}
return false;
}
Expand All @@ -34,7 +34,7 @@ public ReadOnlyMemory<char> Get(string optionName)
string candidate = _options[i];
if (candidate.StartsWith(optionName))
{
return candidate.AsReadOnlyMemory().Slice(optionName.Length);
return candidate.AsMemory().Slice(optionName.Length);
}
}
return ReadOnlyMemory<char>.Empty;
Expand All @@ -46,7 +46,7 @@ public ReadOnlySpan<char> GetSpan(ReadOnlySpan<char> optionName)

for (int i = 0; i < _options.Length; i++)
{
ReadOnlySpan<char> candidate = _options[i].AsReadOnlySpan();
ReadOnlySpan<char> candidate = _options[i].AsSpan();
if (candidate.StartsWith(optionName))
{
return candidate.Slice(optionName.Length);
Expand Down
80 changes: 80 additions & 0 deletions samples/AzCopyCore/AzCopyCore/HttpClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.IO.Pipelines;
using System.Text.Http.Parser;
using System.Threading.Tasks;

// SocketClient is an experimental low-allocating/low-copy HTTP client API
// TODO (pri 2): need to support cancellations
namespace System.Net.Experimental
{
public readonly struct PipeHttpClient
{
static readonly HttpParser s_headersParser = new HttpParser();

readonly IDuplexPipe _pipe;

public PipeHttpClient(IDuplexPipe pipe)
{
_pipe = pipe;
}

public async ValueTask<TResponse> SendRequest<TRequest, TResponse>(TRequest request)
where TRequest : IPipeWritable
where TResponse : IHttpResponseHandler, new()
{
await request.WriteAsync(_pipe.Output).ConfigureAwait(false);

PipeReader reader = _pipe.Input;
TResponse response = await ParseResponseAsync<TResponse>(reader).ConfigureAwait(false);
await response.OnBody(reader);
return response;
}

static async ValueTask<T> ParseResponseAsync<T>(PipeReader reader)
where T : IHttpResponseHandler, new()
{
var handler = new T();
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
// TODO (pri 2): this should not be static, or ParseHeaders should be static
if (HttpParser.ParseResponseLine(ref handler, ref buffer, out int rlConsumed))
{
reader.AdvanceTo(buffer.GetPosition(rlConsumed));
break;
}
reader.AdvanceTo(buffer.Start, buffer.End);
}

while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
if (s_headersParser.ParseHeaders(ref handler, buffer, out int hdConsumed))
{
reader.AdvanceTo(buffer.GetPosition(hdConsumed));
break;
}
reader.AdvanceTo(buffer.Start, buffer.End);
}

await handler.OnBody(reader);
return handler;
}

public bool IsConnected => _pipe != null;
}

public interface IHttpResponseHandler : IHttpHeadersHandler, IHttpResponseLineHandler
{
ValueTask OnBody(PipeReader body);
}
}



31 changes: 18 additions & 13 deletions samples/AzCopyCore/AzCopyCore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,19 @@ static void PrintUsage()
static void Main(string[] args)
{
Log.Listeners.Add(new ConsoleTraceListener());
Log.Switch.Level = SourceLevels.Error;

long before = GC.GetAllocatedBytesForCurrentThread();
Log.Switch.Level = SourceLevels.Information;

var options = new CommandLine(args);
ReadOnlySpan<char> source = options.GetSpan("/Source:");
ReadOnlySpan<char> destination = options.GetSpan("/Dest:");

long before = GC.GetAllocatedBytesForCurrentThread();
var sw = Stopwatch.StartNew();

// transfer from file system to storage
if (destination.StartsWith("http://"))
{
var sw = new Stopwatch();
sw.Start();
TransferDirectoryToStorage(source, destination, options);
sw.Stop();
Console.WriteLine("Elapsed time: " + sw.ElapsedMilliseconds + " ms");
}

// transfer from storage to file system
Expand All @@ -54,8 +51,14 @@ static void Main(string[] args)

else { PrintUsage(); }

sw.Stop();
long after = GC.GetAllocatedBytesForCurrentThread();
Console.WriteLine($"GC Allocations: {after - before} bytes");

if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information))
{
Log.TraceInformation("Elapsed time: " + sw.ElapsedMilliseconds + " ms");
Log.TraceInformation($"GC Allocations: {after - before} bytes");
}

if (Debugger.IsAttached)
{
Expand All @@ -76,7 +79,9 @@ static void TransferDirectoryToStorage(ReadOnlySpan<char> localDirectory, ReadOn
byte[] keyBytes;
if (options.Contains("/DestKey:"))
{
keyBytes = options["/DestKey:"].ComputeKeyBytes();
ReadOnlySpan<char> key = options["/DestKey:"];
var str = key.ToString();
keyBytes = key.ComputeKeyBytes();
}
else if (options.Contains("/@:"))
{
Expand All @@ -96,9 +101,9 @@ static void TransferDirectoryToStorage(ReadOnlySpan<char> localDirectory, ReadOn
ReadOnlySpan<char> host = storageFullPath.Slice(0, pathStart);
ReadOnlySpan<char> path = storageFullPath.Slice(pathStart + 1);

using (var client = new StorageClient(keyBytes, account, host))
using (var client = new StorageClient(keyBytes, account, host, 80, Log))
{
client.Log = Log;
//var files = new Files(directoryPath);
foreach (string filepath in Directory.EnumerateFiles(directoryPath))
{
// TODO: Use Path.Join when it becomes available
Expand All @@ -115,7 +120,7 @@ static void TransferDirectoryToStorage(ReadOnlySpan<char> localDirectory, ReadOn
}
}

private static bool TryGetKey(string filename, out ReadOnlySpan<char> line)
static bool TryGetKey(string filename, out ReadOnlySpan<char> line)
{
if (!File.Exists(filename))
{
Expand All @@ -129,7 +134,7 @@ private static bool TryGetKey(string filename, out ReadOnlySpan<char> line)
string firstLine;
if ((firstLine = streamReader.ReadLine()) != null)
{
line = firstLine.AsReadOnlySpan();
line = firstLine.AsSpan();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,12 @@
using System.IO.Pipelines;
using System.Net.Security;
using System.Net.Sockets;
using System.Text.Http.Parser;
using System.Text.Utf8;
using System.Threading.Tasks;
using static System.Buffers.Text.Encodings;

// SocketClient is an experimental low-allocating/low-copy HTTP client API
// TODO (pri 2): need to support cancellations
namespace System.Net.Experimental
{
public interface IResponse : IHttpHeadersHandler, IHttpResponseLineHandler
{
void OnBody(PipeReader body);
}

public abstract class RequestWriter<T> where T : IPipeWritable
{
public abstract Text.Http.Parser.Http.Method Verb { get; }

public async Task WriteAsync(PipeWriter writer, T request)
{
WriteRequestLineAndHeaders(writer, ref request);
await WriteBody(writer, request).ConfigureAwait(false);
await writer.FlushAsync();
}

// TODO (pri 2): writing the request line should not be abstract; writing headers should.
protected abstract void WriteRequestLineAndHeaders(PipeWriter writer, ref T request);
protected virtual Task WriteBody(PipeWriter writer, T request) { return Task.CompletedTask; }
}

public struct SocketClient : IDisposable
public class SocketPipe : IDisposable, IDuplexPipe
{
readonly Pipe _requestPipe;
readonly Pipe _responsePipe;
Expand All @@ -49,7 +25,7 @@ public struct SocketClient : IDisposable
Task _requestWriter;
public TraceSource Log;

SocketClient(Socket socket, SslStream stream)
SocketPipe(Socket socket, SslStream stream)
{
_socket = socket;
_stream = stream;
Expand All @@ -60,7 +36,7 @@ public struct SocketClient : IDisposable
Log = null;
}

public static async Task<SocketClient> ConnectAsync(string host, int port, bool tls = false)
public static async Task<SocketPipe> ConnectAsync(string host, int port, bool tls = false)
{
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

Expand All @@ -78,54 +54,13 @@ public static async Task<SocketClient> ConnectAsync(string host, int port, bool
await socket.ConnectAsync(host, port).ConfigureAwait(false);
}

var client = new SocketClient(socket, tlsStream);
var client = new SocketPipe(socket, tlsStream);
client._responseReader = client.ReceiveAsync();
client._requestWriter = client.SendAsync();

return client;
}

public async ValueTask<TResponse> SendRequest<TRequest, TResponse>(TRequest request)
where TRequest : IPipeWritable
where TResponse : IResponse, new()
{
await request.WriteAsync(_requestPipe.Writer).ConfigureAwait(false);

PipeReader reader = _responsePipe.Reader;
TResponse response = await ParseAsync<TResponse>(reader, Log).ConfigureAwait(false);
response.OnBody(reader);
return response;
}

static HttpParser s_headersParser = new HttpParser();

// TODO (pri 3): Add to the platform, but this would require common logging API
public static async ValueTask<T> ParseAsync<T>(PipeReader reader, TraceSource log = null)
where T : IHttpResponseLineHandler, IHttpHeadersHandler, new()
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;

if (log != null) log.TraceInformation("RESPONSE:\n{0}", new Utf8String(buffer.First.Span));

var handler = new T();
// TODO (pri 2): this should not be static, or all should be static
if (!HttpParser.ParseResponseLine(ref handler, ref buffer, out int rlConsumed))
{
throw new NotImplementedException("could not parse the response");
}

buffer = buffer.Slice(rlConsumed);
if (!s_headersParser.ParseHeaders(ref handler, buffer, out int hdConsumed))
{
throw new NotImplementedException("could not parse the response");
}

reader.AdvanceTo(buffer.GetPosition(buffer.Start, hdConsumed));

return handler;
}

async Task SendAsync()
{
PipeReader reader = _requestPipe.Reader;
Expand All @@ -142,6 +77,9 @@ async Task SendAsync()
{
for (SequencePosition position = buffer.Start; buffer.TryGet(ref position, out ReadOnlyMemory<byte> segment);)
{
if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information))
Log.TraceInformation(Utf8.ToString(segment.Span));

await WriteToSocketAsync(segment).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -182,7 +120,8 @@ async Task ReceiveAsync()
int readBytes = await ReadFromSocketAsync(buffer).ConfigureAwait(false);
if (readBytes == 0) break;

if (Log != null) Log.TraceInformation(new Utf8String(buffer.Span.Slice(0, readBytes)).ToString());
if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information))
Log.TraceInformation(Utf8.ToString(buffer.Span.Slice(0, readBytes)));

writer.Advance(readBytes);
await writer.FlushAsync();
Expand Down Expand Up @@ -236,6 +175,10 @@ public void Dispose()
}

public bool IsConnected => _socket != null;

public PipeReader Input => _responsePipe.Reader;

public PipeWriter Output => _requestPipe.Writer;
}
}

Expand Down
Loading

0 comments on commit ea28ffe

Please sign in to comment.