From c68a125852011b55de604bbf5c61893e17be9519 Mon Sep 17 00:00:00 2001 From: Brennan Date: Sun, 6 Oct 2024 16:06:43 -0700 Subject: [PATCH] Allow un-examining in PipeReader.AdvanceTo(...) (#107360) --- .../src/System/IO/Pipelines/Pipe.cs | 13 ++- .../src/System/IO/Pipelines/PipeReader.cs | 5 +- .../tests/BackpressureTests.cs | 28 +++++++ .../tests/PipeLengthTests.cs | 24 +----- ...etstandard.cs => PipeReaderStreamTests.cs} | 30 ++++++- .../tests/PipeReaderWriterFacts.cs | 82 +++++++++++++++++++ .../tests/System.IO.Pipelines.Tests.csproj | 6 +- 7 files changed, 152 insertions(+), 36 deletions(-) rename src/libraries/System.IO.Pipelines/tests/{PipeReaderStreamTests.nonnetstandard.cs => PipeReaderStreamTests.cs} (92%) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 2cf4ddc028c49..e375311e54559 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -458,8 +458,6 @@ internal void AdvanceReader(in SequencePosition consumed, in SequencePosition ex ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } - // TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data - // directly casting only works because the type value in ReadOnlySequenceSegment is 0 AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger()); } @@ -486,14 +484,11 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu if (examinedSegment != null && _lastExaminedIndex >= 0) { + // This can be negative resulting in _unconsumedBytes increasing, this should be safe because we've already checked that + // examined >= consumed above, so we can't get into a state where we un-examine too much long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex); long oldLength = _unconsumedBytes; - if (examinedBytes < 0) - { - ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition(); - } - _unconsumedBytes -= examinedBytes; // Store the absolute position @@ -505,6 +500,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu if (oldLength >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold) { + // Should only release backpressure if we made forward progress + Debug.Assert(examinedBytes > 0); _writerAwaitable.Complete(out completionData); } } @@ -570,7 +567,7 @@ void MoveReturnEndToNextBlock() // but only if writer is not completed yet if (examinedEverything && !_writerCompletion.IsCompleted) { - Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync is isn't completed and will deadlock"); + Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync isn't completed and will deadlock"); _readerAwaitable.SetUncompleted(); } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs index 5d9135f6e37b7..5022bf20e654f 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs @@ -86,7 +86,7 @@ protected virtual async ValueTask ReadAtLeastAsyncCore(int minimumSi /// The previously returned from must not be accessed after this call. /// This is equivalent to calling with identical examined and consumed positions. /// The examined data communicates to the pipeline when it should signal more data is available. - /// Because the consumed parameter doubles as the examined parameter, the consumed parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an is thrown. + /// public abstract void AdvanceTo(SequencePosition consumed); /// Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed, read and examined. @@ -94,8 +94,7 @@ protected virtual async ValueTask ReadAtLeastAsyncCore(int minimumSi /// Marks the extent of the data that has been read and examined. /// The memory for the consumed data will be released and no longer available. /// The previously returned from must not be accessed after this call. - /// The examined data communicates to the pipeline when it should signal more data is available. - /// The examined parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an is thrown. + /// The examined data communicates to the pipeline when it should signal more data is available. public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined); /// Returns a representation of the . diff --git a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs index aa82ad48ac8c0..cf6c4c30f83bf 100644 --- a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs @@ -241,5 +241,33 @@ public async Task FlushAsyncThrowsIfReaderCompletedWithException() invalidOperationException = await Assert.ThrowsAsync(async () => await writableBuffer.FlushAsync()); Assert.Equal("Reader failed", invalidOperationException.Message); } + + [Fact] + public void FlushAsyncAwaitableDoesNotCompleteWhenReaderUnexamines() + { + PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold); + ValueTask flushAsync = writableBuffer.FlushAsync(); + + ReadResult result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); + SequencePosition examined = result.Buffer.GetPosition(2); + // Examine 2, don't advance consumed + _pipe.Reader.AdvanceTo(result.Buffer.Start, examined); + + Assert.False(flushAsync.IsCompleted); + + result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); + // Examine 1 which is less than the previous examined index of 2 + examined = result.Buffer.GetPosition(1); + _pipe.Reader.AdvanceTo(result.Buffer.Start, examined); + + Assert.False(flushAsync.IsCompleted); + + // Just make sure we can still release backpressure + result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); + examined = result.Buffer.GetPosition(ResumeWriterThreshold + 1); + _pipe.Reader.AdvanceTo(examined, examined); + + Assert.True(flushAsync.IsCompleted); + } } } diff --git a/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs index 8b01b86556f5b..24a384a45f448 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs @@ -220,24 +220,6 @@ public async Task ExaminedAtSecondLastBlockWorks() Assert.Equal(0, _pipe.Length); } - [Fact] - public async Task ExaminedLessThanBeforeThrows() - { - _pipe.Writer.WriteEmpty(10); - await _pipe.Writer.FlushAsync(); - - ReadResult result = await _pipe.Reader.ReadAsync(); - _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End); - - Assert.Equal(0, _pipe.Length); - - _pipe.Writer.WriteEmpty(10); - await _pipe.Writer.FlushAsync(); - - result = await _pipe.Reader.ReadAsync(); - Assert.Throws(() => _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start)); - } - [Fact] public async Task ConsumedGreaterThanExaminedThrows() { @@ -284,10 +266,10 @@ public async Task AdvanceFollowedByWriteAsyncTest() Memory buffer = new byte[26]; Pipe pipe = new(new PipeOptions(minimumSegmentSize: 1)); - var mem = pipe.Writer.GetMemory(14)[..14]; - buffer[..14].CopyTo(mem); + var mem = pipe.Writer.GetMemory(14).Slice(0, 14); + buffer.Slice(0, 14).CopyTo(mem); pipe.Writer.Advance(14); - await pipe.Writer.WriteAsync(buffer[14..]); + await pipe.Writer.WriteAsync(buffer.Slice(14)); ReadResult res = await pipe.Reader.ReadAsync(); Assert.Equal(res.Buffer.Length, buffer.Length); } diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.cs similarity index 92% rename from src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs rename to src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.cs index 5e817b6334ae9..2e56e73d4a78f 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.cs @@ -38,7 +38,9 @@ public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe) for (int i = 0; i < 2; i++) { s.Dispose(); +#if NET await s.DisposeAsync(); +#endif } // Make sure OnReaderCompleted was invoked. @@ -296,6 +298,32 @@ public void AsStreamDoNotCompleteReader() pipeReader.AsStream(leaveOpen: true).Dispose(); } + // Regression test: https://github.com/dotnet/runtime/issues/107213 + [Fact] + public async Task ZeroByteReadWorksWhenExaminedDoesNotEqualConsumed() + { + Pipe pipe = new Pipe(); + Stream stream = pipe.Reader.AsStream(); + + await pipe.Writer.WriteAsync(new byte[2]); + + ReadResult readResult = await pipe.Reader.ReadAsync(); + // Consume: 0, Advance: 2 + pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); + + // Write more so that the next read will see unexamined bytes available and not block + await pipe.Writer.WriteAsync(new byte[2]); + + // Zero-byte read to test that advancing (via PipeReader.AdvanceTo(consumed)) doesn't throw due to examined being less than + // the last examined position + int result = await stream.ReadAsync(Memory.Empty); + Assert.Equal(0, result); + + // Real read to make sure data is immediately available + result = await stream.ReadAsync(new byte[100]); + Assert.Equal(4, result); + } + public class BuggyPipeReader : PipeReader { public override void AdvanceTo(SequencePosition consumed) @@ -405,7 +433,7 @@ public static IEnumerable ReadCalls ReadAsyncDelegate readSpanSync = (stream, data) => { - return Task.FromResult(stream.Read(data)); + return Task.FromResult(stream.Read(data, 0, data.Length)); }; yield return new object[] { readArrayAsync }; diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs index c5cd74d8469e5..173f49856c175 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs @@ -757,6 +757,88 @@ public async Task ReadAsyncReturnsDataAfterCanceledRead() pipe.Reader.AdvanceTo(readResult.Buffer.End); } + // Regression test: https://github.com/dotnet/runtime/issues/107213 + [Fact] + public async Task AdvanceToWithoutExaminedCanUnExamine() + { + PipeWriter buffer = _pipe.Writer; + buffer.Write("Hello Worl"u8.ToArray()); + await buffer.FlushAsync(); + + bool gotData = _pipe.Reader.TryRead(out ReadResult result); + Assert.True(gotData); + + Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray())); + + // Advance past 'Hello ' and examine everything else + _pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End); + + // Write so that the next ReadAsync will be unblocked + buffer.Write("d"u8.ToArray()); + await buffer.FlushAsync(); + + result = await _pipe.Reader.ReadAsync(); + + Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray())); + + // Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible) + // But more importantly, it will work and not throw that you're unexamining + _pipe.Reader.AdvanceTo(result.Buffer.Start); + + // Double check that ReadAsync is still unblocked and works + result = await _pipe.Reader.ReadAsync(); + Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray())); + } + + [Fact] + public async Task AdvanceToWithExaminedCanUnExamine() + { + PipeWriter buffer = _pipe.Writer; + buffer.Write("Hello Worl"u8.ToArray()); + await buffer.FlushAsync(); + + bool gotData = _pipe.Reader.TryRead(out ReadResult result); + Assert.True(gotData); + + Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray())); + + // Advance past 'Hello ' and examine everything else + _pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End); + + // Write so that the next ReadAsync will be unblocked + buffer.Write("d"u8.ToArray()); + await buffer.FlushAsync(); + + result = await _pipe.Reader.ReadAsync(); + + Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray())); + + // Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible) + // But more importantly, it will work and not throw that you're unexamining + _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.GetPosition(1)); + + // Double check that ReadAsync is still unblocked and works + result = await _pipe.Reader.ReadAsync(); + Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray())); + } + + [Fact] + public async Task ExaminedCannotBeBeforeConsumed() + { + PipeWriter buffer = _pipe.Writer; + buffer.Write("Hello World"u8.ToArray()); + await buffer.FlushAsync(); + + bool gotData = _pipe.Reader.TryRead(out ReadResult result); + Assert.True(gotData); + + Assert.Equal("Hello World", Encoding.ASCII.GetString(result.Buffer.ToArray())); + + InvalidOperationException ex = Assert.Throws( + () => _pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.GetPosition(5))); + Assert.Equal("The examined position must be greater than or equal to the consumed position.", ex.Message); + } + private bool IsTaskWithResult(ValueTask task) { return task == new ValueTask(task.Result); diff --git a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index dacaab35e5d4b..19851efa3ec0a 100644 --- a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -1,4 +1,4 @@ - + true $(NetCoreAppCurrent);$(NetFrameworkMinimum) @@ -25,6 +25,7 @@ + @@ -43,15 +44,14 @@ + - -