From 57810eedafe2fffdf510b624b5a5b9075c6724a4 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Wed, 8 Oct 2025 15:26:56 -0300 Subject: [PATCH 01/38] Download all headers new to old as first step of fullsync --- crates/networking/p2p/sync.rs | 230 ++++++++++++++++++--------- crates/storage/api.rs | 8 + crates/storage/store.rs | 12 ++ crates/storage/store_db/in_memory.rs | 12 ++ crates/storage/store_db/rocksdb.rs | 36 +++++ 5 files changed, 227 insertions(+), 71 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 98509958e82..1afe0eed5ff 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -324,109 +324,197 @@ impl Syncer { /// # Returns /// /// Returns an error if the sync fails at any given step and aborts all active processes - async fn sync_cycle_full(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> { + async fn sync_cycle_full( + &mut self, + mut sync_head: H256, + store: Store, + ) -> Result<(), SyncError> { // Request all block headers between the current head and the sync head // We will begin from the current head so that we download the earliest state first // This step is not parallelized - let mut block_sync_state = FullBlockSyncState::new(store.clone()); + let block_sync_state = FullBlockSyncState::new(store.clone()); // Check if we have some blocks downloaded from a previous sync attempt // This applies only to snap sync—full sync always starts fetching headers // from the canonical block, which updates as new block headers are fetched. - let mut current_head = block_sync_state.get_current_head().await?; + + // Update current fetch head + let current_head = block_sync_state.get_current_head().await?; info!( "Syncing from current head {:?} to sync_head {:?}", current_head, sync_head ); + info!("Starting header download"); + let mut start_block_number = 0; + let mut end_block_number = 0; loop { - debug!("Sync Log 1: In Full Sync"); - debug!( - "Sync Log 3: State current headers len {}", - block_sync_state.current_headers.len() - ); - debug!( - "Sync Log 4: State current blocks len {}", - block_sync_state.current_blocks.len() - ); - - debug!("Requesting Block Headers from {current_head}"); - - let Some(mut block_headers) = self + let Some(block_headers) = self .peers - .request_block_headers_from_hash(current_head, BlockRequestOrder::OldToNew) + .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld) .await? else { warn!("Sync failed to find target block header, aborting"); debug!("Sync Log 8: Sync failed to find target block header, aborting"); return Ok(()); }; - debug!("Sync Log 9: Received {} block headers", block_headers.len()); + end_block_number = end_block_number.max(block_headers.first().as_ref().unwrap().number); - let (first_block_hash, first_block_number, first_block_parent_hash) = - match block_headers.first() { - Some(header) => (header.hash(), header.number, header.parent_hash), - None => continue, - }; - let (last_block_hash, last_block_number) = match block_headers.last() { - Some(header) => (header.hash(), header.number), - None => continue, - }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. 
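            // Requesting headers new-to-old from the sync head (see the NewToOld request
            // above) always follows the advertised chain back to our own, so no
            // common-ancestor probing is needed.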
- if first_block_hash == last_block_hash - && first_block_hash == current_head - && current_head != sync_head - { - // There is no path to the sync head this goes back until it find a common ancerstor - warn!("Sync failed to find target block header, going back to the previous parent"); - current_head = first_block_parent_hash; - continue; - } - - debug!( + info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), - first_block_number, - last_block_number + block_headers.first().as_ref().unwrap().number, + block_headers.last().as_ref().unwrap().number, ); - // Filter out everything after the sync_head - let mut sync_head_found = false; - if let Some(index) = block_headers - .iter() - .position(|header| header.hash() == sync_head) - { - sync_head_found = true; - block_headers.drain(index + 1..); + // // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck + // // on a loop when the target head is not found, i.e. on a reorg with a side-chain. + // if first_block_hash == last_block_hash + // && first_block_hash == current_head + // && current_head != sync_head + // { + // // There is no path to the sync head this goes back until it find a common ancerstor + // warn!("Sync failed to find target block header, going back to the previous parent"); + // current_head = first_block_parent_hash; + // continue; + // } + + sync_head = block_headers.last().as_ref().unwrap().parent_hash; + if store.is_canonical_sync(sync_head)? { + // Incoming chain merged with current chain + start_block_number = block_headers.last().as_ref().unwrap().number; + break; } + store.add_fullsync_batch(block_headers).await?; + } - // Update current fetch head - current_head = last_block_hash; + info!("Downloading Bodies and executing blocks"); + for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { + let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); + let final_batch = end_block_number == start + batch_size as u64; + // Retrieve batch from DB + info!( + "Processing batch from block number {start} to {}", + start + *EXECUTE_BATCH_SIZE as u64 + ); + let mut headers = store + .read_fullsync_batch(start, *EXECUTE_BATCH_SIZE as u64) + .await?; + let mut blocks = Vec::new(); + // Request block bodies + // Download block bodies + while !headers.is_empty() { + let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())]; + let bodies = self + .peers + .request_and_validate_block_bodies(header_batch) + .await? + .ok_or(SyncError::BodiesNotFound)?; + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + // Execute blocks - // Discard the first header as we already have it - if block_headers.len() > 1 { - let mut finished = false; - while !finished { - let headers = std::mem::take(&mut block_headers); - let block_headers_iter = headers.into_iter().skip(1); - (finished, sync_head_found) = block_sync_state - .process_incoming_headers( - block_headers_iter, - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; + info!( + "Executing {} blocks for full sync. 
First block hash: {:#?} Last block hash: {:#?}", + blocks.len(), + blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + let execution_start = Instant::now(); + // Copy some values for later + let blocks_len = blocks.len(); + let numbers_and_hashes = blocks + .iter() + .map(|b| (b.header.number, b.hash())) + .collect::>(); + let (last_block_number, last_block_hash) = numbers_and_hashes + .last() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; + let (first_block_number, first_block_hash) = numbers_and_hashes + .first() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; + + let blocks_hashes = blocks + .iter() + .map(|block| block.hash()) + .collect::>(); + + // Run the batch + if let Err((err, batch_failure)) = Syncer::add_blocks( + self.blockchain.clone(), + blocks, + final_batch, + self.cancel_token.clone(), + ) + .await + { + if let Some(batch_failure) = batch_failure { + warn!("Failed to add block during FullSync: {err}"); + // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. + if let ChainError::InvalidBlock(_) = err { + let mut block_hashes_with_invalid_ancestor: Vec = vec![]; + if let Some(index) = blocks_hashes + .iter() + .position(|x| x == &batch_failure.failed_block_hash) + { + block_hashes_with_invalid_ancestor = + blocks_hashes[index..].to_vec(); + } + + for hash in block_hashes_with_invalid_ancestor { + store + .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) + .await?; + } + // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. + // for header in &self.current_headers { + // self.store + // .set_latest_valid_ancestor( + // header.hash(), + // batch_failure.last_valid_hash, + // ) + // .await?; + // } + } } + return Err(err.into()); } - if sync_head_found { - break; - }; + store + .forkchoice_update( + Some(numbers_and_hashes), + last_block_number, + last_block_hash, + None, + None, + ) + .await?; + + let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; + let blocks_per_second = blocks_len as f64 / execution_time; + + info!( + "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ + Started at block with hash {} (number {}).\n\ + Finished at block with hash {} (number {}).\n\ + Blocks per second: {:.3}", + blocks_len, + execution_time, + first_block_hash, + first_block_number, + last_block_hash, + last_block_number, + blocks_per_second + ); } + Ok(()) } diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 051289c73bc..7946c19f78f 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -346,4 +346,12 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { &self, account_codes: Vec<(H256, Bytes)>, ) -> Result<(), StoreError>; + + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError>; + + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError>; } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 8fa690c4b04..2035bb46074 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1267,6 +1267,18 @@ impl Store { ) -> Result<(), StoreError> { self.engine.write_account_code_batch(account_codes).await } + + pub async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + self.engine.add_fullsync_batch(headers).await + } + + pub async fn read_fullsync_batch( + &self, + start: BlockNumber, + 
limit: u64, + ) -> Result, StoreError> { + self.engine.read_fullsync_batch(start, limit).await + } } pub struct AncestorIterator { diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index d64a4479d9a..9128da56386 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -617,6 +617,18 @@ impl StoreEngine for Store { Ok(()) } + + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + todo!() + } + + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError> { + todo!() + } } impl Debug for Store { diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 1c08d4ed034..35a7581e74f 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -101,6 +101,11 @@ const CF_PENDING_BLOCKS: &str = "pending_blocks"; /// - [`Vec`] = `BlockHashRLP::from(latest_valid).bytes().clone()` const CF_INVALID_ANCESTORS: &str = "invalid_ancestors"; +/// Block headers downloaded during fullsync column family: [`u8;_`] => [`Vec`] +/// - [`u8;_`] = `block_number.to_le_bytes()` +/// - [`Vec`] = `BlockHeaderRLP::from(block.header.clone()).bytes().clone()` +const CF_FULLSYNC_HEADERS: &str = "headers"; + #[derive(Debug)] pub struct Store { db: Arc>, @@ -163,6 +168,7 @@ impl Store { CF_STORAGE_TRIES_NODES, CF_PENDING_BLOCKS, CF_INVALID_ANCESTORS, + CF_FULLSYNC_HEADERS, ]; // Get existing column families to know which ones to drop later @@ -1389,6 +1395,36 @@ impl StoreEngine for Store { self.write_batch_async(batch_ops).await } + + async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { + let mut batch_ops = Vec::new(); + + for header in headers { + let number_value = header.number.to_le_bytes().to_vec(); + let header_value = BlockHeaderRLP::from(header).bytes().clone(); + + batch_ops.push((CF_FULLSYNC_HEADERS.to_string(), number_value, header_value)); + } + + self.write_batch_async(batch_ops).await + } + + async fn read_fullsync_batch( + &self, + start: BlockNumber, + limit: u64, + ) -> Result, StoreError> { + self.read_bulk_async( + CF_FULLSYNC_HEADERS, + (start..start + limit).map(|n| n.to_le_bytes()).collect(), + |bytes| { + BlockHeaderRLP::from_bytes(bytes) + .to() + .map_err(StoreError::from) + }, + ) + .await + } } /// Open column families From 81866cedd2f76ec286ce70ec08486486729445f4 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 9 Oct 2025 17:10:41 -0300 Subject: [PATCH 02/38] fix --- crates/networking/p2p/sync.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 1afe0eed5ff..633c91acfcd 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -348,7 +348,7 @@ impl Syncer { let mut start_block_number = 0; let mut end_block_number = 0; loop { - let Some(block_headers) = self + let Some(mut block_headers) = self .peers .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld) .await? @@ -378,11 +378,20 @@ impl Syncer { // current_head = first_block_parent_hash; // continue; // } - - sync_head = block_headers.last().as_ref().unwrap().parent_hash; - if store.is_canonical_sync(sync_head)? { + sync_head = block_headers.last().unwrap().parent_hash; + if store.is_canonical_sync(sync_head)? 
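            // (a zero parent hash means the walk went back past genesis)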
|| sync_head.is_zero() { // Incoming chain merged with current chain - start_block_number = block_headers.last().as_ref().unwrap().number; + // Filter out already canonical blocks from batch + let mut first_canon_block = block_headers.len(); + for (index, header) in block_headers.iter().enumerate() { + if store.is_canonical_sync(header.hash())? { + first_canon_block = index; + break; + } + } + block_headers.drain(first_canon_block..block_headers.len()); + start_block_number = block_headers.last().as_ref().unwrap().number.max(1); + store.add_fullsync_batch(block_headers).await?; break; } store.add_fullsync_batch(block_headers).await?; From c02da601b1b0b9627b60a6ee7eae041dc8c93361 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 16:31:08 -0300 Subject: [PATCH 03/38] Some fixes + debug --- crates/networking/p2p/sync.rs | 180 ++++++++++++++++++---------------- 1 file changed, 97 insertions(+), 83 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 633c91acfcd..2e83dec2922 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -345,8 +345,20 @@ impl Syncer { ); info!("Starting header download"); - let mut start_block_number = 0; - let mut end_block_number = 0; + let start_block_number; + let end_block_number; + + // Check if the sync_head is a pending block + let mut pending_blocks = vec![]; + while let Some(block) = store.get_pending_block(sync_head).await? { + sync_head = block.header.parent_hash; + pending_blocks.push(block); + } + if !pending_blocks.is_empty() { + let pending_numbers = pending_blocks.iter().map(|b| b.header.number).collect::>(); + info!("Fetched pending blocks: {pending_numbers:?}"); + } + loop { let Some(mut block_headers) = self .peers @@ -358,7 +370,6 @@ impl Syncer { return Ok(()); }; debug!("Sync Log 9: Received {} block headers", block_headers.len()); - end_block_number = end_block_number.max(block_headers.first().as_ref().unwrap().number); info!( "Received {} block headers| First Number: {} Last Number: {}", @@ -367,17 +378,6 @@ impl Syncer { block_headers.last().as_ref().unwrap().number, ); - // // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // // on a loop when the target head is not found, i.e. on a reorg with a side-chain. - // if first_block_hash == last_block_hash - // && first_block_hash == current_head - // && current_head != sync_head - // { - // // There is no path to the sync head this goes back until it find a common ancerstor - // warn!("Sync failed to find target block header, going back to the previous parent"); - // current_head = first_block_parent_hash; - // continue; - // } sync_head = block_headers.last().unwrap().parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain @@ -391,6 +391,8 @@ impl Syncer { } block_headers.drain(first_canon_block..block_headers.len()); start_block_number = block_headers.last().as_ref().unwrap().number.max(1); + end_block_number = block_headers.first().as_ref().unwrap().number + 1; + info!("Fullsync range: {start_block_number}..{end_block_number}"); store.add_fullsync_batch(block_headers).await?; break; } @@ -427,14 +429,29 @@ impl Syncer { blocks.extend(block_batch); } // Execute blocks - info!( "Executing {} blocks for full sync. 
First block hash: {:#?} Last block hash: {:#?}", blocks.len(), blocks.first().ok_or(SyncError::NoBlocks)?.hash(), blocks.last().ok_or(SyncError::NoBlocks)?.hash() ); - let execution_start = Instant::now(); + self.add_blocks_in_batch(blocks, final_batch, store.clone()).await?; + + + } + + // Execute pending blocks + if !pending_blocks.is_empty() { + info!("Executing pending blocks {}", pending_blocks.len()); + self.add_blocks_in_batch(pending_blocks, true, store).await?; + + } + + Ok(()) + } + +async fn add_blocks_in_batch(&self, blocks: Vec, final_batch: bool, store: Store) -> Result<(), SyncError> { + let execution_start = Instant::now(); // Copy some values for later let blocks_len = blocks.len(); let numbers_and_hashes = blocks @@ -454,79 +471,76 @@ impl Syncer { .iter() .map(|block| block.hash()) .collect::>(); + // Run the batch + if let Err((err, batch_failure)) = Syncer::add_blocks( + self.blockchain.clone(), + blocks, + final_batch, + self.cancel_token.clone(), + ) + .await + { + if let Some(batch_failure) = batch_failure { + warn!("Failed to add block during FullSync: {err}"); + // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. + if let ChainError::InvalidBlock(_) = err { + let mut block_hashes_with_invalid_ancestor: Vec = vec![]; + if let Some(index) = blocks_hashes + .iter() + .position(|x| x == &batch_failure.failed_block_hash) + { + block_hashes_with_invalid_ancestor = + blocks_hashes[index..].to_vec(); + } - // Run the batch - if let Err((err, batch_failure)) = Syncer::add_blocks( - self.blockchain.clone(), - blocks, - final_batch, - self.cancel_token.clone(), - ) - .await - { - if let Some(batch_failure) = batch_failure { - warn!("Failed to add block during FullSync: {err}"); - // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. - if let ChainError::InvalidBlock(_) = err { - let mut block_hashes_with_invalid_ancestor: Vec = vec![]; - if let Some(index) = blocks_hashes - .iter() - .position(|x| x == &batch_failure.failed_block_hash) - { - block_hashes_with_invalid_ancestor = - blocks_hashes[index..].to_vec(); - } - - for hash in block_hashes_with_invalid_ancestor { - store - .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) - .await?; - } - // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. - // for header in &self.current_headers { - // self.store - // .set_latest_valid_ancestor( - // header.hash(), - // batch_failure.last_valid_hash, - // ) - // .await?; - // } - } + for hash in block_hashes_with_invalid_ancestor { + store + .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) + .await?; } - return Err(err.into()); + // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. 
+ // for header in &self.current_headers { + // self.store + // .set_latest_valid_ancestor( + // header.hash(), + // batch_failure.last_valid_hash, + // ) + // .await?; + // } } - - store - .forkchoice_update( - Some(numbers_and_hashes), - last_block_number, - last_block_hash, - None, - None, - ) - .await?; - - let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; - let blocks_per_second = blocks_len as f64 / execution_time; - - info!( - "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ - Started at block with hash {} (number {}).\n\ - Finished at block with hash {} (number {}).\n\ - Blocks per second: {:.3}", - blocks_len, - execution_time, - first_block_hash, - first_block_number, - last_block_hash, - last_block_number, - blocks_per_second - ); } - - Ok(()) + return Err(err.into()); } + store + .forkchoice_update( + Some(numbers_and_hashes), + last_block_number, + last_block_hash, + None, + None, + ) + .await?; + + let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; + let blocks_per_second = blocks_len as f64 / execution_time; + + info!( + "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ + Started at block with hash {} (number {}).\n\ + Finished at block with hash {} (number {}).\n\ + Blocks per second: {:.3}", + blocks_len, + execution_time, + first_block_hash, + first_block_number, + last_block_hash, + last_block_number, + blocks_per_second + ); + Ok(()) +} + /// Executes the given blocks and stores them /// If sync_head_found is true, they will be executed one by one /// If sync_head_found is false, they will be executed in a single batch From 678d29b5905125c53cf213f850c6c057337da07d Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:21:48 -0300 Subject: [PATCH 04/38] fix --- crates/networking/p2p/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 2e83dec2922..e56dcff7667 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -406,10 +406,10 @@ impl Syncer { // Retrieve batch from DB info!( "Processing batch from block number {start} to {}", - start + *EXECUTE_BATCH_SIZE as u64 + start + batch_size as u64 ); let mut headers = store - .read_fullsync_batch(start, *EXECUTE_BATCH_SIZE as u64) + .read_fullsync_batch(start, batch_size as u64) .await?; let mut blocks = Vec::new(); // Request block bodies From 67da9ca631917100024927f8788059369bbfd317 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:22:22 -0300 Subject: [PATCH 05/38] Invalidate child when checking for invalid parent --- crates/networking/rpc/engine/fork_choice.rs | 2 ++ crates/networking/rpc/engine/payload.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index adfbcdd9352..8c97e7f6b92 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -200,6 +200,8 @@ async fn handle_forkchoice( .get_latest_valid_ancestor(head_block.parent_hash) .await? 
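    // A recorded latest-valid-ancestor means the parent chain was previously invalidated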
{ + // Invalidate the child too + context.storage.set_latest_valid_ancestor(head_block.hash(), latest_valid_hash).await?; return Ok(( None, ForkChoiceResponse::from(PayloadStatus::invalid_with( diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index a912aa83a79..7b932fc52c8 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -571,6 +571,8 @@ async fn validate_ancestors( .get_latest_valid_ancestor(block.header.parent_hash) .await? { + // Invalidate child too + context.storage.set_latest_valid_ancestor(block.header.hash(), latest_valid_hash).await?; return Ok(Some(PayloadStatus::invalid_with( latest_valid_hash, "Parent header has been previously invalidated.".into(), From e2a93d6b72b2de9c815729d01e5385d3b4e33fca Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:32:18 -0300 Subject: [PATCH 06/38] Tidy up code --- crates/networking/p2p/sync.rs | 200 ++++++++++---------- crates/networking/rpc/engine/fork_choice.rs | 5 +- crates/networking/rpc/engine/payload.rs | 5 +- 3 files changed, 107 insertions(+), 103 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index e56dcff7667..302b0c6eceb 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -344,21 +344,17 @@ impl Syncer { current_head, sync_head ); - info!("Starting header download"); - let start_block_number; - let end_block_number; - - // Check if the sync_head is a pending block + // Check if the sync_head is a pending block, if so, gather all pending blocks belonging to its chain let mut pending_blocks = vec![]; while let Some(block) = store.get_pending_block(sync_head).await? { sync_head = block.header.parent_hash; pending_blocks.push(block); } - if !pending_blocks.is_empty() { - let pending_numbers = pending_blocks.iter().map(|b| b.header.number).collect::>(); - info!("Fetched pending blocks: {pending_numbers:?}"); - } + let start_block_number; + let end_block_number; + + // Request and store all block headers from the advertised sync head loop { let Some(mut block_headers) = self .peers @@ -382,17 +378,16 @@ impl Syncer { if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain // Filter out already canonical blocks from batch - let mut first_canon_block = block_headers.len(); - for (index, header) in block_headers.iter().enumerate() { + let mut first_canon_block = block_headers.len(); + for (index, header) in block_headers.iter().enumerate() { if store.is_canonical_sync(header.hash())? 
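                        // headers from the first canonical hash onwards are already stored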
{ first_canon_block = index; break; } - } + } block_headers.drain(first_canon_block..block_headers.len()); start_block_number = block_headers.last().as_ref().unwrap().number.max(1); end_block_number = block_headers.first().as_ref().unwrap().number + 1; - info!("Fullsync range: {start_block_number}..{end_block_number}"); store.add_fullsync_batch(block_headers).await?; break; } @@ -408,9 +403,7 @@ impl Syncer { "Processing batch from block number {start} to {}", start + batch_size as u64 ); - let mut headers = store - .read_fullsync_batch(start, batch_size as u64) - .await?; + let mut headers = store.read_fullsync_batch(start, batch_size as u64).await?; let mut blocks = Vec::new(); // Request block bodies // Download block bodies @@ -435,111 +428,116 @@ impl Syncer { blocks.first().ok_or(SyncError::NoBlocks)?.hash(), blocks.last().ok_or(SyncError::NoBlocks)?.hash() ); - self.add_blocks_in_batch(blocks, final_batch, store.clone()).await?; - - + self.add_blocks_in_batch(blocks, final_batch, store.clone()) + .await?; } // Execute pending blocks if !pending_blocks.is_empty() { - info!("Executing pending blocks {}", pending_blocks.len()); - self.add_blocks_in_batch(pending_blocks, true, store).await?; - + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + pending_blocks.len(), + pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + self.add_blocks_in_batch(pending_blocks, true, store) + .await?; } Ok(()) } -async fn add_blocks_in_batch(&self, blocks: Vec, final_batch: bool, store: Store) -> Result<(), SyncError> { - let execution_start = Instant::now(); - // Copy some values for later - let blocks_len = blocks.len(); - let numbers_and_hashes = blocks - .iter() - .map(|b| (b.header.number, b.hash())) - .collect::>(); - let (last_block_number, last_block_hash) = numbers_and_hashes - .last() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; - let (first_block_number, first_block_hash) = numbers_and_hashes - .first() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; + async fn add_blocks_in_batch( + &self, + blocks: Vec, + final_batch: bool, + store: Store, + ) -> Result<(), SyncError> { + let execution_start = Instant::now(); + // Copy some values for later + let blocks_len = blocks.len(); + let numbers_and_hashes = blocks + .iter() + .map(|b| (b.header.number, b.hash())) + .collect::>(); + let (last_block_number, last_block_hash) = numbers_and_hashes + .last() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; + let (first_block_number, first_block_hash) = numbers_and_hashes + .first() + .cloned() + .ok_or(SyncError::InvalidRangeReceived)?; - let blocks_hashes = blocks - .iter() - .map(|block| block.hash()) - .collect::>(); - // Run the batch - if let Err((err, batch_failure)) = Syncer::add_blocks( - self.blockchain.clone(), - blocks, - final_batch, - self.cancel_token.clone(), - ) - .await - { - if let Some(batch_failure) = batch_failure { - warn!("Failed to add block during FullSync: {err}"); - // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. 
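                    // This lets subsequent forkchoice and new-payload calls answer INVALID
                    // for these hashes immediately, without re-executing them.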
- if let ChainError::InvalidBlock(_) = err { - let mut block_hashes_with_invalid_ancestor: Vec = vec![]; - if let Some(index) = blocks_hashes - .iter() - .position(|x| x == &batch_failure.failed_block_hash) - { - block_hashes_with_invalid_ancestor = - blocks_hashes[index..].to_vec(); - } + let blocks_hashes = blocks.iter().map(|block| block.hash()).collect::>(); + // Run the batch + if let Err((err, batch_failure)) = Syncer::add_blocks( + self.blockchain.clone(), + blocks, + final_batch, + self.cancel_token.clone(), + ) + .await + { + if let Some(batch_failure) = batch_failure { + warn!("Failed to add block during FullSync: {err}"); + // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. + if let ChainError::InvalidBlock(_) = err { + let mut block_hashes_with_invalid_ancestor: Vec = vec![]; + if let Some(index) = blocks_hashes + .iter() + .position(|x| x == &batch_failure.failed_block_hash) + { + block_hashes_with_invalid_ancestor = blocks_hashes[index..].to_vec(); + } - for hash in block_hashes_with_invalid_ancestor { - store - .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) - .await?; + for hash in block_hashes_with_invalid_ancestor { + store + .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) + .await?; + } + // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. + // for header in &self.current_headers { + // self.store + // .set_latest_valid_ancestor( + // header.hash(), + // batch_failure.last_valid_hash, + // ) + // .await?; + // } } - // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. - // for header in &self.current_headers { - // self.store - // .set_latest_valid_ancestor( - // header.hash(), - // batch_failure.last_valid_hash, - // ) - // .await?; - // } } + return Err(err.into()); } - return Err(err.into()); - } - store - .forkchoice_update( - Some(numbers_and_hashes), - last_block_number, - last_block_hash, - None, - None, - ) - .await?; + store + .forkchoice_update( + Some(numbers_and_hashes), + last_block_number, + last_block_hash, + None, + None, + ) + .await?; - let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; - let blocks_per_second = blocks_len as f64 / execution_time; + let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; + let blocks_per_second = blocks_len as f64 / execution_time; - info!( - "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ + info!( + "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ Started at block with hash {} (number {}).\n\ Finished at block with hash {} (number {}).\n\ Blocks per second: {:.3}", - blocks_len, - execution_time, - first_block_hash, - first_block_number, - last_block_hash, - last_block_number, - blocks_per_second - ); - Ok(()) -} + blocks_len, + execution_time, + first_block_hash, + first_block_number, + last_block_hash, + last_block_number, + blocks_per_second + ); + Ok(()) + } /// Executes the given blocks and stores them /// If sync_head_found is true, they will be executed one by one diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index 8c97e7f6b92..5f45330bec6 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -201,7 +201,10 @@ async fn handle_forkchoice( .await? 
{ // Invalidate the child too - context.storage.set_latest_valid_ancestor(head_block.hash(), latest_valid_hash).await?; + context + .storage + .set_latest_valid_ancestor(head_block.hash(), latest_valid_hash) + .await?; return Ok(( None, ForkChoiceResponse::from(PayloadStatus::invalid_with( diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index 7b932fc52c8..3d2a2eca898 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -572,7 +572,10 @@ async fn validate_ancestors( .await? { // Invalidate child too - context.storage.set_latest_valid_ancestor(block.header.hash(), latest_valid_hash).await?; + context + .storage + .set_latest_valid_ancestor(block.header.hash(), latest_valid_hash) + .await?; return Ok(Some(PayloadStatus::invalid_with( latest_valid_hash, "Parent header has been previously invalidated.".into(), From 931de19acaa2037164dd7899b9d5b4dd266fc9c5 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:38:30 -0300 Subject: [PATCH 07/38] Avoid writing header batch to DB if we only have 1 header batch to process --- crates/networking/p2p/sync.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 302b0c6eceb..a676b5e766a 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -353,6 +353,8 @@ impl Syncer { let start_block_number; let end_block_number; + let mut headers = vec![]; + let mut single_batch = true; // Request and store all block headers from the advertised sync head loop { @@ -388,10 +390,16 @@ impl Syncer { block_headers.drain(first_canon_block..block_headers.len()); start_block_number = block_headers.last().as_ref().unwrap().number.max(1); end_block_number = block_headers.first().as_ref().unwrap().number + 1; - store.add_fullsync_batch(block_headers).await?; + // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store + if single_batch { + headers = block_headers; + } else { + store.add_fullsync_batch(block_headers).await?; + } break; } store.add_fullsync_batch(block_headers).await?; + single_batch = false; } info!("Downloading Bodies and executing blocks"); @@ -403,7 +411,9 @@ impl Syncer { "Processing batch from block number {start} to {}", start + batch_size as u64 ); - let mut headers = store.read_fullsync_batch(start, batch_size as u64).await?; + if !single_batch { + headers = store.read_fullsync_batch(start, batch_size as u64).await?; + } let mut blocks = Vec::new(); // Request block bodies // Download block bodies From b5ce864644239c112dc99dca9ee73de93cd93d19 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:43:57 -0300 Subject: [PATCH 08/38] Add InMemory impl --- crates/storage/store_db/in_memory.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 9128da56386..369604e4bdc 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -35,6 +35,8 @@ pub struct StoreInner { invalid_ancestors: HashMap, // Stores current Snap Sate snap_state: SnapState, + // Stores fetched headers during a fullsync + fullsync_headers: HashMap, } #[derive(Default, Debug)] @@ -619,7 +621,10 @@ impl StoreEngine for Store { } async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError> { - todo!() + self.inner()? 
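            // headers are keyed by block number, mirroring the RocksDB column family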
+ .fullsync_headers + .extend(headers.into_iter().map(|h| (h.number, h))); + Ok(()) } async fn read_fullsync_batch( @@ -627,7 +632,18 @@ impl StoreEngine for Store { start: BlockNumber, limit: u64, ) -> Result, StoreError> { - todo!() + let store = self.inner()?; + (start..start + limit) + .map(|ref n| { + store + .fullsync_headers + .get(n) + .cloned() + .ok_or(StoreError::Custom(format!( + "Missing fullsync header for block {n}" + ))) + }) + .collect::, _>>() } } From 524055c02a364a03ef58885afd77771123952471 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 18:49:51 -0300 Subject: [PATCH 09/38] Wipe header table after sync is done' --- crates/networking/p2p/sync.rs | 3 ++- crates/storage/api.rs | 2 ++ crates/storage/store.rs | 4 ++++ crates/storage/store_db/in_memory.rs | 5 +++++ crates/storage/store_db/rocksdb.rs | 22 ++++++++++++++++++++++ 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index a676b5e766a..eb2bcdad260 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -450,10 +450,11 @@ impl Syncer { pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(), pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash() ); - self.add_blocks_in_batch(pending_blocks, true, store) + self.add_blocks_in_batch(pending_blocks, true, store.clone()) .await?; } + store.clear_fullsync_headers().await?; Ok(()) } diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 7946c19f78f..c26ebb97560 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -354,4 +354,6 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { start: BlockNumber, limit: u64, ) -> Result, StoreError>; + + async fn clear_fullsync_headers(&self) -> Result<(), StoreError>; } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 2035bb46074..ea0aa569d5d 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1279,6 +1279,10 @@ impl Store { ) -> Result, StoreError> { self.engine.read_fullsync_batch(start, limit).await } + + pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + self.engine.clear_fullsync_headers().await + } } pub struct AncestorIterator { diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 369604e4bdc..4a22096706f 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -645,6 +645,11 @@ impl StoreEngine for Store { }) .collect::, _>>() } + + async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + self.inner()?.fullsync_headers.clear(); + Ok(()) + } } impl Debug for Store { diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 35a7581e74f..c76c86138dd 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -1425,6 +1425,28 @@ impl StoreEngine for Store { ) .await } + + async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + let db = self.db.clone(); + + tokio::task::spawn_blocking(move || { + let cf = db + .cf_handle(CF_FULLSYNC_HEADERS) + .ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?; + + let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start); + let mut batch = WriteBatchWithTransaction::default(); + + while let Some(Ok((key, _))) = iter.next() { + batch.delete_cf(&cf, key); + } + + db.write(batch) + .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))) + }) + .await + .map_err(|e| 
StoreError::Custom(format!("Task panicked: {}", e)))? + } } /// Open column families From acca8585655c92215e6a97cec35a5a58c248a014 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Mon, 13 Oct 2025 19:10:30 -0300 Subject: [PATCH 10/38] fix --- crates/networking/p2p/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index eb2bcdad260..fa7fb82da18 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -392,7 +392,7 @@ impl Syncer { end_block_number = block_headers.first().as_ref().unwrap().number + 1; // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { - headers = block_headers; + headers = block_headers.into_iter().rev().collect(); } else { store.add_fullsync_batch(block_headers).await?; } From 181bc041493243fc787b07880021ea0dc81dc0e6 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 11:27:00 -0300 Subject: [PATCH 11/38] Clippy --- crates/networking/p2p/sync.rs | 28 +++++++++++++++++++++++----- crates/storage/store_db/in_memory.rs | 2 +- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index fa7fb82da18..12f7a17fe39 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -372,11 +372,19 @@ impl Syncer { info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), - block_headers.first().as_ref().unwrap().number, - block_headers.last().as_ref().unwrap().number, + block_headers + .first() + .as_ref() + .ok_or(SyncError::NoBlocks)? + .number, + block_headers + .last() + .as_ref() + .ok_or(SyncError::NoBlocks)? + .number, ); - sync_head = block_headers.last().unwrap().parent_hash; + sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain // Filter out already canonical blocks from batch @@ -388,8 +396,18 @@ impl Syncer { } } block_headers.drain(first_canon_block..block_headers.len()); - start_block_number = block_headers.last().as_ref().unwrap().number.max(1); - end_block_number = block_headers.first().as_ref().unwrap().number + 1; + start_block_number = block_headers + .last() + .as_ref() + .ok_or(SyncError::NoBlocks)? + .number + .max(1); + end_block_number = block_headers + .first() + .as_ref() + .ok_or(SyncError::NoBlocks)? 
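                    // +1: the execution loop below treats this bound as exclusive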
+ .number + + 1; // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { headers = block_headers.into_iter().rev().collect(); diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 4a22096706f..605a9394fa3 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -646,7 +646,7 @@ impl StoreEngine for Store { .collect::, _>>() } - async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { + async fn clear_fullsync_headers(&self) -> Result<(), StoreError> { self.inner()?.fullsync_headers.clear(); Ok(()) } From 26d5066c34227da5b60447e2badc24b95f5a0ff2 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 11:55:48 -0300 Subject: [PATCH 12/38] fix --- crates/networking/p2p/sync.rs | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 12f7a17fe39..3af022c5118 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -352,7 +352,7 @@ impl Syncer { } let start_block_number; - let end_block_number; + let mut end_block_number = 0; let mut headers = vec![]; let mut single_batch = true; @@ -383,6 +383,13 @@ impl Syncer { .ok_or(SyncError::NoBlocks)? .number, ); + end_block_number = end_block_number.max( + block_headers + .first() + .as_ref() + .ok_or(SyncError::NoBlocks)? + .number, + ); sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { @@ -402,12 +409,6 @@ impl Syncer { .ok_or(SyncError::NoBlocks)? .number .max(1); - end_block_number = block_headers - .first() - .as_ref() - .ok_or(SyncError::NoBlocks)? - .number - + 1; // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { headers = block_headers.into_iter().rev().collect(); @@ -419,6 +420,7 @@ impl Syncer { store.add_fullsync_batch(block_headers).await?; single_batch = false; } + end_block_number = end_block_number + 1; info!("Downloading Bodies and executing blocks"); for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { @@ -525,15 +527,6 @@ impl Syncer { .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) .await?; } - // // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. 
- // for header in &self.current_headers { - // self.store - // .set_latest_valid_ancestor( - // header.hash(), - // batch_failure.last_valid_hash, - // ) - // .await?; - // } } } return Err(err.into()); From 41475ab44b8dd3ae83d221e6c3e93fd3eb7877bf Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 12:00:09 -0300 Subject: [PATCH 13/38] Tidy up code --- crates/networking/p2p/sync.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 3af022c5118..ee6f08f661b 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -369,27 +369,16 @@ impl Syncer { }; debug!("Sync Log 9: Received {} block headers", block_headers.len()); + let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; + let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; + info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), - block_headers - .first() - .as_ref() - .ok_or(SyncError::NoBlocks)? - .number, - block_headers - .last() - .as_ref() - .ok_or(SyncError::NoBlocks)? - .number, - ); - end_block_number = end_block_number.max( - block_headers - .first() - .as_ref() - .ok_or(SyncError::NoBlocks)? - .number, + first_header.number, + last_header.number, ); + end_block_number = end_block_number.max(first_header.number); sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { @@ -405,7 +394,6 @@ impl Syncer { block_headers.drain(first_canon_block..block_headers.len()); start_block_number = block_headers .last() - .as_ref() .ok_or(SyncError::NoBlocks)? .number .max(1); From f8c2936a8136727536ea3348d591685d6d51d439 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 12:30:40 -0300 Subject: [PATCH 14/38] fix(snap->full) launch full sync cycle instead of relying on BlockState::into_fullsync --- crates/networking/p2p/sync.rs | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index ee6f08f661b..87f78084aa5 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -174,7 +174,7 @@ impl Syncer { /// Performs the sync cycle described in `start_sync`, returns an error if the sync fails at any given step and aborts all active processes async fn sync_cycle_snap(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> { // Take picture of the current sync mode, we will update the original value when we need to - let mut sync_mode = SyncMode::Snap; + let sync_mode = SyncMode::Snap; // Request all block headers between the current head and the sync head // We will begin from the current head so that we download the earliest state first // This step is not parallelized @@ -275,9 +275,8 @@ impl Syncer { debug!( "Sync head is less than {MIN_FULL_BLOCKS} blocks away, switching to FullSync" ); - sync_mode = SyncMode::Full; self.snap_enabled.store(false, Ordering::Relaxed); - block_sync_state = block_sync_state.into_fullsync().await?; + return self.sync_cycle_full(sync_head, store.clone()).await; } } @@ -329,20 +328,7 @@ impl Syncer { mut sync_head: H256, store: Store, ) -> Result<(), SyncError> { - // Request all block headers between the current head and the sync head - // We will begin from the current head so that we download the earliest state first - // This step is not parallelized - let block_sync_state = 
FullBlockSyncState::new(store.clone()); - // Check if we have some blocks downloaded from a previous sync attempt - // This applies only to snap sync—full sync always starts fetching headers - // from the canonical block, which updates as new block headers are fetched. - - // Update current fetch head - let current_head = block_sync_state.get_current_head().await?; - info!( - "Syncing from current head {:?} to sync_head {:?}", - current_head, sync_head - ); + info!("Syncing to sync_head {:?}", sync_head); // Check if the sync_head is a pending block, if so, gather all pending blocks belonging to its chain let mut pending_blocks = vec![]; @@ -351,6 +337,9 @@ impl Syncer { pending_blocks.push(block); } + // Request all block headers between the sync head and our local chain + // We will begin from the sync head so that we download the latest state first, ensuring we follow the correct chain + // This step is not parallelized let start_block_number; let mut end_block_number = 0; let mut headers = vec![]; From e0845fec130278812fd349bac503cc1cb65a3cb2 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 12:44:40 -0300 Subject: [PATCH 15/38] Remove BlockSyncState::Full --- crates/networking/p2p/sync.rs | 261 +--------------------------------- 1 file changed, 7 insertions(+), 254 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 87f78084aa5..90350b3008b 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -173,12 +173,10 @@ impl Syncer { /// Performs the sync cycle described in `start_sync`, returns an error if the sync fails at any given step and aborts all active processes async fn sync_cycle_snap(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> { - // Take picture of the current sync mode, we will update the original value when we need to - let sync_mode = SyncMode::Snap; // Request all block headers between the current head and the sync head // We will begin from the current head so that we download the earliest state first // This step is not parallelized - let mut block_sync_state = BlockSyncState::new(&sync_mode, store.clone()); + let mut block_sync_state = BlockSyncState::new_snap(store.clone()); // Check if we have some blocks downloaded from a previous sync attempt // This applies only to snap sync—full sync always starts fetching headers // from the canonical block, which updates as new block headers are fetched. 
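        // From this point the cycle is snap-only; a switch to full sync launches
        // sync_cycle_full directly instead of converting this state.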
@@ -201,7 +199,6 @@ impl Syncer { debug!( "Sync Log 2: State block hashes len {}", match block_sync_state { - BlockSyncState::Full(_) => 0, BlockSyncState::Snap(ref snap_block_sync_state) => snap_block_sync_state.block_hashes.len(), } @@ -268,7 +265,7 @@ impl Syncer { current_head_number = last_block_number; // If the sync head is less than 64 blocks away from our current head switch to full-sync - if sync_mode == SyncMode::Snap && sync_head_found { + if sync_head_found { let latest_block_number = store.get_latest_block_number().await?; if last_block_number.saturating_sub(latest_block_number) < MIN_FULL_BLOCKS as u64 { // Too few blocks for a snap sync, switching to full sync @@ -285,18 +282,6 @@ impl Syncer { let block_headers_iter = block_headers.into_iter().skip(1); match block_sync_state { - BlockSyncState::Full(ref mut state) => { - state - .process_incoming_headers( - block_headers_iter, - sync_head, - sync_head_found, - self.blockchain.clone(), - self.peers.clone(), - self.cancel_token.clone(), - ) - .await?; - } BlockSyncState::Snap(ref mut state) => { state.process_incoming_headers(block_headers_iter).await? } @@ -308,13 +293,11 @@ impl Syncer { }; } - if let SyncMode::Snap = sync_mode { - self.snap_sync(&store, &mut block_sync_state).await?; + self.snap_sync(&store, &mut block_sync_state).await?; - store.clear_snap_state().await?; + store.clear_snap_state().await?; + self.snap_enabled.store(false, Ordering::Relaxed); - self.snap_enabled.store(false, Ordering::Relaxed); - } Ok(()) } @@ -624,7 +607,6 @@ async fn store_receipts( /// Persisted State during the Block Sync phase #[derive(Clone)] pub enum BlockSyncState { - Full(FullBlockSyncState), Snap(SnapBlockSyncState), } @@ -635,221 +617,17 @@ pub struct SnapBlockSyncState { store: Store, } -/// Persisted State during the Block Sync phase for FullSync -#[derive(Clone)] -pub struct FullBlockSyncState { - current_headers: Vec, - current_blocks: Vec, - store: Store, -} - impl BlockSyncState { - fn new(sync_mode: &SyncMode, store: Store) -> Self { - match sync_mode { - SyncMode::Full => BlockSyncState::Full(FullBlockSyncState::new(store)), - SyncMode::Snap => BlockSyncState::Snap(SnapBlockSyncState::new(store)), - } + fn new_snap(store: Store) -> Self { + BlockSyncState::Snap(SnapBlockSyncState::new(store)) } /// Obtain the current head from where to start or resume block sync async fn get_current_head(&self) -> Result { match self { - BlockSyncState::Full(state) => state.get_current_head().await, BlockSyncState::Snap(state) => state.get_current_head().await, } } - - /// Converts self into a FullSync state, does nothing if self is already a FullSync state - pub async fn into_fullsync(self) -> Result { - // Switch from Snap to Full sync and vice versa - let state = match self { - BlockSyncState::Full(state) => state, - BlockSyncState::Snap(state) => state.into_fullsync().await?, - }; - Ok(Self::Full(state)) - } -} - -impl FullBlockSyncState { - fn new(store: Store) -> Self { - Self { - store, - current_headers: Vec::new(), - current_blocks: Vec::new(), - } - } - - /// Obtain the current head from where to start or resume block sync - async fn get_current_head(&self) -> Result { - self.store - .get_latest_canonical_block_hash() - .await? - .ok_or(SyncError::NoLatestCanonical) - } - - /// Saves incoming headers, requests as many block bodies as needed to complete - /// an execution batch and executes it. 
- /// An incomplete batch may be executed if the sync_head was already found - /// Returns bool finish to know whether the amount of block headers was less than MAX_BLOCK_BODIES_TO_REQUEST - /// to determine if there's still more blocks to download. - /// Returns bool sync_head_found to know whether full sync was completed. - async fn process_incoming_headers( - &mut self, - block_headers: impl Iterator, - sync_head: H256, - sync_head_found_in_block_headers: bool, - blockchain: Arc, - mut peers: PeerHandler, - cancel_token: CancellationToken, - ) -> Result<(bool, bool), SyncError> { - info!("Processing incoming headers full sync"); - self.current_headers.extend(block_headers); - - let mut sync_head_found = sync_head_found_in_block_headers; - let finished = self.current_headers.len() <= MAX_BLOCK_BODIES_TO_REQUEST; - // if self.current_headers.len() < *EXECUTE_BATCH_SIZE && !sync_head_found { - // // We don't have enough headers to fill up a batch, lets request more - // return Ok(()); - // } - // If we have enough headers to fill execution batches, request the matching bodies - // while self.current_headers.len() >= *EXECUTE_BATCH_SIZE - // || !self.current_headers.is_empty() && sync_head_found - // { - // Download block bodies - let headers = - &self.current_headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, self.current_headers.len())]; - let bodies = peers - .request_and_validate_block_bodies(headers) - .await? - .ok_or(SyncError::BodiesNotFound)?; - debug!("Obtained: {} block bodies", bodies.len()); - let blocks = self - .current_headers - .drain(..bodies.len()) - .zip(bodies) - .map(|(header, body)| Block { header, body }); - self.current_blocks.extend(blocks); - // } - - // If we have the sync_head as a pending block from a new_payload request and its parent_hash matches the hash of the latest received header - // we set the sync_head as found. Then we add it in current_blocks for execution. - if let Some(block) = self.store.get_pending_block(sync_head).await? { - if let Some(last_block) = self.current_blocks.last() { - if last_block.hash() == block.header.parent_hash { - self.current_blocks.push(block); - sync_head_found = true; - } - } - } - // Execute full blocks - // while self.current_blocks.len() >= *EXECUTE_BATCH_SIZE - // || (!self.current_blocks.is_empty() && sync_head_found) - // { - // Now that we have a full batch, we can execute and store the blocks in batch - - info!( - "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", - self.current_blocks.len(), - self.current_blocks - .first() - .ok_or(SyncError::NoBlocks)? - .hash(), - self.current_blocks - .last() - .ok_or(SyncError::NoBlocks)? 
- .hash() - ); - let execution_start = Instant::now(); - let block_batch: Vec = self - .current_blocks - .drain(..min(*EXECUTE_BATCH_SIZE, self.current_blocks.len())) - .collect(); - // Copy some values for later - let blocks_len = block_batch.len(); - let numbers_and_hashes = block_batch - .iter() - .map(|b| (b.header.number, b.hash())) - .collect::>(); - let (last_block_number, last_block_hash) = numbers_and_hashes - .last() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; - let (first_block_number, first_block_hash) = numbers_and_hashes - .first() - .cloned() - .ok_or(SyncError::InvalidRangeReceived)?; - - let block_batch_hashes = block_batch - .iter() - .map(|block| block.hash()) - .collect::>(); - - // Run the batch - if let Err((err, batch_failure)) = Syncer::add_blocks( - blockchain.clone(), - block_batch, - sync_head_found, - cancel_token.clone(), - ) - .await - { - if let Some(batch_failure) = batch_failure { - warn!("Failed to add block during FullSync: {err}"); - // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. - if let ChainError::InvalidBlock(_) = err { - let mut block_hashes_with_invalid_ancestor: Vec = vec![]; - if let Some(index) = block_batch_hashes - .iter() - .position(|x| x == &batch_failure.failed_block_hash) - { - block_hashes_with_invalid_ancestor = block_batch_hashes[index..].to_vec(); - } - - for hash in block_hashes_with_invalid_ancestor { - self.store - .set_latest_valid_ancestor(hash, batch_failure.last_valid_hash) - .await?; - } - // We also set with having an invalid ancestor all the hashes remaining which are descendants as well. - for header in &self.current_headers { - self.store - .set_latest_valid_ancestor(header.hash(), batch_failure.last_valid_hash) - .await?; - } - } - } - return Err(err.into()); - } - - self.store - .forkchoice_update( - Some(numbers_and_hashes), - last_block_number, - last_block_hash, - None, - None, - ) - .await?; - - let execution_time: f64 = execution_start.elapsed().as_millis() as f64 / 1000.0; - let blocks_per_second = blocks_len as f64 / execution_time; - - info!( - "[SYNCING] Executed & stored {} blocks in {:.3} seconds.\n\ - Started at block with hash {} (number {}).\n\ - Finished at block with hash {} (number {}).\n\ - Blocks per second: {:.3}", - blocks_len, - execution_time, - first_block_hash, - first_block_number, - last_block_hash, - last_block_number, - blocks_per_second - ); - // } - Ok((finished, sync_head_found)) - } } impl SnapBlockSyncState { @@ -892,29 +670,6 @@ impl SnapBlockSyncState { self.store.add_block_headers(block_headers_vec).await?; Ok(()) } - - /// Converts self into a FullSync state. - /// Clears SnapSync checkpoints from the Store - /// In the rare case that block headers were stored in a previous iteration, these will be fetched and saved to the FullSync state for full retrieval and execution - async fn into_fullsync(self) -> Result { - // For all collected hashes we must also have the corresponding headers stored - // As this switch will only happen when the sync_head is 64 blocks away or less from our latest block - // The headers to fetch will be at most 64, and none in the most common case - let mut current_headers = Vec::new(); - for hash in self.block_hashes { - let header = self - .store - .get_block_header_by_hash(hash)? 
-            .ok_or(SyncError::CorruptDB)?;
-            current_headers.push(header);
-        }
-        self.store.clear_snap_state().await?;
-        Ok(FullBlockSyncState {
-            current_headers,
-            current_blocks: Vec::new(),
-            store: self.store,
-        })
-    }
 }
 
 /// Safety function that frees all peers and logs an error if we find freed peers when not expecting to
@@ -939,7 +694,6 @@ impl Syncer {
         // - Fetch the pivot block's state via snap p2p requests
         // - Execute blocks after the pivot (like in full-sync)
         let pivot_hash = match block_sync_state {
-            BlockSyncState::Full(_) => return Err(SyncError::NotInSnapSync),
             BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state
                 .block_hashes
                 .last()
@@ -1320,7 +1074,6 @@ impl Syncer {
         store.add_block(block).await?;
 
         let numbers_and_hashes = match block_sync_state {
-            BlockSyncState::Full(_) => return Err(SyncError::NotInSnapSync),
            BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state
                .block_hashes
                .iter()
From 116ec2c1f8aa8927b6d6851efcb968a3486e2357 Mon Sep 17 00:00:00 2001
From: fmoletta
Date: Tue, 14 Oct 2025 14:54:59 -0300
Subject: [PATCH 16/38] Remove BlockSyncState enum

---
 crates/networking/p2p/peer_handler.rs | 10 ++--
 crates/networking/p2p/sync.rs         | 84 +++++++++------------------
 2 files changed, 30 insertions(+), 64 deletions(-)

diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs
index 887381ef817..776fb77d66c 100644
--- a/crates/networking/p2p/peer_handler.rs
+++ b/crates/networking/p2p/peer_handler.rs
@@ -22,8 +22,7 @@ use crate::{
     connection::server::CastMessage,
     eth::{
         blocks::{
-            BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
-            HashOrNumber,
+            BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, HashOrNumber, BLOCK_HEADER_LIMIT
         },
         receipts::GetReceipts,
     },
@@ -35,10 +34,9 @@ use crate::{
         },
     },
     snap::encodable_to_proof,
-    sync::{AccountStorageRoots, BlockSyncState, block_is_stale, update_pivot},
+    sync::{block_is_stale, update_pivot, AccountStorageRoots, SnapBlockSyncState},
     utils::{
-        SendMessageError, dump_to_file, get_account_state_snapshot_file,
-        get_account_storages_snapshot_file,
+        dump_to_file, get_account_state_snapshot_file, get_account_storages_snapshot_file, SendMessageError
     },
 };
 use tracing::{debug, error, info, trace, warn};
@@ -736,7 +734,7 @@ impl PeerHandler {
         limit: H256,
         account_state_snapshots_dir: &Path,
         pivot_header: &mut BlockHeader,
-        block_sync_state: &mut BlockSyncState,
+        block_sync_state: &mut SnapBlockSyncState,
     ) -> Result<(), PeerHandlerError> {
         METRICS
             .current_step
diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 90350b3008b..d4054c3d716 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -176,7 +176,7 @@ impl Syncer {
         // Request all block headers between the current head and the sync head
         // We will begin from the current head so that we download the earliest state first
         // This step is not parallelized
-        let mut block_sync_state = BlockSyncState::new_snap(store.clone());
+        let mut block_sync_state = SnapBlockSyncState::new(store.clone());
         // Check if we have some blocks downloaded from a previous sync attempt
         // This applies only to snap sync—full sync always starts fetching headers
         // from the canonical block, which updates as new block headers are fetched.
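
This patch removes the last reason for `BlockSyncState` to exist as an enum: once full sync stopped carrying persistent state, `Snap` was the only variant left, and every call site had to unwrap it through an infallible `match`. A standalone Rust sketch of the pattern (illustrative stand-in types, not ethrex code) shows the before and after:

// Illustrative stand-ins only; not ethrex types.
struct SnapState {
    block_hashes: Vec<u64>, // stand-in for the stored block hashes
}

// Before: a one-variant enum, so every caller matches on a case
// that can never be anything else.
enum BlockSyncState {
    Snap(SnapState),
}

fn pivot_before(state: &BlockSyncState) -> Option<u64> {
    match state {
        // The only arm: pure boilerplate once `Full` is gone.
        BlockSyncState::Snap(snap) => snap.block_hashes.last().copied(),
    }
}

// After: callers take the concrete struct and the match disappears.
fn pivot_after(state: &SnapState) -> Option<u64> {
    state.block_hashes.last().copied()
}

fn main() {
    let wrapped = BlockSyncState::Snap(SnapState { block_hashes: vec![7, 8, 9] });
    let plain = SnapState { block_hashes: vec![7, 8, 9] };
    assert_eq!(pivot_before(&wrapped), Some(9));
    assert_eq!(pivot_after(&plain), Some(9));
}

With the concrete struct, the pivot lookup later in this patch collapses from a one-arm `match` into a single method chain.
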
@@ -198,10 +198,7 @@ impl Syncer { debug!("Sync Log 1: In snap sync"); debug!( "Sync Log 2: State block hashes len {}", - match block_sync_state { - BlockSyncState::Snap(ref snap_block_sync_state) => - snap_block_sync_state.block_hashes.len(), - } + block_sync_state.block_hashes.len() ); debug!("Requesting Block Headers from {current_head}"); @@ -281,11 +278,9 @@ impl Syncer { if block_headers.len() > 1 { let block_headers_iter = block_headers.into_iter().skip(1); - match block_sync_state { - BlockSyncState::Snap(ref mut state) => { - state.process_incoming_headers(block_headers_iter).await? - } - } + block_sync_state + .process_incoming_headers(block_headers_iter) + .await?; } if sync_head_found { @@ -380,7 +375,7 @@ impl Syncer { store.add_fullsync_batch(block_headers).await?; single_batch = false; } - end_block_number = end_block_number + 1; + end_block_number += 1; info!("Downloading Bodies and executing blocks"); for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { @@ -604,12 +599,6 @@ async fn store_receipts( Ok(()) } -/// Persisted State during the Block Sync phase -#[derive(Clone)] -pub enum BlockSyncState { - Snap(SnapBlockSyncState), -} - /// Persisted State during the Block Sync phase for SnapSync #[derive(Clone)] pub struct SnapBlockSyncState { @@ -617,19 +606,6 @@ pub struct SnapBlockSyncState { store: Store, } -impl BlockSyncState { - fn new_snap(store: Store) -> Self { - BlockSyncState::Snap(SnapBlockSyncState::new(store)) - } - - /// Obtain the current head from where to start or resume block sync - async fn get_current_head(&self) -> Result { - match self { - BlockSyncState::Snap(state) => state.get_current_head().await, - } - } -} - impl SnapBlockSyncState { fn new(store: Store) -> Self { Self { @@ -687,18 +663,16 @@ impl Syncer { async fn snap_sync( &mut self, store: &Store, - block_sync_state: &mut BlockSyncState, + block_sync_state: &mut SnapBlockSyncState, ) -> Result<(), SyncError> { // snap-sync: launch tasks to fetch blocks and state in parallel // - Fetch each block's body and its receipt via eth p2p requests // - Fetch the pivot block's state via snap p2p requests // - Execute blocks after the pivot (like in full-sync) - let pivot_hash = match block_sync_state { - BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state - .block_hashes - .last() - .ok_or(SyncError::NoBlockHeaders)?, - }; + let pivot_hash = block_sync_state + .block_hashes + .last() + .ok_or(SyncError::NoBlockHeaders)?; let mut pivot_header = store .get_block_header_by_hash(*pivot_hash)? 
             .ok_or(SyncError::CorruptDB)?;
@@ -1073,15 +1047,13 @@ impl Syncer {
 
         store.add_block(block).await?;
 
-        let numbers_and_hashes = match block_sync_state {
-            BlockSyncState::Snap(snap_block_sync_state) => snap_block_sync_state
-                .block_hashes
-                .iter()
-                .rev()
-                .enumerate()
-                .map(|(i, hash)| (pivot_header.number - i as u64, *hash))
-                .collect::<Vec<_>>(),
-        };
+        let numbers_and_hashes = block_sync_state
+            .block_hashes
+            .iter()
+            .rev()
+            .enumerate()
+            .map(|(i, hash)| (pivot_header.number - i as u64, *hash))
+            .collect::<Vec<_>>();
 
         store
             .forkchoice_update(
@@ -1145,7 +1117,7 @@ pub async fn update_pivot(
     block_number: u64,
     block_timestamp: u64,
     peers: &mut PeerHandler,
-    block_sync_state: &mut BlockSyncState,
+    block_sync_state: &mut SnapBlockSyncState,
 ) -> Result<BlockHeader, SyncError> {
     // We multiply the estimation by 0.9 in order to account for missing slots (~9% in testnets)
     let new_pivot_block_number = block_number
@@ -1183,17 +1155,13 @@ pub async fn update_pivot(
             // Reward peer
             peers.peer_table.record_success(&peer_id).await?;
             info!("Successfully updated pivot");
-            if let BlockSyncState::Snap(sync_state) = block_sync_state {
-                let block_headers = peers
-                    .request_block_headers(block_number + 1, pivot.hash())
-                    .await?
-                    .ok_or(SyncError::NoBlockHeaders)?;
-                sync_state
-                    .process_incoming_headers(block_headers.into_iter())
-                    .await?;
-            } else {
-                return Err(SyncError::NotInSnapSync);
-            }
+            let block_headers = peers
+                .request_block_headers(block_number + 1, pivot.hash())
+                .await?
+                .ok_or(SyncError::NoBlockHeaders)?;
+            block_sync_state
+                .process_incoming_headers(block_headers.into_iter())
+                .await?;
             *METRICS.sync_head_hash.lock().await = pivot.hash();
             return Ok(pivot.clone());
         }
From c9be0e659382f673e09df537cc87e8bd29d0ff79 Mon Sep 17 00:00:00 2001
From: fmoletta
Date: Tue, 14 Oct 2025 15:29:02 -0300
Subject: [PATCH 17/38] Doc new methods

---
 crates/storage/api.rs   | 3 +++
 crates/storage/store.rs | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/crates/storage/api.rs b/crates/storage/api.rs
index c26ebb97560..0866050f6c2 100644
--- a/crates/storage/api.rs
+++ b/crates/storage/api.rs
@@ -347,13 +347,16 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
         account_codes: Vec<(H256, Bytes)>,
     ) -> Result<(), StoreError>;
 
+    /// Add a batch of headers downloaded during fullsync
     async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError>;
 
+    /// Read a batch of headers downloaded during fullsync
     async fn read_fullsync_batch(
         &self,
         start: BlockNumber,
         limit: u64,
     ) -> Result<Vec<BlockHeader>, StoreError>;
 
+    /// Clear all headers downloaded during fullsync
     async fn clear_fullsync_headers(&self) -> Result<(), StoreError>;
 }
 
diff --git a/crates/storage/store.rs b/crates/storage/store.rs
index ea0aa569d5d..e7cb54c936d 100644
--- a/crates/storage/store.rs
+++ b/crates/storage/store.rs
@@ -1268,10 +1268,12 @@ impl Store {
         self.engine.write_account_code_batch(account_codes).await
     }
 
+    /// Add a batch of headers downloaded during fullsync
     pub async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
         self.engine.add_fullsync_batch(headers).await
     }
 
+    /// Read a batch of headers downloaded during fullsync
     pub async fn read_fullsync_batch(
         &self,
         start: BlockNumber,
         limit: u64,
     ) -> Result<Vec<BlockHeader>, StoreError> {
         self.engine.read_fullsync_batch(start, limit).await
     }
 
+    /// Clear all headers downloaded during fullsync
     pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
         self.engine.clear_fullsync_headers().await
     }
From dda6c752a6b3d57f244c6ce575b8068b2aa8ba1f Mon Sep 17 00:00:00 2001
From: fmoletta
Date: Tue, 14 Oct 2025 15:40:46 -0300 Subject: [PATCH 18/38] Normalize tracing --- crates/networking/p2p/sync.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index d4054c3d716..565675f43c6 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -339,7 +339,7 @@ impl Syncer { let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; - info!( + debug!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), first_header.number, @@ -377,15 +377,11 @@ impl Syncer { } end_block_number += 1; - info!("Downloading Bodies and executing blocks"); + // Download block bodies and execute full blocks in batches for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); let final_batch = end_block_number == start + batch_size as u64; // Retrieve batch from DB - info!( - "Processing batch from block number {start} to {}", - start + batch_size as u64 - ); if !single_batch { headers = store.read_fullsync_batch(start, batch_size as u64).await?; } From 4f2eb9da8fdc1636d6c7f72c2a1a907149cd5a89 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 16:02:27 -0300 Subject: [PATCH 19/38] [REVERTME] more agressive logging + unwraps --- crates/networking/p2p/sync.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 565675f43c6..ef9fddcc7cc 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -334,12 +334,12 @@ impl Syncer { debug!("Sync Log 8: Sync failed to find target block header, aborting"); return Ok(()); }; - debug!("Sync Log 9: Received {} block headers", block_headers.len()); + info!("Sync Log 9: Received {} block headers", block_headers.len()); - let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; - let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; + let first_header = block_headers.first().unwrap(); + let last_header = block_headers.last().unwrap(); - debug!( + info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), first_header.number, @@ -347,7 +347,7 @@ impl Syncer { ); end_block_number = end_block_number.max(first_header.number); - sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; + sync_head = block_headers.last().unwrap().parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain // Filter out already canonical blocks from batch @@ -361,7 +361,7 @@ impl Syncer { block_headers.drain(first_canon_block..block_headers.len()); start_block_number = block_headers .last() - .ok_or(SyncError::NoBlocks)? + .unwrap() .number .max(1); // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store @@ -395,7 +395,7 @@ impl Syncer { .request_and_validate_block_bodies(header_batch) .await? .ok_or(SyncError::BodiesNotFound)?; - debug!("Obtained: {} block bodies", bodies.len()); + info!("Obtained: {} block bodies", bodies.len()); let block_batch = headers .drain(..bodies.len()) .zip(bodies) @@ -406,8 +406,8 @@ impl Syncer { info!( "Executing {} blocks for full sync. 
First block hash: {:#?} Last block hash: {:#?}", blocks.len(), - blocks.first().ok_or(SyncError::NoBlocks)?.hash(), - blocks.last().ok_or(SyncError::NoBlocks)?.hash() + blocks.first().unwrap().hash(), + blocks.last().unwrap().hash() ); self.add_blocks_in_batch(blocks, final_batch, store.clone()) .await?; @@ -418,8 +418,8 @@ impl Syncer { info!( "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", pending_blocks.len(), - pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(), - pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash() + pending_blocks.first().unwrap().hash(), + pending_blocks.last().unwrap().hash() ); self.add_blocks_in_batch(pending_blocks, true, store.clone()) .await?; @@ -612,7 +612,7 @@ impl SnapBlockSyncState { /// Obtain the current head from where to start or resume block sync async fn get_current_head(&self) -> Result { - if let Some(head) = self.store.get_header_download_checkpoint().await? { + if let Some(head) = dbg!(self.store.get_header_download_checkpoint().await?) { Ok(head) } else { self.store From 648970ee7cfffd7d93ece7cc50675c6ec2e7bfa6 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 16:27:44 -0300 Subject: [PATCH 20/38] Fix --- crates/networking/p2p/peer_handler.rs | 8 +++++--- crates/networking/p2p/sync.rs | 12 ++++++------ crates/storage/api.rs | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 776fb77d66c..e016262a600 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -22,7 +22,8 @@ use crate::{ connection::server::CastMessage, eth::{ blocks::{ - BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, HashOrNumber, BLOCK_HEADER_LIMIT + BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, + HashOrNumber, }, receipts::GetReceipts, }, @@ -34,9 +35,10 @@ use crate::{ }, }, snap::encodable_to_proof, - sync::{block_is_stale, update_pivot, AccountStorageRoots, SnapBlockSyncState}, + sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot}, utils::{ - dump_to_file, get_account_state_snapshot_file, get_account_storages_snapshot_file, SendMessageError + SendMessageError, dump_to_file, get_account_state_snapshot_file, + get_account_storages_snapshot_file, }, }; use tracing::{debug, error, info, trace, warn}; diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index ef9fddcc7cc..e9c10d94ecf 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -318,7 +318,7 @@ impl Syncer { // Request all block headers between the sync head and our local chain // We will begin from the sync head so that we download the latest state first, ensuring we follow the correct chain // This step is not parallelized - let start_block_number; + let mut start_block_number; let mut end_block_number = 0; let mut headers = vec![]; let mut single_batch = true; @@ -346,6 +346,7 @@ impl Syncer { last_header.number, ); end_block_number = end_block_number.max(first_header.number); + start_block_number = last_header.number; sync_head = block_headers.last().unwrap().parent_hash; if store.is_canonical_sync(sync_head)? 
|| sync_head.is_zero() { @@ -359,11 +360,9 @@ impl Syncer { } } block_headers.drain(first_canon_block..block_headers.len()); - start_block_number = block_headers - .last() - .unwrap() - .number - .max(1); + if !block_headers.is_empty() { + start_block_number = block_headers.last().unwrap().number + } // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { headers = block_headers.into_iter().rev().collect(); @@ -376,6 +375,7 @@ impl Syncer { single_batch = false; } end_block_number += 1; + start_block_number = start_block_number.max(1); // Download block bodies and execute full blocks in batches for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { diff --git a/crates/storage/api.rs b/crates/storage/api.rs index 0866050f6c2..003a13416e8 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -350,13 +350,13 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Add a batch of headers downloaded during fullsync async fn add_fullsync_batch(&self, headers: Vec) -> Result<(), StoreError>; - /// Read a batch of headers downloaded during fullsync + /// Read a batch of headers downloaded during fullsync async fn read_fullsync_batch( &self, start: BlockNumber, limit: u64, ) -> Result, StoreError>; - /// Clear all headers downloaded during fullsync + /// Clear all headers downloaded during fullsync async fn clear_fullsync_headers(&self) -> Result<(), StoreError>; } From a3cc99494672ae3cf758db92f360537d8d0437a0 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 16:53:48 -0300 Subject: [PATCH 21/38] fix --- crates/networking/p2p/sync.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index e9c10d94ecf..60b09a0f7f9 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -402,15 +402,17 @@ impl Syncer { .map(|(header, body)| Block { header, body }); blocks.extend(block_batch); } - // Execute blocks - info!( - "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", - blocks.len(), - blocks.first().unwrap().hash(), - blocks.last().unwrap().hash() - ); - self.add_blocks_in_batch(blocks, final_batch, store.clone()) - .await?; + if !blocks.is_empty() { + // Execute blocks + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + blocks.len(), + blocks.first().unwrap().hash(), + blocks.last().unwrap().hash() + ); + self.add_blocks_in_batch(blocks, final_batch, store.clone()) + .await?; + } } // Execute pending blocks From 8733ab28f889ae673d9c433347c44a1638f91855 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 17:17:26 -0300 Subject: [PATCH 22/38] fix --- crates/networking/p2p/sync.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 60b09a0f7f9..96847784a32 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -311,6 +311,10 @@ impl Syncer { // Check if the sync_head is a pending block, if so, gather all pending blocks belonging to its chain let mut pending_blocks = vec![]; while let Some(block) = store.get_pending_block(sync_head).await? { + if store.is_canonical_sync(block.hash())? 
{ + // Ignore canonic blocks still in pending + break; + } sync_head = block.header.parent_hash; pending_blocks.push(block); } From 8a70cc57670f5760db47ce21dc65fa9b4e6f77e1 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 17:34:50 -0300 Subject: [PATCH 23/38] Revert "[REVERTME] more agressive logging + unwraps" This reverts commit 4f2eb9da8fdc1636d6c7f72c2a1a907149cd5a89. --- crates/networking/p2p/sync.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 96847784a32..405561f4302 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -338,12 +338,12 @@ impl Syncer { debug!("Sync Log 8: Sync failed to find target block header, aborting"); return Ok(()); }; - info!("Sync Log 9: Received {} block headers", block_headers.len()); + debug!("Sync Log 9: Received {} block headers", block_headers.len()); - let first_header = block_headers.first().unwrap(); - let last_header = block_headers.last().unwrap(); + let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; + let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; - info!( + debug!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), first_header.number, @@ -352,7 +352,7 @@ impl Syncer { end_block_number = end_block_number.max(first_header.number); start_block_number = last_header.number; - sync_head = block_headers.last().unwrap().parent_hash; + sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain // Filter out already canonical blocks from batch @@ -365,7 +365,7 @@ impl Syncer { } block_headers.drain(first_canon_block..block_headers.len()); if !block_headers.is_empty() { - start_block_number = block_headers.last().unwrap().number + start_block_number = block_headers.last().ok_or(SyncError::NoBlocks)?.number } // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { @@ -399,7 +399,7 @@ impl Syncer { .request_and_validate_block_bodies(header_batch) .await? .ok_or(SyncError::BodiesNotFound)?; - info!("Obtained: {} block bodies", bodies.len()); + debug!("Obtained: {} block bodies", bodies.len()); let block_batch = headers .drain(..bodies.len()) .zip(bodies) @@ -411,8 +411,8 @@ impl Syncer { info!( "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", blocks.len(), - blocks.first().unwrap().hash(), - blocks.last().unwrap().hash() + blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + blocks.last().ok_or(SyncError::NoBlocks)?.hash() ); self.add_blocks_in_batch(blocks, final_batch, store.clone()) .await?; @@ -424,8 +424,8 @@ impl Syncer { info!( "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", pending_blocks.len(), - pending_blocks.first().unwrap().hash(), - pending_blocks.last().unwrap().hash() + pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash() ); self.add_blocks_in_batch(pending_blocks, true, store.clone()) .await?; @@ -618,7 +618,7 @@ impl SnapBlockSyncState { /// Obtain the current head from where to start or resume block sync async fn get_current_head(&self) -> Result { - if let Some(head) = dbg!(self.store.get_header_download_checkpoint().await?) 
{ + if let Some(head) = self.store.get_header_download_checkpoint().await? { Ok(head) } else { self.store From 90ddd87d5f5f521c38f918ede05f4c0cd72bdca4 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 17:39:13 -0300 Subject: [PATCH 24/38] fix --- crates/networking/p2p/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 405561f4302..d44118371c9 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -316,7 +316,7 @@ impl Syncer { break; } sync_head = block.header.parent_hash; - pending_blocks.push(block); + pending_blocks.insert(0, block); } // Request all block headers between the sync head and our local chain From 12d7ba476d829e1a927464bf1df21132218d5f1a Mon Sep 17 00:00:00 2001 From: fmoletta Date: Tue, 14 Oct 2025 17:51:14 -0300 Subject: [PATCH 25/38] upgrade log --- crates/networking/p2p/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index d44118371c9..34649c32cda 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -343,7 +343,7 @@ impl Syncer { let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?; let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?; - debug!( + info!( "Received {} block headers| First Number: {} Last Number: {}", block_headers.len(), first_header.number, From 1739c431f2b8dd1ac777ea8475975e4ed403e254 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Wed, 15 Oct 2025 10:05:41 -0300 Subject: [PATCH 26/38] [REVERTME] debug --- crates/storage/store.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/storage/store.rs b/crates/storage/store.rs index e7cb54c936d..37dc8849829 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -24,7 +24,7 @@ use std::{ sync::RwLock, }; use std::{fmt::Debug, path::Path}; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; /// Number of state trie segments to fetch concurrently during state sync pub const STATE_TRIE_SEGMENTS: usize = 2; /// Maximum amount of reads from the snapshot in a single transaction to avoid performance hits due to long-living reads @@ -787,6 +787,14 @@ impl Store { safe: Option, finalized: Option, ) -> Result<(), StoreError> { + match new_canonical_blocks { + Some(ref n_h_s) => { + for (n, h) in n_h_s { + info!("Marking block {n} with hash {h} as canonical"); + } + }, + None => warn!("FCU with no new canonical blocks"), + } // Updates first the latest_block_header // to avoid nonce inconsistencies #3927. 
*self From 119c5cf73a7200dd80d19283f8afd1cf039efa37 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 11:34:18 -0300 Subject: [PATCH 27/38] Fix table name --- crates/storage/store_db/rocksdb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 08004d9b7af..090dfac5ee3 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -104,7 +104,7 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors"; /// Block headers downloaded during fullsync column family: [`u8;_`] => [`Vec`] /// - [`u8;_`] = `block_number.to_le_bytes()` /// - [`Vec`] = `BlockHeaderRLP::from(block.header.clone()).bytes().clone()` -const CF_FULLSYNC_HEADERS: &str = "headers"; +const CF_FULLSYNC_HEADERS: &str = "fullsync_headers"; #[derive(Debug)] pub struct Store { From 9d8128df4c5b2f7503d00c9676edc5b113062f5c Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 11:45:52 -0300 Subject: [PATCH 28/38] Revert "[REVERTME] debug" This reverts commit 1739c431f2b8dd1ac777ea8475975e4ed403e254. --- crates/storage/store.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/crates/storage/store.rs b/crates/storage/store.rs index c3e9f9a89b6..e7cb54c936d 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -24,7 +24,7 @@ use std::{ sync::RwLock, }; use std::{fmt::Debug, path::Path}; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument}; /// Number of state trie segments to fetch concurrently during state sync pub const STATE_TRIE_SEGMENTS: usize = 2; /// Maximum amount of reads from the snapshot in a single transaction to avoid performance hits due to long-living reads @@ -787,14 +787,6 @@ impl Store { safe: Option, finalized: Option, ) -> Result<(), StoreError> { - match new_canonical_blocks { - Some(ref n_h_s) => { - for (n, h) in n_h_s { - info!("Marking block {n} with hash {h} as canonical"); - } - } - None => warn!("FCU with no new canonical blocks"), - } // Updates first the latest_block_header // to avoid nonce inconsistencies #3927. 
*self From 88f651357b6dd1cef34a785cdcc491e508ef9ff3 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 12:04:52 -0300 Subject: [PATCH 29/38] Fix: retry if given empty block headers --- crates/networking/p2p/peer_handler.rs | 4 ++-- crates/networking/p2p/sync.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index ce4c31f0199..657cac36b1b 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -469,11 +469,11 @@ impl PeerHandler { ) .await { - if are_block_headers_chained(&block_headers, &order) { + if are_block_headers_chained(&block_headers, &order) && !block_headers.is_empty() { return Ok(Some(block_headers)); } else { warn!( - "[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}" + "[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}" ); } } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 15658e3b738..7e77ce87a2b 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -366,7 +366,7 @@ impl Syncer { end_block_number = end_block_number.max(first_header.number); start_block_number = last_header.number; - sync_head = block_headers.last().ok_or(SyncError::NoBlocks)?.parent_hash; + sync_head = last_header.parent_hash; if store.is_canonical_sync(sync_head)? || sync_head.is_zero() { // Incoming chain merged with current chain // Filter out already canonical blocks from batch From a23be6e018f0098a97c30dac340054454de45409 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 12:14:14 -0300 Subject: [PATCH 30/38] fmt --- crates/networking/p2p/peer_handler.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 657cac36b1b..14a24cead0a 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -469,7 +469,9 @@ impl PeerHandler { ) .await { - if are_block_headers_chained(&block_headers, &order) && !block_headers.is_empty() { + if are_block_headers_chained(&block_headers, &order) + && !block_headers.is_empty() + { return Ok(Some(block_headers)); } else { warn!( From 593e273c1d680579c49f5427be1af2b05d1ba788 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 13:26:27 -0300 Subject: [PATCH 31/38] Use drop_cf to clear fullsync headers table --- crates/storage/store_db/rocksdb.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index d5ece1f8368..7c65e14e87c 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -1520,19 +1520,7 @@ impl StoreEngine for Store { let db = self.db.clone(); tokio::task::spawn_blocking(move || { - let cf = db - .cf_handle(CF_FULLSYNC_HEADERS) - .ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?; - - let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start); - let mut batch = WriteBatchWithTransaction::default(); - - while let Some(Ok((key, _))) = iter.next() { - batch.delete_cf(&cf, key); - } - - db.write(batch) - .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))) + db.drop_cf(CF_FULLSYNC_HEADERS).map_err(StoreError::from) }) .await .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))? 
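
The last hunk above replaces the key-by-key clear with `drop_cf`. A minimal standalone sketch (assuming the `rocksdb` crate built with the `multi-threaded-cf` feature, so `drop_cf` and `create_cf` take `&self`; the path and column-family name here are made up for the demo) contrasts the two strategies:

// Standalone sketch, not ethrex code.
use rocksdb::{DB, IteratorMode, Options, WriteBatch};

// Clear a column family key by key: the family keeps existing and its
// handles stay valid, at the cost of touching every entry.
fn clear_by_batch(db: &DB, name: &str) -> Result<(), rocksdb::Error> {
    let cf = db.cf_handle(name).expect("column family exists");
    let mut batch = WriteBatch::default();
    for entry in db.iterator_cf(&cf, IteratorMode::Start) {
        let (key, _value) = entry?;
        batch.delete_cf(&cf, key); // stage a delete for every key
    }
    db.write(batch) // apply all deletes atomically
}

// Clear by dropping the family wholesale. Cheaper, but the family is gone
// afterwards and must be re-created before `cf_handle` resolves again.
fn clear_by_drop(db: &DB, name: &str) -> Result<(), rocksdb::Error> {
    db.drop_cf(name)?;
    db.create_cf(name, &Options::default())
}

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    let db = DB::open_cf(&opts, "/tmp/fullsync-headers-demo", ["default", "fullsync_headers"])?;
    {
        // Key scheme mirrors the fullsync headers table: block_number.to_le_bytes()
        let cf = db.cf_handle("fullsync_headers").expect("cf exists");
        db.put_cf(&cf, 1u64.to_le_bytes(), b"header-rlp-bytes")?;
    }
    clear_by_batch(&db, "fullsync_headers")?;
    clear_by_drop(&db, "fullsync_headers")?;
    Ok(())
}

The trade-off: dropping is a single metadata operation but removes the family itself, while the batch delete leaves existing handles valid at the cost of walking every key. That distinction is worth keeping in mind when reading the revert of this commit later in the series.
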
From b82100b6199c32b42ef730ead9e9e95a81c00aef Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 14:33:37 -0300 Subject: [PATCH 32/38] Remove limit on engine-withdrawals hive test --- .github/workflows/pr-main_l1.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-main_l1.yaml b/.github/workflows/pr-main_l1.yaml index 15691741c79..1560c178063 100644 --- a/.github/workflows/pr-main_l1.yaml +++ b/.github/workflows/pr-main_l1.yaml @@ -167,7 +167,7 @@ jobs: artifact_prefix: engine_paris - name: "Engine withdrawal tests" simulation: ethereum/engine - limit: "engine-withdrawals/Corrupted Block Hash Payload|Empty Withdrawals|engine-withdrawals test loader|GetPayloadBodies|GetPayloadV2 Block Value|Max Initcode Size|Sync after 2 blocks - Withdrawals on Genesis|Withdraw many accounts|Withdraw to a single account|Withdraw to two accounts|Withdraw zero amount|Withdraw many accounts|Withdrawals Fork on Block 1 - 1 Block Re-Org|Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload|Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org [^S]|Withdrawals Fork on Canonical Block 8 / Side Block 9 - 10 Block Re-Org [^S]" + limit: "engine-withdrawals" artifact_prefix: engine_withdrawals # Investigate this test # - name: "Sync" From 5678b09d64c2c52ae67440ab3334e10ead9113a3 Mon Sep 17 00:00:00 2001 From: fmoletta Date: Thu, 16 Oct 2025 14:34:51 -0300 Subject: [PATCH 33/38] fix(lang) --- crates/networking/p2p/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 7e77ce87a2b..4ef9592da95 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -326,7 +326,7 @@ impl Syncer { let mut pending_blocks = vec![]; while let Some(block) = store.get_pending_block(sync_head).await? { if store.is_canonical_sync(block.hash())? 
{ - // Ignore canonic blocks still in pending + // Ignore canonical blocks still in pending break; } sync_head = block.header.parent_hash; From 5a736d59ae14bc5f46dbf14a46c727a4ed23647f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 16 Oct 2025 15:27:07 -0300 Subject: [PATCH 34/38] chore: uncomment passing reorg tests --- tooling/reorgs/src/main.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index e17e1ac5853..582e0380c72 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -33,19 +33,18 @@ async fn main() { info!(""); run_test(&cmd_path, no_reorgs_full_sync_smoke_test).await; + run_test(&cmd_path, test_reorg_back_to_base).await; - // TODO: uncomment once #4676 is fixed - // run_test(&cmd_path, test_reorg_back_to_base).await; - // // This test is flaky 50% of the time, check that it runs correctly 30 times in a row - // // TODO(#4775): make it deterministic - // for _ in 0..30 { - // run_test(&cmd_path, test_chain_split).await; - // } - // run_test(&cmd_path, test_one_block_reorg_and_back).await; - // run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await; - // run_test(&cmd_path, test_storage_slots_reorg).await; - - // run_test(&cmd_path, test_many_blocks_reorg).await; + // This test is flaky 50% of the time, check that it runs correctly multiple times in a row + // TODO(#4775): make it deterministic + for _ in 0..10 { + run_test(&cmd_path, test_chain_split).await; + } + + run_test(&cmd_path, test_one_block_reorg_and_back).await; + run_test(&cmd_path, test_reorg_back_to_base_with_common_ancestor).await; + run_test(&cmd_path, test_storage_slots_reorg).await; + run_test(&cmd_path, test_many_blocks_reorg).await; } async fn get_ethrex_version(cmd_path: &Path) -> String { @@ -104,7 +103,6 @@ async fn no_reorgs_full_sync_smoke_test(simulator: Arc>) { node1.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_reorg_back_to_base(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -124,7 +122,6 @@ async fn test_reorg_back_to_base(simulator: Arc>) { node0.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -149,7 +146,6 @@ async fn test_reorg_back_to_base_with_common_ancestor(simulator: Arc>) { let mut simulator = simulator.lock().await; @@ -172,7 +168,6 @@ async fn test_chain_split(simulator: Arc>) { node0.update_forkchoice(&base_chain).await; } -#[expect(unused)] async fn test_one_block_reorg_and_back(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( @@ -243,7 +238,6 @@ async fn test_one_block_reorg_and_back(simulator: Arc>) { assert_eq!(new_balance, initial_balance); } -#[expect(unused)] async fn test_many_blocks_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; let signer: Signer = LocalSigner::new( @@ -315,7 +309,6 @@ async fn test_many_blocks_reorg(simulator: Arc>) { assert_eq!(new_balance, initial_balance + transfer_amount); } -#[expect(unused)] async fn test_storage_slots_reorg(simulator: Arc>) { let mut simulator = simulator.lock().await; // Initcode for deploying a contract that receives two `bytes32` parameters and sets `storage[param0] = param1` From 211bedd693699b794aab588c5df4cf3e67624c66 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 16 Oct 2025 20:37:41 -0300 Subject: [PATCH 35/38] Revert "Use drop_cf to clear fullsync headers table" This reverts commit 593e273c1d680579c49f5427be1af2b05d1ba788. --- crates/storage/store_db/rocksdb.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 7c65e14e87c..d5ece1f8368 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -1520,7 +1520,19 @@ impl StoreEngine for Store { let db = self.db.clone(); tokio::task::spawn_blocking(move || { - db.drop_cf(CF_FULLSYNC_HEADERS).map_err(StoreError::from) + let cf = db + .cf_handle(CF_FULLSYNC_HEADERS) + .ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?; + + let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start); + let mut batch = WriteBatchWithTransaction::default(); + + while let Some(Ok((key, _))) = iter.next() { + batch.delete_cf(&cf, key); + } + + db.write(batch) + .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e))) }) .await .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))? From 2b9bcb2df6a01ebfe16d887b3e2f14a46686981a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:24:26 -0300 Subject: [PATCH 36/38] refactor: check we got headers first --- crates/networking/p2p/peer_handler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 14a24cead0a..a06d1db806d 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -469,8 +469,8 @@ impl PeerHandler { ) .await { - if are_block_headers_chained(&block_headers, &order) - && !block_headers.is_empty() + if !block_headers.is_empty() + && are_block_headers_chained(&block_headers, &order) { return Ok(Some(block_headers)); } else { From b7acb614325378ffca9b73e84b099bb58b6a07e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:32:04 -0300 Subject: [PATCH 37/38] refactor: remove unreachable error --- crates/networking/p2p/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 4ef9592da95..7f36475e789 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -378,8 +378,8 @@ impl Syncer { } } block_headers.drain(first_canon_block..block_headers.len()); - if !block_headers.is_empty() { - start_block_number = block_headers.last().ok_or(SyncError::NoBlocks)?.number + if let Some(last_header) = block_headers.last() { + start_block_number = last_header.number; } // If the fullsync consists of a single batch of headers we can just keep them in memory instead of writing them to Store if single_batch { From 21e3b132804d26c8e47bb7d5f787148216a0f5e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:33:20 -0300 Subject: [PATCH 38/38] chore: fix typo in comment --- crates/networking/p2p/sync.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 7f36475e789..ec306f9ba86 100644 --- 
a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -483,7 +483,8 @@ impl Syncer { { if let Some(batch_failure) = batch_failure { warn!("Failed to add block during FullSync: {err}"); - // Since running the batch failed we set the failing block and it's descendants with having an invalid ancestor on the following cases. + // Since running the batch failed we set the failing block and its descendants + // with having an invalid ancestor on the following cases. if let ChainError::InvalidBlock(_) = err { let mut block_hashes_with_invalid_ancestor: Vec = vec![]; if let Some(index) = blocks_hashes
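
The comment reworded in this final patch describes the invalid-ancestor bookkeeping: when one block of an executed batch fails validation, that block and every hash after it in the batch (all descendants of the failure) are mapped to the last valid hash, so later engine-API responses can point callers at a last valid block without re-executing anything. A small self-contained sketch of that marking step, using `u64` stand-ins for `H256` hashes (illustrative only, not ethrex code):

use std::collections::HashMap;

// Everything from the failed block onward in the batch descends from the
// failure, so each of those hashes records the same last valid ancestor.
fn mark_invalid_descendants(
    batch_hashes: &[u64],
    failed_block_hash: u64,
    last_valid_hash: u64,
    invalid_ancestors: &mut HashMap<u64, u64>,
) {
    if let Some(index) = batch_hashes.iter().position(|h| *h == failed_block_hash) {
        for hash in &batch_hashes[index..] {
            invalid_ancestors.insert(*hash, last_valid_hash);
        }
    }
}

fn main() {
    let batch = [10, 11, 12, 13];
    let mut invalid_ancestors = HashMap::new();
    // Block 12 failed validation; 12 and 13 now both map to last valid hash 11.
    mark_invalid_descendants(&batch, 12, 11, &mut invalid_ancestors);
    assert_eq!(invalid_ancestors.get(&12), Some(&11));
    assert_eq!(invalid_ancestors.get(&13), Some(&11));
}
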