Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Cleanup pipelines code and tests (#2103)
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym authored Feb 7, 2018
1 parent 33936e5 commit 1783bd6
Show file tree
Hide file tree
Showing 92 changed files with 2,791 additions and 3,283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ protected static void WriteCommonHeaders<TFormatter>(
Http.Version version,
int statuCode,
string reasonCode)
where TFormatter : ITextOutput
where TFormatter : ITextBufferWriter
{
var currentTime = DateTime.UtcNow;
formatter.AppendHttpStatusLine(version, statuCode, new Utf8Span(reasonCode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Microsoft.Net
{
public class TcpConnectionFormatter : ITextOutput, IDisposable
public class TcpConnectionFormatter : ITextBufferWriter, IDisposable
{
static byte[] s_terminator = new Utf8Span("0\r\n\r\n").Bytes.ToArray();
const int ChunkPrefixSize = 10;
Expand Down
43 changes: 20 additions & 23 deletions samples/System.IO.Pipelines.Samples/CompressionSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,35 @@ public class CompressionSample : ISample
{
public Task Run()
{
using (var bufferPool = new MemoryPool())
{
var filePath = Path.GetFullPath("Program.cs");
var filePath = Path.GetFullPath("Program.cs");

// This is what Stream looks like
//var fs = File.OpenRead(filePath);
//var compressed = new MemoryStream();
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress);
//fs.CopyTo(compressStream);
//compressStream.Flush();
//compressed.Seek(0, SeekOrigin.Begin);
// This is what Stream looks like
//var fs = File.OpenRead(filePath);
//var compressed = new MemoryStream();
//var compressStream = new DeflateStream(compressed, CompressionMode.Compress);
//fs.CopyTo(compressStream);
//compressStream.Flush();
//compressed.Seek(0, SeekOrigin.Begin);

var options = new PipeOptions(bufferPool);
var options = new PipeOptions();

var input = ReadableFilePipelineFactoryExtensions.ReadFile(options, filePath)
.DeflateCompress(options, CompressionLevel.Optimal)
.DeflateDecompress(options);
var input = ReadableFilePipelineFactoryExtensions.ReadFile(options, filePath)
.DeflateCompress(options, CompressionLevel.Optimal)
.DeflateDecompress(options);

// Wrap the console in a pipeline writer
// Wrap the console in a pipeline writer

var outputPipe = new Pipe(options);
outputPipe.Reader.CopyToEndAsync(Console.OpenStandardOutput());
var outputPipe = new Pipe(options);
outputPipe.Reader.CopyToEndAsync(Console.OpenStandardOutput());

// Copy from the file reader to the console writer
input.CopyToAsync(outputPipe.Writer).GetAwaiter().GetResult();
// Copy from the file reader to the console writer
input.CopyToAsync(outputPipe.Writer).GetAwaiter().GetResult();

input.Complete();
input.Complete();

outputPipe.Writer.Complete();
outputPipe.Writer.Complete();

return Task.CompletedTask;
}
return Task.CompletedTask;
}
}
}
8 changes: 4 additions & 4 deletions samples/System.IO.Pipelines.Samples/Framing/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,18 @@ public class Line

public class LineHandler : IFrameHandler<Line>
{
private PipelineTextOutput _textOutput;
private PipelineTextBufferWriter _textBufferWriter;

public void Initialize(IDuplexPipe connection)
{
_textOutput = new PipelineTextOutput(connection.Output, SymbolTable.InvariantUtf8);
_textBufferWriter = new PipelineTextBufferWriter(connection.Output, SymbolTable.InvariantUtf8);
}

public Task HandleAsync(Line message)
{
// Echo back to the caller
_textOutput.Append(message.Data);
return _textOutput.FlushAsync();
_textBufferWriter.Append(message.Data);
return _textBufferWriter.FlushAsync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
}

public void CopyTo(bool chunk, IOutput buffer)
public void CopyTo(bool chunk, IBufferWriter buffer)
{
foreach (var header in _headers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected async Task ProcessConnection(IDuplexPipe connection)

// Writing directly to pooled buffers
var output = connection.Output;
var formatter = new OutputFormatter<PipeWriter>(output, SymbolTable.InvariantUtf8);
var formatter = new BufferWriterFormatter<PipeWriter>(output, SymbolTable.InvariantUtf8);
formatter.Append("HTTP/1.1 200 OK");
formatter.Append("\r\nContent-Length: 13");
formatter.Append("\r\nContent-Type: text/plain");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class RawSocketHttpClientSample : RawHttpClientSampleBase

public RawSocketHttpClientSample()
{
pool = new MemoryPool();
pool = MemoryPool<byte>.Shared;
}

protected override Task<IDuplexPipe> GetConnection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.IO.Pipelines.Samples
{
public class SocketHttpClientHandler : PipelineHttpClientHandler
{
MemoryPool<byte> bufferPool = new MemoryPool();
MemoryPool<byte> bufferPool = MemoryPool<byte>.Shared;

protected override void Dispose(bool disposing)
{
Expand Down
30 changes: 13 additions & 17 deletions samples/System.IO.Pipelines.Samples/Socket/SocketHttpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,21 @@ protected override async void StartAccepting<TContext>(IHttpApplication<TContext
_listenSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
_listenSocket.Bind(new IPEndPoint(ip, port));
_listenSocket.Listen(10);

using (var memoryPool = new MemoryPool())
while (true)
{
while (true)
try
{
var clientSocket = await _listenSocket.AcceptAsync();
clientSocket.NoDelay = true;
var task = ProcessConnection(application, MemoryPool<byte>.Shared, clientSocket);
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception)
{
try
{
var clientSocket = await _listenSocket.AcceptAsync();
clientSocket.NoDelay = true;
var task = ProcessConnection(application, memoryPool, clientSocket);
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception)
{
/* Ignored */
}
/* Ignored */
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static class BufferExtensions
{
const int stackLength = 32;

public static void Pipe(this IBufferOperation transformation, ReadOnlyBuffer<byte> source, IOutput destination)
public static void Pipe(this IBufferOperation transformation, ReadOnlyBuffer<byte> source, IBufferWriter destination)
{
int afterMergeSlice = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public ref partial struct BufferWriter
public static BufferWriter Create(Span<byte> buffer) => new BufferWriter(buffer);

public static BufferWriter<TOutput> Create<TOutput>(TOutput output)
where TOutput : IOutput
where TOutput : IBufferWriter
=> new BufferWriter<TOutput>(output);

private BufferWriter(Span<byte> buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace System.Buffers.Text
{
public ref struct BufferWriter<TOutput> where TOutput : IOutput
public ref struct BufferWriter<TOutput> where TOutput : IBufferWriter
{
TOutput _output;
Span<byte> _buffer;
Expand Down
28 changes: 28 additions & 0 deletions src/System.Buffers.Primitives/System/Buffers/IBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace System.Buffers
{
/// <summary>
/// Represents a <see cref="Byte"/> sink
/// </summary>
public interface IBufferWriter
{
/// <summary>
/// Notifies <see cref="IBufferWriter"/> that <paramref cref="bytes"/> amount of data was written to <see cref="Span{Byte}"/>/<see cref="Memory{Byte}"/>
/// </summary>
void Advance(int bytes);

/// <summary>
/// Requests the <see cref="Memory{Byte}"/> of at least <paramref cref="minimumLength"/> in size.
/// If <see cref="minimumLength"/> is equal to <code>0</code>, currently available memory would get returned.
/// </summary>
Memory<byte> GetMemory(int minimumLength = 0);

/// <summary>
/// Requests the <see cref="Span{Byte}"/> of at least <paramref cref="minimumLength"/> in size.
/// If <see cref="minimumLength"/> is equal to <code>0</code>, currently available memory would get returned.
/// </summary>
Span<byte> GetSpan(int minimumLength = 0);
}
}
14 changes: 0 additions & 14 deletions src/System.Buffers.Primitives/System/Buffers/IOutput.cs

This file was deleted.

10 changes: 5 additions & 5 deletions src/System.Buffers.Primitives/System/Buffers/OutputExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ namespace System.Buffers
{
public static class OutputExtensions
{
public static void Write(this IOutput output, ReadOnlySpan<byte> source)
public static void Write(this IBufferWriter bufferWriter, ReadOnlySpan<byte> source)
{
var buffer = output.GetMemory();
var buffer = bufferWriter.GetMemory();

// Fast path, try copying to the available memory directly
if (source.Length <= buffer.Length)
{
source.CopyTo(buffer.Span);
output.Advance(source.Length);
bufferWriter.Advance(source.Length);
return;
}

Expand All @@ -24,7 +24,7 @@ public static void Write(this IOutput output, ReadOnlySpan<byte> source)
{
var writable = Math.Min(remaining, buffer.Length);

buffer = output.GetMemory(writable);
buffer = bufferWriter.GetMemory(writable);

if (writable == 0)
{
Expand All @@ -36,7 +36,7 @@ public static void Write(this IOutput output, ReadOnlySpan<byte> source)
remaining -= writable;
offset += writable;

output.Advance(writable);
bufferWriter.Advance(writable);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/System.Buffers.Primitives/System/Buffers/OutputWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ namespace System.Buffers
{
public static class OutputWriter
{
public static OutputWriter<T> Create<T>(T output) where T : IOutput
public static OutputWriter<T> Create<T>(T output) where T : IBufferWriter
{
return new OutputWriter<T>(output);
}
}

public ref struct OutputWriter<T> where T : IOutput
public ref struct OutputWriter<T> where T : IBufferWriter
{
private T _output;
private Span<byte> _span;
Expand Down
2 changes: 1 addition & 1 deletion src/System.IO.Pipelines.File/FileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public unsafe static void IOCallback(uint errorCode, uint numBytes, NativeOverla
}
}

private static async Task Continue(ValueAwaiter<FlushResult> awaitable, ReadOperation operation)
private static async Task Continue(PipeAwaiter<FlushResult> awaitable, ReadOperation operation)
{
// Keep reading once we get the completion
var flushResult = await awaitable;
Expand Down
4 changes: 2 additions & 2 deletions src/System.IO.Pipelines.Networking.Libuv/UvThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public UvThread()

public UvLoopHandle Loop { get; private set; }

public MemoryPool<byte> Pool { get; } = new MemoryPool();
public MemoryPool<byte> Pool { get; } = MemoryPool<byte>.Shared;

public WriteReqPool WriteReqPool { get; }

Expand Down Expand Up @@ -138,7 +138,7 @@ public void Dispose()

Pool.Dispose();
}

public override void Schedule(Action<object> action, object state)
{
// REVIEW: Should we inline actions if we're already on the libuv thread?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines.Networking.Sockets
Expand Down Expand Up @@ -79,7 +78,7 @@ internal SocketConnection(Socket socket, MemoryPool<byte> pool)
if (pool == null)
{
_ownsPool = true;
pool = new MemoryPool();
pool = MemoryPool<byte>.Shared;
}
_pool = pool;

Expand Down Expand Up @@ -351,7 +350,7 @@ private async Task ReceiveFromSocketAndPushToWriterAsync()
// before flushing (entirely arbitrarily) - might want to make this configurable later

haveWriteBuffer = true;
int FlushInputEveryBytes = 4 * Pool.MaxBufferSize;
int FlushInputEveryBytes = 24;

if (initialSegment.Array != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SocketListener : IDisposable
public SocketListener(MemoryPool<byte> pool = null)
{
_ownsPool = pool == null;
_pool = pool ?? new MemoryPool();
_pool = pool ?? MemoryPool<byte>.Shared;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

namespace System.IO.Pipelines.Text.Primitives
{
public class PipelineTextOutput : ITextOutput
public class PipelineTextBufferWriter : ITextBufferWriter
{
private readonly PipeWriter _writer;
private bool _needAlloc = true;

public PipelineTextOutput(PipeWriter writer, SymbolTable symbolTable)
public PipelineTextBufferWriter(PipeWriter writer, SymbolTable symbolTable)
{
_writer = writer;
SymbolTable = symbolTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ namespace System.IO.Pipelines.Text.Primitives
{
public static class PipelineWriterExtensions
{
public static PipelineTextOutput AsTextOutput(this PipeWriter writer, SymbolTable symbolTable)
public static PipelineTextBufferWriter AsTextOutput(this PipeWriter writer, SymbolTable symbolTable)
{
return new PipelineTextOutput(writer, symbolTable);
return new PipelineTextBufferWriter(writer, symbolTable);
}
}
}
Loading

0 comments on commit 1783bd6

Please sign in to comment.