Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Core.Extensions;
using Nethermind.Config;
using Nethermind.Logging;
using Nethermind.JsonRpc.Modules;
using NSubstitute;
Expand Down Expand Up @@ -401,6 +402,65 @@ public async Task Can_handle_null_request()
result.DisposeItems();
}

[Test]
public async Task Should_stop_processing_when_shutdown_requested()
{
IJsonRpcService service = Substitute.For<IJsonRpcService>();
service.GetErrorResponse(Arg.Any<int>(), Arg.Any<string>())
.Returns(new JsonRpcErrorResponse { Error = new Error { Code = ErrorCodes.ResourceUnavailable, Message = "Shutting down" } });

IProcessExitSource processExitSource = Substitute.For<IProcessExitSource>();
processExitSource.Token.Returns(new CancellationToken(canceled: true));

JsonRpcProcessor processor = new(
service,
new JsonRpcConfig(),
Substitute.For<IFileSystem>(),
LimboLogs.Instance,
processExitSource);

string request = "{\"id\":67,\"jsonrpc\":\"2.0\",\"method\":\"eth_getTransactionCount\",\"params\":[\"0x7f01d9b227593e033bf8d6fc86e634d27aa85568\",\"0x668c24\"]}";
List<JsonRpcResult> results = await processor.ProcessAsync(request, new JsonRpcContext(RpcEndpoint.Http)).ToListAsync();

results.Should().HaveCount(1);
results[0].Response.Should().BeOfType<JsonRpcErrorResponse>();
((JsonRpcErrorResponse)results[0].Response!).Error!.Code.Should().Be(ErrorCodes.ResourceUnavailable);
await service.DidNotReceive().SendRequestAsync(Arg.Any<JsonRpcRequest>(), Arg.Any<JsonRpcContext>());
results.DisposeItems();
}

[Test]
public async Task Should_complete_pipe_reader_when_shutdown_requested()
{
IJsonRpcService service = Substitute.For<IJsonRpcService>();
service.GetErrorResponse(Arg.Any<int>(), Arg.Any<string>())
.Returns(new JsonRpcErrorResponse { Error = new Error { Code = ErrorCodes.ResourceUnavailable, Message = "Shutting down" } });

IProcessExitSource processExitSource = Substitute.For<IProcessExitSource>();
processExitSource.Token.Returns(new CancellationToken(canceled: true));

JsonRpcProcessor processor = new(
service,
new JsonRpcConfig(),
Substitute.For<IFileSystem>(),
LimboLogs.Instance,
processExitSource);

Pipe pipe = new();
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[]}"));

List<JsonRpcResult> results = await processor.ProcessAsync(pipe.Reader, new JsonRpcContext(RpcEndpoint.Http)).ToListAsync();

results.Should().HaveCount(1);
results[0].Response.Should().BeOfType<JsonRpcErrorResponse>();

// Verify PipeReader was completed by the processor (reading again should throw)
await FluentActions.Invoking(async () => await pipe.Reader.ReadAsync())
.Should().ThrowAsync<InvalidOperationException>();

results.DisposeItems();
}

[Test]
public void Cannot_accept_null_file_system()
{
Expand Down
30 changes: 16 additions & 14 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,25 @@ private ArrayPoolList<JsonRpcRequest> DeserializeArray(JsonElement element) =>

public async IAsyncEnumerable<JsonRpcResult> ProcessAsync(PipeReader reader, JsonRpcContext context)
{
if (ProcessExit.IsCancellationRequested)
try
{
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ResourceUnavailable, "Shutting down");
yield return JsonRpcResult.Single(RecordResponse(response, new RpcReport("Shutdown", 0, false)));
}
if (ProcessExit.IsCancellationRequested)
{
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ResourceUnavailable, "Shutting down");
yield return JsonRpcResult.Single(RecordResponse(response, new RpcReport("Shutdown", 0, false)));
yield break;
}

if (IsRecordingRequest)
{
reader = await RecordRequest(reader);
}
if (IsRecordingRequest)
{
reader = await RecordRequest(reader);
}

using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken();
JsonReaderState readerState = CreateJsonReaderState(context);
bool freshState = true;
bool shouldExit = false;

using CancellationTokenSource timeoutSource = _jsonRpcConfig.BuildTimeoutCancellationToken();
JsonReaderState readerState = CreateJsonReaderState(context);
bool freshState = true;
bool shouldExit = false;
try
{
while (!shouldExit)
{
long startTime = Stopwatch.GetTimestamp();
Expand Down