Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use super::{Block, CachingBlockHeader, RawBlockHeader, Ticket};
use crate::{
chain_sync::TipsetValidator,
cid_collections::SmallCidNonEmptyVec,
networks::{calibnet, mainnet},
shim::clock::ChainEpoch,
Expand Down Expand Up @@ -546,6 +547,15 @@ impl FullTipset {
pub fn weight(&self) -> &BigInt {
&self.first_block().header().weight
}
/// Persists the tipset into the blockstore.
pub fn persist(&self, db: &impl Blockstore) -> anyhow::Result<()> {
for block in self.blocks() {
// To persist `TxMeta` that is required for loading tipset messages
TipsetValidator::validate_msg_root(db, block)?;
block.persist(db)?;
}
Ok(())
}
}

fn verify_block_headers<'a>(
Expand Down
12 changes: 9 additions & 3 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ pub fn block_messages<DB>(
where
DB: Blockstore,
{
let (bls_cids, secpk_cids) = read_msg_cids(db, &bh.messages)?;
let (bls_cids, secpk_cids) = read_msg_cids(db, bh)?;

let bls_msgs: Vec<Message> = messages_from_cids(db, &bls_cids)?;
let secp_msgs: Vec<SignedMessage> = messages_from_cids(db, &secpk_cids)?;
Expand All @@ -491,17 +491,23 @@ where
}

/// Returns a tuple of CIDs for both unsigned and signed messages
pub fn read_msg_cids<DB>(db: &DB, msg_cid: &Cid) -> Result<(Vec<Cid>, Vec<Cid>), Error>
pub fn read_msg_cids<DB>(
db: &DB,
block_header: &CachingBlockHeader,
) -> Result<(Vec<Cid>, Vec<Cid>), Error>
where
DB: Blockstore,
{
let msg_cid = &block_header.messages;
if let Some(roots) = db.get_cbor::<TxMeta>(msg_cid)? {
let bls_cids = read_amt_cids(db, &roots.bls_message_root)?;
let secpk_cids = read_amt_cids(db, &roots.secp_message_root)?;
Ok((bls_cids, secpk_cids))
} else {
Err(Error::UndefinedKey(format!(
"no msg root with cid {msg_cid}"
"no msg root with cid {msg_cid} at epoch {} in block {}",
block_header.epoch,
block_header.cid(),
)))
}
}
Expand Down
37 changes: 36 additions & 1 deletion src/chain/store/index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::sync::LazyLock;
use std::{num::NonZeroUsize, sync::Arc};

use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR};
Expand All @@ -13,6 +14,7 @@ use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use nonzero_ext::nonzero;
use num::Integer;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(131072_usize);

Expand Down Expand Up @@ -117,9 +119,36 @@ impl<DB: Blockstore> ChainIndex<DB> {
pub fn tipset_by_height(
&self,
to: ChainEpoch,
from: Arc<Tipset>,
mut from: Arc<Tipset>,
resolve: ResolveNullTipset,
) -> Result<Arc<Tipset>, Error> {
use crate::shim::policy::policy_constants::CHAIN_FINALITY;

static CACHE: LazyLock<SizeTrackingLruCache<ChainEpoch, TipsetKey>> = LazyLock::new(|| {
SizeTrackingLruCache::new_with_default_metrics_registry(
"tipset_by_height".into(),
4096.try_into().expect("infallible"),
)
});

// use `CHAIN_FINALITY` as checkpoint interval
fn next_checkpoint(epoch: ChainEpoch) -> ChainEpoch {
epoch - epoch.mod_floor(&CHAIN_FINALITY) + CHAIN_FINALITY
}

let from_epoch = from.epoch();

let mut checkpoint_from_epoch = to;
while checkpoint_from_epoch < from_epoch {
if let Some(checkpoint_from_key) = CACHE.get_cloned(&checkpoint_from_epoch)
&& let Ok(Some(checkpoint_from)) = self.load_tipset(&checkpoint_from_key)
{
from = checkpoint_from;
break;
}
checkpoint_from_epoch = next_checkpoint(checkpoint_from_epoch);
}

if to == 0 {
return Ok(Arc::new(Tipset::from(from.genesis(&self.db)?)));
}
Expand All @@ -131,6 +160,12 @@ impl<DB: Blockstore> ChainIndex<DB> {
}

for (child, parent) in self.chain(from).tuple_windows() {
// use `child.epoch() + CHAIN_FINALITY <= from_epoch`
// to ensure the cached child is finalized(not on a fork).
if child.epoch() % CHAIN_FINALITY == 0 && child.epoch() + CHAIN_FINALITY <= from_epoch {
CACHE.push(child.epoch(), child.key().clone());
}

if to == child.epoch() {
return Ok(child);
}
Expand Down
35 changes: 15 additions & 20 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
network.clone(),
state_manager.chain_store().clone(),
Some(source),
tipset_keys,
&tipset_keys,
)
.await
.inspect_err(|e| debug!("Querying full tipset failed: {}", e))
Expand All @@ -188,7 +188,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
network.clone(),
state_manager.chain_store().clone(),
None,
key,
&key,
)
.await
}
Expand Down Expand Up @@ -431,25 +431,22 @@ fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
network.peer_manager().unmark_peer_bad(&peer_id);
}

