Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Introducing System.IO.Pipelines #980

Merged
merged 3 commits into from
Nov 15, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 129 additions & 69 deletions corefxlab.sln

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions samples/System.IO.Pipelines.Samples/AspNetHttpServerSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Text;
using System.IO.Pipelines.Samples.Http;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;

namespace System.IO.Pipelines.Samples
{
public class AspNetHttpServerSample
{
private static readonly UTF8Encoding _utf8Encoding = new UTF8Encoding(false);
private static readonly byte[] _helloWorldPayload = Encoding.UTF8.GetBytes("Hello, World!");

public static void Run()
{
using (var httpServer = new HttpServer())
{
var host = new WebHostBuilder()
.UseUrls("http://*:5000")
.UseServer(httpServer)
// .UseKestrel()
.Configure(app =>
{
app.Run(context =>
{
context.Response.StatusCode = 200;
context.Response.ContentType = "text/plain";
// HACK: Setting the Content-Length header manually avoids the cost of serializing the int to a string.
// This is instead of: httpContext.Response.ContentLength = _helloWorldPayload.Length;
context.Response.Headers["Content-Length"] = "13";
return context.Response.Body.WriteAsync(_helloWorldPayload, 0, _helloWorldPayload.Length);
});
})
.Build();
host.Run();
}
}
}
}
42 changes: 42 additions & 0 deletions samples/System.IO.Pipelines.Samples/CompressionSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines.Compression;
using System.IO.Pipelines.File;

namespace System.IO.Pipelines.Samples
{
public class CompressionSample
{
public static void Run()
{
using (var cf = new PipelineFactory())
{
var filePath = Path.GetFullPath("Program.cs");

// This is what Stream looks like
//var fs = File.OpenRead(filePath);
//var compressed = new MemoryStream();
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress);
//fs.CopyTo(compressStream);
//compressStream.Flush();
//compressed.Seek(0, SeekOrigin.Begin);
// var input = channelFactory.MakeReadableChannel(compressed);

var input = cf.ReadFile(filePath)
.DeflateCompress(cf, CompressionLevel.Optimal)
.DeflateDecompress(cf);

// Wrap the console in a writable channel
var output = cf.CreateWriter(Console.OpenStandardOutput());

// Copy from the file channel to the console channel
input.CopyToAsync(output).GetAwaiter().GetResult();

input.Complete();
Copy link

@dazinator dazinator Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you forget to call Complete() ? Do you think there would be any benefit if IPipelineReader / 'IPipelineReader' implemented IDisposable, and called Complete() in it's dispose?


output.Complete();
}
}
}
}
142 changes: 142 additions & 0 deletions samples/System.IO.Pipelines.Samples/Framing/Codec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Text.Formatting;
using System.Threading.Tasks;
using System.IO.Pipelines.Networking.Libuv;
using System.IO.Pipelines.Text.Primitives;

namespace System.IO.Pipelines.Samples.Framing
{
public static class ProtocolHandling
{
public static void Run()
{
var ip = IPAddress.Any;
int port = 5000;
var thread = new UvThread();
var listener = new UvTcpListener(thread, new IPEndPoint(ip, port));
listener.OnConnection(async connection =>
{
var channel = MakePipeline(connection);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't you rename "channel" as well?


var decoder = new LineDecoder();
var handler = new LineHandler();

// Initialize the handler with the channel
handler.Initialize(channel);

try
{
while (true)
{
// Wait for data
var result = await channel.Input.ReadAsync();
var input = result.Buffer;

try
{
if (input.IsEmpty && result.IsCompleted)
{
// No more data
break;
}

Line line;
while (decoder.TryDecode(ref input, out line))
{
await handler.HandleAsync(line);
}

if (!input.IsEmpty && result.IsCompleted)
{
// Didn't get the whole frame and the connection ended
throw new EndOfStreamException();
}
}
finally
{
// Consume the input
channel.Input.Advance(input.Start, input.End);
}
}
}
finally
{
// Close the input channel, which will tell the producer to stop producing
channel.Input.Complete();

// Close the output channel, which will close the connection
channel.Output.Complete();
}
});

listener.StartAsync().GetAwaiter().GetResult();

Console.WriteLine($"Listening on {ip} on port {port}");
Console.ReadKey();

listener.Dispose();
thread.Dispose();
}

public static IPipelineConnection MakePipeline(IPipelineConnection channel)
{
// Do something fancy here to wrap the channel, SSL etc
return channel;
}
}

public class Line
{
public string Data { get; set; }
}

public class LineHandler : IFrameHandler<Line>
{
private WritableChannelFormatter _formatter;

public void Initialize(IPipelineConnection channel)
{
_formatter = new WritableChannelFormatter(channel.Output, EncodingData.InvariantUtf8);
}

public Task HandleAsync(Line message)
{
// Echo back to the caller
_formatter.Append(message.Data);
return _formatter.FlushAsync();
}
}

public class LineDecoder : IFrameDecoder<Line>
{
public bool TryDecode(ref ReadableBuffer input, out Line frame)
{
ReadableBuffer slice;
ReadCursor cursor;
if (input.TrySliceTo((byte)'\r', (byte)'\n', out slice, out cursor))
{
frame = new Line { Data = slice.GetUtf8String() };
input = input.Slice(cursor).Slice(1);
return true;
}

frame = null;
return false;
}
}

public interface IFrameDecoder<TInput>
{
bool TryDecode(ref ReadableBuffer input, out TInput frame);
}

public interface IFrameHandler<TInput>
{
void Initialize(IPipelineConnection channel);

Task HandleAsync(TInput message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

namespace System.IO.Pipelines.Samples
{
public class ChannelHttpContent : HttpContent
{
private readonly IPipelineReader _output;

public ChannelHttpContent(IPipelineReader output)
{
_output = output;
}

public int ContentLength { get; set; }

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
int remaining = ContentLength;

while (remaining > 0)
{
var result = await _output.ReadAsync();
var inputBuffer = result.Buffer;

var fin = result.IsCompleted;

var consumed = inputBuffer.Start;

try
{
if (inputBuffer.IsEmpty && fin)
{
return;
}

var data = inputBuffer.Slice(0, remaining);

foreach (var memory in data)
{
ArraySegment<byte> buffer;

unsafe
{
if (!memory.TryGetArray(out buffer))
{
// Fall back to copies if this was native memory and we were unable to get
// something we could write
buffer = new ArraySegment<byte>(memory.Span.ToArray());
}
}

await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count);
}

consumed = data.End;
remaining -= data.Length;
}
finally
{
_output.Advance(consumed);
}
}
}

protected override bool TryComputeLength(out long length)
{
length = ContentLength;
return true;
}
}
}
Loading