From 8236cb6e831457d2ac9396bf290c5f1f153f6d69 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 2 Sep 2025 10:57:14 +0200 Subject: [PATCH 1/2] fix: spawn flashblocks service as blocking --- crates/optimism/flashblocks/src/app.rs | 60 ---------------------- crates/optimism/flashblocks/src/lib.rs | 8 ++- crates/optimism/flashblocks/src/service.rs | 14 ++++- crates/optimism/rpc/src/eth/mod.rs | 22 +++++--- 4 files changed, 35 insertions(+), 69 deletions(-) delete mode 100644 crates/optimism/flashblocks/src/app.rs diff --git a/crates/optimism/flashblocks/src/app.rs b/crates/optimism/flashblocks/src/app.rs deleted file mode 100644 index 9581b709311..00000000000 --- a/crates/optimism/flashblocks/src/app.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::{ExecutionPayloadBaseV1, FlashBlockService, WsFlashBlockStream}; -use futures_util::StreamExt; -use reth_chain_state::CanonStateSubscriptions; -use reth_evm::ConfigureEvm; -use reth_primitives_traits::{BlockTy, HeaderTy, NodePrimitives, ReceiptTy}; -use reth_rpc_eth_api::helpers::pending_block::BuildPendingEnv; -use reth_rpc_eth_types::PendingBlock; -use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; -use tokio::sync::watch; -use url::Url; - -/// Spawns a background task that subscribes over websocket to `ws_url`. -/// -/// Returns a [`FlashBlockRx`] that receives the most recent [`PendingBlock`] built from -/// [`FlashBlock`]s. -/// -/// [`FlashBlock`]: crate::FlashBlock -pub fn launch_wss_flashblocks_service( - ws_url: Url, - evm_config: EvmConfig, - provider: Provider, -) -> FlashBlockRx -where - N: NodePrimitives, - EvmConfig: ConfigureEvm< - Primitives = N, - NextBlockEnvCtx: BuildPendingEnv - + From - + Unpin - + 'static, - > + 'static, - Provider: StateProviderFactory - + CanonStateSubscriptions - + BlockReaderIdExt< - Header = HeaderTy, - Block = BlockTy, - Transaction = N::SignedTx, - Receipt = ReceiptTy, - > + Unpin - + 'static, -{ - let stream = WsFlashBlockStream::new(ws_url); - let mut service = FlashBlockService::new(stream, evm_config, provider); - let (tx, rx) = watch::channel(None); - - tokio::spawn(async move { - while let Some(block) = service.next().await { - if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) { - let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}")); - } - } - }); - - rx -} - -/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s. -/// -/// [`FlashBlock`]: crate::FlashBlock -pub type FlashBlockRx = watch::Receiver>>; diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 7a4e5904c01..f7fb1c5c887 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -1,13 +1,17 @@ //! A downstream integration of Flashblocks. -pub use app::{launch_wss_flashblocks_service, FlashBlockRx}; pub use payload::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata, }; +use reth_rpc_eth_types::PendingBlock; pub use service::FlashBlockService; pub use ws::{WsConnect, WsFlashBlockStream}; -mod app; mod payload; mod service; mod ws; + +/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s. +/// +/// [`FlashBlock`]: crate::FlashBlock +pub type FlashBlockRx = tokio::sync::watch::Receiver>>; diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 421d9696e22..2a9ef0db54b 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -50,6 +50,7 @@ pub struct FlashBlockService< impl FlashBlockService where N: NodePrimitives, + S: Stream> + Unpin, EvmConfig: ConfigureEvm + Unpin>, Provider: StateProviderFactory + CanonStateSubscriptions @@ -58,7 +59,7 @@ where Block = BlockTy, Transaction = N::SignedTx, Receipt = ReceiptTy, - >, + > + Unpin, { /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream. pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self { @@ -73,6 +74,17 @@ where } } + /// Drives the services and sends new blocks to the receiver + /// + /// Note: this should be spawned + pub async fn run(mut self, tx: tokio::sync::watch::Sender>>) { + while let Some(block) = self.next().await { + if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) { + let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}")); + } + } + } + /// Returns the cached reads at the given head hash. /// /// Returns a new cache instance if this is new `head` hash. diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index d75b9620d48..6de724c1f47 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -22,7 +22,8 @@ use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ - launch_wss_flashblocks_service, ExecutionPayloadBaseV1, FlashBlockRx, + ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, + WsFlashBlockStream, }; use reth_rpc::eth::{core::EthApiInner, DevSigner}; use reth_rpc_eth_api::{ @@ -43,6 +44,8 @@ use reth_tasks::{ TaskSpawner, }; use std::{fmt, fmt::Formatter, marker::PhantomData, sync::Arc, time::Instant}; +use tokio::sync::watch; +use tracing::info; /// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API. pub type EthApiNodeBackend = EthApiInner; @@ -457,13 +460,20 @@ where None }; - let flashblocks_rx = flashblocks_url.map(|ws_url| { - launch_wss_flashblocks_service( - ws_url, + let flashblocks_rx = if let Some(ws_url) = flashblocks_url { + info!(target: "reth:cli", %ws_url, "Launching flashblocks service"); + let (tx, rx) = watch::channel(None); + let stream = WsFlashBlockStream::new(ws_url); + let service = FlashBlockService::new( + stream, ctx.components.evm_config().clone(), ctx.components.provider().clone(), - ) - }); + ); + ctx.components.task_executor().spawn_blocking(Box::pin(service.run(tx))); + Some(rx) + } else { + None + }; let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner(); From a5805a32be6c7ad9a7d4f4e357b4f0ada2b592d2 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 2 Sep 2025 10:59:01 +0200 Subject: [PATCH 2/2] fmt --- crates/optimism/rpc/src/eth/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index 6de724c1f47..ccad56af713 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -22,8 +22,7 @@ use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ - ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, - WsFlashBlockStream, + ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, WsFlashBlockStream, }; use reth_rpc::eth::{core::EthApiInner, DevSigner}; use reth_rpc_eth_api::{