Skip to content

Commit

Permalink
fix: fix read only store recovery, impl random drop (#16)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored May 29, 2023
1 parent 1e3be82 commit 80c40de
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 15 deletions.
2 changes: 1 addition & 1 deletion foyer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nix = { version = "0.26", features = ["fs", "mman"] }
parking_lot = "0.12"
paste = "1.0"
prometheus = "0.13"
rand = "0.8.5"
thiserror = "1"
tokio = { version = "1", features = [
"rt",
Expand All @@ -36,6 +37,5 @@ twox-hash = "1"
bytesize = "1"
clap = { version = "4", features = ["derive"] }
hdrhistogram = "7"
rand = "0.8.5"
rand_mt = "4.2.1"
tempfile = "3"
115 changes: 101 additions & 14 deletions foyer/src/store/read_only_file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::{collections::HashMap, marker::PhantomData};

use async_trait::async_trait;

use bytes::{Buf, BufMut};
use itertools::Itertools;
use rand::{thread_rng, Rng};
use tokio::sync::{RwLock, RwLockWriteGuard};

use crate::{Data, Index};
Expand Down Expand Up @@ -139,19 +141,18 @@ where
let dir = config.dir.clone();
move || {
create_dir_all(&dir)?;
let ids: Vec<FileId> = read_dir(dir)?

let mut ids: Vec<FileId> = read_dir(dir)?
.map(|entry| entry.unwrap())
.filter(|entry| {
entry
.file_name()
.to_string_lossy()
.is_prefix_of(META_FILE_PREFIX)
META_FILE_PREFIX.is_prefix_of(&entry.file_name().to_string_lossy())
})
.map(|entry| {
entry.file_name().to_string_lossy()[META_FILE_PREFIX.len()..].to_string()
})
.map(|s| s.parse().unwrap())
.collect_vec();
ids.sort();
Ok(ids)
}
})
Expand Down Expand Up @@ -187,21 +188,35 @@ where

#[allow(clippy::uninit_vec)]
async fn store(&self, index: Self::I, data: Self::D) -> Result<()> {
let buf = data.into();
let len = buf.len();

// append cache file and meta file
let (fid, sid, location) = {
// randomly drop if size exceeds the threshold
if self.size.load(Ordering::Relaxed) as f64
>= self.config.capacity as f64 * self.config.trigger_random_drop_ratio
{
let mut rng = thread_rng();
if rng.gen_range(0.0..1.0) < self.config.random_drop_ratio {
return Ok(());
}
}

let buf = data.into();

let files = self.files.read().await;

let fid = files.active.fid;

let location = files.active.cache_file.append(buf).await?;

let mut buf = Vec::with_capacity(Self::meta_entry_size());

let tags = Tags { valid: true };

unsafe { buf.set_len(Self::meta_entry_size()) };
index.write(&mut buf[..]);
location.write(&mut buf[I::size()..]);
tags.write(&mut buf[..]);
index.write(&mut buf[Tags::size()..]);
location.write(&mut buf[Tags::size() + I::size()..]);

let Location {
offset: meta_offset,
len: _,
Expand All @@ -219,7 +234,8 @@ where
drop(indices);
}

self.size.fetch_add(len, Ordering::Relaxed);
self.size
.fetch_add(location.len as usize, Ordering::Relaxed);

if active_file_size >= self.config.max_file_size {
let files = self.files.write().await;
Expand Down Expand Up @@ -434,8 +450,12 @@ where
let buf = meta.read(0, size).await?;
for sid in 0..slots {
let slice = &buf[sid * Self::meta_entry_size()..(sid + 1) * Self::meta_entry_size()];
let index = I::read(slice);
let location = Location::read(&slice[I::size()..]);
let tags = Tags::read(slice);
if !tags.valid {
continue;
}
let index = I::read(&slice[Tags::size()..]);
let location = Location::read(&slice[Tags::size() + I::size()..]);
indices.insert(index, (fid, sid as SlotId, location));
}
Ok(())
Expand All @@ -450,7 +470,29 @@ where
}

fn meta_entry_size() -> usize {
I::size() + Location::size()
I::size() + Location::size() + Tags::size()
}
}

struct Tags {
valid: bool,
}

impl Tags {
fn size() -> usize {
1
}

fn write(&self, mut buf: &mut [u8]) {
let mut val = 0;
val |= self.valid as u8;
buf.put_u8(val);
}

fn read(mut buf: &[u8]) -> Self {
let val = buf.get_u8();
let valid = (val & 1) != 0;
Self { valid }
}
}

Expand Down Expand Up @@ -514,4 +556,49 @@ mod tests {

drop(dir);
}

#[tokio::test]
async fn test_read_only_file_store_recovery() {
let dir = tempdir().unwrap();

let config = Config {
dir: dir.path().to_owned(),
max_file_size: 4 * 1024,
capacity: 16 * 1024,
trigger_reclaim_garbage_ratio: 0.0, // disabled
trigger_reclaim_capacity_ratio: 0.75,
trigger_random_drop_ratio: 0.0, // disabled
random_drop_ratio: 0.0, // disabled
};

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config.clone()).await.unwrap();

for i in 0..20 {
store.store(i, data(i as u8, 1024)).await.unwrap();
}

assert_eq!(store.files.read().await.frozens.len(), 2);
for i in 0..12 {
assert_eq!(store.load(&i).await.unwrap(), None);
}
for i in 12..20 {
assert_eq!(store.load(&i).await.unwrap(), Some(data(i as u8, 1024)));
}

drop(store);

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config).await.unwrap();

assert_eq!(store.files.read().await.frozens.len(), 3);
for i in 0..12 {
assert_eq!(store.load(&i).await.unwrap(), None);
}
for i in 12..20 {
assert_eq!(store.load(&i).await.unwrap(), Some(data(i as u8, 1024)));
}

drop(dir);
}
}

0 comments on commit 80c40de

Please sign in to comment.