Skip to content

Commit

Permalink
feat: refine either storage (#609)
Browse files Browse the repository at this point in the history
* feat: refine either storage

Signed-off-by: MrCroxx <[email protected]>

* chore: tiny refactor

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 10, 2024
1 parent 3c15f70 commit 034032f
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 142 deletions.
1 change: 1 addition & 0 deletions foyer-common/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<T, S> From<T> for Diversion<T, S> {
}

/// [`DiversionFuture`] is a future wrapper that partially store and partially return the future result.
#[must_use]
#[pin_project]
pub struct DiversionFuture<FU, T, S> {
#[pin]
Expand Down
107 changes: 59 additions & 48 deletions foyer-storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,26 @@ use crate::{
either::{Either, EitherConfig, Selection, Selector},
noop::Noop,
runtime::{Runtime, RuntimeStoreConfig},
WaitHandle,
},
DeviceStats, Storage,
};

pub struct SizeSelector<K>
pub struct SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
threshold: usize,
_marker: PhantomData<K>,
_marker: PhantomData<(K, V, S)>,
}

impl<K> Debug for SizeSelector<K>
impl<K, V, S> Debug for SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SizeSelector")
Expand All @@ -60,9 +65,11 @@ where
}
}

impl<K> SizeSelector<K>
impl<K, V, S> SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(threshold: usize) -> Self {
Self {
Expand All @@ -72,22 +79,26 @@ where
}
}

impl<K> Selector for SizeSelector<K>
impl<K, V, S> Selector for SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Key = K;
type Value = V;
type BuildHasher = S;

fn select<Q>(&self, #[allow(unused)] key: &Q) -> Selection
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
todo!()
fn select(&self, _entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: &IoBuffer) -> Selection {
if buffer.len() < self.threshold {
Selection::Left
} else {
Selection::Right
}
}
}

enum LoadFuture<F1, F2, F3, F4, F5, F6, F7> {
enum StoreFuture<F1, F2, F3, F4, F5, F6, F7> {
Noop(F1),
Large(F2),
LargeRuntime(F3),
Expand All @@ -97,28 +108,28 @@ enum LoadFuture<F1, F2, F3, F4, F5, F6, F7> {
CombinedRuntime(F7),
}

impl<F1, F2, F3, F4, F5, F6, F7> LoadFuture<F1, F2, F3, F4, F5, F6, F7> {
impl<F1, F2, F3, F4, F5, F6, F7> StoreFuture<F1, F2, F3, F4, F5, F6, F7> {
// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
#[allow(clippy::type_complexity)]
pub fn as_pin_mut(
self: Pin<&mut Self>,
) -> LoadFuture<Pin<&mut F1>, Pin<&mut F2>, Pin<&mut F3>, Pin<&mut F4>, Pin<&mut F5>, Pin<&mut F6>, Pin<&mut F7>>
) -> StoreFuture<Pin<&mut F1>, Pin<&mut F2>, Pin<&mut F3>, Pin<&mut F4>, Pin<&mut F5>, Pin<&mut F6>, Pin<&mut F7>>
{
unsafe {
match *Pin::get_unchecked_mut(self) {
LoadFuture::Noop(ref mut inner) => LoadFuture::Noop(Pin::new_unchecked(inner)),
LoadFuture::Large(ref mut inner) => LoadFuture::Large(Pin::new_unchecked(inner)),
LoadFuture::LargeRuntime(ref mut inner) => LoadFuture::LargeRuntime(Pin::new_unchecked(inner)),
LoadFuture::Small(ref mut inner) => LoadFuture::Small(Pin::new_unchecked(inner)),
LoadFuture::SmallRuntime(ref mut inner) => LoadFuture::SmallRuntime(Pin::new_unchecked(inner)),
LoadFuture::Combined(ref mut inner) => LoadFuture::Combined(Pin::new_unchecked(inner)),
LoadFuture::CombinedRuntime(ref mut inner) => LoadFuture::CombinedRuntime(Pin::new_unchecked(inner)),
StoreFuture::Noop(ref mut inner) => StoreFuture::Noop(Pin::new_unchecked(inner)),
StoreFuture::Large(ref mut inner) => StoreFuture::Large(Pin::new_unchecked(inner)),
StoreFuture::LargeRuntime(ref mut inner) => StoreFuture::LargeRuntime(Pin::new_unchecked(inner)),
StoreFuture::Small(ref mut inner) => StoreFuture::Small(Pin::new_unchecked(inner)),
StoreFuture::SmallRuntime(ref mut inner) => StoreFuture::SmallRuntime(Pin::new_unchecked(inner)),
StoreFuture::Combined(ref mut inner) => StoreFuture::Combined(Pin::new_unchecked(inner)),
StoreFuture::CombinedRuntime(ref mut inner) => StoreFuture::CombinedRuntime(Pin::new_unchecked(inner)),
}
}
}
}

impl<F1, F2, F3, F4, F5, F6, F7> Future for LoadFuture<F1, F2, F3, F4, F5, F6, F7>
impl<F1, F2, F3, F4, F5, F6, F7> Future for StoreFuture<F1, F2, F3, F4, F5, F6, F7>
where
F1: Future,
F2: Future<Output = F1::Output>,
Expand All @@ -132,13 +143,13 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_pin_mut() {
LoadFuture::Noop(future) => future.poll(cx),
LoadFuture::Large(future) => future.poll(cx),
LoadFuture::LargeRuntime(future) => future.poll(cx),
LoadFuture::Small(future) => future.poll(cx),
LoadFuture::SmallRuntime(future) => future.poll(cx),
LoadFuture::Combined(future) => future.poll(cx),
LoadFuture::CombinedRuntime(future) => future.poll(cx),
StoreFuture::Noop(future) => future.poll(cx),
StoreFuture::Large(future) => future.poll(cx),
StoreFuture::LargeRuntime(future) => future.poll(cx),
StoreFuture::Small(future) => future.poll(cx),
StoreFuture::SmallRuntime(future) => future.poll(cx),
StoreFuture::Combined(future) => future.poll(cx),
StoreFuture::CombinedRuntime(future) => future.poll(cx),
}
}
}
Expand All @@ -156,10 +167,10 @@ where
LargeRuntime(RuntimeStoreConfig<GenericLargeStorage<K, V, S>>),
Small(GenericSmallStorageConfig<K, V, S>),
SmallRuntime(RuntimeStoreConfig<GenericSmallStorage<K, V, S>>),
Combined(EitherConfig<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>),
Combined(EitherConfig<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>),
CombinedRuntime(
RuntimeStoreConfig<
Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>,
Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>,
>,
),
}
Expand Down Expand Up @@ -202,10 +213,10 @@ where
/// Small object disk cache with a dedicated runtime.
SmallRuntime(Runtime<GenericSmallStorage<K, V, S>>),
/// Combined large and small object disk cache.
Combined(Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>),
Combined(Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>),
/// Combined large and small object disk cache with a dedicated runtime.
CombinedRuntime(
Runtime<Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>>,
Runtime<Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>>,
),
}

