Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,33 @@ public static class ReadOnlySequenceExtensions

public static ReadOnlySequence<byte> TrimStart(this ReadOnlySequence<byte> sequence, byte[]? chars = null)
{
SequencePosition position = sequence.Start;
ReadOnlySpan<byte> charsSpan = chars ?? WhitespaceChars;
while (sequence.TryGet(ref position, out ReadOnlyMemory<byte> memory))
SequencePosition start = sequence.Start;

foreach (ReadOnlyMemory<byte> memory in sequence)
{
ReadOnlySpan<byte> span = memory.Span;
int index = span.IndexOfAnyExcept(charsSpan);

if (index != 0)
if (index == -1)
{
// The entire segment is trimmed chars, advance past it
start = sequence.GetPosition(span.Length, start);
}
else if (index > 0)
{
// if index == -1, then the whole span is whitespace
sequence = sequence.Slice(index != -1 ? index : span.Length);
// Found non-trimmed char partway through the segment
start = sequence.GetPosition(index, start);
return sequence.Slice(start);
}
else
{
return sequence;
// First char is non-trimmed, we're done
return sequence.Slice(start);
}
}

return sequence;
// The entire sequence was trimmed chars
return sequence.Slice(sequence.End);
}
}
70 changes: 65 additions & 5 deletions src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO.Abstractions;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading;
Expand All @@ -24,7 +25,7 @@ public class JsonRpcProcessorTests(bool returnErrors)
{
private readonly JsonRpcErrorResponse _errorResponse = new();

private JsonRpcProcessor Initialize(JsonRpcConfig? config = null)
private JsonRpcProcessor Initialize(JsonRpcConfig? config = null, RpcRecorderState recorderState = RpcRecorderState.All)
{
IJsonRpcService service = Substitute.For<IJsonRpcService>();
service.SendRequestAsync(Arg.Any<JsonRpcRequest>(), Arg.Any<JsonRpcContext>()).Returns(ci => returnErrors ? new JsonRpcErrorResponse { Id = ci.Arg<JsonRpcRequest>().Id } : new JsonRpcSuccessResponse { Id = ci.Arg<JsonRpcRequest>().Id });
Expand All @@ -33,11 +34,9 @@ private JsonRpcProcessor Initialize(JsonRpcConfig? config = null)

IFileSystem fileSystem = Substitute.For<IFileSystem>();

/* we enable recorder always to have an easy smoke test for recording
* and this is fine because recorder is non-critical component
*/
// we enable recorder always to have an easy smoke test for recording and this is fine because recorder is a non-critical component
config ??= new JsonRpcConfig();
config.RpcRecorderState = RpcRecorderState.All;
config.RpcRecorderState = recorderState;

return new JsonRpcProcessor(service, config, fileSystem, LimboLogs.Instance);
}
Expand Down Expand Up @@ -409,4 +408,65 @@ public void Cannot_accept_null_file_system()
Substitute.For<IJsonRpcConfig>(),
null!, LimboLogs.Instance));
}

[Test]
public async Task Can_process_multiple_large_requests_arriving_in_chunks()
{
Pipe pipe = new();
JsonRpcProcessor processor = Initialize(recorderState: RpcRecorderState.None);
JsonRpcContext context = new(RpcEndpoint.Ws);

// Create 5 large JSON-RPC requests (~10KB each)
List<string> requests = Enumerable.Range(0, 5)
.Select(i => CreateLargeRequest(i, targetSize: 10_000))
.ToList();

string allRequestsJson = string.Join("\n", requests);
byte[] bytes = Encoding.UTF8.GetBytes(allRequestsJson);

// Start processing task (reads from pipe.Reader)
ValueTask<List<JsonRpcResult>> processTask = processor
.ProcessAsync(pipe.Reader, context)
.ToListAsync();

// Write data in 1KB chunks with small delays to simulate network
const int chunkSize = 1024;
for (int i = 0; i < bytes.Length; i += chunkSize)
{
int size = Math.Min(chunkSize, bytes.Length - i);
await pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(bytes, i, size));
await Task.Yield();
}
await pipe.Writer.CompleteAsync();

// Verify all 5 requests processed
List<JsonRpcResult> results = await processTask;
results.Should().HaveCount(5);
for (int i = 0; i < 5; i++)
{
results[i].Response.Should().NotBeNull();
}
results.DisposeItems();
}

private static string CreateLargeRequest(int id, int targetSize)
{
StringBuilder sb = new();
sb.Append($"{{\"jsonrpc\":\"2.0\",\"id\":{id},\"method\":\"test_method\",\"params\":[");

int currentSize = sb.Length + 2; // account for closing ]}
bool first = true;
int paramIndex = 0;
while (currentSize < targetSize)
{
string param = $"\"param_{paramIndex++}_padding\"";
if (!first) sb.Append(',');
sb.Append(param);
currentSize += param.Length + (first ? 0 : 1);
first = false;
}

sb.Append("]}");
return sb.ToString();
}
}
Loading
Loading