Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
.proof_permits(self.config.proof_permits)
.gas_oracle_config(self.config.gas_oracle)
.max_batch_size(self.config.max_batch_size)
.max_blocking_io_requests(self.config.max_blocking_io_requests)
.pending_block_kind(self.config.pending_block_kind)
.raw_tx_forwarder(self.config.raw_tx_forwarder)
.evm_memory_limit(self.config.rpc_evm_memory_limit)
Expand Down
12 changes: 12 additions & 0 deletions crates/node/core/src/args/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub(crate) const RPC_DEFAULT_MAX_REQUEST_SIZE_MB: u32 = 15;
pub(crate) const RPC_DEFAULT_MAX_RESPONSE_SIZE_MB: u32 = 160;

/// Default number of incoming connections.
///
/// This restricts how many active connections (http, ws) the server accepts.
/// Once exceeded, the server can reject new connections.
pub(crate) const RPC_DEFAULT_MAX_CONNECTIONS: u32 = 500;

/// Parameters for configuring the rpc more granularity via CLI
Expand Down Expand Up @@ -166,6 +169,14 @@ pub struct RpcServerArgs {
#[arg(long = "rpc.max-tracing-requests", alias = "rpc-max-tracing-requests", value_name = "COUNT", default_value_t = constants::default_max_tracing_requests())]
pub rpc_max_tracing_requests: usize,

/// Maximum number of concurrent blocking IO requests.
///
/// Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that
/// require EVM execution. These are spawned as blocking tasks to avoid blocking the async
/// runtime.
#[arg(long = "rpc.max-blocking-io-requests", alias = "rpc-max-blocking-io-requests", value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_BLOCKING_IO_REQUEST)]
pub rpc_max_blocking_io_requests: usize,

/// Maximum number of blocks for `trace_filter` requests.
#[arg(long = "rpc.max-trace-filter-blocks", alias = "rpc-max-trace-filter-blocks", value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS)]
pub rpc_max_trace_filter_blocks: u64,
Expand Down Expand Up @@ -414,6 +425,7 @@ impl Default for RpcServerArgs {
rpc_max_subscriptions_per_connection: RPC_DEFAULT_MAX_SUBS_PER_CONN.into(),
rpc_max_connections: RPC_DEFAULT_MAX_CONNECTIONS.into(),
rpc_max_tracing_requests: constants::default_max_tracing_requests(),
rpc_max_blocking_io_requests: constants::DEFAULT_MAX_BLOCKING_IO_REQUEST,
rpc_max_trace_filter_blocks: constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS,
rpc_max_blocks_per_filter: constants::DEFAULT_MAX_BLOCKS_PER_FILTER.into(),
rpc_max_logs_per_response: (constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64).into(),
Expand Down
5 changes: 5 additions & 0 deletions crates/optimism/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ where
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.eth_api.blocking_task_guard()
}

#[inline]
fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
self.inner.eth_api.blocking_io_request_semaphore()
}
}

impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl RethRpcServerConfig for RpcServerArgs {
fn eth_config(&self) -> EthConfig {
EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests)
.max_blocking_io_requests(self.rpc_max_blocking_io_requests)
.max_trace_filter_blocks(self.rpc_max_trace_filter_blocks)
.max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max())
.max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize)
Expand Down
121 changes: 116 additions & 5 deletions crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,151 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner,
};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
use std::sync::Arc;
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};

use crate::EthApiTypes;

/// Executes code on a blocking thread.
/// Helpers for spawning blocking operations.
///
/// Operations can be blocking because they require lots of CPU work and/or IO.
///
/// This differentiates between workloads that are primarily CPU bound and heavier in general (such
/// as tracing tasks) and tasks that have a more balanced profile (io and cpu), such as `eth_call`
/// and alike.
///
/// This provides access to semaphores that permit how many of those are permitted concurrently.
/// It's expected that tracing related tasks are configured with a lower threshold, because not only
/// are they CPU heavy but they can also accumulate more memory for the traces.
pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
/// Returns a handle for spawning IO heavy blocking tasks.
///
/// Runtime access in default trait method implementations.
fn io_task_spawner(&self) -> impl TaskSpawner;

