Skip to content

Commit

Permalink
Allow un-examining in PipeReader.AdvanceTo(...) (#107360)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Oct 6, 2024
1 parent 112ef3d commit 438cf85
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,6 @@ internal void AdvanceReader(in SequencePosition consumed, in SequencePosition ex
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

// TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
// directly casting only works because the type value in ReadOnlySequenceSegment is 0
AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
}

Expand All @@ -486,14 +484,11 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu

if (examinedSegment != null && _lastExaminedIndex >= 0)
{
// This can be negative resulting in _unconsumedBytes increasing, this should be safe because we've already checked that
// examined >= consumed above, so we can't get into a state where we un-examine too much
long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
long oldLength = _unconsumedBytes;

if (examinedBytes < 0)
{
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
}

_unconsumedBytes -= examinedBytes;

// Store the absolute position
Expand All @@ -505,6 +500,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
if (oldLength >= ResumeWriterThreshold &&
_unconsumedBytes < ResumeWriterThreshold)
{
// Should only release backpressure if we made forward progress
Debug.Assert(examinedBytes > 0);
_writerAwaitable.Complete(out completionData);
}
}
Expand Down Expand Up @@ -570,7 +567,7 @@ void MoveReturnEndToNextBlock()
// but only if writer is not completed yet
if (examinedEverything && !_writerCompletion.IsCompleted)
{
Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync is isn't completed and will deadlock");
Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync isn't completed and will deadlock");

_readerAwaitable.SetUncompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSi
/// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
/// This is equivalent to calling <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> with identical examined and consumed positions.
/// The examined data communicates to the pipeline when it should signal more data is available.
/// Because the consumed parameter doubles as the examined parameter, the consumed parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an <see cref="System.InvalidOperationException" /> is thrown.</remarks>
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed);

/// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed, read and examined.</summary>
/// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
/// <param name="examined">Marks the extent of the data that has been read and examined.</param>
/// <remarks>The memory for the consumed data will be released and no longer available.
/// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
/// The examined data communicates to the pipeline when it should signal more data is available.
/// The examined parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an <see cref="System.InvalidOperationException" /> is thrown.</remarks>
/// The examined data communicates to the pipeline when it should signal more data is available.</remarks>
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

/// <summary>Returns a <see cref="System.IO.Stream" /> representation of the <see cref="System.IO.Pipelines.PipeReader" />.</summary>
Expand Down
28 changes: 28 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,5 +241,33 @@ public async Task FlushAsyncThrowsIfReaderCompletedWithException()
invalidOperationException = await Assert.ThrowsAsync<InvalidOperationException>(async () => await writableBuffer.FlushAsync());
Assert.Equal("Reader failed", invalidOperationException.Message);
}

[Fact]
public void FlushAsyncAwaitableDoesNotCompleteWhenReaderUnexamines()
{
PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();

ReadResult result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
SequencePosition examined = result.Buffer.GetPosition(2);
// Examine 2, don't advance consumed
_pipe.Reader.AdvanceTo(result.Buffer.Start, examined);

Assert.False(flushAsync.IsCompleted);

result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
// Examine 1 which is less than the previous examined index of 2
examined = result.Buffer.GetPosition(1);
_pipe.Reader.AdvanceTo(result.Buffer.Start, examined);

Assert.False(flushAsync.IsCompleted);

// Just make sure we can still release backpressure
result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
examined = result.Buffer.GetPosition(ResumeWriterThreshold + 1);
_pipe.Reader.AdvanceTo(examined, examined);

Assert.True(flushAsync.IsCompleted);
}
}
}
24 changes: 3 additions & 21 deletions src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,24 +220,6 @@ public async Task ExaminedAtSecondLastBlockWorks()
Assert.Equal(0, _pipe.Length);
}

[Fact]
public async Task ExaminedLessThanBeforeThrows()
{
_pipe.Writer.WriteEmpty(10);
await _pipe.Writer.FlushAsync();

ReadResult result = await _pipe.Reader.ReadAsync();
_pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);

Assert.Equal(0, _pipe.Length);

_pipe.Writer.WriteEmpty(10);
await _pipe.Writer.FlushAsync();

result = await _pipe.Reader.ReadAsync();
Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start));
}

