diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index fbac80b606d..d5c99b53488 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,5 +1,6 @@ use crate::{ExecutionPayloadBaseV1, FlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718}; +use alloy_primitives::B256; use eyre::{eyre, OptionExt}; use futures_util::{FutureExt, Stream, StreamExt}; use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock}; @@ -13,7 +14,7 @@ use reth_primitives_traits::{ AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, RecoveredBlock, SignedTransaction, }; -use reth_revm::{database::StateProviderDatabase, db::State}; +use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; use reth_rpc_eth_types::{EthApiError, PendingBlock}; use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; use std::{ @@ -40,6 +41,11 @@ pub struct FlashBlockService< evm_config: EvmConfig, provider: Provider, canon_receiver: CanonStateNotifications, + /// Cached state reads for the current block. + /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when + /// fb received on top of the same block. Avoid redundant I/O across multiple executions + /// within the same block. + cached_state: Option<(B256, CachedReads)>, } impl< @@ -65,9 +71,39 @@ impl< evm_config, canon_receiver: provider.subscribe_to_canonical_state(), provider, + cached_state: None, } } + /// Returns the cached reads at the given head hash. + /// + /// Returns a new cache instance if this is new `head` hash. + fn cached_reads(&mut self, head: B256) -> CachedReads { + if let Some((tracked, cache)) = self.cached_state.take() { + if tracked == head { + return cache + } + } + + // instantiate a new cache instance + CachedReads::default() + } + + /// Updates the cached reads at the given head hash + fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) { + self.cached_state = Some((head, cached_reads)); + } + + /// Clear the state of the service, including: + /// - All flashblocks sequence of the current pending block + /// - Invalidate latest pending block built + /// - Cache + fn clear(&mut self) { + self.blocks.clear(); + self.current.take(); + self.cached_state.take(); + } + /// Adds the `block` into the collection. /// /// Depending on its index and associated block number, it may: @@ -77,8 +113,8 @@ impl< pub fn add_flash_block(&mut self, flashblock: FlashBlock) { // Flash block at index zero resets the whole state if flashblock.index == 0 { - self.blocks = vec![flashblock]; - self.current.take(); + self.clear(); + self.blocks.push(flashblock); } // Flash block at the following index adds to the collection and invalidates built block else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) { @@ -101,8 +137,7 @@ impl< new_block = %flashblock.metadata.block_number, ); - self.blocks.clear(); - self.current.take(); + self.clear(); } } else { debug!("ignoring {flashblock:?}"); @@ -122,6 +157,7 @@ impl< .provider .latest_header()? .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; + let latest_hash = latest.hash(); let attrs = self .blocks @@ -129,17 +165,19 @@ impl< .find_map(|v| v.base.clone()) .ok_or_eyre("Missing base flashblock")?; - if attrs.parent_hash != latest.hash() { + if attrs.parent_hash != latest_hash { return Err(eyre!("The base flashblock is old")); } let state_provider = self.provider.history_by_block_hash(latest.hash())?; - let state = StateProviderDatabase::new(&state_provider); - let mut db = State::builder().with_database(state).with_bundle_update().build(); + + let mut request_cache = self.cached_reads(latest_hash); + let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider)); + let mut state = State::builder().with_database(cached_db).with_bundle_update().build(); let mut builder = self .evm_config - .builder_for_next_block(&mut db, &latest, attrs.into()) + .builder_for_next_block(&mut state, &latest, attrs.into()) .map_err(RethError::other)?; builder.apply_pre_execution_changes()?; @@ -157,12 +195,15 @@ impl< builder.finish(NoopProvider::default())?; let execution_outcome = ExecutionOutcome::new( - db.take_bundle(), + state.take_bundle(), vec![execution_result.receipts], block.number(), vec![execution_result.requests], ); + // update cached reads + self.update_cached_reads(latest_hash, request_cache); + Ok(PendingBlock::with_executed_block( Instant::now() + Duration::from_secs(1), ExecutedBlock {