Expand Down Expand Up @@ -306,29 +317,29 @@ where
Q: Hash + Eq + ?Sized + Send + Sync + 'static,
{
match self {
Engine::Noop(storage) => LoadFuture::Noop(storage.load(key)),
Engine::Large(storage) => LoadFuture::Large(storage.load(key)),
Engine::LargeRuntime(storage) => LoadFuture::LargeRuntime(storage.load(key)),
Engine::Small(storage) => LoadFuture::Small(storage.load(key)),
Engine::SmallRuntime(storage) => LoadFuture::SmallRuntime(storage.load(key)),
Engine::Combined(storage) => LoadFuture::Combined(storage.load(key)),
Engine::CombinedRuntime(storage) => LoadFuture::CombinedRuntime(storage.load(key)),
Engine::Noop(storage) => StoreFuture::Noop(storage.load(key)),
Engine::Large(storage) => StoreFuture::Large(storage.load(key)),
Engine::LargeRuntime(storage) => StoreFuture::LargeRuntime(storage.load(key)),
Engine::Small(storage) => StoreFuture::Small(storage.load(key)),
Engine::SmallRuntime(storage) => StoreFuture::SmallRuntime(storage.load(key)),
Engine::Combined(storage) => StoreFuture::Combined(storage.load(key)),
Engine::CombinedRuntime(storage) => StoreFuture::CombinedRuntime(storage.load(key)),
}
}

fn delete<Q>(&self, key: &Q) -> crate::EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
match self {
Engine::Noop(storage) => storage.delete(key),
Engine::Large(storage) => storage.delete(key),
Engine::LargeRuntime(storage) => storage.delete(key),
Engine::Small(storage) => storage.delete(key),
Engine::SmallRuntime(storage) => storage.delete(key),
Engine::Combined(storage) => storage.delete(key),
Engine::CombinedRuntime(storage) => storage.delete(key),
Engine::Noop(storage) => WaitHandle::new(StoreFuture::Noop(storage.delete(key))),
Engine::Large(storage) => WaitHandle::new(StoreFuture::Large(storage.delete(key))),
Engine::LargeRuntime(storage) => WaitHandle::new(StoreFuture::LargeRuntime(storage.delete(key))),
Engine::Small(storage) => WaitHandle::new(StoreFuture::Small(storage.delete(key))),
Engine::SmallRuntime(storage) => WaitHandle::new(StoreFuture::SmallRuntime(storage.delete(key))),
Engine::Combined(storage) => WaitHandle::new(StoreFuture::Combined(storage.delete(key))),
Engine::CombinedRuntime(storage) => WaitHandle::new(StoreFuture::CombinedRuntime(storage.delete(key))),
}
}

