Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/ethereum/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ alloy-network.workspace = true
alloy-rpc-types-eth.workspace = true
alloy-rpc-types-engine.workspace = true

# async
tokio.workspace = true

# revm with required ethereum features
# Note: this must be kept to ensure all features are properly enabled/forwarded
revm = { workspace = true, features = ["secp256k1", "blst", "c-kzg"] }
Expand Down
8 changes: 8 additions & 0 deletions crates/ethereum/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ where
let Self { inner } = self;
EthereumAddOns::new(inner.with_rpc_middleware(rpc_middleware))
}

/// Sets the tokio runtime for the RPC servers.
///
/// Caution: This runtime must not be created from within asynchronous context.
pub fn with_tokio_runtime(self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
let Self { inner } = self;
Self { inner: inner.with_tokio_runtime(tokio_runtime) }
}
}

impl<N, EthB, PVB, EB, EVB, RpcMiddleware> NodeAddOns<N>
Expand Down
38 changes: 38 additions & 0 deletions crates/ethereum/node/tests/it/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,44 @@ async fn test_eth_launcher() {
});
}

#[test]
fn test_eth_launcher_with_tokio_runtime() {
// #[tokio::test] can not be used here because we need to create a custom tokio runtime
// and it would be dropped before the test is finished, resulting in a panic.
Comment on lines +77 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for flagging this, I also added this to the addons fns

let main_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");

let custom_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");

main_rt.block_on(async {
let tasks = TaskManager::current();
let config = NodeConfig::test();
let db = create_test_rw_db();
let _builder =
NodeBuilder::new(config)
.with_database(db)
.with_launch_context(tasks.executor())
.with_types_and_provider::<EthereumNode, BlockchainProvider<
NodeTypesWithDBAdapter<EthereumNode, Arc<TempDatabase<DatabaseEnv>>>,
>>()
.with_components(EthereumNode::components())
.with_add_ons(
EthereumAddOns::default().with_tokio_runtime(Some(custom_rt.handle().clone())),
)
.apply(|builder| {
let _ = builder.db();
builder
})
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
tasks.executor(),
builder.config().datadir(),
Default::default(),
);
builder.launch_with(launcher)
});
});
}

#[test]
fn test_node_setup() {
let config = NodeConfig::test();
Expand Down
51 changes: 49 additions & 2 deletions crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ pub struct RpcAddOns<
/// This middleware is applied to all RPC requests across all transports (HTTP, WS, IPC).
/// See [`RpcAddOns::with_rpc_middleware`] for more details.
rpc_middleware: RpcMiddleware,
/// Optional custom tokio runtime for the RPC server.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl<Node, EthB, PVB, EB, EVB, RpcMiddleware> Debug
Expand Down Expand Up @@ -488,6 +490,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime: None,
}
}

Expand All @@ -502,6 +505,7 @@ where
payload_validator_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
..
} = self;
RpcAddOns {
Expand All @@ -511,6 +515,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
}
}

Expand All @@ -525,6 +530,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
..
} = self;
RpcAddOns {
Expand All @@ -534,6 +540,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
}
}

Expand All @@ -548,6 +555,7 @@ where
payload_validator_builder,
engine_api_builder,
rpc_middleware,
tokio_runtime,
..
} = self;
RpcAddOns {
Expand All @@ -557,6 +565,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
}
}

Expand Down Expand Up @@ -608,6 +617,7 @@ where
payload_validator_builder,
engine_api_builder,
engine_validator_builder,
tokio_runtime,
..
} = self;
RpcAddOns {
Expand All @@ -617,6 +627,31 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
}
}

/// Sets the tokio runtime for the RPC servers.
///
/// Caution: This runtime must not be created from within asynchronous context.
pub fn with_tokio_runtime(self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
let Self {
hooks,
eth_api_builder,
payload_validator_builder,
engine_validator_builder,
engine_api_builder,
rpc_middleware,
..
} = self;
Self {
hooks,
eth_api_builder,
payload_validator_builder,
engine_validator_builder,
engine_api_builder,
rpc_middleware,
tokio_runtime,
}
}

Expand All @@ -632,6 +667,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
} = self;
let rpc_middleware = Stack::new(rpc_middleware, layer);
RpcAddOns {
Expand All @@ -641,6 +677,7 @@ where
engine_api_builder,
engine_validator_builder,
rpc_middleware,
tokio_runtime,
}
}