async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys.clone()) {
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
return Ok(full_tipset);
}
// Load from the network
let tipset = network
.chain_exchange_fts(peer_id, &tipset_keys.clone())
.chain_exchange_full_tipset(peer_id, tipset_keys)
.await
.map_err(|e| anyhow::anyhow!(e))?;

for block in tipset.blocks() {
block.persist(&chain_store.db)?;
}
tipset.persist(&chain_store.db)?;

Ok(tipset)
}
Expand All @@ -458,15 +455,15 @@ async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
peer_id: Option<PeerId>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<Vec<FullTipset>> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys.clone()) {
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
return Ok(vec![full_tipset]);
}
// Load from the network
let tipsets = network
.chain_exchange_full_tipsets(peer_id, &tipset_keys.clone())
.chain_exchange_full_tipsets(peer_id, tipset_keys)
.await
.map_err(|e| anyhow::anyhow!(e))?;

Expand All @@ -479,13 +476,12 @@ async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
Ok(tipsets)
}

fn load_full_tipset<DB: Blockstore>(
pub fn load_full_tipset<DB: Blockstore>(
chain_store: &ChainStore<DB>,
tipset_keys: TipsetKey,
tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
// Retrieve tipset from store based on passed in TipsetKey
let ts = chain_store.chain_index.load_required_tipset(&tipset_keys)?;

let ts = chain_store.chain_index.load_required_tipset(tipset_keys)?;
let blocks: Vec<_> = ts
.block_headers()
.iter()
Expand All @@ -499,7 +495,6 @@ fn load_full_tipset<DB: Blockstore>(
})
})
.try_collect()?;

