diff --git a/src/beacon/beacon_entries.rs b/src/beacon/beacon_entries.rs index 06fac9ed0542..5b020a8c4af3 100644 --- a/src/beacon/beacon_entries.rs +++ b/src/beacon/beacon_entries.rs @@ -4,6 +4,7 @@ use crate::utils::encoding::serde_byte_array; use byteorder::{BigEndian, ByteOrder as _}; use digest::Digest as _; +use get_size2::GetSize; use serde_tuple::{self, Deserialize_tuple, Serialize_tuple}; /// The result from getting an entry from `Drand`. @@ -12,7 +13,17 @@ use serde_tuple::{self, Deserialize_tuple, Serialize_tuple}; /// This beacon entry is stored on chain in the block header. #[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))] #[derive( - Clone, Debug, Default, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize_tuple, Deserialize_tuple, + Clone, + Debug, + Default, + Eq, + PartialEq, + Hash, + Ord, + PartialOrd, + Serialize_tuple, + Deserialize_tuple, + GetSize, )] pub struct BeaconEntry { round: u64, diff --git a/src/beacon/drand.rs b/src/beacon/drand.rs index 4a9748081058..1ad5942587cb 100644 --- a/src/beacon/drand.rs +++ b/src/beacon/drand.rs @@ -12,14 +12,13 @@ use super::{ }; use crate::shim::clock::ChainEpoch; use crate::shim::version::NetworkVersion; +use crate::utils::cache::SizeTrackingLruCache; use crate::utils::net::global_http_client; use anyhow::Context as _; use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use bls_signatures::Serialize as _; use itertools::Itertools as _; -use lru::LruCache; -use parking_lot::RwLock; use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize}; use tracing::debug; use url::Url; @@ -244,7 +243,7 @@ pub struct DrandBeacon { fil_round_time: u64, /// Keeps track of verified beacon entries. - verified_beacons: RwLock>, + verified_beacons: SizeTrackingLruCache, } impl DrandBeacon { @@ -262,14 +261,15 @@ impl DrandBeacon { drand_gen_time: config.chain_info.genesis_time as u64, fil_round_time: interval, fil_gen_time: genesis_ts, - verified_beacons: RwLock::new(LruCache::new( + verified_beacons: SizeTrackingLruCache::new_with_default_metrics_registry( + "verified_beacons_cache".into(), NonZeroUsize::new(CACHE_SIZE).expect("Infallible"), - )), + ), } } fn is_verified(&self, entry: &BeaconEntry) -> bool { - let cache = self.verified_beacons.read(); + let cache = self.verified_beacons.cache().read(); cache.peek(&entry.round()) == Some(entry) } } @@ -333,12 +333,12 @@ impl Beacon for DrandBeacon { }; if is_valid && !validated.is_empty() { - let mut cache = self.verified_beacons.write(); - if cache.cap().get() < validated.len() { - tracing::warn!(cap=%cache.cap().get(), validated_len=%validated.len(), "verified_beacons.cap() is too small"); + let cap = self.verified_beacons.cap(); + if cap < validated.len() { + tracing::warn!(%cap, validated_len=%validated.len(), "verified_beacons.cap() is too small"); } for entry in validated { - cache.put(entry.round(), entry.clone()); + self.verified_beacons.push(entry.round(), entry.clone()); } } @@ -346,7 +346,7 @@ impl Beacon for DrandBeacon { } async fn entry(&self, round: u64) -> anyhow::Result { - let cached: Option = self.verified_beacons.read().peek(&round).cloned(); + let cached: Option = self.verified_beacons.peek_cloned(&round); match cached { Some(cached_entry) => Ok(cached_entry), None => { diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 8a3aa8793957..a2ddf345c128 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -16,10 +16,11 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; use crate::message::{Message as MessageTrait, SignedMessage}; use crate::networks::ChainConfig; use crate::shim::{address::Address, crypto::Signature}; +use crate::utils::cache::SizeTrackingLruCache; +use crate::utils::get_size::CidWrapper; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use cid::Cid; use fvm_ipld_encoding::to_vec; -use lru::LruCache; use parking_lot::{Mutex, RwLock as SyncRwLock}; use tracing::error; use utils::{get_base_fee_lower_bound, recover_sig}; @@ -211,7 +212,7 @@ where #[allow(clippy::too_many_arguments)] pub async fn head_change( api: &T, - bls_sig_cache: &Mutex>, + bls_sig_cache: &SizeTrackingLruCache, repub_trigger: Arc>, republished: &SyncRwLock>, pending: &SyncRwLock>, @@ -233,7 +234,7 @@ where let (umsg, smsgs) = api.messages_for_block(block)?; msgs.extend(smsgs); for msg in umsg { - let smsg = recover_sig(&mut bls_sig_cache.lock(), msg)?; + let smsg = recover_sig(bls_sig_cache, msg)?; msgs.push(smsg) } } @@ -420,7 +421,7 @@ pub mod tests { let sig = Signature::new_secp256k1(vec![]); let signed = SignedMessage::new_unchecked(umsg, sig); let cid = signed.cid(); - pool.sig_val_cache.lock().put(cid, ()); + pool.sig_val_cache.push(cid.into(), ()); signed } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 4ea337d1362b..3c0d70a361ec 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -23,13 +23,14 @@ use crate::shim::{ gas::{Gas, price_list_by_network_version}, }; use crate::state_manager::is_valid_for_sending; +use crate::utils::cache::SizeTrackingLruCache; +use crate::utils::get_size::CidWrapper; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use anyhow::Context as _; use cid::Cid; use futures::StreamExt; use fvm_ipld_encoding::to_vec; use itertools::Itertools; -use lru::LruCache; use nonzero_ext::nonzero; use parking_lot::{Mutex, RwLock as SyncRwLock}; use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval}; @@ -182,9 +183,9 @@ pub struct MessagePool { /// Sender half to send messages to other components pub network_sender: flume::Sender, /// A cache for BLS signature keyed by Cid - pub bls_sig_cache: Arc>>, + pub bls_sig_cache: Arc>, /// A cache for BLS signature keyed by Cid - pub sig_val_cache: Arc>>, + pub sig_val_cache: Arc>, /// A set of republished messages identified by their Cid pub republished: Arc>>, /// Acts as a signal to republish messages from the republished set of @@ -261,14 +262,14 @@ where fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> { let cid = msg.cid(); - if let Some(()) = self.sig_val_cache.lock().get(&cid) { + if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) { return Ok(()); } msg.verify(self.chain_config.eth_chain_id) .map_err(|e| Error::Other(e.to_string()))?; - self.sig_val_cache.lock().put(cid, ()); + self.sig_val_cache.push(cid.into(), ()); Ok(()) } @@ -413,7 +414,7 @@ where msg_vec.append(smsgs.as_mut()); for msg in umsg { - let smsg = recover_sig(&mut self.bls_sig_cache.lock(), msg)?; + let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?; msg_vec.push(smsg) } } @@ -471,8 +472,14 @@ where let local_addrs = Arc::new(SyncRwLock::new(Vec::new())); let pending = Arc::new(SyncRwLock::new(HashMap::new())); let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset())); - let bls_sig_cache = Arc::new(Mutex::new(LruCache::new(BLS_SIG_CACHE_SIZE))); - let sig_val_cache = Arc::new(Mutex::new(LruCache::new(SIG_VAL_CACHE_SIZE))); + let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry( + "bls_sig_cache".into(), + BLS_SIG_CACHE_SIZE, + )); + let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry( + "sig_val_cache".into(), + SIG_VAL_CACHE_SIZE, + )); let local_msgs = Arc::new(SyncRwLock::new(HashSet::new())); let republished = Arc::new(SyncRwLock::new(HashSet::new())); let block_delay = chain_config.block_delay_secs; @@ -583,7 +590,7 @@ where /// hash-map. pub(in crate::message_pool) fn add_helper( api: &T, - bls_sig_cache: &Mutex>, + bls_sig_cache: &SizeTrackingLruCache, pending: &SyncRwLock>, msg: SignedMessage, sequence: u64, @@ -592,7 +599,7 @@ where T: Provider, { if msg.signature().signature_type() == SignatureType::Bls { - bls_sig_cache.lock().put(msg.cid(), msg.signature().clone()); + bls_sig_cache.push(msg.cid().into(), msg.signature().clone()); } if msg.message().gas_limit > 100_000_000 { diff --git a/src/message_pool/msgpool/utils.rs b/src/message_pool/msgpool/utils.rs index 814d38f1f08c..4a177f6a2c50 100644 --- a/src/message_pool/msgpool/utils.rs +++ b/src/message_pool/msgpool/utils.rs @@ -4,8 +4,8 @@ use crate::chain::MINIMUM_BASE_FEE; use crate::message::{Message as MessageTrait, SignedMessage}; use crate::shim::{crypto::Signature, econ::TokenAmount, message::Message}; -use cid::Cid; -use lru::LruCache; +use crate::utils::cache::SizeTrackingLruCache; +use crate::utils::get_size::CidWrapper; use num_rational::BigRational; use num_traits::ToPrimitive; @@ -46,12 +46,12 @@ pub(in crate::message_pool) fn get_gas_perf(gas_reward: &TokenAmount, gas_limit: /// Attempt to get a signed message that corresponds to an unsigned message in /// `bls_sig_cache`. pub(in crate::message_pool) fn recover_sig( - bls_sig_cache: &mut LruCache, + bls_sig_cache: &SizeTrackingLruCache, msg: Message, ) -> Result { let val = bls_sig_cache - .get(&msg.cid()) + .get_cloned(&(msg.cid()).into()) .ok_or_else(|| Error::Other("Could not recover sig".to_owned()))?; - let smsg = SignedMessage::new_from_parts(msg, val.clone())?; + let smsg = SignedMessage::new_from_parts(msg, val)?; Ok(smsg) } diff --git a/src/rpc/methods/f3.rs b/src/rpc/methods/f3.rs index acdb1dfa0bc8..c2c408a590c9 100644 --- a/src/rpc/methods/f3.rs +++ b/src/rpc/methods/f3.rs @@ -15,6 +15,14 @@ pub use self::types::{ }; use self::{types::*, util::*}; use super::wallet::WalletSign; +use crate::shim::actors::{ + convert::{ + from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11, + from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15, + from_policy_v13_to_v16, + }, + miner, power, +}; use crate::{ blocks::Tipset, chain::index::ResolveNullTipset, @@ -33,24 +41,12 @@ use crate::{ }, utils::misc::env::is_env_set_and_truthy, }; -use crate::{ - blocks::TipsetKey, - shim::actors::{ - convert::{ - from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11, - from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15, - from_policy_v13_to_v16, - }, - miner, power, - }, -}; use ahash::{HashMap, HashSet}; use anyhow::Context as _; use enumflags2::BitFlags; use fvm_ipld_blockstore::Blockstore; use jsonrpsee::core::{client::ClientT as _, params::ArrayParams}; use libp2p::PeerId; -use lru::LruCache; use num::Signed as _; use parking_lot::RwLock; use std::{ @@ -473,21 +469,11 @@ impl RpcMethod<1> for GetPowerTable { ctx: Ctx, (f3_tsk,): Self::Params, ) -> Result { - static CACHE: LazyLock>>> = - LazyLock::new(|| { - tokio::sync::Mutex::new(LruCache::new(32.try_into().expect("Infallible"))) - }); let tsk = f3_tsk.try_into()?; - let mut cache = CACHE.lock().await; - if let Some(v) = cache.get(&tsk) { - return Ok(v.clone()); - } - let start = std::time::Instant::now(); let ts = ctx.chain_index().load_required_tipset(&tsk)?; let power_entries = Self::compute(&ctx, &ts).await?; tracing::debug!(epoch=%ts.epoch(), %tsk, "F3.GetPowerTable, took {}", humantime::format_duration(start.elapsed())); - cache.push(tsk, power_entries.clone()); Ok(power_entries) } } diff --git a/src/shim/crypto.rs b/src/shim/crypto.rs index 8ea93e31776e..5b1ad4536d59 100644 --- a/src/shim/crypto.rs +++ b/src/shim/crypto.rs @@ -15,13 +15,14 @@ use fvm_ipld_encoding::{ ser, strict_bytes, }; pub use fvm_shared3::TICKET_RANDOMNESS_LOOKBACK; +use get_size2::GetSize; use num::FromPrimitive; use num_derive::FromPrimitive; use schemars::JsonSchema; use std::borrow::Cow; /// A cryptographic signature, represented in bytes, of any key protocol. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Hash, GetSize)] #[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))] pub struct Signature { pub sig_type: SignatureType, @@ -307,6 +308,7 @@ pub fn cid_to_replica_commitment_v1(c: &Cid) -> Result strum::Display, strum::EnumString, JsonSchema, + GetSize, )] #[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))] #[repr(u8)] diff --git a/src/state_migration/common/mod.rs b/src/state_migration/common/mod.rs index 1d3fc1915f4a..e98e7369d3b4 100644 --- a/src/state_migration/common/mod.rs +++ b/src/state_migration/common/mod.rs @@ -6,7 +6,10 @@ use std::{num::NonZeroUsize, sync::Arc}; -use crate::shim::{address::Address, clock::ChainEpoch, econ::TokenAmount, state_tree::StateTree}; +use crate::{ + shim::{address::Address, clock::ChainEpoch, econ::TokenAmount, state_tree::StateTree}, + utils::{cache::SizeTrackingLruCache, get_size::CidWrapper}, +}; use cid::Cid; use fvm_ipld_blockstore::Blockstore; @@ -16,43 +19,44 @@ pub(in crate::state_migration) mod migrators; mod state_migration; pub(in crate::state_migration) mod verifier; -use lru::LruCache; -use parking_lot::RwLock; pub(in crate::state_migration) use state_migration::StateMigration; pub(in crate::state_migration) type Migrator = Arc + Send + Sync>; /// Cache of existing CID to CID migrations for an actor. #[derive(Clone)] pub(in crate::state_migration) struct MigrationCache { - cache: Arc>>, + cache: Arc>, } impl MigrationCache { pub fn new(size: NonZeroUsize) -> Self { Self { - cache: Arc::new(RwLock::new(LruCache::new(size))), + cache: Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry( + "migration_cache".into(), + size, + )), } } pub fn get(&self, key: &str) -> Option { - self.cache.write().get(key).cloned() + self.cache.get_cloned(key).map(From::from) } pub fn get_or_insert_with(&self, key: String, f: F) -> anyhow::Result where F: FnOnce() -> anyhow::Result, { - if self.cache.read().contains(&key) { - Ok(self.cache.write().get(&key).cloned().unwrap()) + if let Some(v) = self.cache.get_cloned(&key) { + Ok(v.into()) } else { let v = f()?; - self.cache.write().put(key, v); + self.push(key, v); Ok(v) } } - pub fn insert(&self, key: String, value: Cid) { - self.cache.write().put(key, value); + pub fn push(&self, key: String, value: Cid) { + self.cache.push(key, value.into()); } } @@ -136,7 +140,7 @@ mod tests { fn test_migration_cache() { let cache = MigrationCache::new(NonZeroUsize::new(10).unwrap()); let cid = Cid::from_cbor_blake2b256(&42).unwrap(); - cache.insert("Cthulhu".to_owned(), cid); + cache.push("Cthulhu".to_owned(), cid); assert_eq!(cache.get("Cthulhu"), Some(cid)); assert_eq!(cache.get("Ao"), None); diff --git a/src/state_migration/nv21/miner.rs b/src/state_migration/nv21/miner.rs index f6a5293d31fd..3aee3e5d760a 100644 --- a/src/state_migration/nv21/miner.rs +++ b/src/state_migration/nv21/miner.rs @@ -127,8 +127,8 @@ impl MinerMigrator { out_array.flush()? }; - cache.insert(miner_prev_sectors_in_key(address), in_root.to_owned()); - cache.insert(miner_prev_sectors_out_key(address), out_root.to_owned()); + cache.push(miner_prev_sectors_in_key(address), in_root.to_owned()); + cache.push(miner_prev_sectors_out_key(address), out_root.to_owned()); Ok(out_root) }) .map(|cid| cid.to_owned()) diff --git a/src/utils/cache/lru.rs b/src/utils/cache/lru.rs index a1a2951e3317..e979ef087d9c 100644 --- a/src/utils/cache/lru.rs +++ b/src/utils/cache/lru.rs @@ -89,6 +89,10 @@ where Self::new_with_metrics_registry(cache_name, capacity, &mut default_registry()) } + pub fn cache(&self) -> &Arc>> { + &self.cache + } + pub fn push(&self, k: K, v: V) -> Option<(K, V)> { self.cache.write().push(k, v) } @@ -106,13 +110,17 @@ where K: Borrow, Q: Hash + Eq + ?Sized, { - self.cache.write().peek(k).cloned() + self.cache.read().peek(k).cloned() } pub fn len(&self) -> usize { self.cache.read().len() } + pub fn cap(&self) -> usize { + self.cache.read().cap().get() + } + fn size_in_bytes(&self) -> usize { let mut size = 0_usize; for (k, v) in self.cache.read().iter() {