Expand Down
25 changes: 14 additions & 11 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use foyer_common::{
metrics::Metrics,
};
use foyer_memory::{Cache, CacheEntry};
use futures::future::{join_all, try_join_all};
use futures::{
future::{join_all, try_join_all},
FutureExt,
};

use crate::{
compress::Compression,
Expand All @@ -41,7 +44,7 @@ use crate::{
region::RegionManager,
serde::{EntryDeserializer, KvInfo},
statistics::Statistics,
storage::{EnqueueHandle, Storage},
storage::{Storage, WaitHandle},
tombstone::{Tombstone, TombstoneLog, TombstoneLogConfig},
AtomicSequence,
};
Expand Down Expand Up @@ -361,20 +364,20 @@ where
.in_span(Span::enter_with_local_parent("foyer::storage::large::generic::load"))
}

fn delete<Q>(&self, key: &Q) -> EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();

let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let handle = WaitHandle::new(rx.map(|recv| recv.unwrap()));

if !self.inner.active.load(Ordering::Relaxed) {
tx.send(Err(anyhow::anyhow!("cannot delete entry after closed").into()))
.unwrap();
return future;
return handle;
}

let hash = self.inner.memory.hash_builder().hash_one(key);
Expand All @@ -397,7 +400,7 @@ where
self.inner.metrics.storage_delete.increment(1);
self.inner.metrics.storage_miss_duration.record(now.elapsed());

future
handle
}

fn may_contains<Q>(&self, key: &Q) -> bool
Expand All @@ -417,13 +420,13 @@ where
// Write an tombstone to clear tombstone log by increase the max sequence.
let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let handle = WaitHandle::new(rx.map(|recv| recv.unwrap()));
self.inner.flushers[sequence as usize % self.inner.flushers.len()].submit(Submission::Tombstone {
tombstone: Tombstone { hash: 0, sequence },
stats: None,
tx,
});
future.await?;
handle.await?;

// Clear indices.
//
Expand Down Expand Up @@ -487,7 +490,7 @@ where
self.load(key)
}

fn delete<Q>(&self, key: &Q) -> EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
Expand Down Expand Up @@ -630,7 +633,7 @@ mod tests {
fn enqueue(
store: &GenericLargeStorage<u64, Vec<u8>, RandomState>,
entry: CacheEntry<u64, Vec<u8>, RandomState>,
) -> EnqueueHandle {
) -> WaitHandle<impl Future<Output = Result<bool>>> {
let (tx, rx) = oneshot::channel();
let mut buffer = IoBuffer::new_in(&IO_BUFFER_ALLOCATOR);
let info = EntrySerializer::serialize(
Expand All @@ -641,7 +644,7 @@ mod tests {
)
.unwrap();
store.enqueue(entry, buffer, info, tx);
EnqueueHandle::new(rx)
WaitHandle::new(rx.map(|recv| recv.unwrap()))
}

#[test_log::test(tokio::test)]
Expand Down
6 changes: 3 additions & 3 deletions foyer-storage/src/large/reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{fmt::Debug, sync::Arc, time::Duration};

use foyer_common::code::{HashBuilder, StorageKey, StorageValue};
use futures::future::try_join_all;
use futures::{future::try_join_all, FutureExt};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot, Semaphore, SemaphorePermit},
Expand All @@ -32,7 +32,7 @@ use crate::{
picker::ReinsertionPicker,
region::{Region, RegionManager},
statistics::Statistics,
EnqueueHandle, Sequence,
Sequence, WaitHandle,
};

#[derive(Debug)]
Expand Down Expand Up @@ -198,7 +198,7 @@ where
};
let flusher = self.flushers[futures.len() % self.flushers.len()].clone();
let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let future = WaitHandle::new(rx.map(|recv| recv.unwrap()));
flusher.submit(Submission::Reinsertion {
reinsertion: Reinsertion {
hash: info.hash,
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use crate::{
statistics::Statistics,
storage::{
runtime::{RuntimeConfig, RuntimeConfigBuilder},
EnqueueHandle, Storage,
Storage, WaitHandle,
},
store::{CombinedConfig, DeviceConfig, Store, StoreBuilder},
tombstone::{TombstoneLogConfig, TombstoneLogConfigBuilder},
Expand Down
Loading

0 comments on commit 034032f

Please sign in to comment.