diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index fb18955ed0f5..84086620907b 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -6,8 +6,6 @@ use super::{ index::{ChainIndex, ResolveNullTipset}, tipset_tracker::TipsetTracker, }; -use crate::db::{EthMappingsStore, EthMappingsStoreExt}; -use crate::interpreter::{BlockMessages, VMTrace}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::networks::{ChainConfig, Height}; @@ -23,7 +21,15 @@ use crate::{ blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta}, db::HeaviestTipsetKeyProvider, }; +use crate::{ + db::{EthMappingsStore, EthMappingsStoreExt}, + rpc::chain::PathChange, +}; use crate::{fil_cns, utils::cache::SizeTrackingLruCache}; +use crate::{ + interpreter::{BlockMessages, VMTrace}, + rpc::chain::PathChanges, +}; use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Context as _; use cid::Cid; @@ -35,8 +41,8 @@ use nonzero_ext::nonzero; use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; use std::{num::NonZeroUsize, sync::Arc}; -use tokio::sync::broadcast::{self, Sender as Publisher}; -use tracing::{debug, trace, warn}; +use tokio::sync::broadcast; +use tracing::{debug, error, trace, warn}; // A cap on the size of the future_sink const SINK_CAP: usize = 200; @@ -47,17 +53,16 @@ pub type ChainEpochDelta = ChainEpoch; /// `Enum` for `pubsub` channel that defines message type variant and data /// contained in message type. -#[derive(Clone, Debug)] -pub enum HeadChange { - Apply(Tipset), -} +pub type HeadChange = PathChange; + +pub type HeadChanges = PathChanges; /// Stores chain data such as heaviest tipset and cached tipset info at each /// epoch. This structure is thread-safe, and all caches are wrapped in a mutex /// to allow a consistent `ChainStore` to be shared across tasks. pub struct ChainStore { /// Publisher for head change events - publisher: Publisher, + head_changes_tx: broadcast::Sender, /// key-value `datastore`. db: Arc, @@ -66,7 +71,7 @@ pub struct ChainStore { heaviest_tipset_key_provider: Arc, /// Heaviest tipset cache - heaviest_tipset_cache: Arc>>, + heaviest_tipset_cache: Arc>, /// Used as a cache for tipset `lookbacks`. chain_index: Arc>>, @@ -124,14 +129,24 @@ where let (publisher, _) = broadcast::channel(SINK_CAP); let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); - + let head = if let Some(head_tsk) = heaviest_tipset_key_provider + .heaviest_tipset_key() + .context("failed to load head tipset key")? + && let Some(head) = chain_index + .load_tipset(&head_tsk) + .context("failed to load head tipset")? + { + head + } else { + Tipset::from(&genesis_block_header) + }; let cs = Self { - publisher, + head_changes_tx: publisher, chain_index, tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, heaviest_tipset_key_provider, - heaviest_tipset_cache: Default::default(), + heaviest_tipset_cache: Arc::new(RwLock::new(head)), genesis_block_header, validated_blocks, eth_mappings, @@ -142,14 +157,31 @@ where } /// Sets heaviest tipset - pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> { + pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { + head.key().save(self.blockstore())?; self.heaviest_tipset_key_provider - .set_heaviest_tipset_key(ts.key())?; - *self.heaviest_tipset_cache.write() = Some(ts.clone()); - ts.key().save(self.blockstore())?; - if self.publisher.send(HeadChange::Apply(ts)).is_err() { - debug!("did not publish head change, no active receivers"); + .set_heaviest_tipset_key(head.key())?; + let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); + + let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { + Ok(changes) => changes, + Err(e) => { + // Do not warn when the old head is genesis + if old_head.epoch() > 0 { + error!("failed to get chain path changes: {e}"); + } + // Fallback to single apply + PathChanges { + applies: vec![head], + reverts: vec![], + } + } + }; + + if self.head_changes_tx.send(changes).is_err() { + debug!("did not publish changes, no active receivers"); } + Ok(()) } @@ -200,16 +232,7 @@ where /// Returns the currently tracked heaviest tipset. pub fn heaviest_tipset(&self) -> Tipset { - if let Some(ts) = &*self.heaviest_tipset_cache.read() { - return ts.clone(); - } - let tsk = self - .heaviest_tipset_key_provider - .heaviest_tipset_key() - .unwrap_or_else(|_| TipsetKey::from(nunny::vec![*self.genesis_block_header.cid()])); - self.chain_index - .load_required_tipset(&tsk) - .expect("failed to load heaviest tipset") + self.heaviest_tipset_cache.read().clone() } /// Returns the genesis tipset. @@ -217,9 +240,9 @@ where Tipset::from(self.genesis_block_header()) } - /// Returns a reference to the publisher of head changes. - pub fn publisher(&self) -> &Publisher { - &self.publisher + /// Subscribes head changes. + pub fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.head_changes_tx.subscribe() } /// Returns key-value store instance. diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index 4764046293e1..ee52cdcb677c 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -27,7 +27,7 @@ use crate::{ tipset_syncer::{TipsetSyncerError, validate_tipset}, }, libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest}, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::calculate_expected_epoch, shim::clock::ChainEpoch, state_manager::StateManager, @@ -78,7 +78,7 @@ pub struct ChainFollower { stateless_mode: bool, /// Message pool - mem_pool: Arc>>, + mem_pool: Arc>>>, } impl ChainFollower { @@ -88,7 +88,7 @@ impl ChainFollower { genesis: Tipset, net_handler: flume::Receiver, stateless_mode: bool, - mem_pool: Arc>>, + mem_pool: Arc>>>, ) -> Self { let (tipset_sender, tipset_receiver) = flume::bounded(20); let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE"); @@ -135,7 +135,7 @@ pub async fn chain_follower( network_rx: flume::Receiver, tipset_receiver: flume::Receiver, network: SyncNetworkContext, - mem_pool: Arc>>, + mem_pool: Arc>>>, sync_status: SyncStatus, genesis: Tipset, stateless_mode: bool, diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 765519caba4b..91c0a974010e 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -7,7 +7,7 @@ pub mod db_util; pub mod main; use crate::blocks::Tipset; -use crate::chain::HeadChange; +use crate::chain::ChainStore; use crate::chain::index::ResolveNullTipset; use crate::chain_sync::network_context::SyncNetworkContext; use crate::chain_sync::{ChainFollower, SyncStatus}; @@ -23,7 +23,7 @@ use crate::daemon::{ use crate::db::gc::SnapshotGarbageCollector; use crate::db::ttl::EthMappingCollector; use crate::libp2p::{Libp2pService, PeerManager}; -use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; +use crate::message_pool::{MessagePool, MpoolConfig}; use crate::networks::{self, ChainConfig}; use crate::rpc::RPCState; use crate::rpc::eth::filter::EthEventHandler; @@ -293,11 +293,9 @@ fn create_mpool( services: &mut JoinSet>, p2p_service: &Libp2pService, ctx: &AppContext, -) -> anyhow::Result>>> { - let publisher = ctx.state_manager.chain_store().publisher(); - let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone()); +) -> anyhow::Result>>>> { Ok(MessagePool::new( - provider, + ctx.state_manager.chain_store().clone(), p2p_service.network_sender().clone(), MpoolConfig::load_config(ctx.db.writer().as_ref())?, ctx.state_manager.chain_config().clone(), @@ -309,7 +307,7 @@ fn create_mpool( fn create_chain_follower( opts: &CliOpts, p2p_service: &Libp2pService, - mpool: Arc>>, + mpool: Arc>>>, ctx: &AppContext, ) -> anyhow::Result> { let network_send = p2p_service.network_sender().clone(); @@ -370,7 +368,7 @@ async fn maybe_start_health_check_service( fn maybe_start_rpc_service( services: &mut JoinSet>, config: &Config, - mpool: Arc>>, + mpool: Arc>>>, chain_follower: &ChainFollower, start_time: chrono::DateTime, shutdown: mpsc::Sender<()>, @@ -498,21 +496,19 @@ fn maybe_start_indexer_service( && !opts.stateless && !ctx.state_manager.chain_config().is_devnet() { - let mut receiver = ctx.state_manager.chain_store().publisher().subscribe(); + let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes(); let chain_store = ctx.state_manager.chain_store().clone(); services.spawn(async move { tracing::info!("Starting indexer service"); // Continuously listen for head changes loop { - let HeadChange::Apply(ts) = receiver.recv().await?; - - tracing::debug!("Indexing tipset {}", ts.key()); - - let delegated_messages = - chain_store.headers_delegated_messages(ts.block_headers().iter())?; - - chain_store.process_signed_messages(&delegated_messages)?; + for ts in head_changes_rx.recv().await?.applies { + tracing::debug!("Indexing tipset {}", ts.key()); + let delegated_messages = + chain_store.headers_delegated_messages(ts.block_headers().iter())?; + chain_store.process_signed_messages(&delegated_messages)?; + } } }); diff --git a/src/db/car/many.rs b/src/db/car/many.rs index 6b53c3ba6747..28184d03bebe 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -130,12 +130,12 @@ impl ManyCar { Ok(()) } - pub fn heaviest_tipset_key(&self) -> anyhow::Result { - self.read_only + pub fn heaviest_tipset_key(&self) -> anyhow::Result> { + Ok(self + .read_only .read() .peek() - .map(|w| AnyCar::heaviest_tipset_key(&w.car)) - .context("ManyCar store doesn't have a heaviest tipset key") + .map(|w| AnyCar::heaviest_tipset_key(&w.car))) } pub fn heaviest_tipset(&self) -> anyhow::Result { @@ -252,9 +252,9 @@ impl EthMappingsStore for ManyCar { } impl super::super::HeaviestTipsetKeyProvider for ManyCar { - fn heaviest_tipset_key(&self) -> anyhow::Result { + fn heaviest_tipset_key(&self) -> anyhow::Result> { match SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? { - Some(tsk) => Ok(tsk), + Some(tsk) => Ok(Some(tsk)), None => self.heaviest_tipset_key(), } } diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 6417214988dc..60c53e266b63 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -282,7 +282,7 @@ where tracing::warn!("{e}"); } - *self.memory_db_head_key.write() = db.heaviest_tipset_key().ok(); + *self.memory_db_head_key.write() = db.heaviest_tipset_key()?; db.unsubscribe_write_ops(); match joinset.join_next().await { Some(Ok(map)) => { diff --git a/src/db/memory.rs b/src/db/memory.rs index 5419d06f2ff7..42a4fa70f2cb 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -154,9 +154,8 @@ impl BitswapStoreReadWrite for MemoryDB { } impl super::HeaviestTipsetKeyProvider for MemoryDB { - fn heaviest_tipset_key(&self) -> anyhow::Result { - SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? - .context("head key not found") + fn heaviest_tipset_key(&self) -> anyhow::Result> { + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY) } fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> { diff --git a/src/db/mod.rs b/src/db/mod.rs index f833be73ca1c..307ab244190a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -188,7 +188,7 @@ impl PersistentStore for MemoryBlockstore { #[auto_impl::auto_impl(&, Arc)] pub trait HeaviestTipsetKeyProvider { /// Returns the currently tracked heaviest tipset. - fn heaviest_tipset_key(&self) -> anyhow::Result; + fn heaviest_tipset_key(&self) -> anyhow::Result>; /// Sets heaviest tipset. fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()>; diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index 8c6970c1ef10..1d80e78482a4 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -173,9 +173,8 @@ impl SettingsStore for ParityDb { } impl super::HeaviestTipsetKeyProvider for ParityDb { - fn heaviest_tipset_key(&self) -> anyhow::Result { - super::SettingsStoreExt::read_obj::(self, super::setting_keys::HEAD_KEY)? - .context("head key not found") + fn heaviest_tipset_key(&self) -> anyhow::Result> { + super::SettingsStoreExt::read_obj::(self, super::setting_keys::HEAD_KEY) } fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> { diff --git a/src/message_pool/mod.rs b/src/message_pool/mod.rs index abcbae2807ec..e30066e2449e 100644 --- a/src/message_pool/mod.rs +++ b/src/message_pool/mod.rs @@ -9,7 +9,7 @@ mod msgpool; pub use self::{ config::*, errors::*, - msgpool::{msg_pool::MessagePool, provider::MpoolRpcProvider, *}, + msgpool::{msg_pool::MessagePool, *}, }; pub use block_prob::block_probabilities; diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 720d39c36154..6fe48064d8bb 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -226,15 +226,25 @@ where let mut repub = false; let mut rmsgs: HashMap> = HashMap::new(); for ts in revert { - let pts = api.load_tipset(ts.parents())?; + let Ok(pts) = api.load_tipset(ts.parents()) else { + tracing::error!("error loading reverted tipset parent"); + continue; + }; *cur_tipset.write() = pts; let mut msgs: Vec = Vec::new(); for block in ts.block_headers() { - let (umsg, smsgs) = api.messages_for_block(block)?; + let Ok((umsg, smsgs)) = api.messages_for_block(block) else { + tracing::error!("error retrieving messages for reverted block"); + continue; + }; msgs.extend(smsgs); for msg in umsg { - let smsg = recover_sig(bls_sig_cache, msg)?; + let msg_cid = msg.cid(); + let Ok(smsg) = recover_sig(bls_sig_cache, msg) else { + tracing::debug!("could not recover signature for bls message {}", msg_cid); + continue; + }; msgs.push(smsg) } } @@ -246,7 +256,10 @@ where for ts in apply { for b in ts.block_headers() { - let (msgs, smsgs) = api.messages_for_block(b)?; + let Ok((msgs, smsgs)) = api.messages_for_block(b) else { + tracing::error!("error retrieving messages for block"); + continue; + }; for msg in smsgs { remove_from_selected_msgs( diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 6333df24debd..427cd989705d 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -9,7 +9,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use crate::blocks::{CachingBlockHeader, Tipset}; -use crate::chain::{HeadChange, MINIMUM_BASE_FEE}; +use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; #[cfg(test)] use crate::db::SettingsStore; use crate::eth::is_valid_eth_tx_for_sending; @@ -544,41 +544,38 @@ where mp.load_local()?; - let mut subscriber = mp.api.subscribe_head_changes(); + let mut head_changes_rx = mp.api.subscribe_head_changes(); let api = mp.api.clone(); let bls_sig_cache = mp.bls_sig_cache.clone(); let pending = mp.pending.clone(); let republished = mp.republished.clone(); - let cur_tipset = mp.cur_tipset.clone(); + let current_ts = mp.cur_tipset.clone(); let repub_trigger = mp.repub_trigger.clone(); // Reacts to new HeadChanges services.spawn(async move { loop { - match subscriber.recv().await { - Ok(ts) => { - let (cur, rev, app) = match ts { - HeadChange::Apply(tipset) => { - (cur_tipset.clone(), Vec::new(), vec![tipset]) - } - }; - head_change( + match head_changes_rx.recv().await { + Ok(HeadChanges { reverts, applies }) => { + if let Err(e) = head_change( api.as_ref(), bls_sig_cache.as_ref(), repub_trigger.clone(), republished.as_ref(), pending.as_ref(), - cur.as_ref(), - rev, - app, + ¤t_ts, + reverts, + applies, ) .await - .context("Error changing head")?; + { + tracing::warn!("Error changing head: {e}"); + } } Err(RecvError::Lagged(e)) => { - warn!("Head change subscriber lagged: skipping {} events", e); + warn!("Head change subscriber lagged: skipping {e} events"); } Err(RecvError::Closed) => { break Ok(()); diff --git a/src/message_pool/msgpool/provider.rs b/src/message_pool/msgpool/provider.rs index 0f829bd81024..12ea83c4ec76 100644 --- a/src/message_pool/msgpool/provider.rs +++ b/src/message_pool/msgpool/provider.rs @@ -1,11 +1,10 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::sync::Arc; - use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; -use crate::chain::HeadChange; +use crate::chain::{ChainStore, HeadChanges}; use crate::message::{ChainMessage, SignedMessage}; +use crate::message_pool::errors::Error; use crate::message_pool::msg_pool::{ MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES, }; @@ -16,20 +15,19 @@ use crate::shim::{ message::Message, state_tree::{ActorState, StateTree}, }; -use crate::state_manager::StateManager; use crate::utils::db::CborStoreExt; +use auto_impl::auto_impl; use cid::Cid; use fvm_ipld_blockstore::Blockstore; -use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; - -use crate::message_pool::errors::Error; +use tokio::sync::broadcast; /// Provider Trait. This trait will be used by the message pool to interact with /// some medium in order to do the operations that are listed below that are /// required for the message pool. +#[auto_impl(Arc)] pub trait Provider { /// Update `Mpool`'s `cur_tipset` whenever there is a change to the provider - fn subscribe_head_changes(&self) -> Subscriber; + fn subscribe_head_changes(&self) -> broadcast::Receiver; /// Get the heaviest Tipset in the provider fn get_heaviest_tipset(&self) -> Tipset; /// Add a message to the `MpoolProvider`, return either Cid or Error @@ -58,29 +56,17 @@ pub trait Provider { } } -/// This is the default Provider implementation that will be used for the -/// `mpool` RPC. -#[derive(derive_more::Constructor)] -pub struct MpoolRpcProvider { - subscriber: Publisher, - sm: Arc>, -} - -impl Provider for MpoolRpcProvider -where - DB: Blockstore + Sync + Send + 'static, -{ - fn subscribe_head_changes(&self) -> Subscriber { - self.subscriber.subscribe() +impl Provider for ChainStore { + fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.subscribe_head_changes() } fn get_heaviest_tipset(&self) -> Tipset { - self.sm.chain_store().heaviest_tipset() + self.heaviest_tipset() } fn put_message(&self, msg: &ChainMessage) -> Result { let cid = self - .sm .blockstore() .put_cbor_default(msg) .map_err(|err| Error::Other(err.to_string()))?; @@ -88,7 +74,7 @@ where } fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result { - let state = StateTree::new_from_root(self.sm.blockstore_owned(), ts.parent_state()) + let state = StateTree::new_from_root(self.blockstore().clone(), ts.parent_state()) .map_err(|e| Error::Other(e.to_string()))?; Ok(state.get_required_actor(addr)?) } @@ -97,21 +83,17 @@ where &self, h: &CachingBlockHeader, ) -> Result<(Vec, Vec), Error> { - crate::chain::block_messages(self.sm.blockstore(), h).map_err(|err| err.into()) + crate::chain::block_messages(self.blockstore(), h).map_err(|err| err.into()) } fn load_tipset(&self, tsk: &TipsetKey) -> Result { - Ok(self - .sm - .chain_store() - .chain_index() - .load_required_tipset(tsk)?) + Ok(self.chain_index().load_required_tipset(tsk)?) } fn chain_compute_base_fee(&self, ts: &Tipset) -> Result { - let smoke_height = self.sm.chain_config().epoch(Height::Smoke); - let xxx_height = self.sm.chain_config().epoch(Height::Xxx); - crate::chain::compute_base_fee(self.sm.blockstore(), ts, smoke_height, xxx_height) + let smoke_height = self.chain_config().epoch(Height::Smoke); + let xxx_height = self.chain_config().epoch(Height::Xxx); + crate::chain::compute_base_fee(self.blockstore(), ts, smoke_height, xxx_height) .map_err(|err| err.into()) } } diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index f7c8a332a5c2..5dd08dff2a1f 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -8,7 +8,7 @@ use std::convert::TryFrom; use crate::blocks::{ CachingBlockHeader, ElectionProof, RawBlockHeader, Ticket, Tipset, TipsetKey, VRFProof, }; -use crate::chain::HeadChange; +use crate::chain::HeadChanges; use crate::cid_collections::CidHashMap; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::message_pool::{Error, provider::Provider}; @@ -18,13 +18,12 @@ use cid::Cid; use num::BigInt; use parking_lot::Mutex; use tokio::sync::broadcast; -use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; /// Structure used for creating a provider when writing tests involving message /// pool pub struct TestApi { pub inner: Mutex, - pub publisher: Publisher, + pub head_changes_tx: broadcast::Sender, } #[derive(Default)] @@ -39,13 +38,13 @@ pub struct TestApiInner { impl Default for TestApi { /// Create a new `TestApi` fn default() -> Self { - let (publisher, _) = broadcast::channel(1); + let (head_changes_tx, _) = broadcast::channel(1); TestApi { inner: Mutex::new(TestApiInner { max_actor_pending_messages: 20000, ..TestApiInner::default() }), - publisher, + head_changes_tx, } } } @@ -59,7 +58,7 @@ impl TestApi { max_actor_pending_messages, ..TestApiInner::default() }), - publisher, + head_changes_tx: publisher, } } @@ -80,7 +79,12 @@ impl TestApi { /// Set the heaviest tipset for `TestApi` pub fn set_heaviest_tipset(&self, ts: Tipset) { - self.publisher.send(HeadChange::Apply(ts)).unwrap(); + self.head_changes_tx + .send(HeadChanges { + applies: vec![ts], + reverts: vec![], + }) + .unwrap(); } pub fn next_block(&self) -> CachingBlockHeader { @@ -117,8 +121,8 @@ impl TestApiInner { } impl Provider for TestApi { - fn subscribe_head_changes(&self) -> Subscriber { - self.publisher.subscribe() + fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.head_changes_tx.subscribe() } fn get_heaviest_tipset(&self) -> Tipset { diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index bb76787fd011..35b9dfa1eac9 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -84,26 +84,24 @@ pub(crate) fn new_heads( ) -> (Subscriber, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - let mut subscriber = data.chain_store().publisher().subscribe(); + let mut head_changes_rx = data.chain_store().subscribe_head_changes(); let handle = tokio::spawn(async move { - while let Ok(v) = subscriber.recv().await { - let headers = match v { - HeadChange::Apply(ts) => { - // Convert the tipset to an Ethereum block with full transaction info - // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block - match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { - Ok(block) => ApiHeaders(block), - Err(e) => { - tracing::error!("Failed to convert tipset to eth block: {}", e); - continue; + while let Ok(changes) = head_changes_rx.recv().await { + for ts in changes.applies { + // Convert the tipset to an Ethereum block with full transaction info + // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block + match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { + Ok(block) => { + if let Err(e) = sender.send(ApiHeaders(block)) { + tracing::error!("Failed to send headers: {}", e); + return; } } + Err(e) => { + tracing::error!("Failed to convert tipset to eth block: {}", e); + } } - }; - if let Err(e) = sender.send(headers) { - tracing::error!("Failed to send headers: {}", e); - break; } } }); @@ -123,31 +121,25 @@ pub(crate) fn logs( ) -> (Subscriber>, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - let mut subscriber = ctx.chain_store().publisher().subscribe(); + let mut head_changes_rx = ctx.chain_store().subscribe_head_changes(); let ctx = ctx.clone(); let handle = tokio::spawn(async move { - while let Ok(v) = subscriber.recv().await { - match v { - HeadChange::Apply(ts) => { - match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { - Ok(logs) => { - if !logs.is_empty() - && let Err(e) = sender.send(logs) - { - tracing::error!( - "Failed to send logs for tipset {}: {}", - ts.key(), - e - ); - break; - } - } - Err(e) => { - tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); + while let Ok(changes) = head_changes_rx.recv().await { + for ts in changes.applies { + match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { + Ok(logs) => { + if !logs.is_empty() + && let Err(e) = sender.send(logs) + { + tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e); + break; } } + Err(e) => { + tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); + } } } } @@ -850,11 +842,11 @@ impl RpcMethod<2> for ChainGetPath { (from, to): Self::Params, _: &http::Extensions, ) -> Result { - impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into) + Ok(chain_get_path(ctx.chain_store(), &from, &to)?.into_change_vec()) } } -/// Find the path between two tipsets, as a series of [`PathChange`]s. +/// Find the path between two tipsets, as [`PathChanges`]. /// /// ```text /// 0 - A - B - C - D @@ -871,11 +863,12 @@ impl RpcMethod<2> for ChainGetPath { /// ``` /// /// Exposes errors from the [`Blockstore`], and returns an error if there is no common ancestor. -fn impl_chain_get_path( +pub fn chain_get_path( chain_store: &ChainStore, from: &TipsetKey, to: &TipsetKey, -) -> anyhow::Result> { +) -> anyhow::Result { + let finality = chain_store.chain_config().policy.chain_finality; let mut to_revert = chain_store .load_required_tipset_or_heaviest(from) .context("couldn't load `from`")?; @@ -883,8 +876,15 @@ fn impl_chain_get_path( .load_required_tipset_or_heaviest(to) .context("couldn't load `to`")?; - let mut all_reverts = vec![]; - let mut all_applies = vec![]; + anyhow::ensure!( + (to_apply.epoch() - to_revert.epoch()).abs() <= finality, + "the gap between the new head ({}) and the old head ({}) is larger than chain finality ({finality})", + to_apply.epoch(), + to_revert.epoch() + ); + + let mut reverts = vec![]; + let mut applies = vec![]; // This loop is guaranteed to terminate if the blockstore contain no cycles. // This is currently computationally infeasible. @@ -893,21 +893,18 @@ fn impl_chain_get_path( let next = chain_store .load_required_tipset_or_heaviest(to_revert.parents()) .context("couldn't load ancestor of `from`")?; - all_reverts.push(to_revert); + reverts.push(to_revert); to_revert = next; } else { let next = chain_store .load_required_tipset_or_heaviest(to_apply.parents()) .context("couldn't load ancestor of `to`")?; - all_applies.push(to_apply); + applies.push(to_apply); to_apply = next; } } - Ok(all_reverts - .into_iter() - .map(PathChange::Revert) - .chain(all_applies.into_iter().rev().map(PathChange::Apply)) - .collect()) + applies.reverse(); + Ok(PathChanges { reverts, applies }) } /// Get tipset at epoch. Pick younger tipset if epoch points to a @@ -1323,18 +1320,18 @@ pub(crate) fn chain_notify( .send(vec![ApiHeadChange { change, tipset }]) .expect("receiver is not dropped"); - let mut subscriber = data.chain_store().publisher().subscribe(); + let mut head_changes_rx = data.chain_store().subscribe_head_changes(); tokio::spawn(async move { // Skip first message - let _ = subscriber.recv().await; - - while let Ok(v) = subscriber.recv().await { - let (change, tipset) = match v { - HeadChange::Apply(ts) => ("apply".into(), ts), - }; - - if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() { + let _ = head_changes_rx.recv().await; + while let Ok(changes) = head_changes_rx.recv().await { + let api_changes = changes + .into_change_vec() + .into_iter() + .map(From::from) + .collect(); + if sender.send(api_changes).is_err() { break; } } @@ -1506,6 +1503,21 @@ pub struct ApiHeadChange { } lotus_json_with_self!(ApiHeadChange); +impl From for ApiHeadChange { + fn from(change: HeadChange) -> Self { + match change { + HeadChange::Apply(tipset) => Self { + change: "apply".into(), + tipset, + }, + HeadChange::Revert(tipset) => Self { + change: "revert".into(), + tipset, + }, + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "Type", content = "Val", rename_all = "snake_case")] pub enum PathChange { @@ -1563,6 +1575,33 @@ impl HasLotusJson for PathChange { } } +#[derive(Debug)] +pub struct PathChanges { + pub reverts: Vec, + pub applies: Vec, +} + +impl Clone for PathChanges { + fn clone(&self) -> Self { + let Self { reverts, applies } = self; + Self { + reverts: reverts.clone(), + applies: applies.clone(), + } + } +} + +impl PathChanges { + pub fn into_change_vec(self) -> Vec> { + let Self { reverts, applies } = self; + reverts + .into_iter() + .map(PathChange::Revert) + .chain(applies.into_iter().map(PathChange::Apply)) + .collect() + } +} + #[cfg(test)] impl quickcheck::Arbitrary for PathChange where @@ -1790,8 +1829,9 @@ mod tests { ) } - let actual = - impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap(); + let actual = chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()) + .unwrap() + .into_change_vec(); let expected = expected .into_iter() .map(|change| match change { diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4e7da8d7be18..070614ff78a4 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -173,7 +173,7 @@ mod tests { use crate::db::MemoryDB; use crate::key_management::{KeyStore, KeyStoreConfig}; use crate::libp2p::{NetworkMessage, PeerManager}; - use crate::message_pool::{MessagePool, MpoolRpcProvider}; + use crate::message_pool::MessagePool; use crate::networks::ChainConfig; use crate::rpc::RPCState; use crate::rpc::eth::filter::EthEventHandler; @@ -202,7 +202,6 @@ mod tests { ); let state_manager = Arc::new(StateManager::new(cs_arc.clone()).unwrap()); - let state_manager_for_thread = state_manager.clone(); let cs_for_test = &cs_arc; let mpool_network_send = network_send.clone(); let pool = { @@ -218,13 +217,11 @@ mod tests { db.put_keyed(&i, &bz2).unwrap(); } - let provider = - MpoolRpcProvider::new(cs_arc.publisher().clone(), state_manager_for_thread.clone()); MessagePool::new( - provider, + cs_arc, mpool_network_send, Default::default(), - state_manager_for_thread.chain_config().clone(), + state_manager.chain_config().clone(), &mut services, ) .unwrap() diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d759728fbbd5..f7cc4bef31be 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -471,7 +471,7 @@ const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; pub struct RPCState { pub keystore: Arc>, pub state_manager: Arc>, - pub mpool: Arc>>, + pub mpool: Arc>>>, pub bad_blocks: Option>, pub msgs_in_tipset: Arc, pub sync_status: crate::chain_sync::SyncStatus, diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index c042351222c5..2293b5b0ddf6 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -16,7 +16,7 @@ use self::utils::structured; use crate::beacon::{BeaconEntry, BeaconSchedule}; use crate::blocks::{Tipset, TipsetKey}; use crate::chain::{ - ChainStore, HeadChange, + ChainStore, index::{ChainIndex, ResolveNullTipset}, }; use crate::interpreter::{ @@ -1171,7 +1171,7 @@ where look_back_limit: Option, allow_replaced: Option, ) -> Result<(Option, Option), Error> { - let mut subscriber = self.cs.publisher().subscribe(); + let mut head_changes_rx = self.cs.subscribe_head_changes(); let (sender, mut receiver) = oneshot::channel::<()>(); let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) .map_err(|err| Error::Other(format!("failed to load message {err:}")))?; @@ -1209,9 +1209,18 @@ where // Wait for message to be included in head change. let mut subscriber_poll = tokio::task::spawn(async move { loop { - match subscriber.recv().await { - Ok(subscriber) => match subscriber { - HeadChange::Apply(tipset) => { + match head_changes_rx.recv().await { + Ok(head_changes) => { + for tipset in head_changes.reverts { + if candidate_tipset + .as_ref() + .is_some_and(|candidate| candidate.key() == tipset.key()) + { + candidate_tipset = None; + candidate_receipt = None; + } + } + for tipset in head_changes.applies { if candidate_tipset .as_ref() .map(|s| tipset.epoch() >= s.epoch() + confidence) @@ -1237,7 +1246,7 @@ where candidate_receipt = Some(receipt) } } - }, + } Err(RecvError::Lagged(i)) => { warn!( "wait for message head change subscriber lagged, skipped {} events", diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 84b6a110d877..01b273eaa55c 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -15,7 +15,7 @@ use crate::db::{ use crate::genesis::read_genesis_header; use crate::key_management::{KeyStore, KeyStoreConfig}; use crate::libp2p::PeerManager; -use crate::message_pool::{MessagePool, MpoolRpcProvider}; +use crate::message_pool::MessagePool; use crate::networks::{ChainConfig, NetworkChain}; use crate::rpc::eth::filter::EthEventHandler; use crate::rpc::{RPCState, start_rpc}; @@ -83,7 +83,7 @@ where let (tipset_send, _) = flume::bounded(5); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 6cedaef505e7..e20f7e4f866f 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -17,7 +17,7 @@ use crate::{ genesis::read_genesis_header, libp2p::{NetworkMessage, PeerManager}, libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64}, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::ChainConfig, shim::address::CurrentNetwork, state_manager::StateManager, @@ -132,7 +132,7 @@ async fn ctx( let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap()); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(), @@ -194,7 +194,10 @@ where SettingsStoreExt::write_obj( &self.tracker, crate::db::setting_keys::HEAD_KEY, - &self.inner.heaviest_tipset_key()?, + &self + .inner + .heaviest_tipset_key()? + .context("heaviest tipset key not found")?, )?; } @@ -223,7 +226,7 @@ where } impl HeaviestTipsetKeyProvider for ReadOpsTrackingStore { - fn heaviest_tipset_key(&self) -> anyhow::Result { + fn heaviest_tipset_key(&self) -> anyhow::Result> { self.inner.heaviest_tipset_key() } diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 86feb82e3ec8..463f95e220e0 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -12,7 +12,7 @@ use crate::{ genesis::read_genesis_header, libp2p::{NetworkMessage, PeerManager}, lotus_json::HasLotusJson, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::{ChainConfig, NetworkChain}, rpc::{ ApiPaths, RPCState, RpcMethod, RpcMethodExt as _, @@ -150,7 +150,7 @@ async fn ctx( )?); let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap()); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(),