diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index db3c5edb739..5fa10e88b14 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -17,6 +17,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http; +using Microsoft.IO; using Nethermind.Config; using Nethermind.Core.Collections; using Nethermind.Core.Extensions; @@ -141,12 +142,12 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso // Initializes a buffer to store the data read from the reader. ReadOnlySequence buffer = default; + RecyclableMemoryStream? bufferedStream = null; + bool readCompleted = false; try { - // Asynchronously reads data from the PipeReader. - ReadResult readResult = await reader.ReadToEndAsync(timeoutSource.Token); - - buffer = readResult.Buffer; + // Asynchronously reads data from the PipeReader while consuming it to avoid backpressure. + (buffer, bufferedStream, readCompleted) = await ReadToEndBufferedAsync(reader, timeoutSource.Token); // Placeholder for a result in case of deserialization failure. JsonRpcResult? deserializationFailureResult = null; @@ -256,7 +257,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } // Checks if the read operation is completed. - if (readResult.IsCompleted) + if (readCompleted) { if (buffer.Length > 0 && HasNonWhitespace(buffer)) { @@ -266,11 +267,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } finally { - // Advances the reader to the end of the buffer if not null. - if (!buffer.FirstSpan.IsNull()) - { - reader.AdvanceTo(buffer.End); - } + bufferedStream?.Dispose(); } // Completes the PipeReader's asynchronous reading operation. @@ -408,6 +405,44 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, [NotNullWhen } + private static async Task<(ReadOnlySequence Buffer, RecyclableMemoryStream Stream, bool IsCompleted)> ReadToEndBufferedAsync(PipeReader reader, CancellationToken cancellationToken) + { + RecyclableMemoryStream stream = RecyclableStream.GetStream("rpc"); + bool isCompleted = false; + + while (true) + { + ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); + ReadOnlySequence resultBuffer = result.Buffer; + foreach (ReadOnlyMemory segment in resultBuffer) + { + stream.Write(segment.Span); + } + + reader.AdvanceTo(resultBuffer.End); + + if (result.IsCompleted || result.IsCanceled) + { + isCompleted = result.IsCompleted; + break; + } + } + + ReadOnlySequence buffer = ReadOnlySequence.Empty; + if (stream.Length > 0) + { + if (!stream.TryGetBuffer(out ArraySegment segment)) + { + segment = new ArraySegment(stream.ToArray()); + } + + ReadOnlyMemory memory = new(segment.Array!, segment.Offset, checked((int)stream.Length)); + buffer = new ReadOnlySequence(memory); + } + + return (buffer, stream, isCompleted); + } + private bool IsRecordingRequest => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Request) != 0; private bool IsRecordingResponse => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Response) != 0;