-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Added AsStream to PipeReader and PipeWriter #35399
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the MIT license. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System; | ||
using System.Buffers; | ||
using System.Diagnostics; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace System.IO.Pipelines | ||
{ | ||
internal sealed class PipeReaderStream : Stream | ||
{ | ||
private readonly PipeReader _pipeReader; | ||
|
||
public PipeReaderStream(PipeReader pipeReader) | ||
{ | ||
Debug.Assert(pipeReader != null); | ||
_pipeReader = pipeReader; | ||
} | ||
|
||
public override bool CanRead => true; | ||
|
||
public override bool CanSeek => false; | ||
|
||
public override bool CanWrite => false; | ||
|
||
public override long Length => throw new NotSupportedException(); | ||
|
||
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } | ||
|
||
public override void Flush() | ||
{ | ||
} | ||
|
||
public override int Read(byte[] buffer, int offset, int count) | ||
{ | ||
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); | ||
} | ||
|
||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); | ||
|
||
public override void SetLength(long value) => throw new NotSupportedException(); | ||
|
||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); | ||
|
||
public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) => | ||
TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state); | ||
|
||
public sealed override int EndRead(IAsyncResult asyncResult) => | ||
TaskToApm.End<int>(asyncResult); | ||
|
||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is relying on Memory's argument validation. That will end up allowing a null buffer if offset and count are 0. Might want to add a null / throw check here, but up to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine relying on the memory ctor. |
||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask(); | ||
} | ||
|
||
#if !netstandard | ||
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) | ||
{ | ||
return ReadAsyncInternal(buffer, cancellationToken); | ||
} | ||
#endif | ||
|
||
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken) | ||
{ | ||
ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); | ||
|
||
if (result.IsCanceled) | ||
{ | ||
ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); | ||
} | ||
|
||
ReadOnlySequence<byte> sequence = result.Buffer; | ||
long bufferLength = sequence.Length; | ||
SequencePosition consumed = sequence.Start; | ||
|
||
try | ||
{ | ||
if (bufferLength != 0) | ||
{ | ||
int actual = (int)Math.Min(bufferLength, buffer.Length); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why cast to int? Just keep it as a long as you compare to buffer length afterwards. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The return value is an int so it has to be an int eventually. |
||
|
||
ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual); | ||
consumed = slice.End; | ||
slice.CopyTo(buffer.Span); | ||
|
||
return actual; | ||
} | ||
|
||
if (result.IsCompleted) | ||
{ | ||
return 0; | ||
} | ||
} | ||
finally | ||
{ | ||
_pipeReader.AdvanceTo(consumed); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm trying to understand the purpose of the try/finally. I'm not sure what here could cause an exception, but if something did and we haven't yet gotten to I guess I'm wondering why this try/finally is needed at all, and why instead there isn't just a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right it’s about not leaving the reader in that “I’m still reading” state. Some of that has since been relaxed where AdvanceTo can be skipped if the next call is Complete. But it doesn’t look like anything should really throw here. Even if it did, it’s possible you don’t want to make it consume everything unless it returned from the branch where data is returned. Tl;DR this code is old and was tweaked a bunch of times and maybe there’s some paranoia in here that may no longer be strictly required |
||
} | ||
|
||
// This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader | ||
// isn't completed or canceled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we fix this in 3.0 then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's nothing to fix. The method is virtual on an abstract class, if the implementation is buggy, this will throw. Our implementations AFAIK aren't buggy. |
||
ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead(); | ||
return 0; | ||
} | ||
|
||
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) | ||
{ | ||
// Delegate to CopyToAsync on the PipeReader | ||
return _pipeReader.CopyToAsync(destination, cancellationToken); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is C#8 already 😎