Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

- [#6856](https://github.com/ChainSafe/forest/pull/6856): Return ethereum compatible error `BlockRangeExceeded` with code `-32005` when block range exceeds in the eth filter and logs API.

- [`#6893`](https://github.com/ChainSafe/forest/issues/6893): Fixed occasional lock contention during tipset validation.

## Forest v0.32.4 "Mild Inconvenience"

This is a non-mandatory release for all node operators. It enables F3 finality resolution on ETH v1 RPC methods.
Expand Down
84 changes: 74 additions & 10 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,48 @@ async fn maybe_import_snapshot(
.client
.snapshot_head
.unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
assert!(current_height.is_positive());
match validate_from.is_negative() {
// allow --height=-1000 to scroll back from the current head
true => ctx
.state_manager
.validate_range((current_height + validate_from)..=current_height)?,
false => ctx
.state_manager
.validate_range(validate_from..=current_height)?,
}

let validation_range = validation_range(current_height, validate_from)?;
// `validate_range` is CPU-bound (drives rayon-parallel VM execution) and
// can run for minutes. Safer to spawn it on a blocking thread.
let state_manager = ctx.state_manager.clone();
tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
.await??;
}

Ok(())
}

/// Returns the range of epochs to validate. This includes special handling for negative `from`
/// values, which are interpreted as offsets from the current epoch.
fn validation_range(
current: ChainEpoch,
from: ChainEpoch,
) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
anyhow::ensure!(
current.is_positive(),
"current head epoch {current} is invalid"
);

// Negative values scroll back from the current head (e.g. --height=-1000).
// `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or
// wrapping to a huge positive (which would silently produce an empty range).
let start = if from.is_negative() {
current.saturating_add(from).max(0)
} else {
from
};

// An absolute `--height` past the head would otherwise produce an empty
// range and silently succeed without validating anything.
anyhow::ensure!(
start <= current,
"requested validation start epoch {start} is beyond the current head at epoch {current}",
);

Ok(start..=current)
}

async fn maybe_start_metrics_service(
services: &mut JoinSet<anyhow::Result<()>>,
config: &Config,
Expand Down Expand Up @@ -820,3 +847,40 @@ where
{
tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
}

#[cfg(test)]
mod tests {
use rstest::rstest;

use super::*;

#[rstest]
#[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!(
"current head epoch 0 is invalid"
)))]
#[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!(
"current head epoch 0 is invalid"
)))]
#[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!(
"requested validation start epoch 11 is beyond the current head at epoch 10"
)))]
#[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))]
#[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))]
#[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))]
#[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))]
fn test_validation_range(
#[case] current: ChainEpoch,
#[case] from: ChainEpoch,
#[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>,
) {
let result = validation_range(current, from);
match expected {
Ok(expected_range) => {
assert_eq!(result.unwrap(), expected_range);
}
Err(_) => {
assert!(result.is_err());
}
}
}
}
85 changes: 40 additions & 45 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ use itertools::Itertools as _;
use nonzero_ext::nonzero;
use num::BigInt;
use num_traits::identities::Zero;
use rayon::prelude::ParallelBridge;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
Expand Down Expand Up @@ -1645,8 +1644,10 @@ where

/// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
///
/// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work
/// of tipset validation.
/// Tipsets are processed sequentially. The compute-intensive work inside each
/// tipset (`bellperson` proof verification, FVM batch seal verification, etc.)
/// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces
/// some issues due to locks in the aforementioned crates. So don't do it.
///
/// # What is validation?
/// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
Expand All @@ -1662,10 +1663,6 @@ where
/// - assert that they match
///
/// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
///
/// # Known issues
/// This function is blocking, but we do observe threads waiting and synchronizing.
/// This is suspected to be due something in the VM or its `WASM` runtime.
#[tracing::instrument(skip(self))]
pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
let heaviest = self.heaviest_tipset();
Expand Down Expand Up @@ -1852,44 +1849,42 @@ where
DB: Blockstore + Send + Sync + 'static,
T: Iterator<Item = Tipset> + Send,
{
use rayon::iter::ParallelIterator as _;
tipsets
.tuple_windows()
.par_bridge()
.try_for_each(|(child, parent)| {
info!(height = parent.epoch(), "compute parent state");
let ExecutedTipset {
state_root: actual_state,
receipt_root: actual_receipt,
..
} = apply_block_messages(
genesis_timestamp,
chain_index.shallow_clone(),
chain_config.shallow_clone(),
beacon.shallow_clone(),
engine,
parent,
NO_CALLBACK,
VMTrace::NotTraced,
)
.context("couldn't compute tipset state")?;
let expected_receipt = child.min_ticket_block().message_receipts;
let expected_state = child.parent_state();
match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
true => Ok(()),
false => {
error!(
height = child.epoch(),
?expected_state,
?expected_receipt,
?actual_state,
?actual_receipt,
"state mismatch"
);
bail!("state mismatch");
}
}
})
// Validate one tipset at a time. Parallelizing the outer loop across tipsets
// might wedge the global rayon pool.
// Sequential outer iteration leaves the entire rayon pool free for that
// already-rich inner parallelism.
for (child, parent) in tipsets.tuple_windows() {
info!(height = parent.epoch(), "compute parent state");
let ExecutedTipset {
state_root: actual_state,
receipt_root: actual_receipt,
..
} = apply_block_messages(
genesis_timestamp,
chain_index.shallow_clone(),
chain_config.shallow_clone(),
beacon.shallow_clone(),
engine,
parent,
NO_CALLBACK,
VMTrace::NotTraced,
)
.context("couldn't compute tipset state")?;
let expected_receipt = child.min_ticket_block().message_receipts;
let expected_state = child.parent_state();
if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
error!(
height = child.epoch(),
?expected_state,
?expected_receipt,
?actual_state,
?actual_receipt,
"state mismatch"
);
bail!("state mismatch");
}
}
Ok(())
}

/// Shared context for creating VMs and preparing tipset state.
Expand Down
Loading