Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ reason = """use `crate::utils::new_uuid_v4` instead."""
path = "tempfile::NamedTempFile::new"
reason = """The temporary files created by this method are not persistable if the temporary directory lives on a different filesystem than the target directory. While it is valid in other contexts (if not persisting files), it was misused many times and so we are banning it. Consider using `tempfile::NamedTempFile::new_in` or `tempfile::NamedTempFile::Builder"""

[[disallowed-methods]]
path = "lru::LruCache::new"
reason = """Use SizeTrackingLruCache instead."""

Comment thread
coderabbitai[bot] marked this conversation as resolved.
[[disallowed-methods]]
path = "lru::LruCache::with_hasher"
reason = """Use SizeTrackingLruCache instead."""

[[disallowed-methods]]
path = "lru::LruCache::unbounded"
reason = """Avoid unbounded lru cache for potential memory leak"""
reason = """Avoid unbounded lru cache for potential memory leak, use SizeTrackingLruCache instead."""

[[disallowed-methods]]
path = "lru::LruCache::unbounded_with_hasher"
reason = """Avoid unbounded lru cache for potential memory leak, use SizeTrackingLruCache instead."""
2 changes: 1 addition & 1 deletion interop-tests/tests/dhat_get_size_beacon_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn test_get_size_beacon_entry() {
assert_eq!(v.capacity(), v.len());
dhat::assert_eq!(
stats.curr_bytes,
size_of::<BeaconEntry>() * v.capacity() + 60
size_of::<BeaconEntry>() * v.capacity() + inner_bytes
);
dhat::assert_eq!(stats.curr_bytes, v.get_heap_size());
}
2 changes: 1 addition & 1 deletion src/beacon/drand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl DrandBeacon {
fil_round_time: interval,
fil_gen_time: genesis_ts,
verified_beacons: SizeTrackingLruCache::new_with_default_metrics_registry(
"verified_beacons_cache".into(),
"verified_beacons".into(),
NonZeroUsize::new(CACHE_SIZE).expect("Infallible"),
),
}
Expand Down
13 changes: 12 additions & 1 deletion src/blocks/election_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::blocks::VRFProof;
use crate::shim::clock::BLOCKS_PER_EPOCH;
use crate::utils::encoding::blake2b_256;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;
use num::{
BigInt, Integer,
bigint::{ParseBigIntError, Sign},
Expand Down Expand Up @@ -134,7 +135,17 @@ impl Poiss {
/// This is generated from hashing a partial ticket and using the hash to
/// generate a value.
#[derive(
Clone, Debug, PartialEq, PartialOrd, Eq, Default, Ord, Serialize_tuple, Deserialize_tuple, Hash,
Clone,
Debug,
PartialEq,
PartialOrd,
Eq,
Default,
Ord,
Serialize_tuple,
Deserialize_tuple,
Hash,
GetSize,
)]
pub struct ElectionProof {
pub win_count: i64,
Expand Down
40 changes: 38 additions & 2 deletions src/blocks/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use crate::{
address::Address, clock::ChainEpoch, crypto::Signature, econ::TokenAmount,
sector::PoStProof, version::NetworkVersion,
},
utils::{encoding::blake2b_256, multihash::MultihashCode},
utils::{encoding::blake2b_256, get_size::big_int_heap_size_helper, multihash::MultihashCode},
};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore as _;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;
use multihash_derive::MultihashDigest as _;
use num::BigInt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -197,11 +198,46 @@ impl RawBlockHeader {
}
}

// The derive macro does not compile for some reason
impl GetSize for RawBlockHeader {
fn get_heap_size(&self) -> usize {
let Self {
miner_address,
ticket,
election_proof,
beacon_entries,
winning_post_proof,
parents,
weight,
epoch: _,
state_root: _,
message_receipts: _,
messages: _,
bls_aggregate,
timestamp: _,
signature,
fork_signal: _,
parent_base_fee,
} = self;
miner_address.get_heap_size()
+ ticket.get_heap_size()
+ election_proof.get_heap_size()
+ beacon_entries.get_heap_size()
+ winning_post_proof.get_heap_size()
+ parents.get_heap_size()
+ big_int_heap_size_helper(weight)
+ bls_aggregate.get_heap_size()
+ signature.get_heap_size()
+ parent_base_fee.get_heap_size()
}
}

