Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions crates/optimism/bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,23 @@ async fn launch_node_with_storage<S>(
where
S: OpProofsStore + Clone + 'static,
{
let storage_clone = storage.clone();
let storage_exec = storage.clone();
let storage_rpc = storage.clone();
let proofs_history_enabled = args.proofs_history;
let handle = builder
.node(OpNode::new(args.rollup_args))
.install_exex_if(proofs_history_enabled, "proofs-history", async move |exex_context| {
Ok(OpProofsExEx::new(exex_context, storage, args.proofs_history_window).run().boxed())
Ok(OpProofsExEx::new(exex_context, storage_exec, args.proofs_history_window)
.run()
.boxed())
})
.extend_rpc_modules(move |ctx| {
if proofs_history_enabled {
let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), storage_clone.clone());
let api_ext = EthApiExt::new(ctx.registry.eth_api().clone(), storage_rpc.clone());
let debug_ext = DebugApiExt::new(
ctx.node().provider().clone(),
ctx.registry.eth_api().clone(),
storage_clone,
storage_rpc,
Box::new(ctx.node().task_executor().clone()),
ctx.node().evm_config().clone(),
);
Expand Down
4 changes: 4 additions & 0 deletions crates/optimism/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ reth-chainspec.workspace = true
# proofs exex handles `TrieUpdates` in notifications
reth-optimism-trie = { workspace = true, features = ["serde-bincode-compat"] }

# alloy
alloy-consensus.workspace = true

# misc
eyre.workspace = true
futures-util.workspace = true
derive_more.workspace = true
tracing.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros"] }
Expand Down
95 changes: 85 additions & 10 deletions crates/optimism/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,43 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use alloy_consensus::BlockHeader;
use derive_more::Constructor;
use futures_util::TryStreamExt;
use reth_chainspec::ChainInfo;
use reth_exex::{ExExContext, ExExEvent};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::{FullNodeComponents, NodePrimitives};
use reth_node_types::NodeTypes;
use reth_optimism_trie::{BackfillJob, OpProofsStore};
use reth_optimism_trie::{live::LiveTrieCollector, BackfillJob, OpProofsStorage, OpProofsStore};
use reth_provider::{BlockNumReader, DBProvider, DatabaseProviderFactory};
use tracing::{debug, error};

/// OP Proofs ExEx - processes blocks and tracks state changes within fault proof window.
///
/// Saves and serves trie nodes to make proofs faster. This handles the process of
/// saving the current state, new blocks as they're added, and serving proof RPCs
/// based on the saved data.
#[derive(Debug, Constructor)]
pub struct OpProofsExEx<Node, S>
pub struct OpProofsExEx<Node, Storage>
where
Node: FullNodeComponents,
S: OpProofsStore + Clone,
{
/// The ExEx context containing the node related utilities e.g. provider, notifications,
/// events.
ctx: ExExContext<Node>,
/// The type of storage DB.
storage: S,
storage: OpProofsStorage<Storage>,
/// The window to span blocks for proofs history. Value is the number of blocks, received as
/// cli arg.
#[expect(dead_code)]
proofs_history_window: u64,
}

impl<Node, S, Primitives> OpProofsExEx<Node, S>
impl<Node, Storage, Primitives> OpProofsExEx<Node, Storage>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = Primitives>>,
Primitives: NodePrimitives,
S: OpProofsStore + Clone,
Storage: OpProofsStore + Clone + 'static,
{
/// Main execution loop for the ExEx
pub async fn run(mut self) -> eyre::Result<()> {
Expand All @@ -53,10 +54,84 @@ where
let ChainInfo { best_number, best_hash } = self.ctx.provider().chain_info()?;
BackfillJob::new(self.storage.clone(), &db_tx).run(best_number, best_hash).await?;

let collector = LiveTrieCollector::new(
self.ctx.evm_config().clone(),
self.ctx.provider().clone(),
&self.storage,
);

while let Some(notification) = self.ctx.notifications.try_next().await? {
// match &notification {
// _ => {}
// };
match &notification {
ExExNotification::ChainCommitted { new } => {
debug!(
block_number = new.tip().number(),
block_hash = ?new.tip().hash(),
"ChainCommitted notification received",
);

// Get latest stored number (ignore stored hash for now)
let latest_stored_block_number =
match self.storage.get_latest_block_number().await? {
Some((n, _)) => n,
None => {
return Err(eyre::eyre!("No blocks stored in proofs storage"));
}
};

// If tip is not newer than what we have, nothing to do.
if new.tip().number() <= latest_stored_block_number {
debug!(
block_number = new.tip().number(),
latest_stored = latest_stored_block_number,
"Tip number is less than or equal to latest stored, skipping"
);
continue;
}

// Start from the next block after the latest stored one.
let start = latest_stored_block_number.saturating_add(1);
debug!(
start,
end = new.tip().number(),
"Applying updates for blocks in committed chain"
);
for block_number in start..=new.tip().number() {
match new.blocks().get(&block_number) {
Some(block) => {
collector.execute_and_store_block_updates(block).await?;
}
None => {
error!(
block_number,
"Missing block in committed chain, stopping incremental application",
);
return Err(eyre::eyre!(
"Missing block {} in committed chain",
block_number
));
}
}
}
}
ExExNotification::ChainReorged { old, new } => {
debug!(
old_block_number = old.tip().number(),
old_block_hash = ?old.tip().hash(),
new_block_number = new.tip().number(),
new_block_hash = ?new.tip().hash(),
"ChainReorged notification received",
);
unimplemented!("Chain reorg handling not yet implemented in OpProofsExEx");
}
ExExNotification::ChainReverted { old } => {
debug!(
old_block_number = old.tip().number(),
old_block_hash = ?old.tip().hash(),
"ChainReverted notification received",
);
unimplemented!("Chain revert handling not yet implemented");
}
};

if let Some(committed_chain) = notification.committed_chain() {
self.ctx
Expand Down
2 changes: 1 addition & 1 deletion crates/optimism/trie/src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where
?execute_block_duration,
?calculate_state_root_duration,
?write_trie_updates_duration,
"Stored trie updates",
"Trie updates stored successfully",
);

Ok(())
Expand Down