Skip to content

Commit

Permalink
fix: region advance & recovery & seal
Browse files Browse the repository at this point in the history
- fix region advance
- fix recovery slice destroy
- seal last dirty region when shutdown

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Jul 6, 2023
1 parent 011a401 commit ccb5c68
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 7 deletions.
2 changes: 1 addition & 1 deletion foyer-intrusive/src/eviction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct FifoConfig {
/// The formula is as follows:
///
/// `segment's size = total_segments * (segment's ratio / sum(ratio))`
segment_ratios: Vec<usize>,
pub segment_ratios: Vec<usize>,
}

#[derive(Debug, Default)]
Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ where
async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
biased;
Some(task) = self.task_rx.recv() => {
self.handle(task).await?;
}
Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ where
async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
biased;
Some(task) = self.task_rx.recv() => {
self.handle(task).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
{
pub fn new(id: RegionId, device: D) -> Self {
let inner = RegionInner {
version: 1,
version: 0,

buffer: None,
len: 0,
Expand Down
8 changes: 6 additions & 2 deletions foyer-storage/src/region_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use foyer_common::queue::AsyncQueue;
use foyer_intrusive::{
core::{adapter::Link, pointer::PointerOps},
eviction::EvictionPolicy,
intrusive_adapter, key_adapter,
intrusive_adapter, key_adapter, priority_adapter,
};
use parking_lot::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
Expand All @@ -37,10 +37,12 @@ where
{
link: L,
id: RegionId,
priority: usize,
}

intrusive_adapter! { pub RegionEpItemAdapter<L> = Arc<RegionEpItem<L>>: RegionEpItem<L> { link: L } where L: Link }
key_adapter! { RegionEpItemAdapter<L> = RegionEpItem<L> { id: RegionId } where L: Link }
priority_adapter! { RegionEpItemAdapter<L> = RegionEpItem<L> { priority: usize } where L: Link }

struct RegionManagerInner {
current: Option<RegionId>,
Expand Down Expand Up @@ -94,6 +96,7 @@ where
let item = Arc::new(RegionEpItem {
link: E::Link::default(),
id,
priority: 0,
});

regions.push(region);
Expand Down Expand Up @@ -151,12 +154,13 @@ where
tracing::info!("switch to clean region: {}", region_id);

let region = self.region(&region_id);
region.advance().await;

let buffer = self.buffers.acquire().await;
region.attach_buffer(buffer).await;

let slice = region.allocate(size).await.unwrap();

region.advance().await;
inner.current = Some(region_id);

AllocateResult::Ok(slice)
Expand Down
161 changes: 158 additions & 3 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ where
}

pub async fn shutdown_runners(&self) -> Result<()> {
self.seal().await?;
self.stop_tx.send(()).unwrap();
let handles = self.handles.lock().drain(..).collect_vec();
for handle in handles {
Expand Down Expand Up @@ -271,6 +272,31 @@ where
bits::align_up(self.device.align(), unaligned)
}

async fn seal(&self) -> Result<()> {
match self
.region_manager
.allocate(self.device.region_size() - self.device.align())
.await
{
crate::region::AllocateResult::Full { mut slice, remain } => {
println!("seal");
// current region is full, write region footer and try allocate again
let footer = RegionFooter {
magic: REGION_MAGIC,
padding: remain as u64,
};
footer.write(slice.as_mut());
slice.destroy().await;
}
crate::region::AllocateResult::Ok(slice) => {
println!("not seal");
// region is empty, skip
slice.destroy().await
}
}
Ok(())
}

async fn recover(&self, concurrency: usize) -> Result<()> {
tracing::info!("start store recovery");

Expand Down Expand Up @@ -500,6 +526,8 @@ where
};

let footer = RegionFooter::read(slice.as_ref());
slice.destroy().await;

if footer.magic != REGION_MAGIC {
return Ok(None);
}
Expand Down Expand Up @@ -539,13 +567,18 @@ where
let rel_start =
align - EntryFooter::serialized_len() - (footer.padding + footer.key_len) as usize;
let rel_end = align - EntryFooter::serialized_len() - footer.padding as usize;
K::read(&slice.as_ref()[rel_start..rel_end])
let key = K::read(&slice.as_ref()[rel_start..rel_end]);
slice.destroy().await;
key
} else {
let slice = self.region.load(align_start..align_end, 0).await?.unwrap();
slice.destroy().await;
let s = self.region.load(align_start..align_end, 0).await?.unwrap();
let rel_start = abs_start - align_start;
let rel_end = abs_end - align_start;

K::read(&slice.as_ref()[rel_start..rel_end])
let key = K::read(&s.as_ref()[rel_start..rel_end]);
s.destroy().await;
key
};

self.cursor -= entry_len;
Expand All @@ -561,3 +594,125 @@ where
}))
}
}

