Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 150 additions & 125 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Threading;
using Nethermind.JsonRpc.Exceptions;
using Nethermind.JsonRpc.Modules;
using Nethermind.Logging;
Expand All @@ -26,6 +25,9 @@ namespace Nethermind.JsonRpc;

public class JsonRpcService : IJsonRpcService
{
private readonly static Lock _reparseLock = new();
private static Dictionary<TypeAsKey, bool> _reparseReflectionCache = new();

private readonly ILogger _logger;
private readonly IRpcModuleProvider _rpcModuleProvider;
private readonly HashSet<string> _methodsLoggingFiltering;
Expand Down Expand Up @@ -106,6 +108,75 @@ private Task<JsonRpcResponse> ExecuteRequestAsync(JsonRpcRequest rpcRequest, Jso

private async Task<JsonRpcResponse> ExecuteAsync(JsonRpcRequest request, string methodName, ResolvedMethodInfo method, JsonRpcContext context)
{
const string GetLogsMethodName = "eth_getLogs";

JsonRpcErrorResponse? value = PrepareParameters(request, methodName, method, out object[]? parameters, out bool hasMissing);
if (value is not null)
{
return value;
}

IRpcModule rpcModule = await _rpcModuleProvider.Rent(methodName, method.ReadOnly);
if (rpcModule is IContextAwareRpcModule contextAwareModule)
{
contextAwareModule.Context = context;
}
bool returnImmediately = methodName != GetLogsMethodName;
Action? returnAction = returnImmediately ? null : () => _rpcModuleProvider.Return(methodName, rpcModule);
IResultWrapper? resultWrapper = null;
try
{
// Execute method
object invocationResult = hasMissing ?
method.MethodInfo.Invoke(rpcModule, parameters) :
method.Invoker.Invoke(rpcModule, new Span<object?>(parameters));

switch (invocationResult)
{
case IResultWrapper wrapper:
resultWrapper = wrapper;
break;
case Task task:
await task;
resultWrapper = GetResultProperty(task)?.GetValue(task) as IResultWrapper;
break;
}
}
catch (Exception ex)
{
return HandleInvocationException(ex, methodName, request, returnAction);
}
finally
{
if (returnImmediately)
{
_rpcModuleProvider.Return(methodName, rpcModule);
}
}

if (resultWrapper is null)
{
return HandleMissingResultWrapper(request, methodName, returnAction);
}

Result result = resultWrapper.Result;
return result.ResultType != ResultType.Success
? GetErrorResponse(methodName, resultWrapper.ErrorCode, result.Error, resultWrapper.Data, request.Id, returnAction, resultWrapper.IsTemporary)
: GetSuccessResponse(methodName, resultWrapper.Data, request.Id, returnAction);

[MethodImpl(MethodImplOptions.NoInlining)]
JsonRpcResponse HandleMissingResultWrapper(JsonRpcRequest request, string methodName, Action returnAction)
{
string errorMessage = $"Method {methodName} execution result does not implement IResultWrapper";
if (_logger.IsError) _logger.Error(errorMessage);
return GetErrorResponse(methodName, ErrorCodes.InternalError, errorMessage, null, request.Id, returnAction);
}
}

private JsonRpcErrorResponse? PrepareParameters(JsonRpcRequest request, string methodName, ResolvedMethodInfo method, out object[]? parameters, out bool hasMissing)
{
parameters = null;
hasMissing = false;
JsonElement providedParameters = request.Params;

if (_logger.IsTrace) LogRequest(methodName, providedParameters, method.ExpectedParameters);
Expand Down Expand Up @@ -166,87 +237,48 @@ private async Task<JsonRpcResponse> ExecuteAsync(JsonRpcRequest request, string

missingParamsCount -= explicitNullableParamsCount;

//prepare parameters
object[]? parameters = null;
bool hasMissing = false;
// Prepare parameters
if (method.ExpectedParameters.Length > 0)
{
(parameters, hasMissing) = DeserializeParameters(method.ExpectedParameters, providedParametersLength, providedParameters, missingParamsCount);
if (parameters is null)
try
{
(parameters, hasMissing) = DeserializeParameters(method.ExpectedParameters, providedParametersLength, providedParameters, missingParamsCount);
}
catch (Exception e)
{
if (_logger.IsWarn) _logger.Warn($"Incorrect JSON RPC parameters when calling {methodName} with params [{string.Join(", ", providedParameters)}]");
if (_logger.IsWarn) _logger.Warn($"Incorrect JSON RPC parameters when calling {methodName} with params [{string.Join(", ", providedParameters)}] {e}");
return GetErrorResponse(methodName, ErrorCodes.InvalidParams, "Invalid params", null, request.Id);
}
}

//execute method
IResultWrapper resultWrapper = null;
IRpcModule rpcModule = await _rpcModuleProvider.Rent(methodName, method.ReadOnly);
if (rpcModule is IContextAwareRpcModule contextAwareModule)
{
contextAwareModule.Context = context;
}
bool returnImmediately = methodName != "eth_getLogs";
Action? returnAction = returnImmediately ? null : () => _rpcModuleProvider.Return(methodName, rpcModule);
try
{
object invocationResult = hasMissing ?
method.MethodInfo.Invoke(rpcModule, parameters) :
method.Invoker.Invoke(rpcModule, new Span<object?>(parameters));
return null;
}

switch (invocationResult)
{
case IResultWrapper wrapper:
resultWrapper = wrapper;
break;
case Task task:
await task;
resultWrapper = GetResultProperty(task)?.GetValue(task) as IResultWrapper;
break;
}
}
catch (Exception e) when (e is TargetParameterCountException || e is ArgumentException)
{
return GetErrorResponse(methodName, ErrorCodes.InvalidParams, e.Message, e.ToString(), request.Id, returnAction);
}
catch (TargetInvocationException e) when (e.InnerException is JsonException)
{
return GetErrorResponse(methodName, ErrorCodes.InvalidParams, "Invalid params", e.InnerException?.ToString(), request.Id, returnAction);
}
catch (Exception e) when (e is OperationCanceledException || e.InnerException is OperationCanceledException)
{
string errorMessage = $"{methodName} request was canceled due to enabled timeout.";
return GetErrorResponse(methodName, ErrorCodes.Timeout, errorMessage, null, request.Id, returnAction);
}
catch (Exception e) when (e.InnerException is InsufficientBalanceException)
private JsonRpcErrorResponse HandleInvocationException(Exception ex, string methodName, JsonRpcRequest request, Action? returnAction)
{
return ex switch
{
return GetErrorResponse(methodName, ErrorCodes.InvalidInput, e.InnerException.Message, e.ToString(), request.Id, returnAction);
}
catch (Exception ex)
TargetParameterCountException or ArgumentException =>
GetErrorResponse(methodName, ErrorCodes.InvalidParams, ex.Message, ex.ToString(), request.Id, returnAction),

TargetInvocationException and { InnerException: JsonException } =>
GetErrorResponse(methodName, ErrorCodes.InvalidParams, "Invalid params", ex.InnerException?.ToString(), request.Id, returnAction),

OperationCanceledException or { InnerException: OperationCanceledException } =>
GetErrorResponse(methodName, ErrorCodes.Timeout,
$"{methodName} request was canceled due to enabled timeout.", null, request.Id, returnAction),

{ InnerException: InsufficientBalanceException } =>
GetErrorResponse(methodName, ErrorCodes.InvalidInput, ex.InnerException.Message, ex.ToString(), request.Id, returnAction),

_ => HandleException(ex, methodName, request, returnAction)
};

JsonRpcErrorResponse HandleException(Exception ex, string methodName, JsonRpcRequest request, Action? returnAction)
{
if (_logger.IsError) _logger.Error($"Error during method execution, request: {request}", ex);
return GetErrorResponse(methodName, ErrorCodes.InternalError, "Internal error", ex.ToString(), request.Id, returnAction);
}
finally
{
if (returnImmediately)
{
_rpcModuleProvider.Return(methodName, rpcModule);
}
}

if (resultWrapper is null)
{
string errorMessage = $"Method {methodName} execution result does not implement IResultWrapper";
if (_logger.IsError) _logger.Error(errorMessage);
return GetErrorResponse(methodName, ErrorCodes.InternalError, errorMessage, null, request.Id, returnAction);
}

Result? result = resultWrapper.Result;

return result.ResultType != ResultType.Success
? GetErrorResponse(methodName, resultWrapper.ErrorCode, result.Error, resultWrapper.Data, request.Id, returnAction, resultWrapper.IsTemporary)
: GetSuccessResponse(methodName, resultWrapper.Data, request.Id, returnAction);
}

private PropertyInfo? GetResultProperty(Task task)
Expand Down Expand Up @@ -363,8 +395,12 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec
{
if (providedParameter.ValueKind == JsonValueKind.String)
{
JsonConverter converter = EthereumJsonSerializer.JsonOptions.GetConverter(paramType);
executionParam = converter.GetType().Namespace.StartsWith("System.", StringComparison.Ordinal)
if (!_reparseReflectionCache.TryGetValue(paramType, out bool reparseString))
{
reparseString = CreateNewCacheEntry(paramType);
}

executionParam = reparseString
? JsonSerializer.Deserialize(providedParameter.GetString(), paramType, EthereumJsonSerializer.JsonOptions)
: providedParameter.Deserialize(paramType, EthereumJsonSerializer.JsonOptions);
}
Expand All @@ -377,72 +413,61 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec
return executionParam;
}

private (object[]? parameters, bool hasMissing) DeserializeParameters(
ExpectedParameter[] expectedParameters,
int providedParametersLength,
JsonElement providedParameters,
int missingParamsCount)
private static bool CreateNewCacheEntry(Type paramType)
{
const int parallelThreshold = 4;
try
lock (_reparseLock)
{
bool hasMissing = false;
int totalLength = providedParametersLength + missingParamsCount;

if (totalLength == 0) return (Array.Empty<object>(), false);
// Re-check inside the lock in case another thread already added it
if (_reparseReflectionCache.TryGetValue(paramType, out bool reparseString))
{
return reparseString;
}

object[] executionParameters = new object[totalLength];
JsonConverter converter = EthereumJsonSerializer.JsonOptions.GetConverter(paramType);
reparseString = converter.GetType().Namespace.StartsWith("System.", StringComparison.Ordinal);

if (providedParametersLength <= parallelThreshold)
// Copy-on-write: create a new dictionary so we don't mutate
Dictionary<TypeAsKey, bool> reparseReflectionCache = new Dictionary<TypeAsKey, bool>(_reparseReflectionCache)
{
for (int i = 0; i < providedParametersLength; i++)
{
JsonElement providedParameter = providedParameters[i];
ExpectedParameter expectedParameter = expectedParameters[i];
[paramType] = reparseString
};

object? parameter = DeserializeParameter(providedParameter, expectedParameter);
executionParameters[i] = parameter;
if (!hasMissing && ReferenceEquals(parameter, Type.Missing))
{
hasMissing = true;
}
}
}
else if (providedParametersLength > parallelThreshold)
{
ParallelUnbalancedWork.For(
0,
providedParametersLength,
ParallelUnbalancedWork.DefaultOptions,
(providedParameters, expectedParameters, executionParameters, hasMissing),
static (i, state) =>
{
JsonElement providedParameter = state.providedParameters[i];
ExpectedParameter expectedParameter = state.expectedParameters[i];
// Publish the new cache instance atomically by swapping the reference.
_reparseReflectionCache = reparseReflectionCache;
return reparseString;
}
}

object? parameter = DeserializeParameter(providedParameter, expectedParameter);
state.executionParameters[i] = parameter;
if (!state.hasMissing && ReferenceEquals(parameter, Type.Missing))
{
state.hasMissing = true;
}
private static (object[]? parameters, bool hasMissing) DeserializeParameters(
ExpectedParameter[] expectedParameters,
int providedParametersLength,
JsonElement providedParameters,
int missingParamsCount)
{
int totalLength = providedParametersLength + missingParamsCount;
if (totalLength == 0) return (Array.Empty<object>(), false);

return state;
});
}
object[] executionParameters = new object[totalLength];

for (int i = providedParametersLength; i < totalLength; i++)
{
executionParameters[i] = Type.Missing;
}
hasMissing |= providedParametersLength < totalLength;
return (executionParameters, hasMissing);
bool hasMissing = missingParamsCount != 0;
int i = 0;
var enumerator = providedParameters.EnumerateArray();
while (enumerator.MoveNext())
{
ExpectedParameter expectedParameter = expectedParameters[i];

object? parameter = DeserializeParameter(enumerator.Current, expectedParameter);
executionParameters[i] = parameter;
hasMissing |= ReferenceEquals(parameter, Type.Missing);
i++;
}
catch (Exception e)

for (i = providedParametersLength; i < totalLength; i++)
{
if (_logger.IsWarn) _logger.Warn("Error while parsing JSON RPC request parameters " + e);
return (null, false);
executionParameters[i] = Type.Missing;
}

return (executionParameters, hasMissing);
}

private static JsonRpcResponse GetSuccessResponse(string methodName, object result, object id, Action? disposableAction)
Expand Down