diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 564bef0f..00245bed 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -10,7 +10,10 @@ use alloy_primitives::B64; use eyre::{WrapErr as _, bail}; use op_alloy_rpc_types_engine::OpFlashblockPayload; use op_revm::L1BlockInfo; -use reth::revm::{State, database::StateProviderDatabase}; +use reth::{ + revm::{State, database::StateProviderDatabase}, + tasks::TaskSpawner, +}; use reth_basic_payload_builder::PayloadConfig; use reth_node_builder::Events; use reth_optimism_chainspec::OpChainSpec; @@ -30,7 +33,7 @@ use tracing::warn; /// /// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. -pub(crate) struct PayloadHandler { +pub(crate) struct PayloadHandler { // receives new payloads built by this builder. built_rx: mpsc::Receiver, // receives incoming p2p messages from peers. @@ -43,12 +46,15 @@ pub(crate) struct PayloadHandler { ctx: OpPayloadSyncerCtx, // chain client client: Client, + // task executor + task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, } -impl PayloadHandler +impl PayloadHandler where Client: ClientBounds + 'static, + Tasks: TaskSpawner + Clone + Unpin + 'static, { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -58,6 +64,7 @@ where payload_events_handle: tokio::sync::broadcast::Sender>, ctx: OpPayloadSyncerCtx, client: Client, + task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { @@ -67,6 +74,7 @@ where payload_events_handle, ctx, client, + task_executor, cancel, } } @@ -79,6 +87,7 @@ where payload_events_handle, ctx, client, + task_executor, cancel, } = self; @@ -104,7 +113,7 @@ where // execute the flashblock on a thread where blocking is acceptable, // as it's potentially a heavy operation - tokio::task::spawn_blocking(move || { + task_executor.spawn_blocking(Box::pin(async move { let res = execute_flashblock( payload, ctx, @@ -122,7 +131,7 @@ where tracing::error!(target: "payload_builder", error = ?e, "failed to execute external received flashblock"); } } - }); + })); } } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 256d2de5..b6941a70 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -157,6 +157,7 @@ impl FlashblocksServiceBuilder { payload_service.payload_events_handle(), syncer_ctx, ctx.provider().clone(), + ctx.task_executor().clone(), cancel, );