From d76c8f1415c0fceb5f3a8712c67f586be134b010 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Fri, 24 Oct 2025 20:58:36 -0300 Subject: [PATCH 01/11] wip: perf(l1): payload block execution in thread Extract the block execution for payloads to an OS thread, make all its callees regular blocking functions, and removes the reliance on Tokio. Caveats: 1. The worker is in the RPC, mostly because the Blockchain is not Clone; 2. The L2 is broken from the removal of async, fee config mostly becomes a mess; 3. The channel is provided by Tokio because the RPC is still async and shouldn't block waiting. --- benches/benches/build_block_benchmark.rs | 2 +- cmd/ethrex/cli.rs | 1 - cmd/ethrex/initializers.rs | 2 +- crates/blockchain/blockchain.rs | 27 +- crates/blockchain/payload.rs | 3 +- crates/blockchain/smoke_test.rs | 17 +- crates/blockchain/tracing.rs | 2 +- crates/l2/based/block_fetcher.rs | 4 +- crates/l2/networking/rpc/rpc.rs | 2 + crates/l2/sequencer/block_producer.rs | 6 +- crates/l2/sequencer/l1_committer.rs | 2 +- .../networking/p2p/rlpx/l2/l2_connection.rs | 22 +- crates/networking/p2p/sync.rs | 2 +- crates/networking/rpc/engine/payload.rs | 13 +- crates/networking/rpc/eth/transaction.rs | 4 +- crates/networking/rpc/lib.rs | 2 +- crates/networking/rpc/rpc.rs | 27 +- crates/networking/rpc/utils.rs | 4 +- crates/storage/api.rs | 4 +- crates/storage/store.rs | 22 +- crates/storage/store_db/in_memory.rs | 4 +- crates/storage/store_db/rocksdb.rs | 237 +++++++++--------- tooling/ef_tests/blockchain/test_runner.rs | 2 +- .../state_v2/src/modules/block_runner.rs | 2 +- .../state_v2/src/modules/result_check.rs | 1 - tooling/migrations/src/cli.rs | 1 - 26 files changed, 215 insertions(+), 200 deletions(-) diff --git a/benches/benches/build_block_benchmark.rs b/benches/benches/build_block_benchmark.rs index e79733430a2..b28395646f9 100644 --- a/benches/benches/build_block_benchmark.rs +++ b/benches/benches/build_block_benchmark.rs @@ -203,7 +203,7 @@ pub async fn bench_payload(input: &(Arc, Block, &Store)) -> (Duratio // 3. engine_newPayload is called, this eventually calls Blockchain::add_block // which takes transactions from the mempool and fills the block with them. let since = Instant::now(); - blockchain.add_block(block).await.unwrap(); + blockchain.add_block(block).unwrap(); let executed = Instant::now(); // EXTRA: Sanity check to not benchmark n empty block. assert!( diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index a2895c45b11..3547e644d72 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -576,7 +576,6 @@ pub async fn import_blocks( blockchain .add_block(block) - .await .inspect_err(|err| match err { // Block number 1's parent not found, the chain must not belong to the same network as the genesis file ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"), diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 43957fdacd9..83bb2e123e1 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -565,7 +565,7 @@ pub async fn regenerate_head_state( .await? .ok_or_else(|| eyre::eyre!("Block {i} not found"))?; - blockchain.add_block(block).await?; + blockchain.add_block(block)?; } info!("Finished regenerating state"); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 383a9b2f891..29002fd3b22 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -139,14 +139,14 @@ impl Blockchain { } /// Executes a block withing a new vm instance and state - async fn execute_block( + fn execute_block( &self, block: &Block, ) -> Result<(BlockExecutionResult, Vec), ChainError> { // Validate if it can be the new head and find the parent let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else { // If the parent is not present, we store it as pending. - self.storage.add_pending_block(block.clone()).await?; + self.storage.add_pending_block(block.clone())?; return Err(ChainError::ParentNotFound); }; @@ -156,7 +156,7 @@ impl Blockchain { validate_block(block, &parent_header, &chain_config, ELASTICITY_MULTIPLIER)?; let vm_db = StoreVmDatabase::new(self.storage.clone(), block.header.parent_hash); - let mut vm = self.new_evm(vm_db).await?; + let mut vm = self.new_evm(vm_db)?; let execution_result = vm.execute_block(block)?; let account_updates = vm.get_state_transitions()?; @@ -423,7 +423,7 @@ impl Blockchain { }) } - pub async fn store_block( + pub fn store_block( &self, block: Block, account_updates_list: AccountUpdatesList, @@ -442,20 +442,18 @@ impl Blockchain { self.storage .store_block_updates(update_batch) - .await .map_err(|e| e.into()) } - pub async fn add_block(&self, block: Block) -> Result<(), ChainError> { + pub fn add_block(&self, block: Block) -> Result<(), ChainError> { let since = Instant::now(); - let (res, updates) = self.execute_block(&block).await?; + let (res, updates) = self.execute_block(&block)?; let executed = Instant::now(); // Apply the account updates over the last block's state and compute the new state root let account_updates_list = self .storage - .apply_account_updates_batch(block.header.parent_hash, &updates) - .await? + .apply_account_updates_batch(block.header.parent_hash, &updates)? .ok_or(ChainError::ParentStateNotFound)?; let (gas_used, gas_limit, block_number, transactions_count) = ( @@ -466,7 +464,7 @@ impl Blockchain { ); let merkleized = Instant::now(); - let result = self.store_block(block, account_updates_list, res).await; + let result = self.store_block(block, account_updates_list, res); let stored = Instant::now(); if self.options.perf_logs_enabled { @@ -565,7 +563,7 @@ impl Blockchain { first_block_header.parent_hash, block_hash_cache, ); - let mut vm = self.new_evm(vm_db).await.map_err(|e| (e.into(), None))?; + let mut vm = self.new_evm(vm_db).map_err(|e| (e.into(), None))?; let blocks_len = blocks.len(); let mut all_receipts: Vec<(BlockHash, Vec)> = Vec::with_capacity(blocks_len); @@ -631,7 +629,6 @@ impl Blockchain { let account_updates_list = self .storage .apply_account_updates_batch(first_block_header.parent_hash, &account_updates) - .await .map_err(|e| (e.into(), None))? .ok_or((ChainError::ParentStateNotFound, None))?; @@ -653,7 +650,6 @@ impl Blockchain { self.storage .store_block_updates(update_batch) - .await .map_err(|e| (e.into(), None))?; let elapsed_seconds = interval.elapsed().as_secs_f64(); @@ -942,11 +938,12 @@ impl Blockchain { Ok(result) } - pub async fn new_evm(&self, vm_db: StoreVmDatabase) -> Result { + pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result { let evm = match &self.options.r#type { BlockchainType::L1 => Evm::new_for_l1(vm_db), BlockchainType::L2(l2_config) => { - Evm::new_for_l2(vm_db, *l2_config.fee_config.read().await)? + unreachable!() + // Evm::new_for_l2(vm_db, *l2_config.fee_config.read().await)? } }; Ok(evm) diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index da8e763616e..1c761d7fb41 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -638,8 +638,7 @@ impl Blockchain { let ret_acount_updates_list = self .storage - .apply_account_updates_batch(context.parent_hash(), &account_updates) - .await? + .apply_account_updates_batch(context.parent_hash(), &account_updates)? .ok_or(ChainError::ParentStateNotFound)?; let state_root = ret_acount_updates_list.state_trie_hash; diff --git a/crates/blockchain/smoke_test.rs b/crates/blockchain/smoke_test.rs index d2a2fb42094..c79a3864a62 100644 --- a/crates/blockchain/smoke_test.rs +++ b/crates/blockchain/smoke_test.rs @@ -30,7 +30,7 @@ mod blockchain_integration_test { // Add first block. We'll make it canonical. let block_1a = new_block(&store, &genesis_header).await; let hash_1a = block_1a.hash(); - blockchain.add_block(block_1a.clone()).await.unwrap(); + blockchain.add_block(block_1a.clone()).unwrap(); store .forkchoice_update(None, 1, hash_1a, None, None) .await @@ -45,7 +45,6 @@ mod blockchain_integration_test { let hash_1b = block_1b.hash(); blockchain .add_block(block_1b.clone()) - .await .expect("Could not add block 1b."); let retrieved_1b = store.get_block_header_by_hash(hash_1b).unwrap().unwrap(); @@ -57,7 +56,6 @@ mod blockchain_integration_test { let hash_2 = block_2.hash(); blockchain .add_block(block_2.clone()) - .await .expect("Could not add block 2."); let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap(); @@ -92,7 +90,7 @@ mod blockchain_integration_test { // Build a single valid block. let block_1 = new_block(&store, &genesis_header).await; let hash_1 = block_1.hash(); - blockchain.add_block(block_1.clone()).await.unwrap(); + blockchain.add_block(block_1.clone()).unwrap(); apply_fork_choice(&store, hash_1, H256::zero(), H256::zero()) .await .unwrap(); @@ -101,7 +99,7 @@ mod blockchain_integration_test { let mut block_2 = new_block(&store, &block_1.header).await; block_2.header.parent_hash = H256::random(); let hash_2 = block_2.hash(); - let result = blockchain.add_block(block_2.clone()).await; + let result = blockchain.add_block(block_2.clone()); assert!(matches!(result, Err(ChainError::ParentNotFound))); // block 2 should now be pending. @@ -127,7 +125,7 @@ mod blockchain_integration_test { // Add first block. Not canonical. let block_1a = new_block(&store, &genesis_header).await; let hash_1a = block_1a.hash(); - blockchain.add_block(block_1a.clone()).await.unwrap(); + blockchain.add_block(block_1a.clone()).unwrap(); let retrieved_1a = store.get_block_header_by_hash(hash_1a).unwrap().unwrap(); assert!(!is_canonical(&store, 1, hash_1a).await.unwrap()); @@ -137,7 +135,6 @@ mod blockchain_integration_test { let hash_1b = block_1b.hash(); blockchain .add_block(block_1b.clone()) - .await .expect("Could not add block 1b."); apply_fork_choice(&store, hash_1b, genesis_hash, genesis_hash) .await @@ -154,7 +151,6 @@ mod blockchain_integration_test { let hash_2 = block_2.hash(); blockchain .add_block(block_2.clone()) - .await .expect("Could not add block 2."); apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash) .await @@ -201,7 +197,6 @@ mod blockchain_integration_test { let hash_1 = block_1.hash(); blockchain .add_block(block_1.clone()) - .await .expect("Could not add block 1b."); // Add child at height 2. @@ -209,7 +204,6 @@ mod blockchain_integration_test { let hash_2 = block_2.hash(); blockchain .add_block(block_2.clone()) - .await .expect("Could not add block 2."); assert!(!is_canonical(&store, 1, hash_1).await.unwrap()); @@ -253,7 +247,6 @@ mod blockchain_integration_test { let block_1 = new_block(&store, &genesis_header).await; blockchain .add_block(block_1.clone()) - .await .expect("Could not add block 1b."); // Add child at height 2. @@ -261,7 +254,6 @@ mod blockchain_integration_test { let hash_2 = block_2.hash(); blockchain .add_block(block_2.clone()) - .await .expect("Could not add block 2."); assert_eq!( @@ -281,7 +273,6 @@ mod blockchain_integration_test { let hash_b = block_1b.hash(); blockchain .add_block(block_1b.clone()) - .await .expect("Could not add block b."); // The latest block should be the same. diff --git a/crates/blockchain/tracing.rs b/crates/blockchain/tracing.rs index 958577d0926..445b98ffe84 100644 --- a/crates/blockchain/tracing.rs +++ b/crates/blockchain/tracing.rs @@ -107,7 +107,7 @@ impl Blockchain { parent_hash, block_hash_cache, ); - let mut vm = self.new_evm(vm_db).await?; + let mut vm = self.new_evm(vm_db)?; // Run parents to rebuild pre-state for block in blocks_to_re_execute.iter().rev() { vm.rerun_block(block, None)?; diff --git a/crates/l2/based/block_fetcher.rs b/crates/l2/based/block_fetcher.rs index 982756a0a22..39f871a92e3 100644 --- a/crates/l2/based/block_fetcher.rs +++ b/crates/l2/based/block_fetcher.rs @@ -269,7 +269,7 @@ impl BlockFetcher { async fn store_batch(&mut self, batch: &[Block]) -> Result<(), BlockFetcherError> { for block in batch.iter() { - self.blockchain.add_block(block.clone()).await?; + self.blockchain.add_block(block.clone())?; let block_hash = block.hash(); @@ -358,7 +358,7 @@ impl BlockFetcher { let mut acc_account_updates: HashMap = HashMap::new(); for block in batch { let vm_db = StoreVmDatabase::new(self.store.clone(), block.header.parent_hash); - let mut vm = self.blockchain.new_evm(vm_db).await?; + let mut vm = self.blockchain.new_evm(vm_db)?; vm.execute_block(block) .map_err(BlockFetcherError::EvmError)?; let account_updates = vm diff --git a/crates/l2/networking/rpc/rpc.rs b/crates/l2/networking/rpc/rpc.rs index 126dd51b50c..7f712c28aa1 100644 --- a/crates/l2/networking/rpc/rpc.rs +++ b/crates/l2/networking/rpc/rpc.rs @@ -86,6 +86,7 @@ pub async fn start_api( // TODO: Refactor how filters are handled, // filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc) let active_filters = Arc::new(Mutex::new(HashMap::new())); + let block_worker_channel = ethrex_rpc::start_block_executor(blockchain.clone()); let service_context = RpcApiContext { l1_ctx: ethrex_rpc::RpcApiContext { storage, @@ -103,6 +104,7 @@ pub async fn start_api( gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())), log_filter_handler, gas_ceil, + block_worker_channel, }, valid_delegation_addresses, sponsor_pk, diff --git a/crates/l2/sequencer/block_producer.rs b/crates/l2/sequencer/block_producer.rs index b4d4e36ff71..d34095f0ad1 100644 --- a/crates/l2/sequencer/block_producer.rs +++ b/crates/l2/sequencer/block_producer.rs @@ -203,8 +203,7 @@ impl BlockProducer { let account_updates_list = self .store - .apply_account_updates_batch(block.header.parent_hash, &account_updates) - .await? + .apply_account_updates_batch(block.header.parent_hash, &account_updates)? .ok_or(ChainError::ParentStateNotFound)?; let transactions_count = block.body.transactions.len(); @@ -212,8 +211,7 @@ impl BlockProducer { let block_hash = block.hash(); self.store_fee_config_by_block(block.header.number).await?; self.blockchain - .store_block(block, account_updates_list, execution_result) - .await?; + .store_block(block, account_updates_list, execution_result)?; info!( "Stored new block {:x}, transaction_count {}", block_hash, transactions_count diff --git a/crates/l2/sequencer/l1_committer.rs b/crates/l2/sequencer/l1_committer.rs index 6fc1c0fef2d..dfda040444a 100644 --- a/crates/l2/sequencer/l1_committer.rs +++ b/crates/l2/sequencer/l1_committer.rs @@ -424,7 +424,7 @@ impl L1Committer { let vm_db = StoreVmDatabase::new(self.store.clone(), block_to_commit.header.parent_hash); - let mut vm = self.blockchain.new_evm(vm_db).await?; + let mut vm = self.blockchain.new_evm(vm_db)?; vm.execute_block(&block_to_commit)?; vm.get_state_transitions()? }; diff --git a/crates/networking/p2p/rlpx/l2/l2_connection.rs b/crates/networking/p2p/rlpx/l2/l2_connection.rs index 9ae236a0282..01cbf584279 100644 --- a/crates/networking/p2p/rlpx/l2/l2_connection.rs +++ b/crates/networking/p2p/rlpx/l2/l2_connection.rs @@ -393,19 +393,15 @@ async fn process_new_block( let block = Arc::::try_unwrap(block).map_err(|_| { PeerConnectionError::InternalError("Failed to take ownership of block".to_string()) })?; - established - .blockchain - .add_block(block) - .await - .inspect_err(|e| { - log_peer_error!( - &established.node, - &format!( - "Error adding new block {} with hash {:?}, error: {e}", - block_number, block_hash - ), - ); - })?; + established.blockchain.add_block(block).inspect_err(|e| { + log_peer_error!( + &established.node, + &format!( + "Error adding new block {} with hash {:?}, error: {e}", + block_number, block_hash + ), + ); + })?; apply_fork_choice(&established.storage, block_hash, block_hash, block_hash) .await diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 9c129009e23..78a0a916c79 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -548,7 +548,7 @@ impl Syncer { let mut last_valid_hash = H256::default(); for block in blocks { let block_hash = block.hash(); - blockchain.add_block(block).await.map_err(|e| { + blockchain.add_block(block).map_err(|e| { ( e, Some(BatchBlockProcessingFailure { diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index 1ecfeed8a9a..1118b93f1f3 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -7,6 +7,7 @@ use ethrex_common::{H256, U256}; use ethrex_p2p::sync::SyncMode; use ethrex_rlp::error::RLPDecodeError; use serde_json::Value; +use tokio::sync::oneshot; use tracing::{debug, error, info, warn}; use crate::rpc::{RpcApiContext, RpcHandler}; @@ -678,6 +679,16 @@ fn validate_block_hash(payload: &ExecutionPayload, block: &Block) -> Result<(), Ok(()) } +pub async fn add_block(ctx: &RpcApiContext, block: Block) -> Result<(), ChainError> { + let (notify_send, notify_recv) = oneshot::channel(); + ctx.block_worker_channel + .send((notify_send, block)) + .map_err(|e| ChainError::Custom(format!("failed to send message: {e}")))?; + notify_recv + .await + .map_err(|e| ChainError::Custom(format!("recv failed: {e}")))? +} + async fn try_execute_payload( block: Block, context: &RpcApiContext, @@ -696,7 +707,7 @@ async fn try_execute_payload( // Execute and store the block info!(%block_hash, %block_number, "Executing payload"); - match context.blockchain.add_block(block).await { + match add_block(context, block).await { Err(ChainError::ParentNotFound) => { // Start sync context.syncer.sync_to_head(block_hash); diff --git a/crates/networking/rpc/eth/transaction.rs b/crates/networking/rpc/eth/transaction.rs index 9ce440bb6e2..0b048e5da8b 100644 --- a/crates/networking/rpc/eth/transaction.rs +++ b/crates/networking/rpc/eth/transaction.rs @@ -348,7 +348,7 @@ impl RpcHandler for CreateAccessListRequest { }; let vm_db = StoreVmDatabase::new(context.storage.clone(), header.hash()); - let mut vm = context.blockchain.new_evm(vm_db).await?; + let mut vm = context.blockchain.new_evm(vm_db)?; // Run transaction and obtain access list let (gas_used, access_list, error) = vm.create_access_list(&self.transaction, &header)?; @@ -572,7 +572,7 @@ async fn simulate_tx( blockchain: Arc, ) -> Result { let vm_db = StoreVmDatabase::new(storage.clone(), block_header.hash()); - let mut vm = blockchain.new_evm(vm_db).await?; + let mut vm = blockchain.new_evm(vm_db)?; match vm.simulate_tx_from_generic(transaction, block_header)? { ExecutionResult::Revert { diff --git a/crates/networking/rpc/lib.rs b/crates/networking/rpc/lib.rs index d52993406b0..8283e610498 100644 --- a/crates/networking/rpc/lib.rs +++ b/crates/networking/rpc/lib.rs @@ -16,7 +16,7 @@ pub mod types; pub mod utils; pub use clients::{EngineClient, EthClient}; -pub use rpc::start_api; +pub use rpc::{start_api, start_block_executor}; // TODO: These exports are needed by ethrex-l2-rpc, but we do not want to // export them in the public API of this crate. diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 4d307e75e29..434688ebf71 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -53,6 +53,8 @@ use axum_extra::{ }; use bytes::Bytes; use ethrex_blockchain::Blockchain; +use ethrex_blockchain::error::ChainError; +use ethrex_common::types::Block; use ethrex_p2p::peer_handler::PeerHandler; use ethrex_p2p::sync_manager::SyncManager; use ethrex_p2p::types::Node; @@ -67,7 +69,12 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use tokio::{net::TcpListener, sync::Mutex as TokioMutex}; +use tokio::net::TcpListener; +use tokio::sync::{ + Mutex as TokioMutex, + mpsc::{UnboundedSender, unbounded_channel}, + oneshot, +}; use tower_http::cors::CorsLayer; use tracing::{error, info}; use tracing_subscriber::{EnvFilter, Registry, reload}; @@ -164,6 +171,7 @@ pub struct RpcApiContext { pub gas_tip_estimator: Arc>, pub log_filter_handler: Option>, pub gas_ceil: u64, + pub block_worker_channel: UnboundedSender<(oneshot::Sender>, Block)>, } #[derive(Debug, Clone)] @@ -195,6 +203,21 @@ pub const FILTER_DURATION: Duration = { } }; +pub fn start_block_executor( + blockchain: Arc, +) -> UnboundedSender<(oneshot::Sender>, Block)> { + let (block_worker_channel, mut block_receiver) = + unbounded_channel::<(oneshot::Sender>, Block)>(); + std::thread::spawn(move || { + while let Some((notify, block)) = block_receiver.blocking_recv() { + let _ = notify + .send(blockchain.add_block(block)) + .inspect_err(|_| ::tracing::error!("failed to notify caller")); + } + }); + block_worker_channel +} + #[allow(clippy::too_many_arguments)] pub async fn start_api( http_addr: SocketAddr, @@ -215,6 +238,7 @@ pub async fn start_api( // TODO: Refactor how filters are handled, // filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc) let active_filters = Arc::new(Mutex::new(HashMap::new())); + let block_worker_channel = start_block_executor(blockchain.clone()); let service_context = RpcApiContext { storage, blockchain, @@ -231,6 +255,7 @@ pub async fn start_api( gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())), log_filter_handler, gas_ceil, + block_worker_channel, }; // Periodically clean up the active filters for the filters endpoints. diff --git a/crates/networking/rpc/utils.rs b/crates/networking/rpc/utils.rs index c28998ac90b..e54b1125db9 100644 --- a/crates/networking/rpc/utils.rs +++ b/crates/networking/rpc/utils.rs @@ -342,7 +342,7 @@ pub mod test_utils { use crate::{ eth::gas_tip_estimator::GasTipEstimator, - rpc::{NodeData, RpcApiContext, start_api}, + rpc::{NodeData, RpcApiContext, start_api, start_block_executor}, }; pub const TEST_GENESIS: &str = include_str!("../../../fixtures/genesis/l1.json"); @@ -409,6 +409,7 @@ pub mod test_utils { pub async fn default_context_with_storage(storage: Store) -> RpcApiContext { let blockchain = Arc::new(Blockchain::default_with_store(storage.clone())); let local_node_record = example_local_node_record(); + let block_worker_channel = start_block_executor(blockchain.clone()); RpcApiContext { storage, blockchain, @@ -425,6 +426,7 @@ pub mod test_utils { gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())), log_filter_handler: None, gas_ceil: DEFAULT_BUILDER_GAS_CEIL, + block_worker_channel, } } } diff --git a/crates/storage/api.rs b/crates/storage/api.rs index d3246a167c1..02c7a59f627 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -14,7 +14,7 @@ use ethrex_trie::{Nibbles, Trie}; #[async_trait::async_trait] pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Store changes in a batch from a vec of blocks - async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError>; + fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError>; /// Add a batch of blocks in a single transaction. /// This will store -> BlockHeader, BlockBody, BlockTransactions, BlockNumber. @@ -76,7 +76,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { block_hash: BlockHash, ) -> Result, StoreError>; - async fn add_pending_block(&self, block: Block) -> Result<(), StoreError>; + fn add_pending_block(&self, block: Block) -> Result<(), StoreError>; async fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError>; /// Add block number for a given hash diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 080f39768f0..15e1993d130 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -69,8 +69,8 @@ pub struct AccountUpdatesList { } impl Store { - pub async fn store_block_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { - self.engine.apply_updates(update_batch).await + pub fn store_block_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { + self.engine.apply_updates(update_batch) } pub fn new(path: impl AsRef, engine_type: EngineType) -> Result { @@ -254,9 +254,9 @@ impl Store { self.engine.get_block_bodies_by_hash(hashes).await } - pub async fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { + pub fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { info!("Adding block to pending: {}", block.hash()); - self.engine.add_pending_block(block).await + self.engine.add_pending_block(block) } pub async fn get_pending_block( @@ -360,7 +360,7 @@ impl Store { /// Applies account updates based on the block's latest storage state /// and returns the new state root after the updates have been applied. #[instrument(level = "trace", name = "Trie update", skip_all)] - pub async fn apply_account_updates_batch( + pub fn apply_account_updates_batch( &self, block_hash: BlockHash, account_updates: &[AccountUpdate], @@ -369,16 +369,16 @@ impl Store { return Ok(None); }; - Ok(Some( - self.apply_account_updates_from_trie_batch(state_trie, account_updates) - .await?, - )) + Ok(Some(self.apply_account_updates_from_trie_batch( + state_trie, + account_updates, + )?)) } - pub async fn apply_account_updates_from_trie_batch( + pub fn apply_account_updates_from_trie_batch<'a>( &self, mut state_trie: Trie, - account_updates: impl IntoIterator, + account_updates: impl IntoIterator, ) -> Result { let mut ret_storage_updates = Vec::new(); let mut code_updates = Vec::new(); diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 4557ec2c02e..3bcf28b9f5a 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -86,7 +86,7 @@ impl Store { #[async_trait::async_trait] impl StoreEngine for Store { - async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { + fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { let mut store = self.inner()?; // Store trie updates @@ -246,7 +246,7 @@ impl StoreEngine for Store { Ok(()) } - async fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { + fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { self.inner()?.pending_blocks.insert(block.hash(), block); Ok(()) } diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index b9f71221d98..40d46680ab7 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -611,7 +611,7 @@ impl Store { #[async_trait::async_trait] impl StoreEngine for Store { - async fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { + fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> { let db = self.db.clone(); let trie_cache = self.trie_cache.clone(); let parent_state_root = self @@ -633,138 +633,133 @@ impl StoreEngine for Store { .state_root; let flatkeyvalue_control_tx = self.flatkeyvalue_control_tx.clone(); - tokio::task::spawn_blocking(move || { - let _span = tracing::trace_span!("Block DB update").entered(); - - let [ - cf_trie_nodes, - cf_flatkeyvalue, - cf_receipts, - cf_codes, - cf_block_numbers, - cf_tx_locations, - cf_headers, - cf_bodies, - cf_misc, - ] = open_cfs( - &db, - [ - CF_TRIE_NODES, - CF_FLATKEYVALUE, - CF_RECEIPTS, - CF_ACCOUNT_CODES, - CF_BLOCK_NUMBERS, - CF_TRANSACTION_LOCATIONS, - CF_HEADERS, - CF_BODIES, - CF_MISC_VALUES, - ], - )?; + let _span = tracing::trace_span!("Block DB update").entered(); + + let [ + cf_trie_nodes, + cf_flatkeyvalue, + cf_receipts, + cf_codes, + cf_block_numbers, + cf_tx_locations, + cf_headers, + cf_bodies, + cf_misc, + ] = open_cfs( + &db, + [ + CF_TRIE_NODES, + CF_FLATKEYVALUE, + CF_RECEIPTS, + CF_ACCOUNT_CODES, + CF_BLOCK_NUMBERS, + CF_TRANSACTION_LOCATIONS, + CF_HEADERS, + CF_BODIES, + CF_MISC_VALUES, + ], + )?; - let mut batch = WriteBatch::default(); + let mut batch = WriteBatch::default(); - let mut updated_trie = false; + let mut updated_trie = false; - let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?; - if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) { - updated_trie = true; - // If the channel is closed, there's nobody to notify - let _ = flatkeyvalue_control_tx.send(FKVGeneratorControlMessage::Stop); + let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?; + if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) { + updated_trie = true; + // If the channel is closed, there's nobody to notify + let _ = flatkeyvalue_control_tx.send(FKVGeneratorControlMessage::Stop); - let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default(); - let nodes = trie.commit(root).unwrap_or_default(); - for (key, value) in nodes { - let is_leaf = key.len() == 65 || key.len() == 131; + let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default(); + let nodes = trie.commit(root).unwrap_or_default(); + for (key, value) in nodes { + let is_leaf = key.len() == 65 || key.len() == 131; - if is_leaf && key > last_written { - continue; - } - let cf = if is_leaf { - &cf_flatkeyvalue - } else { - &cf_trie_nodes - }; - if value.is_empty() { - batch.delete_cf(cf, key); - } else { - batch.put_cf(cf, key, value); - } + if is_leaf && key > last_written { + continue; + } + let cf = if is_leaf { + &cf_flatkeyvalue + } else { + &cf_trie_nodes + }; + if value.is_empty() { + batch.delete_cf(cf, key); + } else { + batch.put_cf(cf, key, value); } } - trie.put_batch( - parent_state_root, - last_state_root, - update_batch - .storage_updates - .into_iter() - .flat_map(|(account_hash, nodes)| { - nodes - .into_iter() - .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) - }) - .chain(update_batch.account_updates) - .collect(), - ); - - for block in update_batch.blocks { - let block_number = block.header.number; - let block_hash = block.hash(); + } + trie.put_batch( + parent_state_root, + last_state_root, + update_batch + .storage_updates + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(update_batch.account_updates) + .collect(), + ); - let hash_key_rlp = BlockHashRLP::from(block_hash); - let header_value_rlp = BlockHeaderRLP::from(block.header.clone()); - batch.put_cf(&cf_headers, hash_key_rlp.bytes(), header_value_rlp.bytes()); + for block in update_batch.blocks { + let block_number = block.header.number; + let block_hash = block.hash(); - let hash_key: AccountCodeHashRLP = block_hash.into(); - let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec()); - batch.put_cf(&cf_bodies, hash_key.bytes(), body_value.bytes()); + let hash_key_rlp = BlockHashRLP::from(block_hash); + let header_value_rlp = BlockHeaderRLP::from(block.header.clone()); + batch.put_cf(&cf_headers, hash_key_rlp.bytes(), header_value_rlp.bytes()); - let hash_key = BlockHashRLP::from(block_hash).bytes().clone(); - batch.put_cf(&cf_block_numbers, hash_key, block_number.to_le_bytes()); + let hash_key: AccountCodeHashRLP = block_hash.into(); + let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec()); + batch.put_cf(&cf_bodies, hash_key.bytes(), body_value.bytes()); - for (index, transaction) in block.body.transactions.iter().enumerate() { - let tx_hash = transaction.hash(); - // Key: tx_hash + block_hash - let mut composite_key = Vec::with_capacity(64); - composite_key.extend_from_slice(tx_hash.as_bytes()); - composite_key.extend_from_slice(block_hash.as_bytes()); - let location_value = (block_number, block_hash, index as u64).encode_to_vec(); - batch.put_cf(&cf_tx_locations, composite_key, location_value); - } + let hash_key = BlockHashRLP::from(block_hash).bytes().clone(); + batch.put_cf(&cf_block_numbers, hash_key, block_number.to_le_bytes()); + + for (index, transaction) in block.body.transactions.iter().enumerate() { + let tx_hash = transaction.hash(); + // Key: tx_hash + block_hash + let mut composite_key = Vec::with_capacity(64); + composite_key.extend_from_slice(tx_hash.as_bytes()); + composite_key.extend_from_slice(block_hash.as_bytes()); + let location_value = (block_number, block_hash, index as u64).encode_to_vec(); + batch.put_cf(&cf_tx_locations, composite_key, location_value); } + } - for (block_hash, receipts) in update_batch.receipts { - for (index, receipt) in receipts.into_iter().enumerate() { - let key = (block_hash, index as u64).encode_to_vec(); - let value = receipt.encode_to_vec(); - batch.put_cf(&cf_receipts, key, value); - } + for (block_hash, receipts) in update_batch.receipts { + for (index, receipt) in receipts.into_iter().enumerate() { + let key = (block_hash, index as u64).encode_to_vec(); + let value = receipt.encode_to_vec(); + batch.put_cf(&cf_receipts, key, value); } + } - for (code_hash, code) in update_batch.code_updates { - let mut buf = - Vec::with_capacity(6 + code.bytecode.len() + 2 * code.jump_targets.len()); - code.bytecode.encode(&mut buf); - code.jump_targets - .into_iter() - .flat_map(|t| t.to_le_bytes()) - .collect::>() - .as_slice() - .encode(&mut buf); - batch.put_cf(&cf_codes, code_hash.0, buf); - } + for (code_hash, code) in update_batch.code_updates { + let mut buf = Vec::with_capacity(6 + code.bytecode.len() + 2 * code.jump_targets.len()); + code.bytecode.encode(&mut buf); + code.jump_targets + .into_iter() + .flat_map(|t| t.to_le_bytes()) + .collect::>() + .as_slice() + .encode(&mut buf); + batch.put_cf(&cf_codes, code_hash.0, buf); + } - // Single write operation - let ret = db - .write(batch) - .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))); - if updated_trie { - // If the channel is closed, there's nobody to notify - let _ = flatkeyvalue_control_tx.send(FKVGeneratorControlMessage::Continue); - } - ret - }) - .await - .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))? + // Single write operation + let ret = db + .write(batch) + .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))); + if updated_trie { + // If the channel is closed, there's nobody to notify + let _ = flatkeyvalue_control_tx.send(FKVGeneratorControlMessage::Continue); + } + ret } /// Add a batch of blocks in a single transaction. @@ -985,11 +980,13 @@ impl StoreEngine for Store { .map_err(StoreError::from) } - async fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { + fn add_pending_block(&self, block: Block) -> Result<(), StoreError> { let hash_key = BlockHashRLP::from(block.hash()).bytes().clone(); let block_value = BlockRLP::from(block).bytes().clone(); - self.write_async(CF_PENDING_BLOCKS, hash_key, block_value) - .await + let cf = self.cf_handle(CF_PENDING_BLOCKS)?; + self.db + .put_cf(&cf, hash_key, block_value) + .map_err(StoreError::RocksdbError) } async fn get_pending_block(&self, block_hash: BlockHash) -> Result, StoreError> { diff --git a/tooling/ef_tests/blockchain/test_runner.rs b/tooling/ef_tests/blockchain/test_runner.rs index 481ca64763d..adbe378457e 100644 --- a/tooling/ef_tests/blockchain/test_runner.rs +++ b/tooling/ef_tests/blockchain/test_runner.rs @@ -112,7 +112,7 @@ async fn run( let hash = block.hash(); // Attempt to add the block as the head of the chain - let chain_result = blockchain.add_block(block).await; + let chain_result = blockchain.add_block(block); match chain_result { Err(error) => { diff --git a/tooling/ef_tests/state_v2/src/modules/block_runner.rs b/tooling/ef_tests/state_v2/src/modules/block_runner.rs index 5ba7bd6e199..7d704f68b9b 100644 --- a/tooling/ef_tests/state_v2/src/modules/block_runner.rs +++ b/tooling/ef_tests/state_v2/src/modules/block_runner.rs @@ -149,7 +149,7 @@ pub async fn run_test(test: &Test, test_case: &TestCase) -> Result<(), RunnerErr ethrex_blockchain::BlockchainOptions::default(), ); - let result = blockchain.add_block(block).await; + let result = blockchain.add_block(block); if result.is_err() && test_case.post.expected_exceptions.is_none() { return Err(RunnerError::Custom( diff --git a/tooling/ef_tests/state_v2/src/modules/result_check.rs b/tooling/ef_tests/state_v2/src/modules/result_check.rs index 0e2683ec3c1..d014fe7e03f 100644 --- a/tooling/ef_tests/state_v2/src/modules/result_check.rs +++ b/tooling/ef_tests/state_v2/src/modules/result_check.rs @@ -123,7 +123,6 @@ pub async fn post_state_root( ) -> H256 { let ret_account_updates_batch = store .apply_account_updates_batch(initial_block_hash, account_updates) - .await .unwrap() .unwrap(); ret_account_updates_batch.state_trie_hash diff --git a/tooling/migrations/src/cli.rs b/tooling/migrations/src/cli.rs index ffd6e4b90c2..013ee93ea6a 100644 --- a/tooling/migrations/src/cli.rs +++ b/tooling/migrations/src/cli.rs @@ -122,7 +122,6 @@ async fn migrate_libmdbx_to_rocksdb( let block_hash = block.hash(); blockchain .add_block(block) - .await .unwrap_or_else(|e| panic!("Cannot add block {block_number} to rocksdb store: {e}")); added_blocks.push((block_number, block_hash)); } From b7b1cb47ccea5b7fafb919bae4a918026694419c Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Sun, 26 Oct 2025 19:07:17 -0300 Subject: [PATCH 02/11] set cpu affinity --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + cmd/ethrex/ethrex.rs | 28 +++++++++++++++++++++++++--- crates/networking/rpc/Cargo.toml | 1 + crates/networking/rpc/rpc.rs | 4 ++++ 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05916ae26d0..891a7f3d1e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1966,6 +1966,17 @@ dependencies = [ "libc", ] +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpp_demangle" version = "0.4.5" @@ -3959,6 +3970,7 @@ dependencies = [ "axum 0.8.6", "axum-extra", "bytes", + "core_affinity", "envy", "ethereum-types 0.15.1", "ethrex-blockchain", diff --git a/Cargo.toml b/Cargo.toml index d86c3574955..6612e948cc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } async-trait = "0.1.88" +core_affinity = "0.8.3" ethereum-types = { version = "0.15.1", features = ["serialize"] } serde = { version = "1.0.203", features = ["derive"] } serde_with = "3.11.0" diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index a402852b843..c0c15fb3801 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -5,7 +5,11 @@ use ethrex::{ utils::{NodeConfigFile, get_client_version, store_node_config_file}, }; use ethrex_p2p::{discv4::peer_table::PeerTable, types::NodeRecord}; -use std::{path::Path, sync::Arc, time::Duration}; +use std::{ + path::Path, + sync::{Arc, atomic::AtomicUsize}, + time::Duration, +}; use tokio::{ signal::unix::{SignalKind, signal}, sync::Mutex, @@ -48,8 +52,7 @@ async fn server_shutdown( info!("Server shutting down!"); } -#[tokio::main] -async fn main() -> eyre::Result<()> { +async fn ethrex_main() -> eyre::Result<()> { let CLI { opts, command } = CLI::parse(); if let Some(subcommand) = command { @@ -78,3 +81,22 @@ async fn main() -> eyre::Result<()> { Ok(()) } + +pub fn main() -> eyre::Result<()> { + let mut core = AtomicUsize::new(0); + let cores = core_affinity::get_core_ids().unwrap_or_default(); + // Reserve core 0 and 1 for OS, 2 for block execution. + let count = cores.len().saturating_sub(3); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(count) + .on_thread_start(|| { + let core_offset = core.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if let Some(core_id) = cores.get(3 + core_offset.rem_euclid(count)) { + core_affinity::set_for_current(*core_id); + } + }) + .build() + .unwrap() + .block_on(ethrex_main()) +} diff --git a/crates/networking/rpc/Cargo.toml b/crates/networking/rpc/Cargo.toml index ef77fbe85e2..8dab31ca49f 100644 --- a/crates/networking/rpc/Cargo.toml +++ b/crates/networking/rpc/Cargo.toml @@ -9,6 +9,7 @@ documentation.workspace = true [dependencies] axum = { features = ["ws"], workspace = true } +core_affinity.workspace = true tower-http.workspace = true serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 434688ebf71..849f173e385 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -209,6 +209,10 @@ pub fn start_block_executor( let (block_worker_channel, mut block_receiver) = unbounded_channel::<(oneshot::Sender>, Block)>(); std::thread::spawn(move || { + let cores = core_affinity::get_core_ids().unwrap_or_default(); + if let Some(core_id) = cores.get(2) { + core_affinity::set_for_current(*core_id); + } while let Some((notify, block)) = block_receiver.blocking_recv() { let _ = notify .send(blockchain.add_block(block)) From cabab442e00f4aef496f720ee1e5623851c8d355 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Sun, 26 Oct 2025 19:08:58 -0300 Subject: [PATCH 03/11] missing dep --- Cargo.lock | 1 + cmd/ethrex/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 891a7f3d1e4..98af80adeb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3526,6 +3526,7 @@ dependencies = [ "bytes", "clap 4.5.48", "clap_complete", + "core_affinity", "directories 5.0.1", "ethrex-blockchain", "ethrex-common 5.0.0", diff --git a/cmd/ethrex/Cargo.toml b/cmd/ethrex/Cargo.toml index e91f6dc5304..21713d9c023 100644 --- a/cmd/ethrex/Cargo.toml +++ b/cmd/ethrex/Cargo.toml @@ -31,6 +31,7 @@ tikv-jemallocator = { version = "0.6.0", optional = true, features = [ "unprefixed_malloc_on_supported_platforms", ] } bytes.workspace = true +core_affinity.workspace = true hex.workspace = true tracing.workspace = true tracing-subscriber.workspace = true From 8229e89c2d52b3bd6cdebd378a170810c3daa7e8 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Sun, 26 Oct 2025 19:11:19 -0300 Subject: [PATCH 04/11] move --- cmd/ethrex/ethrex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index c0c15fb3801..e8858754d1d 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -83,14 +83,14 @@ async fn ethrex_main() -> eyre::Result<()> { } pub fn main() -> eyre::Result<()> { - let mut core = AtomicUsize::new(0); + let core = AtomicUsize::new(0); let cores = core_affinity::get_core_ids().unwrap_or_default(); // Reserve core 0 and 1 for OS, 2 for block execution. let count = cores.len().saturating_sub(3); tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(count) - .on_thread_start(|| { + .on_thread_start(move || { let core_offset = core.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if let Some(core_id) = cores.get(3 + core_offset.rem_euclid(count)) { core_affinity::set_for_current(*core_id); From 5cb71c80cd7325efd68eb601b55f364053696e36 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Mon, 27 Oct 2025 14:57:45 -0300 Subject: [PATCH 05/11] Revert "move" This reverts commit 8229e89c2d52b3bd6cdebd378a170810c3daa7e8. --- cmd/ethrex/ethrex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index e8858754d1d..c0c15fb3801 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -83,14 +83,14 @@ async fn ethrex_main() -> eyre::Result<()> { } pub fn main() -> eyre::Result<()> { - let core = AtomicUsize::new(0); + let mut core = AtomicUsize::new(0); let cores = core_affinity::get_core_ids().unwrap_or_default(); // Reserve core 0 and 1 for OS, 2 for block execution. let count = cores.len().saturating_sub(3); tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(count) - .on_thread_start(move || { + .on_thread_start(|| { let core_offset = core.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if let Some(core_id) = cores.get(3 + core_offset.rem_euclid(count)) { core_affinity::set_for_current(*core_id); From 67275ac592b6c5c847122ea163b04f43f5e9994a Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Mon, 27 Oct 2025 14:57:52 -0300 Subject: [PATCH 06/11] Revert "missing dep" This reverts commit cabab442e00f4aef496f720ee1e5623851c8d355. --- Cargo.lock | 1 - cmd/ethrex/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98af80adeb7..891a7f3d1e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3526,7 +3526,6 @@ dependencies = [ "bytes", "clap 4.5.48", "clap_complete", - "core_affinity", "directories 5.0.1", "ethrex-blockchain", "ethrex-common 5.0.0", diff --git a/cmd/ethrex/Cargo.toml b/cmd/ethrex/Cargo.toml index 21713d9c023..e91f6dc5304 100644 --- a/cmd/ethrex/Cargo.toml +++ b/cmd/ethrex/Cargo.toml @@ -31,7 +31,6 @@ tikv-jemallocator = { version = "0.6.0", optional = true, features = [ "unprefixed_malloc_on_supported_platforms", ] } bytes.workspace = true -core_affinity.workspace = true hex.workspace = true tracing.workspace = true tracing-subscriber.workspace = true From 87409cea408e7498404d28a2e6cb461aa6d3ca0f Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Mon, 27 Oct 2025 14:57:59 -0300 Subject: [PATCH 07/11] Revert "set cpu affinity" This reverts commit b7b1cb47ccea5b7fafb919bae4a918026694419c. --- Cargo.lock | 12 ------------ Cargo.toml | 1 - cmd/ethrex/ethrex.rs | 28 +++------------------------- crates/networking/rpc/Cargo.toml | 1 - crates/networking/rpc/rpc.rs | 4 ---- 5 files changed, 3 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 891a7f3d1e4..05916ae26d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1966,17 +1966,6 @@ dependencies = [ "libc", ] -[[package]] -name = "core_affinity" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" -dependencies = [ - "libc", - "num_cpus", - "winapi", -] - [[package]] name = "cpp_demangle" version = "0.4.5" @@ -3970,7 +3959,6 @@ dependencies = [ "axum 0.8.6", "axum-extra", "bytes", - "core_affinity", "envy", "ethereum-types 0.15.1", "ethrex-blockchain", diff --git a/Cargo.toml b/Cargo.toml index 6612e948cc0..d86c3574955 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,6 @@ tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } async-trait = "0.1.88" -core_affinity = "0.8.3" ethereum-types = { version = "0.15.1", features = ["serialize"] } serde = { version = "1.0.203", features = ["derive"] } serde_with = "3.11.0" diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index c0c15fb3801..a402852b843 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -5,11 +5,7 @@ use ethrex::{ utils::{NodeConfigFile, get_client_version, store_node_config_file}, }; use ethrex_p2p::{discv4::peer_table::PeerTable, types::NodeRecord}; -use std::{ - path::Path, - sync::{Arc, atomic::AtomicUsize}, - time::Duration, -}; +use std::{path::Path, sync::Arc, time::Duration}; use tokio::{ signal::unix::{SignalKind, signal}, sync::Mutex, @@ -52,7 +48,8 @@ async fn server_shutdown( info!("Server shutting down!"); } -async fn ethrex_main() -> eyre::Result<()> { +#[tokio::main] +async fn main() -> eyre::Result<()> { let CLI { opts, command } = CLI::parse(); if let Some(subcommand) = command { @@ -81,22 +78,3 @@ async fn ethrex_main() -> eyre::Result<()> { Ok(()) } - -pub fn main() -> eyre::Result<()> { - let mut core = AtomicUsize::new(0); - let cores = core_affinity::get_core_ids().unwrap_or_default(); - // Reserve core 0 and 1 for OS, 2 for block execution. - let count = cores.len().saturating_sub(3); - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(count) - .on_thread_start(|| { - let core_offset = core.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if let Some(core_id) = cores.get(3 + core_offset.rem_euclid(count)) { - core_affinity::set_for_current(*core_id); - } - }) - .build() - .unwrap() - .block_on(ethrex_main()) -} diff --git a/crates/networking/rpc/Cargo.toml b/crates/networking/rpc/Cargo.toml index 8dab31ca49f..ef77fbe85e2 100644 --- a/crates/networking/rpc/Cargo.toml +++ b/crates/networking/rpc/Cargo.toml @@ -9,7 +9,6 @@ documentation.workspace = true [dependencies] axum = { features = ["ws"], workspace = true } -core_affinity.workspace = true tower-http.workspace = true serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 849f173e385..434688ebf71 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -209,10 +209,6 @@ pub fn start_block_executor( let (block_worker_channel, mut block_receiver) = unbounded_channel::<(oneshot::Sender>, Block)>(); std::thread::spawn(move || { - let cores = core_affinity::get_core_ids().unwrap_or_default(); - if let Some(core_id) = cores.get(2) { - core_affinity::set_for_current(*core_id); - } while let Some((notify, block)) = block_receiver.blocking_recv() { let _ = notify .send(blockchain.add_block(block)) From 045168b0ebb2ece661e07a9d1a6ba9b6e6e6c600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:03:39 -0300 Subject: [PATCH 08/11] fix: remove awaits on now sync functions --- cmd/ethrex/l2/command.rs | 1 - crates/l2/sequencer/l1_committer.rs | 22 +++++++++------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/cmd/ethrex/l2/command.rs b/cmd/ethrex/l2/command.rs index 3616aa5f67a..3f8d459251e 100644 --- a/cmd/ethrex/l2/command.rs +++ b/cmd/ethrex/l2/command.rs @@ -429,7 +429,6 @@ impl Command { let account_updates_list = store .apply_account_updates_from_trie_batch(trie, account_updates.values()) - .await .map_err(|e| format!("Error applying account updates: {e}")) .unwrap(); diff --git a/crates/l2/sequencer/l1_committer.rs b/crates/l2/sequencer/l1_committer.rs index dd3ae3b2648..85a732de043 100644 --- a/crates/l2/sequencer/l1_committer.rs +++ b/crates/l2/sequencer/l1_committer.rs @@ -519,22 +519,19 @@ impl L1Committer { .apply_account_updates_batch( potential_batch_block.header.parent_hash, &account_updates, - ) - .await? + )? .ok_or(CommitterError::FailedToGetInformationFromStorage( "no account updated".to_owned(), ))?; - one_time_checkpoint_blockchain - .store_block( - potential_batch_block.clone(), - account_updates_list, - BlockExecutionResult { - receipts, - requests: vec![], - }, - ) - .await?; + one_time_checkpoint_blockchain.store_block( + potential_batch_block.clone(), + account_updates_list, + BlockExecutionResult { + receipts, + requests: vec![], + }, + )?; } // Accumulate block data with the rest of the batch. @@ -1258,7 +1255,6 @@ pub async fn regenerate_head_state( blockchain .add_block(block) - .await .map_err(|err| CommitterError::FailedToCreateCheckpoint(err.to_string()))?; } From 5f987a4a2de4859850913cb921b472e7efb20927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:14:02 -0300 Subject: [PATCH 09/11] docs: update perf changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c17ab4b5b05..ff0f8532f3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### 2025-10-27 +- Run "engine_newPayload" block execution in a dedicated worker thread. [#5051](https://github.com/lambdaclass/ethrex/pull/5051) - Reusing FindNode message per lookup loop instead of randomizing the key for each message. [#5047](https://github.com/lambdaclass/ethrex/pull/5047) ### 2025-10-21 From a3d8bc44f5dda8b6e84521e5c0a7a3c153228734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:19:55 -0300 Subject: [PATCH 10/11] style: remove leading :: --- crates/networking/rpc/rpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 434688ebf71..116b016140c 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -212,7 +212,7 @@ pub fn start_block_executor( while let Some((notify, block)) = block_receiver.blocking_recv() { let _ = notify .send(blockchain.add_block(block)) - .inspect_err(|_| ::tracing::error!("failed to notify caller")); + .inspect_err(|_| tracing::error!("failed to notify caller")); } }); block_worker_channel From 34201a656f651d6b2ca56b843903868e4a54e9ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:21:11 -0300 Subject: [PATCH 11/11] refactor: change error messages to be less generic --- crates/networking/rpc/engine/payload.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index 1118b93f1f3..3748df72e39 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -683,10 +683,14 @@ pub async fn add_block(ctx: &RpcApiContext, block: Block) -> Result<(), ChainErr let (notify_send, notify_recv) = oneshot::channel(); ctx.block_worker_channel .send((notify_send, block)) - .map_err(|e| ChainError::Custom(format!("failed to send message: {e}")))?; + .map_err(|e| { + ChainError::Custom(format!( + "failed to send block execution request to worker: {e}" + )) + })?; notify_recv .await - .map_err(|e| ChainError::Custom(format!("recv failed: {e}")))? + .map_err(|e| ChainError::Custom(format!("failed to receive block execution result: {e}")))? } async fn try_execute_payload(