Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ reason = """use `crate::utils::new_uuid_v4` instead."""
[[disallowed-methods]]
path = "tempfile::NamedTempFile::new"
reason = """The temporary files created by this method are not persistable if the temporary directory lives on a different filesystem than the target directory. While it is valid in other contexts (if not persisting files), it was misused many times and so we are banning it. Consider using `tempfile::NamedTempFile::new_in` or `tempfile::NamedTempFile::Builder`."""

[[disallowed-methods]]
path = "lru::LruCache::unbounded"
reason = """Avoid unbounded LRU caches, as they can grow without limit and cause memory leaks."""
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

### Added

- [#5859](https://github.com/ChainSafe/forest/pull/5859) Added size metrics for zstd frame cache and made max size configurable via `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable.

### Changed

### Removed
Expand All @@ -37,6 +39,8 @@

- [#5863](https://github.com/ChainSafe/forest/pull/5863) Fixed needless GC runs on a stateless node.

- [#5859](https://github.com/ChainSafe/forest/pull/5859) Fixed size calculation for zstd frame cache.

## Forest v0.28.0 "Denethor's Folly"

This is a non-mandatory release recommended for all node operators. It includes numerous fixes and quality-of-life improvements for development and archival snapshot operations. It also includes a memory leak fix that would surface on long-running nodes.
Expand Down
1 change: 1 addition & 0 deletions docs/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,4 @@ V0
V1
VPS
WIP
zstd
1 change: 1 addition & 0 deletions docs/docs/users/reference/env_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ process.
| `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC |
| `FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run |
| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache |
| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes |

### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`

Expand Down
3 changes: 1 addition & 2 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::blocks::{Tipset, TipsetKey};
use crate::utils::io::EitherMmapOrRandomAccessFile;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::Mutex;
use positioned_io::ReadAt;
use std::borrow::Cow;
use std::io::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -89,7 +88,7 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Set the z-frame cache of the inner CAR reader.
pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.with_cache(cache, key)),
AnyCar::Plain(p) => AnyCar::Plain(p),
Expand Down
17 changes: 7 additions & 10 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use futures::{Stream, TryStream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use nunny::Vec as NonEmpty;
use parking_lot::Mutex;
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
Expand Down Expand Up @@ -94,7 +93,7 @@ pub struct ForestCar<ReaderT> {
cache_key: CacheKey,
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
index_size_bytes: u32,
frame_cache: Arc<Mutex<ZstdFrameCache>>,
frame_cache: Arc<ZstdFrameCache>,
roots: NonEmpty<Cid>,
}

Expand All @@ -113,7 +112,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
cache_key: 0,
indexed,
index_size_bytes,
frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
frame_cache: Arc::new(ZstdFrameCache::default()),
roots: header.roots,
})
}
Expand Down Expand Up @@ -179,7 +178,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
}
}

pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
Self {
cache_key: key,
frame_cache: cache,
Expand All @@ -203,7 +202,7 @@ where
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
let indexed = &self.indexed;
for position in indexed.get(*k)?.into_iter() {
let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k);
let cache_query = self.frame_cache.get(position, self.cache_key, *k);
match cache_query {
// Frame cache hit, found value.
Some(Some(val)) => return Ok(Some(val)),
Expand All @@ -220,12 +219,10 @@ where
UviBytes::<Bytes>::default().decode_eof(&mut zstd_frame)?
{
let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?;
block_map.insert(cid, data);
block_map.insert(cid.into(), data);
}
let get_result = block_map.get(k).cloned();
self.frame_cache
.lock()
.put(position, self.cache_key, block_map);
let get_result = block_map.get(&(*k).into()).cloned();
self.frame_cache.put(position, self.cache_key, block_map);

// This lookup only fails in case of a hash collision
if let Some(value) = get_result {
Expand Down
6 changes: 3 additions & 3 deletions src/db/car/many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use std::cmp::Ord;
use std::collections::BinaryHeap;
use std::{path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -64,15 +64,15 @@ impl PartialEq for WithHeaviestEpoch {
}

pub struct ManyCar<WriterT = MemoryDB> {
shared_cache: Arc<Mutex<ZstdFrameCache>>,
shared_cache: Arc<ZstdFrameCache>,
read_only: Arc<RwLock<BinaryHeap<WithHeaviestEpoch>>>,
writer: WriterT,
}

impl<WriterT> ManyCar<WriterT> {
pub fn new(writer: WriterT) -> Self {
ManyCar {
shared_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
shared_cache: Arc::new(ZstdFrameCache::default()),
read_only: Arc::new(RwLock::new(BinaryHeap::default())),
writer,
}
Expand Down
122 changes: 102 additions & 20 deletions src/db/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ pub mod plain;

pub use any::AnyCar;
pub use forest::ForestCar;
use get_size2::GetSize as _;
pub use many::ManyCar;
pub use plain::PlainCar;

use ahash::HashMap;
use cid::Cid;
use lru::LruCache;
use positioned_io::{ReadAt, Size};
use std::{
num::NonZeroUsize,
sync::{
LazyLock,
atomic::{AtomicUsize, Ordering},
},
};

use crate::utils::{cache::SizeTrackingLruCache, get_size::CidWrapper};

pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {}
impl<X: ReadAt + Size + Send + Sync + 'static> RandomAccessFileReader for X {}
Expand All @@ -24,55 +33,128 @@ pub type CacheKey = u64;

type FrameOffset = u64;

/// Default maximum size (in bytes) of the zstd frame cache.
///
/// Can be overridden at startup through the
/// `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable, which must
/// parse as a positive integer. Falls back to 256 MiB when the variable is
/// unset or invalid.
pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
    const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
    // 256 MiB, used whenever no valid override is supplied.
    const FALLBACK: usize = 256 * 1024 * 1024;
    match std::env::var(ENV_KEY) {
        Ok(value) => match value.parse::<NonZeroUsize>() {
            // `NonZeroUsize` rejects zero, so the override is always positive.
            Ok(size) => {
                let size = size.get();
                tracing::info!("zstd frame max size is set to {size} via {ENV_KEY}");
                size
            }
            Err(_) => {
                tracing::error!(
                    "Failed to parse {ENV_KEY}={value}, value should be a positive integer"
                );
                FALLBACK
            }
        },
        Err(_) => FALLBACK,
    }
});

pub struct ZstdFrameCache {
/// Maximum size in bytes. Pages will be evicted if the total size of the
/// cache exceeds this amount.
pub max_size: usize,
current_size: usize,
lru: LruCache<(FrameOffset, CacheKey), HashMap<Cid, Vec<u8>>>,
current_size: AtomicUsize,
lru: SizeTrackingLruCache<(FrameOffset, CacheKey), HashMap<CidWrapper, Vec<u8>>>,
}

impl Default for ZstdFrameCache {
fn default() -> Self {
ZstdFrameCache::new(ZstdFrameCache::DEFAULT_SIZE)
ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE)
}
}

impl ZstdFrameCache {
// 1 GiB
pub const DEFAULT_SIZE: usize = 1024 * 1024 * 1024;

pub fn new(max_size: usize) -> Self {
ZstdFrameCache {
max_size,
current_size: 0,
lru: LruCache::unbounded(),
current_size: AtomicUsize::new(0),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry(
"zstd_frame_cache".into(),
),
}
}

/// Return a clone of the value associated with `cid`. If a value is found,
/// the cache entry is moved to the top of the queue.
pub fn get(&mut self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
self.lru
.cache()
.write()
.get(&(offset, key))
.map(|index| index.get(&cid).cloned())
.map(|index| index.get(&cid.into()).cloned())
}

/// Insert entry into lru-cache and evict pages if `max_size` has been exceeded.
pub fn put(&mut self, offset: FrameOffset, key: CacheKey, index: HashMap<Cid, Vec<u8>>) {
fn size_of_entry(entry: &HashMap<Cid, Vec<u8>>) -> usize {
entry.values().map(Vec::len).sum::<usize>()
pub fn put(&self, offset: FrameOffset, key: CacheKey, index: HashMap<CidWrapper, Vec<u8>>) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love it if we could have some coverage here, both on the logic level and size calculation correctness.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

let lru_key = (offset, key);
let lru_key_size = lru_key.get_size();
let entry_size = index.get_size();
// Skip entries that alone exceed the cache capacity; inserting them would
// immediately evict everything else.
if entry_size.saturating_add(lru_key_size) >= self.max_size {
return;
}
self.current_size += size_of_entry(&index);
if let Some(prev_entry) = self.lru.put((offset, key), index) {
self.current_size -= size_of_entry(&prev_entry);

if let Some((_, prev_entry)) = self.lru.push(lru_key, index) {
// The replaced entry shares the same key, so the key sizes cancel out and
// only the entry sizes need adjusting.
self.current_size.fetch_add(entry_size, Ordering::Relaxed);
self.current_size
.fetch_sub(prev_entry.get_size(), Ordering::Relaxed);
} else {
self.current_size
.fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed);
}
while self.current_size > self.max_size {
if let Some((_, entry)) = self.lru.pop_lru() {
self.current_size -= size_of_entry(&entry)
while self.current_size.load(Ordering::Relaxed) > self.max_size {
if let Some((prev_key, prev_entry)) = self.lru.pop_lru() {
self.current_size.fetch_sub(
prev_key.get_size().saturating_add(prev_entry.get_size()),
Ordering::Relaxed,
);
} else {
break;
}
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::{multihash::MultihashCode, rand::forest_rng};
    use ahash::HashMap;
    use fvm_ipld_encoding::IPLD_RAW;
    use multihash_derive::MultihashDigest;
    use rand::Rng;

    /// Asserts that the cache's running byte counter matches a full recount of
    /// the bytes held by the underlying LRU.
    fn assert_size_consistent(cache: &ZstdFrameCache) {
        assert_eq!(
            cache.current_size.load(Ordering::Relaxed),
            cache.lru.size_in_bytes()
        );
    }

    /// Builds a frame index of ten random payloads keyed by their CIDs.
    fn random_index(rng: &mut impl Rng) -> HashMap<CidWrapper, Vec<u8>> {
        (0..10)
            .map(|_| {
                let mut payload = vec![0; rng.gen_range(64..1024)];
                rng.fill_bytes(&mut payload);
                let cid = Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&payload));
                (cid.into(), payload)
            })
            .collect()
    }

    #[test]
    fn test_zstd_frame_cache_size() {
        let mut rng = forest_rng();
        // A tiny capacity forces constant eviction, exercising the size
        // accounting on both the insert and the eviction paths.
        let cache = ZstdFrameCache::new(10);
        for i in 0..100 {
            // Insert a fresh entry, then replace it under the same key; the
            // tracked size must stay accurate after each operation.
            cache.put(i, i, random_index(&mut rng));
            assert_size_consistent(&cache);
            cache.put(i, i, random_index(&mut rng));
            assert_size_consistent(&cache);
        }
    }
}
43 changes: 37 additions & 6 deletions src/utils/cache/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,29 @@ where
registry.register_collector(Box::new(self.clone()));
}

pub fn new_without_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
) -> Self {
fn new_inner(cache_name: Cow<'static, str>, capacity: Option<NonZeroUsize>) -> Self {
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);

Self {
cache_id: ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
cache_name,
cache: Arc::new(RwLock::new(LruCache::new(capacity))),
#[allow(clippy::disallowed_methods)]
cache: Arc::new(RwLock::new(
capacity
.map(LruCache::new)
// Allows constructing an LRU cache bounded by memory usage rather than entry count.
.unwrap_or_else(LruCache::unbounded),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ban the LruCache::unbounded and mark this as exception with proper comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

)),
}
}

pub fn new_without_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
) -> Self {
Self::new_inner(cache_name, Some(capacity))
}

pub fn new_with_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
Expand All @@ -89,6 +99,23 @@ where
Self::new_with_metrics_registry(cache_name, capacity, &mut default_registry())
}

/// Creates a cache with no entry-count bound and no metrics registration.
/// NOTE(review): callers are expected to bound memory some other way (e.g. the
/// size tracking in `ZstdFrameCache`) — an unbounded cache can otherwise grow
/// without limit.
pub fn unbounded_without_metrics_registry(cache_name: Cow<'static, str>) -> Self {
Self::new_inner(cache_name, None)
}

/// Creates a cache with no entry-count bound and registers its metrics
/// collector with the provided registry.
pub fn unbounded_with_metrics_registry(
    cache_name: Cow<'static, str>,
    metrics_registry: &mut Registry,
) -> Self {
    let cache = Self::unbounded_without_metrics_registry(cache_name);
    cache.register_metrics(metrics_registry);
    cache
}

/// Creates a cache with no entry-count bound, registered in the default
/// (process-global) metrics registry.
pub fn unbounded_with_default_metrics_registry(cache_name: Cow<'static, str>) -> Self {
Self::unbounded_with_metrics_registry(cache_name, &mut default_registry())
}

pub fn cache(&self) -> &Arc<RwLock<LruCache<K, V>>> {
&self.cache
}
Expand All @@ -113,6 +140,10 @@ where
self.cache.read().peek(k).cloned()
}

/// Removes and returns the least-recently-used entry, or `None` if the
/// cache is empty. Takes the write lock on the inner cache.
pub fn pop_lru(&self) -> Option<(K, V)> {
self.cache.write().pop_lru()
}

pub fn len(&self) -> usize {
self.cache.read().len()
}
Expand All @@ -121,7 +152,7 @@ where
self.cache.read().cap().get()
}

fn size_in_bytes(&self) -> usize {
pub(crate) fn size_in_bytes(&self) -> usize {
let mut size = 0_usize;
for (k, v) in self.cache.read().iter() {
size = size
Expand Down
Loading