Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use super::{Block, CachingBlockHeader, RawBlockHeader, Ticket};
use crate::{
chain_sync::TipsetValidator,
cid_collections::SmallCidNonEmptyVec,
networks::{calibnet, mainnet},
shim::clock::ChainEpoch,
Expand Down Expand Up @@ -546,6 +547,15 @@ impl FullTipset {
pub fn weight(&self) -> &BigInt {
&self.first_block().header().weight
}
/// Persists the tipset into the blockstore.
pub fn persist(&self, db: &impl Blockstore) -> anyhow::Result<()> {
for block in self.blocks() {
// To persist `TxMeta` that is required for loading tipset messages
TipsetValidator::validate_msg_root(db, block)?;
block.persist(db)?;
}
Ok(())
}
}

fn verify_block_headers<'a>(
Expand Down
12 changes: 9 additions & 3 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ pub fn block_messages<DB>(
where
DB: Blockstore,
{
let (bls_cids, secpk_cids) = read_msg_cids(db, &bh.messages)?;
let (bls_cids, secpk_cids) = read_msg_cids(db, bh)?;

let bls_msgs: Vec<Message> = messages_from_cids(db, &bls_cids)?;
let secp_msgs: Vec<SignedMessage> = messages_from_cids(db, &secpk_cids)?;
Expand All @@ -491,17 +491,23 @@ where
}

/// Returns a tuple of CIDs for both unsigned and signed messages
pub fn read_msg_cids<DB>(db: &DB, msg_cid: &Cid) -> Result<(Vec<Cid>, Vec<Cid>), Error>
pub fn read_msg_cids<DB>(
db: &DB,
block_header: &CachingBlockHeader,
) -> Result<(Vec<Cid>, Vec<Cid>), Error>
where
DB: Blockstore,
{
let msg_cid = &block_header.messages;
if let Some(roots) = db.get_cbor::<TxMeta>(msg_cid)? {
let bls_cids = read_amt_cids(db, &roots.bls_message_root)?;
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey(format!(
"no msg root with cid {msg_cid}"
"no msg root with cid {msg_cid} at epoch {} in block {}",
block_header.epoch,
block_header.cid(),
)))
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::sync::LazyLock;
use std::{num::NonZeroUsize, sync::Arc};

use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR};
Expand All @@ -13,6 +14,7 @@ use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use nonzero_ext::nonzero;
use num::Integer;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(131072_usize);

Expand Down Expand Up @@ -120,6 +122,23 @@ impl<DB: Blockstore> ChainIndex<DB> {
from: Arc<Tipset>,
resolve: ResolveNullTipset,
) -> Result<Arc<Tipset>, Error> {
const CHECKPOINT_INTERVAL: ChainEpoch = 1000;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the chain finality constant here, or a value expressed in terms of chain finality?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

static CACHE: LazyLock<SizeTrackingLruCache<ChainEpoch, Arc<Tipset>>> =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why choose Tipset for the value instead of just TipsetKey? I expect a Tipset to have a much larger memory footprint.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Change to using TipsetKey

@elmattic elmattic Aug 18, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will a chekpoint tipset value be cached in the ChainStore's TipsetCache? If not, is this possible or do we even want this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to using self.load_tipset instead which is tipset_cache aware

LazyLock::new(|| {
SizeTrackingLruCache::new_with_default_metrics_registry(
"tipset_by_height".into(),
4096.try_into().expect("infallible"),
)
});

fn next_checkpoint(epoch: ChainEpoch) -> ChainEpoch {
epoch - epoch.mod_floor(&CHECKPOINT_INTERVAL) + CHECKPOINT_INTERVAL
}

let checkpoint_from_epoch = next_checkpoint(to);
let checkpoint_from = CACHE.get_cloned(&checkpoint_from_epoch);
let from = checkpoint_from.unwrap_or(from);

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
if to == 0 {
return Ok(Arc::new(Tipset::from(from.genesis(&self.db)?)));
}
Expand All @@ -131,6 +150,10 @@ impl<DB: Blockstore> ChainIndex<DB> {
}

for (child, parent) in self.chain(from).tuple_windows() {
if child.epoch() % CHECKPOINT_INTERVAL == 0 {
CACHE.push(child.epoch(), child.clone());
}

if to == child.epoch() {
return Ok(child);
}
Expand Down
35 changes: 15 additions & 20 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
network.clone(),
state_manager.chain_store().clone(),
Some(source),
tipset_keys,
&tipset_keys,
)
.await
.inspect_err(|e| debug!("Querying full tipset failed: {}", e))
Expand All @@ -188,7 +188,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
network.clone(),
state_manager.chain_store().clone(),
None,
key,
&key,
)
.await
}
Expand Down Expand Up @@ -431,25 +431,22 @@ fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
network.peer_manager().unmark_peer_bad(&peer_id);
}

