Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Added AsStream to PipeReader and PipeWriter #35399

Merged
merged 5 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract partial class PipeReader
protected PipeReader() { }
public abstract void AdvanceTo(System.SequencePosition consumed);
public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingRead();
public abstract void Complete(System.Exception exception = null);
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand All @@ -62,6 +63,7 @@ public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
{
protected PipeWriter() { }
public abstract void Advance(int bytes);
public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingFlush();
public abstract void Complete(System.Exception exception = null);
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
65 changes: 37 additions & 28 deletions src/System.IO.Pipelines/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
<?xml version="1.0" encoding="utf-8"?>
<root>
<!--
Microsoft ResX Schema

<!--
Microsoft ResX Schema
Version 2.0

The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
associated with the data types.

Example:

... ado.net/XML headers & schema ...
<resheader name="resmimetype">text/microsoft-resx</resheader>
<resheader name="version">2.0</resheader>
Expand All @@ -26,36 +26,36 @@
<value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
<comment>This is a comment</comment>
</data>

There are any number of "resheader" rows that contain simple
There are any number of "resheader" rows that contain simple
name/value pairs.

Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
mimetype set.

The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
extensible. For a given mimetype the value must be set accordingly:

Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
read any of the formats listed below.

mimetype: application/x-microsoft.net.object.binary.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
: and then encoded with base64 encoding.

mimetype: application/x-microsoft.net.object.soap.base64
value : The object must be serialized with
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Soap.SoapFormatter
: and then encoded with base64 encoding.

mimetype: application/x-microsoft.net.object.bytearray.base64
value : The object must be serialized into a byte array
value : The object must be serialized into a byte array
: using a System.ComponentModel.TypeConverter
: and then encoded with base64 encoding.
-->
Expand Down Expand Up @@ -135,15 +135,24 @@
<data name="ConcurrentOperationsNotSupported" xml:space="preserve">
<value>Concurrent reads or writes are not supported.</value>
</data>
<data name="FlushCanceledOnPipeWriter" xml:space="preserve">
<value>Flush was canceled on underlying PipeWriter.</value>
</data>
<data name="GetResultBeforeCompleted" xml:space="preserve">
<value>Can't GetResult unless awaiter is completed.</value>
</data>
<data name="InvalidZeroByteRead" xml:space="preserve">
<value>The PipeReader returned 0 bytes when the ReadResult was not completed or cancelled.</value>
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
</data>
<data name="NoReadingOperationToComplete" xml:space="preserve">
<value>No reading operation to complete.</value>
</data>
<data name="NoWritingOperation" xml:space="preserve">
<value>No writing operation. Make sure GetMemory() was called.</value>
</data>
<data name="ReadCanceledOnPipeReader" xml:space="preserve">
<value>Read was canceled on underlying PipeReader.</value>
</data>
<data name="ReaderAndWriterHasToBeCompleted" xml:space="preserve">
<value>Both reader and writer has to be completed to be able to reset the pipe.</value>
</data>
Expand All @@ -156,4 +165,4 @@
<data name="WritingAfterCompleted" xml:space="preserve">
<value>Writing is not allowed after writer was completed.</value>
</data>
</root>
</root>
2 changes: 2 additions & 0 deletions src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
<Compile Include="System\IO\Pipelines\PipeOptions.cs" />
<Compile Include="System\IO\Pipelines\PipeReader.cs" />
<Compile Include="System\IO\Pipelines\PipeOperationState.cs" />
<Compile Include="System\IO\Pipelines\PipeReaderStream.cs" />
<Compile Include="System\IO\Pipelines\PipeScheduler.cs" />
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
<Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
Expand Down
13 changes: 12 additions & 1 deletion src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace System.IO.Pipelines
/// </summary>
public abstract partial class PipeReader
{
private PipeReaderStream _stream;

/// <summary>
/// Attempt to synchronously read data the <see cref="PipeReader"/>.
/// </summary>
Expand Down Expand Up @@ -48,6 +50,15 @@ public abstract partial class PipeReader
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

/// <summary>
///
/// </summary>
/// <returns></returns>
public virtual Stream AsStream()
{
return _stream ?? (_stream = new PipeReaderStream(this));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return _stream ?? (_stream = new PipeReaderStream(this));
return _stream ??= new PipeReaderStream(this));

It is C#8 already 😎

}

/// <summary>
/// Cancel to currently pending or if none is pending next call to <see cref="ReadAsync"/>, without completing the <see cref="PipeReader"/>.
/// </summary>
Expand Down Expand Up @@ -99,7 +110,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell

if (result.IsCanceled)
{
throw new OperationCanceledException();
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}

while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
Expand Down
103 changes: 103 additions & 0 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
using System.Buffers;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines
{
internal class PipeReaderStream : Stream
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly PipeReader _pipeReader;

public PipeReaderStream(PipeReader pipeReader)
{
Debug.Assert(pipeReader != null);
_pipeReader = pipeReader;
}

public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => false;

public override long Length => throw new NotSupportedException();

public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

public override void Flush() => throw new NotSupportedException();
davidfowl marked this conversation as resolved.
Show resolved Hide resolved

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

public override void SetLength(long value) => throw new NotSupportedException();

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is relying on Memory's argument validation. That will end up allowing a null buffer if offset and count are 0. Might want to add a null / throw check here, but up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine relying on the memory ctor.

return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

#if !netstandard
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(buffer, cancellationToken);
}
#endif

private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);

