Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 55 additions & 32 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use super::{
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::db::{EthMappingsStore, EthMappingsStoreExt};
use crate::interpreter::{BlockMessages, VMTrace};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
use crate::networks::{ChainConfig, Height};
Expand All @@ -23,7 +21,15 @@ use crate::{
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
db::HeaviestTipsetKeyProvider,
};
use crate::{
db::{EthMappingsStore, EthMappingsStoreExt},
rpc::chain::PathChange,
};
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
use crate::{
interpreter::{BlockMessages, VMTrace},
rpc::chain::PathChanges,
};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Context as _;
use cid::Cid;
Expand All @@ -35,8 +41,8 @@ use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::broadcast::{self, Sender as Publisher};
use tracing::{debug, trace, warn};
use tokio::sync::broadcast;
use tracing::{debug, error, trace, warn};

// A cap on the size of the future_sink
const SINK_CAP: usize = 200;
Expand All @@ -47,17 +53,16 @@ pub type ChainEpochDelta = ChainEpoch;

/// `Enum` for `pubsub` channel that defines message type variant and data
/// contained in message type.
#[derive(Clone, Debug)]
pub enum HeadChange {
Apply(Tipset),
}
pub type HeadChange = PathChange<Tipset>;

pub type HeadChanges = PathChanges<Tipset>;

/// Stores chain data such as heaviest tipset and cached tipset info at each
/// epoch. This structure is thread-safe, and all caches are wrapped in a mutex
/// to allow a consistent `ChainStore` to be shared across tasks.
pub struct ChainStore<DB> {
/// Publisher for head change events
publisher: Publisher<HeadChange>,
head_changes_tx: broadcast::Sender<HeadChanges>,

/// key-value `datastore`.
db: Arc<DB>,
Expand All @@ -66,7 +71,7 @@ pub struct ChainStore<DB> {
heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,

/// Heaviest tipset cache
heaviest_tipset_cache: Arc<RwLock<Option<Tipset>>>,
heaviest_tipset_cache: Arc<RwLock<Tipset>>,

/// Used as a cache for tipset `lookbacks`.
chain_index: Arc<ChainIndex<Arc<DB>>>,
Expand Down Expand Up @@ -124,14 +129,24 @@ where
let (publisher, _) = broadcast::channel(SINK_CAP);
let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db)));
let validated_blocks = Mutex::new(HashSet::default());

let head = if let Some(head_tsk) = heaviest_tipset_key_provider
.heaviest_tipset_key()
.context("failed to load head tipset key")?
&& let Some(head) = chain_index
.load_tipset(&head_tsk)
.context("failed to load head tipset")?
{
head
} else {
Tipset::from(&genesis_block_header)
};
let cs = Self {
publisher,
head_changes_tx: publisher,
chain_index,
tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
db,
heaviest_tipset_key_provider,
heaviest_tipset_cache: Default::default(),
heaviest_tipset_cache: Arc::new(RwLock::new(head)),
genesis_block_header,
validated_blocks,
eth_mappings,
Expand All @@ -142,14 +157,31 @@ where
}

/// Sets heaviest tipset
pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> {
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
head.key().save(self.blockstore())?;
self.heaviest_tipset_key_provider
.set_heaviest_tipset_key(ts.key())?;
*self.heaviest_tipset_cache.write() = Some(ts.clone());
ts.key().save(self.blockstore())?;
if self.publisher.send(HeadChange::Apply(ts)).is_err() {
debug!("did not publish head change, no active receivers");
.set_heaviest_tipset_key(head.key())?;
let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone());

let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) {
Ok(changes) => changes,
Err(e) => {
// Do not warn when the old head is genesis
if old_head.epoch() > 0 {
error!("failed to get chain path changes: {e}");
}
// Fallback to single apply
PathChanges {
applies: vec![head],
reverts: vec![],
}
}
};

if self.head_changes_tx.send(changes).is_err() {
debug!("did not publish changes, no active receivers");
}

Ok(())
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down Expand Up @@ -200,26 +232,17 @@ where

