diff --git a/client/db/Cargo.toml b/client/db/Cargo.toml index a0a0af9158..48f282535a 100644 --- a/client/db/Cargo.toml +++ b/client/db/Cargo.toml @@ -41,6 +41,7 @@ fp-storage = { workspace = true, features = ["default"] } futures = { workspace = true } maplit = "1.0.2" tempfile = "3.21.0" +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Substrate sc-block-builder = { workspace = true } sp-consensus = { workspace = true } diff --git a/client/db/src/kv/mod.rs b/client/db/src/kv/mod.rs index b31be3e3b7..114b0e9a58 100644 --- a/client/db/src/kv/mod.rs +++ b/client/db/src/kv/mod.rs @@ -138,12 +138,16 @@ impl> fc_api::Backend for Backend< // where eth_getBlockByNumber("latest") returns block 0 during initial sync. // Users can check sync status via eth_syncing to determine if the node is // still catching up. + // + // IMPORTANT: This function is intentionally read-only. The persisted pointer + // (LATEST_CANONICAL_INDEXED_BLOCK) is maintained exclusively by the reconciler + // in mapping-sync via advance_latest_pointer(). Writing the pointer here from + // a reader caused a race condition where every failed fast path would lower the + // pointer, racing against the reconciler trying to advance it. let best_number: u64 = self.client.info().best_number.unique_saturated_into(); // Fast path: if best is already indexed and canonical, use it directly. if let Some(canonical_hash) = self.indexed_canonical_hash_at(best_number)? { - self.mapping - .set_latest_canonical_indexed_block(best_number)?; return Ok(canonical_hash); } @@ -153,11 +157,9 @@ impl> fc_api::Backend for Backend< let bounded_start = best_number.saturating_sub(1); // Layer 1 — bounded scan: [best-1 .. best-8k] - if let Some((found_number, found_hash)) = + if let Some((_found_number, found_hash)) = self.find_latest_indexed_canonical_block(bounded_start, INDEXED_RECOVERY_SCAN_LIMIT)? { - self.mapping - .set_latest_canonical_indexed_block(found_number)?; return Ok(found_hash); } @@ -183,11 +185,9 @@ impl> fc_api::Backend for Backend< // 8k bounded + 24k exhaustive = 32k total non-overlapping coverage. let exhaustive_start = bounded_start.saturating_sub(INDEXED_RECOVERY_SCAN_LIMIT); let exhaustive_limit = INDEXED_RECOVERY_SCAN_LIMIT * 3; - if let Some((found_number, found_hash)) = + if let Some((_found_number, found_hash)) = self.find_latest_indexed_canonical_block(exhaustive_start, exhaustive_limit)? { - self.mapping - .set_latest_canonical_indexed_block(found_number)?; return Ok(found_hash); } } @@ -199,11 +199,9 @@ impl> fc_api::Backend for Backend< let deep_start = bounded_start .saturating_sub(INDEXED_RECOVERY_SCAN_LIMIT) .saturating_sub(INDEXED_RECOVERY_SCAN_LIMIT * 3); - if let Some((found_number, found_hash)) = self + if let Some((_found_number, found_hash)) = self .find_latest_indexed_canonical_block(deep_start, INDEXED_DEEP_RECOVERY_SCAN_LIMIT)? { - self.mapping - .set_latest_canonical_indexed_block(found_number)?; return Ok(found_hash); } } @@ -212,8 +210,10 @@ impl> fc_api::Backend for Backend< // Checked after deep recovery so we never return an older block when a newer one // exists in the deep window. // - // When the pointer target is stale (e.g. reorg), walk backward from it to - // find the latest valid indexed canonical block instead of falling to genesis. + // The pointer is maintained exclusively by the reconciler (advance_latest_pointer), + // so it is always monotonically increasing and safe to trust here as a fallback. + // When the pointer target is stale (e.g. reorg not yet reconciled), walk backward + // from it to find the latest valid indexed canonical block. if let Some(persisted_number) = self.mapping.latest_canonical_indexed_block_number()? { if persisted_number <= best_number { if let Some(canonical_hash) = self.indexed_canonical_hash_at(persisted_number)? { @@ -222,13 +222,11 @@ impl> fc_api::Backend for Backend< // Pointer target is stale; backtrack from pointer-1 to find a valid block. if persisted_number > 0 { let backtrack_start = persisted_number.saturating_sub(1); - if let Some((found_number, found_hash)) = self + if let Some((_found_number, found_hash)) = self .find_latest_indexed_canonical_block( backtrack_start, INDEXED_RECOVERY_SCAN_LIMIT, )? { - self.mapping - .set_latest_canonical_indexed_block(found_number)?; return Ok(found_hash); } } @@ -868,6 +866,34 @@ mod tests { ); } + /// latest_block_hash() is read-only: it must never write the pointer. Otherwise RPC + /// calls would lower the pointer when the fast path fails and a scan finds an older + /// block, racing the reconciler and causing "latest" to stick. + #[tokio::test] + async fn latest_block_hash_never_lowers_pointer() { + let env = TestEnv::new(5).await; + for n in 1u64..=3 { + env.index_block(n); + } + // Pointer at 5 (e.g. from a previous reconciler tick); blocks 4 and 5 are not indexed. + env.set_pointer(5); + + let _ = env.latest().await; + // Call again to simulate multiple RPC requests between reconciler ticks. + let _ = env.latest().await; + + let pointer_after = env + .backend + .mapping() + .latest_canonical_indexed_block_number() + .expect("read pointer") + .expect("pointer set"); + assert_eq!( + pointer_after, 5, + "reader must not write the pointer; it must remain 5 and never be lowered to 3" + ); + } + #[tokio::test] async fn exhaustive_scan_finds_indexed_block_beyond_bounded_range() { // With the test scan limit (8), best=20 yields: @@ -981,48 +1007,59 @@ mod tests { } #[tokio::test] - async fn pointer_updates_after_stale_pointer_backtrack_recovery() { - // After backtrack from a stale pointer, the persisted pointer should be - // updated to the block we found. + async fn pointer_unchanged_after_stale_pointer_backtrack_recovery() { + // latest_block_hash() is read-only: even when backtracking from a stale + // pointer, it must not modify the persisted pointer. The reconciler is + // the sole writer. let env = TestEnv::new(40).await; env.index_block(2); env.write_stale_mapping(3); env.set_pointer(3); - let _ = env.latest().await; + let result = env.latest().await; + assert_eq!( + result, env.substrate_hashes[2], + "backtrack must still find block 2" + ); - let updated = env + let pointer = env .backend .mapping() .latest_canonical_indexed_block_number() .expect("read pointer"); assert_eq!( - updated, - Some(2), - "pointer should be updated to block 2 found by backtrack" + pointer, + Some(3), + "read-only: pointer must stay at 3, not be lowered to 2" ); } #[tokio::test] - async fn pointer_updates_after_bounded_scan_recovery() { + async fn pointer_unchanged_after_bounded_scan_recovery() { + // latest_block_hash() is read-only: even when the bounded scan finds a + // higher indexed block, the pointer must not be updated. The reconciler + // is the sole writer. let env = TestEnv::new(10).await; for n in 1u64..=6 { env.index_block(n); } env.set_pointer(3); - let _ = env.latest().await; + let result = env.latest().await; + assert_eq!( + result, env.substrate_hashes[6], + "bounded scan must find block 6" + ); - // After the call, the pointer should have been updated to 6. - let updated = env + let pointer = env .backend .mapping() .latest_canonical_indexed_block_number() .expect("read pointer"); assert_eq!( - updated, - Some(6), - "pointer should be updated to the block found by bounded scan" + pointer, + Some(3), + "read-only: pointer must stay at 3, not be advanced to 6" ); } } diff --git a/client/mapping-sync/Cargo.toml b/client/mapping-sync/Cargo.toml index 8eecb46958..d2609438e1 100644 --- a/client/mapping-sync/Cargo.toml +++ b/client/mapping-sync/Cargo.toml @@ -11,6 +11,7 @@ repository = { workspace = true } targets = ["x86_64-unknown-linux-gnu"] [dependencies] +ethereum-types = { workspace = true } futures = { workspace = true } futures-timer = "3.0.3" log = { workspace = true } @@ -32,7 +33,6 @@ fp-rpc = { workspace = true, features = ["default"] } [dev-dependencies] ethereum = { workspace = true } -ethereum-types = { workspace = true } scale-codec = { workspace = true } sqlx = { workspace = true, features = ["runtime-tokio-native-tls", "sqlite"] } tempfile = "3.21.0" diff --git a/client/mapping-sync/src/kv/canonical_reconciler.rs b/client/mapping-sync/src/kv/canonical_reconciler.rs index 59a94a71da..f899f0b064 100644 --- a/client/mapping-sync/src/kv/canonical_reconciler.rs +++ b/client/mapping-sync/src/kv/canonical_reconciler.rs @@ -21,6 +21,20 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto use crate::ReorgInfo; +/// Extract the Ethereum block hash from a substrate block header's consensus digest. +/// This is pruning-safe: digests are always available regardless of state pruning. +fn eth_hash_from_digest(header: &Block::Header) -> Option { + match fp_consensus::find_post_log(header.digest()) { + Ok(fp_consensus::PostLog::Hashes(h)) => Some(h.block_hash), + Ok(fp_consensus::PostLog::Block(block)) => Some(block.header.hash()), + Ok(fp_consensus::PostLog::BlockHash(hash)) => Some(hash), + Err(_) => match fp_consensus::find_pre_log(header.digest()) { + Ok(fp_consensus::PreLog::Block(block)) => Some(block.header.hash()), + Err(_) => None, + }, + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct ReconcileWindow { pub start: u64, @@ -127,12 +141,18 @@ pub fn reconcile_from_cursor_batch>( let finalized_number: u64 = client.info().finalized_number.unique_saturated_into(); let sync_from_number = UniqueSaturatedInto::::unique_saturated_into(sync_from); - let start = frontier_backend + let cursor = frontier_backend .mapping() .canonical_number_repair_cursor()? - .unwrap_or(finalized_number) - .max(sync_from_number) - .min(finalized_number); + .unwrap_or(finalized_number); + + // When the cursor has completed its full descending sweep (reached sync_from), + // wrap back to finalized_number to start a fresh cycle. + let start = if cursor <= sync_from_number { + finalized_number + } else { + cursor.max(sync_from_number).min(finalized_number) + }; let end = start .saturating_sub(max_blocks.saturating_sub(1)) .max(sync_from_number); @@ -151,6 +171,45 @@ pub fn reconcile_from_cursor_batch>( Ok(Some(stats)) } +pub fn reconcile_recent_window>( + client: &C, + storage_override: &dyn fc_storage::StorageOverride, + frontier_backend: &fc_db::kv::Backend, + sync_from: ::Number, + window_size: u64, +) -> Result, String> { + if window_size == 0 { + return Ok(None); + } + + let best_number: u64 = client.info().best_number.unique_saturated_into(); + let sync_from_number = UniqueSaturatedInto::::unique_saturated_into(sync_from); + + // Anchor at best_number, not finalized — finalized can be 0 on parachains + // during startup or sync gaps. best_number is always current. + let end = best_number; + let start = end + .saturating_sub(window_size.saturating_sub(1)) + .max(sync_from_number); + + if end < sync_from_number { + return Ok(None); + } + + let stats = reconcile_range_internal( + client, + storage_override, + frontier_backend, + start, + end, + sync_from_number, + best_number, + ScanDirection::Ascending, + CursorUpdateStrategy::KeepLower, + )?; + Ok(Some(stats)) +} + fn reconcile_range_internal>( client: &C, storage_override: &dyn fc_storage::StorageOverride, @@ -193,20 +252,133 @@ fn reconcile_range_internal>( first_unresolved.get_or_insert(number); return Ok(()); }; - let Some(ethereum_block) = storage_override.current_block(canonical_hash) else { - first_unresolved.get_or_insert(number); - return Ok(()); - }; - let canonical_eth_hash = ethereum_block.header.hash(); - let should_update = - frontier_backend.mapping().block_hash_by_number(number)? != Some(canonical_eth_hash); - if should_update { - frontier_backend - .mapping() - .set_block_hash_by_number(number, canonical_eth_hash)?; - updated = updated.saturating_add(1); + + match storage_override.current_block(canonical_hash) { + Some(ethereum_block) => { + let canonical_eth_hash = ethereum_block.header.hash(); + + let should_update = frontier_backend.mapping().block_hash_by_number(number)? + != Some(canonical_eth_hash); + if should_update { + frontier_backend + .mapping() + .set_block_hash_by_number(number, canonical_eth_hash)?; + updated = updated.saturating_add(1); + } + + let block_mapping_has_canonical = frontier_backend + .mapping() + .block_hash(&canonical_eth_hash)? + .map(|hashes| hashes.contains(&canonical_hash)) + .unwrap_or(false); + + if !block_mapping_has_canonical { + let commitment = fc_db::kv::MappingCommitment:: { + block_hash: canonical_hash, + ethereum_block_hash: canonical_eth_hash, + ethereum_transaction_hashes: ethereum_block + .transactions + .iter() + .map(|tx| tx.hash()) + .collect(), + }; + frontier_backend.mapping().write_hashes( + commitment, + number, + fc_db::kv::NumberMappingWrite::Skip, + )?; + } else if !ethereum_block.transactions.is_empty() { + // BLOCK_MAPPING exists but TRANSACTION_MAPPING may be incomplete + // (block was initially synced with pruned state via write_hashes + // with vec![]). If state is now readable, repair tx mappings. + // Check every tx — a partial write (e.g. crash mid-batch) could + // leave some mappings present while others are missing. + let needs_tx_repair = { + let mut needs = false; + for tx in ðereum_block.transactions { + let has_canonical = frontier_backend + .mapping() + .transaction_metadata(&tx.hash())? + .iter() + .any(|m| m.substrate_block_hash == canonical_hash); + if !has_canonical { + needs = true; + break; + } + } + needs + }; + if needs_tx_repair { + let commitment = fc_db::kv::MappingCommitment:: { + block_hash: canonical_hash, + ethereum_block_hash: canonical_eth_hash, + ethereum_transaction_hashes: ethereum_block + .transactions + .iter() + .map(|tx| tx.hash()) + .collect(), + }; + frontier_backend.mapping().write_hashes( + commitment, + number, + fc_db::kv::NumberMappingWrite::Skip, + )?; + updated = updated.saturating_add(1); + } + } + + // Block fully verified — advance highest_reconciled. + highest_reconciled = + Some(highest_reconciled.map_or(number, |current| current.max(number))); + } + None => { + // State unavailable — re-derive the Ethereum block hash from the + // header digest (pruning-safe) instead of trusting BLOCK_NUMBER_MAPPING, + // which may be stale after a reorg. + let digest_eth_hash = client + .header(canonical_hash) + .map_err(|e| format!("{e:?}"))? + .and_then(|h| eth_hash_from_digest::(&h)); + + let Some(verified_eth_hash) = digest_eth_hash else { + first_unresolved.get_or_insert(number); + return Ok(()); + }; + + // Correct BLOCK_NUMBER_MAPPING if it holds a stale value. + let stored_eth_hash = frontier_backend.mapping().block_hash_by_number(number)?; + if stored_eth_hash != Some(verified_eth_hash) { + frontier_backend + .mapping() + .set_block_hash_by_number(number, verified_eth_hash)?; + updated = updated.saturating_add(1); + } + + let has_block_mapping = frontier_backend + .mapping() + .block_hash(&verified_eth_hash)? + .map(|hashes| hashes.contains(&canonical_hash)) + .unwrap_or(false); + + if !has_block_mapping { + let commitment = fc_db::kv::MappingCommitment:: { + block_hash: canonical_hash, + ethereum_block_hash: verified_eth_hash, + ethereum_transaction_hashes: vec![], + }; + frontier_backend.mapping().write_hashes( + commitment, + number, + fc_db::kv::NumberMappingWrite::Skip, + )?; + updated = updated.saturating_add(1); + } + + highest_reconciled = + Some(highest_reconciled.map_or(number, |current| current.max(number))); + } } - highest_reconciled = Some(highest_reconciled.map_or(number, |current| current.max(number))); + Ok(()) }; diff --git a/client/mapping-sync/src/kv/mod.rs b/client/mapping-sync/src/kv/mod.rs index 4548f31667..d191096b22 100644 --- a/client/mapping-sync/src/kv/mod.rs +++ b/client/mapping-sync/src/kv/mod.rs @@ -30,7 +30,9 @@ use sc_client_api::backend::{Backend, StorageProvider}; use sp_api::{ApiExt, ProvideRuntimeApi}; use sp_blockchain::{Backend as _, HeaderBackend}; use sp_consensus::SyncOracle; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero}; +use sp_runtime::traits::{ + Block as BlockT, Header as HeaderT, SaturatedConversion, UniqueSaturatedInto, Zero, +}; // Frontier use fc_storage::StorageOverride; use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog}; @@ -44,14 +46,40 @@ use worker::BestBlockInfo; pub const CANONICAL_NUMBER_REPAIR_BATCH_SIZE: u64 = 2048; +/// Max blocks to backfill in one skip-path call to avoid unbounded stall on heavily pruned nodes. +const BACKFILL_ON_SKIP_MAX_BLOCKS: u64 = 1024; + +/// Number of recent blocks to reconcile on every mapping-sync worker tick. +/// Small enough to be cheap per-tick, large enough to cover typical reorg depth. +pub const PERIODIC_RECONCILE_WINDOW: u64 = 16; + +/// Max blocks to repair per idle tick via the cursor-driven full-history sweep. +/// Keeps per-tick cost bounded while ensuring eventual consistency across the entire chain. +pub const CURSOR_REPAIR_IDLE_BATCH: u64 = 128; + +/// Sync a single block's Ethereum mapping from its consensus digest into the Frontier DB. pub fn sync_block>( + client: &C, storage_override: Arc>, backend: &fc_db::kv::Backend, header: &Block::Header, ) -> Result<(), String> { let substrate_block_hash = header.hash(); let block_number: u64 = (*header.number()).unique_saturated_into(); - let number_mapping_write = fc_db::kv::NumberMappingWrite::Skip; + + // Write BLOCK_NUMBER_MAPPING when this block is canonical at this number, so + // latest_block_hash() / indexed_canonical_hash_at() find it during catch-up. + // Uses only HeaderBackend::hash() — no state access, pruning-safe. + // Ok(None) (block number unknown) falls back to Skip; Err is propagated so + // the block stays unsynced and fetch_header retries it on the next tick. + let canonical_hash_at_number = client + .hash(*header.number()) + .map_err(|e| format!("failed to resolve canonical hash at #{block_number}: {e:?}"))?; + let number_mapping_write = if canonical_hash_at_number == Some(substrate_block_hash) { + fc_db::kv::NumberMappingWrite::Write + } else { + fc_db::kv::NumberMappingWrite::Skip + }; match fp_consensus::find_log(header.digest()) { Ok(log) => { @@ -113,7 +141,28 @@ pub fn sync_block>( ) } } - None => backend.mapping().write_none(substrate_block_hash), + None => { + // State is unavailable — likely pruned. Write a minimal + // commitment so BLOCK_MAPPING and BLOCK_NUMBER_MAPPING are + // populated and indexed_canonical_hash_at() can resolve + // this block. Transaction hashes are unavailable without state. + log::warn!( + target: "mapping-sync", + "State unavailable for block #{block_number} ({substrate_block_hash:?}); \ + writing minimal mapping (no tx hashes). \ + This may indicate the pruning window is too narrow.", + ); + let mapping_commitment = fc_db::kv::MappingCommitment:: { + block_hash: substrate_block_hash, + ethereum_block_hash: expect_eth_block_hash, + ethereum_transaction_hashes: vec![], + }; + backend.mapping().write_hashes( + mapping_commitment, + block_number, + number_mapping_write, + ) + } } } }, @@ -175,6 +224,100 @@ where Ok(()) } +/// Backfill BLOCK_NUMBER_MAPPING for already-synced canonical blocks in `[from..=to]`. +/// Uses only `HeaderBackend` and consensus digests — no state access, pruning-safe. +/// Stops after writing `max_blocks` mappings to avoid unbounded stall on heavily pruned nodes. +/// Returns the count of mappings written. +fn backfill_number_mappings( + client: &C, + substrate_backend: &BE, + frontier_backend: &fc_db::kv::Backend, + from: u64, + to: u64, + max_blocks: u64, +) -> Result +where + C: HeaderBackend, + BE: sp_blockchain::Backend, +{ + let mut written = 0u64; + for number in from..=to { + if written >= max_blocks { + break; + } + if frontier_backend + .mapping() + .block_hash_by_number(number)? + .is_some() + { + continue; + } + let block_number_native = number.saturated_into::<::Number>(); + let canonical_hash = match client.hash(block_number_native) { + Ok(Some(hash)) => hash, + Ok(None) => continue, + Err(e) => { + return Err(format!( + "failed to resolve canonical hash at #{number}: {e:?}" + )) + } + }; + if !frontier_backend.mapping().is_synced(&canonical_hash)? { + continue; + } + let header = match substrate_backend.header(canonical_hash) { + Ok(Some(header)) => header, + Ok(None) => continue, + Err(e) => { + return Err(format!( + "failed to load canonical header {canonical_hash:?} at #{number}: {e:?}" + )) + } + }; + let eth_block_hash = match fp_consensus::find_post_log(header.digest()) { + Ok(PostLog::Hashes(h)) => Some(h.block_hash), + Ok(PostLog::Block(block)) => Some(block.header.hash()), + Ok(PostLog::BlockHash(hash)) => Some(hash), + Err(_) => match fp_consensus::find_pre_log(header.digest()) { + Ok(PreLog::Block(block)) => Some(block.header.hash()), + Err(_) => None, + }, + }; + if let Some(eth_hash) = eth_block_hash { + frontier_backend + .mapping() + .set_block_hash_by_number(number, eth_hash)?; + + let has_block_mapping = frontier_backend + .mapping() + .block_hash(ð_hash)? + .map(|hashes| hashes.contains(&canonical_hash)) + .unwrap_or(false); + if !has_block_mapping { + let commitment = fc_db::kv::MappingCommitment:: { + block_hash: canonical_hash, + ethereum_block_hash: eth_hash, + ethereum_transaction_hashes: vec![], + }; + frontier_backend.mapping().write_hashes( + commitment, + number, + fc_db::kv::NumberMappingWrite::Skip, + )?; + } + + written += 1; + } + } + if written > 0 { + log::debug!( + target: "mapping-sync", + "Backfilled BLOCK_NUMBER_MAPPING for {written} blocks in #{from}..#{to}", + ); + } + Ok(written) +} + pub fn repair_canonical_number_mappings_batch>( client: &C, storage_override: &dyn StorageOverride, @@ -207,6 +350,7 @@ pub fn sync_one_block( storage_override: Arc>, frontier_backend: &fc_db::kv::Backend, sync_from: ::Number, + state_pruning_blocks: Option, strategy: SyncStrategy, sync_oracle: Arc, pubsub_notification_sinks: Arc< @@ -233,6 +377,12 @@ where current_syncing_tips.append(&mut leaves); } + let best_hash = client.info().best_hash; + if SyncStrategy::Parachain == strategy && !frontier_backend.mapping().is_synced(&best_hash)? { + // Add best block to current_syncing_tips + current_syncing_tips.push(best_hash); + } + let mut operating_header = None; while let Some(checking_tip) = current_syncing_tips.pop() { if let Some(checking_header) = fetch_header( @@ -267,7 +417,103 @@ where { return Ok(false); } + + // On pruned nodes: live state window is derived from finalized_number (not best), + // so we skip blocks below (finalized_number - pruning_blocks). That avoids + // depending on unfinalized chain and matches typical state-pruning semantics. + // Jump the syncing tip forward to the window floor, retaining any queued tips + // that are already within the live window (fork/reorg catch-up). + if let Some(pruning_blocks) = state_pruning_blocks { + let finalized_number_u64: u64 = client.info().finalized_number.unique_saturated_into(); + let live_window_start_u64 = finalized_number_u64.saturating_sub(pruning_blocks); + let sync_from_u64: u64 = sync_from.unique_saturated_into(); + let skip_to_u64 = live_window_start_u64.max(sync_from_u64); + let current_number_u64: u64 = (*operating_header.number()).unique_saturated_into(); + + if current_number_u64 < skip_to_u64 { + let skip_to_number = + skip_to_u64.saturated_into::<::Number>(); + match client.hash(skip_to_number) { + Ok(Some(skip_hash)) => { + log::warn!( + target: "mapping-sync", + "Pruned node: skipping blocks #{}..#{} (outside live state window), \ + jumping tip to #{}", + current_number_u64, + skip_to_u64.saturating_sub(1), + skip_to_u64, + ); + // Retain any tips still within the live window rather than + // discarding them — they may be unsynced fork branches that + // need indexing. Replace only the out-of-window tip with + // the skip target. + let mut retained = Vec::with_capacity(current_syncing_tips.len()); + for tip in current_syncing_tips.drain(..) { + match substrate_backend.blockchain().header(tip) { + Ok(Some(h)) => { + let n: u64 = (*h.number()).unique_saturated_into(); + if n >= skip_to_u64 { + retained.push(tip); + } + } + Ok(None) | Err(_) => { + retained.push(tip); + } + } + } + current_syncing_tips = retained; + current_syncing_tips.push(skip_hash); + frontier_backend + .meta() + .write_current_syncing_tips(current_syncing_tips)?; + + // Backfill BLOCK_NUMBER_MAPPING for already-synced + // canonical blocks in the live window so + // latest_block_hash() can find them (handles upgrade + // from old logic that always used Skip). Capped per + // call to avoid unbounded stall on heavily pruned nodes. + let best_number_u64: u64 = + client.info().best_number.unique_saturated_into(); + backfill_number_mappings( + client, + substrate_backend.blockchain(), + frontier_backend, + skip_to_u64, + best_number_u64, + BACKFILL_ON_SKIP_MAX_BLOCKS, + )?; + + return Ok(true); + } + Ok(None) => { + // Target block not yet known to the client (e.g. node still + // syncing headers). Return false to back off and retry later + // rather than falling through to sync a pruned block. + current_syncing_tips.push(operating_header.hash()); + frontier_backend + .meta() + .write_current_syncing_tips(current_syncing_tips)?; + return Ok(false); + } + Err(e) => { + // Transient client error. Back off and retry rather than + // falling through to sync a pruned block. + log::warn!( + target: "mapping-sync", + "Pruned node: failed to resolve skip target #{skip_to_u64}: {e:?}; will retry.", + ); + current_syncing_tips.push(operating_header.hash()); + frontier_backend + .meta() + .write_current_syncing_tips(current_syncing_tips)?; + return Ok(false); + } + } + } + } + sync_block( + client, storage_override.clone(), frontier_backend, &operating_header, @@ -278,6 +524,16 @@ where .meta() .write_current_syncing_tips(current_syncing_tips)?; } + + // Reconcile the most recent window of blocks. + canonical_reconciler::reconcile_recent_window( + client, + storage_override.as_ref(), + frontier_backend, + sync_from, + PERIODIC_RECONCILE_WINDOW, + )?; + // Notify on import and remove closed channels using the unified notification mechanism. let hash = operating_header.hash(); // Use the `is_new_best` status from import time if available. @@ -288,6 +544,7 @@ where let is_new_best = best_info.is_some() || client.info().best_hash == hash; let reorg_info = best_info.and_then(|info| info.reorg_info); + // Reorg-aware reconcile only when this block was actually new-best at import. if is_new_best { let reconcile_stats = canonical_reconciler::reconcile_reorg_window( client, @@ -323,6 +580,7 @@ pub fn sync_blocks( frontier_backend: &fc_db::kv::Backend, limit: usize, sync_from: ::Number, + state_pruning_blocks: Option, strategy: SyncStrategy, sync_oracle: Arc, pubsub_notification_sinks: Arc< @@ -346,6 +604,7 @@ where storage_override.clone(), frontier_backend, sync_from, + state_pruning_blocks, strategy, sync_oracle.clone(), pubsub_notification_sinks.clone(), @@ -393,7 +652,9 @@ mod tests { use ethereum_types::{Address, H256, U256}; use fc_storage::StorageOverride; use fp_rpc::TransactionStatus; + use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA}; use sc_block_builder::BlockBuilderBuilder; + use scale_codec::Encode; use sp_blockchain::HeaderBackend as _; use sp_consensus::BlockOrigin; use sp_runtime::{ @@ -403,12 +664,23 @@ mod tests { }; use substrate_test_runtime_client::{ BlockBuilderExt, ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, + TestClientBuilderExt, }; use tempfile::tempdir; - use crate::ReorgInfo; + use sp_runtime::generic::DigestItem; - use super::{canonical_reconciler, repair_canonical_number_mappings_batch}; + use super::{canonical_reconciler, repair_canonical_number_mappings_batch, sync_one_block}; + use crate::{ + EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy, + }; + + fn ethereum_digest_item_for(eth_block: ðereum::BlockV3) -> DigestItem { + DigestItem::Consensus( + fp_consensus::FRONTIER_ENGINE_ID, + fp_consensus::PostLog::BlockHash(eth_block.header.hash()).encode(), + ) + } type OpaqueBlock = sp_runtime::generic::Block< Header, @@ -462,7 +734,50 @@ mod tests { } } + /// Stub SyncOracle for tests that call sync_one_block (not syncing, not offline). + struct TestSyncOracleNotSyncing; + impl sp_consensus::SyncOracle for TestSyncOracleNotSyncing { + fn is_major_syncing(&self) -> bool { + false + } + fn is_offline(&self) -> bool { + false + } + } + fn make_ethereum_block(seed: u64) -> ethereum::BlockV3 { + make_ethereum_block_inner(seed, vec![]) + } + + fn make_ethereum_block_with_txs(seed: u64, num_txs: u64) -> ethereum::BlockV3 { + let txs: Vec = (0..num_txs) + .map(|i| { + let sig = ethereum::legacy::TransactionSignature::new( + 27, + H256::from_low_u64_be(seed.saturating_add(i).saturating_add(1)), + H256::from_low_u64_be(seed.saturating_add(i).saturating_add(2)), + ) + .expect("valid signature"); + ethereum::TransactionV3::Legacy(ethereum::LegacyTransaction { + nonce: U256::from(i), + gas_price: U256::from(1), + gas_limit: U256::from(21000), + action: ethereum::TransactionAction::Call( + ethereum_types::H160::from_low_u64_be(seed), + ), + value: U256::zero(), + input: vec![], + signature: sig, + }) + }) + .collect(); + make_ethereum_block_inner(seed, txs) + } + + fn make_ethereum_block_inner( + seed: u64, + transactions: Vec, + ) -> ethereum::BlockV3 { let partial_header = PartialHeader { parent_hash: H256::from_low_u64_be(seed), beneficiary: ethereum_types::H160::from_low_u64_be(seed), @@ -478,7 +793,7 @@ mod tests { mix_hash: H256::from_low_u64_be(seed.saturating_add(3)), nonce: ethereum_types::H64::from_low_u64_be(seed), }; - ethereum::Block::new(partial_header, vec![], vec![]) + ethereum::Block::new(partial_header, transactions, vec![]) } struct SelectiveStorageOverride { @@ -1066,4 +1381,520 @@ mod tests { "second batch should continue backward" ); } + + /// After a pruning skip, tips that are within the live window (>= skip_to) must be + /// retained so fork/reorg catch-up can continue. Window is derived from finalized_number. + #[test] + fn pruning_skip_retains_in_window_tips() { + let tmp = tempdir().expect("create temp dir"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let client = Arc::new(client); + + let frontier_backend = fc_db::kv::Backend::::new( + client.clone(), + &fc_db::kv::DatabaseSettings { + #[cfg(feature = "rocksdb")] + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + #[cfg(not(feature = "rocksdb"))] + source: sc_client_db::DatabaseSource::ParityDb { + path: tmp.path().to_path_buf(), + }, + }, + ) + .expect("frontier backend"); + + // Build chain 0..=10 and finalize so finalized_number = 10. + let mut chain_info = client.chain_info(); + for _ in 1..=10 { + let mut block_builder = BlockBuilderBuilder::new(client.as_ref()) + .on_parent_block(chain_info.best_hash) + .with_parent_block_number(chain_info.best_number) + .build() + .expect("build block"); + block_builder + .push_storage_change(vec![1], None) + .expect("push storage change"); + let block = block_builder.build().expect("build block").block; + futures::executor::block_on(client.import_as_final(BlockOrigin::Own, block)) + .expect("import as final"); + chain_info = client.chain_info(); + } + assert!( + chain_info.finalized_number >= 10, + "finalized number for pruning test" + ); + + let hash_1 = client.hash(1).expect("hash").expect("block 1 exists"); + let hash_2 = client.hash(2).expect("hash").expect("block 2 exists"); + let hash_5 = client.hash(5).expect("hash").expect("block 5 exists"); + + // Tips: one below window (1), one in window (5). With state_pruning_blocks=8, + // live_window_start = 10 - 8 = 2, skip_to = 2. Order so the below-window tip is popped + // first (sync_one_block pops from the end): [in_window, below_window]. + frontier_backend + .meta() + .write_current_syncing_tips(vec![hash_5, hash_1]) + .expect("write tips"); + + let storage_override: Arc> = + Arc::new(NoopStorageOverride); + let sync_oracle: Arc = + Arc::new(TestSyncOracleNotSyncing); + let pubsub_sinks: Arc< + EthereumBlockNotificationSinks>, + > = Arc::new(Default::default()); + let mut best_at_import = HashMap::new(); + + let did_sync = sync_one_block( + client.as_ref(), + &backend, + storage_override, + &frontier_backend, + 0, + Some(8), + SyncStrategy::Normal, + sync_oracle, + pubsub_sinks, + &mut best_at_import, + ) + .expect("sync_one_block"); + assert!(did_sync, "skip path should run and return true"); + + let tips = frontier_backend + .meta() + .current_syncing_tips() + .expect("read tips"); + assert!( + tips.contains(&hash_5), + "in-window tip (block 5) must be retained after skip; tips={tips:?}", + ); + assert!( + tips.contains(&hash_2), + "skip target (block 2) must be in tips after skip; tips={tips:?}", + ); + } + + /// Reconciler None branch: when a block has SYNCED_MAPPING + BLOCK_NUMBER_MAPPING + /// but no BLOCK_MAPPING (the old write_none + backfill path), the reconciler must + /// verify the eth hash from the header digest and repair BLOCK_MAPPING so + /// indexed_canonical_hash_at() can resolve the block. + #[test] + fn reconciler_repairs_missing_block_mapping_on_pruned_blocks() { + let tmp = tempdir().expect("create temp dir"); + let (client, _) = TestClientBuilder::new() + .build_with_native_executor::( + None, + ); + let client = Arc::new(client); + + let frontier_backend = fc_db::kv::Backend::::new( + client.clone(), + &fc_db::kv::DatabaseSettings { + #[cfg(feature = "rocksdb")] + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + #[cfg(not(feature = "rocksdb"))] + source: sc_client_db::DatabaseSource::ParityDb { + path: tmp.path().to_path_buf(), + }, + }, + ) + .expect("frontier backend"); + + let eth_block = make_ethereum_block(1); + let eth_hash = eth_block.header.hash(); + + let chain = client.chain_info(); + let mut builder = BlockBuilderBuilder::new(client.as_ref()) + .on_parent_block(chain.best_hash) + .with_parent_block_number(chain.best_number) + .build() + .expect("build block 1"); + builder + .push_deposit_log_digest_item(ethereum_digest_item_for(ð_block)) + .expect("push ethereum digest"); + let block = builder.build().expect("build block").block; + futures::executor::block_on(client.import_as_final(BlockOrigin::Own, block)) + .expect("import block"); + + let canonical_hash = client + .hash(1) + .expect("query canonical hash") + .expect("canonical hash"); + + // Simulate old write_none path: only SYNCED_MAPPING is set. + frontier_backend + .mapping() + .write_none(canonical_hash) + .expect("write_none"); + + // Simulate backfill: BLOCK_NUMBER_MAPPING is set, but BLOCK_MAPPING is NOT. + frontier_backend + .mapping() + .set_block_hash_by_number(1, eth_hash) + .expect("set block hash by number"); + + // Sanity: BLOCK_MAPPING must be absent. + assert_eq!( + frontier_backend.mapping().block_hash(ð_hash), + Ok(None), + "BLOCK_MAPPING must be absent before reconciler runs" + ); + + // Run reconciler with NoopStorageOverride (state unavailable → hits None branch). + let stats = canonical_reconciler::reconcile_from_cursor_batch( + client.as_ref(), + &NoopStorageOverride, + &frontier_backend, + 1, + 1, + ) + .expect("reconcile") + .expect("stats"); + + // BLOCK_MAPPING must now contain the canonical hash. + let block_mapping = frontier_backend + .mapping() + .block_hash(ð_hash) + .expect("read BLOCK_MAPPING"); + assert!( + block_mapping + .as_ref() + .is_some_and(|hashes| hashes.contains(&canonical_hash)), + "reconciler must repair BLOCK_MAPPING; got {block_mapping:?}" + ); + assert_eq!(stats.updated, 1, "reconciler must report 1 update"); + } + + /// Reconciler None branch: when BLOCK_NUMBER_MAPPING holds a stale eth hash + /// (e.g. after a reorg), the reconciler must re-derive the correct eth hash + /// from the header digest, correct BLOCK_NUMBER_MAPPING, and write BLOCK_MAPPING + /// with the verified hash — not the stale one. + #[test] + fn reconciler_corrects_stale_block_number_mapping_after_reorg() { + let tmp = tempdir().expect("create temp dir"); + let (client, _) = TestClientBuilder::new() + .build_with_native_executor::( + None, + ); + let client = Arc::new(client); + + let frontier_backend = fc_db::kv::Backend::::new( + client.clone(), + &fc_db::kv::DatabaseSettings { + #[cfg(feature = "rocksdb")] + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + #[cfg(not(feature = "rocksdb"))] + source: sc_client_db::DatabaseSource::ParityDb { + path: tmp.path().to_path_buf(), + }, + }, + ) + .expect("frontier backend"); + + let correct_eth_block = make_ethereum_block(1); + let correct_eth_hash = correct_eth_block.header.hash(); + let stale_eth_hash = make_ethereum_block(99).header.hash(); + assert_ne!(correct_eth_hash, stale_eth_hash); + + let chain = client.chain_info(); + let mut builder = BlockBuilderBuilder::new(client.as_ref()) + .on_parent_block(chain.best_hash) + .with_parent_block_number(chain.best_number) + .build() + .expect("build block 1"); + builder + .push_deposit_log_digest_item(ethereum_digest_item_for(&correct_eth_block)) + .expect("push ethereum digest"); + let block = builder.build().expect("build block").block; + futures::executor::block_on(client.import_as_final(BlockOrigin::Own, block)) + .expect("import block"); + + let canonical_hash = client + .hash(1) + .expect("query canonical hash") + .expect("canonical hash"); + + // Simulate a stale BLOCK_NUMBER_MAPPING from a pre-reorg fork. + frontier_backend + .mapping() + .set_block_hash_by_number(1, stale_eth_hash) + .expect("set stale block hash by number"); + + // Run reconciler with NoopStorageOverride (state unavailable → hits None branch). + let stats = canonical_reconciler::reconcile_from_cursor_batch( + client.as_ref(), + &NoopStorageOverride, + &frontier_backend, + 1, + 1, + ) + .expect("reconcile") + .expect("stats"); + + // BLOCK_NUMBER_MAPPING must now hold the correct (digest-derived) eth hash. + assert_eq!( + frontier_backend.mapping().block_hash_by_number(1), + Ok(Some(correct_eth_hash)), + "reconciler must correct stale BLOCK_NUMBER_MAPPING to digest-derived hash" + ); + + // BLOCK_MAPPING must map the correct eth hash to the canonical substrate hash. + let block_mapping = frontier_backend + .mapping() + .block_hash(&correct_eth_hash) + .expect("read BLOCK_MAPPING for correct hash"); + assert!( + block_mapping + .as_ref() + .is_some_and(|hashes| hashes.contains(&canonical_hash)), + "BLOCK_MAPPING must use the verified eth hash; got {block_mapping:?}" + ); + + // The stale eth hash must NOT have a BLOCK_MAPPING pointing to the canonical hash. + let stale_mapping = frontier_backend + .mapping() + .block_hash(&stale_eth_hash) + .expect("read BLOCK_MAPPING for stale hash"); + assert!( + !stale_mapping + .as_ref() + .is_some_and(|hashes| hashes.contains(&canonical_hash)), + "stale eth hash must not be mapped to canonical substrate hash; got {stale_mapping:?}" + ); + + assert!(stats.updated >= 1, "reconciler must report updates"); + } + + /// When the reconciler encounters an unsynced block and state is available, + /// it must write BLOCK_MAPPING with the canonical hash so the block becomes + /// resolvable by indexed_canonical_hash_at / latest_block_hash. + #[test] + fn reconciler_writes_block_mapping_for_unsynced_blocks() { + let tmp = tempdir().expect("create temp dir"); + let (client, _) = TestClientBuilder::new() + .build_with_native_executor::( + None, + ); + let client = Arc::new(client); + + let frontier_backend = fc_db::kv::Backend::::new( + client.clone(), + &fc_db::kv::DatabaseSettings { + #[cfg(feature = "rocksdb")] + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + #[cfg(not(feature = "rocksdb"))] + source: sc_client_db::DatabaseSource::ParityDb { + path: tmp.path().to_path_buf(), + }, + }, + ) + .expect("frontier backend"); + + let chain = client.chain_info(); + let mut builder = BlockBuilderBuilder::new(client.as_ref()) + .on_parent_block(chain.best_hash) + .with_parent_block_number(chain.best_number) + .build() + .expect("build block 1"); + builder + .push_storage_change(vec![1], None) + .expect("push storage change"); + let block = builder.build().expect("build block").block; + futures::executor::block_on(client.import_as_final(BlockOrigin::Own, block)) + .expect("import block"); + + let canonical_hash = client + .hash(1) + .expect("query canonical hash") + .expect("canonical hash"); + let eth_block = make_ethereum_block(1); + let eth_hash = eth_block.header.hash(); + let storage_override = SelectiveStorageOverride { + blocks: HashMap::from([(canonical_hash, eth_block)]), + }; + + // Block is completely unsynced: no SYNCED, no BLOCK_MAPPING, no BLOCK_NUMBER_MAPPING. + assert_eq!( + frontier_backend.mapping().is_synced(&canonical_hash), + Ok(false), + ); + assert_eq!(frontier_backend.mapping().block_hash(ð_hash), Ok(None)); + assert_eq!(frontier_backend.mapping().block_hash_by_number(1), Ok(None)); + + // Run reconciler with state available. + let stats = canonical_reconciler::reconcile_from_cursor_batch( + client.as_ref(), + &storage_override, + &frontier_backend, + 1, + 1, + ) + .expect("reconcile") + .expect("stats"); + + // BLOCK_MAPPING must now contain the canonical hash. + let block_mapping = frontier_backend + .mapping() + .block_hash(ð_hash) + .expect("read BLOCK_MAPPING"); + assert!( + block_mapping + .as_ref() + .is_some_and(|hashes| hashes.contains(&canonical_hash)), + "reconciler must write BLOCK_MAPPING for unsynced blocks; got {block_mapping:?}" + ); + + // BLOCK_NUMBER_MAPPING must be set. + assert_eq!( + frontier_backend.mapping().block_hash_by_number(1), + Ok(Some(eth_hash)), + "reconciler must write BLOCK_NUMBER_MAPPING" + ); + + // is_synced must now be true (write_hashes sets SYNCED_MAPPING). + assert_eq!( + frontier_backend.mapping().is_synced(&canonical_hash), + Ok(true), + "block must be marked as synced after reconciliation" + ); + + assert_eq!(stats.updated, 1); + assert!(stats.highest_reconciled.is_some()); + } + + /// When a block was synced with empty tx hashes (pruned-state path), the reconciler + /// must repair TRANSACTION_MAPPING when state becomes available. This ensures + /// eth_getTransactionByHash works for blocks that were initially synced without state. + #[test] + fn reconciler_repairs_missing_transaction_mapping_on_pruned_blocks() { + let tmp = tempdir().expect("create temp dir"); + let (client, _) = TestClientBuilder::new() + .build_with_native_executor::( + None, + ); + let client = Arc::new(client); + + let frontier_backend = fc_db::kv::Backend::::new( + client.clone(), + &fc_db::kv::DatabaseSettings { + #[cfg(feature = "rocksdb")] + source: sc_client_db::DatabaseSource::RocksDb { + path: tmp.path().to_path_buf(), + cache_size: 0, + }, + #[cfg(not(feature = "rocksdb"))] + source: sc_client_db::DatabaseSource::ParityDb { + path: tmp.path().to_path_buf(), + }, + }, + ) + .expect("frontier backend"); + + let eth_block = make_ethereum_block_with_txs(1, 2); + let eth_hash = eth_block.header.hash(); + let tx_hashes: Vec = eth_block.transactions.iter().map(|tx| tx.hash()).collect(); + assert_eq!(tx_hashes.len(), 2); + + let chain = client.chain_info(); + let mut builder = BlockBuilderBuilder::new(client.as_ref()) + .on_parent_block(chain.best_hash) + .with_parent_block_number(chain.best_number) + .build() + .expect("build block 1"); + builder + .push_storage_change(vec![1], None) + .expect("push storage change"); + let block = builder.build().expect("build block").block; + futures::executor::block_on(client.import_as_final(BlockOrigin::Own, block)) + .expect("import block"); + + let canonical_hash = client + .hash(1) + .expect("query canonical hash") + .expect("canonical hash"); + + // Simulate pruned-state sync: write_hashes with empty tx list. + // This writes BLOCK_MAPPING, BLOCK_NUMBER_MAPPING, SYNCED_MAPPING but + // no TRANSACTION_MAPPING entries. + let minimal_commitment = fc_db::kv::MappingCommitment:: { + block_hash: canonical_hash, + ethereum_block_hash: eth_hash, + ethereum_transaction_hashes: vec![], + }; + frontier_backend + .mapping() + .write_hashes(minimal_commitment, 1, fc_db::kv::NumberMappingWrite::Write) + .expect("write minimal commitment"); + + // Sanity: BLOCK_MAPPING exists, TRANSACTION_MAPPING is empty. + assert!( + frontier_backend + .mapping() + .block_hash(ð_hash) + .expect("read BLOCK_MAPPING") + .is_some_and(|hashes| hashes.contains(&canonical_hash)), + "BLOCK_MAPPING must exist" + ); + assert!( + frontier_backend + .mapping() + .transaction_metadata(&tx_hashes[0]) + .expect("read tx metadata") + .is_empty(), + "TRANSACTION_MAPPING must be empty before repair" + ); + + // Run reconciler with state now available (SelectiveStorageOverride + // returns the ethereum block with transactions). + let storage_override = SelectiveStorageOverride { + blocks: HashMap::from([(canonical_hash, eth_block)]), + }; + let stats = canonical_reconciler::reconcile_from_cursor_batch( + client.as_ref(), + &storage_override, + &frontier_backend, + 1, + 1, + ) + .expect("reconcile") + .expect("stats"); + + // TRANSACTION_MAPPING must now be populated for both transactions. + for (i, tx_hash) in tx_hashes.iter().enumerate() { + let metadata = frontier_backend + .mapping() + .transaction_metadata(tx_hash) + .expect("read tx metadata"); + assert!( + metadata + .iter() + .any(|m| m.substrate_block_hash == canonical_hash + && m.ethereum_index == i as u32), + "tx {i} ({tx_hash:?}) must have TRANSACTION_MAPPING for canonical block; got {metadata:?}" + ); + } + + assert_eq!( + stats.updated, 1, + "reconciler must report 1 update for tx repair" + ); + } } diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index 98a0afaa1e..df2755f629 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -60,6 +60,10 @@ pub struct MappingSyncWorker { have_next: bool, retry_times: usize, sync_from: ::Number, + /// If set, blocks below the live state window (finalized_number - state_pruning_blocks) + /// are skipped during catch-up so the sync tip does not get stuck behind pruned state. + /// Must match the node's state pruning depth (e.g. from config.state_pruning). + state_pruning_blocks: Option, strategy: SyncStrategy, sync_oracle: Arc, @@ -86,6 +90,7 @@ impl MappingSyncWorker { frontier_backend: Arc>, retry_times: usize, sync_from: ::Number, + state_pruning_blocks: Option, strategy: SyncStrategy, sync_oracle: Arc, pubsub_notification_sinks: Arc< @@ -105,6 +110,7 @@ impl MappingSyncWorker { have_next: true, retry_times, sync_from, + state_pruning_blocks, strategy, sync_oracle, @@ -183,6 +189,7 @@ where self.frontier_backend.as_ref(), self.retry_times, self.sync_from, + self.state_pruning_blocks, self.strategy, self.sync_oracle.clone(), self.pubsub_notification_sinks.clone(), @@ -195,16 +202,28 @@ where match result { Ok(have_next) => { if !have_next { - if let Err(e) = super::canonical_reconciler::reconcile_from_cursor_batch( + if let Err(e) = super::canonical_reconciler::reconcile_recent_window( self.client.as_ref(), self.storage_override.as_ref(), self.frontier_backend.as_ref(), self.sync_from, - super::CANONICAL_NUMBER_REPAIR_BATCH_SIZE, + super::PERIODIC_RECONCILE_WINDOW, ) { debug!( target: "reconcile", - "Batch canonical reconcile failed with error {e:?}, retrying." + "Recent window reconcile failed: {e:?}", + ); + } + if let Err(e) = super::repair_canonical_number_mappings_batch( + self.client.as_ref(), + self.storage_override.as_ref(), + self.frontier_backend.as_ref(), + self.sync_from, + super::CURSOR_REPAIR_IDLE_BATCH, + ) { + debug!( + target: "reconcile", + "Cursor repair batch failed: {e:?}", ); } } @@ -346,6 +365,7 @@ mod tests { frontier_backend, 3, 0, + None, SyncStrategy::Normal, Arc::new(test_sync_oracle), pubsub_notification_sinks_inner, @@ -493,6 +513,7 @@ mod tests { frontier_backend, 3, 0, + None, SyncStrategy::Normal, Arc::new(test_sync_oracle), pubsub_notification_sinks_inner, diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 7a31c46ea0..2cf45ec635 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -1084,19 +1084,27 @@ mod test { futures_timer::Delay::new(Duration::from_millis(100)).await; } - // Test the reorged chain is correctly indexed. - let res = sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") - .fetch_all(&pool) - .await - .expect("test query result") - .iter() - .map(|row| { - let substrate_block_hash = H256::from_slice(&row.get::, _>(0)[..]); - let is_canon = row.get::(1); - let block_number = row.get::(2); - (substrate_block_hash, is_canon, block_number) - }) - .collect::>(); + // Wait for the indexer to process all 20 blocks (async worker may lag). + let timeout = std::time::Instant::now() + Duration::from_secs(5); + let res = loop { + let rows = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| { + let substrate_block_hash = H256::from_slice(&row.get::, _>(0)[..]); + let is_canon = row.get::(1); + let block_number = row.get::(2); + (substrate_block_hash, is_canon, block_number) + }) + .collect::>(); + if rows.len() == 20 || std::time::Instant::now() >= timeout { + break rows; + } + futures_timer::Delay::new(Duration::from_millis(50)).await; + }; // 20 blocks in total assert_eq!(res.len(), 20); diff --git a/client/rpc/src/eth/mod.rs b/client/rpc/src/eth/mod.rs index 427d0878dd..21d92860b5 100644 --- a/client/rpc/src/eth/mod.rs +++ b/client/rpc/src/eth/mod.rs @@ -259,6 +259,28 @@ where .ok_or_else(|| internal_err("Block number not found for latest indexed block"))? .unique_saturated_into(); + // Fast path: if the latest indexed block itself is readable, use it + // directly. This avoids the cache returning a stale older block when + // the chain tip has advanced. + if self + .storage_override + .current_block(latest_indexed_hash) + .is_some() + { + self.last_readable_latest + .lock() + .map_err(|_| internal_err("last_readable_latest lock poisoned"))? + .replace(latest_indexed_hash); + log::debug!( + target: "rpc", + "latest readable selection cache_hit=false bounded_hit=false exhaustive_hit=false full_miss=false bounded_scanned_hops=0 exhaustive_scanned_hops=0 limit={}", + self.latest_readable_scan_limit, + ); + return Ok(latest_indexed_hash); + } + + // The latest indexed block isn't readable (state pruned or not yet + // available). Fall back to the cache or scan for an older readable block. let cached_hash = *self .last_readable_latest .lock() diff --git a/template/node/src/eth.rs b/template/node/src/eth.rs index bd3a99838e..58b4043df5 100644 --- a/template/node/src/eth.rs +++ b/template/node/src/eth.rs @@ -146,6 +146,7 @@ pub async fn spawn_frontier_tasks( storage_override: Arc>, fee_history_cache: FeeHistoryCache, fee_history_cache_limit: FeeHistoryCacheLimit, + state_pruning_blocks: Option, sync: Arc>, pubsub_notification_sinks: Arc< fc_mapping_sync::EthereumBlockNotificationSinks< @@ -174,6 +175,7 @@ pub async fn spawn_frontier_tasks( b.clone(), 3, 0u32.into(), + state_pruning_blocks, fc_mapping_sync::SyncStrategy::Normal, sync, pubsub_notification_sinks, diff --git a/template/node/src/service.rs b/template/node/src/service.rs index 98a963f16f..5848093c09 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -501,6 +501,16 @@ where }) }; + // Derive state_pruning_blocks from the node's --state-pruning so the mapping-sync + // worker can skip past pruned blocks during catch-up (KV backend only). + let state_pruning_blocks = config.state_pruning.as_ref().and_then(|mode| { + if let sc_service::PruningMode::Constrained(c) = mode { + c.max_blocks.map(u64::from) + } else { + None + } + }); + let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { config, client: client.clone(), @@ -526,6 +536,7 @@ where storage_override, fee_history_cache, fee_history_cache_limit, + state_pruning_blocks, sync_service.clone(), pubsub_notification_sinks, ) diff --git a/ts-tests/tests/test-pruning-skip.ts b/ts-tests/tests/test-pruning-skip.ts new file mode 100644 index 0000000000..52666b37bd --- /dev/null +++ b/ts-tests/tests/test-pruning-skip.ts @@ -0,0 +1,62 @@ +import { expect } from "chai"; +import { step } from "mocha-steps"; + +import { + createAndFinalizeBlock, + createAndFinalizeBlockNowait, + describeWithFrontier, + customRequest, + waitForBlock, +} from "./util"; + +// Integration test for KV mapping-sync pruning-skip behavior. When state pruning +// is enabled, the sync worker may have tips behind the live window (finalized - N). +// The worker skips those tips and continues from the window start; in-window tips +// must be retained so sync catches up. This test runs the node with --state-pruning +// and asserts that "latest" stays valid and catches up after a burst of blocks. +const STATE_PRUNING_BLOCKS = 64; +const BLOCKS_PAST_WINDOW = 80; +const BURST_SIZE = 24; + +describeWithFrontier( + "Frontier KV mapping-sync (pruning skip / tip retention)", + (context) => { + step("should index genesis with state pruning enabled", async function () { + const block = (await customRequest(context.web3, "eth_getBlockByNumber", ["0x0", false])).result; + expect(block).to.not.be.null; + }); + + step("should produce blocks past the pruning window and finalize", async function () { + this.timeout(120_000); + const start = Number(await context.web3.eth.getBlockNumber()); + for (let i = 0; i < BLOCKS_PAST_WINDOW - start; i++) { + await createAndFinalizeBlock(context.web3); + } + const end = Number(await context.web3.eth.getBlockNumber()); + expect(end).to.be.gte(BLOCKS_PAST_WINDOW); + }); + + step("should keep latest non-null and catch up after a burst of blocks", async function () { + this.timeout(60_000); + const indexedBefore = Number(await context.web3.eth.getBlockNumber()); + for (let i = 0; i < BURST_SIZE; i++) { + await createAndFinalizeBlockNowait(context.web3); + } + const expectedIndexed = indexedBefore + BURST_SIZE; + const expectedTag = "0x" + expectedIndexed.toString(16); + + // During lag, latest should still be non-null (worker may skip pruned tips). + const latestDuring = (await customRequest(context.web3, "eth_getBlockByNumber", ["latest", false])).result; + expect(latestDuring).to.not.be.null; + expect(parseInt(latestDuring.number, 16)).to.be.at.most(expectedIndexed); + + // Catch-up: wait for the last block to be indexed. + await waitForBlock(context.web3, expectedTag, 30_000); + const latestAfter = (await customRequest(context.web3, "eth_getBlockByNumber", ["latest", false])).result; + expect(latestAfter).to.not.be.null; + expect(parseInt(latestAfter.number, 16)).to.equal(expectedIndexed); + }); + }, + undefined, + ["--state-pruning", String(STATE_PRUNING_BLOCKS)] +); diff --git a/ts-tests/tests/test-reorg-block-consistency.ts b/ts-tests/tests/test-reorg-block-consistency.ts new file mode 100644 index 0000000000..9946b64035 --- /dev/null +++ b/ts-tests/tests/test-reorg-block-consistency.ts @@ -0,0 +1,153 @@ +import { expect } from "chai"; +import { step } from "mocha-steps"; + +import { createAndFinalizeBlock, describeWithFrontier, customRequest, waitForBlock } from "./util"; + +import { GENESIS_ACCOUNT, GENESIS_ACCOUNT_PRIVATE_KEY } from "./config"; + +// Integration test: after a reorg, eth_getBlockByNumber must return the winning +// fork's block content and transactions from the losing fork must not reference +// the old (reorged-out) block. +describeWithFrontier("Frontier RPC (Reorg Block & Transaction Consistency)", (context) => { + async function createBlock(finalize: boolean = true, parentHash: string | null = null): Promise { + const response = await customRequest(context.web3, "engine_createBlock", [true, finalize, parentHash]); + if (!response.result?.hash) { + throw new Error(`Unexpected result: ${JSON.stringify(response)}`); + } + return response.result.hash as string; + } + + async function sendTransfer(nonce: number, value: string = "0x1"): Promise { + const tx = await context.web3.eth.accounts.signTransaction( + { + from: GENESIS_ACCOUNT, + to: "0x0000000000000000000000000000000000000001", + value, + gas: "0x5208", + gasPrice: "0x3B9ACA00", + nonce, + }, + GENESIS_ACCOUNT_PRIVATE_KEY + ); + const result = await customRequest(context.web3, "eth_sendRawTransaction", [tx.rawTransaction]); + return result.result as string; + } + + step("eth_getBlockByNumber should return the winning fork's block after a reorg", async function () { + this.timeout(60_000); + + // Advance a few blocks to have a stable starting point. + for (let i = 0; i < 3; i++) { + await createAndFinalizeBlock(context.web3); + } + + // Capture the current best block number BEFORE creating any forks. + const tipBlock = (await customRequest(context.web3, "eth_getBlockByNumber", ["latest", false])).result; + expect(tipBlock).to.not.be.null; + const tipNumber = parseInt(tipBlock.number, 16); + + // Create the fork anchor (non-finalized). It returns a substrate hash + // that we can use as the parent for competing branches. + const anchor = await createBlock(false); + const reorgHeight = tipNumber + 2; + + // --- Fork A: 2 blocks --- + const a1 = await createBlock(false, anchor); + await createBlock(false, a1); + + // Wait for fork A to be visible at the reorg height, then capture + // the Ethereum block hash (which should belong to fork A right now). + const reorgHeightHex = "0x" + reorgHeight.toString(16); + await waitForBlock(context.web3, reorgHeightHex, 10_000); + const ethBlockForkA = (await customRequest(context.web3, "eth_getBlockByNumber", [reorgHeightHex, false])) + .result; + expect(ethBlockForkA).to.not.be.null; + const ethHashForkA = ethBlockForkA.hash as string; + // The parent of fork A's block at the reorg height is the anchor's Ethereum hash. + // Fork B's block at the same height will share the same Ethereum parent. + const anchorEthHash = ethBlockForkA.parentHash as string; + + // --- Fork B: 3 blocks (longer, wins the reorg) --- + const b1 = await createBlock(false, anchor); + const b2 = await createBlock(false, b1); + await createBlock(false, b2); + + // Wait for fork B to become canonical (longer chain). + const expectedHead = "0x" + (tipNumber + 4).toString(16); + await waitForBlock(context.web3, expectedHead, 20_000); + + // After the reorg, the Ethereum block at the reorg height must differ. + const ethBlockAfterReorg = (await customRequest(context.web3, "eth_getBlockByNumber", [reorgHeightHex, false])) + .result; + + expect(ethBlockAfterReorg).to.not.be.null; + expect(ethBlockAfterReorg.hash).to.not.equal( + ethHashForkA, + "block hash at the reorg height must change to fork B's block" + ); + // Both forks share the same parent (the anchor), so parentHash must be + // the anchor's Ethereum hash. + expect(ethBlockAfterReorg.parentHash).to.equal( + anchorEthHash, + "parent hash must still reference the common ancestor after reorg" + ); + }); + + step("transactions from the losing fork should not reference the old block after a reorg", async function () { + this.timeout(60_000); + + // Capture the current chain tip before forking. + const tipNumber = Number(await context.web3.eth.getBlockNumber()); + + // Create the fork anchor (non-finalized). + const anchor = await createBlock(false); + const a1Height = tipNumber + 2; + + const startNonce = Number(await context.web3.eth.getTransactionCount(GENESIS_ACCOUNT)); + + // Send a tx that will be included in fork A. + const forkATxHash = await sendTransfer(startNonce, "0x1"); + + // --- Fork A: include the tx in 1 block --- + await createBlock(false, anchor); + + // Wait for fork A's block to be indexed so the tx is visible. + const a1HeightHex = "0x" + a1Height.toString(16); + await waitForBlock(context.web3, a1HeightHex, 10_000); + + // Capture the Ethereum block hash where our tx landed in fork A. + const a1EthBlock = (await customRequest(context.web3, "eth_getBlockByNumber", [a1HeightHex, false])).result; + const a1EthHash = a1EthBlock.hash as string; + + // Verify the tx is retrievable while fork A is canonical. + const txBeforeReorg = (await customRequest(context.web3, "eth_getTransactionByHash", [forkATxHash])).result; + expect(txBeforeReorg).to.not.be.null; + expect(txBeforeReorg.hash).to.equal(forkATxHash); + expect(txBeforeReorg.blockHash).to.equal(a1EthHash); + + // --- Fork B: 2 blocks (longer, no matching tx) --- + const b1 = await createBlock(false, anchor); + await createBlock(false, b1); + + // Wait for fork B to become canonical (longer chain). + const expectedHead = "0x" + (tipNumber + 3).toString(16); + await waitForBlock(context.web3, expectedHead, 20_000); + + // After the reorg, the block at fork A's height belongs to fork B. + const blockAfterReorg = (await customRequest(context.web3, "eth_getBlockByNumber", [a1HeightHex, false])) + .result; + expect(blockAfterReorg).to.not.be.null; + expect(blockAfterReorg.hash).to.not.equal(a1EthHash, "block at reorg height must be from fork B after reorg"); + + // The fork A tx should either be null (reorged out and not re-included) + // or have a different block hash (re-included in fork B). Since fork B + // was built without the tx in the pool, it likely won't be re-included. + const txAfterReorg = (await customRequest(context.web3, "eth_getTransactionByHash", [forkATxHash])).result; + if (txAfterReorg !== null) { + expect(txAfterReorg.blockHash).to.not.equal( + a1EthHash, + "reorged tx must not reference the losing fork's Ethereum block hash" + ); + } + }); +});