Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ libp2p = { version = "0.56", default-features = false }
libp2p-swarm-test = { version = "0.6", default-features = false, features = ["tokio"] }
multihash-codetable = { version = "0.1", features = ["blake2b", "blake2s", "blake3", "sha2", "sha3", "strobe"] }
rust2go = "0.4"
serde = { version = "1", default-features = false, features = ["derive"] }
serde = { version = "1", default-features = false, features = ["derive", "rc"] }
serde_yaml = "0.9"
tokio = "1"

Expand Down
153 changes: 48 additions & 105 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ use super::{
tipset_tracker::TipsetTracker,
};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
use crate::message::{ChainMessage, SignedMessage};
use crate::networks::{ChainConfig, Height};
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::shim::clock::ChainEpoch;
use crate::shim::{
address::Address, econ::TokenAmount, executor::Receipt, message::Message,
state_tree::StateTree, version::NetworkVersion,
};
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::state_manager::StateOutput;
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use crate::{
Expand All @@ -30,7 +27,7 @@ use crate::{
interpreter::{BlockMessages, VMTrace},
rpc::chain::PathChanges,
};
use ahash::{HashMap, HashMapExt, HashSet};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
Expand Down Expand Up @@ -89,6 +86,9 @@ pub struct ChainStore<DB> {

/// Needed by the Ethereum mapping.
chain_config: Arc<ChainConfig>,

/// Cache for messages in tipsets, keyed by tipset key.
messages_in_tipset_cache: MessagesInTipsetCache,
}

impl<DB> BitswapStoreRead for ChainStore<DB>
Expand Down Expand Up @@ -151,11 +151,17 @@ where
validated_blocks,
eth_mappings,
chain_config,
messages_in_tipset_cache: Default::default(),
};

Ok(cs)
}

/// Cache for messages in tipsets, keyed by tipset key.
pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
&self.messages_in_tipset_cache
}

/// Sets heaviest tipset
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
head.key().save(self.blockstore())?;
Expand Down Expand Up @@ -338,9 +344,13 @@ where

/// Retrieves ordered valid messages from a `Tipset`. This will only include
/// messages that will be passed through the VM.
pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Vec<ChainMessage>, Error> {
let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect())
pub fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error> {
Ok(self
.messages_in_tipset_cache()
.get_or_insert_with(ts.key(), || {
let bmsgs = BlockMessages::for_tipset(&self.db, ts)?;
Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect_vec())
})?)
}

