Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,4 @@ V0
V1
VPS
WIP
zstd
1 change: 1 addition & 0 deletions docs/docs/users/reference/env_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ process.
| `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC |
| `FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run |
| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache |
| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 1073741824 | 536870912 | The default zstd frame cache max size in bytes |

### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`

Expand Down
3 changes: 1 addition & 2 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::db::PersistentStore;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::Mutex;
use positioned_io::ReadAt;
use std::borrow::Cow;
use std::io::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -90,7 +89,7 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Set the z-frame cache of the inner CAR reader.
pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.with_cache(cache, key)),
AnyCar::Plain(p) => AnyCar::Plain(p),
Expand Down
18 changes: 8 additions & 10 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use futures::{Stream, TryStream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use nunny::Vec as NonEmpty;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub struct ForestCar<ReaderT> {
cache_key: CacheKey,
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
index_size_bytes: u32,
frame_cache: Arc<Mutex<ZstdFrameCache>>,
frame_cache: Arc<ZstdFrameCache>,
write_cache: Arc<RwLock<ahash::HashMap<Cid, Vec<u8>>>>,
roots: NonEmpty<Cid>,
}
Expand All @@ -115,7 +115,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
cache_key: 0,
indexed,
index_size_bytes,
frame_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
frame_cache: Arc::new(ZstdFrameCache::default()),
write_cache: Arc::new(RwLock::new(ahash::HashMap::default())),
roots: header.roots,
})
Expand Down Expand Up @@ -183,7 +183,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
}
}

