diff --git a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs index f53858d950b..716dfbbd41e 100644 --- a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs +++ b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs @@ -12,24 +12,33 @@ public static class ReadOnlySequenceExtensions public static ReadOnlySequence TrimStart(this ReadOnlySequence sequence, byte[]? chars = null) { - SequencePosition position = sequence.Start; ReadOnlySpan charsSpan = chars ?? WhitespaceChars; - while (sequence.TryGet(ref position, out ReadOnlyMemory memory)) + SequencePosition start = sequence.Start; + + foreach (ReadOnlyMemory memory in sequence) { ReadOnlySpan span = memory.Span; int index = span.IndexOfAnyExcept(charsSpan); - if (index != 0) + if (index == -1) + { + // The entire segment is trimmed chars, advance past it + start = sequence.GetPosition(span.Length, start); + } + else if (index > 0) { - // if index == -1, then the whole span is whitespace - sequence = sequence.Slice(index != -1 ? index : span.Length); + // Found non-trimmed char partway through the segment + start = sequence.GetPosition(index, start); + return sequence.Slice(start); } else { - return sequence; + // First char is non-trimmed, we're done + return sequence.Slice(start); } } - return sequence; + // The entire sequence was trimmed chars + return sequence.Slice(sequence.End); } } diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index d95e38b8488..99d9b33ba9c 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO.Abstractions; +using System.IO.Pipelines; using System.Linq; using System.Text; using System.Threading; @@ -24,7 +25,7 @@ public class JsonRpcProcessorTests(bool returnErrors) { private readonly JsonRpcErrorResponse _errorResponse = new(); - private JsonRpcProcessor Initialize(JsonRpcConfig? config = null) + private JsonRpcProcessor Initialize(JsonRpcConfig? config = null, RpcRecorderState recorderState = RpcRecorderState.All) { IJsonRpcService service = Substitute.For(); service.SendRequestAsync(Arg.Any(), Arg.Any()).Returns(ci => returnErrors ? new JsonRpcErrorResponse { Id = ci.Arg().Id } : new JsonRpcSuccessResponse { Id = ci.Arg().Id }); @@ -33,11 +34,9 @@ private JsonRpcProcessor Initialize(JsonRpcConfig? config = null) IFileSystem fileSystem = Substitute.For(); - /* we enable recorder always to have an easy smoke test for recording - * and this is fine because recorder is non-critical component - */ + // we enable recorder always to have an easy smoke test for recording and this is fine because recorder is a non-critical component config ??= new JsonRpcConfig(); - config.RpcRecorderState = RpcRecorderState.All; + config.RpcRecorderState = recorderState; return new JsonRpcProcessor(service, config, fileSystem, LimboLogs.Instance); } @@ -409,4 +408,65 @@ public void Cannot_accept_null_file_system() Substitute.For(), null!, LimboLogs.Instance)); } + + [Test] + public async Task Can_process_multiple_large_requests_arriving_in_chunks() + { + Pipe pipe = new(); + JsonRpcProcessor processor = Initialize(recorderState: RpcRecorderState.None); + JsonRpcContext context = new(RpcEndpoint.Ws); + + // Create 5 large JSON-RPC requests (~10KB each) + List requests = Enumerable.Range(0, 5) + .Select(i => CreateLargeRequest(i, targetSize: 10_000)) + .ToList(); + + string allRequestsJson = string.Join("\n", requests); + byte[] bytes = Encoding.UTF8.GetBytes(allRequestsJson); + + // Start processing task (reads from pipe.Reader) + ValueTask> processTask = processor + .ProcessAsync(pipe.Reader, context) + .ToListAsync(); + + // Write data in 1KB chunks with small delays to simulate network + const int chunkSize = 1024; + for (int i = 0; i < bytes.Length; i += chunkSize) + { + int size = Math.Min(chunkSize, bytes.Length - i); + await pipe.Writer.WriteAsync(new ReadOnlyMemory(bytes, i, size)); + await Task.Yield(); + } + await pipe.Writer.CompleteAsync(); + + // Verify all 5 requests processed + List results = await processTask; + results.Should().HaveCount(5); + for (int i = 0; i < 5; i++) + { + results[i].Response.Should().NotBeNull(); + } + results.DisposeItems(); + } + + private static string CreateLargeRequest(int id, int targetSize) + { + StringBuilder sb = new(); + sb.Append($"{{\"jsonrpc\":\"2.0\",\"id\":{id},\"method\":\"test_method\",\"params\":["); + + int currentSize = sb.Length + 2; // account for closing ]} + bool first = true; + int paramIndex = 0; + while (currentSize < targetSize) + { + string param = $"\"param_{paramIndex++}_padding\""; + if (!first) sb.Append(','); + sb.Append(param); + currentSize += param.Length + (first ? 0 : 1); + first = false; + } + + sb.Append("]}"); + return sb.ToString(); + } } diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index db3c5edb739..bd042072fa1 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -21,6 +21,7 @@ using Nethermind.Core.Collections; using Nethermind.Core.Extensions; using Nethermind.Core.Resettables; +using Nethermind.JsonRpc.Modules; using Nethermind.Logging; using Nethermind.Serialization.Json; @@ -52,21 +53,21 @@ public JsonRpcProcessor(IJsonRpcService jsonRpcService, IJsonRpcConfig jsonRpcCo } } - public CancellationToken ProcessExit - => _processExitSource?.Token ?? default; + public CancellationToken ProcessExit => _processExitSource?.Token ?? default; - private (JsonRpcRequest? Model, ArrayPoolList? Collection) DeserializeObjectOrArray(JsonDocument doc) + private void DeserializeObjectOrArray(JsonDocument doc, out JsonRpcRequest? model, out ArrayPoolList? collection) { - return doc.RootElement.ValueKind switch + collection = null; + model = null; + switch (doc.RootElement.ValueKind) { - JsonValueKind.Array => (null, DeserializeArray(doc.RootElement)), - JsonValueKind.Object => (DeserializeObject(doc.RootElement), null), - _ => ThrowInvalid() - }; - - [DoesNotReturn, StackTraceHidden] - static (JsonRpcRequest? Model, ArrayPoolList? Collection) ThrowInvalid() - => throw new JsonException("Invalid"); + case JsonValueKind.Array: + collection = DeserializeArray(doc.RootElement); + break; + case JsonValueKind.Object: + model = DeserializeObject(doc.RootElement); + break; + } } private JsonRpcRequest DeserializeObject(JsonElement element) @@ -123,6 +124,8 @@ private JsonRpcRequest DeserializeObject(JsonElement element) private ArrayPoolList DeserializeArray(JsonElement element) => new(element.GetArrayLength(), element.EnumerateArray().Select(DeserializeObject)); + private static readonly JsonReaderOptions _socketJsonReaderOptions = new() { AllowMultipleValues = true }; + public async IAsyncEnumerable ProcessAsync(PipeReader reader, JsonRpcContext context) { if (ProcessExit.IsCancellationRequested) @@ -136,160 +139,192 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso reader = await RecordRequest(reader); } - long startTime = Stopwatch.GetTimestamp(); using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken(); - - // Initializes a buffer to store the data read from the reader. - ReadOnlySequence buffer = default; + JsonReaderState readerState = CreateJsonReaderState(context); + bool freshState = true; + bool shouldExit = false; try { - // Asynchronously reads data from the PipeReader. - ReadResult readResult = await reader.ReadToEndAsync(timeoutSource.Token); - - buffer = readResult.Buffer; - // Placeholder for a result in case of deserialization failure. - JsonRpcResult? deserializationFailureResult = null; - - // Processes the buffer while it's not empty; before going out to outer loop to get more data. - while (!buffer.IsEmpty) + while (!shouldExit) { - JsonDocument? jsonDocument = null; - JsonRpcRequest? model = null; - ArrayPoolList? collection = null; + long startTime = Stopwatch.GetTimestamp(); + + ReadResult readResult; try { - // Tries to parse the JSON from the buffer. - if (!TryParseJson(ref buffer, out jsonDocument)) - { - deserializationFailureResult = GetParsingError(startTime, in buffer, "Error during parsing/validation."); - } - else - { - // Deserializes the JSON document into a request object or a collection of requests. - (model, collection) = DeserializeObjectOrArray(jsonDocument); - } + readResult = await reader.ReadAsync(timeoutSource.Token); } catch (BadHttpRequestException e) { - // Increments failure metric and logs the exception, then stops processing. - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); - yield break; + Handle(e); + break; } catch (ConnectionResetException e) { - // Logs exception, then stop processing. - if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); - yield break; - } - catch (Exception ex) - { - deserializationFailureResult = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); - } - - // Checks for deserialization failure and yields the result. - if (deserializationFailureResult.HasValue) - { - yield return deserializationFailureResult.Value; + Handle(e); break; } - // Handles a single JSON RPC request. - if (model is not null) - { - if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); - - // Processes the individual request. - JsonRpcResult.Entry result = await HandleSingleRequest(model, context); - result.Response.AddDisposable(() => jsonDocument.Dispose()); - - // Returns the result of the processed request. - yield return JsonRpcResult.Single(RecordResponse(result)); - } + ReadOnlySequence buffer = readResult.Buffer; + bool advanced = false; - // Processes a collection of JSON RPC requests. - if (collection is not null) + try { - if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; + JsonRpcResult? result = null; + if (freshState) + { + buffer = buffer.TrimStart(); + } - // Checks for authentication and batch size limit. - if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) + if (!buffer.IsEmpty) { - if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); - JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); - response.AddDisposable(() => jsonDocument.Dispose()); - - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error)); - collection.Dispose(); - yield return deserializationFailureResult.Value; - break; + try + { + freshState = TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument, context); + if (freshState) + { + result = await ProcessJsonDocument(jsonDocument, context, startTime); + } + else if (isCompleted && !buffer.IsEmpty) + { + result = GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); + shouldExit = true; + } + + reader.AdvanceTo(buffer.Start, buffer.End); + advanced = true; + } + catch (BadHttpRequestException e) + { + Handle(e); + shouldExit = true; + } + catch (ConnectionResetException e) + { + Handle(e); + shouldExit = true; + } + catch (JsonException ex) + { + result = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); + shouldExit = true; + } } - JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); - jsonRpcBatchResult.AddDisposable(() => collection.Dispose()); - yield return JsonRpcResult.Collection(jsonRpcBatchResult); - } - // Handles invalid requests. - if (model is null && collection is null) - { - Metrics.JsonRpcInvalidRequests++; - JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); - errorResponse.AddDisposable(() => jsonDocument.Dispose()); + if (result.HasValue) + { + yield return result.Value; + } - if (_logger.IsTrace) + shouldExit |= isCompleted && buffer.IsEmpty; + } + finally + { + if (!advanced) { - TraceResult(errorResponse); - TraceFailure(startTime); + reader.AdvanceTo(buffer.Start, buffer.End); } - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); - yield return deserializationFailureResult.Value; - break; } - - buffer = buffer.TrimStart(); } + } + finally + { + await reader.CompleteAsync(); + } + } + + private void Handle(ConnectionResetException e) + { + if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); + } + + private void Handle(BadHttpRequestException e) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + } + + private static bool TryParseJson( + ref ReadOnlySequence buffer, + bool isFinalBlock, + ref JsonReaderState readerState, + [NotNullWhen(true)] out JsonDocument? jsonDocument, + JsonRpcContext context) + { + Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); + bool parsed = JsonDocument.TryParseValue(ref jsonReader, out jsonDocument); + buffer = buffer.Slice(jsonReader.BytesConsumed); + readerState = parsed + ? CreateJsonReaderState(context) // Reset state for the next document + : jsonReader.CurrentState; // Preserve state for resumption when more data arrives + + return parsed; + } + + private static JsonReaderState CreateJsonReaderState(JsonRpcContext context) => + new(context.RpcEndpoint == RpcEndpoint.Http ? default : _socketJsonReaderOptions); - // Checks if the deserialization failed - if (deserializationFailureResult.HasValue) + private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) + { + try + { + DeserializeObjectOrArray(jsonDocument, out JsonRpcRequest? model, out ArrayPoolList? collection); + + // Handles a single JSON RPC request + if (model is not null) { - yield break; + if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); + + JsonRpcResult.Entry result = await HandleSingleRequest(model, context); + result.Response.AddDisposable(jsonDocument.Dispose); + + return JsonRpcResult.Single(RecordResponse(result)); } - // Checks if the read operation is completed. - if (readResult.IsCompleted) + // Processes a collection of JSON RPC requests + if (collection is not null) { - if (buffer.Length > 0 && HasNonWhitespace(buffer)) + if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + + if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) { - yield return GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); + if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); + JsonRpcErrorResponse? errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); + errorResponse.AddDisposable(jsonDocument.Dispose); + + collection.Dispose(); + return JsonRpcResult.Single(RecordResponse(errorResponse, RpcReport.Error)); } + JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); + jsonRpcBatchResult.AddDisposable(jsonDocument.Dispose); + jsonRpcBatchResult.AddDisposable(collection.Dispose); + return JsonRpcResult.Collection(jsonRpcBatchResult); } - } - finally - { - // Advances the reader to the end of the buffer if not null. - if (!buffer.FirstSpan.IsNull()) + + // Handles invalid requests (neither object nor array) + Metrics.JsonRpcInvalidRequests++; + JsonRpcErrorResponse invalidResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); + invalidResponse.AddDisposable(jsonDocument.Dispose); + + if (_logger.IsTrace) { - reader.AdvanceTo(buffer.End); + TraceResult(invalidResponse); + _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); + } + catch + { + jsonDocument.Dispose(); + throw; } - - // Completes the PipeReader's asynchronous reading operation. - await reader.CompleteAsync(); - - [MethodImpl(MethodImplOptions.NoInlining)] - void TraceFailure(long startTime) => _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - // Handles general exceptions during parsing and validation. - // Sends an error response and stops the stopwatch. private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySequence buffer, string error, Exception? exception = null) { Metrics.JsonRpcRequestDeserializationFailures++; - - if (_logger.IsError) - { - _logger.Error(error, exception); - } + if (_logger.IsError) _logger.Error(error, exception); if (_logger.IsDebug) { @@ -305,35 +340,11 @@ private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySeque } } - JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); + JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "parse error"); if (_logger.IsTrace) TraceResult(response); return JsonRpcResult.Single(RecordResponse(response, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); } - private static bool HasNonWhitespace(ReadOnlySequence buffer) - { - static bool HasNonWhitespace(ReadOnlySpan span) - { - static ReadOnlySpan WhiteSpace() => " \n\r\t"u8; - return span.IndexOfAnyExcept(WhiteSpace()) >= 0; - } - - if (buffer.IsSingleSegment) - { - return HasNonWhitespace(buffer.FirstSpan); - } - - foreach (ReadOnlyMemory memory in buffer) - { - if (HasNonWhitespace(memory.Span)) - { - return true; - } - } - - return false; - } - private async IAsyncEnumerable IterateRequest( ArrayPoolList requests, JsonRpcContext context, @@ -399,15 +410,6 @@ static bool HasNonWhitespace(ReadOnlySpan span) return result; } - private static bool TryParseJson(ref ReadOnlySequence buffer, [NotNullWhen(true)] out JsonDocument? jsonDocument) - { - Utf8JsonReader reader = new(buffer); - if (!JsonDocument.TryParseValue(ref reader, out jsonDocument)) return false; - buffer = buffer.Slice(reader.BytesConsumed); - return true; - - } - private bool IsRecordingRequest => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Request) != 0; private bool IsRecordingResponse => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Response) != 0; diff --git a/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs b/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs index f0d2112de52..483f0aca644 100644 --- a/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs +++ b/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs @@ -3,19 +3,8 @@ namespace Nethermind.JsonRpc { - public readonly struct RpcReport + public readonly record struct RpcReport(string Method, long HandlingTimeMicroseconds, bool Success) { - public static readonly RpcReport Error = new RpcReport("# error #", 0, false); - - public RpcReport(string method, long handlingTimeMicroseconds, bool success) - { - Method = method; - HandlingTimeMicroseconds = handlingTimeMicroseconds; - Success = success; - } - - public string Method { get; } - public long HandlingTimeMicroseconds { get; } - public bool Success { get; } + public static readonly RpcReport Error = new("# error #", 0, false); } } diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs index 2ca43216e00..995fb81479e 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs @@ -120,7 +120,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc // If request is local, don't use response compression, // as it allocates a lot, but doesn't improve much for loopback app.UseWhen(ctx => - !IsLocalhost(ctx.Connection.RemoteIpAddress), + !IsLocalhost(ctx.Connection.RemoteIpAddress!), builder => builder.UseResponseCompression()); if (initConfig.WebSocketsEnabled) @@ -154,7 +154,6 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc if (logger.IsError) logger.Error("Unable to initialize health checks. Check if you have Nethermind.HealthChecks.dll in your plugins folder.", e); } - IServiceProvider services = app.ApplicationServices; endpoints.MapDataFeeds(lifetime); } }); @@ -192,7 +191,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc } if (method == "GET" && ctx.Request.Headers.Accept.Count > 0 && - !ctx.Request.Headers.Accept[0].Contains("text/html", StringComparison.Ordinal)) + !ctx.Request.Headers.Accept[0]!.Contains("text/html", StringComparison.Ordinal)) { await ctx.Response.WriteAsync("Nethermind JSON RPC"); } @@ -242,12 +241,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc await jsonSerializer.SerializeAsync(resultWriter, entry.Response); _ = jsonRpcLocalStats.ReportCall(entry.Report); - // We reached the limit and don't want to responded to more request in the batch + // We reached the limit and don't want to respond to more request in the batch if (!jsonRpcContext.IsAuthenticated && resultWriter.WrittenCount > jsonRpcConfig.MaxBatchResponseBodySize) { - if (logger.IsWarn) - logger.Warn( - $"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); + if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); enumerator.IsStopped = true; } } @@ -307,7 +304,7 @@ await PushErrorResponse(e.StatusCode, e.StatusCode == StatusCodes.Status413Paylo } finally { - Interlocked.Add(ref Nethermind.JsonRpc.Metrics.JsonRpcBytesReceivedHttp, ctx.Request.ContentLength ?? request.Length); + Interlocked.Add(ref Metrics.JsonRpcBytesReceivedHttp, ctx.Request.ContentLength ?? request.Length); } } Task SerializeTimeoutException(CountingWriter resultStream) @@ -361,51 +358,45 @@ private static bool IsResourceUnavailableError(JsonRpcResponse? response) or JsonRpcErrorResponse { Error.Code: ErrorCodes.LimitExceeded }; } - private sealed class CountingPipeReader : PipeReader + private sealed class CountingPipeReader(PipeReader stream) : PipeReader { - private readonly PipeReader _wrappedReader; private ReadOnlySequence _currentSequence; public long Length { get; private set; } - public CountingPipeReader(PipeReader stream) - { - _wrappedReader = stream; - } - public override void AdvanceTo(SequencePosition consumed) { Length += _currentSequence.GetOffset(consumed); - _wrappedReader.AdvanceTo(consumed); + stream.AdvanceTo(consumed); } public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { Length += _currentSequence.GetOffset(consumed); - _wrappedReader.AdvanceTo(consumed, examined); + stream.AdvanceTo(consumed, examined); } public override void CancelPendingRead() { - _wrappedReader.CancelPendingRead(); + stream.CancelPendingRead(); } public override void Complete(Exception? exception = null) { Length += _currentSequence.Length; - _wrappedReader.Complete(exception); + stream.Complete(exception); } public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) { - ReadResult result = await _wrappedReader.ReadAsync(cancellationToken); + ReadResult result = await stream.ReadAsync(cancellationToken); _currentSequence = result.Buffer; return result; } public override bool TryRead(out ReadResult result) { - bool didRead = _wrappedReader.TryRead(out result); + bool didRead = stream.TryRead(out result); if (didRead) { _currentSequence = result.Buffer;