From 8a2e35c75876905d6a2257a77bd220666659d8ae Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 18:28:30 +0200 Subject: [PATCH 1/8] feat: use tracing pool --- Cargo.lock | 3 +- crates/rpc/rpc-builder/Cargo.toml | 3 -- crates/rpc/rpc-builder/src/auth.rs | 7 +--- crates/rpc/rpc-builder/src/eth.rs | 15 ++++--- crates/rpc/rpc-builder/src/lib.rs | 21 ++++------ crates/rpc/rpc/Cargo.toml | 1 + crates/rpc/rpc/src/call_guard.rs | 26 ------------- crates/rpc/rpc/src/eth/api/mod.rs | 7 ++++ crates/rpc/rpc/src/eth/api/server.rs | 3 +- crates/rpc/rpc/src/eth/api/state.rs | 7 +++- crates/rpc/rpc/src/eth/api/transactions.rs | 25 +++++++++++- crates/rpc/rpc/src/lib.rs | 4 +- .../src/tracing_call.rs} | 39 ++++++++++++++++++- 13 files changed, 98 insertions(+), 63 deletions(-) delete mode 100644 crates/rpc/rpc/src/call_guard.rs rename crates/rpc/{rpc-builder/src/tracing_pool.rs => rpc/src/tracing_call.rs} (71%) diff --git a/Cargo.lock b/Cargo.lock index b2c14b5d471..c852eda1156 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5681,6 +5681,7 @@ dependencies = [ "jsonwebtoken", "pin-project", "rand 0.8.5", + "rayon", "reth-consensus-common", "reth-interfaces", "reth-metrics", @@ -5740,8 +5741,6 @@ version = "0.1.0-alpha.4" dependencies = [ "hyper", "jsonrpsee", - "pin-project", - "rayon", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index c0078969d38..8d51e178022 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -33,9 +33,6 @@ strum = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tracing = { workspace = true } -rayon = { workspace = true } -pin-project = { workspace = true } -tokio = { workspace = true, features = ["sync"] } [dev-dependencies] reth-tracing = { path = "../../tracing" } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 758713bbdeb..355ac95971b 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -14,11 +14,7 @@ use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ BlockReaderIdExt, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, StateProviderFactory, }; -use reth_rpc::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, - AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, - JwtAuthValidator, JwtSecret, -}; +use reth_rpc::{eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, TracingCallPool}; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; @@ -64,6 +60,7 @@ where gas_oracle, EthConfig::default().rpc_gas_cap, Box::new(executor.clone()), + TracingCallPool::build().expect("failed to build tracing pool") ); let eth_filter = EthFilter::new( provider, diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 34ec4989c62..89a228c9b99 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -1,11 +1,8 @@ -use reth_rpc::{ - eth::{ - cache::{EthStateCache, EthStateCacheConfig}, - gas_oracle::GasPriceOracleConfig, - RPC_DEFAULT_GAS_CAP, - }, - EthApi, EthFilter, EthPubSub, -}; +use reth_rpc::{eth::{ + cache::{EthStateCache, EthStateCacheConfig}, + gas_oracle::GasPriceOracleConfig, + RPC_DEFAULT_GAS_CAP, +}, EthApi, EthFilter, EthPubSub, TracingCallPool}; use serde::{Deserialize, Serialize}; /// The default maximum of logs in a single response. @@ -25,6 +22,8 @@ pub struct EthHandlers { pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) pub pubsub: EthPubSub, + /// The configured tracing call pool + pub tracing_call_pool: TracingCallPool, } /// Additional config values for the eth namespace diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index afa7a37bf2b..a114ebb64cd 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -116,14 +116,10 @@ use reth_provider::{ BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, StateProviderFactory, }; -use reth_rpc::{ - eth::{ - cache::{cache_new_blocks_task, EthStateCache}, - gas_oracle::GasPriceOracle, - }, - AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, - NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, -}; +use reth_rpc::{eth::{ + cache::{cache_new_blocks_task, EthStateCache}, + gas_oracle::GasPriceOracle, +}, AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, TracingCallPool}; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; @@ -154,9 +150,6 @@ mod eth; /// Common RPC constants. pub mod constants; -/// Additional support for tracing related rpc calls -pub mod tracing_pool; - // re-export for convenience pub use crate::eth::{EthConfig, EthHandlers}; pub use jsonrpsee::server::ServerBuilder; @@ -892,7 +885,7 @@ where &mut self, namespaces: impl Iterator, ) -> Vec { - let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub } = + let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub, tracing_call_pool } = self.with_eth(|eth| eth.clone()); // Create a copy, so we can list out all the methods for rpc_ api @@ -994,6 +987,7 @@ where ); let executor = Box::new(self.executor.clone()); + let tracing_call_pool = TracingCallPool::build().expect("failed to build tracing pool"); let api = EthApi::with_spawner( self.provider.clone(), self.pool.clone(), @@ -1002,6 +996,7 @@ where gas_oracle, self.config.eth.rpc_gas_cap, executor.clone(), + tracing_call_pool.clone(), ); let filter = EthFilter::new( self.provider.clone(), @@ -1019,7 +1014,7 @@ where executor, ); - let eth = EthHandlers { api, cache, filter, pubsub }; + let eth = EthHandlers { api, cache, filter, pubsub, tracing_call_pool }; self.eth = Some(eth); } f(self.eth.as_ref().expect("exists; qed")) diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 11111cafbc6..37bbb8b302f 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -48,6 +48,7 @@ tower = "0.4" tokio-stream = { workspace = true, features = ["sync"] } tokio-util = "0.7" pin-project = { workspace = true } +rayon = { workspace = true } bytes.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } diff --git a/crates/rpc/rpc/src/call_guard.rs b/crates/rpc/rpc/src/call_guard.rs deleted file mode 100644 index bec4ed3ca98..00000000000 --- a/crates/rpc/rpc/src/call_guard.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::sync::Arc; -use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; - -/// RPC Tracing call guard semaphore. -/// -/// This is used to restrict the number of concurrent RPC requests to tracing methods like -/// `debug_traceTransaction` because they can consume a lot of memory and CPU. -#[derive(Clone, Debug)] -pub struct TracingCallGuard(Arc); - -impl TracingCallGuard { - /// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel. - pub fn new(max_tracing_requests: u32) -> Self { - Self(Arc::new(Semaphore::new(max_tracing_requests as usize))) - } - - /// See also [Semaphore::acquire_owned] - pub async fn acquire_owned(self) -> Result { - self.0.acquire_owned().await - } - - /// See also [Semaphore::acquire_many_owned] - pub async fn acquire_many_owned(self, n: u32) -> Result { - self.0.acquire_many_owned(n).await - } -} diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index de5d12f8d10..b5e8bd9ad30 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -37,6 +37,7 @@ mod sign; mod state; mod transactions; +use crate::TracingCallPool; pub use transactions::{EthTransactions, TransactionSource}; /// `Eth` API trait. @@ -88,6 +89,7 @@ where eth_cache: EthStateCache, gas_oracle: GasPriceOracle, gas_cap: impl Into, + tracing_call_pool: TracingCallPool, ) -> Self { Self::with_spawner( provider, @@ -97,6 +99,7 @@ where gas_oracle, gas_cap.into().into(), Box::::default(), + tracing_call_pool, ) } @@ -109,6 +112,7 @@ where gas_oracle: GasPriceOracle, gas_cap: u64, task_spawner: Box, + tracing_call_pool: TracingCallPool, ) -> Self { // get the block number of the latest block let latest_block = provider @@ -129,6 +133,7 @@ where starting_block: U256::from(latest_block), task_spawner, pending_block: Default::default(), + tracing_call_pool, }; Self { inner: Arc::new(inner) } } @@ -421,4 +426,6 @@ struct EthApiInner { task_spawner: Box, /// Cached pending block if any pending_block: Mutex>, + /// A pool dedicated to tracing calls + tracing_call_pool: TracingCallPool, } diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 1cca7addc90..663308cd895 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -392,7 +392,7 @@ where mod tests { use crate::{ eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, - EthApi, + EthApi, TracingCallPool, }; use jsonrpsee::types::error::INVALID_PARAMS_CODE; use reth_interfaces::test_utils::{generators, generators::Rng}; @@ -428,6 +428,7 @@ mod tests { cache.clone(), GasPriceOracle::new(provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ) } diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index 0930bf0b6c5..2887ac58fb8 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -146,7 +146,10 @@ where #[cfg(test)] mod tests { use super::*; - use crate::eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}; + use crate::{ + eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + TracingCallPool, + }; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue}; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider, NoopProvider}; use reth_transaction_pool::test_utils::testing_pool; @@ -165,6 +168,7 @@ mod tests { cache.clone(), GasPriceOracle::new(NoopProvider::default(), Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); let address = Address::random(); let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap(); @@ -186,6 +190,7 @@ mod tests { cache.clone(), GasPriceOracle::new(mock_provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); let storage_key: U256 = storage_key.into(); diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index e8dd2ed7ccb..0720967c14d 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -54,6 +54,12 @@ pub trait EthTransactions: Send + Sync { where F: FnOnce(StateProviderBox<'_>) -> EthResult; + /// Executes the closure with the state that corresponds to the given [BlockId] on a new task + async fn spawn_with_state_at_block(&self, at: BlockId, f: F) -> EthResult + where + F: FnOnce(StateProviderBox<'_>) -> EthResult + Send + 'static, + T: Send + 'static; + /// Returns the revm evm env for the requested [BlockId] /// /// If the [BlockId] this will return the [BlockId::Hash] of the block the env was configured @@ -245,6 +251,22 @@ where f(state) } + async fn spawn_with_state_at_block(&self, at: BlockId, f: F) -> EthResult + where + F: FnOnce(StateProviderBox<'_>) -> EthResult + Send + 'static, + T: Send + 'static, + { + let this = self.clone(); + self.inner + .tracing_call_pool + .spawn(move || { + let state = this.state_at(at)?; + f(state) + }) + .await + .map_err(|_| EthApiError::InternalTracingError)? + } + async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> { if at.is_pending() { let PendingBlockEnv { cfg, block_env, origin } = self.pending_block_env_and_cfg()?; @@ -878,7 +900,7 @@ mod tests { use super::*; use crate::{ eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, - EthApi, + EthApi, TracingCallPool, }; use reth_network_api::noop::NoopNetwork; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, hex_literal::hex, Bytes}; @@ -900,6 +922,7 @@ mod tests { cache.clone(), GasPriceOracle::new(noop_provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); // https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index b885608139a..e1818523f29 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -31,7 +31,6 @@ //! disk-io, hence these calls are spawned as futures to a blocking task manually. mod admin; -mod call_guard; mod debug; mod engine; pub mod eth; @@ -41,11 +40,11 @@ mod otterscan; mod reth; mod rpc; mod trace; +pub mod tracing_call; mod txpool; mod web3; pub use admin::AdminApi; -pub use call_guard::TracingCallGuard; pub use debug::DebugApi; pub use engine::{EngineApi, EngineEthApi}; pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider}; @@ -55,6 +54,7 @@ pub use otterscan::OtterscanApi; pub use reth::RethApi; pub use rpc::RPCApi; pub use trace::TraceApi; +pub use tracing_call::{TracingCallGuard, TracingCallPool}; pub use txpool::TxPoolApi; pub use web3::Web3Api; diff --git a/crates/rpc/rpc-builder/src/tracing_pool.rs b/crates/rpc/rpc/src/tracing_call.rs similarity index 71% rename from crates/rpc/rpc-builder/src/tracing_pool.rs rename to crates/rpc/rpc/src/tracing_call.rs index dd305611780..26956ae2d50 100644 --- a/crates/rpc/rpc-builder/src/tracing_pool.rs +++ b/crates/rpc/rpc/src/tracing_call.rs @@ -8,9 +8,46 @@ use std::{ task::{ready, Context, Poll}, thread, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore}; + +/// RPC Tracing call guard semaphore. +/// +/// This is used to restrict the number of concurrent RPC requests to tracing methods like +/// `debug_traceTransaction` because they can consume a lot of memory and CPU. +/// +/// This types serves as an entry guard for the [TracingCallPool] and is used to rate limit parallel +/// tracing calls on the pool. +#[derive(Clone, Debug)] +pub struct TracingCallGuard(Arc); + +impl TracingCallGuard { + /// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel. + pub fn new(max_tracing_requests: u32) -> Self { + Self(Arc::new(Semaphore::new(max_tracing_requests as usize))) + } + + /// See also [Semaphore::acquire_owned] + pub async fn acquire_owned(self) -> Result { + self.0.acquire_owned().await + } + + /// See also [Semaphore::acquire_many_owned] + pub async fn acquire_many_owned(self, n: u32) -> Result { + self.0.acquire_many_owned(n).await + } +} /// Used to execute tracing calls on a rayon threadpool from within a tokio runtime. +/// +/// This is a dedicated threadpool for tracing calls which are CPU bound. +/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio +/// runtime's blocking pool, which performs poorly with CPU bound tasks. Once the tokio blocking +/// pool is saturated it is converted into a queue, tracing calls could then interfere with the +/// queue and block other RPC calls. +/// +/// See also [tokio-docs] for more information. +/// +/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code #[derive(Clone, Debug)] pub struct TracingCallPool { pool: Arc, From a2f42d38593422e90e7340284712f0b17262cbcd Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 18:34:43 +0200 Subject: [PATCH 2/8] feat: perform tx traces on tracing pool --- crates/rpc/rpc/src/eth/api/transactions.rs | 14 ++++-- crates/rpc/rpc/src/trace.rs | 57 ++++++++++------------ 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 0720967c14d..9e7393958a0 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -212,6 +212,9 @@ pub trait EthTransactions: Send + Sync { /// state by executing them first. /// The callback `f` is invoked with the [ResultAndState] after the transaction was executed and /// the database that points to the beginning of the transaction. + /// + /// Note: this is expected to be be executed on a threadpool where blocking is allowed, such as + /// [TracingCallPool](crate::tracing_call::TracingCallPool). async fn trace_transaction_in_block( &self, hash: H256, @@ -225,7 +228,9 @@ pub trait EthTransactions: Send + Sync { ResultAndState, StateCacheDB<'a>, ) -> EthResult - + Send; + + Send + + 'static, + R: Send + 'static; } #[async_trait] @@ -625,7 +630,9 @@ where ResultAndState, StateCacheDB<'a>, ) -> EthResult - + Send, + + Send + + 'static, + R: Send + 'static, { let (transaction, block) = match self.transaction_and_block(hash).await? { None => return Ok(None), @@ -640,7 +647,7 @@ where let parent_block = block.parent_hash; let block_txs = block.body; - self.with_state_at_block(parent_block.into(), |state| { + self.spawn_with_state_at_block(parent_block.into(), move |state| { let mut db = SubState::new(State::new(state)); // replay all transactions prior to the targeted transaction @@ -652,6 +659,7 @@ where let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; f(tx_info, inspector, res, db) }) + .await .map(Some) } } diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index ff2d09e50c7..5d3f6fe7c38 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -250,22 +250,19 @@ where trace_types: HashSet, ) -> EthResult { let config = tracing_config(&trace_types); - self.on_blocking_task(|this| async move { - this.inner - .eth_api - .trace_transaction_in_block(hash, config, |_, inspector, res, db| { - let trace_res = inspector.into_parity_builder().into_trace_results_with_state( - res, - &trace_types, - &db, - )?; - Ok(trace_res) - }) - .await - .transpose() - .ok_or_else(|| EthApiError::TransactionNotFound)? - }) - .await + self.inner + .eth_api + .trace_transaction_in_block(hash, config, move |_, inspector, res, db| { + let trace_res = inspector.into_parity_builder().into_trace_results_with_state( + res, + &trace_types, + &db, + )?; + Ok(trace_res) + }) + .await + .transpose() + .ok_or_else(|| EthApiError::TransactionNotFound)? } /// Returns transaction trace objects at the given index @@ -308,22 +305,18 @@ where &self, hash: H256, ) -> EthResult>> { - self.on_blocking_task(|this| async move { - this.inner - .eth_api - .trace_transaction_in_block( - hash, - TracingInspectorConfig::default_parity(), - |tx_info, inspector, _, _| { - let traces = inspector - .into_parity_builder() - .into_localized_transaction_traces(tx_info); - Ok(traces) - }, - ) - .await - }) - .await + self.inner + .eth_api + .trace_transaction_in_block( + hash, + TracingInspectorConfig::default_parity(), + move |tx_info, inspector, _, _| { + let traces = + inspector.into_parity_builder().into_localized_transaction_traces(tx_info); + Ok(traces) + }, + ) + .await } /// Executes all transactions of a block and returns a list of callback results. From 02939f8abdb001fe9a2d66293552ee2b386a9c6b Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 18:42:00 +0200 Subject: [PATCH 3/8] feat: perform trace at with state on tracing pool --- crates/rpc/rpc/src/eth/api/transactions.rs | 25 +++++++++++++++------- crates/rpc/rpc/src/trace.rs | 12 +++++------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 9e7393958a0..cfd18ff8a5d 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -173,6 +173,8 @@ pub trait EthTransactions: Send + Sync { /// /// The callback is then called with the [TracingInspector] and the [ResultAndState] after the /// configured [Env] was inspected. + /// + /// Caution: this is blocking fn trace_at( &self, env: Env, @@ -190,7 +192,7 @@ pub trait EthTransactions: Send + Sync { /// /// The callback is then called with the [TracingInspector] and the [ResultAndState] after the /// configured [Env] was inspected. - fn trace_at_with_state( + async fn trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, @@ -198,7 +200,10 @@ pub trait EthTransactions: Send + Sync { f: F, ) -> EthResult where - F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult; + F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult + + Send + + 'static, + R: Send + 'static; /// Fetches the transaction and the transaction's block async fn transaction_and_block( @@ -213,9 +218,9 @@ pub trait EthTransactions: Send + Sync { /// The callback `f` is invoked with the [ResultAndState] after the transaction was executed and /// the database that points to the beginning of the transaction. /// - /// Note: this is expected to be be executed on a threadpool where blocking is allowed, such as + /// Note: Implementers should use a threadpool where blocking is allowed, such as /// [TracingCallPool](crate::tracing_call::TracingCallPool). - async fn trace_transaction_in_block( + async fn spawn_trace_transaction_in_block( &self, hash: H256, config: TracingInspectorConfig, @@ -580,7 +585,7 @@ where }) } - fn trace_at_with_state( + async fn trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, @@ -588,15 +593,19 @@ where f: F, ) -> EthResult where - F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult, + F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult + + Send + + 'static, + R: Send + 'static, { - self.with_state_at_block(at, |state| { + self.spawn_with_state_at_block(at, move |state| { let db = SubState::new(State::new(state)); let mut inspector = TracingInspector::new(config); let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; f(inspector, res, db) }) + .await } async fn transaction_and_block( @@ -617,7 +626,7 @@ where Ok(block.map(|block| (transaction, block.seal(block_hash)))) } - async fn trace_transaction_in_block( + async fn spawn_trace_transaction_in_block( &self, hash: H256, config: TracingInspectorConfig, diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 5d3f6fe7c38..374570213a9 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -166,16 +166,16 @@ where let config = tracing_config(&trace_types); - self.on_blocking_task(|this| async move { - this.inner.eth_api.trace_at_with_state(env, config, at, |inspector, res, db| { + self.inner + .eth_api + .trace_at_with_state(env, config, at, move |inspector, res, db| { Ok(inspector.into_parity_builder().into_trace_results_with_state( res, &trace_types, &db, )?) }) - }) - .await + .await } /// Performs multiple call traces on top of the same block. i.e. transaction n will be executed @@ -252,7 +252,7 @@ where let config = tracing_config(&trace_types); self.inner .eth_api - .trace_transaction_in_block(hash, config, move |_, inspector, res, db| { + .spawn_trace_transaction_in_block(hash, config, move |_, inspector, res, db| { let trace_res = inspector.into_parity_builder().into_trace_results_with_state( res, &trace_types, @@ -307,7 +307,7 @@ where ) -> EthResult>> { self.inner .eth_api - .trace_transaction_in_block( + .spawn_trace_transaction_in_block( hash, TracingInspectorConfig::default_parity(), move |tx_info, inspector, _, _| { From bc08e5a143f6ea893e1352b07260b5047796e3db Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 18:55:51 +0200 Subject: [PATCH 4/8] more tracing pool --- crates/rpc/rpc-builder/src/lib.rs | 2 +- crates/rpc/rpc/src/eth/api/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index a114ebb64cd..f871a93c33c 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -885,7 +885,7 @@ where &mut self, namespaces: impl Iterator, ) -> Vec { - let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub, tracing_call_pool } = + let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub, tracing_call_pool: _ } = self.with_eth(|eth| eth.clone()); // Create a copy, so we can list out all the methods for rpc_ api diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index b5e8bd9ad30..20ecf114e2a 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -104,6 +104,7 @@ where } /// Creates a new, shareable instance. + #[allow(clippy::too_many_arguments)] pub fn with_spawner( provider: Provider, pool: Pool, From f202070d2b7565e0945e6236a65c2abd40c2fdef Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 19:29:26 +0200 Subject: [PATCH 5/8] chore: remove on_blocking tasks --- crates/rpc/rpc-builder/src/auth.rs | 8 +- crates/rpc/rpc-builder/src/eth.rs | 13 +- crates/rpc/rpc-builder/src/lib.rs | 36 ++-- crates/rpc/rpc/src/debug.rs | 147 ++++++----------- crates/rpc/rpc/src/eth/api/transactions.rs | 92 +++++------ crates/rpc/rpc/src/trace.rs | 182 +++++++-------------- 6 files changed, 189 insertions(+), 289 deletions(-) diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 355ac95971b..d7d14f963d1 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -14,7 +14,11 @@ use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ BlockReaderIdExt, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, StateProviderFactory, }; -use reth_rpc::{eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, TracingCallPool}; +use reth_rpc::{ + eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, + JwtAuthValidator, JwtSecret, TracingCallPool, +}; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; @@ -60,7 +64,7 @@ where gas_oracle, EthConfig::default().rpc_gas_cap, Box::new(executor.clone()), - TracingCallPool::build().expect("failed to build tracing pool") + TracingCallPool::build().expect("failed to build tracing pool"), ); let eth_filter = EthFilter::new( provider, diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 89a228c9b99..b372d3be77e 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -1,8 +1,11 @@ -use reth_rpc::{eth::{ - cache::{EthStateCache, EthStateCacheConfig}, - gas_oracle::GasPriceOracleConfig, - RPC_DEFAULT_GAS_CAP, -}, EthApi, EthFilter, EthPubSub, TracingCallPool}; +use reth_rpc::{ + eth::{ + cache::{EthStateCache, EthStateCacheConfig}, + gas_oracle::GasPriceOracleConfig, + RPC_DEFAULT_GAS_CAP, + }, + EthApi, EthFilter, EthPubSub, TracingCallPool, +}; use serde::{Deserialize, Serialize}; /// The default maximum of logs in a single response. diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index f871a93c33c..7f301226d61 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -116,10 +116,15 @@ use reth_provider::{ BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, StateProviderFactory, }; -use reth_rpc::{eth::{ - cache::{cache_new_blocks_task, EthStateCache}, - gas_oracle::GasPriceOracle, -}, AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, TracingCallPool}; +use reth_rpc::{ + eth::{ + cache::{cache_new_blocks_task, EthStateCache}, + gas_oracle::GasPriceOracle, + }, + AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, + NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TracingCallPool, TxPoolApi, + Web3Api, +}; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; @@ -806,15 +811,9 @@ where let eth = self.eth_handlers(); self.modules.insert( RethRpcModule::Trace, - TraceApi::new( - self.provider.clone(), - eth.api.clone(), - eth.cache, - Box::new(self.executor.clone()), - self.tracing_call_guard.clone(), - ) - .into_rpc() - .into(), + TraceApi::new(self.provider.clone(), eth.api.clone(), self.tracing_call_guard.clone()) + .into_rpc() + .into(), ); self } @@ -885,8 +884,13 @@ where &mut self, namespaces: impl Iterator, ) -> Vec { - let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub, tracing_call_pool: _ } = - self.with_eth(|eth| eth.clone()); + let EthHandlers { + api: eth_api, + cache: eth_cache, + filter: eth_filter, + pubsub: eth_pubsub, + tracing_call_pool: _, + } = self.with_eth(|eth| eth.clone()); // Create a copy, so we can list out all the methods for rpc_ api let namespaces: Vec<_> = namespaces.collect(); @@ -923,8 +927,6 @@ where RethRpcModule::Trace => TraceApi::new( self.provider.clone(), eth_api.clone(), - eth_cache.clone(), - Box::new(self.executor.clone()), self.tracing_call_guard.clone(), ) .into_rpc() diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index d6abad21fe2..3c2da93f61d 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -40,8 +40,8 @@ use revm_primitives::{ db::{DatabaseCommit, DatabaseRef}, BlockEnv, CfgEnv, }; -use std::{future::Future, sync::Arc}; -use tokio::sync::{mpsc, oneshot, AcquireError, OwnedSemaphorePermit}; +use std::sync::Arc; +use tokio::sync::{mpsc, AcquireError, OwnedSemaphorePermit}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; /// `debug` API implementation. @@ -74,30 +74,13 @@ where Provider: BlockReaderIdExt + HeaderProvider + 'static, Eth: EthTransactions + 'static, { - /// Executes the future on a new blocking task. - async fn on_blocking_task(&self, c: C) -> EthResult - where - C: FnOnce(Self) -> F, - F: Future> + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let this = self.clone(); - let f = c(this); - self.inner.task_spawner.spawn_blocking(Box::pin(async move { - let res = f.await; - let _ = tx.send(res); - })); - rx.await.map_err(|_| EthApiError::InternalTracingError)? - } - /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit(&self) -> Result { self.inner.tracing_call_guard.clone().acquire_owned().await } - /// Trace the entire block - fn trace_block_with_sync( + /// Trace the entire block asynchronously + async fn trace_block_with( &self, at: BlockId, transactions: Vec, @@ -107,43 +90,31 @@ where ) -> EthResult> { // replay all transactions of the block let this = self.clone(); - self.inner.eth_api.with_state_at_block(at, move |state| { - let mut results = Vec::with_capacity(transactions.len()); - let mut db = SubState::new(State::new(state)); - - let mut transactions = transactions.into_iter().peekable(); - while let Some(tx) = transactions.next() { - let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; - let tx = tx_env_with_recovered(&tx); - let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; - let (result, state_changes) = - this.trace_transaction(opts.clone(), env, at, &mut db)?; - results.push(TraceResult::Success { result }); - - if transactions.peek().is_some() { - // need to apply the state changes of this transaction before executing the next - // transaction - db.commit(state_changes) - } - } + self.inner + .eth_api + .spawn_with_state_at_block(at, move |state| { + let mut results = Vec::with_capacity(transactions.len()); + let mut db = SubState::new(State::new(state)); - Ok(results) - }) - } + let mut transactions = transactions.into_iter().peekable(); + while let Some(tx) = transactions.next() { + let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; + let tx = tx_env_with_recovered(&tx); + let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; + let (result, state_changes) = + this.trace_transaction(opts.clone(), env, at, &mut db)?; + results.push(TraceResult::Success { result }); + + if transactions.peek().is_some() { + // need to apply the state changes of this transaction before executing the + // next transaction + db.commit(state_changes) + } + } - /// Trace the entire block asynchronously - async fn trace_block_with( - &self, - at: BlockId, - transactions: Vec, - cfg: CfgEnv, - block_env: BlockEnv, - opts: GethDebugTracingOptions, - ) -> EthResult> { - self.on_blocking_task(|this| async move { - this.trace_block_with_sync(at, transactions, cfg, block_env, opts) - }) - .await + Ok(results) + }) + .await } /// Replays the given block and returns the trace of each transaction. @@ -171,17 +142,6 @@ where &self, block_id: BlockId, opts: GethDebugTracingOptions, - ) -> EthResult> { - self.on_blocking_task( - |this| async move { this.try_debug_trace_block(block_id, opts).await }, - ) - .await - } - - async fn try_debug_trace_block( - &self, - block_id: BlockId, - opts: GethDebugTracingOptions, ) -> EthResult> { let block_hash = self .inner @@ -199,7 +159,7 @@ where // its parent block's state let state_at = block.parent_hash; - self.trace_block_with_sync(state_at.into(), block.body, cfg, block_env, opts) + self.trace_block_with(state_at.into(), block.body, cfg, block_env, opts).await } /// Trace the transaction according to the provided options. @@ -221,8 +181,10 @@ where let state_at: BlockId = block.parent_hash.into(); let block_txs = block.body; - self.on_blocking_task(|this| async move { - this.inner.eth_api.with_state_at_block(state_at, |state| { + let this = self.clone(); + self.inner + .eth_api + .spawn_with_state_at_block(state_at, move |state| { // configure env for the target transaction let tx = transaction.into_recovered(); @@ -239,8 +201,7 @@ where let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) }; this.trace_transaction(opts, env, state_at, &mut db).map(|(trace, _)| trace) }) - }) - .await + .await } /// The debug_traceCall method lets you run an `eth_call` within the context of the given block @@ -250,22 +211,6 @@ where call: CallRequest, block_id: Option, opts: GethDebugTracingCallOptions, - ) -> EthResult { - self.on_blocking_task(|this| async move { - this.try_debug_trace_call(call, block_id, opts).await - }) - .await - } - - /// The debug_traceCall method lets you run an `eth_call` within the context of the given block - /// execution using the final state of parent block as the base. - /// - /// Caution: while this is async, this may still be blocking on necessary DB io. - async fn try_debug_trace_call( - &self, - call: CallRequest, - block_id: Option, - opts: GethDebugTracingCallOptions, ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); let GethDebugTracingCallOptions { tracing_options, state_overrides, block_overrides } = @@ -278,10 +223,13 @@ where GethDebugTracerType::BuiltInTracer(tracer) => match tracer { GethDebugBuiltInTracerType::FourByteTracer => { let mut inspector = FourByteInspector::default(); - let (_res, _) = self + let inspector = self .inner .eth_api - .inspect_call_at(call, at, overrides, &mut inspector) + .spawn_with_call_at(call, at, overrides, move |db, env| { + inspect(db, env, &mut inspector)?; + Ok(inspector) + }) .await?; return Ok(FourByteFrame::from(inspector).into()) } @@ -295,10 +243,13 @@ where .set_record_logs(call_config.with_log.unwrap_or_default()), ); - let _ = self + let inspector = self .inner .eth_api - .inspect_call_at(call, at, overrides, &mut inspector) + .spawn_with_call_at(call, at, overrides, move |db, env| { + inspect(db, env, &mut inspector)?; + Ok(inspector) + }) .await?; let frame = inspector.into_geth_builder().geth_call_traces(call_config); @@ -351,8 +302,14 @@ where let mut inspector = TracingInspector::new(inspector_config); - let (res, _) = - self.inner.eth_api.inspect_call_at(call, at, overrides, &mut inspector).await?; + let (res, inspector) = self + .inner + .eth_api + .spawn_with_call_at(call, at, overrides, move |db, env| { + let (res, _) = inspect(db, env, &mut inspector)?; + Ok((res, inspector)) + }) + .await?; let gas_used = res.result.gas_used(); let return_value = result_output(&res.result).unwrap_or_default().into(); let frame = inspector.into_geth_builder().geth_traces(gas_used, return_value, config); @@ -365,6 +322,8 @@ where /// Returns the trace frame and the state that got updated after executing the transaction. /// /// Note: this does not apply any state overrides if they're configured in the `opts`. + /// + /// Caution: this is blocking and should be performed on a blocking task. fn trace_transaction( &self, opts: GethDebugTracingOptions, diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index cfd18ff8a5d..82c01c826dd 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -40,7 +40,9 @@ use revm_primitives::{utilities::create_address, Env, ResultAndState, SpecId}; /// Helper alias type for the state's [CacheDB] pub(crate) type StateCacheDB<'r> = CacheDB>>; -/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace +/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace. +/// +/// Async functions that are spawned onto the [TracingCallPool] begin with `spawn_` #[async_trait::async_trait] pub trait EthTransactions: Send + Sync { /// Returns default gas limit to use for `eth_call` and tracing RPC methods. @@ -127,8 +129,8 @@ pub trait EthTransactions: Send + Sync { async fn send_transaction(&self, request: TransactionRequest) -> EthResult; /// Prepares the state and env for the given [CallRequest] at the given [BlockId] and executes - /// the closure. - async fn with_call_at( + /// the closure on a new task returning the result of the closure. + async fn spawn_with_call_at( &self, request: CallRequest, at: BlockId, @@ -136,7 +138,8 @@ pub trait EthTransactions: Send + Sync { f: F, ) -> EthResult where - F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send; + F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send + 'static, + R: Send + 'static; /// Executes the call request at the given [BlockId]. async fn transact_call_at( @@ -146,8 +149,9 @@ pub trait EthTransactions: Send + Sync { overrides: EvmOverrides, ) -> EthResult<(ResultAndState, Env)>; - /// Executes the call request at the given [BlockId] - async fn inspect_call_at( + /// Executes the call request at the given [BlockId] on a new task and returns the result of the + /// inspect call. + async fn spawn_inspect_call_at( &self, request: CallRequest, at: BlockId, @@ -155,18 +159,7 @@ pub trait EthTransactions: Send + Sync { inspector: I, ) -> EthResult<(ResultAndState, Env)> where - I: for<'r> Inspector> + Send; - - /// Executes the call request at the given [BlockId] - async fn inspect_call_at_and_return_state<'a, I>( - &'a self, - request: CallRequest, - at: BlockId, - overrides: EvmOverrides, - inspector: I, - ) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)> - where - I: Inspector> + Send; + I: for<'r> Inspector> + Send + 'static; /// Executes the transaction on top of the given [BlockId] with a tracer configured by the /// config. @@ -192,7 +185,7 @@ pub trait EthTransactions: Send + Sync { /// /// The callback is then called with the [TracingInspector] and the [ResultAndState] after the /// configured [Env] was inspected. - async fn trace_at_with_state( + async fn spawn_trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, @@ -505,7 +498,7 @@ where Ok(hash) } - async fn with_call_at( + async fn spawn_with_call_at( &self, request: CallRequest, at: BlockId, @@ -513,15 +506,29 @@ where f: F, ) -> EthResult where - F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send, + F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send + 'static, + R: Send + 'static, { let (cfg, block_env, at) = self.evm_env_at(at).await?; - let state = self.state_at(at)?; - let mut db = SubState::new(State::new(state)); - - let env = - prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?; - f(db, env) + let this = self.clone(); + self.inner + .tracing_call_pool + .spawn(move || { + let state = this.state_at(at)?; + let mut db = SubState::new(State::new(state)); + + let env = prepare_call_env( + cfg, + block_env, + request, + this.call_gas_limit(), + &mut db, + overrides, + )?; + f(db, env) + }) + .await + .map_err(|_| EthApiError::InternalTracingError)? } async fn transact_call_at( @@ -530,10 +537,11 @@ where at: BlockId, overrides: EvmOverrides, ) -> EthResult<(ResultAndState, Env)> { - self.with_call_at(request, at, overrides, |mut db, env| transact(&mut db, env)).await + self.spawn_with_call_at(request, at, overrides, move |mut db, env| transact(&mut db, env)) + .await } - async fn inspect_call_at( + async fn spawn_inspect_call_at( &self, request: CallRequest, at: BlockId, @@ -541,28 +549,10 @@ where inspector: I, ) -> EthResult<(ResultAndState, Env)> where - I: for<'r> Inspector> + Send, + I: for<'r> Inspector> + Send + 'static, { - self.with_call_at(request, at, overrides, |db, env| inspect(db, env, inspector)).await - } - - async fn inspect_call_at_and_return_state<'a, I>( - &'a self, - request: CallRequest, - at: BlockId, - overrides: EvmOverrides, - inspector: I, - ) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)> - where - I: Inspector> + Send, - { - let (cfg, block_env, at) = self.evm_env_at(at).await?; - let state = self.state_at(at)?; - let mut db = SubState::new(State::new(state)); - - let env = - prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?; - inspect_and_return_db(db, env, inspector) + self.spawn_with_call_at(request, at, overrides, move |db, env| inspect(db, env, inspector)) + .await } fn trace_at( @@ -585,7 +575,7 @@ where }) } - async fn trace_at_with_state( + async fn spawn_trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 374570213a9..494741097c7 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -1,8 +1,7 @@ use crate::{ eth::{ - cache::EthStateCache, error::{EthApiError, EthResult}, - revm_utils::{inspect, prepare_call_env, EvmOverrides}, + revm_utils::{inspect, inspect_and_return_db, prepare_call_env, EvmOverrides}, utils::recover_raw_transaction, EthTransactions, }, @@ -29,11 +28,10 @@ use reth_rpc_types::{ trace::{filter::TraceFilter, parity::*}, BlockError, BlockOverrides, CallRequest, Index, TransactionInfo, }; -use reth_tasks::TaskSpawner; use revm::{db::CacheDB, primitives::Env}; use revm_primitives::{db::DatabaseCommit, ExecutionResult, ResultAndState}; -use std::{collections::HashSet, future::Future, sync::Arc}; -use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; +use std::{collections::HashSet, sync::Arc}; +use tokio::sync::{AcquireError, OwnedSemaphorePermit}; /// `trace` API implementation. /// @@ -51,20 +49,8 @@ impl TraceApi { } /// Create a new instance of the [TraceApi] - pub fn new( - provider: Provider, - eth_api: Eth, - eth_cache: EthStateCache, - task_spawner: Box, - tracing_call_guard: TracingCallGuard, - ) -> Self { - let inner = Arc::new(TraceApiInner { - provider, - eth_api, - eth_cache, - task_spawner, - tracing_call_guard, - }); + pub fn new(provider: Provider, eth_api: Eth, tracing_call_guard: TracingCallGuard) -> Self { + let inner = Arc::new(TraceApiInner { provider, eth_api, tracing_call_guard }); Self { inner } } @@ -83,23 +69,6 @@ where Provider: BlockReader + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + 'static, Eth: EthTransactions + 'static, { - /// Executes the future on a new blocking task. - async fn on_blocking_task(&self, c: C) -> EthResult - where - C: FnOnce(Self) -> F, - F: Future> + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let this = self.clone(); - let f = c(this); - self.inner.task_spawner.spawn_blocking(Box::pin(async move { - let res = f.await; - let _ = tx.send(res); - })); - rx.await.map_err(|_| EthApiError::InternalTracingError)? - } - /// Executes the given call and returns a number of possible traces for it. pub async fn trace_call( &self, @@ -108,43 +77,23 @@ where block_id: Option, state_overrides: Option, block_overrides: Option>, - ) -> EthResult { - self.on_blocking_task(|this| async move { - this.try_trace_call( - call, - trace_types, - block_id, - EvmOverrides::new(state_overrides, block_overrides), - ) - .await - }) - .await - } - - async fn try_trace_call( - &self, - call: CallRequest, - trace_types: HashSet, - block_id: Option, - overrides: EvmOverrides, ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); let config = tracing_config(&trace_types); + let overrides = EvmOverrides::new(state_overrides, block_overrides); let mut inspector = TracingInspector::new(config); - - let (res, _, db) = self - .inner + self.inner .eth_api - .inspect_call_at_and_return_state(call, at, overrides, &mut inspector) - .await?; - - let trace_res = inspector.into_parity_builder().into_trace_results_with_state( - res, - &trace_types, - &db, - )?; - - Ok(trace_res) + .spawn_with_call_at(call, at, overrides, move |db, env| { + let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; + let trace_res = inspector.into_parity_builder().into_trace_results_with_state( + res, + &trace_types, + &db, + )?; + Ok(trace_res) + }) + .await } /// Traces a call to `eth_sendRawTransaction` without making the call, returning the traces. @@ -168,7 +117,7 @@ where self.inner .eth_api - .trace_at_with_state(env, config, at, move |inspector, res, db| { + .spawn_trace_at_with_state(env, config, at, move |inspector, res, db| { Ok(inspector.into_parity_builder().into_trace_results_with_state( res, &trace_types, @@ -190,10 +139,11 @@ where let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending)); let (cfg, block_env, at) = self.inner.eth_api.evm_env_at(at).await?; - self.on_blocking_task(|this| async move { - let gas_limit = this.inner.eth_api.call_gas_limit(); - // execute all transactions on top of each other and record the traces - this.inner.eth_api.with_state_at_block(at, move |state| { + let gas_limit = self.inner.eth_api.call_gas_limit(); + // execute all transactions on top of each other and record the traces + self.inner + .eth_api + .spawn_with_state_at_block(at, move |state| { let mut results = Vec::with_capacity(calls.len()); let mut db = SubState::new(State::new(state)); @@ -239,8 +189,7 @@ where Ok(results) }) - }) - .await + .await } /// Replays a transaction, returning the traces. @@ -364,48 +313,46 @@ where let block_hash = block.hash; let transactions = block.body; - self.on_blocking_task(|this| async move { - // replay all transactions of the block - this.inner - .eth_api - .with_state_at_block(state_at.into(), move |state| { - let mut results = Vec::with_capacity(transactions.len()); - let mut db = SubState::new(State::new(state)); - - let mut transactions = transactions.into_iter().enumerate().peekable(); - - while let Some((idx, tx)) = transactions.next() { - let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; - let tx_info = TransactionInfo { - hash: Some(tx.hash()), - index: Some(idx as u64), - block_hash: Some(block_hash), - block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), - base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), - }; - - let tx = tx_env_with_recovered(&tx); - let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; - - let mut inspector = TracingInspector::new(config); - let (res, _) = inspect(&mut db, env, &mut inspector)?; - let ResultAndState { result, state } = res; - results.push(f(tx_info, inspector, result, &state, &db)?); - - // need to apply the state changes of this transaction before executing the - // next transaction - if transactions.peek().is_some() { - // need to apply the state changes of this transaction before executing - // the next transaction - db.commit(state) - } + // replay all transactions of the block + self.inner + .eth_api + .spawn_with_state_at_block(state_at.into(), move |state| { + let mut results = Vec::with_capacity(transactions.len()); + let mut db = SubState::new(State::new(state)); + + let mut transactions = transactions.into_iter().enumerate().peekable(); + + while let Some((idx, tx)) = transactions.next() { + let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; + let tx_info = TransactionInfo { + hash: Some(tx.hash()), + index: Some(idx as u64), + block_hash: Some(block_hash), + block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), + base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), + }; + + let tx = tx_env_with_recovered(&tx); + let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; + + let mut inspector = TracingInspector::new(config); + let (res, _) = inspect(&mut db, env, &mut inspector)?; + let ResultAndState { result, state } = res; + results.push(f(tx_info, inspector, result, &state, &db)?); + + // need to apply the state changes of this transaction before executing the + // next transaction + if transactions.peek().is_some() { + // need to apply the state changes of this transaction before executing + // the next transaction + db.commit(state) } + } - Ok(results) - }) - .map(Some) - }) - .await + Ok(results) + }) + .await + .map(Some) } /// Returns traces created at given block. @@ -619,11 +566,6 @@ struct TraceApiInner { provider: Provider, /// Access to commonly used code of the `eth` namespace eth_api: Eth, - /// The async cache frontend for eth-related data - #[allow(unused)] // we need this for trace_filter eventually - eth_cache: EthStateCache, - /// The type that can spawn tasks which would otherwise be blocking. - task_spawner: Box, // restrict the number of concurrent calls to `trace_*` tracing_call_guard: TracingCallGuard, } From 5828110703297949f990bcea3895f2545bbefbb0 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 19:34:36 +0200 Subject: [PATCH 6/8] clippy happy --- crates/rpc/rpc-builder/src/lib.rs | 2 +- crates/rpc/rpc/src/eth/api/transactions.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 7f301226d61..bb88fb670b6 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -886,9 +886,9 @@ where ) -> Vec { let EthHandlers { api: eth_api, - cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub, + cache: _, tracing_call_pool: _, } = self.with_eth(|eth| eth.clone()); diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 82c01c826dd..bd92e778528 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -42,7 +42,8 @@ pub(crate) type StateCacheDB<'r> = CacheDB>>; /// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace. /// -/// Async functions that are spawned onto the [TracingCallPool] begin with `spawn_` +/// Async functions that are spawned onto the +/// [TracingCallPool](crate::tracing_call::TracingCallPool) begin with `spawn_` #[async_trait::async_trait] pub trait EthTransactions: Send + Sync { /// Returns default gas limit to use for `eth_call` and tracing RPC methods. From c36ec69c90e5c93eba81c848b05280a098d06a88 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 26 Jul 2023 18:22:53 +0200 Subject: [PATCH 7/8] remove rayon --- Cargo.lock | 1 + crates/rpc/rpc-builder/Cargo.toml | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 119ae278d57..53b1bb362e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5742,6 +5742,7 @@ version = "0.1.0-alpha.4" dependencies = [ "hyper", "jsonrpsee", + "pin-project", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index aeb1d300d23..2eec168bc5a 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -34,7 +34,6 @@ strum = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tracing.workspace = true -rayon.workspace = true pin-project.workspace = true tokio = { workspace = true, features = ["sync"] } From 2a50fba3317dc3cc6709e9fcf032e5edd27c9e22 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 26 Jul 2023 18:23:42 +0200 Subject: [PATCH 8/8] remove unused --- Cargo.lock | 1 - crates/rpc/rpc-builder/Cargo.toml | 2 -- 2 files changed, 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53b1bb362e5..119ae278d57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5742,7 +5742,6 @@ version = "0.1.0-alpha.4" dependencies = [ "hyper", "jsonrpsee", - "pin-project", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 2eec168bc5a..7f8869e64de 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -34,8 +34,6 @@ strum = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tracing.workspace = true -pin-project.workspace = true -tokio = { workspace = true, features = ["sync"] } [dev-dependencies] reth-tracing = { path = "../../tracing" }