diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index d17bc6771e95..abe6b760891f 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -70,7 +70,10 @@ pub struct ChainStore { heaviest_tipset_key_provider: Arc, /// Heaviest tipset cache - heaviest_tipset_cache: Arc>, + heaviest_tipset: Arc>, + + /// F3 finalized tipset cache + f3_finalized_tipset: Arc>>, /// Used as a cache for tipset `lookbacks`. chain_index: Arc>>, @@ -129,26 +132,42 @@ where genesis_block_header: CachingBlockHeader, ) -> anyhow::Result { let (publisher, _) = broadcast::channel(SINK_CAP); - let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); let head = if let Some(head_tsk) = heaviest_tipset_key_provider .heaviest_tipset_key() .context("failed to load head tipset key")? - && let Some(head) = chain_index - .load_tipset(&head_tsk) - .context("failed to load head tipset")? { - head + Tipset::load_required(&db, &head_tsk) + .with_context(|| format!("failed to load head tipset with key {head_tsk}"))? } else { Tipset::from(&genesis_block_header) }; + let heaviest_tipset = Arc::new(RwLock::new(head)); + let f3_finalized_tipset: Arc>> = Default::default(); + let chain_index = Arc::new( + ChainIndex::new(db.clone()).with_is_tipset_finalized(Box::new({ + let chain_finality = chain_config.policy.chain_finality; + let heaviest_tipset = heaviest_tipset.clone(); + let f3_finalized_tipset = f3_finalized_tipset.clone(); + move |ts| { + let finalized = f3_finalized_tipset + .read() + .as_ref() + .map(|ts| ts.epoch()) + .unwrap_or_default() + .max(heaviest_tipset.read().epoch() - chain_finality); + ts.epoch() <= finalized + } + })), + ); let cs = Self { head_changes_tx: publisher, chain_index, tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, heaviest_tipset_key_provider, - heaviest_tipset_cache: Arc::new(RwLock::new(head)), + heaviest_tipset, + f3_finalized_tipset, genesis_block_header, validated_blocks, eth_mappings, @@ -159,6 +178,16 @@ where Ok(cs) } + /// Sets F3 finalized tipset + pub fn set_f3_finalized_tipset(&self, ts: Tipset) { + self.f3_finalized_tipset.write().replace(ts); + } + + /// Gets F3 finalized tipset + pub fn f3_finalized_tipset(&self) -> Option { + self.f3_finalized_tipset.read().clone() + } + /// Cache for messages in tipsets, keyed by tipset key. pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache { &self.messages_in_tipset_cache @@ -169,7 +198,7 @@ where head.key().save(self.blockstore())?; self.heaviest_tipset_key_provider .set_heaviest_tipset_key(head.key())?; - let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); + let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.clone()); if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) { let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) @@ -242,7 +271,7 @@ where /// Returns the currently tracked heaviest tipset. pub fn heaviest_tipset(&self) -> Tipset { - self.heaviest_tipset_cache.read().clone() + self.heaviest_tipset.read().clone() } /// Returns the genesis tipset. diff --git a/src/chain/store/index.rs b/src/chain/store/index.rs index c78501129bbc..1d08857834b3 100644 --- a/src/chain/store/index.rs +++ b/src/chain/store/index.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT use std::num::NonZeroUsize; -use std::sync::LazyLock; use crate::beacon::{BeaconEntry, IGNORE_DRAND}; use crate::blocks::{Tipset, TipsetKey}; @@ -19,13 +18,21 @@ const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(2880_usize); type TipsetCache = SizeTrackingLruCache; +type TipsetHeightCache = SizeTrackingLruCache; + +type IsTipsetFinalizedFn = Box bool + Send + Sync>; + /// Keeps look-back tipsets in cache at a given interval `skip_length` and can /// be used to look-back at the chain to retrieve an old tipset. pub struct ChainIndex { - /// `Arc` reference tipset cache. + /// tipset key to tipset mappings. ts_cache: TipsetCache, + /// epoch to tipset key mappings. + ts_height_cache: TipsetHeightCache, /// `Blockstore` pointer needed to load tipsets from cold storage. db: DB, + /// check whether a tipset is finalized + is_tipset_finalized: Option, } #[derive(Debug, Clone, Copy)] @@ -41,7 +48,23 @@ impl ChainIndex { pub fn new(db: DB) -> Self { let ts_cache = SizeTrackingLruCache::new_with_metrics("tipset".into(), DEFAULT_TIPSET_CACHE_SIZE); - Self { ts_cache, db } + let ts_height_cache: SizeTrackingLruCache = + SizeTrackingLruCache::new_with_metrics( + "tipset_by_height".into(), + // 20480 * 900 = 18432000 which is sufficient for mainnet + nonzero!(20480_usize), + ); + Self { + ts_cache, + ts_height_cache, + db, + is_tipset_finalized: None, + } + } + + pub fn with_is_tipset_finalized(mut self, f: IsTipsetFinalizedFn) -> Self { + self.is_tipset_finalized = Some(f); + self } pub fn db(&self) -> &DB { @@ -129,24 +152,21 @@ impl ChainIndex { ) -> Result { use crate::shim::policy::policy_constants::CHAIN_FINALITY; - static CACHE: LazyLock> = LazyLock::new(|| { - SizeTrackingLruCache::new_with_metrics( - "tipset_by_height".into(), - // 20480 * 900 = 18432000 which is sufficient for mainnet - nonzero!(20480_usize), - ) - }); - // use `CHAIN_FINALITY` as checkpoint interval + const CHECKPOINT_INTERVAL: ChainEpoch = CHAIN_FINALITY; fn next_checkpoint(epoch: ChainEpoch) -> ChainEpoch { - epoch - epoch.mod_floor(&CHAIN_FINALITY) + CHAIN_FINALITY + epoch - epoch.mod_floor(&CHECKPOINT_INTERVAL) + CHECKPOINT_INTERVAL + } + fn is_checkpoint(epoch: ChainEpoch) -> bool { + epoch.mod_floor(&CHECKPOINT_INTERVAL) == 0 } let from_epoch = from.epoch(); let mut checkpoint_from_epoch = to; while checkpoint_from_epoch < from_epoch { - if let Some(checkpoint_from_key) = CACHE.get_cloned(&checkpoint_from_epoch) + if let Some(checkpoint_from_key) = + self.ts_height_cache.get_cloned(&checkpoint_from_epoch) && let Ok(Some(checkpoint_from)) = self.load_tipset(&checkpoint_from_key) { from = checkpoint_from; @@ -165,11 +185,19 @@ impl ChainIndex { ))); } + let from_epoch = from.epoch(); + let is_finalized = |ts: &Tipset| { + if let Some(is_finalized_fn) = &self.is_tipset_finalized { + is_finalized_fn(ts) + } else { + ts.epoch() <= from_epoch - CHAIN_FINALITY + } + }; for (child, parent) in from.chain(&self.db).tuple_windows() { - // use `child.epoch() + CHAIN_FINALITY <= from_epoch` - // to ensure the cached child is finalized(not on a fork). - if child.epoch() % CHAIN_FINALITY == 0 && child.epoch() + CHAIN_FINALITY <= from_epoch { - CACHE.push(child.epoch(), child.key().clone()); + // update cache only when child is finalized. + if is_checkpoint(child.epoch()) && is_finalized(&child) { + self.ts_height_cache + .push(child.epoch(), child.key().clone()); } if to == child.epoch() { diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 8dbb4709f0a0..d1c125afebfc 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -35,12 +35,13 @@ use crate::utils; use crate::utils::misc::env::is_env_truthy; use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING}; use anyhow::{Context as _, bail}; +use backon::{ExponentialBuilder, Retryable}; use dialoguer::theme::ColorfulTheme; use futures::{Future, FutureExt}; use std::path::Path; use std::sync::Arc; use std::sync::OnceLock; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::{ net::TcpListener, signal::{ @@ -479,6 +480,44 @@ fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> ); } }); + tokio::task::spawn({ + let chain_store = ctx.chain_store().clone(); + async move { + // wait 1s to let F3 RPC server start + tokio::time::sleep(Duration::from_secs(1)).await; + match (|| crate::rpc::f3::F3GetLatestCertificate::get()) + .retry(ExponentialBuilder::default()) + .await + { + Ok(f3_finalized_cert) => { + let f3_finalized_head = f3_finalized_cert.chain_head(); + match chain_store + .chain_index() + .load_required_tipset(&f3_finalized_head.key) + { + Ok(ts) => { + chain_store.set_f3_finalized_tipset(ts); + tracing::info!( + "Set F3 finalized tipset to epoch {} and key {}", + f3_finalized_head.epoch, + f3_finalized_head.key, + ); + } + Err(e) => { + tracing::error!( + "Failed to get F3 finalized tipset epoch {} and key {}: {e}", + f3_finalized_head.epoch, + f3_finalized_head.key + ); + } + } + } + Err(e) => { + tracing::error!("Failed to get F3 latest certificate: {e}"); + } + } + } + }); } Ok(()) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 9bd4e05c30b9..3e2d96f8162b 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1116,28 +1116,16 @@ impl ChainGetTipSetV2 { pub async fn get_latest_finalized_tipset( ctx: &Ctx, ) -> anyhow::Result { - let Ok(f3_finalized_cert) = crate::rpc::f3::F3GetLatestCertificate::get().await else { + let Some(f3_finalized_head) = ctx.chain_store().f3_finalized_tipset() else { return Self::get_ec_finalized_tipset(ctx); }; - - let f3_finalized_head = f3_finalized_cert.chain_head(); let head = ctx.chain_store().heaviest_tipset(); // Latest F3 finalized tipset is older than EC finality, falling back to EC finality - if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality { - return Self::get_ec_finalized_tipset(ctx); + if head.epoch() > f3_finalized_head.epoch() + ctx.chain_config().policy.chain_finality { + Self::get_ec_finalized_tipset(ctx) + } else { + Ok(f3_finalized_head) } - - let ts = ctx - .chain_index() - .load_required_tipset(&f3_finalized_head.key) - .map_err(|e| { - anyhow::anyhow!( - "Failed to load F3 finalized tipset at epoch {} with key {}: {e}", - f3_finalized_head.epoch, - f3_finalized_head.key, - ) - })?; - Ok(ts) } pub fn get_ec_finalized_tipset(ctx: &Ctx) -> anyhow::Result { diff --git a/src/rpc/methods/f3.rs b/src/rpc/methods/f3.rs index b3bc2fd47728..2b330f539bed 100644 --- a/src/rpc/methods/f3.rs +++ b/src/rpc/methods/f3.rs @@ -574,8 +574,10 @@ impl RpcMethod<1> for Finalize { )?; let ts = Arc::new(Tipset::from(fts)); ctx.chain_store().put_tipset(&ts)?; - ctx.chain_store().set_heaviest_tipset(finalized_ts)?; + ctx.chain_store() + .set_heaviest_tipset(finalized_ts.clone())?; } + ctx.chain_store().set_f3_finalized_tipset(finalized_ts); } Ok(()) }