Expand Down Expand Up @@ -717,6 +754,7 @@ where
F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
{
let rpc_middleware = self.rpc_middleware.clone();
let tokio_runtime = self.tokio_runtime.clone();
let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
let RpcSetupContext {
node,
Expand All @@ -730,7 +768,11 @@ where
engine_handle,
} = setup_ctx;

let server_config = config.rpc.rpc_server_config().set_rpc_middleware(rpc_middleware);
let server_config = config
.rpc
.rpc_server_config()
.set_rpc_middleware(rpc_middleware)
.with_tokio_runtime(tokio_runtime);
let rpc_server_handle = Self::launch_rpc_server_internal(server_config, &modules).await?;

let handles =
Expand Down Expand Up @@ -783,6 +825,7 @@ where
F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
{
let rpc_middleware = self.rpc_middleware.clone();
let tokio_runtime = self.tokio_runtime.clone();
let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
let RpcSetupContext {
node,
Expand All @@ -796,7 +839,11 @@ where
engine_handle,
} = setup_ctx;

let server_config = config.rpc.rpc_server_config().set_rpc_middleware(rpc_middleware);
let server_config = config
.rpc
.rpc_server_config()
.set_rpc_middleware(rpc_middleware)
.with_tokio_runtime(tokio_runtime);

let (rpc, auth) = if disable_auth {
// Only launch the RPC server, use a noop auth handle
Expand Down
5 changes: 3 additions & 2 deletions crates/optimism/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ alloy-rpc-types-engine.workspace = true
alloy-rpc-types-eth.workspace = true
alloy-consensus.workspace = true

# async
tokio.workspace = true

# misc
clap.workspace = true
serde.workspace = true
Expand All @@ -65,7 +68,6 @@ eyre.workspace = true
# test-utils dependencies
reth-e2e-test-utils = { workspace = true, optional = true }
alloy-genesis = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }

[dev-dependencies]
Expand Down Expand Up @@ -99,7 +101,6 @@ test-utils = [
"reth-tasks",
"reth-e2e-test-utils",
"alloy-genesis",
"tokio",
"serde_json",
"reth-node-builder/test-utils",
"reth-chainspec/test-utils",
Expand Down
17 changes: 16 additions & 1 deletion crates/optimism/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,8 @@ pub struct OpAddOnsBuilder<NetworkT, RpcMiddleware = Identity> {
min_suggested_priority_fee: u64,
/// RPC middleware to use
rpc_middleware: RpcMiddleware,
/// Optional tokio runtime to use for the RPC server.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl<NetworkT> Default for OpAddOnsBuilder<NetworkT> {
Expand All @@ -671,6 +673,7 @@ impl<NetworkT> Default for OpAddOnsBuilder<NetworkT> {
min_suggested_priority_fee: 1_000_000,
_nt: PhantomData,
rpc_middleware: Identity::new(),
tokio_runtime: None,
}
}
}
Expand Down Expand Up @@ -712,6 +715,14 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
self
}

/// Configures a custom tokio runtime for the RPC server.
///
/// Caution: This runtime must not be created from within asynchronous context.
pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
self.tokio_runtime = tokio_runtime;
self
}

/// Configure the RPC middleware to use
pub fn with_rpc_middleware<T>(self, rpc_middleware: T) -> OpAddOnsBuilder<NetworkT, T> {
let Self {
Expand All @@ -721,6 +732,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
da_config,
enable_tx_conditional,
min_suggested_priority_fee,
tokio_runtime,
_nt,
..
} = self;
Expand All @@ -733,6 +745,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
min_suggested_priority_fee,
_nt,
rpc_middleware,
tokio_runtime,
}
}
}
Expand All @@ -757,6 +770,7 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
min_suggested_priority_fee,
historical_rpc,
rpc_middleware,
tokio_runtime,
..
} = self;

Expand All @@ -770,7 +784,8 @@ impl<NetworkT, RpcMiddleware> OpAddOnsBuilder<NetworkT, RpcMiddleware> {
EB::default(),
EVB::default(),
rpc_middleware,
),
)
.with_tokio_runtime(tokio_runtime),
da_config.unwrap_or_default(),
sequencer_url,
sequencer_headers,
Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
}

/// Configures a custom tokio runtime for the rpc server.
pub fn with_tokio_runtime(mut self, tokio_runtime: tokio::runtime::Handle) -> Self {
pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
let Some(tokio_runtime) = tokio_runtime else { return self };
if let Some(http_server_config) = self.http_server_config {
self.http_server_config =
Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
Expand Down
Loading