Skip to content

Commit

Permalink
feat: impl destroy for sodc (#744)
Browse files Browse the repository at this point in the history
* feat: impl destroy for sodc

Changes:

- Use set 0 as meta set.
- Use watermark in metaset for destroy.
- sodc must start from region 0

Signed-off-by: MrCroxx <[email protected]>

* chore: fix ffmt

Signed-off-by: MrCroxx <[email protected]>

* fix: clear cache after destroy, use u128 timestamp

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Sep 27, 2024
1 parent 88089d7 commit fa8f011
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 42 deletions.
1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ twox-hash = "1"
zstd = "0.13"

[dev-dependencies]
bytesize = { workspace = true }
tempfile = "3"
test-log = { workspace = true }

Expand Down
12 changes: 5 additions & 7 deletions foyer-storage/src/small/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
device::ALIGN,
io_buffer_pool::IoBufferPool,
serde::EntrySerializer,
small::{serde::EntryHeader, set::SetId},
small::{serde::EntryHeader, set::SetId, set_manager::SetPicker},
Compression, IoBuffer, IoBytes,
};

Expand Down Expand Up @@ -83,16 +83,14 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
/// Total set count.
total: SetId,

sets: HashMap<SetId, SetBatchMut<K, V, S>>,
buffer: IoBuffer,
len: usize,
sequence: Sequence,

/// Cache write buffer between rotation to reduce page fault.
buffer_pool: IoBufferPool,
set_picker: SetPicker,

waiters: Vec<oneshot::Sender<()>>,

