From b452e3786afca2b47071b42dea2d417ceb297d78 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 11:54:20 +0100 Subject: [PATCH 01/18] Parse incoming Jsons in chunks --- .../JsonRpcProcessorTests.cs | 62 ++++ .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 267 +++++++----------- 2 files changed, 167 insertions(+), 162 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index d95e38b8488..c89a0fe4349 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO.Abstractions; +using System.IO.Pipelines; using System.Linq; using System.Text; using System.Threading; @@ -409,4 +410,65 @@ public void Cannot_accept_null_file_system() Substitute.For(), null!, LimboLogs.Instance)); } + + [Test] + public async Task Can_process_multiple_large_requests_arriving_in_chunks() + { + Pipe pipe = new(); + JsonRpcProcessor processor = Initialize(); + JsonRpcContext context = new(RpcEndpoint.Http); + + // Create 5 large JSON-RPC requests (~10KB each) + List requests = Enumerable.Range(0, 5) + .Select(i => CreateLargeRequest(i, targetSize: 10_000)) + .ToList(); + + string allRequestsJson = string.Join("\n", requests); + byte[] bytes = Encoding.UTF8.GetBytes(allRequestsJson); + + // Start processing task (reads from pipe.Reader) + ValueTask> processTask = processor + .ProcessAsync(pipe.Reader, context) + .ToListAsync(); + + // Write data in 1KB chunks with small delays to simulate network + const int chunkSize = 1024; + for (int i = 0; i < bytes.Length; i += chunkSize) + { + int size = Math.Min(chunkSize, bytes.Length - i); + await pipe.Writer.WriteAsync(new ReadOnlyMemory(bytes, i, size)); + await Task.Delay(1); + } + await pipe.Writer.CompleteAsync(); + + // Verify all 5 requests processed + List results = await processTask; + results.Should().HaveCount(5); + for (int i = 0; i < 5; i++) + { + results[i].Response.Should().NotBeNull(); + } + results.DisposeItems(); + } + + private static string CreateLargeRequest(int id, int targetSize) + { + StringBuilder sb = new(); + sb.Append($"{{\"jsonrpc\":\"2.0\",\"id\":{id},\"method\":\"test_method\",\"params\":["); + + int currentSize = sb.Length + 2; // account for closing ]} + bool first = true; + int paramIndex = 0; + while (currentSize < targetSize) + { + string param = $"\"param_{paramIndex++}_padding\""; + if (!first) sb.Append(','); + sb.Append(param); + currentSize += param.Length + (first ? 0 : 1); + first = false; + } + + sb.Append("]}"); + return sb.ToString(); + } } diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index db3c5edb739..9e43f72f104 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -11,7 +11,6 @@ using System.IO.Pipelines; using System.Linq; using System.Runtime.CompilerServices; -using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -123,6 +122,8 @@ private JsonRpcRequest DeserializeObject(JsonElement element) private ArrayPoolList DeserializeArray(JsonElement element) => new(element.GetArrayLength(), element.EnumerateArray().Select(DeserializeObject)); + private static readonly JsonReaderOptions _jsonReaderOptions = new() { AllowMultipleValues = true }; + public async IAsyncEnumerable ProcessAsync(PipeReader reader, JsonRpcContext context) { if (ProcessExit.IsCancellationRequested) @@ -139,199 +140,150 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso long startTime = Stopwatch.GetTimestamp(); using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken(); - // Initializes a buffer to store the data read from the reader. - ReadOnlySequence buffer = default; + JsonReaderState readerState = new(_jsonReaderOptions); + bool shouldExit = false; try { - // Asynchronously reads data from the PipeReader. - ReadResult readResult = await reader.ReadToEndAsync(timeoutSource.Token); + while (!shouldExit) + { + ReadResult readResult = await reader.ReadAsync(timeoutSource.Token); + ReadOnlySequence buffer = readResult.Buffer; + bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; - buffer = readResult.Buffer; - // Placeholder for a result in case of deserialization failure. - JsonRpcResult? deserializationFailureResult = null; + JsonRpcResult? result = null; - // Processes the buffer while it's not empty; before going out to outer loop to get more data. - while (!buffer.IsEmpty) - { - JsonDocument? jsonDocument = null; - JsonRpcRequest? model = null; - ArrayPoolList? collection = null; - try + // Process one JSON document from the buffer + buffer = buffer.TrimStart(); + if (!buffer.IsEmpty) { - // Tries to parse the JSON from the buffer. - if (!TryParseJson(ref buffer, out jsonDocument)) + if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) { - deserializationFailureResult = GetParsingError(startTime, in buffer, "Error during parsing/validation."); + try + { + result = await ProcessJsonDocument(jsonDocument, context, startTime); + } + catch (BadHttpRequestException e) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + shouldExit = true; + } + catch (ConnectionResetException e) + { + if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); + shouldExit = true; + } + catch (JsonException ex) + { + result = GetParsingError(startTime, ex); + shouldExit = true; + } } - else + else if (isCompleted && !buffer.IsEmpty) { - // Deserializes the JSON document into a request object or a collection of requests. - (model, collection) = DeserializeObjectOrArray(jsonDocument); + result = GetParsingError(startTime); + shouldExit = true; } } - catch (BadHttpRequestException e) - { - // Increments failure metric and logs the exception, then stops processing. - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); - yield break; - } - catch (ConnectionResetException e) - { - // Logs exception, then stop processing. - if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); - yield break; - } - catch (Exception ex) - { - deserializationFailureResult = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); - } - - // Checks for deserialization failure and yields the result. - if (deserializationFailureResult.HasValue) - { - yield return deserializationFailureResult.Value; - break; - } - - // Handles a single JSON RPC request. - if (model is not null) - { - if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); - - // Processes the individual request. - JsonRpcResult.Entry result = await HandleSingleRequest(model, context); - result.Response.AddDisposable(() => jsonDocument.Dispose()); - - // Returns the result of the processed request. - yield return JsonRpcResult.Single(RecordResponse(result)); - } - // Processes a collection of JSON RPC requests. - if (collection is not null) - { - if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); - - // Checks for authentication and batch size limit. - if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) - { - if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); - JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); - response.AddDisposable(() => jsonDocument.Dispose()); - - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error)); - collection.Dispose(); - yield return deserializationFailureResult.Value; - break; - } - JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); - jsonRpcBatchResult.AddDisposable(() => collection.Dispose()); - yield return JsonRpcResult.Collection(jsonRpcBatchResult); - } + reader.AdvanceTo(buffer.Start, buffer.End); - // Handles invalid requests. - if (model is null && collection is null) + if (result.HasValue) { - Metrics.JsonRpcInvalidRequests++; - JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); - errorResponse.AddDisposable(() => jsonDocument.Dispose()); - - if (_logger.IsTrace) - { - TraceResult(errorResponse); - TraceFailure(startTime); - } - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); - yield return deserializationFailureResult.Value; - break; + yield return result.Value; } - buffer = buffer.TrimStart(); + shouldExit |= isCompleted && buffer.IsEmpty; } + } + finally + { + await reader.CompleteAsync(); + } + } - // Checks if the deserialization failed - if (deserializationFailureResult.HasValue) + private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinalBlock, ref JsonReaderState readerState, [NotNullWhen(true)] out JsonDocument? jsonDocument) + { + try + { + Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); + if (!JsonDocument.TryParseValue(ref jsonReader, out jsonDocument)) { - yield break; + // Preserve state for resumption when more data arrives + readerState = jsonReader.CurrentState; + return false; } - // Checks if the read operation is completed. - if (readResult.IsCompleted) - { - if (buffer.Length > 0 && HasNonWhitespace(buffer)) - { - yield return GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); - } - } + buffer = buffer.Slice(jsonReader.BytesConsumed); + // Reset state for the next document + readerState = new(_jsonReaderOptions); + return true; } - finally + catch (JsonException) { - // Advances the reader to the end of the buffer if not null. - if (!buffer.FirstSpan.IsNull()) - { - reader.AdvanceTo(buffer.End); - } + jsonDocument = null; + // Reset state on error + readerState = new(_jsonReaderOptions); + return false; } - - // Completes the PipeReader's asynchronous reading operation. - await reader.CompleteAsync(); - - [MethodImpl(MethodImplOptions.NoInlining)] - void TraceFailure(long startTime) => _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - // Handles general exceptions during parsing and validation. - // Sends an error response and stops the stopwatch. - private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySequence buffer, string error, Exception? exception = null) + private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) { - Metrics.JsonRpcRequestDeserializationFailures++; + var (model, collection) = DeserializeObjectOrArray(jsonDocument); - if (_logger.IsError) + // Handles a single JSON RPC request + if (model is not null) { - _logger.Error(error, exception); + if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); + + JsonRpcResult.Entry result = await HandleSingleRequest(model, context); + result.Response.AddDisposable(jsonDocument.Dispose); + + return JsonRpcResult.Single(RecordResponse(result)); } - if (_logger.IsDebug) + // Processes a collection of JSON RPC requests + if (collection is not null) { - // Attempt to get and log the request body from the bytes buffer if Debug logging is enabled - const int sliceSize = 1000; - if (Encoding.UTF8.TryGetStringSlice(in buffer, sliceSize, out bool isFullString, out string data)) + if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + + if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) { - error = isFullString - ? $"{error} Data:\n{data}\n" - : $"{error} Data (first {sliceSize} chars):\n{data[..sliceSize]}\n"; + if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); + JsonRpcErrorResponse? errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); + errorResponse.AddDisposable(jsonDocument.Dispose); - _logger.Debug(error); + collection.Dispose(); + return JsonRpcResult.Single(RecordResponse(errorResponse, RpcReport.Error)); } + JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); + jsonRpcBatchResult.AddDisposable(jsonDocument.Dispose); + jsonRpcBatchResult.AddDisposable(collection.Dispose); + return JsonRpcResult.Collection(jsonRpcBatchResult); } - JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); - if (_logger.IsTrace) TraceResult(response); - return JsonRpcResult.Single(RecordResponse(response, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); - } + // Handles invalid requests (neither object nor array) + Metrics.JsonRpcInvalidRequests++; + JsonRpcErrorResponse invalidResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); + invalidResponse.AddDisposable(jsonDocument.Dispose); - private static bool HasNonWhitespace(ReadOnlySequence buffer) - { - static bool HasNonWhitespace(ReadOnlySpan span) - { - static ReadOnlySpan WhiteSpace() => " \n\r\t"u8; - return span.IndexOfAnyExcept(WhiteSpace()) >= 0; - } - - if (buffer.IsSingleSegment) + if (_logger.IsTrace) { - return HasNonWhitespace(buffer.FirstSpan); + TraceResult(invalidResponse); + _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); + } - foreach (ReadOnlyMemory memory in buffer) - { - if (HasNonWhitespace(memory.Span)) - { - return true; - } - } + private JsonRpcResult GetParsingError(long startTime, Exception? exception = null) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsError) _logger.Error("Error during parsing/validation.", exception); - return false; + JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); + if (_logger.IsTrace) TraceResult(response); + return JsonRpcResult.Single(RecordResponse(response, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); } private async IAsyncEnumerable IterateRequest( @@ -399,15 +351,6 @@ static bool HasNonWhitespace(ReadOnlySpan span) return result; } - private static bool TryParseJson(ref ReadOnlySequence buffer, [NotNullWhen(true)] out JsonDocument? jsonDocument) - { - Utf8JsonReader reader = new(buffer); - if (!JsonDocument.TryParseValue(ref reader, out jsonDocument)) return false; - buffer = buffer.Slice(reader.BytesConsumed); - return true; - - } - private bool IsRecordingRequest => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Request) != 0; private bool IsRecordingResponse => (_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Response) != 0; From 97d1114990e3aa2d7be34abd466cc08433bf776f Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 12:49:17 +0100 Subject: [PATCH 02/18] changes --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 94 ++++++++++--------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 9e43f72f104..d00179d59b3 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -11,6 +11,7 @@ using System.IO.Pipelines; using System.Linq; using System.Runtime.CompilerServices; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -152,42 +153,47 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso JsonRpcResult? result = null; - // Process one JSON document from the buffer buffer = buffer.TrimStart(); if (!buffer.IsEmpty) { - if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) + try { - try + if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) { result = await ProcessJsonDocument(jsonDocument, context, startTime); } - catch (BadHttpRequestException e) + else if (isCompleted) { - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); - shouldExit = true; - } - catch (ConnectionResetException e) - { - if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); - shouldExit = true; - } - catch (JsonException ex) - { - result = GetParsingError(startTime, ex); - shouldExit = true; + buffer = buffer.TrimStart(); + if (!buffer.IsEmpty) + { + result = GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); + shouldExit = true; + } } } - else if (isCompleted && !buffer.IsEmpty) + catch (BadHttpRequestException e) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + shouldExit = true; + } + catch (ConnectionResetException e) { - result = GetParsingError(startTime); + if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); shouldExit = true; } + catch (JsonException ex) + { + result = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); + shouldExit = true; + } + finally + { + reader.AdvanceTo(buffer.Start, buffer.End); + } } - reader.AdvanceTo(buffer.Start, buffer.End); - if (result.HasValue) { yield return result.Value; @@ -204,33 +210,21 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinalBlock, ref JsonReaderState readerState, [NotNullWhen(true)] out JsonDocument? jsonDocument) { - try + Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); + if (!JsonDocument.TryParseValue(ref jsonReader, out jsonDocument)) { - Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); - if (!JsonDocument.TryParseValue(ref jsonReader, out jsonDocument)) - { - // Preserve state for resumption when more data arrives - readerState = jsonReader.CurrentState; - return false; - } - - buffer = buffer.Slice(jsonReader.BytesConsumed); - // Reset state for the next document - readerState = new(_jsonReaderOptions); - return true; - } - catch (JsonException) - { - jsonDocument = null; - // Reset state on error - readerState = new(_jsonReaderOptions); + readerState = jsonReader.CurrentState; // Preserve state for resumption when more data arrives return false; } + + buffer = buffer.Slice(jsonReader.BytesConsumed); + readerState = new(_jsonReaderOptions); // Reset state for the next document + return true; } private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) { - var (model, collection) = DeserializeObjectOrArray(jsonDocument); + (JsonRpcRequest? model, ArrayPoolList? collection) = DeserializeObjectOrArray(jsonDocument); // Handles a single JSON RPC request if (model is not null) @@ -276,10 +270,24 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); } - private JsonRpcResult GetParsingError(long startTime, Exception? exception = null) + private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySequence buffer, string error, Exception? exception = null) { Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsError) _logger.Error("Error during parsing/validation.", exception); + if (_logger.IsError) _logger.Error(error, exception); + + if (_logger.IsDebug) + { + // Attempt to get and log the request body from the bytes buffer if Debug logging is enabled + const int sliceSize = 1000; + if (Encoding.UTF8.TryGetStringSlice(in buffer, sliceSize, out bool isFullString, out string data)) + { + error = isFullString + ? $"{error} Data:\n{data}\n" + : $"{error} Data (first {sliceSize} chars):\n{data[..sliceSize]}\n"; + + _logger.Debug(error); + } + } JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); if (_logger.IsTrace) TraceResult(response); From b9247b2cb6fb131fe4ee694b60eb48dfe9734dcf Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 20:55:26 +0100 Subject: [PATCH 03/18] better error handling --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 72 +++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index d00179d59b3..67d7b9602b6 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -149,57 +149,55 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso { ReadResult readResult = await reader.ReadAsync(timeoutSource.Token); ReadOnlySequence buffer = readResult.Buffer; - bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; - JsonRpcResult? result = null; - - buffer = buffer.TrimStart(); - if (!buffer.IsEmpty) + try { - try + bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; + JsonRpcResult? result = null; + buffer = buffer.TrimStart(); + if (!buffer.IsEmpty) { - if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) - { - result = await ProcessJsonDocument(jsonDocument, context, startTime); - } - else if (isCompleted) + try { - buffer = buffer.TrimStart(); - if (!buffer.IsEmpty) + if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) + { + result = await ProcessJsonDocument(jsonDocument, context, startTime); + } + else if (isCompleted && !buffer.IsEmpty) { result = GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); shouldExit = true; } } + catch (BadHttpRequestException e) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + shouldExit = true; + } + catch (ConnectionResetException e) + { + if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); + shouldExit = true; + } + catch (JsonException ex) + { + result = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); + shouldExit = true; + } } - catch (BadHttpRequestException e) - { - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); - shouldExit = true; - } - catch (ConnectionResetException e) - { - if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); - shouldExit = true; - } - catch (JsonException ex) - { - result = GetParsingError(startTime, in buffer, "Error during parsing/validation.", ex); - shouldExit = true; - } - finally + + if (result.HasValue) { - reader.AdvanceTo(buffer.Start, buffer.End); + yield return result.Value; } - } - if (result.HasValue) + shouldExit |= isCompleted && buffer.IsEmpty; + } + finally { - yield return result.Value; + reader.AdvanceTo(buffer.Start, buffer.End); } - - shouldExit |= isCompleted && buffer.IsEmpty; } } finally @@ -289,7 +287,7 @@ private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySeque } } - JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); + JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "parse error"); if (_logger.IsTrace) TraceResult(response); return JsonRpcResult.Single(RecordResponse(response, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); } From 3ed5ac99b5fb7f964bed5c8b33a41882281615ad Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 21:15:36 +0100 Subject: [PATCH 04/18] move start time --- src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 67d7b9602b6..607a2be0322 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -138,15 +138,14 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso reader = await RecordRequest(reader); } - long startTime = Stopwatch.GetTimestamp(); using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken(); - JsonReaderState readerState = new(_jsonReaderOptions); bool shouldExit = false; try { while (!shouldExit) { + long startTime = Stopwatch.GetTimestamp(); ReadResult readResult = await reader.ReadAsync(timeoutSource.Token); ReadOnlySequence buffer = readResult.Buffer; From eb47bfe1272eeac8c776ed4085234da3b347d0aa Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 21:29:28 +0100 Subject: [PATCH 05/18] dispose jsondocument on exception --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 607a2be0322..ae579e506ed 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -221,50 +221,58 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) { - (JsonRpcRequest? model, ArrayPoolList? collection) = DeserializeObjectOrArray(jsonDocument); - - // Handles a single JSON RPC request - if (model is not null) + try { - if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); + (JsonRpcRequest? model, ArrayPoolList? collection) = DeserializeObjectOrArray(jsonDocument); - JsonRpcResult.Entry result = await HandleSingleRequest(model, context); - result.Response.AddDisposable(jsonDocument.Dispose); + // Handles a single JSON RPC request + if (model is not null) + { + if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model.Method}"); - return JsonRpcResult.Single(RecordResponse(result)); - } + JsonRpcResult.Entry result = await HandleSingleRequest(model, context); + result.Response.AddDisposable(jsonDocument.Dispose); - // Processes a collection of JSON RPC requests - if (collection is not null) - { - if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + return JsonRpcResult.Single(RecordResponse(result)); + } - if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) + // Processes a collection of JSON RPC requests + if (collection is not null) { - if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); - JsonRpcErrorResponse? errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); - errorResponse.AddDisposable(jsonDocument.Dispose); + if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + + if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) + { + if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); + JsonRpcErrorResponse? errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); + errorResponse.AddDisposable(jsonDocument.Dispose); - collection.Dispose(); - return JsonRpcResult.Single(RecordResponse(errorResponse, RpcReport.Error)); + collection.Dispose(); + return JsonRpcResult.Single(RecordResponse(errorResponse, RpcReport.Error)); + } + JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); + jsonRpcBatchResult.AddDisposable(jsonDocument.Dispose); + jsonRpcBatchResult.AddDisposable(collection.Dispose); + return JsonRpcResult.Collection(jsonRpcBatchResult); } - JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c)); - jsonRpcBatchResult.AddDisposable(jsonDocument.Dispose); - jsonRpcBatchResult.AddDisposable(collection.Dispose); - return JsonRpcResult.Collection(jsonRpcBatchResult); - } - // Handles invalid requests (neither object nor array) - Metrics.JsonRpcInvalidRequests++; - JsonRpcErrorResponse invalidResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); - invalidResponse.AddDisposable(jsonDocument.Dispose); + // Handles invalid requests (neither object nor array) + Metrics.JsonRpcInvalidRequests++; + JsonRpcErrorResponse invalidResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); + invalidResponse.AddDisposable(jsonDocument.Dispose); - if (_logger.IsTrace) + if (_logger.IsTrace) + { + TraceResult(invalidResponse); + _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); + } + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); + } + catch { - TraceResult(invalidResponse); - _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); + jsonDocument.Dispose(); + throw; } - return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); } private JsonRpcResult GetParsingError(long startTime, ref readonly ReadOnlySequence buffer, string error, Exception? exception = null) From 72b442d6f5da232aa2763022679202001a77f3b1 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Tue, 13 Jan 2026 21:30:33 +0100 Subject: [PATCH 06/18] fix microseconds --- src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index ae579e506ed..a9406e13af3 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -266,7 +266,7 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal TraceResult(invalidResponse); _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); } catch { From 0994bf6fa713b4c1fa5f0e36822b63785a0dbfdb Mon Sep 17 00:00:00 2001 From: Lukasz Rozmej Date: Wed, 14 Jan 2026 14:54:00 +0100 Subject: [PATCH 07/18] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index a9406e13af3..ae579e506ed 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -266,7 +266,7 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal TraceResult(invalidResponse); _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); } catch { From 90582da53034a6e84166bb6a1ff2c9dfb580b816 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Mon, 26 Jan 2026 11:25:48 +0100 Subject: [PATCH 08/18] fix copilot review --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index ae579e506ed..7642df076ea 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -148,6 +148,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso long startTime = Stopwatch.GetTimestamp(); ReadResult readResult = await reader.ReadAsync(timeoutSource.Token); ReadOnlySequence buffer = readResult.Buffer; + bool advanced = false; try { @@ -167,6 +168,9 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso result = GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); shouldExit = true; } + + reader.AdvanceTo(buffer.Start, buffer.End); + advanced = true; } catch (BadHttpRequestException e) { @@ -195,7 +199,10 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } finally { - reader.AdvanceTo(buffer.Start, buffer.End); + if (!advanced) + { + reader.AdvanceTo(buffer.Start, buffer.End); + } } } } @@ -208,15 +215,13 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinalBlock, ref JsonReaderState readerState, [NotNullWhen(true)] out JsonDocument? jsonDocument) { Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); - if (!JsonDocument.TryParseValue(ref jsonReader, out jsonDocument)) - { - readerState = jsonReader.CurrentState; // Preserve state for resumption when more data arrives - return false; - } - + bool parsed = JsonDocument.TryParseValue(ref jsonReader, out jsonDocument); buffer = buffer.Slice(jsonReader.BytesConsumed); - readerState = new(_jsonReaderOptions); // Reset state for the next document - return true; + readerState = parsed + ? new(_jsonReaderOptions) // Reset state for the next document + : jsonReader.CurrentState; // Preserve state for resumption when more data arrives + + return parsed; } private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) From 870cd187ba9069fc124e1aaafe163ce4c2d20c26 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Mon, 26 Jan 2026 11:31:38 +0100 Subject: [PATCH 09/18] whitespace --- src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 7642df076ea..ab582a7f234 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -168,7 +168,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso result = GetParsingError(startTime, in buffer, "Error during parsing/validation: incomplete request."); shouldExit = true; } - + reader.AdvanceTo(buffer.Start, buffer.End); advanced = true; } From 9c7e94a03aee78503dde79a579dbcb9236423d92 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Fri, 30 Jan 2026 11:25:15 +0100 Subject: [PATCH 10/18] fix TrimStart --- .../Extensions/ReadOnlySequenceExtensions.cs | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs index f53858d950b..a9cf2c3e9fd 100644 --- a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs +++ b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs @@ -12,24 +12,34 @@ public static class ReadOnlySequenceExtensions public static ReadOnlySequence TrimStart(this ReadOnlySequence sequence, byte[]? chars = null) { - SequencePosition position = sequence.Start; ReadOnlySpan charsSpan = chars ?? WhitespaceChars; - while (sequence.TryGet(ref position, out ReadOnlyMemory memory)) + SequencePosition start = sequence.Start; + + foreach (ReadOnlyMemory memory in sequence) { ReadOnlySpan span = memory.Span; int index = span.IndexOfAnyExcept(charsSpan); - if (index != 0) + if (index == -1) + { + // The entire segment is trimmed chars, advance past it + start = sequence.GetPosition(span.Length, start); + } + else if (index > 0) { - // if index == -1, then the whole span is whitespace - sequence = sequence.Slice(index != -1 ? index : span.Length); + // Found non-trimmed char partway through the segment + start = sequence.GetPosition(index, start); + return sequence.Slice(start); } else { - return sequence; + // First char is non-trimmed, we're done + return sequence.Slice(start); } } - return sequence; + // The entire sequence was trimmed chars + return sequence.Slice(sequence.End); + } } } From 32fd7b084a6995abf10849c04e949cb459e5e639 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Fri, 30 Jan 2026 11:27:55 +0100 Subject: [PATCH 11/18] Guard ReadAsync --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index ab582a7f234..f393210b33a 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -146,7 +146,23 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso while (!shouldExit) { long startTime = Stopwatch.GetTimestamp(); - ReadResult readResult = await reader.ReadAsync(timeoutSource.Token); + + ReadResult readResult; + try + { + readResult = await reader.ReadAsync(timeoutSource.Token); + } + catch (BadHttpRequestException e) + { + Handle(e); + break; + } + catch (ConnectionResetException e) + { + Handle(e); + break; + } + ReadOnlySequence buffer = readResult.Buffer; bool advanced = false; @@ -174,13 +190,12 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } catch (BadHttpRequestException e) { - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + Handle(e); shouldExit = true; } catch (ConnectionResetException e) { - if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); + Handle(e); shouldExit = true; } catch (JsonException ex) @@ -212,6 +227,17 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } } + private void Handle(ConnectionResetException e) + { + if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}"); + } + + private void Handle(BadHttpRequestException e) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + } + private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinalBlock, ref JsonReaderState readerState, [NotNullWhen(true)] out JsonDocument? jsonDocument) { Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); From aa1c6ac00f0adf586b395bd4c4157e3ac812fb65 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Fri, 30 Jan 2026 11:30:14 +0100 Subject: [PATCH 12/18] fix --- .../Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs | 1 - src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs index a9cf2c3e9fd..716dfbbd41e 100644 --- a/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs +++ b/src/Nethermind/Nethermind.Core/Extensions/ReadOnlySequenceExtensions.cs @@ -41,5 +41,4 @@ public static ReadOnlySequence TrimStart(this ReadOnlySequence seque // The entire sequence was trimmed chars return sequence.Slice(sequence.End); } - } } diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index f393210b33a..96528fb2578 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -162,7 +162,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso Handle(e); break; } - + ReadOnlySequence buffer = readResult.Buffer; bool advanced = false; From 86a85375c4e0bc6fd74b70672a500e07a3736ecb Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 12:21:41 +0100 Subject: [PATCH 13/18] Fix test endpoint to match transport semantics for multiple JSON requests (#10356) * Initial plan * Change test endpoint from Http to Ws for multiple requests test Co-authored-by: LukaszRozmej <12445221+LukaszRozmej@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: LukaszRozmej <12445221+LukaszRozmej@users.noreply.github.com> --- src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index c89a0fe4349..9e43ec53344 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -416,7 +416,7 @@ public async Task Can_process_multiple_large_requests_arriving_in_chunks() { Pipe pipe = new(); JsonRpcProcessor processor = Initialize(); - JsonRpcContext context = new(RpcEndpoint.Http); + JsonRpcContext context = new(RpcEndpoint.Ws); // Create 5 large JSON-RPC requests (~10KB each) List requests = Enumerable.Range(0, 5) From 667afe7e540afe8a7714b873eb4daf1632df97ce Mon Sep 17 00:00:00 2001 From: Lukasz Rozmej Date: Fri, 30 Jan 2026 21:38:18 +0100 Subject: [PATCH 14/18] Update src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 96528fb2578..7523c80f4ec 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -170,7 +170,10 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso { bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; JsonRpcResult? result = null; - buffer = buffer.TrimStart(); + if (readerState.BytesConsumed == 0) + { + buffer = buffer.TrimStart(); + } if (!buffer.IsEmpty) { try From b75404d72ae3100abda5ba986dfdab17aa9919c0 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Fri, 30 Jan 2026 22:00:47 +0100 Subject: [PATCH 15/18] fixes --- .../JsonRpcProcessorTests.cs | 3 +- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 30 +++++++++++-------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index 9e43ec53344..49eea1a821f 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -437,7 +437,8 @@ public async Task Can_process_multiple_large_requests_arriving_in_chunks() { int size = Math.Min(chunkSize, bytes.Length - i); await pipe.Writer.WriteAsync(new ReadOnlyMemory(bytes, i, size)); - await Task.Delay(1); + await pipe.Writer.FlushAsync(); + await Task.Yield(); } await pipe.Writer.CompleteAsync(); diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 7523c80f4ec..300cd12232b 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -55,18 +55,19 @@ public JsonRpcProcessor(IJsonRpcService jsonRpcService, IJsonRpcConfig jsonRpcCo public CancellationToken ProcessExit => _processExitSource?.Token ?? default; - private (JsonRpcRequest? Model, ArrayPoolList? Collection) DeserializeObjectOrArray(JsonDocument doc) + private void DeserializeObjectOrArray(JsonDocument doc, out JsonRpcRequest? model, out ArrayPoolList? collection) { - return doc.RootElement.ValueKind switch + collection = null; + model = null; + switch (doc.RootElement.ValueKind) { - JsonValueKind.Array => (null, DeserializeArray(doc.RootElement)), - JsonValueKind.Object => (DeserializeObject(doc.RootElement), null), - _ => ThrowInvalid() - }; - - [DoesNotReturn, StackTraceHidden] - static (JsonRpcRequest? Model, ArrayPoolList? Collection) ThrowInvalid() - => throw new JsonException("Invalid"); + case JsonValueKind.Array: + collection = DeserializeArray(doc.RootElement); + break; + case JsonValueKind.Object: + model = DeserializeObject(doc.RootElement); + break; + } } private JsonRpcRequest DeserializeObject(JsonElement element) @@ -140,6 +141,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken(); JsonReaderState readerState = new(_jsonReaderOptions); + bool freshState = true; bool shouldExit = false; try { @@ -170,15 +172,17 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso { bool isCompleted = readResult.IsCompleted || readResult.IsCanceled; JsonRpcResult? result = null; - if (readerState.BytesConsumed == 0) + if (freshState) { buffer = buffer.TrimStart(); } + if (!buffer.IsEmpty) { try { - if (TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument)) + freshState = TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument); + if (freshState) { result = await ProcessJsonDocument(jsonDocument, context, startTime); } @@ -257,7 +261,7 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal { try { - (JsonRpcRequest? model, ArrayPoolList? collection) = DeserializeObjectOrArray(jsonDocument); + DeserializeObjectOrArray(jsonDocument, out JsonRpcRequest? model, out ArrayPoolList? collection); // Handles a single JSON RPC request if (model is not null) From dda2677d14c14910a42eed88617c65d05c7943c7 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Fri, 30 Jan 2026 22:17:08 +0100 Subject: [PATCH 16/18] more fixes --- .../JsonRpcProcessorTests.cs | 10 ++++------ .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 2 +- src/Nethermind/Nethermind.JsonRpc/RpcReport.cs | 15 ++------------- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index 49eea1a821f..bad1db40589 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -25,7 +25,7 @@ public class JsonRpcProcessorTests(bool returnErrors) { private readonly JsonRpcErrorResponse _errorResponse = new(); - private JsonRpcProcessor Initialize(JsonRpcConfig? config = null) + private JsonRpcProcessor Initialize(JsonRpcConfig? config = null, RpcRecorderState recorderState = RpcRecorderState.All) { IJsonRpcService service = Substitute.For(); service.SendRequestAsync(Arg.Any(), Arg.Any()).Returns(ci => returnErrors ? new JsonRpcErrorResponse { Id = ci.Arg().Id } : new JsonRpcSuccessResponse { Id = ci.Arg().Id }); @@ -34,11 +34,9 @@ private JsonRpcProcessor Initialize(JsonRpcConfig? config = null) IFileSystem fileSystem = Substitute.For(); - /* we enable recorder always to have an easy smoke test for recording - * and this is fine because recorder is non-critical component - */ + // we enable recorder always to have an easy smoke test for recording and this is fine because recorder is a non-critical component config ??= new JsonRpcConfig(); - config.RpcRecorderState = RpcRecorderState.All; + config.RpcRecorderState = recorderState; return new JsonRpcProcessor(service, config, fileSystem, LimboLogs.Instance); } @@ -415,7 +413,7 @@ public void Cannot_accept_null_file_system() public async Task Can_process_multiple_large_requests_arriving_in_chunks() { Pipe pipe = new(); - JsonRpcProcessor processor = Initialize(); + JsonRpcProcessor processor = Initialize(recorderState: RpcRecorderState.None); JsonRpcContext context = new(RpcEndpoint.Ws); // Create 5 large JSON-RPC requests (~10KB each) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 300cd12232b..7a1094d6bbf 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -304,7 +304,7 @@ private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinal TraceResult(invalidResponse); _logger.Trace($" Failed request handled in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMilliseconds, false))); + return JsonRpcResult.Single(RecordResponse(invalidResponse, new RpcReport("# parsing error #", (long)Stopwatch.GetElapsedTime(startTime).TotalMicroseconds, false))); } catch { diff --git a/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs b/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs index f0d2112de52..483f0aca644 100644 --- a/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs +++ b/src/Nethermind/Nethermind.JsonRpc/RpcReport.cs @@ -3,19 +3,8 @@ namespace Nethermind.JsonRpc { - public readonly struct RpcReport + public readonly record struct RpcReport(string Method, long HandlingTimeMicroseconds, bool Success) { - public static readonly RpcReport Error = new RpcReport("# error #", 0, false); - - public RpcReport(string method, long handlingTimeMicroseconds, bool success) - { - Method = method; - HandlingTimeMicroseconds = handlingTimeMicroseconds; - Success = success; - } - - public string Method { get; } - public long HandlingTimeMicroseconds { get; } - public bool Success { get; } + public static readonly RpcReport Error = new("# error #", 0, false); } } From b882c96dbea728e11601e362340d3960b36ace76 Mon Sep 17 00:00:00 2001 From: Lukasz Rozmej Date: Sat, 31 Jan 2026 01:01:04 +0100 Subject: [PATCH 17/18] Update src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs index bad1db40589..99d9b33ba9c 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs @@ -435,7 +435,6 @@ public async Task Can_process_multiple_large_requests_arriving_in_chunks() { int size = Math.Min(chunkSize, bytes.Length - i); await pipe.Writer.WriteAsync(new ReadOnlyMemory(bytes, i, size)); - await pipe.Writer.FlushAsync(); await Task.Yield(); } await pipe.Writer.CompleteAsync(); From f4c77d5260496b03af4d73e178222f49ebb17362 Mon Sep 17 00:00:00 2001 From: "lukasz.rozmej" Date: Sun, 1 Feb 2026 11:48:03 +0100 Subject: [PATCH 18/18] make http endpoint AllowMultipleValues = false --- .../Nethermind.JsonRpc/JsonRpcProcessor.cs | 22 +++++++++---- .../Nethermind.Runner/JsonRpc/Startup.cs | 33 +++++++------------ 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index 7a1094d6bbf..bd042072fa1 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -21,6 +21,7 @@ using Nethermind.Core.Collections; using Nethermind.Core.Extensions; using Nethermind.Core.Resettables; +using Nethermind.JsonRpc.Modules; using Nethermind.Logging; using Nethermind.Serialization.Json; @@ -52,8 +53,7 @@ public JsonRpcProcessor(IJsonRpcService jsonRpcService, IJsonRpcConfig jsonRpcCo } } - public CancellationToken ProcessExit - => _processExitSource?.Token ?? default; + public CancellationToken ProcessExit => _processExitSource?.Token ?? default; private void DeserializeObjectOrArray(JsonDocument doc, out JsonRpcRequest? model, out ArrayPoolList? collection) { @@ -124,7 +124,7 @@ private JsonRpcRequest DeserializeObject(JsonElement element) private ArrayPoolList DeserializeArray(JsonElement element) => new(element.GetArrayLength(), element.EnumerateArray().Select(DeserializeObject)); - private static readonly JsonReaderOptions _jsonReaderOptions = new() { AllowMultipleValues = true }; + private static readonly JsonReaderOptions _socketJsonReaderOptions = new() { AllowMultipleValues = true }; public async IAsyncEnumerable ProcessAsync(PipeReader reader, JsonRpcContext context) { @@ -140,7 +140,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso } using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken(); - JsonReaderState readerState = new(_jsonReaderOptions); + JsonReaderState readerState = CreateJsonReaderState(context); bool freshState = true; bool shouldExit = false; try @@ -181,7 +181,7 @@ public async IAsyncEnumerable ProcessAsync(PipeReader reader, Jso { try { - freshState = TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument); + freshState = TryParseJson(ref buffer, isCompleted, ref readerState, out JsonDocument? jsonDocument, context); if (freshState) { result = await ProcessJsonDocument(jsonDocument, context, startTime); @@ -245,18 +245,26 @@ private void Handle(BadHttpRequestException e) if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); } - private static bool TryParseJson(ref ReadOnlySequence buffer, bool isFinalBlock, ref JsonReaderState readerState, [NotNullWhen(true)] out JsonDocument? jsonDocument) + private static bool TryParseJson( + ref ReadOnlySequence buffer, + bool isFinalBlock, + ref JsonReaderState readerState, + [NotNullWhen(true)] out JsonDocument? jsonDocument, + JsonRpcContext context) { Utf8JsonReader jsonReader = new(buffer, isFinalBlock, readerState); bool parsed = JsonDocument.TryParseValue(ref jsonReader, out jsonDocument); buffer = buffer.Slice(jsonReader.BytesConsumed); readerState = parsed - ? new(_jsonReaderOptions) // Reset state for the next document + ? CreateJsonReaderState(context) // Reset state for the next document : jsonReader.CurrentState; // Preserve state for resumption when more data arrives return parsed; } + private static JsonReaderState CreateJsonReaderState(JsonRpcContext context) => + new(context.RpcEndpoint == RpcEndpoint.Http ? default : _socketJsonReaderOptions); + private async Task ProcessJsonDocument(JsonDocument jsonDocument, JsonRpcContext context, long startTime) { try diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs index 2ca43216e00..995fb81479e 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs @@ -120,7 +120,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc // If request is local, don't use response compression, // as it allocates a lot, but doesn't improve much for loopback app.UseWhen(ctx => - !IsLocalhost(ctx.Connection.RemoteIpAddress), + !IsLocalhost(ctx.Connection.RemoteIpAddress!), builder => builder.UseResponseCompression()); if (initConfig.WebSocketsEnabled) @@ -154,7 +154,6 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc if (logger.IsError) logger.Error("Unable to initialize health checks. Check if you have Nethermind.HealthChecks.dll in your plugins folder.", e); } - IServiceProvider services = app.ApplicationServices; endpoints.MapDataFeeds(lifetime); } }); @@ -192,7 +191,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc } if (method == "GET" && ctx.Request.Headers.Accept.Count > 0 && - !ctx.Request.Headers.Accept[0].Contains("text/html", StringComparison.Ordinal)) + !ctx.Request.Headers.Accept[0]!.Contains("text/html", StringComparison.Ordinal)) { await ctx.Response.WriteAsync("Nethermind JSON RPC"); } @@ -242,12 +241,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc await jsonSerializer.SerializeAsync(resultWriter, entry.Response); _ = jsonRpcLocalStats.ReportCall(entry.Report); - // We reached the limit and don't want to responded to more request in the batch + // We reached the limit and don't want to respond to more request in the batch if (!jsonRpcContext.IsAuthenticated && resultWriter.WrittenCount > jsonRpcConfig.MaxBatchResponseBodySize) { - if (logger.IsWarn) - logger.Warn( - $"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); + if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); enumerator.IsStopped = true; } } @@ -307,7 +304,7 @@ await PushErrorResponse(e.StatusCode, e.StatusCode == StatusCodes.Status413Paylo } finally { - Interlocked.Add(ref Nethermind.JsonRpc.Metrics.JsonRpcBytesReceivedHttp, ctx.Request.ContentLength ?? request.Length); + Interlocked.Add(ref Metrics.JsonRpcBytesReceivedHttp, ctx.Request.ContentLength ?? request.Length); } } Task SerializeTimeoutException(CountingWriter resultStream) @@ -361,51 +358,45 @@ private static bool IsResourceUnavailableError(JsonRpcResponse? response) or JsonRpcErrorResponse { Error.Code: ErrorCodes.LimitExceeded }; } - private sealed class CountingPipeReader : PipeReader + private sealed class CountingPipeReader(PipeReader stream) : PipeReader { - private readonly PipeReader _wrappedReader; private ReadOnlySequence _currentSequence; public long Length { get; private set; } - public CountingPipeReader(PipeReader stream) - { - _wrappedReader = stream; - } - public override void AdvanceTo(SequencePosition consumed) { Length += _currentSequence.GetOffset(consumed); - _wrappedReader.AdvanceTo(consumed); + stream.AdvanceTo(consumed); } public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { Length += _currentSequence.GetOffset(consumed); - _wrappedReader.AdvanceTo(consumed, examined); + stream.AdvanceTo(consumed, examined); } public override void CancelPendingRead() { - _wrappedReader.CancelPendingRead(); + stream.CancelPendingRead(); } public override void Complete(Exception? exception = null) { Length += _currentSequence.Length; - _wrappedReader.Complete(exception); + stream.Complete(exception); } public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) { - ReadResult result = await _wrappedReader.ReadAsync(cancellationToken); + ReadResult result = await stream.ReadAsync(cancellationToken); _currentSequence = result.Buffer; return result; } public override bool TryRead(out ReadResult result) { - bool didRead = _wrappedReader.TryRead(out result); + bool didRead = stream.TryRead(out result); if (didRead) { _currentSequence = result.Buffer;