Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl destroy for sodc #744

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ twox-hash = "1"
zstd = "0.13"

[dev-dependencies]
bytesize = { workspace = true }
tempfile = "3"
test-log = { workspace = true }

Expand Down
12 changes: 5 additions & 7 deletions foyer-storage/src/small/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
device::ALIGN,
io_buffer_pool::IoBufferPool,
serde::EntrySerializer,
small::{serde::EntryHeader, set::SetId},
small::{serde::EntryHeader, set::SetId, set_manager::SetPicker},
Compression, IoBuffer, IoBytes,
};

Expand Down Expand Up @@ -83,16 +83,14 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
/// Total set count.
total: SetId,

sets: HashMap<SetId, SetBatchMut<K, V, S>>,
buffer: IoBuffer,
len: usize,
sequence: Sequence,

/// Cache write buffer between rotation to reduce page fault.
buffer_pool: IoBufferPool,
set_picker: SetPicker,

waiters: Vec<oneshot::Sender<()>>,

Expand All @@ -107,16 +105,16 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(total: SetId, buffer_size: usize, metrics: Arc<Metrics>) -> Self {
pub fn new(sets: usize, buffer_size: usize, metrics: Arc<Metrics>) -> Self {
let buffer_size = bits::align_up(ALIGN, buffer_size);

Self {
total,
sets: HashMap::new(),
buffer: IoBuffer::new(buffer_size),
len: 0,
sequence: 0,
buffer_pool: IoBufferPool::new(buffer_size, 1),
set_picker: SetPicker::new(sets),
waiters: vec![],
init: None,
metrics,
Expand Down Expand Up @@ -192,7 +190,7 @@ where
}

fn sid(&self, hash: u64) -> SetId {
hash % self.total
self.set_picker.sid(hash)
}

pub fn is_empty(&self) -> bool {
Expand Down
144 changes: 137 additions & 7 deletions foyer-storage/src/small/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
use futures::{future::join_all, Future};
use itertools::Itertools;

use super::flusher::Submission;
use crate::{
device::{MonitoredDevice, RegionId},
error::Result,
small::{flusher::Flusher, set::SetId, set_manager::SetManager},
small::{
flusher::{Flusher, Submission},
set_manager::SetManager,
},
storage::Storage,
DeviceStats, Runtime, Statistics,
};
Expand Down Expand Up @@ -136,13 +138,20 @@
let stats = config.statistics.clone();
let metrics = config.device.metrics().clone();

let set_manager = SetManager::new(
assert_eq!(
config.regions.start, 0,
"small object disk cache must start with region 0, current: {:?}",

Check warning on line 143 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L143

Added line #L143 was not covered by tests
config.regions
);

let set_manager = SetManager::open(
config.set_size,
config.set_cache_capacity,
config.device.clone(),
config.regions.clone(),
config.flush,
);
)
.await?;

let flushers = (0..config.flushers)
.map(|_| Flusher::open(&config, set_manager.clone(), stats.clone(), metrics.clone()))
Expand Down Expand Up @@ -187,7 +196,7 @@

fn load(&self, hash: u64) -> impl Future<Output = Result<Option<(K, V)>>> + Send + 'static {
let set_manager = self.inner.set_manager.clone();
let sid = hash % set_manager.sets() as SetId;
let sid = set_manager.set_picker().sid(hash);
let stats = self.inner.stats.clone();

async move {
Expand Down Expand Up @@ -216,9 +225,14 @@
self.inner.flushers[id].submit(Submission::Deletion { hash });
}

async fn destroy(&self) -> Result<()> {
// TODO(MrCroxx): reset bloom filters
self.inner.set_manager.destroy().await
}

fn may_contains(&self, hash: u64) -> bool {
let set_manager = self.inner.set_manager.clone();
let sid = hash % set_manager.sets() as SetId;
let sid = set_manager.set_picker().sid(hash);

Check warning on line 235 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L235

Added line #L235 was not covered by tests
// FIXME: Anyway without blocking? Use atomic?
self.inner
.runtime
Expand Down Expand Up @@ -268,7 +282,7 @@
}

async fn destroy(&self) -> Result<()> {
todo!()
self.destroy().await

Check warning on line 285 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L285

Added line #L285 was not covered by tests
}

fn stats(&self) -> Arc<DeviceStats> {
Expand All @@ -279,3 +293,119 @@
self.wait()
}
}

#[cfg(test)]
mod tests {
use std::path::Path;

use ahash::RandomState;
use bytesize::ByteSize;
use foyer_common::metrics::Metrics;
use foyer_memory::{Cache, CacheBuilder, FifoConfig};
use tokio::runtime::Handle;

use super::*;
use crate::{
device::{
monitor::{Monitored, MonitoredOptions},
Dev,
},
serde::EntrySerializer,
DevExt, DirectFsDeviceOptions,
};

fn cache_for_test() -> Cache<u64, Vec<u8>> {
CacheBuilder::new(10)
.with_eviction_config(FifoConfig::default())
.build()
}

async fn device_for_test(dir: impl AsRef<Path>) -> MonitoredDevice {
let runtime = Runtime::current();
Monitored::open(
MonitoredOptions {
options: DirectFsDeviceOptions {
dir: dir.as_ref().into(),
capacity: ByteSize::kib(64).as_u64() as _,
file_size: ByteSize::kib(16).as_u64() as _,
}
.into(),
metrics: Arc::new(Metrics::new("test")),
},
runtime,
)
.await

Check warning on line 337 in foyer-storage/src/small/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/small/generic.rs#L337

Added line #L337 was not covered by tests
.unwrap()
}

async fn store_for_test(dir: impl AsRef<Path>) -> GenericSmallStorage<u64, Vec<u8>, RandomState> {
let device = device_for_test(dir).await;
let regions = 0..device.regions() as RegionId;
let config = GenericSmallStorageConfig {
set_size: ByteSize::kib(4).as_u64() as _,
set_cache_capacity: 4,
device,
regions,
flush: false,
flushers: 1,
buffer_pool_size: ByteSize::kib(64).as_u64() as _,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
};
GenericSmallStorage::open(config).await.unwrap()
}

fn enqueue(store: &GenericSmallStorage<u64, Vec<u8>, RandomState>, entry: &CacheEntry<u64, Vec<u8>>) {
let estimated_size = EntrySerializer::estimated_size(entry.key(), entry.value());
store.enqueue(entry.clone(), estimated_size);
}

async fn assert_some(store: &GenericSmallStorage<u64, Vec<u8>, RandomState>, entry: &CacheEntry<u64, Vec<u8>>) {
assert_eq!(
store.load(entry.hash()).await.unwrap().unwrap(),
(*entry.key(), entry.value().clone())
);
}

async fn assert_none(store: &GenericSmallStorage<u64, Vec<u8>, RandomState>, entry: &CacheEntry<u64, Vec<u8>>) {
assert!(store.load(entry.hash()).await.unwrap().is_none());
}

#[test_log::test(tokio::test)]
async fn test_store_enqueue_lookup_destroy_recovery() {
let dir = tempfile::tempdir().unwrap();

let memory = cache_for_test();
let store = store_for_test(dir.path()).await;

let e1 = memory.insert(1, vec![1; 42]);
enqueue(&store, &e1);
store.wait().await;

assert_some(&store, &e1).await;

store.delete(e1.hash());
store.wait().await;

assert_none(&store, &e1).await;

let e2 = memory.insert(2, vec![2; 192]);
let e3 = memory.insert(3, vec![3; 168]);

enqueue(&store, &e1);
enqueue(&store, &e2);
enqueue(&store, &e3);
store.wait().await;

assert_some(&store, &e1).await;
assert_some(&store, &e2).await;
assert_some(&store, &e3).await;

store.destroy().await.unwrap();

assert_none(&store, &e1).await;
assert_none(&store, &e2).await;
assert_none(&store, &e3).await;
}
}
57 changes: 36 additions & 21 deletions foyer-storage/src/small/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl SetMut {
/// # Format
///
/// ```plain
/// | checksum (4B) | timestamp (8B) | len (4B) |
/// | checksum (4B) | ns timestamp (16B) | len (4B) |
/// | bloom filter (4 * 8B = 32B) |
/// ```
pub struct SetStorage {
Expand All @@ -97,7 +97,7 @@ pub struct SetStorage {
/// Set size.
size: usize,
/// Set last updated timestamp.
timestamp: u64,
timestamp: u128,
/// Set bloom filter.
bloom_filter: BloomFilterU64<4>,

Expand All @@ -118,15 +118,18 @@ impl Debug for SetStorage {
}

impl SetStorage {
pub const SET_HEADER_SIZE: usize = 48;
pub const SET_HEADER_SIZE: usize = 56;

pub fn load(buffer: IoBytesMut) -> Self {
/// Load the set storage from buffer.
///
/// If `after` is set and the set storage is before the timestamp, load an empty set storage.
pub fn load(buffer: IoBytesMut, watermark: u128) -> Self {
assert!(buffer.len() >= Self::SET_HEADER_SIZE);

let checksum = (&buffer[0..4]).get_u32();
let timestamp = (&buffer[4..12]).get_u64();
let len = (&buffer[12..16]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[16..48]);
let timestamp = (&buffer[4..20]).get_u128();
let len = (&buffer[20..24]).get_u32() as usize;
let bloom_filter = BloomFilterU64::read(&buffer[24..56]);

let mut this = Self {
checksum,
Expand All @@ -138,25 +141,29 @@ impl SetStorage {
buffer,
};

if Self::SET_HEADER_SIZE + this.len >= this.buffer.len() {
this.verify(watermark);

this
}

fn verify(&mut self, watermark: u128) {
if Self::SET_HEADER_SIZE + self.len >= self.buffer.len() || self.timestamp < watermark {
// invalid len
this.clear();
self.clear();
} else {
let c = Checksummer::checksum32(&this.buffer[4..Self::SET_HEADER_SIZE + this.len]);
if c != checksum {
let c = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
if c != self.checksum {
// checksum mismatch
this.clear();
self.clear();
}
}

this
}

pub fn update(&mut self) {
self.bloom_filter.write(&mut self.buffer[16..48]);
(&mut self.buffer[12..16]).put_u32(self.len as _);
self.timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
(&mut self.buffer[4..12]).put_u64(self.timestamp);
self.bloom_filter.write(&mut self.buffer[24..56]);
(&mut self.buffer[20..24]).put_u32(self.len as _);
self.timestamp = SetTimestamp::current();
(&mut self.buffer[4..20]).put_u128(self.timestamp);
self.checksum = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]);
(&mut self.buffer[0..4]).put_u32(self.checksum);
}
Expand Down Expand Up @@ -384,6 +391,14 @@ impl<'a> Iterator for SetIter<'a> {
}
}

pub struct SetTimestamp;

impl SetTimestamp {
pub fn current() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos()
}
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -436,7 +451,7 @@ mod tests {
#[should_panic]
fn test_set_storage_empty() {
let buffer = IoBytesMut::new();
SetStorage::load(buffer);
SetStorage::load(buffer, 0);
}

#[test]
Expand All @@ -447,7 +462,7 @@ mod tests {
unsafe { buf.set_len(PAGE) };

// load will result in an empty set
let mut storage = SetStorage::load(buf);
let mut storage = SetStorage::load(buf, 0);
assert!(storage.is_empty());

let e1 = memory.insert(1, vec![b'1'; 42]);
Expand Down Expand Up @@ -510,7 +525,7 @@ mod tests {
let mut buf = IoBytesMut::with_capacity(PAGE);
unsafe { buf.set_len(PAGE) };
buf[0..bytes.len()].copy_from_slice(&bytes);
let mut storage = SetStorage::load(buf);
let mut storage = SetStorage::load(buf, 0);

assert_eq!(storage.len(), b4.len());
assert_none(&storage, e1.hash());
Expand Down
Loading
Loading