async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys.clone()) {
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
return Ok(full_tipset);
}
// Load from the network
let tipset = network
.chain_exchange_fts(peer_id, &tipset_keys.clone())
.chain_exchange_full_tipset(peer_id, tipset_keys)
.await
.map_err(|e| anyhow::anyhow!(e))?;

for block in tipset.blocks() {
block.persist(&chain_store.db)?;
}
tipset.persist(&chain_store.db)?;

Ok(tipset)
}
Expand All @@ -458,15 +455,15 @@ async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<Vec<FullTipset>> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys.clone()) {
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
return Ok(vec![full_tipset]);
}
// Load from the network
let tipsets = network
.chain_exchange_full_tipsets(peer_id, &tipset_keys.clone())
.chain_exchange_full_tipsets(peer_id, tipset_keys)
.await
.map_err(|e| anyhow::anyhow!(e))?;

Expand All @@ -479,13 +476,12 @@ async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
Ok(tipsets)
}

fn load_full_tipset<DB: Blockstore>(
pub fn load_full_tipset<DB: Blockstore>(
chain_store: &ChainStore<DB>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
// Retrieve tipset from store based on passed in TipsetKey
let ts = chain_store.chain_index.load_required_tipset(&tipset_keys)?;

let ts = chain_store.chain_index.load_required_tipset(tipset_keys)?;
let blocks: Vec<_> = ts
.block_headers()
.iter()
Expand All @@ -499,7 +495,6 @@ fn load_full_tipset<DB: Blockstore>(
})
})
.try_collect()?;