#[cfg(test)]
pub mod tests {
use std::path::PathBuf;

use foyer_intrusive::eviction::fifo::{Fifo, FifoConfig, FifoLink};

use crate::{
admission::AdmitAll,
device::{
fs::{FsDevice, FsDeviceConfig},
io_buffer::AlignedAllocator,
},
reinsertion::ReinsertNone,
};

use super::*;

type TestStore = Store<
u64,
Vec<u8>,
AlignedAllocator,
FsDevice,
Fifo<RegionEpItemAdapter<FifoLink>>,
AdmitAll<u64, Vec<u8>>,
ReinsertNone<u64, Vec<u8>>,
FifoLink,
>;

type TestStoreConfig = StoreConfig<
FsDevice,
AdmitAll<u64, Vec<u8>>,
Fifo<RegionEpItemAdapter<FifoLink>>,
ReinsertNone<u64, Vec<u8>>,
FifoLink,
>;

#[tokio::test]
#[allow(clippy::identity_op)]
async fn test_recovery() {
const KB: usize = 1024;
const MB: usize = 1024 * 1024;

let tempdir = tempfile::tempdir().unwrap();

let config = TestStoreConfig {
eviction_config: FifoConfig {
segment_ratios: vec![1],
},
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_capacity: 4 * MB,
align: 4096,
io_size: 4096 * KB,
},
admission: AdmitAll::default(),
reinsertion: ReinsertNone::default(),
buffer_pool_size: 8 * MB,
flushers: 1,
reclaimers: 1,
recover_concurrency: 2,
};

let store = TestStore::open(config).await.unwrap();

// files:
// [0, 1, 2] (evicted)
// [3, 4, 5]
// [6, 7, 8]
// [9, 10, 11]
for i in 0..12 {
store.insert(i, vec![i as u8; 1 * MB]).await.unwrap();
}

for i in 0..3 {
assert!(store.lookup(&i).await.unwrap().is_none());
}
for i in 3..12 {
assert_eq!(
store.lookup(&i).await.unwrap().unwrap(),
vec![i as u8; 1 * MB],
);
}

store.shutdown_runners().await.unwrap();
drop(store);

let config = TestStoreConfig {
eviction_config: FifoConfig {
segment_ratios: vec![1],
},
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_capacity: 4 * MB,
align: 4096,
io_size: 4096 * KB,
},
admission: AdmitAll::default(),
reinsertion: ReinsertNone::default(),
buffer_pool_size: 8 * MB,
flushers: 1,
reclaimers: 1,
recover_concurrency: 2,
};
let store = TestStore::open(config).await.unwrap();

for i in 0..3 {
assert!(store.lookup(&i).await.unwrap().is_none());
}
for i in 3..12 {
assert_eq!(
store.lookup(&i).await.unwrap().unwrap(),
vec![i as u8; 1 * MB],
);
}

store.shutdown_runners().await.unwrap();
drop(store);
}
}

0 comments on commit ccb5c68

Please sign in to comment.