// Construct FullTipset
let fts = FullTipset::new(blocks)?;
Ok(fts)
Expand Down Expand Up @@ -602,7 +597,7 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
// Skip validation in stateless mode and for genesis tipset
true
} else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents().clone()) {
} else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
let head_ts = self.cs.heaviest_tipset();
// Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
// basically, the follow task should always start from the current head which could be manually set
Expand Down Expand Up @@ -877,7 +872,7 @@ impl SyncTask {
}
SyncTask::FetchTipset(key, _epoch) => {
if let Ok(parents) =
get_full_tipset_batch(network.clone(), cs.clone(), None, key).await
get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await
{
Some(SyncEvent::NewFullTipsets(
parents.into_iter().map(Arc::new).collect(),
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod validation;

pub use self::{
bad_block_cache::BadBlockCache,
chain_follower::ChainFollower,
chain_follower::{ChainFollower, load_full_tipset},
chain_muxer::SyncConfig,
consensus::collect_errs,
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport},
Expand Down
35 changes: 32 additions & 3 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,38 @@ where
.await
}

/// Send a `chain_exchange` request for messages to assemble a full tipset with a local tipset,
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn chain_exchange_messages(
&self,
peer_id: Option<PeerId>,
ts: &Tipset,
) -> Result<FullTipset, String> {
let mut bundles: Vec<TipsetBundle> = self
.handle_chain_exchange_request(
peer_id,
ts.key(),
NonZeroU64::new(1).expect("Infallible"),
MESSAGES,
|_| true,
)
.await?;

if bundles.len() != 1 {
return Err(format!(
"chain exchange request returned {} tipsets, 1 expected.",
bundles.len()
));
}
let mut bundle = bundles.remove(0);
bundle.blocks = ts.block_headers().to_vec();
bundle.try_into()
}

/// Send a `chain_exchange` request for a single full tipset (includes
/// messages) If `peer_id` is `None`, requests will be sent to a set of
/// shuffled peers.
pub async fn chain_exchange_fts(
pub async fn chain_exchange_full_tipset(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKey,
Expand All @@ -179,7 +207,7 @@ where

if fts.len() != 1 {
return Err(format!(
"Full tipset request returned {} tipsets",
"Full tipset request returned {} tipsets, 1 expected.",
fts.len()
));
}
Expand Down Expand Up @@ -212,7 +240,8 @@ where
validate: F,
) -> Result<Vec<T>, String>
where
T: TryFrom<TipsetBundle, Error = String> + Send + Sync + 'static,
T: TryFrom<TipsetBundle> + Send + Sync + 'static,
<T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
F: Fn(&Vec<T>) -> bool,
{
let request = ChainExchangeRequest {
Expand Down
3 changes: 1 addition & 2 deletions src/chain_sync/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TipsetValidator<'_> {
// matches the mst root in the block header 2. Ensuring it has not
// previously been seen in the bad blocks cache
for block in self.0.blocks() {
self.validate_msg_root(&chainstore.db, block)?;
Self::validate_msg_root(&chainstore.db, block)?;
if let Some(bad_block_cache) = bad_block_cache
&& bad_block_cache.peek(block.cid()).is_some()
{
Expand Down Expand Up @@ -104,7 +104,6 @@ impl TipsetValidator<'_> {
}

pub fn validate_msg_root<DB: Blockstore>(
&self,
blockstore: &DB,
block: &Block,
) -> Result<(), TipsetValidationError> {
Expand Down
8 changes: 6 additions & 2 deletions src/libp2p/chain_exchange/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,19 @@ impl ChainExchangeResponse {
/// implementation.
pub fn into_result<T>(self) -> Result<Vec<T>, String>
where
T: TryFrom<TipsetBundle, Error = String>,
T: TryFrom<TipsetBundle>,
<T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
{
if self.status != ChainExchangeResponseStatus::Success
&& self.status != ChainExchangeResponseStatus::PartialResponse
{
return Err(format!("Status {:?}: {}", self.status, self.message));
}

self.chain.into_iter().map(T::try_from).collect()
self.chain
.into_iter()
.map(|i| T::try_from(i).map_err(|e| e.to_string()))
.collect()
}
}
/// Contains all BLS and SECP messages and their indexes per block
Expand Down
2 changes: 1 addition & 1 deletion src/libp2p/chain_exchange/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let mut secp_msg_includes: Vec<Vec<u64>> = vec![];

for block_header in tipset.block_headers().iter() {
let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, &block_header.messages)?;
let (bls_cids, secp_cids) = crate::chain::read_msg_cids(db, block_header)?;

let mut bls_include = Vec::with_capacity(bls_cids.len());
for bls_cid in bls_cids.into_iter() {
Expand Down
3 changes: 1 addition & 2 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,7 @@ impl RpcMethod<1> for ChainGetBlockMessages {
(block_cid,): Self::Params,
) -> Result<Self::Ok, ServerError> {
let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
let blk_msgs = &blk.messages;
let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), blk_msgs)?;
let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
let (bls_msg, secp_msg) =
crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl GetPowerTable {
const BLOCKSTORE_CACHE_CAP: usize = 65536;
static BLOCKSTORE_CACHE: LazyLock<LruBlockstoreReadCache> = LazyLock::new(|| {
LruBlockstoreReadCache::new_with_default_metrics_registry(
"get_powertable_cache".into(),
"get_powertable".into(),
BLOCKSTORE_CACHE_CAP.try_into().expect("Infallible"),
)
});
Expand Down Expand Up @@ -583,7 +583,7 @@ impl RpcMethod<1> for Finalize {
);
let fts = ctx
.sync_network_context
.chain_exchange_fts(None, &tsk)
.chain_exchange_full_tipset(None, &tsk)
.await?;
for block in fts.blocks() {
block.persist(ctx.store())?;
Expand Down
Loading
Loading