Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ reason = """use `crate::utils::new_uuid_v4` instead."""
path = "tempfile::NamedTempFile::new"
reason = """The temporary files created by this method are not persistable if the temporary directory lives on a different filesystem than the target directory. While it is valid in other contexts (if not persisting files), it was misused many times and so we are banning it. Consider using `tempfile::NamedTempFile::new_in` or `tempfile::NamedTempFile::Builder"""

[[disallowed-methods]]
path = "lru::LruCache::new"
reason = """Use SizeTrackingLruCache instead."""

Comment thread
coderabbitai[bot] marked this conversation as resolved.
[[disallowed-methods]]
path = "lru::LruCache::unbounded"
reason = """Avoid unbounded lru cache for potential memory leak"""
reason = """Avoid unbounded lru cache for potential memory leak, use SizeTrackingLruCache instead."""
2 changes: 1 addition & 1 deletion interop-tests/tests/dhat_get_size_beacon_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn test_get_size_beacon_entry() {
assert_eq!(v.capacity(), v.len());
dhat::assert_eq!(
stats.curr_bytes,
size_of::<BeaconEntry>() * v.capacity() + 60
size_of::<BeaconEntry>() * v.capacity() + inner_bytes
);
dhat::assert_eq!(stats.curr_bytes, v.get_heap_size());
}
2 changes: 1 addition & 1 deletion src/beacon/drand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl DrandBeacon {
fil_round_time: interval,
fil_gen_time: genesis_ts,
verified_beacons: SizeTrackingLruCache::new_with_default_metrics_registry(
"verified_beacons_cache".into(),
"verified_beacons".into(),
NonZeroUsize::new(CACHE_SIZE).expect("Infallible"),
),
}
Expand Down
13 changes: 12 additions & 1 deletion src/blocks/election_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::blocks::VRFProof;
use crate::shim::clock::BLOCKS_PER_EPOCH;
use crate::utils::encoding::blake2b_256;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;
use num::{
BigInt, Integer,
bigint::{ParseBigIntError, Sign},
Expand Down Expand Up @@ -134,7 +135,17 @@ impl Poiss {
/// This is generated from hashing a partial ticket and using the hash to
/// generate a value.
#[derive(
Clone, Debug, PartialEq, PartialOrd, Eq, Default, Ord, Serialize_tuple, Deserialize_tuple, Hash,
Clone,
Debug,
PartialEq,
PartialOrd,
Eq,
Default,
Ord,
Serialize_tuple,
Deserialize_tuple,
Hash,
GetSize,
)]
pub struct ElectionProof {
pub win_count: i64,
Expand Down
40 changes: 38 additions & 2 deletions src/blocks/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use crate::{
address::Address, clock::ChainEpoch, crypto::Signature, econ::TokenAmount,
sector::PoStProof, version::NetworkVersion,
},
utils::{encoding::blake2b_256, multihash::MultihashCode},
utils::{encoding::blake2b_256, get_size::big_int_heap_size_helper, multihash::MultihashCode},
};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore as _;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;
use multihash_derive::MultihashDigest as _;
use num::BigInt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -197,11 +198,46 @@ impl RawBlockHeader {
}
}

// The derive macro does not compile for some reason
impl GetSize for RawBlockHeader {
fn get_heap_size(&self) -> usize {
let Self {
miner_address,
ticket,
election_proof,
beacon_entries,
winning_post_proof,
parents,
weight,
epoch: _,
state_root: _,
message_receipts: _,
messages: _,
bls_aggregate,
timestamp: _,
signature,
fork_signal: _,
parent_base_fee,
} = self;
miner_address.get_heap_size()
+ ticket.get_heap_size()
+ election_proof.get_heap_size()
+ beacon_entries.get_heap_size()
+ winning_post_proof.get_heap_size()
+ parents.get_heap_size()
+ big_int_heap_size_helper(weight)
+ bls_aggregate.get_heap_size()
+ signature.get_heap_size()
+ parent_base_fee.get_heap_size()
}
}

/// A [`RawBlockHeader`] which caches calls to [`RawBlockHeader::cid`] and [`RawBlockHeader::verify_signature_against`]
#[cfg_attr(test, derive(Default))]
#[derive(Debug)]
#[derive(Debug, GetSize)]
pub struct CachingBlockHeader {
uncached: RawBlockHeader,
#[get_size(ignore)]
cid: OnceLock<Cid>,
has_ever_been_verified_against_any_signature: AtomicBool,
}
Expand Down
13 changes: 12 additions & 1 deletion src/blocks/ticket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@