// Construct FullTipset
let fts = FullTipset::new(blocks)?;
Ok(fts)
Expand Down Expand Up @@ -602,7 +597,7 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
// Skip validation in stateless mode and for genesis tipset
true
} else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents().clone()) {
} else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
let head_ts = self.cs.heaviest_tipset();
// Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
// basically, the follow task should always start from the current head which could be manually set
Expand Down Expand Up @@ -877,7 +872,7 @@ impl SyncTask {
}
SyncTask::FetchTipset(key, _epoch) => {
if let Ok(parents) =
get_full_tipset_batch(network.clone(), cs.clone(), None, key).await
get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await
{
Some(SyncEvent::NewFullTipsets(
parents.into_iter().map(Arc::new).collect(),
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod validation;

pub use self::{
bad_block_cache::BadBlockCache,
chain_follower::ChainFollower,
chain_follower::{ChainFollower, load_full_tipset},
chain_muxer::SyncConfig,
consensus::collect_errs,
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport},
Expand Down
36 changes: 33 additions & 3 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,39 @@ where
.await
}

/// Send a `chain_exchange` request for messages to assemble a full tipset with a local tipset,
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn chain_exchange_messages(
&self,
peer_id: Option<PeerId>,
ts: &Tipset,
) -> Result<FullTipset, String> {
let mut bundles: Vec<TipsetBundle> = self
.handle_chain_exchange_request(
peer_id,
ts.key(),
NonZeroU64::new(1).expect("Infallible"),
MESSAGES,
|_| true,
)
.await
.expect("infallible");

if bundles.len() != 1 {
return Err(format!(
"chain exchange request returned {} tipsets, 1 expected.",
bundles.len()
));
}
let mut bundle = bundles.remove(0);
bundle.blocks = ts.block_headers().to_vec();
bundle.try_into()
}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
/// Send a `chain_exchange` request for a single full tipset (includes
/// messages) If `peer_id` is `None`, requests will be sent to a set of
/// shuffled peers.
pub async fn chain_exchange_fts(
pub async fn chain_exchange_full_tipset(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKey,
Expand All @@ -179,7 +208,7 @@ where

if fts.len() != 1 {
return Err(format!(
"Full tipset request returned {} tipsets",
"Full tipset request returned {} tipsets, 1 expected.",
fts.len()
));
}
Expand Down Expand Up @@ -212,7 +241,8 @@ where
validate: F,
) -> Result<Vec<T>, String>
where
T: TryFrom<TipsetBundle, Error = String> + Send + Sync + 'static,
T: TryFrom<TipsetBundle> + Send + Sync + 'static,
<T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
F: Fn(&Vec<T>) -> bool,
{
let request = ChainExchangeRequest {
Expand Down
3 changes: 1 addition & 2 deletions src/chain_sync/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TipsetValidator<'_> {
// matches the mst root in the block header 2. Ensuring it has not
// previously been seen in the bad blocks cache
for block in self.0.blocks() {
self.validate_msg_root(&chainstore.db, block)?;
Self::validate_msg_root(&chainstore.db, block)?;
if let Some(bad_block_cache) = bad_block_cache
&& bad_block_cache.peek(block.cid()).is_some()
{
Expand Down Expand Up @@ -104,7 +104,6 @@ impl TipsetValidator<'_> {
}

pub fn validate_msg_root<DB: Blockstore>(
&self,
blockstore: &DB,
block: &Block,
) -> Result<(), TipsetValidationError> {
Expand Down
8 changes: 6 additions & 2 deletions src/libp2p/chain_exchange/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,19 @@ impl ChainExchangeResponse {
/// implementation.
pub fn into_result<T>(self) -> Result<Vec<T>, String>
where
T: TryFrom<TipsetBundle, Error = String>,
T: TryFrom<TipsetBundle>,
<T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
{
if self.status != ChainExchangeResponseStatus::Success
&& self.status != ChainExchangeResponseStatus::PartialResponse
{
return Err(format!("Status {:?}: {}", self.status, self.message));
}

self.chain.into_iter().map(T::try_from).collect()
self.chain
.into_iter()
.map(|i| T::try_from(i).map_err(|e| e.to_string()))
.collect()
}
}
/// Contains all BLS and SECP messages and their indexes per block
Expand Down
2 changes: 1 addition & 1 deletion src/libp2p/chain_exchange/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let mut secp_msg_includes: Vec<Vec<u64>> = vec![];

for block_header in tipset.block_headers().iter() {
let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, &block_header.messages)?;
let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, block_header)?;

let mut bls_include = Vec::with_capacity(bls_cids.len());
for bls_cid in bls_cids.into_iter() {
Expand Down
3 changes: 1 addition & 2 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,7 @@ impl RpcMethod<1> for ChainGetBlockMessages {
(block_cid,): Self::Params,
) -> Result<Self::Ok, ServerError> {
let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
let blk_msgs = &blk.messages;
let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), blk_msgs)?;
let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
let (bls_msg, secp_msg) =
crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl GetPowerTable {
const BLOCKSTORE_CACHE_CAP: usize = 65536;
static BLOCKSTORE_CACHE: LazyLock<LruBlockstoreReadCache> = LazyLock::new(|| {
LruBlockstoreReadCache::new_with_default_metrics_registry(
"get_powertable_cache".into(),
"get_powertable".into(),
BLOCKSTORE_CACHE_CAP.try_into().expect("Infallible"),
)
});
Expand Down Expand Up @@ -583,7 +583,7 @@ impl RpcMethod<1> for Finalize {
);
let fts = ctx
.sync_network_context
.chain_exchange_fts(None, &tsk)
.chain_exchange_full_tipset(None, &tsk)
.await?;
for block in fts.blocks() {
block.persist(ctx.store())?;
Expand Down
19 changes: 13 additions & 6 deletions src/rpc/methods/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1432,18 +1432,25 @@ impl RpcMethod<1> for ForestStateCompute {
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(epoch,): Self::Params,
) -> Result<Self::Ok, ServerError> {
let tipset = ctx.chain_index().tipset_by_height(
let ts = ctx.chain_index().tipset_by_height(
epoch,
ctx.chain_store().heaviest_tipset(),
ResolveNullTipset::TakeOlder,
)?;
// Attempt to load full tipset from the store
if crate::chain_sync::load_full_tipset(ctx.chain_store(), ts.key()).is_err() {
// Load full tipset from the network
let fts = ctx
.sync_network_context
.chain_exchange_messages(None, &ts)
.await
.map_err(|e| anyhow::anyhow!(e))?;
fts.persist(ctx.store())?;
}

let StateOutput { state_root, .. } = ctx
.state_manager
.compute_tipset_state(
tipset,
crate::state_manager::NO_CALLBACK,
VMTrace::NotTraced,
)
.compute_tipset_state(ts, crate::state_manager::NO_CALLBACK, VMTrace::NotTraced)
.await?;

Ok(state_root)
Expand Down
Loading