diff --git a/samples/AzCopyCore/AzCopyCore.sln b/samples/AzCopyCore/AzCopyCore.sln new file mode 100644 index 00000000000..5f45f5a23de --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore.sln @@ -0,0 +1,28 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.27323.2 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzCopyCore", "AzCopyCore\AzCopyCore.csproj", "{0D5ECA4A-2037-46FD-A684-372C7DDCB577}" +EndProject +Global + GlobalSection(Performance) = preSolution + HasPerformanceSessions = true + EndGlobalSection + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {0D5ECA4A-2037-46FD-A684-372C7DDCB577}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0D5ECA4A-2037-46FD-A684-372C7DDCB577}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0D5ECA4A-2037-46FD-A684-372C7DDCB577}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0D5ECA4A-2037-46FD-A684-372C7DDCB577}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {1C22ED6F-9702-40F9-8BFC-CB5192009D9E} + EndGlobalSection +EndGlobal diff --git a/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj b/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj new file mode 100644 index 00000000000..141dd99c4c2 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/AzCopyCore.csproj @@ -0,0 +1,29 @@ + + + + Exe + netcoreapp2.1 + 2.1.0-preview2-26209-04 + + + + 7.2 + + + + + + + + + + + + + + + + + + + diff --git a/samples/AzCopyCore/AzCopyCore/Program.cs b/samples/AzCopyCore/AzCopyCore/Program.cs new file mode 100644 index 00000000000..87d95aa8ffd --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/Program.cs @@ -0,0 +1,168 @@ +using System; +using System.Azure.Authentication; +using System.Azure.Storage; +using System.Azure.Storage.Requests; +using System.Buffers; +using System.Diagnostics; +using System.IO; +using System.Threading.Tasks; + +static class Program +{ + internal static TraceSource Log = new TraceSource("AzCopyCore"); + + static void PrintUsage() + { + Console.WriteLine("dotnet AzCopyCore.dll /Source: /Dest: [Options]"); + Console.WriteLine("\tOptions:"); + Console.WriteLine("\t - /DestKey:"); + Console.WriteLine("\t - /SourceKey:"); + } + + static void Main(string[] args) + { + Log.Listeners.Add(new TextWriterTraceListener(Console.Out)); + Log.Switch.Level = SourceLevels.Error; + + var options = new CommandOptions(args); + ReadOnlyMemory source = options.Get("/Source:"); + ReadOnlyMemory destination = options.Get("/Dest:"); + + // transfer from file system to storage + if (destination.Span.StartsWith("http://")) { + TransferDirectoryToStorage(source, destination, options); + } + + // transfer from storage to file system + else if (source.Span.StartsWith("http://")) { + TransferStorageFileToDirectory(source, destination, options); + } + + else { PrintUsage(); } + + if (Debugger.IsAttached) + { + Console.WriteLine("Press ENTER to exit ..."); + Console.ReadLine(); + } + } + + static void TransferDirectoryToStorage(ReadOnlyMemory localDirectory, ReadOnlyMemory storageDirectory, CommandOptions options) + { + var directoryPath = new string(localDirectory.Span); + if (!Directory.Exists(directoryPath)) + { + Console.WriteLine($"Source directory not found."); + return; + } + if (!options.Contains("/DestKey:")) + { + Console.WriteLine("/DestKey option not found."); + return; + } + + ReadOnlyMemory key = options.Get("/DestKey:"); + byte[] keyBytes = key.Span.ComputeKeyBytes(); + + ReadOnlyMemory storageFullPath = storageDirectory.Slice(7); + int pathStart = storageFullPath.Span.IndexOf('/'); + ReadOnlyMemory host = storageFullPath.Slice(0, pathStart); + ReadOnlyMemory path = storageFullPath.Slice(pathStart + 1); + ReadOnlyMemory account = storageFullPath.Slice(0, storageFullPath.Span.IndexOf('.')); + + using (var client = new StorageClient(keyBytes, account, host)) + { + client.Log = Log; + foreach (var filepath in Directory.EnumerateFiles(directoryPath)) + { + var filename = Path.GetFileName(filepath); + var storagePath = new string(path.Span) + "/" + filename; + + // TODO (pri 3): this loop keeps going through all files, even if the key is wrong + if (CopyLocalFileToStorageFile(client, filepath, storagePath).Result) + { + Console.WriteLine($"Uploaded {filepath} to {storagePath}"); + } + } + } + } + + static void TransferStorageFileToDirectory(ReadOnlyMemory storageFile, ReadOnlyMemory localDirectory, CommandOptions options) + { + var directory = new string(localDirectory.Span); + if (!options.Contains("/SourceKey:")) + { + Console.WriteLine("/SourceKey option not found."); + return; + } + + ReadOnlyMemory key = options.Get("/SourceKey:"); + byte[] keyBytes = key.Span.ComputeKeyBytes(); + + ReadOnlyMemory storageFullPath = storageFile.Slice(7); + int pathStart = storageFullPath.Span.IndexOf('/'); + ReadOnlyMemory host = storageFullPath.Slice(0, pathStart); + ReadOnlyMemory storagePath = storageFullPath.Slice(pathStart + 1); + ReadOnlyMemory account = storageFullPath.Slice(0, storageFullPath.Span.IndexOf('.')); + ReadOnlyMemory file = storagePath.Slice(storagePath.Span.LastIndexOf('/') + 1); + + // TODO (pri 3): use the new directory APIs once they become avaliable + if (!Directory.Exists(directory)) + { + Directory.CreateDirectory(directory); + } + + string destinationPath = directory + "\\" + new string(file.Span); + using (var client = new StorageClient(keyBytes, account, host)) + { + if(CopyStorageFileToLocalFile(client, new string(storagePath.Span), destinationPath).Result) + { + Console.WriteLine($"Downloaded {storagePath} to {destinationPath}"); + } + } + } + + static async ValueTask CopyLocalFileToStorageFile(StorageClient client, string localFilePath, string storagePath) + { + // TODO (pri 3): make file i/o more efficient + FileInfo fileInfo = new FileInfo(localFilePath); + + var createRequest = new CreateFileRequest(storagePath, fileInfo.Length); + var response = await client.SendRequest(createRequest).ConfigureAwait(false); + + if (response.StatusCode == 201) + { + using (var bytes = new FileStream(localFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)) + { + var putRequest = new PutRangeRequest(storagePath, bytes); + response = await client.SendRequest(putRequest).ConfigureAwait(false); + if (response.StatusCode == 201) return true; + } + } + + Log.WriteError($"Response Error {response.StatusCode}"); + return false; + } + + static async ValueTask CopyStorageFileToLocalFile(StorageClient client, string storagePath, string localFilePath) + { + var request = new GetFileRequest(storagePath); + var response = await client.SendRequest(request).ConfigureAwait(false); + + if (response.StatusCode != 200) + { + Log.WriteError($"Response Error {response.StatusCode}"); + return false; + } + + ulong bytesToRead = response.ContentLength; + using (var file = new FileStream(localFilePath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, FileOptions.Asynchronous)) + { + await file.WriteAsync(response.Body, bytesToRead); + } + + return true; + } +} + + diff --git a/samples/AzCopyCore/AzCopyCore/SocketClient.cs b/samples/AzCopyCore/AzCopyCore/SocketClient.cs new file mode 100644 index 00000000000..0883035a290 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/SocketClient.cs @@ -0,0 +1,207 @@ +using System.Buffers; +using System.Diagnostics; +using System.IO; +using System.IO.Pipelines; +using System.Net.Security; +using System.Net.Sockets; +using System.Text.Http.Parser; +using System.Threading.Tasks; + +// SocketClient is an experimental low-allocating/low-copy HTTP client API +// TODO (pri 2): need to support cancellations +namespace System.Net.Experimental +{ + public interface IResponse : IHttpHeadersHandler, IHttpResponseLineHandler { + void OnBody(PipeReader body); + } + + public abstract class RequestWriter where T : IPipeWritable + { + public abstract Text.Http.Parser.Http.Method Verb { get; } + + public async Task WriteAsync(PipeWriter writer, T request) + { + WriteRequestLineAndHeaders(writer, ref request); + await WriteBody(writer, request).ConfigureAwait(false); + await writer.FlushAsync(); + } + + // TODO (pri 2): writing the request line should not be abstract; writing headers should. + protected abstract void WriteRequestLineAndHeaders(PipeWriter writer, ref T request); + protected virtual Task WriteBody(PipeWriter writer, T request) { return Task.CompletedTask; } + } + + public struct SocketClient : IDisposable + { + readonly Pipe _requestPipe; + readonly Pipe _responsePipe; + readonly Socket _socket; + readonly Stream _stream; + + // TODO (pri 3): would be nice to make this whole struct read-only + Task _responseReader; + Task _requestWriter; + public TraceSource Log; + + SocketClient(Socket socket, SslStream stream) + { + _socket = socket; + _stream = stream; + _requestPipe = new Pipe(); + _responsePipe = new Pipe(); + _responseReader = null; + _requestWriter = null; + Log = null; + } + + public static async Task ConnectAsync(string host, int port, bool tls = false) + { + var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + + // TODO (pri 3): all this TLS code is not tested + // TODO (pri 3): would be great to get flat APIs for TLS + SslStream tlsStream = null; + if (tls) + { + var networkStream = new NetworkStream(socket); + tlsStream = new SslStream(networkStream); + await tlsStream.AuthenticateAsClientAsync(host).ConfigureAwait(false); + } + else + { + await socket.ConnectAsync(host, port).ConfigureAwait(false); + } + + var client = new SocketClient(socket, tlsStream); + client._responseReader = client.ReceiveAsync(); + client._requestWriter = client.SendAsync(); + + return client; + } + + public async ValueTask SendRequest(TRequest request) + where TRequest : IPipeWritable + where TResponse : IResponse, new() + { + await request.WriteAsync(_requestPipe.Writer).ConfigureAwait(false); + + var reader = _responsePipe.Reader; + var response = await HttpExtensions.ParseAsync(reader, Log).ConfigureAwait(false); + response.OnBody(reader); + return response; + } + + async Task SendAsync() + { + var reader = _requestPipe.Reader; + try + { + while (true) + { + var result = await reader.ReadAsync(); + var buffer = result.Buffer; + + try + { + if (!buffer.IsEmpty) + { + for (var position = buffer.Start; buffer.TryGet(ref position, out ReadOnlyMemory segment);) + { + await WriteToSocketAsync(segment).ConfigureAwait(false); + } + } + else if (result.IsCompleted) + { + break; + } + } + finally + { + reader.AdvanceTo(buffer.End); + } + } + } + catch(Exception e) + { + Log.WriteError(e.ToString()); + } + finally + { + reader.Complete(); + } + } + + async Task ReceiveAsync() + { + var writer = _responsePipe.Writer; + try + { + while (true) + { + // just wait for data in the socket + await ReadFromSocketAsync(Memory.Empty); + + while (HasData) + { + var buffer = writer.GetMemory(); + var readBytes = await ReadFromSocketAsync(buffer).ConfigureAwait(false); + if (readBytes == 0) break; + + if (Log != null) Log.WriteInformation($"RESPONSE {readBytes}", buffer.Slice(0, readBytes)); + + writer.Advance(readBytes); + await writer.FlushAsync(); + } + } + } + finally + { + writer.Complete(); + } + } + + async Task WriteToSocketAsync(ReadOnlyMemory buffer) + { + if (_stream != null) + { + await _stream.WriteAsync(buffer).ConfigureAwait(false); + await _stream.FlushAsync().ConfigureAwait(false); + } + else + { + await _socket.SendAsync(buffer, SocketFlags.None).ConfigureAwait(false); + } + } + + async ValueTask ReadFromSocketAsync(Memory buffer) + { + if (_stream != null) + { + return await _stream.ReadAsync(buffer).ConfigureAwait(false); + } + else + { + return await _socket.ReceiveAsync(buffer, SocketFlags.None).ConfigureAwait(false); + } + } + + bool HasData + { + get { + if (_stream != null) return _stream.Length != 0; + return _socket.Available != 0; + } + } + + public void Dispose() + { + _stream?.Dispose(); + _socket.Dispose(); + } + + public bool IsConnected => _socket != null; + } +} + + + diff --git a/samples/AzCopyCore/AzCopyCore/StorageClient.cs b/samples/AzCopyCore/AzCopyCore/StorageClient.cs new file mode 100644 index 00000000000..a92b7ce4b48 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/StorageClient.cs @@ -0,0 +1,113 @@ +using System.Azure.Authentication; +using System.Azure.Storage.Requests; +using System.Buffers.Cryptography; +using System.Buffers.Text; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Experimental; +using System.Text; +using System.Text.Http.Parser; +using System.Threading.Tasks; + +namespace System.Azure.Storage +{ + public class StorageClient : IDisposable + { + SocketClient _socket; + Sha256 _hash; + string _host; + int _port; + string _accountName; + TraceSource _log; + + public StorageClient(ReadOnlyMemory masterKey, ReadOnlyMemory accountName, ReadOnlyMemory host, int port = 80) + { + _host = new string(host.Span); + _accountName = new string(accountName.Span); + _port = port; + var keyBytes = Key.ComputeKeyBytes(masterKey.Span); + _hash = Sha256.Create(keyBytes); + + } + + public StorageClient(byte[] keyBytes, ReadOnlyMemory accountName, ReadOnlyMemory host, int port = 80) + { + _host = new string(host.Span); + _accountName = new string(accountName.Span); + _port = port; + _hash = Sha256.Create(keyBytes); + } + + public TraceSource Log { + get { return _log; } + set { _log = value; _socket.Log = Log; } + } + + public string Host => _host; + + public string AccountName => _accountName; + + internal Sha256 Hash => _hash; + + public async ValueTask SendRequest(TRequest request) + where TRequest : IStorageRequest + { + if (!_socket.IsConnected) + { + _socket = await SocketClient.ConnectAsync(_host, _port).ConfigureAwait(false); + _socket.Log = Log; + } + request.Client = this; + + var response = await _socket.SendRequest(request).ConfigureAwait(false); + if (request.ConsumeBody) await ConsumeResponseBody(response.Body); + return response; + } + + // for some reason some responses contain a body, despite the fact that the MSDN docs say there is no body, so + // I need to skip the body without understanding what it is (it's "0\n\r\n\r", BTW) + static async Task ConsumeResponseBody(PipeReader reader) + { + var body = await reader.ReadAsync(); + var bodyBuffer = body.Buffer; + reader.AdvanceTo(bodyBuffer.End); + } + + public void Dispose() + { + _socket.Dispose(); + _hash.Dispose(); + } + } + + public struct StorageResponse : IResponse + { + static byte[] s_contentLength = Encoding.UTF8.GetBytes("Content-Length"); + + ulong _contentLength; + public ulong ContentLength => _contentLength; + public ushort StatusCode { get; private set; } + public PipeReader Body { get; private set; } + + public void OnStatusLine(Http.Version version, ushort statusCode, ReadOnlySpan status) + { + StatusCode = statusCode; + } + + public void OnHeader(ReadOnlySpan name, ReadOnlySpan value) + { + if (name.SequenceEqual(s_contentLength)) + { + if (!Utf8Parser.TryParse(value, out _contentLength, out _)) + { + throw new Exception("invalid header"); + } + } + } + + public void OnBody(PipeReader body) { + Body = body; + } + } +} + diff --git a/samples/AzCopyCore/AzCopyCore/StorageRequests.cs b/samples/AzCopyCore/AzCopyCore/StorageRequests.cs new file mode 100644 index 00000000000..eb7ca47bfc4 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/StorageRequests.cs @@ -0,0 +1,196 @@ +using System.Azure.Authentication; +using System.Buffers; +using System.Buffers.Text; +using System.Buffers.Transformations; +using System.IO; +using System.IO.Pipelines; +using System.Net.Experimental; +using System.Text.Http.Formatter; +using System.Text.Http.Parser; +using System.Threading.Tasks; + +namespace System.Azure.Storage.Requests +{ + public interface IStorageRequest : IPipeWritable + { + StorageClient Client { get; set; } + string RequestPath { get; } + string CanonicalizedResource { get; } + long ContentLength { get; } + bool ConsumeBody { get; } + + } + + // This is a helper class for impementing writers for various Storage requests. + // Subclasses of this type (see below) know how to write specific headers for each Storage Rest API. + abstract class StorageRequestWriter : RequestWriter where T : IStorageRequest + { + // TODO (pri 3): this should be cached with some expiration policy + protected DateTime Time => DateTime.UtcNow; + + // TODO (pri 2): it would be good if this could advance and flush instead demanding larger and larger buffers. + protected override void WriteRequestLineAndHeaders(PipeWriter writer, ref T arguments) + { + var memory = writer.GetSpan(); + BufferWriter bufferWriter = memory.AsHttpWriter(); + bufferWriter.Enlarge = (int desiredSize) => + { + return writer.GetMemory(desiredSize); + }; + + bufferWriter.WriteRequestLine(Verb, Http.Version.Http11, arguments.RequestPath); + + int headersStart = bufferWriter.WrittenCount; + WriteXmsHeaders(ref bufferWriter, ref arguments); + Span headersBuffer = bufferWriter.Written.Slice(headersStart); + + var authenticationHeader = new StorageAuthorizationHeader() + { + // TODO (pri 1): the hash is not thread safe. is that OK? + Hash = arguments.Client.Hash, + HttpVerb = AsString(Verb), + AccountName = arguments.Client.AccountName, + CanonicalizedResource = arguments.CanonicalizedResource, + // TODO (pri 1): this allocation should be eliminated + CanonicalizedHeaders = new WritableBytes(headersBuffer.ToArray()), + ContentLength = arguments.ContentLength + }; + // TODO (pri 3): the default should be defaulted + bufferWriter.WriteHeader("Authorization", authenticationHeader, default); + + WriteOtherHeaders(ref bufferWriter, ref arguments); + bufferWriter.WriteEoh(); + + writer.Advance(bufferWriter.WrittenCount); + } + + protected abstract void WriteXmsHeaders(ref BufferWriter writer, ref T arguments); + + protected virtual void WriteOtherHeaders(ref BufferWriter writer, ref T arguments) { + writer.WriteHeader("Content-Length", arguments.ContentLength); + writer.WriteHeader("Host", arguments.Client.Host); + } + + // TODO (pri 2): this should be UTF8 + public static string AsString(Http.Method verb) + { + if (verb == Http.Method.Get) return "GET"; + if (verb == Http.Method.Put) return "PUT"; + throw new NotImplementedException(); + } + } + + public struct PutRangeRequest : IStorageRequest + { + public Stream FileContent { get; set; } // TODO (pri 3): should there be a way to write from file handle or PipeReader? + public string FilePath { get; set; } + + // TODO (pri 3): I dont like how the client property is a public API + public StorageClient Client { get; set; } + + public PutRangeRequest(string filePath, Stream fileContent) + { + FilePath = filePath; + FileContent = fileContent; + Client = null; + } + + public long ContentLength => FileContent.Length; + // TODO (pri 2): would be nice to elimnate these allocations + public string RequestPath => FilePath + "?comp=range"; + public string CanonicalizedResource => FilePath + "\ncomp:range"; + public bool ConsumeBody => true; + + // TODO (pri 3): can this be an extension method? All implementations are the same. + public async Task WriteAsync(PipeWriter writer) + => await requestWriter.WriteAsync(writer, this).ConfigureAwait(false); + + static readonly Writer requestWriter = new Writer(); + class Writer : StorageRequestWriter + { + public override Http.Method Verb => Http.Method.Put; + + protected override async Task WriteBody(PipeWriter writer, PutRangeRequest arguments) + => await writer.WriteAsync(arguments.FileContent); + + protected override void WriteXmsHeaders(ref BufferWriter writer, ref PutRangeRequest arguments) + { + long size = arguments.FileContent.Length; + writer.WriteHeader("x-ms-date", Time, 'R'); + // TODO (pri 3): this allocation should be eliminated + writer.WriteHeader("x-ms-range", $"bytes=0-{size-1}"); + writer.WriteHeader("x-ms-version", "2017-04-17"); + writer.WriteHeader("x-ms-write", "update"); + } + } + } + + public struct CreateFileRequest : IStorageRequest + { + public long FileSize { get; set; } + public string RequestPath { get; set; } + public StorageClient Client { get; set; } + + public CreateFileRequest(string requestPath, long fileSize) + { + RequestPath = requestPath; + FileSize = fileSize; + Client = null; + } + + public bool ConsumeBody => true; + public long ContentLength => 0; + public string CanonicalizedResource => RequestPath; + + public async Task WriteAsync(PipeWriter writer) + => await requestWriter.WriteAsync(writer, this).ConfigureAwait(false); + + static readonly Writer requestWriter = new Writer(); + class Writer : StorageRequestWriter + { + public override Http.Method Verb => Http.Method.Put; + + protected override void WriteXmsHeaders(ref BufferWriter writer, ref CreateFileRequest arguments) + { + writer.WriteHeader("x-ms-content-length", arguments.FileSize); + writer.WriteHeader("x-ms-date", Time, 'R'); + writer.WriteHeader("x-ms-type", "file"); + writer.WriteHeader("x-ms-version", "2017-04-17"); + } + } + } + + public struct GetFileRequest : IStorageRequest + { + public StorageClient Client { get; set; } + public string FilePath { get; set; } + + public GetFileRequest(string filePath) + { + FilePath = filePath; + Client = null; + } + + public long ContentLength => 0; + public string RequestPath => FilePath; + public string CanonicalizedResource => FilePath; + public bool ConsumeBody => false; + + public async Task WriteAsync(PipeWriter writer) + => await requestWriter.WriteAsync(writer, this).ConfigureAwait(false); + + static readonly Writer requestWriter = new Writer(); + + class Writer : StorageRequestWriter + { + public override Http.Method Verb => Http.Method.Get; + + protected override void WriteXmsHeaders(ref BufferWriter writer, ref GetFileRequest arguments) + { + writer.WriteHeader("x-ms-date", Time, 'R'); + writer.WriteHeader("x-ms-version", "2017-04-17"); + } + } + } +} + diff --git a/samples/AzCopyCore/AzCopyCore/temp/Extensions.cs b/samples/AzCopyCore/AzCopyCore/temp/Extensions.cs new file mode 100644 index 00000000000..3965d3f6200 --- /dev/null +++ b/samples/AzCopyCore/AzCopyCore/temp/Extensions.cs @@ -0,0 +1,221 @@ +using System.Buffers.Text; +using System.Diagnostics; +using System.IO; +using System.IO.Pipelines; +using System.Text; +using System.Text.Http.Parser; +using System.Threading.Tasks; + +namespace System.Buffers +{ + // TODO (pri 3): Add to the platform (but NetStandard does not support the stream APIs) + static class GeneralExtensions + { + /// + /// Copies bytes from ReadOnlyBuffer to a Stream + /// + public static async Task WriteAsync(this Stream stream, ReadOnlyBuffer buffer) + { + for (var position = buffer.Start; buffer.TryGet(ref position, out var memory);) + { + await stream.WriteAsync(memory).ConfigureAwait(false); + } + } + + /// + /// Copies bytes from PipeReader to a Stream + /// + public static async Task WriteAsync(this Stream stream, PipeReader reader, ulong bytes) + { + while (bytes > 0) + { + var result = await reader.ReadAsync(); + ReadOnlyBuffer bodyBuffer = result.Buffer; + if (bytes < (ulong)bodyBuffer.Length) + { + throw new NotImplementedException(); + } + bytes -= (ulong)bodyBuffer.Length; + await stream.WriteAsync(bodyBuffer).ConfigureAwait(false); + await stream.FlushAsync().ConfigureAwait(false); + reader.AdvanceTo(bodyBuffer.End); + } + } + + /// + /// Copies bytes from Stream to PipeWriter + /// + public static async Task WriteAsync(this PipeWriter writer, Stream stream) + { + if (!stream.CanRead) throw new ArgumentException("Stream.CanRead returned false", nameof(stream)); + while (true) + { + var buffer = writer.GetMemory(); + if (buffer.Length == 0) throw new NotSupportedException("PipeWriter.GetMemory returned an empty buffer."); + int read = await stream.ReadAsync(buffer).ConfigureAwait(false); + if (read == 0) return; + writer.Advance(read); + await writer.FlushAsync(); + } + } + } + + // TODO (pri 3): Is TraceSource the right logger here? + public static class TraceListenerExtensions + { + public static void WriteInformation(this TraceSource source, string tag, ReadOnlyMemory utf8Text) + { + if (source.Switch.ShouldTrace(TraceEventType.Information)) + { + var message = Encodings.Utf8.ToString(utf8Text.Span); + source.TraceInformation($"{tag}:\n{message}\n"); + } + } + + public static void WriteError(this TraceSource source, string message) + { + if (source.Switch.ShouldTrace(TraceEventType.Error)) + { + var color = Console.ForegroundColor; + Console.ForegroundColor = ConsoleColor.Red; + source.TraceEvent(TraceEventType.Error, 0, message); + Console.ForegroundColor = color; + } + } + } + + public static class HttpExtensions + { + static HttpParser s_headersParser = new HttpParser(); + private const byte ByteLF = (byte)'\n'; + private const byte ByteCR = (byte)'\r'; + private const long maxRequestLineLength = 1024; + static readonly byte[] s_Eol = Encoding.ASCII.GetBytes("\r\n"); + static readonly byte[] s_http11 = Encoding.ASCII.GetBytes("HTTP/1.1"); + static readonly byte[] s_http10 = Encoding.ASCII.GetBytes("HTTP/1.0"); + static readonly byte[] s_http20 = Encoding.ASCII.GetBytes("HTTP/2.0"); + + // TODO (pri 2): move to corfxlab + public static bool ParseResponseLine(ref T handler, ref ReadOnlyBuffer buffer, out int consumedBytes) where T : IHttpResponseLineHandler + { + var line = buffer.First.Span; + var lf = line.IndexOf(ByteLF); + if (lf >= 0) + { + line = line.Slice(0, lf + 1); + } + else if (buffer.IsSingleSegment) + { + consumedBytes = 0; + return false; + } + else + { + long index = Sequence.IndexOf(buffer, ByteLF); + if(index < 0) + { + consumedBytes = 0; + return false; + } + if(index > maxRequestLineLength) + { + throw new Exception("invalid response (LF too far)"); + } + line = line.Slice(0, lf + 1); + } + + if(line[lf - 1] != ByteCR) + { + throw new Exception("invalid response (no CR)"); + } + + Http.Version version; + if (line.StartsWith(s_http11)) { version = Http.Version.Http11; } + // TODO (pri 2): add HTTP2 to HTTP.Version + else if (line.StartsWith(s_http20)) { version = Http.Version.Unknown; } + else if (line.StartsWith(s_http10)) { version = Http.Version.Http10; } + else + { + throw new Exception("invalid response (version)"); + } + + int codeStart = line.IndexOf((byte)' ') + 1; + var codeSlice = line.Slice(codeStart); + if (!Utf8Parser.TryParse(codeSlice, out ushort code, out consumedBytes)) + { + throw new Exception("invalid response (status code)"); + } + + var reasonStart = consumedBytes + 1; + var reason = codeSlice.Slice(reasonStart, codeSlice.Length - reasonStart - 2); + consumedBytes = lf + s_Eol.Length; + + handler.OnStatusLine(version, code, reason); + + return true; + } + + // TODO (pri 3): Add to the platform, but this would require common logging API + public static async ValueTask ParseAsync(PipeReader reader, TraceSource log = null) + where T : IHttpResponseLineHandler, IHttpHeadersHandler, new() + { + var result = await reader.ReadAsync(); + ReadOnlyBuffer buffer = result.Buffer; + + if (log != null) log.WriteInformation("RESPONSE: ", buffer.First); + + var handler = new T(); + if (!ParseResponseLine(ref handler, ref buffer, out int rlConsumed)) + { + throw new NotImplementedException("could not parse the response"); + } + + buffer = buffer.Slice(rlConsumed); + if (!s_headersParser.ParseHeaders(ref handler, buffer, out int hdConsumed)) + { + throw new NotImplementedException("could not parse the response"); + } + + reader.AdvanceTo(buffer.GetPosition(buffer.Start, hdConsumed)); + + return handler; + } + } + + // TODO (pri 3): Should I use the command line library? + class CommandOptions + { + readonly string[] _options; + + public CommandOptions(string[] options) + { + _options = options; + } + + public bool Contains(string optionName) + { + for (int i = 0; i < _options.Length; i++) + { + var candidate = _options[i]; + if (candidate.StartsWith(optionName)) return true; + } + return false; + } + + public ReadOnlyMemory Get(string optionName) + { + if (optionName.Length < 1) throw new ArgumentOutOfRangeException(nameof(optionName)); + + for (int i = 0; i < _options.Length; i++) + { + var candidate = _options[i]; + if (candidate.StartsWith(optionName)) + { + var option = candidate.AsReadOnlyMemory(); + return option.Slice(optionName.Length); + } + } + return ReadOnlyMemory.Empty; + } + } +}