diff --git a/Cargo.lock b/Cargo.lock index d2046ac0615..0e6d9d77e51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4950,6 +4950,7 @@ dependencies = [ "reth-rpc-api", "reth-rpc-engine-api", "reth-rpc-types", + "reth-tasks", "reth-tracing", "reth-transaction-pool", "serde", diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 0afb2a016ea..54c16749f0f 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -11,6 +11,7 @@ use reth_rpc_builder::{ RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, }; use reth_rpc_engine_api::EngineApiHandle; +use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -103,11 +104,12 @@ impl RpcServerArgs { } /// Convenience function for starting a rpc server with configs which extracted from cli args. - pub(crate) async fn start_rpc_server( + pub(crate) async fn start_rpc_server( &self, client: Client, pool: Pool, network: Network, + executor: Tasks, ) -> Result where Client: BlockProvider @@ -115,9 +117,11 @@ impl RpcServerArgs { + StateProviderFactory + EvmEnvProvider + Clone + + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { reth_rpc_builder::launch( client, @@ -125,16 +129,18 @@ impl RpcServerArgs { network, self.transport_rpc_module_config(), self.rpc_server_config(), + executor, ) .await } /// Create Engine API server. - pub(crate) async fn start_auth_server( + pub(crate) async fn start_auth_server( &self, client: Client, pool: Pool, network: Network, + executor: Tasks, handle: EngineApiHandle, ) -> Result where @@ -143,16 +149,27 @@ impl RpcServerArgs { + StateProviderFactory + EvmEnvProvider + Clone + + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { let socket_address = SocketAddr::new( self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT), ); let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?; - reth_rpc_builder::auth::launch(client, pool, network, handle, socket_address, secret).await + reth_rpc_builder::auth::launch( + client, + pool, + network, + executor, + handle, + socket_address, + secret, + ) + .await } /// Creates the [TransportRpcModuleConfig] from cli args. diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 082d2c5cbba..1c8753b6082 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -160,7 +160,12 @@ impl Command { let _rpc_server = self .rpc - .start_rpc_server(shareable_db.clone(), test_transaction_pool.clone(), network.clone()) + .start_rpc_server( + shareable_db.clone(), + test_transaction_pool.clone(), + network.clone(), + ctx.task_executor.clone(), + ) .await?; info!(target: "reth::cli", "Started RPC server"); @@ -174,6 +179,7 @@ impl Command { shareable_db, test_transaction_pool, network.clone(), + ctx.task_executor.clone(), engine_api_handle, ) .await?; diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 67e32a50adc..6f9e41a588a 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -15,6 +15,7 @@ reth-rpc = { path = "../rpc" } reth-rpc-api = { path = "../rpc-api" } reth-rpc-engine-api = { path = "../rpc-engine-api" } reth-rpc-types = { path = "../rpc-types" } +reth-tasks = { path = "../../tasks" } reth-transaction-pool = { path = "../../transaction-pool" } jsonrpsee = { version = "0.16", features = ["server"] } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 4bccd977a1f..48ca32d922a 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -14,11 +14,12 @@ pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{ - AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, JwtSecret, NetApi, - TraceApi, Web3Api, + eth::cache::EthStateCache, AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, + JwtSecret, NetApi, TraceApi, Web3Api, }; use reth_rpc_api::servers::*; use reth_rpc_engine_api::EngineApiHandle; +use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use serde::{Deserialize, Serialize, Serializer}; use std::{ @@ -31,22 +32,32 @@ use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; use tower::layer::util::{Identity, Stack}; use tower_http::cors::{AllowOrigin, Any, CorsLayer}; -/// Configure and launch an auth server with `engine` and `eth` namespaces. -pub async fn launch( +/// Configure and launch an auth server with `engine` and a _new_ `eth` namespace. +pub async fn launch( client: Client, pool: Pool, network: Network, + executor: Tasks, handle: EngineApiHandle, socket_addr: SocketAddr, secret: JwtSecret, ) -> Result where - Client: - BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { - launch_with_eth_api(EthApi::new(client, pool, network), handle, socket_addr, secret).await + // spawn a new cache task + let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor); + launch_with_eth_api(EthApi::new(client, pool, network, eth_cache), handle, socket_addr, secret) + .await } /// Configure and launch an auth server with existing EthApi implementation. @@ -57,8 +68,13 @@ pub async fn launch_with_eth_api( secret: JwtSecret, ) -> Result where - Client: - BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, { @@ -71,7 +87,7 @@ where let middleware = tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret))); - // By default both http and ws are enabled. + // By default, both http and ws are enabled. let server = ServerBuilder::new().set_middleware(middleware).build(socket_addr).await?; server.start(module) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 1a87511228a..2f9b16dbc41 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -28,10 +28,11 @@ //! use reth_network_api::{NetworkInfo, Peers}; //! use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory, EvmEnvProvider}; //! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig}; +//! use reth_tasks::TokioTaskExecutor; //! use reth_transaction_pool::TransactionPool; //! pub async fn launch(client: Client, pool: Pool, network: Network) //! where -//! Client: BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, +//! Client: BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, //! Pool: TransactionPool + Clone + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, //! { @@ -42,7 +43,7 @@ //! RethRpcModule::Eth, //! RethRpcModule::Web3, //! ]); -//! let transport_modules = RpcModuleBuilder::new(client, pool, network).build(transports); +//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default()).build(transports); //! let handle = RpcServerConfig::default() //! .with_http(ServerBuilder::default()) //! .start(transport_modules) @@ -86,27 +87,36 @@ pub mod auth; /// Common RPC constants. pub mod constants; use constants::*; +use reth_rpc::eth::cache::EthStateCache; +use reth_tasks::TaskSpawner; /// Cors utilities. mod cors; /// Convenience function for starting a server in one step. -pub async fn launch( +pub async fn launch( client: Client, pool: Pool, network: Network, module_config: impl Into, server_config: impl Into, + executor: Tasks, ) -> Result where - Client: - BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { let module_config = module_config.into(); let server_config = server_config.into(); - RpcModuleBuilder::new(client, pool, network) + RpcModuleBuilder::new(client, pool, network, executor) .build(module_config) .start_server(server_config) .await @@ -116,57 +126,74 @@ where /// /// This is the main entrypoint for up RPC servers. #[derive(Debug)] -pub struct RpcModuleBuilder { +pub struct RpcModuleBuilder { /// The Client type to when creating all rpc handlers client: Client, /// The Pool type to when creating all rpc handlers pool: Pool, /// The Network type to when creating all rpc handlers network: Network, + /// How additional tasks are spawned, for example in the eth pubsub namespace + executor: Tasks, } // === impl RpcBuilder === -impl RpcModuleBuilder { +impl RpcModuleBuilder { /// Create a new instance of the builder - pub fn new(client: Client, pool: Pool, network: Network) -> Self { - Self { client, pool, network } + pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self { + Self { client, pool, network, executor } } /// Configure the client instance. - pub fn with_client(self, client: C) -> RpcModuleBuilder + pub fn with_client(self, client: C) -> RpcModuleBuilder where C: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, { - let Self { pool, network, .. } = self; - RpcModuleBuilder { client, network, pool } + let Self { pool, network, executor, .. } = self; + RpcModuleBuilder { client, network, pool, executor } } /// Configure the transaction pool instance. - pub fn with_pool

(self, pool: P) -> RpcModuleBuilder + pub fn with_pool

(self, pool: P) -> RpcModuleBuilder where P: TransactionPool + 'static, { - let Self { client, network, .. } = self; - RpcModuleBuilder { client, network, pool } + let Self { client, network, executor, .. } = self; + RpcModuleBuilder { client, network, pool, executor } } /// Configure the network instance. - pub fn with_network(self, network: N) -> RpcModuleBuilder + pub fn with_network(self, network: N) -> RpcModuleBuilder where N: NetworkInfo + Peers + 'static, { - let Self { client, pool, .. } = self; - RpcModuleBuilder { client, network, pool } + let Self { client, pool, executor, .. } = self; + RpcModuleBuilder { client, network, pool, executor } + } + + /// Configure the task executor to use for additional tasks. + pub fn with_executor(self, executor: T) -> RpcModuleBuilder + where + T: TaskSpawner + 'static, + { + let Self { pool, network, client, .. } = self; + RpcModuleBuilder { client, network, pool, executor } } } -impl RpcModuleBuilder +impl RpcModuleBuilder where - Client: - BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be /// used to start the transport server(s). @@ -175,9 +202,9 @@ where pub fn build(self, module_config: TransportRpcModuleConfig) -> TransportRpcModules<()> { let mut modules = TransportRpcModules::default(); - let Self { client, pool, network } = self; + let Self { client, pool, network, executor } = self; - let mut registry = RethModuleRegistry::new(client, pool, network); + let mut registry = RethModuleRegistry::new(client, pool, network, executor); if !module_config.is_empty() { let TransportRpcModuleConfig { http, ws, ipc } = module_config; @@ -190,9 +217,9 @@ where } } -impl Default for RpcModuleBuilder<(), (), ()> { +impl Default for RpcModuleBuilder<(), (), (), ()> { fn default() -> Self { - RpcModuleBuilder::new((), (), ()) + RpcModuleBuilder::new((), (), (), ()) } } @@ -258,11 +285,12 @@ impl RpcModuleSelection { /// Note: This will always create new instance of the module handlers and is therefor only /// recommended for launching standalone transports. If multiple transports need to be /// configured it's recommended to use the [RpcModuleBuilder]. - pub fn standalone_module( + pub fn standalone_module( &self, client: Client, pool: Pool, network: Network, + executor: Tasks, ) -> RpcModule<()> where Client: BlockProvider @@ -270,11 +298,13 @@ impl RpcModuleSelection { + StateProviderFactory + EvmEnvProvider + Clone + + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { - let mut registry = RethModuleRegistry::new(client, pool, network); + let mut registry = RethModuleRegistry::new(client, pool, network, executor); registry.module_for(self) } @@ -354,10 +384,13 @@ impl Serialize for RethRpcModule { } /// A Helper type the holds instances of the configured modules. -pub struct RethModuleRegistry { +pub struct RethModuleRegistry { client: Client, pool: Pool, network: Network, + executor: Tasks, + /// Holds a clone of the async [EthStateCache] channel. + eth_cache: Option, /// Holds a clone of the actual [EthApi] namespace impl since this can be required by other /// namespaces eth_api: Option>, @@ -367,10 +400,18 @@ pub struct RethModuleRegistry { // === impl RethModuleRegistry === -impl RethModuleRegistry { +impl RethModuleRegistry { /// Creates a new, empty instance. - pub fn new(client: Client, pool: Pool, network: Network) -> Self { - Self { client, pool, network, eth_api: None, modules: Default::default() } + pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self { + Self { + client, + pool, + network, + eth_api: None, + executor, + modules: Default::default(), + eth_cache: None, + } } /// Returns all installed methods @@ -388,7 +429,7 @@ impl RethModuleRegistry { } } -impl RethModuleRegistry +impl RethModuleRegistry where Network: NetworkInfo + Peers + Clone + 'static, { @@ -407,12 +448,18 @@ where } } -impl RethModuleRegistry +impl RethModuleRegistry where - Client: - BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static, + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, { /// Register Eth Namespace pub fn register_eth(&mut self) -> &mut Self { @@ -486,11 +533,28 @@ where .collect::>() } + /// Returns the [EthStateCache] frontend + /// + /// This will spawn exactly one [EthStateCache] service if this is the first time the cache is + /// requested. + pub fn eth_cache(&mut self) -> EthStateCache { + self.eth_cache + .get_or_insert_with(|| { + EthStateCache::spawn_with( + self.client.clone(), + Default::default(), + self.executor.clone(), + ) + }) + .clone() + } + /// Returns the configured [EthApi] or creates it if it does not exist yet fn eth_api(&mut self) -> EthApi { + let cache = self.eth_cache(); self.eth_api .get_or_insert_with(|| { - EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone()) + EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone(), cache) }) .clone() } diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index bc9f696591d..5e8e8527b0e 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -4,6 +4,7 @@ use reth_rpc_builder::{ RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle, TransportRpcModuleConfig, }; +use reth_tasks::TokioTaskExecutor; use reth_transaction_pool::test_utils::{testing_pool, TestPool}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; @@ -50,9 +51,11 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer } /// Returns an [RpcModuleBuilder] with testing components. -pub fn test_rpc_builder() -> RpcModuleBuilder { +pub fn test_rpc_builder() -> RpcModuleBuilder +{ RpcModuleBuilder::default() .with_client(NoopProvider::default()) .with_pool(testing_pool()) .with_network(NoopNetwork::default()) + .with_executor(TokioTaskExecutor::default()) } diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index 960c19d4b2f..fa3dfed1d83 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -13,7 +13,7 @@ use reth_primitives::{ use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; use std::num::NonZeroUsize; -use crate::eth::error::EthResult; +use crate::eth::{cache::EthStateCache, error::EthResult}; use reth_provider::providers::ChainState; use reth_rpc_types::FeeHistoryCache; use reth_transaction_pool::TransactionPool; @@ -67,8 +67,8 @@ pub struct EthApi { impl EthApi { /// Creates a new, shareable instance. - pub fn new(client: Client, pool: Pool, network: Network) -> Self { - let inner = EthApiInner { client, pool, network, signers: Default::default() }; + pub fn new(client: Client, pool: Pool, network: Network, eth_cache: EthStateCache) -> Self { + let inner = EthApiInner { client, pool, network, signers: Default::default(), eth_cache }; Self { inner: Arc::new(inner), fee_history_cache: FeeHistoryCache::new( @@ -216,4 +216,6 @@ struct EthApiInner { network: Network, /// All configured Signers signers: Vec>, + /// The async cache frontend for eth related data + eth_cache: EthStateCache, } diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 1e382017837..ea8afb35564 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -378,6 +378,7 @@ where #[cfg(test)] mod tests { + use crate::eth::cache::EthStateCache; use jsonrpsee::{ core::{error::Error as RpcError, RpcResult}, types::error::{CallError, INVALID_PARAMS_CODE}, @@ -394,7 +395,12 @@ mod tests { #[tokio::test] /// Handler for: `eth_test_fee_history` async fn test_fee_history() { - let eth_api = EthApi::new(NoopProvider::default(), testing_pool(), NoopNetwork::default()); + let eth_api = EthApi::new( + NoopProvider::default(), + testing_pool(), + NoopNetwork::default(), + EthStateCache::spawn(NoopProvider::default(), Default::default()), + ); let response = eth_api.fee_history(1.into(), BlockNumberOrTag::Latest.into(), None).await; assert!(matches!(response, RpcResult::Err(RpcError::Call(CallError::Custom(_))))); @@ -434,7 +440,12 @@ mod tests { .push(base_fee_per_gas.map(|fee| U256::try_from(fee).unwrap()).unwrap_or_default()); } - let eth_api = EthApi::new(mock_provider, testing_pool(), NoopNetwork::default()); + let eth_api = EthApi::new( + mock_provider, + testing_pool(), + NoopNetwork::default(), + EthStateCache::spawn(NoopProvider::default(), Default::default()), + ); let response = eth_api.fee_history((newest_block + 1).into(), newest_block.into(), None).await; diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index e33e6459745..f861c0585b4 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -46,6 +46,7 @@ where #[cfg(test)] mod tests { + use crate::eth::cache::EthStateCache; use reth_primitives::{hex_literal::hex, Bytes}; use reth_provider::test_utils::NoopProvider; use reth_transaction_pool::{test_utils::testing_pool, TransactionPool}; @@ -58,7 +59,12 @@ mod tests { let pool = testing_pool(); - let eth_api = EthApi::new(noop_provider, pool.clone(), ()); + let eth_api = EthApi::new( + noop_provider, + pool.clone(), + (), + EthStateCache::spawn(NoopProvider::default(), Default::default()), + ); // https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d let tx_1 = Bytes::from(hex!("02f871018303579880850555633d1b82520894eee27662c2b8eba3cd936a23f039f3189633e4c887ad591c62bdaeb180c080a07ea72c68abfb8fca1bd964f0f99132ed9280261bdca3e549546c0205e800f7d0a05b4ef3039e9c9b9babc179a1878fb825b5aaf5aed2fa8744854150157b08d6f3")); diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache.rs index edc7ed06091..adab25f93b2 100644 --- a/crates/rpc/rpc/src/eth/cache.rs +++ b/crates/rpc/rpc/src/eth/cache.rs @@ -4,9 +4,10 @@ use futures::StreamExt; use reth_interfaces::{provider::ProviderError, Result}; use reth_primitives::{Block, H256}; use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use revm::primitives::{BlockEnv, CfgEnv}; use schnellru::{ByMemoryUsage, Limiter, LruMap}; +use serde::{Deserialize, Serialize}; use std::{ collections::{hash_map::Entry, HashMap}, future::Future, @@ -30,23 +31,43 @@ type BlockLruCache = MultiConsumerLruCache = MultiConsumerLruCache; +/// Settings for the [EthStateCache] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EthStateCacheConfig { + /// Max number of bytes for cached block data. + /// + /// Default is 50MB + pub max_block_bytes: usize, + /// Max number of bytes for cached env data. + /// + /// Default is 500kb (env configs are very small) + pub max_env_bytes: usize, +} + +impl Default for EthStateCacheConfig { + fn default() -> Self { + Self { max_block_bytes: 50 * 1024 * 1024, max_env_bytes: 500 * 1024 } + } +} + /// Provides async access to cached eth data /// -/// This is the frontend to the [EthStateCacheService] which manages cached data on a different +/// This is the frontend for the async caching service which manages cached data on a different /// task. #[derive(Debug, Clone)] -pub(crate) struct EthStateCache { +pub struct EthStateCache { to_service: UnboundedSender, } impl EthStateCache { /// Creates and returns both [EthStateCache] frontend and the memory bound service. - fn create( + fn create( client: Client, - action_task_spawner: Box, + action_task_spawner: Tasks, max_block_bytes: usize, max_env_bytes: usize, - ) -> (Self, EthStateCacheService) { + ) -> (Self, EthStateCacheService) { let (to_service, rx) = unbounded_channel(); let service = EthStateCacheService { client, @@ -60,21 +81,34 @@ impl EthStateCache { (cache, service) } + /// Creates a new async LRU backed cache service task and spawns it to a new task via + /// [tokio::spawn]. + /// + /// See also [Self::spawn_with] + pub fn spawn(client: Client, config: EthStateCacheConfig) -> Self + where + Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static, + { + Self::spawn_with(client, config, TokioTaskExecutor::default()) + } + /// Creates a new async LRU backed cache service task and spawns it to a new task via the given /// spawner. /// /// The cache is memory limited by the given max bytes values. - pub(crate) fn spawn( + pub fn spawn_with( client: Client, - spawner: Box, - max_block_bytes: usize, - max_env_bytes: usize, + config: EthStateCacheConfig, + executor: Tasks, ) -> Self where Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static, + Tasks: TaskSpawner + Clone + 'static, { - let (this, service) = Self::create(client, spawner.clone(), max_block_bytes, max_env_bytes); - spawner.spawn(Box::pin(service)); + let EthStateCacheConfig { max_block_bytes, max_env_bytes } = config; + let (this, service) = + Self::create(client, executor.clone(), max_block_bytes, max_env_bytes); + executor.spawn(Box::pin(service)); this } @@ -107,7 +141,7 @@ impl EthStateCache { /// /// This type is an endless future that listens for incoming messages from the user facing /// [EthStateCache] via a channel. If the requested data is not cached then it spawns a new task -/// that does the IO and sends the result back to it. This way the [EthStateCacheService] only +/// that does the IO and sends the result back to it. This way the caching service only /// handles messages and does LRU lookups and never blocking IO. /// /// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the @@ -116,6 +150,7 @@ impl EthStateCache { #[must_use = "Type does nothing unless spawned"] pub(crate) struct EthStateCacheService< Client, + Tasks, LimitBlocks = ByMemoryUsage, LimitEnvs = ByMemoryUsage, > where @@ -133,12 +168,13 @@ pub(crate) struct EthStateCacheService< /// Receiver half of the action channel. action_rx: UnboundedReceiverStream, /// The type that's used to spawn tasks that do the actual work - action_task_spawner: Box, + action_task_spawner: Tasks, } -impl Future for EthStateCacheService +impl Future for EthStateCacheService where Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static, + Tasks: TaskSpawner + Clone + 'static, { type Output = (); diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index 0a79cc7630a..42500801a36 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -1,7 +1,7 @@ //! `eth` namespace handler implementation. mod api; -mod cache; +pub mod cache; pub(crate) mod error; mod filter; mod pubsub; diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index 2281d828e69..f5c60506da7 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -14,7 +14,7 @@ mod admin; mod debug; mod engine; -mod eth; +pub mod eth; mod layers; mod net; mod trace; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index e9e93ac7092..3e408a1cc1f 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -66,7 +66,7 @@ pub mod shutdown; /// ``` /// /// The [TaskSpawner] trait is [DynClone] so `Box` are also `Clone`. -pub trait TaskSpawner: Send + Sync + std::fmt::Debug + DynClone { +pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone { /// Spawns the task onto the runtime. /// See also [`Handle::spawn`]. fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;