Skip to content
Merged
2 changes: 1 addition & 1 deletion benches/benches/build_block_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub async fn bench_payload(input: &(Arc<Blockchain>, Block, &Store)) -> (Duratio
// 3. engine_newPayload is called, this eventually calls Blockchain::add_block
// which takes transactions from the mempool and fills the block with them.
let since = Instant::now();
blockchain.add_block(block).await.unwrap();
blockchain.add_block(block).unwrap();
let executed = Instant::now();
// EXTRA: Sanity check to not benchmark n empty block.
assert!(
Expand Down
1 change: 0 additions & 1 deletion cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,6 @@ pub async fn import_blocks(

blockchain
.add_block(block)
.await
.inspect_err(|err| match err {
// Block number 1's parent not found, the chain must not belong to the same network as the genesis file
ChainError::ParentNotFound if number == 1 => warn!("The chain file is not compatible with the genesis file. Are you sure you selected the correct network?"),
Expand Down
2 changes: 1 addition & 1 deletion cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ pub async fn regenerate_head_state(
.await?
.ok_or_else(|| eyre::eyre!("Block {i} not found"))?;

blockchain.add_block(block).await?;
blockchain.add_block(block)?;
}

info!("Finished regenerating state");
Expand Down
1 change: 0 additions & 1 deletion cmd/ethrex/l2/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ impl Command {

let account_updates_list = store
.apply_account_updates_from_trie_batch(trie, account_updates.values())
.await
.map_err(|e| format!("Error applying account updates: {e}"))
.unwrap();

Expand Down
21 changes: 8 additions & 13 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ impl Blockchain {
}

/// Executes a block withing a new vm instance and state
async fn execute_block(
fn execute_block(
&self,
block: &Block,
) -> Result<(BlockExecutionResult, Vec<AccountUpdate>), ChainError> {
// Validate if it can be the new head and find the parent
let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
// If the parent is not present, we store it as pending.
self.storage.add_pending_block(block.clone()).await?;
self.storage.add_pending_block(block.clone())?;
return Err(ChainError::ParentNotFound);
};

Expand Down Expand Up @@ -388,8 +388,7 @@ impl Blockchain {
// We cannot ensure that the users of this function have the necessary
// state stored, so in order for it to not assume anything, we update
// the storage with the new state after re-execution
self.store_block(block.clone(), account_updates_list, execution_result)
.await?;
self.store_block(block.clone(), account_updates_list, execution_result)?;

for (address, (witness, _storage_trie)) in storage_tries_after_update {
let mut witness = witness.lock().map_err(|_| {
Expand Down Expand Up @@ -499,7 +498,7 @@ impl Blockchain {
})
}

pub async fn store_block(
pub fn store_block(
&self,
block: Block,
account_updates_list: AccountUpdatesList,
Expand All @@ -518,20 +517,18 @@ impl Blockchain {

self.storage
.store_block_updates(update_batch)
.await
.map_err(|e| e.into())
}

pub async fn add_block(&self, block: Block) -> Result<(), ChainError> {
pub fn add_block(&self, block: Block) -> Result<(), ChainError> {
let since = Instant::now();
let (res, updates) = self.execute_block(&block).await?;
let (res, updates) = self.execute_block(&block)?;
let executed = Instant::now();

// Apply the account updates over the last block's state and compute the new state root
let account_updates_list = self
.storage
.apply_account_updates_batch(block.header.parent_hash, &updates)
.await?
.apply_account_updates_batch(block.header.parent_hash, &updates)?
.ok_or(ChainError::ParentStateNotFound)?;

let (gas_used, gas_limit, block_number, transactions_count) = (
Expand All @@ -542,7 +539,7 @@ impl Blockchain {
);

let merkleized = Instant::now();
let result = self.store_block(block, account_updates_list, res).await;
let result = self.store_block(block, account_updates_list, res);
let stored = Instant::now();

if self.options.perf_logs_enabled {
Expand Down Expand Up @@ -707,7 +704,6 @@ impl Blockchain {
let account_updates_list = self
.storage
.apply_account_updates_batch(first_block_header.parent_hash, &account_updates)
.await
.map_err(|e| (e.into(), None))?
.ok_or((ChainError::ParentStateNotFound, None))?;

Expand All @@ -729,7 +725,6 @@ impl Blockchain {

self.storage
.store_block_updates(update_batch)
.await
.map_err(|e| (e.into(), None))?;

let elapsed_seconds = interval.elapsed().as_secs_f64();
Expand Down
3 changes: 1 addition & 2 deletions crates/blockchain/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ impl Blockchain {

let ret_acount_updates_list = self
.storage
.apply_account_updates_batch(context.parent_hash(), &account_updates)
.await?
.apply_account_updates_batch(context.parent_hash(), &account_updates)?
.ok_or(ChainError::ParentStateNotFound)?;

let state_root = ret_acount_updates_list.state_trie_hash;
Expand Down
17 changes: 4 additions & 13 deletions crates/blockchain/smoke_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod blockchain_integration_test {
// Add first block. We'll make it canonical.
let block_1a = new_block(&store, &genesis_header).await;
let hash_1a = block_1a.hash();
blockchain.add_block(block_1a.clone()).await.unwrap();
blockchain.add_block(block_1a.clone()).unwrap();
store
.forkchoice_update(None, 1, hash_1a, None, None)
.await
Expand All @@ -45,7 +45,6 @@ mod blockchain_integration_test {
let hash_1b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block 1b.");
let retrieved_1b = store.get_block_header_by_hash(hash_1b).unwrap().unwrap();

Expand All @@ -57,7 +56,6 @@ mod blockchain_integration_test {
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");
let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap();

Expand Down Expand Up @@ -92,7 +90,7 @@ mod blockchain_integration_test {
// Build a single valid block.
let block_1 = new_block(&store, &genesis_header).await;
let hash_1 = block_1.hash();
blockchain.add_block(block_1.clone()).await.unwrap();
blockchain.add_block(block_1.clone()).unwrap();
apply_fork_choice(&store, hash_1, H256::zero(), H256::zero())
.await
.unwrap();
Expand All @@ -101,7 +99,7 @@ mod blockchain_integration_test {
let mut block_2 = new_block(&store, &block_1.header).await;
block_2.header.parent_hash = H256::random();
let hash_2 = block_2.hash();
let result = blockchain.add_block(block_2.clone()).await;
let result = blockchain.add_block(block_2.clone());
assert!(matches!(result, Err(ChainError::ParentNotFound)));

// block 2 should now be pending.
Expand All @@ -127,7 +125,7 @@ mod blockchain_integration_test {
// Add first block. Not canonical.
let block_1a = new_block(&store, &genesis_header).await;
let hash_1a = block_1a.hash();
blockchain.add_block(block_1a.clone()).await.unwrap();
blockchain.add_block(block_1a.clone()).unwrap();
let retrieved_1a = store.get_block_header_by_hash(hash_1a).unwrap().unwrap();

assert!(!is_canonical(&store, 1, hash_1a).await.unwrap());
Expand All @@ -137,7 +135,6 @@ mod blockchain_integration_test {
let hash_1b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block 1b.");
apply_fork_choice(&store, hash_1b, genesis_hash, genesis_hash)
.await
Expand All @@ -154,7 +151,6 @@ mod blockchain_integration_test {
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");
apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash)
.await
Expand Down Expand Up @@ -201,15 +197,13 @@ mod blockchain_integration_test {
let hash_1 = block_1.hash();
blockchain
.add_block(block_1.clone())
.await
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header).await;
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");

assert!(!is_canonical(&store, 1, hash_1).await.unwrap());
Expand Down Expand Up @@ -253,15 +247,13 @@ mod blockchain_integration_test {
let block_1 = new_block(&store, &genesis_header).await;
blockchain
.add_block(block_1.clone())
.await
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header).await;
let hash_2 = block_2.hash();
blockchain
.add_block(block_2.clone())
.await
.expect("Could not add block 2.");

assert_eq!(
Expand All @@ -281,7 +273,6 @@ mod blockchain_integration_test {
let hash_b = block_1b.hash();
blockchain
.add_block(block_1b.clone())
.await
.expect("Could not add block b.");

// The latest block should be the same.
Expand Down
2 changes: 1 addition & 1 deletion crates/l2/based/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl BlockFetcher {

async fn store_batch(&mut self, batch: &[Block]) -> Result<(), BlockFetcherError> {
for block in batch.iter() {
self.blockchain.add_block(block.clone()).await?;
self.blockchain.add_block(block.clone())?;

let block_hash = block.hash();

Expand Down
2 changes: 2 additions & 0 deletions crates/l2/networking/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub async fn start_api(
// TODO: Refactor how filters are handled,
// filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc)
let active_filters = Arc::new(Mutex::new(HashMap::new()));
let block_worker_channel = ethrex_rpc::start_block_executor(blockchain.clone());
let service_context = RpcApiContext {
l1_ctx: ethrex_rpc::RpcApiContext {
storage,
Expand All @@ -103,6 +104,7 @@ pub async fn start_api(
gas_tip_estimator: Arc::new(TokioMutex::new(GasTipEstimator::new())),
log_filter_handler,
gas_ceil,
block_worker_channel,
},
valid_delegation_addresses,
sponsor_pk,
Expand Down
6 changes: 2 additions & 4 deletions crates/l2/sequencer/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,15 @@ impl BlockProducer {

let account_updates_list = self
.store
.apply_account_updates_batch(block.header.parent_hash, &account_updates)
.await?
.apply_account_updates_batch(block.header.parent_hash, &account_updates)?
.ok_or(ChainError::ParentStateNotFound)?;

let transactions_count = block.body.transactions.len();
let block_number = block.header.number;
let block_hash = block.hash();
self.store_fee_config_by_block(block.header.number).await?;
self.blockchain
.store_block(block, account_updates_list, execution_result)
.await?;
.store_block(block, account_updates_list, execution_result)?;
info!(
"Stored new block {:x}, transaction_count {}",
block_hash, transactions_count
Expand Down
22 changes: 9 additions & 13 deletions crates/l2/sequencer/l1_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,22 +519,19 @@ impl L1Committer {
.apply_account_updates_batch(
potential_batch_block.header.parent_hash,
&account_updates,
)
.await?
)?
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"no account updated".to_owned(),
))?;

one_time_checkpoint_blockchain
.store_block(
potential_batch_block.clone(),
account_updates_list,
BlockExecutionResult {
receipts,
requests: vec![],
},
)
.await?;
one_time_checkpoint_blockchain.store_block(
potential_batch_block.clone(),
account_updates_list,
BlockExecutionResult {
receipts,
requests: vec![],
},
)?;
}

// Accumulate block data with the rest of the batch.
Expand Down Expand Up @@ -1258,7 +1255,6 @@ pub async fn regenerate_head_state(

blockchain
.add_block(block)
.await
.map_err(|err| CommitterError::FailedToCreateCheckpoint(err.to_string()))?;
}

Expand Down
22 changes: 9 additions & 13 deletions crates/networking/p2p/rlpx/l2/l2_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,19 +393,15 @@ async fn process_new_block(
let block = Arc::<Block>::try_unwrap(block).map_err(|_| {
PeerConnectionError::InternalError("Failed to take ownership of block".to_string())
})?;
established
.blockchain
.add_block(block)
.await
.inspect_err(|e| {
log_peer_error!(
&established.node,
&format!(
"Error adding new block {} with hash {:?}, error: {e}",
block_number, block_hash
),
);
})?;
established.blockchain.add_block(block).inspect_err(|e| {
log_peer_error!(
&established.node,
&format!(
"Error adding new block {} with hash {:?}, error: {e}",
block_number, block_hash
),
);
})?;

apply_fork_choice(&established.storage, block_hash, block_hash, block_hash)
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/networking/p2p/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ impl Syncer {
let mut last_valid_hash = H256::default();
for block in blocks {
let block_hash = block.hash();
blockchain.add_block(block).await.map_err(|e| {
blockchain.add_block(block).map_err(|e| {
(
e,
Some(BatchBlockProcessingFailure {
Expand Down
13 changes: 12 additions & 1 deletion crates/networking/rpc/engine/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethrex_common::{H256, U256};
use ethrex_p2p::sync::SyncMode;
use ethrex_rlp::error::RLPDecodeError;
use serde_json::Value;
use tokio::sync::oneshot;
use tracing::{debug, error, info, warn};

use crate::rpc::{RpcApiContext, RpcHandler};
Expand Down Expand Up @@ -678,6 +679,16 @@ fn validate_block_hash(payload: &ExecutionPayload, block: &Block) -> Result<(),
Ok(())
}

pub async fn add_block(ctx: &RpcApiContext, block: Block) -> Result<(), ChainError> {
let (notify_send, notify_recv) = oneshot::channel();
ctx.block_worker_channel
.send((notify_send, block))
.map_err(|e| ChainError::Custom(format!("failed to send message: {e}")))?;
notify_recv
.await
.map_err(|e| ChainError::Custom(format!("recv failed: {e}")))?
}

async fn try_execute_payload(
block: Block,
context: &RpcApiContext,
Expand All @@ -696,7 +707,7 @@ async fn try_execute_payload(
// Execute and store the block
info!(%block_hash, %block_number, "Executing payload");

match context.blockchain.add_block(block).await {
match add_block(context, block).await {
Err(ChainError::ParentNotFound) => {
// Start sync
context.syncer.sync_to_head(block_hash);
Expand Down
Loading
Loading