pub fn with_cache(self, cache: Arc<Mutex<ZstdFrameCache>>, key: CacheKey) -> Self {
pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
Self {
cache_key: key,
frame_cache: cache,
Expand Down Expand Up @@ -212,7 +212,7 @@ where

let indexed = &self.indexed;
for position in indexed.get(*k)?.into_iter() {
let cache_query = self.frame_cache.lock().get(position, self.cache_key, *k);
let cache_query = self.frame_cache.get(position, self.cache_key, *k);
match cache_query {
// Frame cache hit, found value.
Some(Some(val)) => return Ok(Some(val)),
Expand All @@ -229,12 +229,10 @@ where
UviBytes::<Bytes>::default().decode_eof(&mut zstd_frame)?
{
let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?;
block_map.insert(cid, data);
block_map.insert(cid.into(), data);
}
let get_result = block_map.get(k).cloned();
self.frame_cache
.lock()
.put(position, self.cache_key, block_map);
let get_result = block_map.get(&(*k).into()).cloned();
self.frame_cache.put(position, self.cache_key, block_map);

// This lookup only fails in case of a hash collision
if let Some(value) = get_result {
Expand Down
6 changes: 3 additions & 3 deletions src/db/car/many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use std::cmp::Ord;
use std::collections::BinaryHeap;
use std::{path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -64,15 +64,15 @@ impl PartialEq for WithHeaviestEpoch {
}

pub struct ManyCar<WriterT = MemoryDB> {
shared_cache: Arc<Mutex<ZstdFrameCache>>,
shared_cache: Arc<ZstdFrameCache>,
read_only: Arc<RwLock<BinaryHeap<WithHeaviestEpoch>>>,
writer: WriterT,
}

impl<WriterT> ManyCar<WriterT> {
pub fn new(writer: WriterT) -> Self {
ManyCar {
shared_cache: Arc::new(Mutex::new(ZstdFrameCache::default())),
shared_cache: Arc::new(ZstdFrameCache::default()),
read_only: Arc::new(RwLock::new(BinaryHeap::default())),
writer,
}
Expand Down
78 changes: 58 additions & 20 deletions src/db/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ pub mod plain;

pub use any::AnyCar;
pub use forest::ForestCar;
use get_size2::GetSize as _;
pub use many::ManyCar;
pub use plain::PlainCar;

use ahash::HashMap;
use cid::Cid;
use lru::LruCache;
use positioned_io::{ReadAt, Size};
use std::{
num::NonZeroUsize,
sync::{
LazyLock,
atomic::{AtomicUsize, Ordering},
},
};

use crate::utils::{cache::SizeTrackingLruCache, get_size::CidWrapper};

pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {}
impl<X: ReadAt + Size + Send + Sync + 'static> RandomAccessFileReader for X {}
Expand All @@ -24,52 +33,81 @@ pub type CacheKey = u64;

type FrameOffset = u64;

pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
if let Ok(value) = std::env::var(ENV_KEY) {
if let Ok(size) = value.parse::<NonZeroUsize>() {
let size = size.get();
tracing::info!("zstd frame max size is set to {size} via {ENV_KEY}");
return size;
} else {
tracing::warn!("Failed to parse {ENV_KEY}={value}, value should be a positive integer");
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
}
}
// 1GiB
1024 * 1024 * 1024
});

pub struct ZstdFrameCache {
/// Maximum size in bytes. Pages will be evicted if the total size of the
/// cache exceeds this amount.
pub max_size: usize,
current_size: usize,
lru: LruCache<(FrameOffset, CacheKey), HashMap<Cid, Vec<u8>>>,
current_size: AtomicUsize,
lru: SizeTrackingLruCache<(FrameOffset, CacheKey), HashMap<CidWrapper, Vec<u8>>>,
}

impl Default for ZstdFrameCache {
fn default() -> Self {
ZstdFrameCache::new(ZstdFrameCache::DEFAULT_SIZE)
ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE)
}
}

impl ZstdFrameCache {
// 1 GiB
pub const DEFAULT_SIZE: usize = 1024 * 1024 * 1024;

pub fn new(max_size: usize) -> Self {
ZstdFrameCache {
max_size,
current_size: 0,
lru: LruCache::unbounded(),
current_size: AtomicUsize::new(0),
lru: SizeTrackingLruCache::unbounded_with_default_metrics_registry(
"zstd_frame_cache".into(),
),
}
}

/// Return a clone of the value associated with `cid`. If a value is found,
/// the cache entry is moved to the top of the queue.
pub fn get(&mut self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
self.lru
.cache()
.write()
.get(&(offset, key))
.map(|index| index.get(&cid).cloned())
.map(|index| index.get(&cid.into()).cloned())
}

/// Insert entry into lru-cache and evict pages if `max_size` has been exceeded.
pub fn put(&mut self, offset: FrameOffset, key: CacheKey, index: HashMap<Cid, Vec<u8>>) {
fn size_of_entry(entry: &HashMap<Cid, Vec<u8>>) -> usize {
entry.values().map(Vec::len).sum::<usize>()
pub fn put(&self, offset: FrameOffset, key: CacheKey, index: HashMap<CidWrapper, Vec<u8>>) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love it if we could have some coverage here, both on the logic level and size calculation correctness.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

let lru_key = (offset, key);
let lru_key_size = lru_key.get_size();
let entry_size = index.get_size();
// Skip large items
if entry_size.saturating_add(lru_key_size) >= self.max_size {
return;
}
self.current_size += size_of_entry(&index);
if let Some(prev_entry) = self.lru.put((offset, key), index) {
self.current_size -= size_of_entry(&prev_entry);

if let Some((_, prev_entry)) = self.lru.push(lru_key, index) {
// keys are cancelled out
self.current_size.fetch_add(entry_size, Ordering::Relaxed);
self.current_size
.fetch_sub(prev_entry.get_size(), Ordering::Relaxed);
} else {
self.current_size
.fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed);
}
while self.current_size > self.max_size {
if let Some((_, entry)) = self.lru.pop_lru() {
self.current_size -= size_of_entry(&entry)
while self.current_size.load(Ordering::Relaxed) > self.max_size {
if let Some((prev_key, prev_entry)) = self.lru.pop_lru() {
self.current_size.fetch_sub(
prev_key.get_size().saturating_add(prev_entry.get_size()),
Ordering::Relaxed,
);
} else {
break;
}
Expand Down
39 changes: 34 additions & 5 deletions src/utils/cache/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,27 @@ where
registry.register_collector(Box::new(self.clone()));
}

pub fn new_without_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
) -> Self {
fn new_inner(cache_name: Cow<'static, str>, capacity: Option<NonZeroUsize>) -> Self {
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);

Self {
cache_id: ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
cache_name,
cache: Arc::new(RwLock::new(LruCache::new(capacity))),
cache: Arc::new(RwLock::new(
capacity
.map(LruCache::new)
.unwrap_or_else(LruCache::unbounded),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ban the LruCache::unbounded and mark this as exception with proper comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

)),
}
}

pub fn new_without_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
) -> Self {
Self::new_inner(cache_name, Some(capacity))
}

pub fn new_with_metrics_registry(
cache_name: Cow<'static, str>,
capacity: NonZeroUsize,
Expand All @@ -89,6 +97,23 @@ where
Self::new_with_metrics_registry(cache_name, capacity, &mut default_registry())
}

pub fn unbounded_without_metrics_registry(cache_name: Cow<'static, str>) -> Self {
Self::new_inner(cache_name, None)
}

pub fn unbounded_with_metrics_registry(
cache_name: Cow<'static, str>,
metrics_registry: &mut Registry,
) -> Self {
let c = Self::unbounded_without_metrics_registry(cache_name);
c.register_metrics(metrics_registry);
c
}

pub fn unbounded_with_default_metrics_registry(cache_name: Cow<'static, str>) -> Self {
Self::unbounded_with_metrics_registry(cache_name, &mut default_registry())
}

pub fn cache(&self) -> &Arc<RwLock<LruCache<K, V>>> {
&self.cache
}
Expand All @@ -113,6 +138,10 @@ where
self.cache.read().peek(k).cloned()
}

pub fn pop_lru(&self) -> Option<(K, V)> {
self.cache.write().pop_lru()
}

pub fn len(&self) -> usize {
self.cache.read().len()
}
Expand Down
Loading