diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml index f709fd62837..6b6579820ba 100644 --- a/crates/ethereum/node/Cargo.toml +++ b/crates/ethereum/node/Cargo.toml @@ -45,6 +45,9 @@ alloy-network.workspace = true alloy-rpc-types-eth.workspace = true alloy-rpc-types-engine.workspace = true +# async +tokio.workspace = true + # revm with required ethereum features # Note: this must be kept to ensure all features are properly enabled/forwarded revm = { workspace = true, features = ["secp256k1", "blst", "c-kzg"] } diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index 511394e8407..dee30efe006 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -254,6 +254,14 @@ where let Self { inner } = self; EthereumAddOns::new(inner.with_rpc_middleware(rpc_middleware)) } + + /// Sets the tokio runtime for the RPC servers. + /// + /// Caution: This runtime must not be created from within asynchronous context. + pub fn with_tokio_runtime(self, tokio_runtime: Option) -> Self { + let Self { inner } = self; + Self { inner: inner.with_tokio_runtime(tokio_runtime) } + } } impl NodeAddOns diff --git a/crates/ethereum/node/tests/it/builder.rs b/crates/ethereum/node/tests/it/builder.rs index 4e619f5f3d0..48f1e0da2fb 100644 --- a/crates/ethereum/node/tests/it/builder.rs +++ b/crates/ethereum/node/tests/it/builder.rs @@ -72,6 +72,44 @@ async fn test_eth_launcher() { }); } +#[test] +fn test_eth_launcher_with_tokio_runtime() { + // #[tokio::test] can not be used here because we need to create a custom tokio runtime + // and it would be dropped before the test is finished, resulting in a panic. + let main_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); + + let custom_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); + + main_rt.block_on(async { + let tasks = TaskManager::current(); + let config = NodeConfig::test(); + let db = create_test_rw_db(); + let _builder = + NodeBuilder::new(config) + .with_database(db) + .with_launch_context(tasks.executor()) + .with_types_and_provider::>>, + >>() + .with_components(EthereumNode::components()) + .with_add_ons( + EthereumAddOns::default().with_tokio_runtime(Some(custom_rt.handle().clone())), + ) + .apply(|builder| { + let _ = builder.db(); + builder + }) + .launch_with_fn(|builder| { + let launcher = EngineNodeLauncher::new( + tasks.executor(), + builder.config().datadir(), + Default::default(), + ); + builder.launch_with(launcher) + }); + }); +} + #[test] fn test_node_setup() { let config = NodeConfig::test(); diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index b6b6f344c8a..ea6022a50d6 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -445,6 +445,8 @@ pub struct RpcAddOns< /// This middleware is applied to all RPC requests across all transports (HTTP, WS, IPC). /// See [`RpcAddOns::with_rpc_middleware`] for more details. rpc_middleware: RpcMiddleware, + /// Optional custom tokio runtime for the RPC server. + tokio_runtime: Option, } impl Debug @@ -488,6 +490,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime: None, } } @@ -502,6 +505,7 @@ where payload_validator_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, .. } = self; RpcAddOns { @@ -511,6 +515,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, } } @@ -525,6 +530,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, .. } = self; RpcAddOns { @@ -534,6 +540,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, } } @@ -548,6 +555,7 @@ where payload_validator_builder, engine_api_builder, rpc_middleware, + tokio_runtime, .. } = self; RpcAddOns { @@ -557,6 +565,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, } } @@ -608,6 +617,7 @@ where payload_validator_builder, engine_api_builder, engine_validator_builder, + tokio_runtime, .. } = self; RpcAddOns { @@ -617,6 +627,31 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, + } + } + + /// Sets the tokio runtime for the RPC servers. + /// + /// Caution: This runtime must not be created from within asynchronous context. + pub fn with_tokio_runtime(self, tokio_runtime: Option) -> Self { + let Self { + hooks, + eth_api_builder, + payload_validator_builder, + engine_validator_builder, + engine_api_builder, + rpc_middleware, + .. + } = self; + Self { + hooks, + eth_api_builder, + payload_validator_builder, + engine_validator_builder, + engine_api_builder, + rpc_middleware, + tokio_runtime, } } @@ -632,6 +667,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, } = self; let rpc_middleware = Stack::new(rpc_middleware, layer); RpcAddOns { @@ -641,6 +677,7 @@ where engine_api_builder, engine_validator_builder, rpc_middleware, + tokio_runtime, } } @@ -717,6 +754,7 @@ where F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>, { let rpc_middleware = self.rpc_middleware.clone(); + let tokio_runtime = self.tokio_runtime.clone(); let setup_ctx = self.setup_rpc_components(ctx, ext).await?; let RpcSetupContext { node, @@ -730,7 +768,11 @@ where engine_handle, } = setup_ctx; - let server_config = config.rpc.rpc_server_config().set_rpc_middleware(rpc_middleware); + let server_config = config + .rpc + .rpc_server_config() + .set_rpc_middleware(rpc_middleware) + .with_tokio_runtime(tokio_runtime); let rpc_server_handle = Self::launch_rpc_server_internal(server_config, &modules).await?; let handles = @@ -783,6 +825,7 @@ where F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>, { let rpc_middleware = self.rpc_middleware.clone(); + let tokio_runtime = self.tokio_runtime.clone(); let setup_ctx = self.setup_rpc_components(ctx, ext).await?; let RpcSetupContext { node, @@ -796,7 +839,11 @@ where engine_handle, } = setup_ctx; - let server_config = config.rpc.rpc_server_config().set_rpc_middleware(rpc_middleware); + let server_config = config + .rpc + .rpc_server_config() + .set_rpc_middleware(rpc_middleware) + .with_tokio_runtime(tokio_runtime); let (rpc, auth) = if disable_auth { // Only launch the RPC server, use a noop auth handle diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml index da5f4675efc..253bb33ca7f 100644 --- a/crates/optimism/node/Cargo.toml +++ b/crates/optimism/node/Cargo.toml @@ -57,6 +57,9 @@ alloy-rpc-types-engine.workspace = true alloy-rpc-types-eth.workspace = true alloy-consensus.workspace = true +# async +tokio.workspace = true + # misc clap.workspace = true serde.workspace = true @@ -65,7 +68,6 @@ eyre.workspace = true # test-utils dependencies reth-e2e-test-utils = { workspace = true, optional = true } alloy-genesis = { workspace = true, optional = true } -tokio = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } [dev-dependencies] @@ -99,7 +101,6 @@ test-utils = [ "reth-tasks", "reth-e2e-test-utils", "alloy-genesis", - "tokio", "serde_json", "reth-node-builder/test-utils", "reth-chainspec/test-utils", diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index 3c238aaf4a8..8107e390b4c 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -658,6 +658,8 @@ pub struct OpAddOnsBuilder { min_suggested_priority_fee: u64, /// RPC middleware to use rpc_middleware: RpcMiddleware, + /// Optional tokio runtime to use for the RPC server. + tokio_runtime: Option, } impl Default for OpAddOnsBuilder { @@ -671,6 +673,7 @@ impl Default for OpAddOnsBuilder { min_suggested_priority_fee: 1_000_000, _nt: PhantomData, rpc_middleware: Identity::new(), + tokio_runtime: None, } } } @@ -712,6 +715,14 @@ impl OpAddOnsBuilder { self } + /// Configures a custom tokio runtime for the RPC server. + /// + /// Caution: This runtime must not be created from within asynchronous context. + pub fn with_tokio_runtime(mut self, tokio_runtime: Option) -> Self { + self.tokio_runtime = tokio_runtime; + self + } + /// Configure the RPC middleware to use pub fn with_rpc_middleware(self, rpc_middleware: T) -> OpAddOnsBuilder { let Self { @@ -721,6 +732,7 @@ impl OpAddOnsBuilder { da_config, enable_tx_conditional, min_suggested_priority_fee, + tokio_runtime, _nt, .. } = self; @@ -733,6 +745,7 @@ impl OpAddOnsBuilder { min_suggested_priority_fee, _nt, rpc_middleware, + tokio_runtime, } } } @@ -757,6 +770,7 @@ impl OpAddOnsBuilder { min_suggested_priority_fee, historical_rpc, rpc_middleware, + tokio_runtime, .. } = self; @@ -770,7 +784,8 @@ impl OpAddOnsBuilder { EB::default(), EVB::default(), rpc_middleware, - ), + ) + .with_tokio_runtime(tokio_runtime), da_config.unwrap_or_default(), sequencer_url, sequencer_headers, diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ce3c12839cb..76e889eec63 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1192,7 +1192,8 @@ impl RpcServerConfig { } /// Configures a custom tokio runtime for the rpc server. - pub fn with_tokio_runtime(mut self, tokio_runtime: tokio::runtime::Handle) -> Self { + pub fn with_tokio_runtime(mut self, tokio_runtime: Option) -> Self { + let Some(tokio_runtime) = tokio_runtime else { return self }; if let Some(http_server_config) = self.http_server_config { self.http_server_config = Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));