/// Returns a handle for spawning CPU heavy blocking tasks.
/// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests.
///
/// Thread pool access in default trait method implementations.
fn tracing_task_pool(&self) -> &BlockingTaskPool;

/// Returns handle to semaphore for pool of CPU heavy blocking tasks.
fn tracing_task_guard(&self) -> &BlockingTaskGuard;

/// Returns handle to semaphore for blocking IO tasks.
///
/// This semaphore is used to limit concurrent blocking IO operations like `eth_call`,
/// `eth_estimateGas`, and similar methods that require EVM execution.
fn blocking_io_task_guard(&self) -> &Arc<Semaphore>;

/// Acquires a permit from the tracing task semaphore.
///
/// This should be used for __CPU heavy__ operations like `debug_traceTransaction`,
/// `debug_traceCall`, and similar tracing methods. These tasks are typically:
/// - Primarily CPU bound with intensive computation
/// - Can accumulate significant memory for trace results
/// - Expected to have lower concurrency limits than general blocking IO tasks
///
/// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use
/// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned(
fn acquire_owned_tracing(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_owned()
}

/// Acquires multiple permits from the tracing task semaphore.
///
/// This should be used for particularly heavy tracing operations that require more resources
/// than a standard trace. The permit count should reflect the expected resource consumption
/// relative to a standard tracing operation.
///
/// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for
/// CPU-intensive tracing tasks, not general blocking IO operations.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned(
fn acquire_many_owned_tracing(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_many_owned(n)
}

/// Acquires a permit from the blocking IO request semaphore.
///
/// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods
/// that require EVM execution and are spawned as blocking tasks.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned_blocking_io(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_owned()
}

/// Acquires multiple permits from the blocking IO request semaphore.
///
/// This should be used for operations that may require more resources than a single permit
/// allows.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned_blocking_io(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_many_owned(n)
}

/// Acquires permits from the blocking IO request semaphore based on a calculated weight.
///
/// The weight determines the maximum number of concurrent requests of this type that can run.
/// For example, if the semaphore has 256 total permits and `weight=10`, then at most 10
/// concurrent requests of this type are allowed.
///
/// The permits acquired per request is calculated as `total_permits / weight`, with an
/// adjustment: if this result is even, we add 1 to ensure that `weight - 1` permits are
/// always available for other tasks, preventing complete semaphore exhaustion.
///
/// This should be used to explicitly limit concurrent requests based on their expected
/// resource consumption:
///
/// - **Block range queries**: Higher weight for larger ranges (fewer concurrent requests)
/// - **Complex calls**: Higher weight for expensive operations
/// - **Batch operations**: Higher weight for larger batches
/// - **Historical queries**: Higher weight for deeper history lookups
///
/// # Examples
///
/// ```ignore
/// // For a heavy request, use higher weight to limit concurrency
/// let weight = 20; // Allow at most 20 concurrent requests of this type
/// let _permit = self.acquire_weighted_blocking_io(weight).await?;
/// ```
///
/// This helps prevent resource exhaustion from concurrent expensive operations while allowing
/// many cheap operations to run in parallel.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_weighted_blocking_io(
&self,
weight: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
let guard = self.blocking_io_task_guard();
let total_permits = guard.available_permits().max(1) as u32;
let weight = weight.max(1);
let mut permits_to_acquire = (total_permits / weight).max(1);

// If total_permits divides evenly by weight, add 1 to ensure that when `weight`
// concurrent requests are running, at least `weight - 1` permits remain available
// for other tasks
if total_permits.is_multiple_of(weight) {
permits_to_acquire += 1;
}

guard.clone().acquire_many_owned(permits_to_acquire)
}