Expand All @@ -107,16 +105,16 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(total: SetId, buffer_size: usize, metrics: Arc<Metrics>) -> Self {
pub fn new(sets: usize, buffer_size: usize, metrics: Arc<Metrics>) -> Self {
let buffer_size = bits::align_up(ALIGN, buffer_size);

Self {
total,
sets: HashMap::new(),
buffer: IoBuffer::new(buffer_size),
len: 0,
sequence: 0,
buffer_pool: IoBufferPool::new(buffer_size, 1),
set_picker: SetPicker::new(sets),
waiters: vec![],
init: None,
metrics,
Expand Down Expand Up @@ -192,7 +190,7 @@ where
}

fn sid(&self, hash: u64) -> SetId {
hash % self.total
self.set_picker.sid(hash)
}

pub fn is_empty(&self) -> bool {
Expand Down
144 changes: 137 additions & 7 deletions foyer-storage/src/small/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use foyer_memory::CacheEntry;
use futures::{future::join_all, Future};
use itertools::Itertools;

use super::flusher::Submission;
use crate::{
device::{MonitoredDevice, RegionId},
error::Result,
small::{flusher::Flusher, set::SetId, set_manager::SetManager},
small::{
flusher::{Flusher, Submission},
set_manager::SetManager,
},
storage::Storage,
DeviceStats, Runtime, Statistics,
};
Expand Down Expand Up @@ -136,13 +138,20 @@ where
let stats = config.statistics.clone();
let metrics = config.device.metrics().clone();

let set_manager = SetManager::new(
assert_eq!(
config.regions.start, 0,
"small object disk cache must start with region 0, current: {:?}",
config.regions
);

let set_manager = SetManager::open(
config.set_size,
config.set_cache_capacity,
config.device.clone(),
config.regions.clone(),
config.flush,
);
)
.await?;

let flushers = (0..config.flushers)
.map(|_| Flusher::open(&config, set_manager.clone(), stats.clone(), metrics.clone()))
Expand Down Expand Up @@ -187,7 +196,7 @@ where

fn load(&self, hash: u64) -> impl Future<Output = Result<Option<(K, V)>>> + Send + 'static {
let set_manager = self.inner.set_manager.clone();
let sid = hash % set_manager.sets() as SetId;
let sid = set_manager.set_picker().sid(hash);
let stats = self.inner.stats.clone();

async move {
Expand Down Expand Up @@ -216,9 +225,14 @@ where
self.inner.flushers[id].submit(Submission::Deletion { hash });
}

/// Destroy the small object disk cache by delegating to the set manager's `destroy`.
///
/// Returns whatever result the set manager reports.
async fn destroy(&self) -> Result<()> {
    // TODO(MrCroxx): reset bloom filters
    let set_manager = &self.inner.set_manager;
    set_manager.destroy().await
}

fn may_contains(&self, hash: u64) -> bool {
let set_manager = self.inner.set_manager.clone();
let sid = hash % set_manager.sets() as SetId;
let sid = set_manager.set_picker().sid(hash);
// FIXME: Anyway without blocking? Use atomic?
self.inner
.runtime
Expand Down Expand Up @@ -268,7 +282,7 @@ where
}

async fn destroy(&self) -> Result<()> {
todo!()
self.destroy().await
}

fn stats(&self) -> Arc<DeviceStats> {
Expand All @@ -279,3 +293,119 @@ where
self.wait()
}
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use ahash::RandomState;
    use bytesize::ByteSize;
    use foyer_common::metrics::Metrics;
    use foyer_memory::{Cache, CacheBuilder, FifoConfig};
    use tokio::runtime::Handle;

    use super::*;
    use crate::{
        device::{
            monitor::{Monitored, MonitoredOptions},
            Dev,
        },
        serde::EntrySerializer,
        DevExt, DirectFsDeviceOptions,
    };

    /// Store type exercised by the tests in this module.
    type TestStore = GenericSmallStorage<u64, Vec<u8>, RandomState>;
    /// In-memory cache entry type fed into the store under test.
    type TestEntry = CacheEntry<u64, Vec<u8>>;

    /// Build a FIFO in-memory cache with capacity 10 that is used to produce cache entries.
    fn cache_for_test() -> Cache<u64, Vec<u8>> {
        let builder = CacheBuilder::new(10).with_eviction_config(FifoConfig::default());
        builder.build()
    }

    /// Open a monitored direct-fs device under `dir` with 64 KiB capacity split into 16 KiB files.
    async fn device_for_test(dir: impl AsRef<Path>) -> MonitoredDevice {
        let runtime = Runtime::current();
        let options = MonitoredOptions {
            options: DirectFsDeviceOptions {
                dir: dir.as_ref().into(),
                capacity: ByteSize::kib(64).as_u64() as _,
                file_size: ByteSize::kib(16).as_u64() as _,
            }
            .into(),
            metrics: Arc::new(Metrics::new("test")),
        };
        Monitored::open(options, runtime).await.unwrap()
    }

    /// Open a small object store over every region of a fresh test device located in `dir`.
    ///
    /// The store uses 4 KiB sets, a set cache capacity of 4, a single flusher and no flush on write.
    async fn store_for_test(dir: impl AsRef<Path>) -> TestStore {
        let device = device_for_test(dir).await;
        // The region count must be read before `device` is moved into the config.
        let region_count = device.regions() as RegionId;
        let config = GenericSmallStorageConfig {
            set_size: ByteSize::kib(4).as_u64() as _,
            set_cache_capacity: 4,
            device,
            // The store asserts on open that its region range starts at 0.
            regions: 0..region_count,
            flush: false,
            flushers: 1,
            buffer_pool_size: ByteSize::kib(64).as_u64() as _,
            statistics: Arc::<Statistics>::default(),
            runtime: Runtime::new(None, None, Handle::current()),
            marker: PhantomData,
        };
        GenericSmallStorage::open(config).await.unwrap()
    }

    /// Enqueue a clone of `entry` into `store` together with its estimated serialized size.
    fn enqueue(store: &TestStore, entry: &TestEntry) {
        store.enqueue(
            entry.clone(),
            EntrySerializer::estimated_size(entry.key(), entry.value()),
        );
    }

    /// Assert that loading by the hash of `entry` yields exactly its key and value.
    async fn assert_some(store: &TestStore, entry: &TestEntry) {
        let loaded = store.load(entry.hash()).await.unwrap().unwrap();
        let expected = (*entry.key(), entry.value().clone());
        assert_eq!(loaded, expected);
    }

    /// Assert that loading by the hash of `entry` yields nothing.
    async fn assert_none(store: &TestStore, entry: &TestEntry) {
        assert!(store.load(entry.hash()).await.unwrap().is_none());
    }

    #[test_log::test(tokio::test)]
    async fn test_store_enqueue_lookup_destroy_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test(dir.path()).await;

        // A single entry is readable once the flush has been waited for.
        let e1 = memory.insert(1, vec![1; 42]);
        enqueue(&store, &e1);
        store.wait().await;

        assert_some(&store, &e1).await;

        // Deleting by hash makes the entry unreadable.
        store.delete(e1.hash());
        store.wait().await;

        assert_none(&store, &e1).await;

        let e2 = memory.insert(2, vec![2; 192]);
        let e3 = memory.insert(3, vec![3; 168]);

        // Re-enqueue the deleted entry together with two new ones.
        for entry in [&e1, &e2, &e3] {
            enqueue(&store, entry);
        }
        store.wait().await;

        for entry in [&e1, &e2, &e3] {
            assert_some(&store, entry).await;
        }

        // After destroy, none of the entries may be readable.
        store.destroy().await.unwrap();

        for entry in [&e1, &e2, &e3] {
            assert_none(&store, entry).await;
        }
    }
}
57 changes: 36 additions & 21 deletions foyer-storage/src/small/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl SetMut {
/// # Format
///
/// ```plain
/// | checksum (4B) | timestamp (8B) | len (4B) |
/// | checksum (4B) | ns timestamp (16B) | len (4B) |
/// | bloom filter (4 * 8B = 32B) |
/// ```
pub struct SetStorage {
Expand All @@ -97,7 +97,7 @@ pub struct SetStorage {
/// Set size.
size: usize,
/// Set last updated timestamp.
timestamp: u64,
timestamp: u128,
/// Set bloom filter.
bloom_filter: BloomFilterU64<4>,

Expand All @@ -118,15 +118,18 @@ impl Debug for SetStorage {
}

impl SetStorage {
pub const SET_HEADER_SIZE: usize = 48;
pub const SET_HEADER_SIZE: usize = 56;

pub fn load(buffer: IoBytesMut) -> Self {
/// Load the set storage from buffer.
///
/// If the set storage was last updated before `watermark`, load an empty set storage.
pub fn load(buffer: IoBytesMut, watermark: u128) -> Self {
assert!(buffer.len() >= Self::SET_HEADER_SIZE);

let checksum = (&buffer[0..4]).get_u32();
let timestamp = (&buffer[4..12]).get_u64();
let len = (&buffer[12..16]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[16..48]);
let timestamp = (&buffer[4..20]).get_u128();
let len = (&buffer[20..24]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[24..56]);

let mut this = Self {
checksum,
Expand All @@ -138,25 +141,29 @@ impl SetStorage {
buffer,
};

if Self::SET_HEADER_SIZE + this.len >= this.buffer.len() {
this.verify(watermark);

this
}

fn verify(&mut self, watermark: u128) {
if Self::SET_HEADER_SIZE + self.len >= self.buffer.len() || self.timestamp < watermark {
// invalid len, or the set is stale (last updated before the watermark)
this.clear();
self.clear();
} else {
let c = Checksummer::checksum32(&this.buffer[4..Self::SET_HEADER_SIZE + this.len]);
if c != checksum {
let c = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
if c != self.checksum {
// checksum mismatch
this.clear();
self.clear();
}
}

this
}

pub fn update(&mut self) {
self.bloom_filter.write(&mut self.buffer[16..48]);
(&mut self.buffer[12..16]).put_u32(self.len as _);
self.timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
(&mut self.buffer[4..12]).put_u64(self.timestamp);
self.bloom_filter.write(&mut self.buffer[24..56]);
(&mut self.buffer[20..24]).put_u32(self.len as _);
self.timestamp = SetTimestamp::current();
(&mut self.buffer[4..20]).put_u128(self.timestamp);
self.checksum = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
(&mut self.buffer[0..4]).put_u32(self.checksum);
}
Expand Down Expand Up @@ -384,6 +391,14 @@ impl<'a> Iterator for SetIter<'a> {
}
}

/// Namespace for producing set timestamps.
pub struct SetTimestamp;

impl SetTimestamp {
    /// Return the current wall-clock time as nanoseconds elapsed since the UNIX epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the UNIX epoch.
    pub fn current() -> u128 {
        let now = SystemTime::now();
        let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
        since_epoch.as_nanos()
    }
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -436,7 +451,7 @@ mod tests {
#[should_panic]
fn test_set_storage_empty() {
let buffer = IoBytesMut::new();
SetStorage::load(buffer);
SetStorage::load(buffer, 0);
}

#[test]
Expand All @@ -447,7 +462,7 @@ mod tests {
unsafe { buf.set_len(PAGE) };

// load will result in an empty set
let mut storage = SetStorage::load(buf);
let mut storage = SetStorage::load(buf, 0);
assert!(storage.is_empty());

let e1 = memory.insert(1, vec![b'1'; 42]);
Expand Down Expand Up @@ -510,7 +525,7 @@ mod tests {
let mut buf = IoBytesMut::with_capacity(PAGE);
unsafe { buf.set_len(PAGE) };
buf[0..bytes.len()].copy_from_slice(&bytes);
let mut storage = SetStorage::load(buf);
let mut storage = SetStorage::load(buf, 0);

assert_eq!(storage.len(), b4.len());
assert_none(&storage, e1.hash());
Expand Down
Loading

0 comments on commit fa8f011

Please sign in to comment.