From bd9491c97ef874a23beb1354e004da296948fd1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Hodul=C3=A1k?= Date: Wed, 3 Sep 2025 12:37:12 +0200 Subject: [PATCH 1/3] refactor(optimism): Extract pending block building responsibility out of `FlashBlockService` --- crates/optimism/flashblocks/src/lib.rs | 1 + crates/optimism/flashblocks/src/service.rs | 120 ++++---------------- crates/optimism/flashblocks/src/worker.rs | 124 +++++++++++++++++++++ 3 files changed, 145 insertions(+), 100 deletions(-) create mode 100644 crates/optimism/flashblocks/src/worker.rs diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 2fba06a9d0e..4614738d620 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -10,6 +10,7 @@ pub use ws::{WsConnect, WsFlashBlockStream}; mod payload; mod sequence; mod service; +mod worker; mod ws; /// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s. diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index ecaa833f1e9..90279a15557 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,25 +1,18 @@ -use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock}; -use alloy_eips::BlockNumberOrTag; -use alloy_primitives::B256; -use futures_util::{FutureExt, Stream, StreamExt}; -use reth_chain_state::{ - CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock, -}; -use reth_errors::RethError; -use reth_evm::{ - execute::{BlockBuilder, BlockBuilderOutcome}, - ConfigureEvm, +use crate::{ + sequence::FlashBlockSequence, + worker::{BuildArgs, FlashBlockBuilder}, + ExecutionPayloadBaseV1, FlashBlock, }; -use reth_execution_types::ExecutionOutcome; +use futures_util::{FutureExt, Stream, StreamExt}; +use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions}; +use reth_evm::ConfigureEvm; use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy}; -use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; -use reth_rpc_eth_types::{EthApiError, PendingBlock}; -use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; +use reth_rpc_eth_types::PendingBlock; +use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; use std::{ pin::Pin, - sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, + time::Instant, }; use tokio::pin; use tracing::{debug, trace, warn}; @@ -37,14 +30,8 @@ pub struct FlashBlockService< current: Option>, blocks: FlashBlockSequence, rebuild: bool, - evm_config: EvmConfig, - provider: Provider, + builder: FlashBlockBuilder, canon_receiver: CanonStateNotifications, - /// Cached state reads for the current block. - /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when - /// fb received on top of the same block. Avoid redundant I/O across multiple executions - /// within the same block. - cached_state: Option<(B256, CachedReads)>, } impl FlashBlockService @@ -67,10 +54,8 @@ where rx, current: None, blocks: FlashBlockSequence::new(), - evm_config, canon_receiver: provider.subscribe_to_canonical_state(), - provider, - cached_state: None, + builder: FlashBlockBuilder::new(evm_config, provider), rebuild: false, } } @@ -88,86 +73,21 @@ where warn!("Flashblock service has stopped"); } - /// Returns the cached reads at the given head hash. - /// - /// Returns a new cache instance if this is new `head` hash. - fn cached_reads(&mut self, head: B256) -> CachedReads { - if let Some((tracked, cache)) = self.cached_state.take() { - if tracked == head { - return cache - } - } - - // instantiate a new cache instance - CachedReads::default() - } - - /// Updates the cached reads at the given head hash - fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) { - self.cached_state = Some((head, cached_reads)); - } - - /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier. + /// Returns the [`PendingBlock`] made purely out of [`FlashBlock`]s that were received earlier. /// /// Returns None if the flashblock doesn't attach to the latest header. fn execute(&mut self) -> eyre::Result>> { - trace!("Attempting new flashblock"); + let Some(base) = self.blocks.payload_base() else { + trace!( + flashblock_number = ?self.blocks.block_number(), + count = %self.blocks.count(), + "Missing flashblock payload base" + ); - let latest = self - .provider - .latest_header()? - .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; - let latest_hash = latest.hash(); - - let Some(attrs) = self.blocks.payload_base() else { - trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base"); return Ok(None) }; - if attrs.parent_hash != latest_hash { - trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); - // doesn't attach to the latest block - return Ok(None) - } - - let state_provider = self.provider.history_by_block_hash(latest.hash())?; - - let mut request_cache = self.cached_reads(latest_hash); - let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider)); - let mut state = State::builder().with_database(cached_db).with_bundle_update().build(); - - let mut builder = self - .evm_config - .builder_for_next_block(&mut state, &latest, attrs.into()) - .map_err(RethError::other)?; - - builder.apply_pre_execution_changes()?; - - for tx in self.blocks.ready_transactions() { - let _gas_used = builder.execute_transaction(tx)?; - } - - let BlockBuilderOutcome { execution_result, block, hashed_state, .. } = - builder.finish(NoopProvider::default())?; - - let execution_outcome = ExecutionOutcome::new( - state.take_bundle(), - vec![execution_result.receipts], - block.number(), - vec![execution_result.requests], - ); - - // update cached reads - self.update_cached_reads(latest_hash, request_cache); - - Ok(Some(PendingBlock::with_executed_block( - Instant::now() + Duration::from_secs(1), - ExecutedBlock { - recovered_block: block.into(), - execution_output: Arc::new(execution_outcome), - hashed_state: Arc::new(hashed_state), - }, - ))) + self.builder.execute(BuildArgs { base, transactions: self.blocks.ready_transactions() }) } /// Takes out `current` [`PendingBlock`] if `state` is not preceding it. diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs new file mode 100644 index 00000000000..568b53464fd --- /dev/null +++ b/crates/optimism/flashblocks/src/worker.rs @@ -0,0 +1,124 @@ +use crate::ExecutionPayloadBaseV1; +use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag}; +use alloy_primitives::B256; +use reth_chain_state::{CanonStateSubscriptions, ExecutedBlock}; +use reth_errors::RethError; +use reth_evm::{ + execute::{BlockBuilder, BlockBuilderOutcome}, + ConfigureEvm, +}; +use reth_execution_types::ExecutionOutcome; +use reth_primitives_traits::{ + AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered, +}; +use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; +use reth_rpc_eth_types::{EthApiError, PendingBlock}; +use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tracing::trace; + +/// The `FlashBlockBuilder` builds [`PendingBlock`] out of a sequence of transactions. +#[derive(Debug)] +pub(crate) struct FlashBlockBuilder { + evm_config: EvmConfig, + provider: Provider, + /// Cached state reads for the current block. + /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when + /// fb received on top of the same block. Avoid redundant I/O across multiple executions + /// within the same block. + cached_state: Option<(B256, CachedReads)>, +} + +impl FlashBlockBuilder { + pub(crate) const fn new(evm_config: EvmConfig, provider: Provider) -> Self { + Self { evm_config, provider, cached_state: None } + } +} + +pub(crate) struct BuildArgs { + pub base: ExecutionPayloadBaseV1, + pub transactions: I, +} + +impl FlashBlockBuilder +where + N: NodePrimitives, + EvmConfig: ConfigureEvm + Unpin>, + Provider: StateProviderFactory + + CanonStateSubscriptions + + BlockReaderIdExt< + Header = HeaderTy, + Block = BlockTy, + Transaction = N::SignedTx, + Receipt = ReceiptTy, + > + Unpin, +{ + /// Returns the [`PendingBlock`] made purely out of transactions and [`ExecutionPayloadBaseV1`] + /// in `args`. + /// + /// Returns `None` if the flashblock doesn't attach to the latest header. + pub(crate) fn execute>>>( + &mut self, + args: BuildArgs, + ) -> eyre::Result>> { + trace!("Building new pending block from flashblocks"); + + let latest = self + .provider + .latest_header()? + .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; + let latest_hash = latest.hash(); + + if args.base.parent_hash != latest_hash { + trace!(flashblock_parent = ?args.base.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); + // doesn't attach to the latest block + return Ok(None) + } + + let state_provider = self.provider.history_by_block_hash(latest.hash())?; + + let mut request_cache = self + .cached_state + .take() + .filter(|(hash, _)| hash == &latest_hash) + .map(|(_, state)| state) + .unwrap_or_default(); + let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider)); + let mut state = State::builder().with_database(cached_db).with_bundle_update().build(); + + let mut builder = self + .evm_config + .builder_for_next_block(&mut state, &latest, args.base.into()) + .map_err(RethError::other)?; + + builder.apply_pre_execution_changes()?; + + for tx in args.transactions { + let _gas_used = builder.execute_transaction(tx)?; + } + + let BlockBuilderOutcome { execution_result, block, hashed_state, .. } = + builder.finish(NoopProvider::default())?; + + let execution_outcome = ExecutionOutcome::new( + state.take_bundle(), + vec![execution_result.receipts], + block.number(), + vec![execution_result.requests], + ); + + self.cached_state.replace((latest_hash, request_cache)); + + Ok(Some(PendingBlock::with_executed_block( + Instant::now() + Duration::from_secs(1), + ExecutedBlock { + recovered_block: block.into(), + execution_output: Arc::new(execution_outcome), + hashed_state: Arc::new(hashed_state), + }, + ))) + } +} From 3936b9ce06de0c92a50f834e154cb3782897ee60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Hodul=C3=A1k?= Date: Mon, 8 Sep 2025 09:54:28 +0200 Subject: [PATCH 2/3] feat(optimism): Spawn blocking task for pending block builds in `FlashBlockService` (#18294) --- Cargo.lock | 1 + crates/optimism/flashblocks/Cargo.toml | 1 + crates/optimism/flashblocks/src/service.rs | 126 ++++++++++++++++----- crates/optimism/flashblocks/src/worker.rs | 41 +++---- crates/optimism/rpc/src/eth/mod.rs | 3 +- 5 files changed, 121 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa139f02a36..98405388dbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9339,6 +9339,7 @@ dependencies = [ "reth-rpc-eth-api", "reth-rpc-eth-types", "reth-storage-api", + "reth-tasks", "serde", "serde_json", "test-case", diff --git a/crates/optimism/flashblocks/Cargo.toml b/crates/optimism/flashblocks/Cargo.toml index ad57c45f163..30371242eae 100644 --- a/crates/optimism/flashblocks/Cargo.toml +++ b/crates/optimism/flashblocks/Cargo.toml @@ -23,6 +23,7 @@ reth-rpc-eth-api.workspace = true reth-rpc-eth-types.workspace = true reth-errors.workspace = true reth-storage-api.workspace = true +reth-tasks.workspace = true # alloy alloy-eips = { workspace = true, features = ["serde"] } diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 90279a15557..1be731213d5 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -3,18 +3,24 @@ use crate::{ worker::{BuildArgs, FlashBlockBuilder}, ExecutionPayloadBaseV1, FlashBlock, }; +use alloy_eips::eip2718::WithEncoded; +use alloy_primitives::B256; use futures_util::{FutureExt, Stream, StreamExt}; use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions}; use reth_evm::ConfigureEvm; -use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy}; +use reth_primitives_traits::{ + AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered, +}; +use reth_revm::cached::CachedReads; use reth_rpc_eth_types::PendingBlock; use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; +use reth_tasks::TaskExecutor; use std::{ pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, time::Instant, }; -use tokio::pin; +use tokio::{pin, sync::oneshot}; use tracing::{debug, trace, warn}; /// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of @@ -32,13 +38,22 @@ pub struct FlashBlockService< rebuild: bool, builder: FlashBlockBuilder, canon_receiver: CanonStateNotifications, + spawner: TaskExecutor, + job: Option>, + /// Cached state reads for the current block. + /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when + /// fb received on top of the same block. Avoid redundant I/O across multiple executions + /// within the same block. + cached_state: Option<(B256, CachedReads)>, } impl FlashBlockService where N: NodePrimitives, - S: Stream> + Unpin, - EvmConfig: ConfigureEvm + Unpin>, + S: Stream> + Unpin + 'static, + EvmConfig: ConfigureEvm + Unpin> + + Clone + + 'static, Provider: StateProviderFactory + CanonStateSubscriptions + BlockReaderIdExt< @@ -46,10 +61,12 @@ where Block = BlockTy, Transaction = N::SignedTx, Receipt = ReceiptTy, - > + Unpin, + > + Unpin + + Clone + + 'static, { /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream. - pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self { + pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self { Self { rx, current: None, @@ -57,6 +74,9 @@ where canon_receiver: provider.subscribe_to_canonical_state(), builder: FlashBlockBuilder::new(evm_config, provider), rebuild: false, + spawner, + job: None, + cached_state: None, } } @@ -73,10 +93,12 @@ where warn!("Flashblock service has stopped"); } - /// Returns the [`PendingBlock`] made purely out of [`FlashBlock`]s that were received earlier. + /// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier. /// - /// Returns None if the flashblock doesn't attach to the latest header. - fn execute(&mut self) -> eyre::Result>> { + /// Returns `None` if the flashblock have no `base`. + fn build_args( + &mut self, + ) -> Option>>>> { let Some(base) = self.blocks.payload_base() else { trace!( flashblock_number = ?self.blocks.block_number(), @@ -84,10 +106,14 @@ where "Missing flashblock payload base" ); - return Ok(None) + return None }; - self.builder.execute(BuildArgs { base, transactions: self.blocks.ready_transactions() }) + Some(BuildArgs { + base, + transactions: self.blocks.ready_transactions().collect::>(), + cached_state: self.cached_state.take(), + }) } /// Takes out `current` [`PendingBlock`] if `state` is not preceding it. @@ -100,8 +126,10 @@ where impl Stream for FlashBlockService where N: NodePrimitives, - S: Stream> + Unpin, - EvmConfig: ConfigureEvm + Unpin>, + S: Stream> + Unpin + 'static, + EvmConfig: ConfigureEvm + Unpin> + + Clone + + 'static, Provider: StateProviderFactory + CanonStateSubscriptions + BlockReaderIdExt< @@ -109,13 +137,52 @@ where Block = BlockTy, Transaction = N::SignedTx, Receipt = ReceiptTy, - > + Unpin, + > + Unpin + + Clone + + 'static, { type Item = eyre::Result>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + let result = if let Some((now, rx)) = &mut this.job { + let result = ready!(rx.poll_unpin(cx)).unwrap(); + Some((*now, result)) + } else { + None + }; + + if let Some((now, result)) = result { + this.job.take(); + + match result { + Ok(Some((new_pending, cached_reads))) => { + // built a new pending block + this.current = Some(new_pending.clone()); + this.cached_state = Some((new_pending.block().hash(), cached_reads)); + this.rebuild = false; + + trace!( + parent_hash = %new_pending.block().parent_hash(), + block_number = new_pending.block().number(), + flash_blocks = this.blocks.count(), + elapsed = ?now.elapsed(), + "Built new block with flashblocks" + ); + + return Poll::Ready(Some(Ok(Some(new_pending)))); + } + Ok(None) => { + // nothing to do because tracked flashblock doesn't attach to latest + } + Err(err) => { + // we can ignore this error + debug!(%err, "failed to execute flashblock"); + } + } + } + // consume new flashblocks while they're ready while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { match result { @@ -147,25 +214,22 @@ where return Poll::Pending } - let now = Instant::now(); // try to build a block on top of latest - match this.execute() { - Ok(Some(new_pending)) => { - // built a new pending block - this.current = Some(new_pending.clone()); - this.rebuild = false; - trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks"); - return Poll::Ready(Some(Ok(Some(new_pending)))); - } - Ok(None) => { - // nothing to do because tracked flashblock doesn't attach to latest - } - Err(err) => { - // we can ignore this error - debug!(%err, "failed to execute flashblock"); - } + if let Some(args) = this.build_args() { + let now = Instant::now(); + + let (tx, rx) = oneshot::channel(); + let builder = this.builder.clone(); + + this.spawner.spawn_blocking(async move { + let _ = tx.send(builder.execute(args)); + }); + this.job.replace((now, rx)); } Poll::Pending } } + +type BuildJob = + (Instant, oneshot::Receiver, CachedReads)>>>); diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs index 568b53464fd..f46e362c2d0 100644 --- a/crates/optimism/flashblocks/src/worker.rs +++ b/crates/optimism/flashblocks/src/worker.rs @@ -25,22 +25,18 @@ use tracing::trace; pub(crate) struct FlashBlockBuilder { evm_config: EvmConfig, provider: Provider, - /// Cached state reads for the current block. - /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when - /// fb received on top of the same block. Avoid redundant I/O across multiple executions - /// within the same block. - cached_state: Option<(B256, CachedReads)>, } impl FlashBlockBuilder { pub(crate) const fn new(evm_config: EvmConfig, provider: Provider) -> Self { - Self { evm_config, provider, cached_state: None } + Self { evm_config, provider } } } pub(crate) struct BuildArgs { pub base: ExecutionPayloadBaseV1, pub transactions: I, + pub cached_state: Option<(B256, CachedReads)>, } impl FlashBlockBuilder @@ -61,9 +57,9 @@ where /// /// Returns `None` if the flashblock doesn't attach to the latest header. pub(crate) fn execute>>>( - &mut self, - args: BuildArgs, - ) -> eyre::Result>> { + &self, + mut args: BuildArgs, + ) -> eyre::Result, CachedReads)>> { trace!("Building new pending block from flashblocks"); let latest = self @@ -80,7 +76,7 @@ where let state_provider = self.provider.history_by_block_hash(latest.hash())?; - let mut request_cache = self + let mut request_cache = args .cached_state .take() .filter(|(hash, _)| hash == &latest_hash) @@ -110,15 +106,22 @@ where vec![execution_result.requests], ); - self.cached_state.replace((latest_hash, request_cache)); - - Ok(Some(PendingBlock::with_executed_block( - Instant::now() + Duration::from_secs(1), - ExecutedBlock { - recovered_block: block.into(), - execution_output: Arc::new(execution_outcome), - hashed_state: Arc::new(hashed_state), - }, + Ok(Some(( + PendingBlock::with_executed_block( + Instant::now() + Duration::from_secs(1), + ExecutedBlock { + recovered_block: block.into(), + execution_output: Arc::new(execution_outcome), + hashed_state: Arc::new(hashed_state), + }, + ), + request_cache, ))) } } + +impl Clone for FlashBlockBuilder { + fn clone(&self) -> Self { + Self { evm_config: self.evm_config.clone(), provider: self.provider.clone() } + } +} diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index ccad56af713..6c32e28a5b7 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -467,8 +467,9 @@ where stream, ctx.components.evm_config().clone(), ctx.components.provider().clone(), + ctx.components.task_executor().clone(), ); - ctx.components.task_executor().spawn_blocking(Box::pin(service.run(tx))); + ctx.components.task_executor().spawn(Box::pin(service.run(tx))); Some(rx) } else { None From e109463dcf9b98fc86cc373257799486e3620eb3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 8 Sep 2025 11:01:31 +0200 Subject: [PATCH 3/3] chore: touchups --- crates/optimism/flashblocks/src/service.rs | 151 ++++++++++++--------- crates/optimism/flashblocks/src/worker.rs | 6 +- 2 files changed, 89 insertions(+), 68 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 1be731213d5..7fe10f5c23a 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -95,7 +95,7 @@ where /// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier. /// - /// Returns `None` if the flashblock have no `base`. + /// Returns `None` if the flashblock have no `base` or the base is not a child block of latest. fn build_args( &mut self, ) -> Option>>>> { @@ -109,6 +109,14 @@ where return None }; + // attempt an initial consecutive check + if let Some(latest) = self.builder.provider().latest_header().ok().flatten() { + if latest.hash() != base.parent_hash { + trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt"); + return None; + } + } + Some(BuildArgs { base, transactions: self.blocks.ready_transactions().collect::>(), @@ -146,88 +154,97 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let result = if let Some((now, rx)) = &mut this.job { - let result = ready!(rx.poll_unpin(cx)).unwrap(); - Some((*now, result)) - } else { - None - }; - - if let Some((now, result)) = result { + loop { + // drive pending build job to completion + let result = match this.job.as_mut() { + Some((now, rx)) => { + let result = ready!(rx.poll_unpin(cx)); + result.ok().map(|res| (*now, res)) + } + None => None, + }; + // reset job this.job.take(); - match result { - Ok(Some((new_pending, cached_reads))) => { - // built a new pending block - this.current = Some(new_pending.clone()); - this.cached_state = Some((new_pending.block().hash(), cached_reads)); - this.rebuild = false; + if let Some((now, result)) = result { + match result { + Ok(Some((new_pending, cached_reads))) => { + // built a new pending block + this.current = Some(new_pending.clone()); + // cache reads + this.cached_state = Some((new_pending.parent_hash(), cached_reads)); + this.rebuild = false; + + trace!( + parent_hash = %new_pending.block().parent_hash(), + block_number = new_pending.block().number(), + flash_blocks = this.blocks.count(), + elapsed = ?now.elapsed(), + "Built new block with flashblocks" + ); + + return Poll::Ready(Some(Ok(Some(new_pending)))); + } + Ok(None) => { + // nothing to do because tracked flashblock doesn't attach to latest + } + Err(err) => { + // we can ignore this error + debug!(%err, "failed to execute flashblock"); + } + } + } + + // consume new flashblocks while they're ready + while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { + match result { + Ok(flashblock) => match this.blocks.insert(flashblock) { + Ok(_) => this.rebuild = true, + Err(err) => debug!(%err, "Failed to prepare flashblock"), + }, + Err(err) => return Poll::Ready(Some(Err(err))), + } + } + // update on new head block + if let Poll::Ready(Ok(state)) = { + let fut = this.canon_receiver.recv(); + pin!(fut); + fut.poll_unpin(cx) + } { + if let Some(current) = this.on_new_tip(state) { trace!( - parent_hash = %new_pending.block().parent_hash(), - block_number = new_pending.block().number(), - flash_blocks = this.blocks.count(), - elapsed = ?now.elapsed(), - "Built new block with flashblocks" + parent_hash = %current.block().parent_hash(), + block_number = current.block().number(), + "Clearing current flashblock on new canonical block" ); - return Poll::Ready(Some(Ok(Some(new_pending)))); - } - Ok(None) => { - // nothing to do because tracked flashblock doesn't attach to latest - } - Err(err) => { - // we can ignore this error - debug!(%err, "failed to execute flashblock"); + return Poll::Ready(Some(Ok(None))) } } - } - // consume new flashblocks while they're ready - while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { - match result { - Ok(flashblock) => match this.blocks.insert(flashblock) { - Ok(_) => this.rebuild = true, - Err(err) => debug!(%err, "Failed to prepare flashblock"), - }, - Err(err) => return Poll::Ready(Some(Err(err))), + if !this.rebuild && this.current.is_some() { + return Poll::Pending } - } - if let Poll::Ready(Ok(state)) = { - let fut = this.canon_receiver.recv(); - pin!(fut); - fut.poll_unpin(cx) - } { - if let Some(current) = this.on_new_tip(state) { - trace!( - parent_hash = %current.block().parent_hash(), - block_number = current.block().number(), - "Clearing current flashblock on new canonical block" - ); - - return Poll::Ready(Some(Ok(None))) - } - } + // try to build a block on top of latest + if let Some(args) = this.build_args() { + let now = Instant::now(); - if !this.rebuild && this.current.is_some() { - return Poll::Pending - } + let (tx, rx) = oneshot::channel(); + let builder = this.builder.clone(); - // try to build a block on top of latest - if let Some(args) = this.build_args() { - let now = Instant::now(); + this.spawner.spawn_blocking(async move { + let _ = tx.send(builder.execute(args)); + }); + this.job.replace((now, rx)); - let (tx, rx) = oneshot::channel(); - let builder = this.builder.clone(); + // continue and poll the spawned job + continue + } - this.spawner.spawn_blocking(async move { - let _ = tx.send(builder.execute(args)); - }); - this.job.replace((now, rx)); + return Poll::Pending } - - Poll::Pending } } diff --git a/crates/optimism/flashblocks/src/worker.rs b/crates/optimism/flashblocks/src/worker.rs index f46e362c2d0..c2bf04495ea 100644 --- a/crates/optimism/flashblocks/src/worker.rs +++ b/crates/optimism/flashblocks/src/worker.rs @@ -31,6 +31,10 @@ impl FlashBlockBuilder { pub(crate) const fn new(evm_config: EvmConfig, provider: Provider) -> Self { Self { evm_config, provider } } + + pub(crate) const fn provider(&self) -> &Provider { + &self.provider + } } pub(crate) struct BuildArgs { @@ -60,7 +64,7 @@ where &self, mut args: BuildArgs, ) -> eyre::Result, CachedReads)>> { - trace!("Building new pending block from flashblocks"); + trace!("Attempting new pending block from flashblocks"); let latest = self .provider