diff --git a/foyer-intrusive/src/eviction/fifo.rs b/foyer-intrusive/src/eviction/fifo.rs index 86d3d97d..846581de 100644 --- a/foyer-intrusive/src/eviction/fifo.rs +++ b/foyer-intrusive/src/eviction/fifo.rs @@ -48,7 +48,7 @@ pub struct FifoConfig { /// The formula is as follows: /// /// `segment's size = total_segments * (segment's ratio / sum(ratio))` - segment_ratios: Vec, + pub segment_ratios: Vec, } #[derive(Debug, Default)] diff --git a/foyer-storage/src/flusher.rs b/foyer-storage/src/flusher.rs index b02d92c2..7f3f96a1 100644 --- a/foyer-storage/src/flusher.rs +++ b/foyer-storage/src/flusher.rs @@ -138,6 +138,7 @@ where async fn run(mut self) -> Result<()> { loop { tokio::select! { + biased; Some(task) = self.task_rx.recv() => { self.handle(task).await?; } diff --git a/foyer-storage/src/reclaimer.rs b/foyer-storage/src/reclaimer.rs index de8a7535..73c4af9b 100644 --- a/foyer-storage/src/reclaimer.rs +++ b/foyer-storage/src/reclaimer.rs @@ -169,6 +169,7 @@ where async fn run(mut self) -> Result<()> { loop { tokio::select! { + biased; Some(task) = self.task_rx.recv() => { self.handle(task).await?; } diff --git a/foyer-storage/src/region.rs b/foyer-storage/src/region.rs index 3615661d..7e86fac6 100644 --- a/foyer-storage/src/region.rs +++ b/foyer-storage/src/region.rs @@ -92,7 +92,7 @@ where { pub fn new(id: RegionId, device: D) -> Self { let inner = RegionInner { - version: 1, + version: 0, buffer: None, len: 0, diff --git a/foyer-storage/src/region_manager.rs b/foyer-storage/src/region_manager.rs index 78aaac80..43803921 100644 --- a/foyer-storage/src/region_manager.rs +++ b/foyer-storage/src/region_manager.rs @@ -18,7 +18,7 @@ use foyer_common::queue::AsyncQueue; use foyer_intrusive::{ core::{adapter::Link, pointer::PointerOps}, eviction::EvictionPolicy, - intrusive_adapter, key_adapter, + intrusive_adapter, key_adapter, priority_adapter, }; use parking_lot::RwLock; use tokio::sync::RwLock as AsyncRwLock; @@ -37,10 +37,12 @@ where { link: L, id: RegionId, + priority: usize, } intrusive_adapter! { pub RegionEpItemAdapter = Arc>: RegionEpItem { link: L } where L: Link } key_adapter! { RegionEpItemAdapter = RegionEpItem { id: RegionId } where L: Link } +priority_adapter! { RegionEpItemAdapter = RegionEpItem { priority: usize } where L: Link } struct RegionManagerInner { current: Option, @@ -94,6 +96,7 @@ where let item = Arc::new(RegionEpItem { link: E::Link::default(), id, + priority: 0, }); regions.push(region); @@ -151,12 +154,13 @@ where tracing::info!("switch to clean region: {}", region_id); let region = self.region(®ion_id); + region.advance().await; + let buffer = self.buffers.acquire().await; region.attach_buffer(buffer).await; let slice = region.allocate(size).await.unwrap(); - region.advance().await; inner.current = Some(region_id); AllocateResult::Ok(slice) diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index cf38088e..358d9c11 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -187,6 +187,7 @@ where } pub async fn shutdown_runners(&self) -> Result<()> { + self.seal().await?; self.stop_tx.send(()).unwrap(); let handles = self.handles.lock().drain(..).collect_vec(); for handle in handles { @@ -271,6 +272,31 @@ where bits::align_up(self.device.align(), unaligned) } + async fn seal(&self) -> Result<()> { + match self + .region_manager + .allocate(self.device.region_size() - self.device.align()) + .await + { + crate::region::AllocateResult::Full { mut slice, remain } => { + println!("seal"); + // current region is full, write region footer and try allocate again + let footer = RegionFooter { + magic: REGION_MAGIC, + padding: remain as u64, + }; + footer.write(slice.as_mut()); + slice.destroy().await; + } + crate::region::AllocateResult::Ok(slice) => { + println!("not seal"); + // region is empty, skip + slice.destroy().await + } + } + Ok(()) + } + async fn recover(&self, concurrency: usize) -> Result<()> { tracing::info!("start store recovery"); @@ -500,6 +526,8 @@ where }; let footer = RegionFooter::read(slice.as_ref()); + slice.destroy().await; + if footer.magic != REGION_MAGIC { return Ok(None); } @@ -539,13 +567,18 @@ where let rel_start = align - EntryFooter::serialized_len() - (footer.padding + footer.key_len) as usize; let rel_end = align - EntryFooter::serialized_len() - footer.padding as usize; - K::read(&slice.as_ref()[rel_start..rel_end]) + let key = K::read(&slice.as_ref()[rel_start..rel_end]); + slice.destroy().await; + key } else { - let slice = self.region.load(align_start..align_end, 0).await?.unwrap(); + slice.destroy().await; + let s = self.region.load(align_start..align_end, 0).await?.unwrap(); let rel_start = abs_start - align_start; let rel_end = abs_end - align_start; - K::read(&slice.as_ref()[rel_start..rel_end]) + let key = K::read(&s.as_ref()[rel_start..rel_end]); + s.destroy().await; + key }; self.cursor -= entry_len; @@ -561,3 +594,125 @@ where })) } } + +#[cfg(test)] +pub mod tests { + use std::path::PathBuf; + + use foyer_intrusive::eviction::fifo::{Fifo, FifoConfig, FifoLink}; + + use crate::{ + admission::AdmitAll, + device::{ + fs::{FsDevice, FsDeviceConfig}, + io_buffer::AlignedAllocator, + }, + reinsertion::ReinsertNone, + }; + + use super::*; + + type TestStore = Store< + u64, + Vec, + AlignedAllocator, + FsDevice, + Fifo>, + AdmitAll>, + ReinsertNone>, + FifoLink, + >; + + type TestStoreConfig = StoreConfig< + FsDevice, + AdmitAll>, + Fifo>, + ReinsertNone>, + FifoLink, + >; + + #[tokio::test] + #[allow(clippy::identity_op)] + async fn test_recovery() { + const KB: usize = 1024; + const MB: usize = 1024 * 1024; + + let tempdir = tempfile::tempdir().unwrap(); + + let config = TestStoreConfig { + eviction_config: FifoConfig { + segment_ratios: vec![1], + }, + device_config: FsDeviceConfig { + dir: PathBuf::from(tempdir.path()), + capacity: 16 * MB, + file_capacity: 4 * MB, + align: 4096, + io_size: 4096 * KB, + }, + admission: AdmitAll::default(), + reinsertion: ReinsertNone::default(), + buffer_pool_size: 8 * MB, + flushers: 1, + reclaimers: 1, + recover_concurrency: 2, + }; + + let store = TestStore::open(config).await.unwrap(); + + // files: + // [0, 1, 2] (evicted) + // [3, 4, 5] + // [6, 7, 8] + // [9, 10, 11] + for i in 0..12 { + store.insert(i, vec![i as u8; 1 * MB]).await.unwrap(); + } + + for i in 0..3 { + assert!(store.lookup(&i).await.unwrap().is_none()); + } + for i in 3..12 { + assert_eq!( + store.lookup(&i).await.unwrap().unwrap(), + vec![i as u8; 1 * MB], + ); + } + + store.shutdown_runners().await.unwrap(); + drop(store); + + let config = TestStoreConfig { + eviction_config: FifoConfig { + segment_ratios: vec![1], + }, + device_config: FsDeviceConfig { + dir: PathBuf::from(tempdir.path()), + capacity: 16 * MB, + file_capacity: 4 * MB, + align: 4096, + io_size: 4096 * KB, + }, + admission: AdmitAll::default(), + reinsertion: ReinsertNone::default(), + buffer_pool_size: 8 * MB, + flushers: 1, + reclaimers: 1, + recover_concurrency: 2, + }; + let store = TestStore::open(config).await.unwrap(); + + for i in 0..3 { + assert!(store.lookup(&i).await.unwrap().is_none()); + } + for i in 3..12 { + assert_eq!( + store.lookup(&i).await.unwrap().unwrap(), + vec![i as u8; 1 * MB], + ); + } + + store.shutdown_runners().await.unwrap(); + drop(store); + } +}