/// Executes the future on a new blocking task.
///
/// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-eth-api/src/helpers/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
overrides: EvmOverrides,
) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
async move {
let _permit = self.acquire_owned_blocking_io().await;
let res =
self.transact_call_at(request, block_number.unwrap_or_default(), overrides).await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/src/helpers/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub trait EthState: LoadState + SpawnBlocking {
{
Ok(async move {
let _permit = self
.acquire_owned()
.acquire_owned_tracing()
.await
.map_err(RethError::other)
.map_err(EthApiError::Internal)?;
Expand Down
23 changes: 20 additions & 3 deletions crates/rpc/rpc-eth-types/src/builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::{
};
use reqwest::Url;
use reth_rpc_server_types::constants::{
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER,
DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_MAX_TRACE_FILTER_BLOCKS,
DEFAULT_PROOF_PERMITS, RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST,
DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_MAX_TRACE_FILTER_BLOCKS, DEFAULT_PROOF_PERMITS,
RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -68,6 +69,15 @@ pub struct EthConfig {
pub eth_proof_window: u64,
/// The maximum number of tracing calls that can be executed in concurrently.
pub max_tracing_requests: usize,
/// The maximum number of blocking IO calls that can be executed in concurrently.
///
/// Requests such as `eth_call`, `eth_estimateGas` and alike require evm execution, which is
/// considered blocking since it's usually more heavy on the IO side but also CPU constrained.
/// It is expected that these are spawned as short lived blocking tokio tasks. This config
/// determines how many can be spawned concurrently, to avoid a build up in the tokio's
/// blocking pool queue since there's only a limited number of threads available. This setting
/// restricts how many tasks are spawned concurrently.
pub max_blocking_io_requests: usize,
/// Maximum number of blocks for `trace_filter` requests.
pub max_trace_filter_blocks: u64,
/// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls.
Expand Down Expand Up @@ -116,6 +126,7 @@ impl Default for EthConfig {
gas_oracle: GasPriceOracleConfig::default(),
eth_proof_window: DEFAULT_ETH_PROOF_WINDOW,
max_tracing_requests: default_max_tracing_requests(),
max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST,
max_trace_filter_blocks: DEFAULT_MAX_TRACE_FILTER_BLOCKS,
max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER,
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
Expand Down Expand Up @@ -152,6 +163,12 @@ impl EthConfig {
self
}

/// Configures the maximum number of blocking IO requests
pub const fn max_blocking_io_requests(mut self, max_requests: usize) -> Self {
self.max_blocking_io_requests = max_requests;
self
}

/// Configures the maximum block length to scan per `eth_getLogs` request
pub const fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self {
self.max_blocks_per_filter = max_blocks;
Expand Down
14 changes: 14 additions & 0 deletions crates/rpc/rpc-server-types/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000;
/// The default maximum number of blocks for `trace_filter` requests.
pub const DEFAULT_MAX_TRACE_FILTER_BLOCKS: u64 = 100;

/// Setting for how many concurrent (heavier) _blocking_ IO requests are allowed.
///
/// What is considered a blocking IO request can depend on the RPC method. In general anything that
/// requires IO is considered blocking and should be spawned as blocking. This setting is however,
/// primarily intended for heavier blocking requests that require evm execution for example,
/// `eth_call` and alike. This is intended to be used with a semaphore that must be acquired before
/// a new task is spawned to avoid unnecessary pooling if the number of inflight requests exceeds
/// the available threads in the pool.
///
/// tokio's blocking pool, has a default of 512 and could grow unbounded, since requests like
/// `eth_call` also require a lot of cpu which will occupy the thread, we can set this to a lower
/// value.
pub const DEFAULT_MAX_BLOCKING_IO_REQUEST: usize = 256;

/// The default maximum number tracing requests we're allowing concurrently.
/// Tracing is mostly CPU bound so we're limiting the number of concurrent requests to something
/// lower that the number of cores, in order to minimize the impact on the rest of the system.
Expand Down
Loading
Loading