Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::shim::{
address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
gas::price_list_by_network_version, message::Message, state_tree::StateTree,
};
use crate::state_manager::StateLookupPolicy;
use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
use crate::{
blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
Expand Down Expand Up @@ -101,6 +102,21 @@ pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
state_manager: &Arc<StateManager<DB>>,
full_tipset: FullTipset,
bad_block_cache: Option<Arc<BadBlockCache>>,
) -> Result<(), TipsetSyncerError> {
validate_tipset_internal(
state_manager,
full_tipset,
bad_block_cache,
StateLookupPolicy::Enabled,
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
)
.await
}

pub(crate) async fn validate_tipset_internal<DB: Blockstore + Send + Sync + 'static>(
state_manager: &Arc<StateManager<DB>>,
full_tipset: FullTipset,
bad_block_cache: Option<Arc<BadBlockCache>>,
state_lookup: StateLookupPolicy,
) -> Result<(), TipsetSyncerError> {
if full_tipset
.key()
Expand All @@ -118,7 +134,11 @@ pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
let blocks = full_tipset.into_blocks();
let mut validations = JoinSet::new();
for b in blocks {
validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
validations.spawn(validate_block(
state_manager.clone(),
Arc::new(b),
state_lookup,
));
}

while let Some(result) = validations.join_next().await {
Expand Down Expand Up @@ -171,6 +191,7 @@ pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
state_manager: Arc<StateManager<DB>>,
block: Arc<Block>,
state_lookup: StateLookupPolicy,
) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
trace!(
Expand Down Expand Up @@ -230,6 +251,7 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
state_manager.clone(),
block.clone(),
base_tipset.clone(),
state_lookup,
));