/// Gets look-back tipset (and state-root of that tipset) for block
Expand Down Expand Up @@ -596,133 +606,60 @@ where
/// on performance measurements, is resource-intensive and can be a bottleneck for certain
/// use-cases. This cache is intended to be used with a complementary function;
/// [`messages_for_tipset_with_cache`].
pub struct MsgsInTipsetCache {
cache: SizeTrackingLruCache<TipsetKey, Vec<ChainMessage>>,
pub struct MessagesInTipsetCache {
cache: SizeTrackingLruCache<TipsetKey, Arc<Vec<ChainMessage>>>,
}

impl MsgsInTipsetCache {
impl MessagesInTipsetCache {
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_metrics("msg_in_tipset".into(), capacity),
}
}

pub fn get(&self, key: &TipsetKey) -> Option<Vec<ChainMessage>> {
pub fn get(&self, key: &TipsetKey) -> Option<Arc<Vec<ChainMessage>>> {
self.cache.get_cloned(key)
}

pub fn get_or_insert_with<F>(&self, key: &TipsetKey, f: F) -> anyhow::Result<Vec<ChainMessage>>
pub fn get_or_insert_with<F>(
&self,
key: &TipsetKey,
f: F,
) -> anyhow::Result<Arc<Vec<ChainMessage>>>
where
F: FnOnce() -> anyhow::Result<Vec<ChainMessage>>,
{
if self.cache.contains(key) {
Ok(self.get(key).expect("cache entry disappeared!"))
if let Some(cached) = self.get(key) {
Ok(cached)
} else {
let v = f()?;
self.insert(key.clone(), v.clone());
Ok(v)
Ok(self.insert(key.clone(), f()?))
}
}

pub fn insert(&self, key: TipsetKey, mut value: Vec<ChainMessage>) {
pub fn insert(&self, key: TipsetKey, mut value: Vec<ChainMessage>) -> Arc<Vec<ChainMessage>> {
value.shrink_to_fit();
self.cache.push(key, value);
let value = Arc::new(value);
self.cache.push(key, value.clone());
value
}

/// Reads the intended cache size for this process from the environment or uses the default.
fn read_cache_size() -> NonZeroUsize {
// Arbitrary number, can be adjusted
const DEFAULT: NonZeroUsize = nonzero!(100usize);
const DEFAULT: NonZeroUsize = nonzero!(1024usize);
std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT)
}
}

impl Default for MsgsInTipsetCache {
impl Default for MessagesInTipsetCache {
fn default() -> Self {
Self::new(Self::read_cache_size())
}
}

/// Same as [`messages_for_tipset`] but uses a cache to store messages for each tipset.
pub fn messages_for_tipset_with_cache<DB>(
db: &Arc<DB>,
ts: &Tipset,
cache: &MsgsInTipsetCache,
) -> Result<Vec<ChainMessage>, Error>
where
DB: Blockstore,
{
let key = ts.key();
cache
.get_or_insert_with(key, || {
messages_for_tipset(db, ts).context("failed to get messages for tipset")
})
.map_err(Into::into)
}

/// Given a tipset this function will return all unique messages in that tipset.
/// Note: This function is resource-intensive and can be a bottleneck for certain use-cases.
/// Consider using [`messages_for_tipset_with_cache`] for better performance.
pub fn messages_for_tipset<DB>(db: &Arc<DB>, ts: &Tipset) -> Result<Vec<ChainMessage>, Error>
Comment thread
hanabi1224 marked this conversation as resolved.
where
DB: Blockstore,
{
let mut applied: HashMap<Address, u64> = HashMap::new();
let mut balances: HashMap<Address, TokenAmount> = HashMap::new();
let state = StateTree::new_from_tipset(Arc::clone(db), ts)?;

// message to get all messages for block_header into a single iterator
let mut get_message_for_block_header =
|b: &CachingBlockHeader| -> Result<Vec<ChainMessage>, Error> {
let (unsigned, signed) = block_messages(db, b)?;
let mut messages = Vec::with_capacity(unsigned.len() + signed.len());
let unsigned_box = unsigned.into_iter().map(ChainMessage::Unsigned);
let signed_box = signed.into_iter().map(ChainMessage::Signed);

for message in unsigned_box.chain(signed_box) {
let from_address = &message.from();
if !applied.contains_key(from_address) {
let actor_state = state
.get_actor(from_address)?
.ok_or_else(|| Error::Other("Actor state not found".to_string()))?;
applied.insert(*from_address, actor_state.sequence);
balances.insert(*from_address, actor_state.balance.clone().into());
}
if let Some(seq) = applied.get_mut(from_address) {
if *seq != message.sequence() {
continue;
}
*seq += 1;
} else {
continue;
}
if let Some(bal) = balances.get_mut(from_address) {
if *bal < message.required_funds() {
continue;
}
*bal -= message.required_funds();
} else {
continue;
}

messages.push(message)
}

Ok(messages)
};

ts.block_headers()
.iter()
.try_fold(Vec::new(), |mut message_vec, b| {
let mut messages = get_message_for_block_header(b)?;
message_vec.append(&mut messages);
Ok(message_vec)
})
}

/// Returns messages from key-value store based on a slice of [`Cid`]s.
pub fn messages_from_cids<DB, T>(db: &DB, keys: &[Cid]) -> Result<Vec<T>, Error>
where
Expand Down Expand Up @@ -803,16 +740,16 @@ mod tests {

#[test]
fn test_messages_in_tipset_cache() {
let cache = MsgsInTipsetCache::new(2.try_into().unwrap());
let cache = MessagesInTipsetCache::new(nonzero!(2_usize));
let key1 = TipsetKey::from(nunny::vec![Cid::new_v1(
DAG_CBOR,
MultihashCode::Blake2b256.digest(&[1])
)]);
assert!(cache.get(&key1).is_none());

let msgs = vec![ChainMessage::Unsigned(Message::default())];
let msgs = vec![Message::default().into()];
cache.insert(key1.clone(), msgs.clone());
assert_eq!(msgs, cache.get(&key1).unwrap());
assert_eq!(&msgs, &*cache.get(&key1).unwrap());

let inserter_executed: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
Expand All @@ -821,7 +758,10 @@ mod tests {
Ok(msgs.clone())
};

assert_eq!(msgs, cache.get_or_insert_with(&key1, key_inserter).unwrap());
assert_eq!(
&msgs,
&*cache.get_or_insert_with(&key1, key_inserter).unwrap()
);
assert!(!inserter_executed.load(std::sync::atomic::Ordering::Relaxed));

let key2 = TipsetKey::from(nunny::vec![Cid::new_v1(
Expand All @@ -830,7 +770,10 @@ mod tests {
)]);

assert!(cache.get(&key2).is_none());
assert_eq!(msgs, cache.get_or_insert_with(&key2, key_inserter).unwrap());
assert_eq!(
&msgs,
&*cache.get_or_insert_with(&key2, key_inserter).unwrap()
);
assert!(inserter_executed.load(std::sync::atomic::Ordering::Relaxed));
}
}
2 changes: 1 addition & 1 deletion src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
};
use crate::{
eth::is_valid_eth_tx_for_sending,
message::{Message as MessageTrait, valid_for_block_inclusion},
message::{MessageRead as _, valid_for_block_inclusion},
};
use ahash::HashMap;
use cid::Cid;
Expand Down
5 changes: 1 addition & 4 deletions src/cli/subcommands/mpool_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::blocks::Tipset;
use crate::lotus_json::{HasLotusJson as _, NotNullVec};
use crate::message::SignedMessage;
use crate::message::{MessageRead as _, SignedMessage};
use crate::rpc::{self, prelude::*, types::ApiTipsetKey};
use crate::shim::address::StrictAddress;
use crate::shim::message::Message;
Expand Down Expand Up @@ -52,8 +52,6 @@ fn filter_messages(
to: Option<&StrictAddress>,
from: Option<&StrictAddress>,
) -> anyhow::Result<Vec<SignedMessage>> {
use crate::message::Message;

let filtered = messages
.into_iter()
.filter(|msg| {
Expand Down Expand Up @@ -285,7 +283,6 @@ impl MpoolCommands {
mod tests {
use super::*;
use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
use crate::message::{Message, SignedMessage};
use crate::message_pool::tests::create_smsg;
use crate::shim::crypto::SignatureType;
use itertools::Itertools as _;
Expand Down
2 changes: 0 additions & 2 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ fn maybe_start_rpc_service(
let tipset_send = chain_follower.tipset_sender.clone();
let keystore = ctx.keystore.clone();
let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
async move {
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
.await
Expand All @@ -412,7 +411,6 @@ fn maybe_start_rpc_service(
keystore,
mpool,
bad_blocks,
msgs_in_tipset,
sync_status,
eth_event_handler,
sync_network_context,
Expand Down
2 changes: 1 addition & 1 deletion src/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use rlp::Rlp;
use spire_enum::prelude::delegated_enum;

use crate::{
message::{Message as _, SignedMessage},
message::{MessageRead as _, SignedMessage},
rpc::eth::types::EthAddress,
shim::{address::Address, crypto::SignatureType, message::Message, version::NetworkVersion},
};
Expand Down
16 changes: 5 additions & 11 deletions src/interpreter/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::interpreter::{
fvm4::ForestExterns as ForestExternsV4,
};
use crate::message::ChainMessage;
use crate::message::Message as MessageTrait;
use crate::message::MessageRead as _;
use crate::networks::{ChainConfig, NetworkChain};
use crate::shim::actors::{AwardBlockRewardParams, cron, reward};
use crate::shim::{
Expand Down Expand Up @@ -115,14 +115,8 @@ impl BlockMessages {
let (usm, sm) = block_messages(db, b)?;

let mut messages = Vec::with_capacity(usm.len() + sm.len());
messages.extend(
usm.into_iter()
.filter_map(|m| select_msg(ChainMessage::Unsigned(m))),
);
messages.extend(
sm.into_iter()
.filter_map(|m| select_msg(ChainMessage::Signed(m))),
);
messages.extend(usm.into_iter().filter_map(|m| select_msg(m.into())));
messages.extend(sm.into_iter().filter_map(|m| select_msg(m.into())));

Ok(BlockMessages {
miner: b.miner_address,
Expand Down Expand Up @@ -336,7 +330,7 @@ where
if let Some(mut callback) = callback {
callback(MessageCallbackCtx {
cid: cron_msg.cid(),
message: &ChainMessage::Unsigned(cron_msg),
message: &cron_msg.into(),
apply_ret: &ret,
at: CalledAt::Cron,
duration,
Expand Down Expand Up @@ -421,7 +415,7 @@ where
if let Some(callback) = &mut callback {
callback(MessageCallbackCtx {
cid: rew_msg.cid(),
message: &ChainMessage::Unsigned(rew_msg),
message: &rew_msg.into(),
apply_ret: &ret,
at: CalledAt::Reward,
duration,
Expand Down
Loading
Loading