diff --git a/Cargo.lock b/Cargo.lock index 4983ebc9d34..7d13fff40b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9337,6 +9337,7 @@ dependencies = [ name = "reth-optimism-exex" version = "1.8.2" dependencies = [ + "alloy-consensus", "derive_more", "eyre", "futures", @@ -9349,6 +9350,7 @@ dependencies = [ "reth-optimism-trie", "reth-provider", "tokio", + "tracing", ] [[package]] diff --git a/crates/optimism/bin/src/main.rs b/crates/optimism/bin/src/main.rs index 030ff130ea5..9ca94915e7c 100644 --- a/crates/optimism/bin/src/main.rs +++ b/crates/optimism/bin/src/main.rs @@ -77,20 +77,23 @@ async fn launch_node_with_storage( where S: OpProofsStore + Clone + 'static, { - let storage_clone = storage.clone(); + let storage_exec = storage.clone(); + let storage_rpc = storage.clone(); let proofs_history_enabled = args.proofs_history; let handle = builder .node(OpNode::new(args.rollup_args)) .install_exex_if(proofs_history_enabled, "proofs-history", async move |exex_context| { - Ok(OpProofsExEx::new(exex_context, storage, args.proofs_history_window).run().boxed()) + Ok(OpProofsExEx::new(exex_context, storage_exec, args.proofs_history_window) + .run() + .boxed()) }) .extend_rpc_modules(move |ctx| { if proofs_history_enabled { - let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), storage_clone.clone()); + let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), storage_rpc.clone()); let debug_ext = DebugApiExt::new( ctx.node().provider().clone(), ctx.registry.eth_api().clone(), - storage_clone, + storage_rpc, Box::new(ctx.node().task_executor().clone()), ctx.node().evm_config().clone(), ); diff --git a/crates/optimism/exex/Cargo.toml b/crates/optimism/exex/Cargo.toml index 753932cfb2c..8ee85ffc20a 100644 --- a/crates/optimism/exex/Cargo.toml +++ b/crates/optimism/exex/Cargo.toml @@ -23,10 +23,14 @@ reth-chainspec.workspace = true # proofs exex handles `TrieUpdates` in notifications reth-optimism-trie = { workspace = true, features = ["serde-bincode-compat"] } +# alloy +alloy-consensus.workspace = true + # misc eyre.workspace = true futures-util.workspace = true derive_more.workspace = true +tracing.workspace = true [dev-dependencies] tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros"] } diff --git a/crates/optimism/exex/src/lib.rs b/crates/optimism/exex/src/lib.rs index 2ba70468e00..98b9990c7e7 100644 --- a/crates/optimism/exex/src/lib.rs +++ b/crates/optimism/exex/src/lib.rs @@ -8,14 +8,16 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +use alloy_consensus::BlockHeader; use derive_more::Constructor; use futures_util::TryStreamExt; use reth_chainspec::ChainInfo; -use reth_exex::{ExExContext, ExExEvent}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::{FullNodeComponents, NodePrimitives}; use reth_node_types::NodeTypes; -use reth_optimism_trie::{BackfillJob, OpProofsStore}; +use reth_optimism_trie::{live::LiveTrieCollector, BackfillJob, OpProofsStorage, OpProofsStore}; use reth_provider::{BlockNumReader, DBProvider, DatabaseProviderFactory}; +use tracing::{debug, error}; /// OP Proofs ExEx - processes blocks and tracks state changes within fault proof window. /// @@ -23,27 +25,26 @@ use reth_provider::{BlockNumReader, DBProvider, DatabaseProviderFactory}; /// saving the current state, new blocks as they're added, and serving proof RPCs /// based on the saved data. #[derive(Debug, Constructor)] -pub struct OpProofsExEx +pub struct OpProofsExEx where Node: FullNodeComponents, - S: OpProofsStore + Clone, { /// The ExEx context containing the node related utilities e.g. provider, notifications, /// events. ctx: ExExContext, /// The type of storage DB. - storage: S, + storage: OpProofsStorage, /// The window to span blocks for proofs history. Value is the number of blocks, received as /// cli arg. #[expect(dead_code)] proofs_history_window: u64, } -impl OpProofsExEx +impl OpProofsExEx where Node: FullNodeComponents>, Primitives: NodePrimitives, - S: OpProofsStore + Clone, + Storage: OpProofsStore + Clone + 'static, { /// Main execution loop for the ExEx pub async fn run(mut self) -> eyre::Result<()> { @@ -53,10 +54,84 @@ where let ChainInfo { best_number, best_hash } = self.ctx.provider().chain_info()?; BackfillJob::new(self.storage.clone(), &db_tx).run(best_number, best_hash).await?; + let collector = LiveTrieCollector::new( + self.ctx.evm_config().clone(), + self.ctx.provider().clone(), + &self.storage, + ); + while let Some(notification) = self.ctx.notifications.try_next().await? { - // match ¬ification { - // _ => {} - // }; + match ¬ification { + ExExNotification::ChainCommitted { new } => { + debug!( + block_number = new.tip().number(), + block_hash = ?new.tip().hash(), + "ChainCommitted notification received", + ); + + // Get latest stored number (ignore stored hash for now) + let latest_stored_block_number = + match self.storage.get_latest_block_number().await? { + Some((n, _)) => n, + None => { + return Err(eyre::eyre!("No blocks stored in proofs storage")); + } + }; + + // If tip is not newer than what we have, nothing to do. + if new.tip().number() <= latest_stored_block_number { + debug!( + block_number = new.tip().number(), + latest_stored = latest_stored_block_number, + "Tip number is less than or equal to latest stored, skipping" + ); + continue; + } + + // Start from the next block after the latest stored one. + let start = latest_stored_block_number.saturating_add(1); + debug!( + start, + end = new.tip().number(), + "Applying updates for blocks in committed chain" + ); + for block_number in start..=new.tip().number() { + match new.blocks().get(&block_number) { + Some(block) => { + collector.execute_and_store_block_updates(block).await?; + } + None => { + error!( + block_number, + "Missing block in committed chain, stopping incremental application", + ); + return Err(eyre::eyre!( + "Missing block {} in committed chain", + block_number + )); + } + } + } + } + ExExNotification::ChainReorged { old, new } => { + debug!( + old_block_number = old.tip().number(), + old_block_hash = ?old.tip().hash(), + new_block_number = new.tip().number(), + new_block_hash = ?new.tip().hash(), + "ChainReorged notification received", + ); + unimplemented!("Chain reorg handling not yet implemented in OpProofsExEx"); + } + ExExNotification::ChainReverted { old } => { + debug!( + old_block_number = old.tip().number(), + old_block_hash = ?old.tip().hash(), + "ChainReverted notification received", + ); + unimplemented!("Chain revert handling not yet implemented"); + } + }; if let Some(committed_chain) = notification.committed_chain() { self.ctx diff --git a/crates/optimism/trie/src/live.rs b/crates/optimism/trie/src/live.rs index 1329f2bd203..c2ac5fc3fb4 100644 --- a/crates/optimism/trie/src/live.rs +++ b/crates/optimism/trie/src/live.rs @@ -119,7 +119,7 @@ where ?execute_block_duration, ?calculate_state_root_duration, ?write_trie_updates_duration, - "Stored trie updates", + "Trie updates stored successfully", ); Ok(())