Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ libp2p = { version = "0.56", default-features = false }
libp2p-swarm-test = { version = "0.6", default-features = false, features = ["tokio"] }
multihash-codetable = { version = "0.1", features = ["blake2b", "blake2s", "blake3", "sha2", "sha3", "strobe"] }
rust2go = "0.4"
serde = { version = "1", default-features = false, features = ["derive"] }
serde = { version = "1", default-features = false, features = ["derive", "rc"] }
serde_yaml = "0.9"
tokio = "1"

Expand Down
153 changes: 48 additions & 105 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ use super::{
tipset_tracker::TipsetTracker,
};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
use crate::message::{ChainMessage, SignedMessage};
use crate::networks::{ChainConfig, Height};
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::shim::clock::ChainEpoch;
use crate::shim::{
address::Address, econ::TokenAmount, executor::Receipt, message::Message,
state_tree::StateTree, version::NetworkVersion,
};
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::state_manager::StateOutput;
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use crate::{
Expand All @@ -30,7 +27,7 @@ use crate::{
interpreter::{BlockMessages, VMTrace},
rpc::chain::PathChanges,
};
use ahash::{HashMap, HashMapExt, HashSet};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
Expand Down Expand Up @@ -89,6 +86,9 @@ pub struct ChainStore<DB> {

/// Needed by the Ethereum mapping.
chain_config: Arc<ChainConfig>,

/// Cache for messages in tipsets, keyed by tipset key.
messages_in_tipset_cache: MessagesInTipsetCache,
}

impl<DB> BitswapStoreRead for ChainStore<DB>
Expand Down Expand Up @@ -151,11 +151,17 @@ where
validated_blocks,
eth_mappings,
chain_config,
messages_in_tipset_cache: Default::default(),
};

Ok(cs)
}

/// Cache for messages in tipsets, keyed by tipset key.
pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
&self.messages_in_tipset_cache
}

/// Sets heaviest tipset
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
head.key().save(self.blockstore())?;
Expand Down Expand Up @@ -338,9 +344,13 @@ where

/// Retrieves ordered valid messages from a `Tipset`. This will only include
/// messages that will be passed through the VM.
pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Vec<ChainMessage>, Error> {
let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect())
pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error> {
Ok(self
.messages_in_tipset_cache()
.get_or_insert_with(ts.key(), || {
let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect_vec())
})?)
}