if (result.IsCanceled)
{
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}

ReadOnlySequence<byte> sequence = result.Buffer;
long bufferLength = sequence.Length;
SequencePosition consumed = sequence.End;

try
{
if (bufferLength != 0)
{
int actual = (int)Math.Min(bufferLength, buffer.Length);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cast to int? Just keep it as a long as you compare to buffer length afterwards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value is an int so it has to be an int eventually.


ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);

return actual;
}

if (result.IsCompleted)
{
return 0;
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand the purpose of the try/finally. I'm not sure what here could cause an exception, but if something did and we haven't yet gotten to consumed = slice.End, we'll end up advancing past everything that was read, but if such an exception happened after consumed = slice.End, then we'll end up advancing only past the length up to actual, and in any exceptional case, the consumer didn't actually read anything so it seems we shouldn't advance at all?

I guess I'm wondering why this try/finally is needed at all, and why instead there isn't just a _pipeReader.AdvanceTo(slice.End) call just before return actual;? Does it have to do with the PipeReader contract around pairings of ReadAsync and AdvanceTo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right it’s about not leaving the reader in that “I’m still reading” state. Some of that has since been relaxed where AdvanceTo can be skipped if the next call is Complete.

But it doesn’t look like anything should really throw here. Even if it did, it’s possible you don’t want to make it consume everything unless it returned from the branch where data is returned.

Tl;DR this code is old and was tweaked a bunch of times and maybe there’s some paranoia in here that may no longer be strictly required

}

// This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
// isn't completed or cancelled
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
return 0;
}

/// <inheritdoc />
davidfowl marked this conversation as resolved.
Show resolved Hide resolved
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Delegate to CopyToAsync on the PipeReader
return _pipeReader.CopyToAsync(destination, cancellationToken);
}
}
}
13 changes: 12 additions & 1 deletion src/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace System.IO.Pipelines
/// </summary>
public abstract partial class PipeWriter : IBufferWriter<byte>
{
private PipeWriterStream _stream;

/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
/// </summary>
Expand Down Expand Up @@ -43,6 +45,15 @@ public abstract partial class PipeWriter : IBufferWriter<byte>
/// <inheritdoc />
public abstract Span<byte> GetSpan(int sizeHint = 0);

/// <summary>
///
/// </summary>
/// <returns></returns>
public virtual Stream AsStream()
{
return _stream ?? (_stream = new PipeWriterStream(this));
}

/// <summary>
/// Writes <paramref name="source"/> to the pipe and makes data accessible to <see cref="PipeReader"/>
/// </summary>
Expand Down Expand Up @@ -76,7 +87,7 @@ protected internal virtual async Task CopyFromAsync(Stream source, CancellationT

if (result.IsCanceled)
{
throw new OperationCanceledException();
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
}

if (result.IsCompleted)
Expand Down
Loading