diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 720e6ca6..bbc3834d 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -21,6 +21,7 @@ nix = { version = "0.26", features = ["fs", "mman"] } parking_lot = "0.12" paste = "1.0" prometheus = "0.13" +rand = "0.8.5" thiserror = "1" tokio = { version = "1", features = [ "rt", @@ -36,6 +37,5 @@ twox-hash = "1" bytesize = "1" clap = { version = "4", features = ["derive"] } hdrhistogram = "7" -rand = "0.8.5" rand_mt = "4.2.1" tempfile = "3" diff --git a/foyer/src/store/read_only_file_store.rs b/foyer/src/store/read_only_file_store.rs index 8730fbdf..c75c5ed8 100644 --- a/foyer/src/store/read_only_file_store.rs +++ b/foyer/src/store/read_only_file_store.rs @@ -23,7 +23,9 @@ use std::{collections::HashMap, marker::PhantomData}; use async_trait::async_trait; +use bytes::{Buf, BufMut}; use itertools::Itertools; +use rand::{thread_rng, Rng}; use tokio::sync::{RwLock, RwLockWriteGuard}; use crate::{Data, Index}; @@ -139,19 +141,18 @@ where let dir = config.dir.clone(); move || { create_dir_all(&dir)?; - let ids: Vec = read_dir(dir)? + + let mut ids: Vec = read_dir(dir)? .map(|entry| entry.unwrap()) .filter(|entry| { - entry - .file_name() - .to_string_lossy() - .is_prefix_of(META_FILE_PREFIX) + META_FILE_PREFIX.is_prefix_of(&entry.file_name().to_string_lossy()) }) .map(|entry| { entry.file_name().to_string_lossy()[META_FILE_PREFIX.len()..].to_string() }) .map(|s| s.parse().unwrap()) .collect_vec(); + ids.sort(); Ok(ids) } }) @@ -187,11 +188,20 @@ where #[allow(clippy::uninit_vec)] async fn store(&self, index: Self::I, data: Self::D) -> Result<()> { - let buf = data.into(); - let len = buf.len(); - // append cache file and meta file let (fid, sid, location) = { + // randomly drop if size exceeds the threshold + if self.size.load(Ordering::Relaxed) as f64 + >= self.config.capacity as f64 * self.config.trigger_random_drop_ratio + { + let mut rng = thread_rng(); + if rng.gen_range(0.0..1.0) < self.config.random_drop_ratio { + return Ok(()); + } + } + + let buf = data.into(); + let files = self.files.read().await; let fid = files.active.fid; @@ -199,9 +209,14 @@ where let location = files.active.cache_file.append(buf).await?; let mut buf = Vec::with_capacity(Self::meta_entry_size()); + + let tags = Tags { valid: true }; + unsafe { buf.set_len(Self::meta_entry_size()) }; - index.write(&mut buf[..]); - location.write(&mut buf[I::size()..]); + tags.write(&mut buf[..]); + index.write(&mut buf[Tags::size()..]); + location.write(&mut buf[Tags::size() + I::size()..]); + let Location { offset: meta_offset, len: _, @@ -219,7 +234,8 @@ where drop(indices); } - self.size.fetch_add(len, Ordering::Relaxed); + self.size + .fetch_add(location.len as usize, Ordering::Relaxed); if active_file_size >= self.config.max_file_size { let files = self.files.write().await; @@ -434,8 +450,12 @@ where let buf = meta.read(0, size).await?; for sid in 0..slots { let slice = &buf[sid * Self::meta_entry_size()..(sid + 1) * Self::meta_entry_size()]; - let index = I::read(slice); - let location = Location::read(&slice[I::size()..]); + let tags = Tags::read(slice); + if !tags.valid { + continue; + } + let index = I::read(&slice[Tags::size()..]); + let location = Location::read(&slice[Tags::size() + I::size()..]); indices.insert(index, (fid, sid as SlotId, location)); } Ok(()) @@ -450,7 +470,29 @@ where } fn meta_entry_size() -> usize { - I::size() + Location::size() + I::size() + Location::size() + Tags::size() + } +} + +struct Tags { + valid: bool, +} + +impl Tags { + fn size() -> usize { + 1 + } + + fn write(&self, mut buf: &mut [u8]) { + let mut val = 0; + val |= self.valid as u8; + buf.put_u8(val); + } + + fn read(mut buf: &[u8]) -> Self { + let val = buf.get_u8(); + let valid = (val & 1) != 0; + Self { valid } } } @@ -514,4 +556,49 @@ mod tests { drop(dir); } + + #[tokio::test] + async fn test_read_only_file_store_recovery() { + let dir = tempdir().unwrap(); + + let config = Config { + dir: dir.path().to_owned(), + max_file_size: 4 * 1024, + capacity: 16 * 1024, + trigger_reclaim_garbage_ratio: 0.0, // disabled + trigger_reclaim_capacity_ratio: 0.75, + trigger_random_drop_ratio: 0.0, // disabled + random_drop_ratio: 0.0, // disabled + }; + + let store: ReadOnlyFileStore> = + ReadOnlyFileStore::open(0, config.clone()).await.unwrap(); + + for i in 0..20 { + store.store(i, data(i as u8, 1024)).await.unwrap(); + } + + assert_eq!(store.files.read().await.frozens.len(), 2); + for i in 0..12 { + assert_eq!(store.load(&i).await.unwrap(), None); + } + for i in 12..20 { + assert_eq!(store.load(&i).await.unwrap(), Some(data(i as u8, 1024))); + } + + drop(store); + + let store: ReadOnlyFileStore> = + ReadOnlyFileStore::open(0, config).await.unwrap(); + + assert_eq!(store.files.read().await.frozens.len(), 3); + for i in 0..12 { + assert_eq!(store.load(&i).await.unwrap(), None); + } + for i in 12..20 { + assert_eq!(store.load(&i).await.unwrap(), Some(data(i as u8, 1024))); + } + + drop(dir); + } }