Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 97 additions & 15 deletions cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ pub enum Subcommand {
removedb: bool,
#[arg(long, action = ArgAction::SetTrue)]
l2: bool,
#[arg(
long = "export-bal",
value_name = "FILE",
help = "Export BALs produced during sequential execution to a single RLP file (concatenated, one per block)"
)]
export_bal: Option<String>,
#[arg(
long = "with-bal",
value_name = "FILE",
help = "Load BALs from a single RLP file and use the parallel execution path",
conflicts_with = "export_bal"
)]
with_bal: Option<String>,
},
#[command(
name = "export",
Expand Down Expand Up @@ -645,7 +658,13 @@ impl Subcommand {
)
.await?;
}
Subcommand::ImportBench { path, removedb, l2 } => {
Subcommand::ImportBench {
path,
removedb,
l2,
export_bal,
with_bal,
} => {
if removedb {
remove_db(&effective_datadir, opts.force);
}
Expand All @@ -667,6 +686,8 @@ impl Subcommand {
precompile_cache_enabled: !opts.no_precompile_cache,
..Default::default()
},
export_bal.as_deref(),
with_bal.as_deref(),
)
.await?;
}
Expand Down Expand Up @@ -897,6 +918,8 @@ pub async fn import_blocks_bench(
datadir: &Path,
genesis: Genesis,
blockchain_opts: BlockchainOptions,
export_bal_path: Option<&str>,
with_bal_path: Option<&str>,
) -> Result<(), ChainError> {
let start_time = Instant::now();
init_datadir(datadir);
Expand All @@ -905,6 +928,30 @@ pub async fn import_blocks_bench(
regenerate_head_state(&store, &blockchain).await.unwrap();
let path_metadata = metadata(path).expect("Failed to read path");

if let Some(bal_path) = export_bal_path {
info!(path = %bal_path, "Will export BALs to file");
}

// Pre-load all BALs into memory upfront to avoid per-block I/O during benchmark
let preloaded_bals = if let Some(bal_path) = with_bal_path {
info!(path = %bal_path, "Loading BALs from file (parallel path)");
use ethrex_common::types::block_access_list::BlockAccessList;
use ethrex_rlp::decode::RLPDecode as _;
let data = std::fs::read(bal_path).expect("Failed to read BAL file");
Comment thread
edg-l marked this conversation as resolved.
Outdated
let mut remaining = data.as_slice();
let mut bals = Vec::new();
while !remaining.is_empty() {
let (bal, rest) =
BlockAccessList::decode_unfinished(remaining).expect("Failed to decode BAL");
bals.push(bal);
remaining = rest;
}
info!(count = bals.len(), "Loaded BALs into memory");
Some(bals)
} else {
None
};

// If it's an .rlp file it will be just one chain, but if it's a directory there can be multiple chains.
let chains: Vec<Vec<Block>> = if path_metadata.is_dir() {
info!(path = %path, "Importing blocks from directory");
Expand All @@ -929,6 +976,7 @@ pub async fn import_blocks_bench(
vec![utils::read_chain_file(path)]
};

let mut exported_bals = Vec::new();
let mut total_blocks_imported = 0;
for blocks in chains {
let size = blocks.len();
Expand All @@ -938,6 +986,7 @@ pub async fn import_blocks_bench(
.collect::<Vec<_>>();
// Execute block by block
let mut last_progress_log = Instant::now();
let mut bal_index = 0usize;
for (index, block) in blocks.into_iter().enumerate() {
let hash = block.hash();
let number = block.header.number;
Expand Down Expand Up @@ -965,21 +1014,44 @@ pub async fn import_blocks_bench(
validate_block_body(&block.header, &block.body, &ethrex_crypto::NativeCrypto)
.map_err(InvalidBlockError::InvalidBody)?;

blockchain
.add_block_pipeline(block, None)
.inspect_err(|err| match err {
// Block number 1's parent not found, the chain must not belong to the same network as the genesis file
ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
_ => warn!("Failed to add block {number} with hash {hash:#x}"),
})?;
// Look up preloaded BAL for this block (if --with-bal was provided).
// BALs are only produced for Amsterdam+ blocks, so use a separate counter
// that only advances for blocks that have a BAL hash in the header.
let bal = if block.header.block_access_list_hash.is_some() {
let b = preloaded_bals.as_ref().and_then(|bals| bals.get(bal_index));
Comment thread
edg-l marked this conversation as resolved.
bal_index += 1;
b
} else {
None
};
Comment on lines +1040 to +1046

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 When --with-bal is active but bal_index has grown past the end of preloaded_bals (e.g. the BAL file was exported from a shorter run, or there is a miscount between runs), bals.get(bal_index) silently returns None and the block is executed on the sequential path. No warning is logged, so a user relying on this bench to test parallel execution would get no indication that the BAL-parallel path was never exercised.

Suggested change
let bal = if block.header.block_access_list_hash.is_some() {
let b = preloaded_bals.as_ref().and_then(|bals| bals.get(bal_index));
bal_index += 1;
b
} else {
None
};
let bal = if block.header.block_access_list_hash.is_some() {
let b = preloaded_bals.as_ref().and_then(|bals| {
let entry = bals.get(bal_index);
if entry.is_none() {
warn!(block = number, bal_index, "BAL file exhausted; falling back to sequential execution for this block");
}
entry
});
bal_index += 1;
b
} else {
None
};
Prompt To Fix With AI
This is a comment left during a code review.
Path: cmd/ethrex/cli.rs
Line: 1020-1026

Comment:
When `--with-bal` is active but `bal_index` has grown past the end of `preloaded_bals` (e.g. the BAL file was exported from a shorter run, or there is a miscount between runs), `bals.get(bal_index)` silently returns `None` and the block is executed on the sequential path. No warning is logged, so a user relying on this bench to test parallel execution would get no indication that the BAL-parallel path was never exercised.

```suggestion
            let bal = if block.header.block_access_list_hash.is_some() {
                let b = preloaded_bals.as_ref().and_then(|bals| {
                    let entry = bals.get(bal_index);
                    if entry.is_none() {
                        warn!(block = number, bal_index, "BAL file exhausted; falling back to sequential execution for this block");
                    }
                    entry
                });
                bal_index += 1;
                b
            } else {
                None
            };
```

How can I resolve this? If you propose a fix, please make it concise.


if export_bal_path.is_some() {
// Sequential path: execute and capture the produced BAL
let produced_bal = blockchain
.add_block_pipeline_bal(block, None)
.inspect_err(|err| match err {
ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
_ => warn!("Failed to add block {number} with hash {hash:#x}"),
})?;

if let Some(bal) = produced_bal {
exported_bals.push(bal);
}
} else {
// Normal path (or parallel if BAL was loaded)
blockchain
.add_block_pipeline(block, bal)
.inspect_err(|err| match err {
ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
_ => warn!("Failed to add block {number} with hash {hash:#x}"),
})?;
}

// TODO: replace this
// This sleep is because we have a background process writing to disk the last layer
// And until it's done we can't execute the new block
// Because this wants to compare against running a real node in terms of reported performance
// It takes less than 500ms, so this is good enough, but we should report the performance
// without taking into account that wait.
tokio::time::sleep(Duration::from_millis(500)).await;
// Wait for the trie-update worker's Phase 2 (disk write of bottom-most
// diff layer) and Phase 3 (in-memory layer removal) for the block just
// applied to drain. Keeps the next block's per-block timer from
// absorbing the previous block's background persistence cost.
store.wait_for_persistence_idle().await?;
}

// Make head canonical and label all special blocks correctly.
Expand All @@ -998,6 +1070,16 @@ pub async fn import_blocks_bench(
total_blocks_imported += size;
}

// Write all exported BALs to a single file
if let Some(bal_path) = export_bal_path {
let mut buf = Vec::new();
for bal in &exported_bals {
bal.encode(&mut buf);
}
std::fs::write(bal_path, &buf).expect("Failed to write BAL file");
info!(count = exported_bals.len(), "Exported BALs to file");
}

let total_duration = start_time.elapsed();
info!(
blocks = total_blocks_imported,
Expand Down
47 changes: 41 additions & 6 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ pub struct Store {
trie_cache: Arc<RwLock<Arc<TrieLayerCache>>>,
/// Channel for controlling the FlatKeyValue generator background task.
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
/// Channel for sending trie updates to the background worker.
trie_update_worker_tx: std::sync::mpsc::SyncSender<TrieUpdate>,
/// Channel for sending trie updates (and idle pings) to the background worker.
trie_update_worker_tx: std::sync::mpsc::SyncSender<TrieMessage>,
/// Cached latest canonical block header.
///
/// Wrapped in Arc for cheap reads with infrequent writes.
Expand Down Expand Up @@ -270,6 +270,23 @@ pub struct AccountUpdatesList {
}

impl Store {
/// Block until the trie-update background worker has drained every prior
/// message and is waiting for new work — i.e. Phase 2 (disk write of the
/// bottom-most diff layer) and Phase 3 (in-memory layer removal) for all
/// previously-applied updates have completed.
///
/// Implementation: the worker channel is `sync_channel(0)`, so a send only
/// returns once the worker calls `recv()` on the next loop iteration.
/// `TrieMessage::Ping` carries no work, so the send completing is itself
/// the idle signal.
pub async fn wait_for_persistence_idle(&self) -> Result<(), StoreError> {
Comment thread
edg-l marked this conversation as resolved.
let tx = self.trie_update_worker_tx.clone();
tokio::task::spawn_blocking(move || tx.send(TrieMessage::Ping))
.await
.map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle join: {e}")))?
.map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle send: {e}")))
}

/// Add a block in a single transaction.
/// This will store -> BlockHeader, BlockBody, BlockTransactions, BlockNumber.
pub async fn add_block(&self, block: Block) -> Result<(), StoreError> {
Expand Down Expand Up @@ -1414,9 +1431,11 @@ impl Store {
child_state_root: last_state_root,
is_batch,
};
trie_upd_worker_tx.send(trie_update).map_err(|e| {
StoreError::Custom(format!("failed to read new trie layer notification: {e}"))
})?;
trie_upd_worker_tx
.send(TrieMessage::Update(trie_update))
.map_err(|e| {
StoreError::Custom(format!("failed to read new trie layer notification: {e}"))
})?;
let mut tx = db.begin_write()?;

for block in update_batch.blocks {
Expand Down Expand Up @@ -1611,7 +1630,7 @@ impl Store {
let rx = trie_upd_rx;
loop {
match rx.recv() {
Ok(trie_update) => {
Ok(TrieMessage::Update(trie_update)) => {
// FIXME: what should we do on error?
let _ = apply_trie_updates(
backend.as_ref(),
Expand All @@ -1621,6 +1640,10 @@ impl Store {
)
.inspect_err(|err| error!("apply_trie_updates failed: {err}"));
}
Ok(TrieMessage::Ping) => {
// Rendezvous handshake only — sender just wanted to know
// we were idle and back at recv(). No work to do.
}
Err(err) => {
debug!("Trie update sender disconnected: {err}");
return;
Expand Down Expand Up @@ -2928,6 +2951,18 @@ struct TrieUpdate {
is_batch: bool,
}

/// Messages handled by the trie-update background worker.
///
/// `Ping` is a no-op the worker handles between real updates. Because the
/// worker channel is `sync_channel(0)` (rendezvous), a successful `Ping` send
/// proves the worker has finished its previous iteration (Phase 1+2+3) and is
/// blocked back at `recv()` — i.e. persistence is idle. See
/// `Store::wait_for_persistence_idle`.
///
/// NOTE(review): the rendezvous guarantee depends on the channel being created
/// with a bound of 0; that construction is not part of this definition.
enum TrieMessage {
/// A real trie update; the worker handles it by calling `apply_trie_updates`.
Update(TrieUpdate),
/// Idle probe carrying no payload; the worker receives it and does nothing.
Ping,
}

// NOTE: we don't receive `Store` here to avoid cyclic dependencies
// with the other end of `fkv_ctl`
fn apply_trie_updates(
Expand Down
Loading