diff --git a/CHANGELOG.md b/CHANGELOG.md index f8404b43a57..2839e686f8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,8 @@ - [#6856](https://github.com/ChainSafe/forest/pull/6856): Return ethereum compatible error `BlockRangeExceeded` with code `-32005` when block range exceeds in the eth filter and logs API. +- [`#6893`](https://github.com/ChainSafe/forest/issues/6893): Fixed occasional lock contention during tipset validation. + ## Forest v0.32.4 "Mild Inconvenience" This is a non-mandatory release for all node operators. It enables F3 finality resolution on ETH v1 RPC methods. diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 5dec2067c99..46f6668150a 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -175,21 +175,48 @@ async fn maybe_import_snapshot( .client .snapshot_head .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch()); - assert!(current_height.is_positive()); - match validate_from.is_negative() { - // allow --height=-1000 to scroll back from the current head - true => ctx - .state_manager - .validate_range((current_height + validate_from)..=current_height)?, - false => ctx - .state_manager - .validate_range(validate_from..=current_height)?, - } + + let validation_range = validation_range(current_height, validate_from)?; + // `validate_range` is CPU-bound (drives rayon-parallel VM execution) and + // can run for minutes. Safer to spawn it on a blocking thread. + let state_manager = ctx.state_manager.clone(); + tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range)) + .await??; } Ok(()) } +/// Returns the range of epochs to validate. This includes special handling for negative `from` +/// values, which are interpreted as offsets from the current epoch. +fn validation_range( + current: ChainEpoch, + from: ChainEpoch, +) -> anyhow::Result> { + anyhow::ensure!( + current.is_positive(), + "current head epoch {current} is invalid" + ); + + // Negative values scroll back from the current head (e.g. --height=-1000). + // `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or + // wrapping to a huge positive (which would silently produce an empty range). + let start = if from.is_negative() { + current.saturating_add(from).max(0) + } else { + from + }; + + // An absolute `--height` past the head would otherwise produce an empty + // range and silently succeed without validating anything. + anyhow::ensure!( + start <= current, + "requested validation start epoch {start} is beyond the current head at epoch {current}", + ); + + Ok(start..=current) +} + async fn maybe_start_metrics_service( services: &mut JoinSet>, config: &Config, @@ -820,3 +847,40 @@ where { tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") }) } + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + #[rstest] + #[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!( + "current head epoch 0 is invalid" + )))] + #[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!( + "current head epoch 0 is invalid" + )))] + #[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!( + "requested validation start epoch 11 is beyond the current head at epoch 10" + )))] + #[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))] + #[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))] + #[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))] + #[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))] + fn test_validation_range( + #[case] current: ChainEpoch, + #[case] from: ChainEpoch, + #[case] expected: anyhow::Result>, + ) { + let result = validation_range(current, from); + match expected { + Ok(expected_range) => { + assert_eq!(result.unwrap(), expected_range); + } + Err(_) => { + assert!(result.is_err()); + } + } + } +} diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 9586618b019..0e3b1bffc12 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -75,7 +75,6 @@ use itertools::Itertools as _; use nonzero_ext::nonzero; use num::BigInt; use num_traits::identities::Zero; -use rayon::prelude::ParallelBridge; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::ops::RangeInclusive; @@ -1645,8 +1644,10 @@ where /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset. /// - /// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work - /// of tipset validation. + /// Tipsets are processed sequentially. The compute-intensive work inside each + /// tipset (`bellperson` proof verification, FVM batch seal verification, etc.) + /// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces + /// some issues due to locks in the aforementioned crates. So don't do it. /// /// # What is validation? /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots. @@ -1662,10 +1663,6 @@ where /// - assert that they match /// /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions. - /// - /// # Known issues - /// This function is blocking, but we do observe threads waiting and synchronizing. - /// This is suspected to be due something in the VM or its `WASM` runtime. #[tracing::instrument(skip(self))] pub fn validate_range(&self, epochs: RangeInclusive) -> anyhow::Result<()> { let heaviest = self.heaviest_tipset(); @@ -1852,44 +1849,42 @@ where DB: Blockstore + Send + Sync + 'static, T: Iterator + Send, { - use rayon::iter::ParallelIterator as _; - tipsets - .tuple_windows() - .par_bridge() - .try_for_each(|(child, parent)| { - info!(height = parent.epoch(), "compute parent state"); - let ExecutedTipset { - state_root: actual_state, - receipt_root: actual_receipt, - .. - } = apply_block_messages( - genesis_timestamp, - chain_index.shallow_clone(), - chain_config.shallow_clone(), - beacon.shallow_clone(), - engine, - parent, - NO_CALLBACK, - VMTrace::NotTraced, - ) - .context("couldn't compute tipset state")?; - let expected_receipt = child.min_ticket_block().message_receipts; - let expected_state = child.parent_state(); - match (expected_state, expected_receipt) == (&actual_state, actual_receipt) { - true => Ok(()), - false => { - error!( - height = child.epoch(), - ?expected_state, - ?expected_receipt, - ?actual_state, - ?actual_receipt, - "state mismatch" - ); - bail!("state mismatch"); - } - } - }) + // Validate one tipset at a time. Parallelizing the outer loop across tipsets + // might wedge the global rayon pool. + // Sequential outer iteration leaves the entire rayon pool free for that + // already-rich inner parallelism. + for (child, parent) in tipsets.tuple_windows() { + info!(height = parent.epoch(), "compute parent state"); + let ExecutedTipset { + state_root: actual_state, + receipt_root: actual_receipt, + .. + } = apply_block_messages( + genesis_timestamp, + chain_index.shallow_clone(), + chain_config.shallow_clone(), + beacon.shallow_clone(), + engine, + parent, + NO_CALLBACK, + VMTrace::NotTraced, + ) + .context("couldn't compute tipset state")?; + let expected_receipt = child.min_ticket_block().message_receipts; + let expected_state = child.parent_state(); + if (expected_state, expected_receipt) != (&actual_state, actual_receipt) { + error!( + height = child.epoch(), + ?expected_state, + ?expected_receipt, + ?actual_state, + ?actual_receipt, + "state mismatch" + ); + bail!("state mismatch"); + } + } + Ok(()) } /// Shared context for creating VMs and preparing tipset state.