/// A [`RawBlockHeader`] which caches calls to [`RawBlockHeader::cid`] and [`RawBlockHeader::verify_signature_against`]
#[cfg_attr(test, derive(Default))]
#[derive(Debug)]
#[derive(Debug, GetSize)]
pub struct CachingBlockHeader {
uncached: RawBlockHeader,
#[get_size(ignore)]
cid: OnceLock<Cid>,
has_ever_been_verified_against_any_signature: AtomicBool,
}
Expand Down
13 changes: 12 additions & 1 deletion src/blocks/ticket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@

use crate::blocks::VRFProof;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;

/// A Ticket is a marker of a tick of the blockchain's clock. It is the source
/// of randomness for proofs of storage and leader election. It is generated
/// by the miner of a block using a `VRF` and a `VDF`.
#[derive(
Clone, Debug, PartialEq, Eq, Default, Serialize_tuple, Deserialize_tuple, Hash, PartialOrd, Ord,
Clone,
Debug,
PartialEq,
Eq,
Default,
Serialize_tuple,
Deserialize_tuple,
Hash,
PartialOrd,
Ord,
GetSize,
)]
pub struct Ticket {
/// A proof output by running a `VRF` on the `VDFResult` of the parent
Expand Down
5 changes: 3 additions & 2 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
cid_collections::SmallCidNonEmptyVec,
networks::{calibnet, mainnet},
shim::clock::ChainEpoch,
utils::cid::CidCborExt,
utils::{cid::CidCborExt, get_size::nunny_vec_heap_size_helper},
};
use ahash::HashMap;
use anyhow::Context as _;
Expand Down Expand Up @@ -146,9 +146,10 @@ impl IntoIterator for TipsetKey {
///
/// Represents non-null tipsets, see the documentation on [`crate::state_manager::apply_block_messages`]
/// for more.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, GetSize)]
pub struct Tipset {
/// Sorted
#[get_size(size_fn = nunny_vec_heap_size_helper)]
headers: NonEmpty<CachingBlockHeader>,
// key is lazily initialized via `fn key()`.
key: OnceLock<TipsetKey>,
Expand Down
7 changes: 7 additions & 0 deletions src/blocks/vrf_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::utils::encoding::{blake2b_256, serde_byte_array};
use get_size2::GetSize;
use serde::{Deserialize, Serialize};

/// The output from running a VRF proof.
Expand All @@ -25,3 +26,9 @@ impl VRFProof {
blake2b_256(&self.0)
}
}

impl GetSize for VRFProof {
fn get_heap_size(&self) -> usize {
self.0.get_heap_size()
}
}
36 changes: 19 additions & 17 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::fil_cns;
use crate::interpreter::{BlockMessages, VMTrace};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
Expand All @@ -27,15 +26,15 @@ use crate::{
chain_sync::metrics,
db::{EthMappingsStore, EthMappingsStoreExt, IndicesStore, IndicesStoreExt},
};
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Context as _;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;
use serde::{Serialize, de::DeserializeOwned};
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::broadcast::{self, Sender as Publisher};
Expand Down Expand Up @@ -554,27 +553,28 @@ where
/// use-cases. This cache is intended to be used with a complementary function;
/// [`messages_for_tipset_with_cache`].
pub struct MsgsInTipsetCache {
cache: RwLock<LruCache<TipsetKey, Vec<ChainMessage>>>,
cache: SizeTrackingLruCache<TipsetKey, Vec<ChainMessage>>,
}

impl MsgsInTipsetCache {
pub fn new(cap: usize) -> anyhow::Result<Self> {
Ok(Self {
cache: RwLock::new(LruCache::new(
NonZeroUsize::new(cap).context("cache capacity must be greater than 0")?,
)),
})
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_default_metrics_registry(
"msg_in_tipset".into(),
capacity,
),
}
}

pub fn get(&self, key: &TipsetKey) -> Option<Vec<ChainMessage>> {
self.cache.write().get(key).cloned()
self.cache.get_cloned(key)
}

pub fn get_or_insert_with<F>(&self, key: &TipsetKey, f: F) -> anyhow::Result<Vec<ChainMessage>>
where
F: FnOnce() -> anyhow::Result<Vec<ChainMessage>>,
{
if self.cache.read().contains(key) {
if self.cache.contains(key) {
Ok(self.get(key).expect("cache entry disappeared!"))
} else {
let v = f()?;
Expand All @@ -584,21 +584,23 @@ impl MsgsInTipsetCache {
}

pub fn insert(&self, key: TipsetKey, value: Vec<ChainMessage>) {
self.cache.write().put(key, value);
self.cache.push(key, value);
}

/// Reads the intended cache size for this process from the environment or uses the default.
fn read_cache_size() -> usize {
fn read_cache_size() -> NonZeroUsize {
// Arbitrary number, can be adjusted
const DEFAULT: NonZeroUsize = NonZeroUsize::new(100).expect("infallible");
std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100) // Arbitrary number, can be adjusted
.unwrap_or(DEFAULT)
}
}

impl Default for MsgsInTipsetCache {
fn default() -> Self {
Self::new(Self::read_cache_size()).expect("failed to create default cache")
Self::new(Self::read_cache_size())
}
}

Expand Down Expand Up @@ -796,7 +798,7 @@ mod tests {

#[test]
fn test_messages_in_tipset_cache() {
let cache = MsgsInTipsetCache::new(2).unwrap();
let cache = MsgsInTipsetCache::new(2.try_into().unwrap());
let key1 = TipsetKey::from(nunny::vec![Cid::new_v1(
DAG_CBOR,
MultihashCode::Blake2b256.digest(&[1])
Expand Down
22 changes: 11 additions & 11 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ use crate::blocks::{Tipset, TipsetKey};
use crate::chain::Error;
use crate::metrics;
use crate::shim::clock::ChainEpoch;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use lru::LruCache;
use nonzero_ext::nonzero;
use parking_lot::Mutex;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(131072_usize);

type TipsetCache = Mutex<LruCache<TipsetKey, Arc<Tipset>>>;
type TipsetCache = SizeTrackingLruCache<TipsetKey, Arc<Tipset>>;

/// Keeps look-back tipsets in cache at a given interval `skip_length` and can
/// be used to look-back at the chain to retrieve an old tipset.
pub struct ChainIndex<DB> {
/// `Arc` reference tipset cache.
ts_cache: TipsetCache,

/// `Blockstore` pointer needed to load tipsets from cold storage.
pub db: DB,
}
Expand All @@ -40,25 +38,27 @@ pub enum ResolveNullTipset {

impl<DB: Blockstore> ChainIndex<DB> {
pub fn new(db: DB) -> Self {
let ts_cache = Mutex::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE));
let ts_cache = SizeTrackingLruCache::new_with_default_metrics_registry(
"tipset".into(),
DEFAULT_TIPSET_CACHE_SIZE,
);
Self { ts_cache, db }
}

/// Loads a tipset from memory given the tipset keys and cache. Semantically
/// identical to [`Tipset::load`] but the result is cached.
pub fn load_tipset(&self, tsk: &TipsetKey) -> Result<Option<Arc<Tipset>>, Error> {
if !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED")
&& let Some(ts) = self.ts_cache.lock().get(tsk)
{
let cache_enabled = !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED");
if cache_enabled && let Some(ts) = self.ts_cache.get_cloned(tsk) {
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
return Ok(Some(ts));
}

let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new);
if let Some(ts) = &ts_opt {
self.ts_cache.lock().put(tsk.clone(), ts.clone());
if cache_enabled && let Some(ts) = &ts_opt {
self.ts_cache.push(tsk.clone(), ts.clone());
metrics::LRU_CACHE_MISS
.get_or_create(&metrics::values::TIPSET)
.inc();
Expand Down
5 changes: 1 addition & 4 deletions src/chain_sync/bad_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ impl Default for BadBlockCache {
impl BadBlockCache {
pub fn new(cap: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_default_metrics_registry(
"bad_block_cache".into(),
cap,
),
cache: SizeTrackingLruCache::new_with_default_metrics_registry("bad_block".into(), cap),
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/db/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ impl ZstdFrameCache {
ZstdFrameCache {
max_size,
current_size: AtomicUsize::new(0),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry(
"zstd_frame_cache".into(),
),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry("zstd_frame".into()),
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/message/chain_message.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::Message as MessageTrait;
use crate::message::signed_message::SignedMessage;
use crate::shim::message::MethodNum;
use crate::shim::{address::Address, econ::TokenAmount, message::Message};
use fvm_ipld_encoding::RawBytes;
use get_size2::GetSize;
use serde::{Deserialize, Serialize};

use super::Message as MessageTrait;
use crate::message::signed_message::SignedMessage;

/// `Enum` to encapsulate signed and unsigned messages. Useful when working with
/// both types
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq, GetSize)]
#[serde(untagged)]
pub enum ChainMessage {
Unsigned(Message),
Expand Down
Loading
Loading