Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Added AsStream to PipeReader and PipeWriter (#35399)
Browse files Browse the repository at this point in the history
- This adds a new virtual member to PipeReader and PipeWriter to get a read only or write only stream from the PipeReader and PipeWriter
- This introduces a new field on the base types
- Added tests
  • Loading branch information
davidfowl authored Feb 19, 2019
1 parent 383e8d6 commit 968a6a4
Show file tree
Hide file tree
Showing 11 changed files with 846 additions and 30 deletions.
2 changes: 2 additions & 0 deletions src/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract partial class PipeReader
protected PipeReader() { }
public abstract void AdvanceTo(System.SequencePosition consumed);
public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingRead();
public abstract void Complete(System.Exception exception = null);
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand All @@ -62,6 +63,7 @@ public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
{
protected PipeWriter() { }
public abstract void Advance(int bytes);
public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingFlush();
public abstract void Complete(System.Exception exception = null);
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
65 changes: 37 additions & 28 deletions src/System.IO.Pipelines/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
<?xml version="1.0" encoding="utf-8"?>
<root>
<!--
Microsoft ResX Schema
<!--
Microsoft ResX Schema
Version 2.0
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
associated with the data types.
Example:
... ado.net/XML headers & schema ...
<resheader name="resmimetype">text/microsoft-resx</resheader>
<resheader name="version">2.0</resheader>
Expand All @@ -26,36 +26,36 @@
<value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
<comment>This is a comment</comment>
</data>
There are any number of "resheader" rows that contain simple
There are any number of "resheader" rows that contain simple
name/value pairs.
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
mimetype set.
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
extensible. For a given mimetype the value must be set accordingly:
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
read any of the formats listed below.
mimetype: application/x-microsoft.net.object.binary.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.soap.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Soap.SoapFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.bytearray.base64
value : The object must be serialized into a byte array
value : The object must be serialized into a byte array
: using a System.ComponentModel.TypeConverter
: and then encoded with base64 encoding.
-->
Expand Down Expand Up @@ -135,15 +135,24 @@
<data name="ConcurrentOperationsNotSupported" xml:space="preserve">
<value>Concurrent reads or writes are not supported.</value>
</data>
<data name="FlushCanceledOnPipeWriter" xml:space="preserve">
<value>Flush was canceled on underlying PipeWriter.</value>
</data>
<data name="GetResultBeforeCompleted" xml:space="preserve">
<value>Can't GetResult unless awaiter is completed.</value>
</data>
<data name="InvalidZeroByteRead" xml:space="preserve">
<value>The PipeReader returned 0 bytes when the ReadResult was not completed or canceled.</value>
</data>
<data name="NoReadingOperationToComplete" xml:space="preserve">
<value>No reading operation to complete.</value>
</data>
<data name="NoWritingOperation" xml:space="preserve">
<value>No writing operation. Make sure GetMemory() was called.</value>
</data>
<data name="ReadCanceledOnPipeReader" xml:space="preserve">
<value>Read was canceled on underlying PipeReader.</value>
</data>
<data name="ReaderAndWriterHasToBeCompleted" xml:space="preserve">
<value>Both reader and writer has to be completed to be able to reset the pipe.</value>
</data>
Expand All @@ -156,4 +165,4 @@
<data name="WritingAfterCompleted" xml:space="preserve">
<value>Writing is not allowed after writer was completed.</value>
</data>
</root>
</root>
5 changes: 5 additions & 0 deletions src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
<Link>Common\CoreLib\System\Threading\Tasks\TaskToApm.cs</Link>
</Compile>
<Compile Include="Properties\InternalsVisibleTo.cs" />
<Compile Include="System\IO\Pipelines\BufferSegment.cs" />
<Compile Include="System\IO\Pipelines\CompletionData.cs" />
Expand All @@ -20,8 +23,10 @@
<Compile Include="System\IO\Pipelines\PipeOptions.cs" />
<Compile Include="System\IO\Pipelines\PipeReader.cs" />
<Compile Include="System\IO\Pipelines\PipeOperationState.cs" />
<Compile Include="System\IO\Pipelines\PipeReaderStream.cs" />
<Compile Include="System\IO\Pipelines\PipeScheduler.cs" />
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
<Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
Expand Down
13 changes: 12 additions & 1 deletion src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace System.IO.Pipelines
/// </summary>
public abstract partial class PipeReader
{
private PipeReaderStream _stream;

/// <summary>
/// Attempt to synchronously read data the <see cref="PipeReader"/>.
/// </summary>
Expand Down Expand Up @@ -48,6 +50,15 @@ public abstract partial class PipeReader
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

/// <summary>
///
/// </summary>
/// <returns></returns>
public virtual Stream AsStream()
{
return _stream ?? (_stream = new PipeReaderStream(this));
}

/// <summary>
/// Cancel to currently pending or if none is pending next call to <see cref="ReadAsync"/>, without completing the <see cref="PipeReader"/>.
/// </summary>
Expand Down Expand Up @@ -99,7 +110,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell

if (result.IsCanceled)
{
throw new OperationCanceledException();
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}

while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
Expand Down
114 changes: 114 additions & 0 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines
{
internal sealed class PipeReaderStream : Stream
{
private readonly PipeReader _pipeReader;

public PipeReaderStream(PipeReader pipeReader)
{
Debug.Assert(pipeReader != null);
_pipeReader = pipeReader;
}

public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => false;

public override long Length => throw new NotSupportedException();

public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

public override void Flush()
{
}

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

public override void SetLength(long value) => throw new NotSupportedException();

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state);

public sealed override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

#if !netstandard
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(buffer, cancellationToken);
}
#endif

private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);

if (result.IsCanceled)
{
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}

ReadOnlySequence<byte> sequence = result.Buffer;
long bufferLength = sequence.Length;
SequencePosition consumed = sequence.Start;

try
{
if (bufferLength != 0)
{
int actual = (int)Math.Min(bufferLength, buffer.Length);

ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);

return actual;
}

if (result.IsCompleted)
{
return 0;
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
}

// This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
// isn't completed or canceled
ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
return 0;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Delegate to CopyToAsync on the PipeReader
return _pipeReader.CopyToAsync(destination, cancellationToken);
}
}
}
13 changes: 12 additions & 1 deletion src/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace System.IO.Pipelines
/// </summary>
public abstract partial class PipeWriter : IBufferWriter<byte>
{
private PipeWriterStream _stream;

/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
/// </summary>
Expand Down Expand Up @@ -43,6 +45,15 @@ public abstract partial class PipeWriter : IBufferWriter<byte>
/// <inheritdoc />
public abstract Span<byte> GetSpan(int sizeHint = 0);

/// <summary>
///
/// </summary>
/// <returns></returns>
public virtual Stream AsStream()
{
return _stream ?? (_stream = new PipeWriterStream(this));
}

/// <summary>
/// Writes <paramref name="source"/> to the pipe and makes data accessible to <see cref="PipeReader"/>
/// </summary>
Expand Down Expand Up @@ -76,7 +87,7 @@ protected internal virtual async Task CopyFromAsync(Stream source, CancellationT

if (result.IsCanceled)
{
throw new OperationCanceledException();
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
}

if (result.IsCompleted)
Expand Down
Loading

0 comments on commit 968a6a4

Please sign in to comment.