[Fact]
public async Task ConsumedGreaterThanExaminedThrows()
{
Expand Down Expand Up @@ -284,10 +266,10 @@ public async Task AdvanceFollowedByWriteAsyncTest()
Memory<byte> buffer = new byte[26];
Pipe pipe = new(new PipeOptions(minimumSegmentSize: 1));

var mem = pipe.Writer.GetMemory(14)[..14];
buffer[..14].CopyTo(mem);
var mem = pipe.Writer.GetMemory(14).Slice(0, 14);
buffer.Slice(0, 14).CopyTo(mem);
pipe.Writer.Advance(14);
await pipe.Writer.WriteAsync(buffer[14..]);
await pipe.Writer.WriteAsync(buffer.Slice(14));
ReadResult res = await pipe.Reader.ReadAsync();
Assert.Equal(res.Buffer.Length, buffer.Length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe)
for (int i = 0; i < 2; i++)
{
s.Dispose();
#if NET
await s.DisposeAsync();
#endif
}

// Make sure OnReaderCompleted was invoked.
Expand Down Expand Up @@ -296,6 +298,32 @@ public void AsStreamDoNotCompleteReader()
pipeReader.AsStream(leaveOpen: true).Dispose();
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task ZeroByteReadWorksWhenExaminedDoesNotEqualConsumed()
{
Pipe pipe = new Pipe();
Stream stream = pipe.Reader.AsStream();

await pipe.Writer.WriteAsync(new byte[2]);

ReadResult readResult = await pipe.Reader.ReadAsync();
// Consume: 0, Advance: 2
pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

// Write more so that the next read will see unexamined bytes available and not block
await pipe.Writer.WriteAsync(new byte[2]);

// Zero-byte read to test that advancing (via PipeReader.AdvanceTo(consumed)) doesn't throw due to examined being less than
// the last examined position
int result = await stream.ReadAsync(Memory<byte>.Empty);
Assert.Equal(0, result);

// Real read to make sure data is immediately available
result = await stream.ReadAsync(new byte[100]);
Assert.Equal(4, result);
}

public class BuggyPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
Expand Down Expand Up @@ -405,7 +433,7 @@ public static IEnumerable<object[]> ReadCalls

ReadAsyncDelegate readSpanSync = (stream, data) =>
{
return Task.FromResult(stream.Read(data));
return Task.FromResult(stream.Read(data, 0, data.Length));
};

yield return new object[] { readArrayAsync };
Expand Down
82 changes: 82 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,88 @@ public async Task ReadAsyncReturnsDataAfterCanceledRead()
pipe.Reader.AdvanceTo(readResult.Buffer.End);
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task AdvanceToWithoutExaminedCanUnExamine()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello Worl"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Advance past 'Hello ' and examine everything else
_pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End);

// Write so that the next ReadAsync will be unblocked
buffer.Write("d"u8.ToArray());
await buffer.FlushAsync();

result = await _pipe.Reader.ReadAsync();

Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible)
// But more importantly, it will work and not throw that you're unexamining
_pipe.Reader.AdvanceTo(result.Buffer.Start);

// Double check that ReadAsync is still unblocked and works
result = await _pipe.Reader.ReadAsync();
Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));
}

[Fact]
public async Task AdvanceToWithExaminedCanUnExamine()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello Worl"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Advance past 'Hello ' and examine everything else
_pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End);

// Write so that the next ReadAsync will be unblocked
buffer.Write("d"u8.ToArray());
await buffer.FlushAsync();

result = await _pipe.Reader.ReadAsync();

Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible)
// But more importantly, it will work and not throw that you're unexamining
_pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.GetPosition(1));

// Double check that ReadAsync is still unblocked and works
result = await _pipe.Reader.ReadAsync();
Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));
}

[Fact]
public async Task ExaminedCannotBeBeforeConsumed()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello World"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

InvalidOperationException ex = Assert.Throws<InvalidOperationException>(
() => _pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.GetPosition(5)));
Assert.Equal("The examined position must be greater than or equal to the consumed position.", ex.Message);
}

private bool IsTaskWithResult<T>(ValueTask<T> task)
{
return task == new ValueTask<T>(task.Result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetFrameworkMinimum)</TargetFrameworks>
Expand All @@ -25,6 +25,7 @@
<Compile Include="PipeCompletionCallbacksTests.cs" />
<Compile Include="PipeOptionsTests.cs" />
<Compile Include="PipeReaderWriterFacts.cs" />
<Compile Include="PipeReaderStreamTests.cs" />
<Compile Include="PipePoolTests.cs" />
<Compile Include="PipeResetTests.cs" />
<Compile Include="PipeTest.cs" />
Expand All @@ -43,15 +44,14 @@
<Compile Include="StreamPipeWriterTests.cs" />
<Compile Include="Infrastructure\ThrowAfterNWritesStream.cs" />
<Compile Include="UnflushedBytesTests.cs" />
<Compile Include="PipeLengthTests.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
<Compile Include="PipeLengthTests.cs" />
<Compile Include="BufferSegmentPoolTest.cs" />
<Compile Include="PipeReaderWriterFacts.nonnetstandard.cs" />
<Compile Include="PipeResetTests.nonnetstandard.cs" />
<Compile Include="PipePoolTests.nonnetstandard.cs" />
<Compile Include="PipeWriterStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderWriterStreamTests.nonnetstandard.cs" />
</ItemGroup>
<ItemGroup>
Expand Down

0 comments on commit 438cf85

Please sign in to comment.