/// Returns the currently tracked heaviest tipset.
pub fn heaviest_tipset(&self) -> Tipset {
if let Some(ts) = &*self.heaviest_tipset_cache.read() {
return ts.clone();
}
let tsk = self
.heaviest_tipset_key_provider
.heaviest_tipset_key()
.unwrap_or_else(|_| TipsetKey::from(nunny::vec![*self.genesis_block_header.cid()]));
self.chain_index
.load_required_tipset(&tsk)
.expect("failed to load heaviest tipset")
self.heaviest_tipset_cache.read().clone()
}

/// Returns the genesis tipset.
pub fn genesis_tipset(&self) -> Tipset {
Tipset::from(self.genesis_block_header())
}

/// Returns a reference to the publisher of head changes.
pub fn publisher(&self) -> &Publisher<HeadChange> {
&self.publisher
/// Subscribes head changes.
pub fn subscribe_head_changes(&self) -> broadcast::Receiver<HeadChanges> {
self.head_changes_tx.subscribe()
}

/// Returns key-value store instance.
Expand Down
8 changes: 4 additions & 4 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
tipset_syncer::{TipsetSyncerError, validate_tipset},
},
libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
message_pool::{MessagePool, MpoolRpcProvider},
message_pool::MessagePool,
networks::calculate_expected_epoch,
shim::clock::ChainEpoch,
state_manager::StateManager,
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct ChainFollower<DB> {
stateless_mode: bool,

/// Message pool
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}

impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
Expand All @@ -88,7 +88,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
genesis: Tipset,
net_handler: flume::Receiver<NetworkEvent>,
stateless_mode: bool,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
) -> Self {
let (tipset_sender, tipset_receiver) = flume::bounded(20);
let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
Expand Down Expand Up @@ -135,7 +135,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
network_rx: flume::Receiver<NetworkEvent>,
tipset_receiver: flume::Receiver<FullTipset>,
network: SyncNetworkContext<DB>,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
sync_status: SyncStatus,
genesis: Tipset,
stateless_mode: bool,
Expand Down
30 changes: 13 additions & 17 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod db_util;
pub mod main;

use crate::blocks::Tipset;
use crate::chain::HeadChange;
use crate::chain::ChainStore;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{ChainFollower, SyncStatus};
Expand All @@ -23,7 +23,7 @@ use crate::daemon::{
use crate::db::gc::SnapshotGarbageCollector;
use crate::db::ttl::EthMappingCollector;
use crate::libp2p::{Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use crate::message_pool::{MessagePool, MpoolConfig};
use crate::networks::{self, ChainConfig};
use crate::rpc::RPCState;
use crate::rpc::eth::filter::EthEventHandler;
Expand Down Expand Up @@ -293,11 +293,9 @@ fn create_mpool(
services: &mut JoinSet<anyhow::Result<()>>,
p2p_service: &Libp2pService<DbType>,
ctx: &AppContext,
) -> anyhow::Result<Arc<MessagePool<MpoolRpcProvider<DbType>>>> {
let publisher = ctx.state_manager.chain_store().publisher();
let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone());
) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
Ok(MessagePool::new(
provider,
ctx.state_manager.chain_store().clone(),
p2p_service.network_sender().clone(),
MpoolConfig::load_config(ctx.db.writer().as_ref())?,
ctx.state_manager.chain_config().clone(),
Expand All @@ -309,7 +307,7 @@ fn create_mpool(
fn create_chain_follower(
opts: &CliOpts,
p2p_service: &Libp2pService<DbType>,
mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
ctx: &AppContext,
) -> anyhow::Result<ChainFollower<DbType>> {
let network_send = p2p_service.network_sender().clone();
Expand Down Expand Up @@ -370,7 +368,7 @@ async fn maybe_start_health_check_service(
fn maybe_start_rpc_service(
services: &mut JoinSet<anyhow::Result<()>>,
config: &Config,
mpool: Arc<MessagePool<MpoolRpcProvider<DbType>>>,
mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
chain_follower: &ChainFollower<DbType>,
start_time: chrono::DateTime<chrono::Utc>,
shutdown: mpsc::Sender<()>,
Expand Down Expand Up @@ -498,21 +496,19 @@ fn maybe_start_indexer_service(
&& !opts.stateless
&& !ctx.state_manager.chain_config().is_devnet()
{
let mut receiver = ctx.state_manager.chain_store().publisher().subscribe();
let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
let chain_store = ctx.state_manager.chain_store().clone();
services.spawn(async move {
tracing::info!("Starting indexer service");

// Continuously listen for head changes
loop {
let HeadChange::Apply(ts) = receiver.recv().await?;

tracing::debug!("Indexing tipset {}", ts.key());

let delegated_messages =
chain_store.headers_delegated_messages(ts.block_headers().iter())?;

chain_store.process_signed_messages(&delegated_messages)?;
for ts in head_changes_rx.recv().await?.applies {
tracing::debug!("Indexing tipset {}", ts.key());
let delegated_messages =
chain_store.headers_delegated_messages(ts.block_headers().iter())?;
chain_store.process_signed_messages(&delegated_messages)?;
}
}
});

Expand Down
12 changes: 6 additions & 6 deletions src/db/car/many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ impl<WriterT> ManyCar<WriterT> {
Ok(())
}

pub fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
self.read_only
pub fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
Ok(self
.read_only
.read()
.peek()
.map(|w| AnyCar::heaviest_tipset_key(&w.car))
.context("ManyCar store doesn't have a heaviest tipset key")
.map(|w| AnyCar::heaviest_tipset_key(&w.car)))
}

pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
Expand Down Expand Up @@ -252,9 +252,9 @@ impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
}

impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
Some(tsk) => Ok(tsk),
Some(tsk) => Ok(Some(tsk)),
None => self.heaviest_tipset_key(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/db/gc/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
tracing::warn!("{e}");
}

*self.memory_db_head_key.write() = db.heaviest_tipset_key().ok();
*self.memory_db_head_key.write() = db.heaviest_tipset_key()?;
db.unsubscribe_write_ops();
match joinset.join_next().await {
Some(Ok(map)) => {
Expand Down
5 changes: 2 additions & 3 deletions src/db/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ impl BitswapStoreReadWrite for MemoryDB {
}

impl super::HeaviestTipsetKeyProvider for MemoryDB {
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
.context("head key not found")
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)
}

fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl PersistentStore for MemoryBlockstore {
#[auto_impl::auto_impl(&, Arc)]
pub trait HeaviestTipsetKeyProvider {
/// Returns the currently tracked heaviest tipset.
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey>;
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>>;

/// Sets heaviest tipset.
fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()>;
Expand Down
5 changes: 2 additions & 3 deletions src/db/parity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ impl SettingsStore for ParityDb {
}

impl super::HeaviestTipsetKeyProvider for ParityDb {
fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
.context("head key not found")
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
}

fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/message_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod msgpool;
pub use self::{
config::*,
errors::*,
msgpool::{msg_pool::MessagePool, provider::MpoolRpcProvider, *},
msgpool::{msg_pool::MessagePool, *},
};

pub use block_prob::block_probabilities;
21 changes: 17 additions & 4 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,25 @@ where
let mut repub = false;
let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
for ts in revert {
let pts = api.load_tipset(ts.parents())?;
let Ok(pts) = api.load_tipset(ts.parents()) else {
tracing::error!("error loading reverted tipset parent");
Comment thread
hanabi1224 marked this conversation as resolved.
continue;
};
*cur_tipset.write() = pts;

let mut msgs: Vec<SignedMessage> = Vec::new();
for block in ts.block_headers() {
let (umsg, smsgs) = api.messages_for_block(block)?;
let Ok((umsg, smsgs)) = api.messages_for_block(block) else {
tracing::error!("error retrieving messages for reverted block");
continue;
};
msgs.extend(smsgs);
for msg in umsg {
let smsg = recover_sig(bls_sig_cache, msg)?;
let msg_cid = msg.cid();
let Ok(smsg) = recover_sig(bls_sig_cache, msg) else {
tracing::debug!("could not recover signature for bls message {}", msg_cid);
continue;
};
msgs.push(smsg)
}
}
Expand All @@ -246,7 +256,10 @@ where

for ts in apply {
for b in ts.block_headers() {
let (msgs, smsgs) = api.messages_for_block(b)?;
let Ok((msgs, smsgs)) = api.messages_for_block(b) else {
tracing::error!("error retrieving messages for block");
continue;
};

for msg in smsgs {
remove_from_selected_msgs(
Expand Down
Loading
Loading