/// Gets look-back tipset (and state-root of that tipset) for block
Expand Down Expand Up @@ -596,133 +606,60 @@ where
/// on performance measurements, is resource-intensive and can be a bottleneck for certain
/// use-cases. This cache is intended to be used with a complementary function;
/// [`messages_for_tipset_with_cache`].
pub struct MsgsInTipsetCache {
cache: SizeTrackingLruCache<TipsetKey, Vec<ChainMessage>>,
pub struct MessagesInTipsetCache {
cache: SizeTrackingLruCache<TipsetKey, Arc<Vec<ChainMessage>>>,
}

impl MsgsInTipsetCache {
impl MessagesInTipsetCache {
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_metrics("msg_in_tipset".into(), capacity),
}
}

pub fn get(&self, key: &TipsetKey) -> Option<Vec<ChainMessage>> {
pub fn get(&self, key: &TipsetKey) -> Option<Arc<Vec<ChainMessage>>> {
self.cache.get_cloned(key)
}

pub fn get_or_insert_with<F>(&self, key: &TipsetKey, f: F) -> anyhow::Result<Vec<ChainMessage>>
pub fn get_or_insert_with<F>(
&self,
key: &TipsetKey,
f: F,
) -> anyhow::Result<Arc<Vec<ChainMessage>>>
where
F: FnOnce() -> anyhow::Result<Vec<ChainMessage>>,
{
if self.cache.contains(key) {
Ok(self.get(key).expect("cache entry disappeared!"))
if let Some(cached) = self.get(key) {
Ok(cached)
} else {
let v = f()?;
self.insert(key.clone(), v.clone());
Ok(v)
Ok(self.insert(key.clone(), f()?))
}
}

pub fn insert(&self, key: TipsetKey, mut value: Vec<ChainMessage>) {
pub fn insert(&self, key: TipsetKey, mut value: Vec<ChainMessage>) -> Arc<Vec<ChainMessage>> {
value.shrink_to_fit();
self.cache.push(key, value);
let value = Arc::new(value);
self.cache.push(key, value.clone());
value
}

/// Reads the intended cache size for this process from the environment or uses the default.
fn read_cache_size() -> NonZeroUsize {
// Arbitrary number, can be adjusted
const DEFAULT: NonZeroUsize = nonzero!(100usize);
const DEFAULT: NonZeroUsize = nonzero!(1024usize);
std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT)
}
}

impl Default for MsgsInTipsetCache {
impl Default for MessagesInTipsetCache {
fn default() -> Self {
Self::new(Self::read_cache_size())
}
}

/// Same as [`messages_for_tipset`] but uses a cache to store messages for each tipset.
pub fn messages_for_tipset_with_cache<DB>(
db: &Arc<DB>,
ts: &Tipset,
cache: &MsgsInTipsetCache,
) -> Result<Vec<ChainMessage>, Error>
where
DB: Blockstore,
{
let key = ts.key();
cache
.get_or_insert_with(key, || {
messages_for_tipset(db, ts).context("failed to get messages for tipset")
})
.map_err(Into::into)
}

/// Given a tipset this function will return all unique messages in that tipset.
/// Note: This function is resource-intensive and can be a bottleneck for certain use-cases.
/// Consider using [`messages_for_tipset_with_cache`] for better performance.
pub fn messages_for_tipset<DB>(db: &Arc<DB>, ts: &Tipset) -> Result<Vec<ChainMessage>, Error>
Comment thread
hanabi1224 marked this conversation as resolved.
where
DB: Blockstore,
{
let mut applied: HashMap<Address, u64> = HashMap::new();
let mut balances: HashMap<Address, TokenAmount> = HashMap::new();
let state = StateTree::new_from_tipset(Arc::clone(db), ts)?;

// message to get all messages for block_header into a single iterator
let mut get_message_for_block_header =
|b: &CachingBlockHeader| -> Result<Vec<ChainMessage>, Error> {
let (unsigned, signed) = block_messages(db, b)?;
let mut messages = Vec::with_capacity(unsigned.len() + signed.len());
let unsigned_box = unsigned.into_iter().map(ChainMessage::Unsigned);
let signed_box = signed.into_iter().map(ChainMessage::Signed);

for message in unsigned_box.chain(signed_box) {
let from_address = &message.from();
if !applied.contains_key(from_address) {
let actor_state = state
.get_actor(from_address)?
.ok_or_else(|| Error::Other("Actor state not found".to_string()))?;
applied.insert(*from_address, actor_state.sequence);
balances.insert(*from_address, actor_state.balance.clone().into());
}
if let Some(seq) = applied.get_mut(from_address) {
if *seq != message.sequence() {
continue;
}
*seq += 1;
} else {
continue;
}
if let Some(bal) = balances.get_mut(from_address) {
if *bal < message.required_funds() {
continue;
}
*bal -= message.required_funds();
} else {
continue;
}

messages.push(message)
}

Ok(messages)
};

ts.block_headers()
.iter()
.try_fold(Vec::new(), |mut message_vec, b| {
let mut messages = get_message_for_block_header(b)?;
message_vec.append(&mut messages);
Ok(message_vec)
})
}

/// Returns messages from key-value store based on a slice of [`Cid`]s.
pub fn messages_from_cids<DB, T>(db: &DB, keys: &[Cid]) -> Result<Vec<T>, Error>
where
Expand Down Expand Up @@ -803,16 +740,16 @@ mod tests {

#[test]
fn test_messages_in_tipset_cache() {
let cache = MsgsInTipsetCache::new(2.try_into().unwrap());
let cache = MessagesInTipsetCache::new(nonzero!(2_usize));
let key1 = TipsetKey::from(nunny::vec![Cid::new_v1(
DAG_CBOR,
MultihashCode::Blake2b256.digest(&[1])
)]);
assert!(cache.get(&key1).is_none());

let msgs = vec![ChainMessage::Unsigned(Message::default())];
let msgs = vec![Message::default().into()];
cache.insert(key1.clone(), msgs.clone());
assert_eq!(msgs, cache.get(&key1).unwrap());
assert_eq!(&msgs, &*cache.get(&key1).unwrap());

let inserter_executed: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
Expand All @@ -821,7 +758,10 @@ mod tests {
Ok(msgs.clone())
};

assert_eq!(msgs, cache.get_or_insert_with(&key1, key_inserter).unwrap());
assert_eq!(
&msgs,
&*cache.get_or_insert_with(&key1, key_inserter).unwrap()
);
assert!(!inserter_executed.load(std::sync::atomic::Ordering::Relaxed));

let key2 = TipsetKey::from(nunny::vec![Cid::new_v1(
Expand All @@ -830,7 +770,10 @@ mod tests {
)]);

assert!(cache.get(&key2).is_none());
assert_eq!(msgs, cache.get_or_insert_with(&key2, key_inserter).unwrap());
assert_eq!(
&msgs,
&*cache.get_or_insert_with(&key2, key_inserter).unwrap()
);
assert!(inserter_executed.load(std::sync::atomic::Ordering::Relaxed));
}
}
2 changes: 1 addition & 1 deletion src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
};
use crate::{
eth::is_valid_eth_tx_for_sending,
message::{Message as MessageTrait, valid_for_block_inclusion},
message::{MessageRead as _, valid_for_block_inclusion},
};
use ahash::HashMap;
use cid::Cid;
Expand Down
5 changes: 1 addition & 4 deletions src/cli/subcommands/mpool_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::blocks::Tipset;
use crate::lotus_json::{HasLotusJson as _, NotNullVec};
use crate::message::SignedMessage;
use crate::message::{MessageRead as _, SignedMessage};
use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
use crate::shim::address::StrictAddress;
use crate::shim::message::Message;
Expand Down Expand Up @@ -52,8 +52,6 @@ fn filter_messages(
to: Option<&StrictAddress>,
from: Option<&StrictAddress>,
) -> anyhow::Result<Vec<SignedMessage>> {
use crate::message::Message;

let filtered = messages
.into_iter()
.filter(|msg| {
Expand Down Expand Up @@ -285,7 +283,6 @@ impl MpoolCommands {
mod tests {
use super::*;
use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
use crate::message::{Message, SignedMessage};
use crate::message_pool::tests::create_smsg;
use crate::shim::crypto::SignatureType;
use itertools::Itertools as _;
Expand Down
2 changes: 0 additions & 2 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ fn maybe_start_rpc_service(
let tipset_send = chain_follower.tipset_sender.clone();
let keystore = ctx.keystore.clone();
let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
async move {
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
.await
Expand All @@ -412,7 +411,6 @@ fn maybe_start_rpc_service(
keystore,
mpool,
bad_blocks,
msgs_in_tipset,
sync_status,
eth_event_handler,
sync_network_context,
Expand Down
2 changes: 1 addition & 1 deletion src/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use rlp::Rlp;
use spire_enum::prelude::delegated_enum;

use crate::{
message::{Message as _, SignedMessage},
message::{MessageRead as _, SignedMessage},
rpc::eth::types::EthAddress,
shim::{address::Address, crypto::SignatureType, message::Message, version::NetworkVersion},
};
Expand Down
16 changes: 5 additions & 11 deletions src/interpreter/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::interpreter::{
fvm4::ForestExterns as ForestExternsV4,
};
use crate::message::ChainMessage;
use crate::message::Message as MessageTrait;
use crate::message::MessageRead as _;
use crate::networks::{ChainConfig, NetworkChain};
use crate::shim::actors::{AwardBlockRewardParams, cron, reward};
use crate::shim::{
Expand Down Expand Up @@ -115,14 +115,8 @@ impl BlockMessages {
let (usm, sm) = block_messages(db, b)?;

let mut messages = Vec::with_capacity(usm.len() + sm.len());
messages.extend(
usm.into_iter()
.filter_map(|m| select_msg(ChainMessage::Unsigned(m))),
);
messages.extend(
sm.into_iter()
.filter_map(|m| select_msg(ChainMessage::Signed(m))),
);
messages.extend(usm.into_iter().filter_map(|m| select_msg(m.into())));
messages.extend(sm.into_iter().filter_map(|m| select_msg(m.into())));

Ok(BlockMessages {
miner: b.miner_address,
Expand Down Expand Up @@ -336,7 +330,7 @@ where
if let Some(mut callback) = callback {
callback(MessageCallbackCtx {
cid: cron_msg.cid(),
message: &ChainMessage::Unsigned(cron_msg),
message: &cron_msg.into(),
apply_ret: &ret,
at: CalledAt::Cron,
duration,
Expand Down Expand Up @@ -421,7 +415,7 @@ where
if let Some(callback) = &mut callback {
callback(MessageCallbackCtx {
cid: rew_msg.cid(),
message: &ChainMessage::Unsigned(rew_msg),
message: &rew_msg.into(),
apply_ret: &ret,
at: CalledAt::Reward,
duration,
Expand Down
Loading
Loading