diff --git a/.clippy.toml b/.clippy.toml index 03f8ecdc7497..12d47fd1eae7 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -43,3 +43,7 @@ reason = """use `crate::utils::new_uuid_v4` instead.""" [[disallowed-methods]] path = "tempfile::NamedTempFile::new" reason = """The temporary files created by this method are not persistable if the temporary directory lives on a different filesystem than the target directory. While it is valid in other contexts (if not persisting files), it was misused many times and so we are banning it. Consider using `tempfile::NamedTempFile::new_in` or `tempfile::NamedTempFile::Builder""" + +[[disallowed-methods]] +path = "lru::LruCache::unbounded" +reason = """Avoid unbounded lru cache for potential memory leak""" diff --git a/CHANGELOG.md b/CHANGELOG.md index e769382ac109..a809a2352f52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ ### Added +- [#5859](https://github.com/ChainSafe/forest/pull/5859) Added size metrics for zstd frame cache and made max size configurable via `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable. + ### Changed ### Removed @@ -37,6 +39,8 @@ - [#5863](https://github.com/ChainSafe/forest/pull/5863) Fixed needless GC runs on a stateless node. +- [#5859](https://github.com/ChainSafe/forest/pull/5859) Fixed size calculation for zstd frame cache. + ## Forest v0.28.0 "Denethor's Folly" This is a non-mandatory release recommended for all node operators. It includes numerous fixes and quality-of-life improvements for development and archival snapshot operations. It also includes a memory leak fix that would surface on long-running nodes. diff --git a/docs/dictionary.txt b/docs/dictionary.txt index e80d4827080a..af92c614eaed 100644 --- a/docs/dictionary.txt +++ b/docs/dictionary.txt @@ -117,3 +117,4 @@ V0 V1 VPS WIP +zstd diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index 390f0863832c..3457e8efd7c1 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -49,6 +49,7 @@ process. | `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC | | `FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run | | `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache | +| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/db/car/any.rs b/src/db/car/any.rs index 298b3f866773..39dc6063965c 100644 --- a/src/db/car/any.rs +++ b/src/db/car/any.rs @@ -13,7 +13,6 @@ use crate::blocks::{Tipset, TipsetKey}; use crate::utils::io::EitherMmapOrRandomAccessFile; use cid::Cid; use fvm_ipld_blockstore::Blockstore; -use parking_lot::Mutex; use positioned_io::ReadAt; use std::borrow::Cow; use std::io::{Error, ErrorKind, Result}; @@ -89,7 +88,7 @@ impl AnyCar { } /// Set the z-frame cache of the inner CAR reader. - pub fn with_cache(self, cache: Arc>, key: CacheKey) -> Self { + pub fn with_cache(self, cache: Arc, key: CacheKey) -> Self { match self { AnyCar::Forest(f) => AnyCar::Forest(f.with_cache(cache, key)), AnyCar::Plain(p) => AnyCar::Plain(p), diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 655f5a36b953..3d9db5e12ac7 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -61,7 +61,6 @@ use futures::{Stream, TryStream, TryStreamExt as _}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::to_vec; use nunny::Vec as NonEmpty; -use parking_lot::Mutex; use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor}; use std::io::{Seek, SeekFrom}; use std::path::Path; @@ -94,7 +93,7 @@ pub struct ForestCar { cache_key: CacheKey, indexed: index::Reader>, index_size_bytes: u32, - frame_cache: Arc>, + frame_cache: Arc, roots: NonEmpty, } @@ -113,7 +112,7 @@ impl ForestCar { cache_key: 0, indexed, index_size_bytes, - frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())), + frame_cache: Arc::new(ZstdFrameCache::default()), roots: header.roots, }) } @@ -179,7 +178,7 @@ impl ForestCar { } } - pub fn with_cache(self, cache: Arc>, key: CacheKey) -> Self { + pub fn with_cache(self, cache: Arc, key: CacheKey) -> Self { Self { cache_key: key, frame_cache: cache, @@ -203,7 +202,7 @@ where fn get(&self, k: &Cid) -> anyhow::Result>> { let indexed = &self.indexed; for position in indexed.get(*k)?.into_iter() { - let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k); + let cache_query = self.frame_cache.get(position, self.cache_key, *k); match cache_query { // Frame cache hit, found value. Some(Some(val)) => return Ok(Some(val)), @@ -220,12 +219,10 @@ where UviBytes::::default().decode_eof(&mut zstd_frame)? { let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?; - block_map.insert(cid, data); + block_map.insert(cid.into(), data); } - let get_result = block_map.get(k).cloned(); - self.frame_cache - .lock() - .put(position, self.cache_key, block_map); + let get_result = block_map.get(&(*k).into()).cloned(); + self.frame_cache.put(position, self.cache_key, block_map); // This lookup only fails in case of a hash collision if let Some(value) = get_result { diff --git a/src/db/car/many.rs b/src/db/car/many.rs index e77dbc59b77d..ed5dc78810ff 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -23,7 +23,7 @@ use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead}; use anyhow::Context as _; use cid::Cid; use fvm_ipld_blockstore::Blockstore; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use std::cmp::Ord; use std::collections::BinaryHeap; use std::{path::PathBuf, sync::Arc}; @@ -64,7 +64,7 @@ impl PartialEq for WithHeaviestEpoch { } pub struct ManyCar { - shared_cache: Arc>, + shared_cache: Arc, read_only: Arc>>, writer: WriterT, } @@ -72,7 +72,7 @@ pub struct ManyCar { impl ManyCar { pub fn new(writer: WriterT) -> Self { ManyCar { - shared_cache: Arc::new(Mutex::new(ZstdFrameCache::default())), + shared_cache: Arc::new(ZstdFrameCache::default()), read_only: Arc::new(RwLock::new(BinaryHeap::default())), writer, } diff --git a/src/db/car/mod.rs b/src/db/car/mod.rs index aaf3f7e9a03d..f68528d46823 100644 --- a/src/db/car/mod.rs +++ b/src/db/car/mod.rs @@ -7,13 +7,22 @@ pub mod plain; pub use any::AnyCar; pub use forest::ForestCar; +use get_size2::GetSize as _; pub use many::ManyCar; pub use plain::PlainCar; use ahash::HashMap; use cid::Cid; -use lru::LruCache; use positioned_io::{ReadAt, Size}; +use std::{ + num::NonZeroUsize, + sync::{ + LazyLock, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use crate::utils::{cache::SizeTrackingLruCache, get_size::CidWrapper}; pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {} impl RandomAccessFileReader for X {} @@ -24,55 +33,128 @@ pub type CacheKey = u64; type FrameOffset = u64; +pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock = LazyLock::new(|| { + const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE"; + if let Ok(value) = std::env::var(ENV_KEY) { + if let Ok(size) = value.parse::() { + let size = size.get(); + tracing::info!("zstd frame max size is set to {size} via {ENV_KEY}"); + return size; + } else { + tracing::error!( + "Failed to parse {ENV_KEY}={value}, value should be a positive integer" + ); + } + } + // 256 MiB + 256 * 1024 * 1024 +}); + pub struct ZstdFrameCache { /// Maximum size in bytes. Pages will be evicted if the total size of the /// cache exceeds this amount. pub max_size: usize, - current_size: usize, - lru: LruCache<(FrameOffset, CacheKey), HashMap>>, + current_size: AtomicUsize, + lru: SizeTrackingLruCache<(FrameOffset, CacheKey), HashMap>>, } impl Default for ZstdFrameCache { fn default() -> Self { - ZstdFrameCache::new(ZstdFrameCache::DEFAULT_SIZE) + ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE) } } impl ZstdFrameCache { - // 1 GiB - pub const DEFAULT_SIZE: usize = 1024 * 1024 * 1024; - pub fn new(max_size: usize) -> Self { ZstdFrameCache { max_size, - current_size: 0, - lru: LruCache::unbounded(), + current_size: AtomicUsize::new(0), + lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry( + "zstd_frame_cache".into(), + ), } } /// Return a clone of the value associated with `cid`. If a value is found, /// the cache entry is moved to the top of the queue. - pub fn get(&mut self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option>> { + pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option>> { self.lru + .cache() + .write() .get(&(offset, key)) - .map(|index| index.get(&cid).cloned()) + .map(|index| index.get(&cid.into()).cloned()) } /// Insert entry into lru-cache and evict pages if `max_size` has been exceeded. - pub fn put(&mut self, offset: FrameOffset, key: CacheKey, index: HashMap>) { - fn size_of_entry(entry: &HashMap>) -> usize { - entry.values().map(Vec::len).sum::() + pub fn put(&self, offset: FrameOffset, key: CacheKey, index: HashMap>) { + let lru_key = (offset, key); + let lru_key_size = lru_key.get_size(); + let entry_size = index.get_size(); + // Skip large items + if entry_size.saturating_add(lru_key_size) >= self.max_size { + return; } - self.current_size += size_of_entry(&index); - if let Some(prev_entry) = self.lru.put((offset, key), index) { - self.current_size -= size_of_entry(&prev_entry); + + if let Some((_, prev_entry)) = self.lru.push(lru_key, index) { + // keys are cancelled out + self.current_size.fetch_add(entry_size, Ordering::Relaxed); + self.current_size + .fetch_sub(prev_entry.get_size(), Ordering::Relaxed); + } else { + self.current_size + .fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed); } - while self.current_size > self.max_size { - if let Some((_, entry)) = self.lru.pop_lru() { - self.current_size -= size_of_entry(&entry) + while self.current_size.load(Ordering::Relaxed) > self.max_size { + if let Some((prev_key, prev_entry)) = self.lru.pop_lru() { + self.current_size.fetch_sub( + prev_key.get_size().saturating_add(prev_entry.get_size()), + Ordering::Relaxed, + ); } else { break; } } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::{multihash::MultihashCode, rand::forest_rng}; + use ahash::HashMap; + use fvm_ipld_encoding::IPLD_RAW; + use multihash_derive::MultihashDigest; + use rand::Rng; + + #[test] + fn test_zstd_frame_cache_size() { + let mut rng = forest_rng(); + let cache = ZstdFrameCache::new(10); + for i in 0..100 { + let index = gen_index(&mut rng); + cache.put(i, i, index); + assert_eq!( + cache.current_size.load(Ordering::Relaxed), + cache.lru.size_in_bytes() + ); + let index2 = gen_index(&mut rng); + cache.put(i, i, index2); + assert_eq!( + cache.current_size.load(Ordering::Relaxed), + cache.lru.size_in_bytes() + ); + } + } + + fn gen_index(rng: &mut impl Rng) -> HashMap> { + let mut map = HashMap::default(); + for _ in 0..10 { + let vec_len = rng.gen_range(64..1024); + let mut data = vec![0; vec_len]; + rng.fill_bytes(&mut data); + let cid = Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data)); + map.insert(cid.into(), data); + } + map + } +} diff --git a/src/utils/cache/lru.rs b/src/utils/cache/lru.rs index e979ef087d9c..e58bd35a9d45 100644 --- a/src/utils/cache/lru.rs +++ b/src/utils/cache/lru.rs @@ -59,19 +59,29 @@ where registry.register_collector(Box::new(self.clone())); } - pub fn new_without_metrics_registry( - cache_name: Cow<'static, str>, - capacity: NonZeroUsize, - ) -> Self { + fn new_inner(cache_name: Cow<'static, str>, capacity: Option) -> Self { static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); Self { cache_id: ID_GENERATOR.fetch_add(1, Ordering::Relaxed), cache_name, - cache: Arc::new(RwLock::new(LruCache::new(capacity))), + #[allow(clippy::disallowed_methods)] + cache: Arc::new(RwLock::new( + capacity + .map(LruCache::new) + // For constructing lru cache that is bounded by memory usage instead of length + .unwrap_or_else(LruCache::unbounded), + )), } } + pub fn new_without_metrics_registry( + cache_name: Cow<'static, str>, + capacity: NonZeroUsize, + ) -> Self { + Self::new_inner(cache_name, Some(capacity)) + } + pub fn new_with_metrics_registry( cache_name: Cow<'static, str>, capacity: NonZeroUsize, @@ -89,6 +99,23 @@ where Self::new_with_metrics_registry(cache_name, capacity, &mut default_registry()) } + pub fn unbounded_without_metrics_registry(cache_name: Cow<'static, str>) -> Self { + Self::new_inner(cache_name, None) + } + + pub fn unbounded_with_metrics_registry( + cache_name: Cow<'static, str>, + metrics_registry: &mut Registry, + ) -> Self { + let c = Self::unbounded_without_metrics_registry(cache_name); + c.register_metrics(metrics_registry); + c + } + + pub fn unbounded_with_default_metrics_registry(cache_name: Cow<'static, str>) -> Self { + Self::unbounded_with_metrics_registry(cache_name, &mut default_registry()) + } + pub fn cache(&self) -> &Arc>> { &self.cache } @@ -113,6 +140,10 @@ where self.cache.read().peek(k).cloned() } + pub fn pop_lru(&self) -> Option<(K, V)> { + self.cache.write().pop_lru() + } + pub fn len(&self) -> usize { self.cache.read().len() } @@ -121,7 +152,7 @@ where self.cache.read().cap().get() } - fn size_in_bytes(&self) -> usize { + pub(crate) fn size_in_bytes(&self) -> usize { let mut size = 0_usize; for (k, v) in self.cache.read().iter() { size = size