diff --git a/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj b/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj index 1a49845b976..ae53bf9a84d 100644 --- a/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj +++ b/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj @@ -3,7 +3,9 @@ Exe netcoreapp2.1 + 4.5.0-preview2-26213-06 0.1.0-preview2-180213-4 @@ -19,18 +21,19 @@ - - - + + + + - - - - - - + + + + + + diff --git a/samples/AzCopyCore/AzCopyCore/Helpers/CommandLine.cs b/samples/AzCopyCore/AzCopyCore/Helpers/CommandLine.cs index 7502afc0cac..e521cd17eab 100644 --- a/samples/AzCopyCore/AzCopyCore/Helpers/CommandLine.cs +++ b/samples/AzCopyCore/AzCopyCore/Helpers/CommandLine.cs @@ -18,7 +18,7 @@ public bool Contains(ReadOnlySpan optionName) { for (int i = 0; i < _options.Length; i++) { - if (_options[i].AsReadOnlySpan().StartsWith(optionName)) return true; + if (_options[i].AsSpan().StartsWith(optionName)) return true; } return false; } @@ -34,7 +34,7 @@ public ReadOnlyMemory Get(string optionName) string candidate = _options[i]; if (candidate.StartsWith(optionName)) { - return candidate.AsReadOnlyMemory().Slice(optionName.Length); + return candidate.AsMemory().Slice(optionName.Length); } } return ReadOnlyMemory.Empty; @@ -46,7 +46,7 @@ public ReadOnlySpan GetSpan(ReadOnlySpan optionName) for (int i = 0; i < _options.Length; i++) { - ReadOnlySpan candidate = _options[i].AsReadOnlySpan(); + ReadOnlySpan candidate = _options[i].AsSpan(); if (candidate.StartsWith(optionName)) { return candidate.Slice(optionName.Length); diff --git a/samples/AzCopyCore/AzCopyCore/HttpClient.cs b/samples/AzCopyCore/AzCopyCore/HttpClient.cs new file mode 100644 index 00000000000..b10bfad5665 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/HttpClient.cs @@ -0,0 +1,80 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Buffers; +using System.IO.Pipelines; +using System.Text.Http.Parser; +using System.Threading.Tasks; + +// SocketClient is an experimental low-allocating/low-copy HTTP client API +// TODO (pri 2): need to support cancellations +namespace System.Net.Experimental +{ + public readonly struct PipeHttpClient + { + static readonly HttpParser s_headersParser = new HttpParser(); + + readonly IDuplexPipe _pipe; + + public PipeHttpClient(IDuplexPipe pipe) + { + _pipe = pipe; + } + + public async ValueTask SendRequest(TRequest request) + where TRequest : IPipeWritable + where TResponse : IHttpResponseHandler, new() + { + await request.WriteAsync(_pipe.Output).ConfigureAwait(false); + + PipeReader reader = _pipe.Input; + TResponse response = await ParseResponseAsync(reader).ConfigureAwait(false); + await response.OnBody(reader); + return response; + } + + static async ValueTask ParseResponseAsync(PipeReader reader) + where T : IHttpResponseHandler, new() + { + var handler = new T(); + while (true) + { + ReadResult result = await reader.ReadAsync(); + ReadOnlySequence buffer = result.Buffer; + // TODO (pri 2): this should not be static, or ParseHeaders should be static + if (HttpParser.ParseResponseLine(ref handler, ref buffer, out int rlConsumed)) + { + reader.AdvanceTo(buffer.GetPosition(rlConsumed)); + break; + } + reader.AdvanceTo(buffer.Start, buffer.End); + } + + while (true) + { + ReadResult result = await reader.ReadAsync(); + ReadOnlySequence buffer = result.Buffer; + if (s_headersParser.ParseHeaders(ref handler, buffer, out int hdConsumed)) + { + reader.AdvanceTo(buffer.GetPosition(hdConsumed)); + break; + } + reader.AdvanceTo(buffer.Start, buffer.End); + } + + await handler.OnBody(reader); + return handler; + } + + public bool IsConnected => _pipe != null; + } + + public interface IHttpResponseHandler : IHttpHeadersHandler, IHttpResponseLineHandler + { + ValueTask OnBody(PipeReader body); + } +} + + + diff --git a/samples/AzCopyCore/AzCopyCore/Program.cs b/samples/AzCopyCore/AzCopyCore/Program.cs index 42cdd23fe9f..44a127662fa 100644 --- a/samples/AzCopyCore/AzCopyCore/Program.cs +++ b/samples/AzCopyCore/AzCopyCore/Program.cs @@ -28,22 +28,19 @@ static void PrintUsage() static void Main(string[] args) { Log.Listeners.Add(new ConsoleTraceListener()); - Log.Switch.Level = SourceLevels.Error; - - long before = GC.GetAllocatedBytesForCurrentThread(); + Log.Switch.Level = SourceLevels.Information; var options = new CommandLine(args); ReadOnlySpan source = options.GetSpan("/Source:"); ReadOnlySpan destination = options.GetSpan("/Dest:"); + long before = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + // transfer from file system to storage if (destination.StartsWith("http://")) { - var sw = new Stopwatch(); - sw.Start(); TransferDirectoryToStorage(source, destination, options); - sw.Stop(); - Console.WriteLine("Elapsed time: " + sw.ElapsedMilliseconds + " ms"); } // transfer from storage to file system @@ -54,8 +51,14 @@ static void Main(string[] args) else { PrintUsage(); } + sw.Stop(); long after = GC.GetAllocatedBytesForCurrentThread(); - Console.WriteLine($"GC Allocations: {after - before} bytes"); + + if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information)) + { + Log.TraceInformation("Elapsed time: " + sw.ElapsedMilliseconds + " ms"); + Log.TraceInformation($"GC Allocations: {after - before} bytes"); + } if (Debugger.IsAttached) { @@ -76,7 +79,9 @@ static void TransferDirectoryToStorage(ReadOnlySpan localDirectory, ReadOn byte[] keyBytes; if (options.Contains("/DestKey:")) { - keyBytes = options["/DestKey:"].ComputeKeyBytes(); + ReadOnlySpan key = options["/DestKey:"]; + var str = key.ToString(); + keyBytes = key.ComputeKeyBytes(); } else if (options.Contains("/@:")) { @@ -96,9 +101,9 @@ static void TransferDirectoryToStorage(ReadOnlySpan localDirectory, ReadOn ReadOnlySpan host = storageFullPath.Slice(0, pathStart); ReadOnlySpan path = storageFullPath.Slice(pathStart + 1); - using (var client = new StorageClient(keyBytes, account, host)) + using (var client = new StorageClient(keyBytes, account, host, 80, Log)) { - client.Log = Log; + //var files = new Files(directoryPath); foreach (string filepath in Directory.EnumerateFiles(directoryPath)) { // TODO: Use Path.Join when it becomes available @@ -115,7 +120,7 @@ static void TransferDirectoryToStorage(ReadOnlySpan localDirectory, ReadOn } } - private static bool TryGetKey(string filename, out ReadOnlySpan line) + static bool TryGetKey(string filename, out ReadOnlySpan line) { if (!File.Exists(filename)) { @@ -129,7 +134,7 @@ private static bool TryGetKey(string filename, out ReadOnlySpan line) string firstLine; if ((firstLine = streamReader.ReadLine()) != null) { - line = firstLine.AsReadOnlySpan(); + line = firstLine.AsSpan(); } else { diff --git a/samples/AzCopyCore/AzCopyCore/SocketClient.cs b/samples/AzCopyCore/AzCopyCore/SocketPipe.cs similarity index 61% rename from samples/AzCopyCore/AzCopyCore/SocketClient.cs rename to samples/AzCopyCore/AzCopyCore/SocketPipe.cs index 5dfd7626cae..56e7e0a354b 100644 --- a/samples/AzCopyCore/AzCopyCore/SocketClient.cs +++ b/samples/AzCopyCore/AzCopyCore/SocketPipe.cs @@ -8,36 +8,12 @@ using System.IO.Pipelines; using System.Net.Security; using System.Net.Sockets; -using System.Text.Http.Parser; -using System.Text.Utf8; using System.Threading.Tasks; +using static System.Buffers.Text.Encodings; -// SocketClient is an experimental low-allocating/low-copy HTTP client API -// TODO (pri 2): need to support cancellations namespace System.Net.Experimental { - public interface IResponse : IHttpHeadersHandler, IHttpResponseLineHandler - { - void OnBody(PipeReader body); - } - - public abstract class RequestWriter where T : IPipeWritable - { - public abstract Text.Http.Parser.Http.Method Verb { get; } - - public async Task WriteAsync(PipeWriter writer, T request) - { - WriteRequestLineAndHeaders(writer, ref request); - await WriteBody(writer, request).ConfigureAwait(false); - await writer.FlushAsync(); - } - - // TODO (pri 2): writing the request line should not be abstract; writing headers should. - protected abstract void WriteRequestLineAndHeaders(PipeWriter writer, ref T request); - protected virtual Task WriteBody(PipeWriter writer, T request) { return Task.CompletedTask; } - } - - public struct SocketClient : IDisposable + public class SocketPipe : IDisposable, IDuplexPipe { readonly Pipe _requestPipe; readonly Pipe _responsePipe; @@ -49,7 +25,7 @@ public struct SocketClient : IDisposable Task _requestWriter; public TraceSource Log; - SocketClient(Socket socket, SslStream stream) + SocketPipe(Socket socket, SslStream stream) { _socket = socket; _stream = stream; @@ -60,7 +36,7 @@ public struct SocketClient : IDisposable Log = null; } - public static async Task ConnectAsync(string host, int port, bool tls = false) + public static async Task ConnectAsync(string host, int port, bool tls = false) { var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); @@ -78,54 +54,13 @@ public static async Task ConnectAsync(string host, int port, bool await socket.ConnectAsync(host, port).ConfigureAwait(false); } - var client = new SocketClient(socket, tlsStream); + var client = new SocketPipe(socket, tlsStream); client._responseReader = client.ReceiveAsync(); client._requestWriter = client.SendAsync(); return client; } - public async ValueTask SendRequest(TRequest request) - where TRequest : IPipeWritable - where TResponse : IResponse, new() - { - await request.WriteAsync(_requestPipe.Writer).ConfigureAwait(false); - - PipeReader reader = _responsePipe.Reader; - TResponse response = await ParseAsync(reader, Log).ConfigureAwait(false); - response.OnBody(reader); - return response; - } - - static HttpParser s_headersParser = new HttpParser(); - - // TODO (pri 3): Add to the platform, but this would require common logging API - public static async ValueTask ParseAsync(PipeReader reader, TraceSource log = null) - where T : IHttpResponseLineHandler, IHttpHeadersHandler, new() - { - ReadResult result = await reader.ReadAsync(); - ReadOnlySequence buffer = result.Buffer; - - if (log != null) log.TraceInformation("RESPONSE:\n{0}", new Utf8String(buffer.First.Span)); - - var handler = new T(); - // TODO (pri 2): this should not be static, or all should be static - if (!HttpParser.ParseResponseLine(ref handler, ref buffer, out int rlConsumed)) - { - throw new NotImplementedException("could not parse the response"); - } - - buffer = buffer.Slice(rlConsumed); - if (!s_headersParser.ParseHeaders(ref handler, buffer, out int hdConsumed)) - { - throw new NotImplementedException("could not parse the response"); - } - - reader.AdvanceTo(buffer.GetPosition(buffer.Start, hdConsumed)); - - return handler; - } - async Task SendAsync() { PipeReader reader = _requestPipe.Reader; @@ -142,6 +77,9 @@ async Task SendAsync() { for (SequencePosition position = buffer.Start; buffer.TryGet(ref position, out ReadOnlyMemory segment);) { + if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information)) + Log.TraceInformation(Utf8.ToString(segment.Span)); + await WriteToSocketAsync(segment).ConfigureAwait(false); } } @@ -182,7 +120,8 @@ async Task ReceiveAsync() int readBytes = await ReadFromSocketAsync(buffer).ConfigureAwait(false); if (readBytes == 0) break; - if (Log != null) Log.TraceInformation(new Utf8String(buffer.Span.Slice(0, readBytes)).ToString()); + if (Log != null && Log.Switch.ShouldTrace(TraceEventType.Information)) + Log.TraceInformation(Utf8.ToString(buffer.Span.Slice(0, readBytes))); writer.Advance(readBytes); await writer.FlushAsync(); @@ -236,6 +175,10 @@ public void Dispose() } public bool IsConnected => _socket != null; + + public PipeReader Input => _responsePipe.Reader; + + public PipeWriter Output => _requestPipe.Writer; } } diff --git a/samples/AzCopyCore/AzCopyCore/StorageClient.cs b/samples/AzCopyCore/AzCopyCore/StorageClient.cs index af3b5b2e1e1..4cc66cc42e4 100644 --- a/samples/AzCopyCore/AzCopyCore/StorageClient.cs +++ b/samples/AzCopyCore/AzCopyCore/StorageClient.cs @@ -18,36 +18,33 @@ namespace System.Azure.Storage { public class StorageClient : IDisposable { - SocketClient _socket; + PipeHttpClient _client; + SocketPipe _pipe; Sha256 _hash; string _host; int _port; string _accountName; - TraceSource _log; - public StorageClient(ReadOnlySpan masterKey, ReadOnlySpan accountName, ReadOnlySpan host, int port = 80) + public StorageClient(ReadOnlySpan masterKey, ReadOnlySpan accountName, ReadOnlySpan host, int port = 80, TraceSource log = null) { _host = new string(host); _accountName = new string(accountName); _port = port; byte[] keyBytes = Key.ComputeKeyBytes(masterKey); _hash = Sha256.Create(keyBytes); - + Log = log; } - public StorageClient(byte[] keyBytes, ReadOnlySpan accountName, ReadOnlySpan host, int port = 80) + public StorageClient(byte[] keyBytes, ReadOnlySpan accountName, ReadOnlySpan host, int port = 80, TraceSource log = null) { _host = new string(host); _accountName = new string(accountName); _port = port; _hash = Sha256.Create(keyBytes); + Log = log; } - public TraceSource Log - { - get { return _log; } - set { _log = value; _socket.Log = Log; } - } + public TraceSource Log { get; } public string Host => _host; @@ -58,14 +55,15 @@ public TraceSource Log public async ValueTask SendRequest(TRequest request) where TRequest : IStorageRequest { - if (!_socket.IsConnected) + if (!_client.IsConnected) { - _socket = await SocketClient.ConnectAsync(_host, _port).ConfigureAwait(false); - _socket.Log = Log; + _pipe = await SocketPipe.ConnectAsync(_host, _port).ConfigureAwait(false); + _pipe.Log = Log; + _client = new PipeHttpClient(_pipe); } request.Client = this; - StorageResponse response = await _socket.SendRequest(request).ConfigureAwait(false); + StorageResponse response = await _client.SendRequest(request).ConfigureAwait(false); if (request.ConsumeBody) await ConsumeResponseBody(response.Body); return response; } @@ -81,12 +79,12 @@ static async Task ConsumeResponseBody(PipeReader reader) public void Dispose() { - _socket.Dispose(); + _pipe.Dispose(); _hash.Dispose(); } } - public struct StorageResponse : IResponse + public struct StorageResponse : IHttpResponseHandler { static byte[] s_contentLength = Encoding.UTF8.GetBytes("Content-Length"); @@ -111,9 +109,10 @@ public void OnHeader(ReadOnlySpan name, ReadOnlySpan value) } } - public void OnBody(PipeReader body) + public ValueTask OnBody(PipeReader body) { Body = body; + return default; } } } diff --git a/samples/AzCopyCore/AzCopyCore/StorageRequests.cs b/samples/AzCopyCore/AzCopyCore/StorageRequests.cs index fb30b88277c..12f0011aea1 100644 --- a/samples/AzCopyCore/AzCopyCore/StorageRequests.cs +++ b/samples/AzCopyCore/AzCopyCore/StorageRequests.cs @@ -3,11 +3,9 @@ // See the LICENSE file in the project root for more information. using System.Azure.Authentication; -using System.Buffers.Text; -using System.Buffers.Transformations; +using System.Buffers.Writer; using System.IO; using System.IO.Pipelines; -using System.Net.Experimental; using System.Text; using System.Text.Http.Formatter; using System.Text.Http.Parser; @@ -25,6 +23,22 @@ public interface IStorageRequest : IPipeWritable } + public abstract class RequestWriter where T : IPipeWritable + { + public abstract Text.Http.Parser.Http.Method Verb { get; } + + public async Task WriteAsync(PipeWriter writer, T request) + { + WriteRequestLineAndHeaders(writer, ref request); + await WriteBody(writer, request).ConfigureAwait(false); + await writer.FlushAsync(); + } + + // TODO (pri 2): writing the request line should not be abstract; writing headers should. + protected abstract void WriteRequestLineAndHeaders(PipeWriter writer, ref T request); + protected virtual Task WriteBody(PipeWriter writer, T request) { return Task.CompletedTask; } + } + // This is a helper class for impementing writers for various Storage requests. // Subclasses of this type (see below) know how to write specific headers for each Storage Rest API. abstract class StorageRequestWriter : RequestWriter where T : IStorageRequest @@ -56,7 +70,7 @@ protected override void WriteRequestLineAndHeaders(PipeWriter writer, ref T argu AccountName = arguments.Client.AccountName, CanonicalizedResource = arguments.CanonicalizedResource, // TODO (pri 1): this allocation should be eliminated - CanonicalizedHeaders = new WritableBytes(headersBuffer.ToArray()), + CanonicalizedHeaders = headersBuffer.ToArray(), ContentLength = arguments.ContentLength }; // TODO (pri 3): the default should be defaulted