use crate::blocks::VRFProof;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;

/// A Ticket is a marker of a tick of the blockchain's clock. It is the source
/// of randomness for proofs of storage and leader election. It is generated
/// by the miner of a block using a `VRF` and a `VDF`.
#[derive(
Clone, Debug, PartialEq, Eq, Default, Serialize_tuple, Deserialize_tuple, Hash, PartialOrd, Ord,
Clone,
Debug,
PartialEq,
Eq,
Default,
Serialize_tuple,
Deserialize_tuple,
Hash,
PartialOrd,
Ord,
GetSize,
)]
pub struct Ticket {
/// A proof output by running a `VRF` on the `VDFResult` of the parent
Expand Down
5 changes: 3 additions & 2 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
cid_collections::SmallCidNonEmptyVec,
networks::{calibnet, mainnet},
shim::clock::ChainEpoch,
utils::cid::CidCborExt,
utils::{cid::CidCborExt, get_size::nunny_vec_heap_size_helper},
};
use ahash::HashMap;
use anyhow::Context as _;
Expand Down Expand Up @@ -146,9 +146,10 @@ impl IntoIterator for TipsetKey {
///
/// Represents non-null tipsets, see the documentation on [`crate::state_manager::apply_block_messages`]
/// for more.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, GetSize)]
pub struct Tipset {
/// Sorted
#[get_size(size_fn = nunny_vec_heap_size_helper)]
headers: NonEmpty<CachingBlockHeader>,
// key is lazily initialized via `fn key()`.
key: OnceLock<TipsetKey>,
Expand Down
7 changes: 7 additions & 0 deletions src/blocks/vrf_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::utils::encoding::{blake2b_256, serde_byte_array};
use get_size2::GetSize;
use serde::{Deserialize, Serialize};

/// The output from running a VRF proof.
Expand All @@ -25,3 +26,9 @@ impl VRFProof {
blake2b_256(&self.0)
}
}

impl GetSize for VRFProof {
fn get_heap_size(&self) -> usize {
self.0.get_heap_size()
}
}
36 changes: 19 additions & 17 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::fil_cns;
use crate::interpreter::{BlockMessages, VMTrace};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
Expand All @@ -27,15 +26,15 @@ use crate::{
chain_sync::metrics,
db::{EthMappingsStore, EthMappingsStoreExt, IndicesStore, IndicesStoreExt},
};
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Context as _;
use cid::Cid;
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;
use serde::{Serialize, de::DeserializeOwned};
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::broadcast::{self, Sender as Publisher};
Expand Down Expand Up @@ -554,27 +553,28 @@ where
/// use-cases. This cache is intended to be used with a complementary function;
/// [`messages_for_tipset_with_cache`].
pub struct MsgsInTipsetCache {
cache: RwLock<LruCache<TipsetKey, Vec<ChainMessage>>>,
cache: SizeTrackingLruCache<TipsetKey, Vec<ChainMessage>>,
}

impl MsgsInTipsetCache {
pub fn new(cap: usize) -> anyhow::Result<Self> {
Ok(Self {
cache: RwLock::new(LruCache::new(
NonZeroUsize::new(cap).context("cache capacity must be greater than 0")?,
)),
})
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_default_metrics_registry(
"msg_in_tipset".into(),
capacity,
),
}
}

pub fn get(&self, key: &TipsetKey) -> Option<Vec<ChainMessage>> {
self.cache.write().get(key).cloned()
self.cache.get_cloned(key)
}

pub fn get_or_insert_with<F>(&self, key: &TipsetKey, f: F) -> anyhow::Result<Vec<ChainMessage>>
where
F: FnOnce() -> anyhow::Result<Vec<ChainMessage>>,
{
if self.cache.read().contains(key) {
if self.cache.contains(key) {
Ok(self.get(key).expect("cache entry disappeared!"))
} else {
let v = f()?;
Expand All @@ -584,21 +584,23 @@ impl MsgsInTipsetCache {
}

pub fn insert(&self, key: TipsetKey, value: Vec<ChainMessage>) {
self.cache.write().put(key, value);
self.cache.push(key, value);
}

/// Reads the intended cache size for this process from the environment or uses the default.
fn read_cache_size() -> usize {
fn read_cache_size() -> NonZeroUsize {
// Arbitrary number, can be adjusted
const DEFAULT: NonZeroUsize = NonZeroUsize::new(100).expect("infallible");
std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100) // Arbitrary number, can be adjusted
.unwrap_or(DEFAULT)
}
}

impl Default for MsgsInTipsetCache {
fn default() -> Self {
Self::new(Self::read_cache_size()).expect("failed to create default cache")
Self::new(Self::read_cache_size())
}
}

Expand Down Expand Up @@ -796,7 +798,7 @@ mod tests {

#[test]
fn test_messages_in_tipset_cache() {
let cache = MsgsInTipsetCache::new(2).unwrap();
let cache = MsgsInTipsetCache::new(2.try_into().unwrap());
let key1 = TipsetKey::from(nunny::vec![Cid::new_v1(
DAG_CBOR,
MultihashCode::Blake2b256.digest(&[1])
Expand Down
17 changes: 9 additions & 8 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ use crate::blocks::{Tipset, TipsetKey};
use crate::chain::Error;
use crate::metrics;
use crate::shim::clock::ChainEpoch;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use lru::LruCache;
use nonzero_ext::nonzero;
use parking_lot::Mutex;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(131072_usize);

type TipsetCache = Mutex<LruCache<TipsetKey, Arc<Tipset>>>;
type TipsetCache = SizeTrackingLruCache<TipsetKey, Arc<Tipset>>;

/// Keeps look-back tipsets in cache at a given interval `skip_length` and can
/// be used to look-back at the chain to retrieve an old tipset.
pub struct ChainIndex<DB> {
/// `Arc` reference tipset cache.
ts_cache: TipsetCache,

/// `Blockstore` pointer needed to load tipsets from cold storage.
pub db: DB,
}
Expand All @@ -40,25 +38,28 @@ pub enum ResolveNullTipset {

impl<DB: Blockstore> ChainIndex<DB> {
pub fn new(db: DB) -> Self {
let ts_cache = Mutex::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE));
let ts_cache = SizeTrackingLruCache::new_with_default_metrics_registry(
"tipet".into(),
DEFAULT_TIPSET_CACHE_SIZE,
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Self { ts_cache, db }
}

/// Loads a tipset from memory given the tipset keys and cache. Semantically
/// identical to [`Tipset::load`] but the result is cached.
pub fn load_tipset(&self, tsk: &TipsetKey) -> Result<Option<Arc<Tipset>>, Error> {
if !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED")
&& let Some(ts) = self.ts_cache.lock().get(tsk)
&& let Some(ts) = self.ts_cache.get_cloned(tsk)
{
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
return Ok(Some(ts));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new);
if let Some(ts) = &ts_opt {
self.ts_cache.lock().put(tsk.clone(), ts.clone());
self.ts_cache.push(tsk.clone(), ts.clone());
metrics::LRU_CACHE_MISS
.get_or_create(&metrics::values::TIPSET)
.inc();
Expand Down
5 changes: 1 addition & 4 deletions src/chain_sync/bad_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ impl Default for BadBlockCache {
impl BadBlockCache {
pub fn new(cap: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_default_metrics_registry(
"bad_block_cache".into(),
cap,
),
cache: SizeTrackingLruCache::new_with_default_metrics_registry("bad_block".into(), cap),
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/db/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ impl ZstdFrameCache {
ZstdFrameCache {
max_size,
current_size: AtomicUsize::new(0),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry(
"zstd_frame_cache".into(),
),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry("zstd_frame".into()),
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/message/chain_message.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::Message as MessageTrait;
use crate::message::signed_message::SignedMessage;
use crate::shim::message::MethodNum;
use crate::shim::{address::Address, econ::TokenAmount, message::Message};
use fvm_ipld_encoding::RawBytes;
use get_size2::GetSize;
use serde::{Deserialize, Serialize};

use super::Message as MessageTrait;
use crate::message::signed_message::SignedMessage;

/// `Enum` to encapsulate signed and unsigned messages. Useful when working with
/// both types
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq, GetSize)]
#[serde(untagged)]
pub enum ChainMessage {
Unsigned(Message),
Expand Down
Loading
Loading