Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

### 2025-10-27

- Run "engine_newPayload" block execution in a dedicated worker thread. [#5051](https://github.com/lambdaclass/ethrex/pull/5051)
- Reusing FindNode message per lookup loop instead of randomizing the key for each message. [#5047](https://github.com/lambdaclass/ethrex/pull/5047)

### 2025-10-23
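The entry for [#5051] above is the heart of this PR: `engine_newPayload` block execution moves off the async runtime onto a dedicated worker thread, reached through a channel that pairs each block with a `oneshot` reply sender. The rest of the diff wires this up. As a minimal, self-contained sketch of the pattern only (illustrative names, placeholder types, and a plain `std::sync::mpsc` channel, not the actual ethrex API):

```rust
use std::sync::mpsc;
use std::thread;

use tokio::sync::oneshot;

// Placeholder types standing in for the real block and error types.
struct Block;
#[derive(Debug)]
struct ChainError(String);

type BlockRequest = (oneshot::Sender<Result<(), ChainError>>, Block);

/// Spawn a dedicated OS thread that executes blocks one at a time and
/// return the sending half of the request channel to the async side.
fn start_worker<F>(mut execute: F) -> mpsc::Sender<BlockRequest>
where
    F: FnMut(Block) -> Result<(), ChainError> + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<BlockRequest>();
    thread::spawn(move || {
        // Blocking recv is fine here: this is a plain OS thread,
        // not a task running on the async runtime.
        while let Ok((notify, block)) = rx.recv() {
            let result = execute(block);
            // The requester may have been cancelled; ignore a failed reply.
            let _ = notify.send(result);
        }
    });
    tx
}

/// Async side: submit a block and wait for the worker's verdict.
async fn submit(channel: &mpsc::Sender<BlockRequest>, block: Block) -> Result<(), ChainError> {
    let (notify_send, notify_recv) = oneshot::channel();
    channel
        .send((notify_send, block))
        .map_err(|e| ChainError(format!("worker unavailable: {e}")))?;
    notify_recv
        .await
        .map_err(|e| ChainError(format!("worker dropped the request: {e}")))?
}
```

The design keeps the async RPC handler free of CPU-heavy work, at the cost of serializing block execution through a single queue; the actual channel and error types used by ethrex are visible further down in the diff.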
2 changes: 1 addition & 1 deletion benches/benches/build_block_benchmark.rs
@@ -203,7 +203,7 @@ pub async fn bench_payload(input: &(Arc<Blockchain>, Block, &Store)) -> (Duratio
// 3. engine_newPayload is called; this eventually calls Blockchain::add_block
// which takes transactions from the mempool and fills the block with them.
let since = Instant::now();
blockchain.add_block(block).await.unwrap();
blockchain.add_block(block).unwrap();
let executed = Instant::now();
// EXTRA: Sanity check to not benchmark an empty block.
assert!(
1 change: 0 additions & 1 deletion cmd/ethrex/cli.rs
@@ -576,7 +576,6 @@ pub async fn import_blocks(

blockchain
.add_block(block)
.await
.inspect_err(|err| match err {
// Block number 1's parent not found, the chain must not belong to the same network as the genesis file
ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
2 changes: 1 addition & 1 deletion cmd/ethrex/initializers.rs
@@ -565,7 +565,7 @@ pub async fn regenerate_head_state(
.await?
.ok_or_else(|| eyre::eyre!("Block {i} not found"))?;

blockchain.add_block(block).await?;
blockchain.add_block(block)?;
}

info!("Finished regenerating state");
17 changes: 7 additions & 10 deletions crates/blockchain/blockchain.rs
@@ -139,14 +139,14 @@ impl Blockchain {
}

/// Executes a block within a new vm instance and state
async fn execute_block(
fn execute_block(
&self,
block: &Block,
) -> Result<(BlockExecutionResult, Vec<AccountUpdate>), ChainError> {
// Validate if it can be the new head and find the parent
let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
// If the parent is not present, we store it as pending.
self.storage.add_pending_block(block.clone()).await?;
self.storage.add_pending_block(block.clone())?;
return Err(ChainError::ParentNotFound);
};

@@ -388,8 +388,7 @@ impl Blockchain {
// We cannot ensure that the users of this function have the necessary
// state stored, so in order for it to not assume anything, we update
// the storage with the new state after re-execution
self.store_block(block.clone(), account_updates_list, execution_result)
.await?;
self.store_block(block.clone(), account_updates_list, execution_result)?;

for (address, (witness, _storage_trie)) in storage_tries_after_update {
let mut witness = witness.lock().map_err(|_| {
@@ -499,7 +498,7 @@ impl Blockchain {
})
}

pub async fn store_block(
pub fn store_block(
&self,
block: Block,
account_updates_list: AccountUpdatesList,
@@ -518,13 +517,12 @@

self.storage
.store_block_updates(update_batch)
.await
.map_err(|e| e.into())
}

pub async fn add_block(&self, block: Block) -> Result<(), ChainError> {
pub fn add_block(&self, block: Block) -> Result<(), ChainError> {
let since = Instant::now();
let (res, updates) = self.execute_block(&block).await?;
let (res, updates) = self.execute_block(&block)?;
let executed = Instant::now();

// Apply the account updates over the last block's state and compute the new state root
@@ -541,7 +539,7 @@
);

let merkleized = Instant::now();
let result = self.store_block(block, account_updates_list, res).await;
let result = self.store_block(block, account_updates_list, res);
let stored = Instant::now();

if self.options.perf_logs_enabled {
@@ -727,7 +725,6 @@ impl Blockchain {

self.storage
.store_block_updates(update_batch)
.await
.map_err(|e| (e.into(), None))?;

let elapsed_seconds = interval.elapsed().as_secs_f64();
17 changes: 4 additions & 13 deletions crates/blockchain/smoke_test.rs
@@ -30,7 +30,7 @@ mod blockchain_integration_test {
// Add first block. We'll make it canonical.
let block_1a = new_block(&store, &genesis_header).await;
let hash_1a = block_1a.hash();
blockchain.add_block(block_1a.clone()).await.unwrap();
blockchain.add_block(block_1a.clone()).unwrap();
store
.forkchoice_update(None, 1, hash_1a, None, None)
.await
@@ -45,7 +45,6 @@
let hash_1b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block 1b.");
let retrieved_1b = store.get_block_header_by_hash(hash_1b).unwrap().unwrap();

@@ -57,7 +56,6 @@
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");
let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap();

@@ -92,7 +90,7 @@ mod blockchain_integration_test {
// Build a single valid block.
let block_1 = new_block(&store, &genesis_header).await;
let hash_1 = block_1.hash();
blockchain.add_block(block_1.clone()).await.unwrap();
blockchain.add_block(block_1.clone()).unwrap();
apply_fork_choice(&store, hash_1, H256::zero(), H256::zero())
.await
.unwrap();
@@ -101,7 +99,7 @@
let mut block_2 = new_block(&store, &block_1.header).await;
block_2.header.parent_hash = H256::random();
let hash_2 = block_2.hash();
let result = blockchain.add_block(block_2.clone()).await;
let result = blockchain.add_block(block_2.clone());
assert!(matches!(result, Err(ChainError::ParentNotFound)));

// block 2 should now be pending.
@@ -127,7 +125,7 @@
// Add first block. Not canonical.
let block_1a = new_block(&store, &genesis_header).await;
let hash_1a = block_1a.hash();
blockchain.add_block(block_1a.clone()).await.unwrap();
blockchain.add_block(block_1a.clone()).unwrap();
let retrieved_1a = store.get_block_header_by_hash(hash_1a).unwrap().unwrap();

assert!(!is_canonical(&store, 1, hash_1a).await.unwrap());
@@ -137,7 +135,6 @@
let hash_1b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block 1b.");
apply_fork_choice(&store, hash_1b, genesis_hash, genesis_hash)
.await
@@ -154,7 +151,6 @@
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");
apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash)
.await
@@ -201,15 +197,13 @@
let hash_1 = block_1.hash();
blockchain
.add_block(block_1.clone())
.await
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header).await;
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");

assert!(!is_canonical(&store, 1, hash_1).await.unwrap());
@@ -253,15 +247,13 @@
let block_1 = new_block(&store, &genesis_header).await;
blockchain
.add_block(block_1.clone())
.await
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header).await;
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");

assert_eq!(
Expand All @@ -281,7 +273,6 @@ mod blockchain_integration_test {
let hash_b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block b.");

// The latest block should be the same.
2 changes: 1 addition & 1 deletion crates/l2/based/block_fetcher.rs
@@ -269,7 +269,7 @@ impl BlockFetcher {

async fn store_batch(&mut self, batch: &[Block]) -> Result<(), BlockFetcherError> {
for block in batch.iter() {
self.blockchain.add_block(block.clone()).await?;
self.blockchain.add_block(block.clone())?;

let block_hash = block.hash();

2 changes: 2 additions & 0 deletions crates/l2/networking/rpc/rpc.rs
@@ -86,6 +86,7 @@ pub async fn start_api(
// TODO: Refactor how filters are handled,
// filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc)
let active_filters = Arc::new(Mutex::new(HashMap::new()));
let block_worker_channel = ethrex_rpc::start_block_executor(blockchain.clone());
let service_context = RpcApiContext {
l1_ctx: ethrex_rpc::RpcApiContext {
storage,
@@ -103,6 +104,7 @@
gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())),
log_filter_handler,
gas_ceil,
block_worker_channel,
},
valid_delegation_addresses,
sponsor_pk,
3 changes: 1 addition & 2 deletions crates/l2/sequencer/block_producer.rs
@@ -211,8 +211,7 @@ impl BlockProducer {
let block_hash = block.hash();
self.store_fee_config_by_block(block.header.number).await?;
self.blockchain
.store_block(block, account_updates_list, execution_result)
.await?;
.store_block(block, account_updates_list, execution_result)?;
info!(
"Stored new block {:x}, transaction_count {}",
block_hash, transactions_count
19 changes: 8 additions & 11 deletions crates/l2/sequencer/l1_committer.rs
@@ -524,16 +524,14 @@ impl L1Committer {
"no account updated".to_owned(),
))?;

one_time_checkpoint_blockchain
.store_block(
potential_batch_block.clone(),
account_updates_list,
BlockExecutionResult {
receipts,
requests: vec![],
},
)
.await?;
one_time_checkpoint_blockchain.store_block(
potential_batch_block.clone(),
account_updates_list,
BlockExecutionResult {
receipts,
requests: vec![],
},
)?;
}

// Accumulate block data with the rest of the batch.
@@ -1257,7 +1255,6 @@ pub async fn regenerate_head_state(

blockchain
.add_block(block)
.await
.map_err(|err| CommitterError::FailedToCreateCheckpoint(err.to_string()))?;
}

22 changes: 9 additions & 13 deletions crates/networking/p2p/rlpx/l2/l2_connection.rs
@@ -393,19 +393,15 @@ async fn process_new_block(
let block = Arc::<Block>::try_unwrap(block).map_err(|_| {
PeerConnectionError::InternalError("Failed to take ownership of block".to_string())
})?;
established
.blockchain
.add_block(block)
.await
.inspect_err(|e| {
log_peer_error!(
&established.node,
&format!(
"Error adding new block {} with hash {:?}, error: {e}",
block_number, block_hash
),
);
})?;
established.blockchain.add_block(block).inspect_err(|e| {
log_peer_error!(
&established.node,
&format!(
"Error adding new block {} with hash {:?}, error: {e}",
block_number, block_hash
),
);
})?;

apply_fork_choice(&established.storage, block_hash, block_hash, block_hash)
.await
2 changes: 1 addition & 1 deletion crates/networking/p2p/sync.rs
@@ -548,7 +548,7 @@ impl Syncer {
let mut last_valid_hash = H256::default();
for block in blocks {
let block_hash = block.hash();
blockchain.add_block(block).await.map_err(|e| {
blockchain.add_block(block).map_err(|e| {
(
e,
Some(BatchBlockProcessingFailure {
17 changes: 16 additions & 1 deletion crates/networking/rpc/engine/payload.rs
@@ -7,6 +7,7 @@ use ethrex_common::{H256, U256};
use ethrex_p2p::sync::SyncMode;
use ethrex_rlp::error::RLPDecodeError;
use serde_json::Value;
use tokio::sync::oneshot;
use tracing::{debug, error, info, warn};

use crate::rpc::{RpcApiContext, RpcHandler};
@@ -678,6 +679,20 @@ fn validate_block_hash(payload: &ExecutionPayload, block: &Block) -> Result<(),
Ok(())
}

pub async fn add_block(ctx: &RpcApiContext, block: Block) -> Result<(), ChainError> {
let (notify_send, notify_recv) = oneshot::channel();
ctx.block_worker_channel
.send((notify_send, block))
.map_err(|e| {
ChainError::Custom(format!(
"failed to send block execution request to worker: {e}"
))
})?;
notify_recv
.await
.map_err(|e| ChainError::Custom(format!("failed to receive block execution result: {e}")))?
Comment on lines +684 to +693 (Collaborator):
In the future we'll need to check for edge cases like two competing blocks being received at the same time. That could lead to increased latency.

}

async fn try_execute_payload(
block: Block,
context: &RpcApiContext,
@@ -696,7 +711,7 @@
// Execute and store the block
info!(%block_hash, %block_number, "Executing payload");

match context.blockchain.add_block(block).await {
match add_block(context, block).await {
Err(ChainError::ParentNotFound) => {
// Start sync
context.syncer.sync_to_head(block_hash);
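The new `add_block` helper above is the producer side of the worker channel: it pairs the block with a `oneshot` sender and forwards both through `block_worker_channel`. The consumer side is `start_block_executor`, re-exported from `lib.rs` below and called from the L2 RPC setup earlier in the diff, but its body is not part of this section. The following is therefore only a plausible reconstruction, assuming an unbounded tokio channel, an assumed request alias `BlockExecutionRequest`, and assumed import paths (`ethrex_blockchain`, `ethrex_common::types::Block`):

```rust
use std::sync::Arc;
use std::thread;

use tokio::sync::{mpsc, oneshot};

// Assumed paths; the real types live elsewhere in the ethrex workspace.
use ethrex_blockchain::{error::ChainError, Blockchain};
use ethrex_common::types::Block;

/// Assumed request shape matching the producer in `engine/payload.rs`.
pub type BlockExecutionRequest = (oneshot::Sender<Result<(), ChainError>>, Block);

/// Hypothetical sketch of `start_block_executor`: spawn one dedicated thread
/// that drains the queue and runs the now-synchronous `Blockchain::add_block`.
pub fn start_block_executor(
    blockchain: Arc<Blockchain>,
) -> mpsc::UnboundedSender<BlockExecutionRequest> {
    let (tx, mut rx) = mpsc::unbounded_channel();
    thread::spawn(move || {
        // Requests are handled strictly in arrival order, which is why two
        // competing blocks arriving together (the concern in the review
        // comment above) simply queue behind one another.
        while let Some((notify, block)) = rx.blocking_recv() {
            let result = blockchain.add_block(block);
            // The RPC task may have gone away; a failed notify is not fatal.
            let _ = notify.send(result);
        }
    });
    tx
}
```

Only the `(oneshot::Sender, Block)` request shape and the synchronous `add_block` call are actually visible from this diff; whether the real implementation uses an unbounded channel, a bounded one, or some other blocking strategy is not.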
2 changes: 1 addition & 1 deletion crates/networking/rpc/lib.rs
@@ -16,7 +16,7 @@ pub mod types;
pub mod utils;
pub use clients::{EngineClient, EthClient};

pub use rpc::start_api;
pub use rpc::{start_api, start_block_executor};

// TODO: These exports are needed by ethrex-l2-rpc, but we do not want to
// export them in the public API of this crate.