// Base fee check
Expand Down Expand Up @@ -278,7 +300,7 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
async move {
let header = block.header();
let (state_root, receipt_root) = state_manager
.tipset_state(&base_tipset)
.tipset_state_internal(&base_tipset, state_lookup)
.await
.map_err(|e| {
TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
Expand Down Expand Up @@ -356,6 +378,7 @@ async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
state_manager: Arc<StateManager<DB>>,
block: Arc<Block>,
base_tipset: Tipset,
state_lookup: StateLookupPolicy,
) -> Result<(), TipsetSyncerError> {
let network_version = state_manager
.chain_config()
Expand Down Expand Up @@ -439,7 +462,7 @@ async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(

let mut account_sequences: HashMap<Address, u64> = HashMap::default();
let (state_root, _) = state_manager
.tipset_state(&base_tipset)
.tipset_state_internal(&base_tipset, state_lookup)
.await
.map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e}")))?;
let tree =
Expand Down
13 changes: 8 additions & 5 deletions src/dev/subcommands/state_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
blocks::Tipset,
chain::{ChainStore, index::ResolveNullTipset},
chain_sync::{load_full_tipset, tipset_syncer::validate_tipset},
chain_sync::{load_full_tipset, tipset_syncer::validate_tipset_internal},
cli_shared::{chain_path, read_config},
db::{
MemoryDB, SettingsStoreExt,
Expand All @@ -15,7 +15,7 @@ use crate::{
interpreter::VMTrace,
networks::{ChainConfig, NetworkChain},
shim::clock::ChainEpoch,
state_manager::{StateManager, StateOutput},
state_manager::{StateLookupPolicy, StateManager, StateOutput},
tool::subcommands::api_cmd::generate_test_snapshot,
};
use nonzero_ext::nonzero;
Expand Down Expand Up @@ -236,7 +236,7 @@ impl ValidateCommand {
let epoch = ts.epoch();
let fts = load_full_tipset(&chain_store, ts.key())?;
let state_manager = Arc::new(StateManager::new(chain_store)?);
validate_tipset(&state_manager, fts, None).await?;
validate_tipset_internal(&state_manager, fts, None, StateLookupPolicy::Disabled).await?;
let mut db_snapshot = vec![];
db.export_forest_car(&mut db_snapshot).await?;
println!(
Expand Down Expand Up @@ -287,7 +287,8 @@ impl ReplayValidateCommand {
for _ in 0..n.get() {
let fts = fts.clone();
let start = Instant::now();
validate_tipset(&state_manager, fts, None).await?;
validate_tipset_internal(&state_manager, fts, None, StateLookupPolicy::Disabled)
.await?;
println!(
"epoch: {epoch}, took {}.",
humantime::format_duration(start.elapsed())
Expand All @@ -298,5 +299,7 @@ impl ReplayValidateCommand {
}

fn disable_tipset_cache() {
unsafe { std::env::set_var("FOREST_TIPSET_CACHE_DISABLED", "1") };
unsafe {
std::env::set_var("FOREST_TIPSET_CACHE_DISABLED", "1");
}
}
24 changes: 22 additions & 2 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,17 +459,27 @@ where
/// either be cached or will be calculated and fill the cache. Tipset
/// state for a given tipset is guaranteed not to be computed twice.
pub async fn tipset_state(self: &Arc<Self>, tipset: &Tipset) -> anyhow::Result<CidPair> {
self.tipset_state_internal(tipset, StateLookupPolicy::Enabled)
.await
}

pub(crate) async fn tipset_state_internal(
self: &Arc<Self>,
tipset: &Tipset,
state_lookup: StateLookupPolicy,
) -> anyhow::Result<CidPair> {
let StateOutput {
state_root,
receipt_root,
..
} = self.tipset_state_output(tipset).await?;
} = self.tipset_state_output(tipset, state_lookup).await?;
Ok((state_root, receipt_root))
}

pub async fn tipset_state_output(
self: &Arc<Self>,
tipset: &Tipset,
state_lookup: StateLookupPolicy,
) -> anyhow::Result<StateOutput> {
let key = tipset.key();
self.cache
Expand All @@ -483,7 +493,9 @@ where

// First, try to look up the state and receipt if not found in the blockstore
// compute it
if let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset) {
if matches!(state_lookup, StateLookupPolicy::Enabled)
&& let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset)
{
return Ok(state_from_child);
}

Expand Down Expand Up @@ -2075,3 +2087,11 @@ where

Ok(output)
}

/// Whether or not to lookup the state output from the next tipset before computing a state
#[derive(Debug, Copy, Clone, Default)]
pub enum StateLookupPolicy {
#[default]
Enabled,
Disabled,
}
46 changes: 42 additions & 4 deletions src/state_manager/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub mod state_compute {
use crate::{
blocks::Tipset,
chain::store::ChainStore,
daemon::bundle::load_actor_bundles,
db::{
MemoryDB,
car::{AnyCar, ManyCar},
Expand Down Expand Up @@ -262,6 +263,7 @@ pub mod state_compute {
genesis_header,
)?);
let state_manager = Arc::new(StateManager::new(chain_store.clone())?);
load_actor_bundles(&db, chain).await?;
if warmup {
state_compute(state_manager.clone(), ts.clone()).await;
}
Expand All @@ -280,17 +282,18 @@ pub mod state_compute {
use super::*;
use crate::{
blocks::FullTipset,
chain_sync::{load_full_tipset, tipset_syncer::validate_tipset},
chain_sync::{load_full_tipset, tipset_syncer::validate_tipset_internal},
state_manager::StateLookupPolicy,
};

pub async fn get_state_validate_snapshot(
async fn get_state_validate_snapshot(
chain: &NetworkChain,
epoch: i64,
) -> anyhow::Result<PathBuf> {
get_state_snapshot(chain, "state_validate", epoch).await
}

pub async fn prepare_state_validate(
async fn prepare_state_validate(
chain: &NetworkChain,
snapshot: &Path,
) -> anyhow::Result<(Arc<StateManager<ManyCar>>, FullTipset)> {
Expand Down Expand Up @@ -347,7 +350,42 @@ pub mod state_compute {
let chain = NetworkChain::Mainnet;
let snapshot = get_state_validate_snapshot(&chain, 5688000).await.unwrap();
let (sm, fts) = prepare_state_validate(&chain, &snapshot).await.unwrap();
validate_tipset(&sm, fts, None).await.unwrap();
validate_tipset_internal(&sm, fts, None, StateLookupPolicy::Disabled)
.await
.unwrap();
}

// Shark state migration
#[tokio::test(flavor = "multi_thread")]
async fn state_validate_calibnet_16802() {
let chain = NetworkChain::Calibnet;
let snapshot = get_state_validate_snapshot(&chain, 16802).await.unwrap();
let (sm, fts) = prepare_state_validate(&chain, &snapshot).await.unwrap();
validate_tipset_internal(&sm, fts, None, StateLookupPolicy::Disabled)
.await
.unwrap();
}

// Hygge state migration
#[tokio::test(flavor = "multi_thread")]
async fn state_validate_calibnet_322356() {
let chain = NetworkChain::Calibnet;
let snapshot = get_state_validate_snapshot(&chain, 322356).await.unwrap();
let (sm, fts) = prepare_state_validate(&chain, &snapshot).await.unwrap();
validate_tipset_internal(&sm, fts, None, StateLookupPolicy::Disabled)
.await
.unwrap();
}

// Lightning state migration
#[tokio::test(flavor = "multi_thread")]
async fn state_validate_calibnet_489096() {
let chain = NetworkChain::Calibnet;
let snapshot = get_state_validate_snapshot(&chain, 489096).await.unwrap();
let (sm, fts) = prepare_state_validate(&chain, &snapshot).await.unwrap();
validate_tipset_internal(&sm, fts, None, StateLookupPolicy::Disabled)
.await
.unwrap();
}
}
}
Expand Down
Loading