Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refine either storage #609

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foyer-common/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<T, S> From<T> for Diversion<T, S> {
}

/// [`DiversionFuture`] is a future wrapper that partially store and partially return the future result.
#[must_use]
#[pin_project]
pub struct DiversionFuture<FU, T, S> {
#[pin]
Expand Down
107 changes: 59 additions & 48 deletions foyer-storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,26 @@
either::{Either, EitherConfig, Selection, Selector},
noop::Noop,
runtime::{Runtime, RuntimeStoreConfig},
WaitHandle,
},
DeviceStats, Storage,
};

pub struct SizeSelector<K>
pub struct SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
threshold: usize,
_marker: PhantomData<K>,
_marker: PhantomData<(K, V, S)>,
}

impl<K> Debug for SizeSelector<K>
impl<K, V, S> Debug for SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SizeSelector")
Expand All @@ -60,9 +65,11 @@
}
}

impl<K> SizeSelector<K>
impl<K, V, S> SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(threshold: usize) -> Self {
Self {
Expand All @@ -72,22 +79,26 @@
}
}

impl<K> Selector for SizeSelector<K>
impl<K, V, S> Selector for SizeSelector<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Key = K;
type Value = V;
type BuildHasher = S;

fn select<Q>(&self, #[allow(unused)] key: &Q) -> Selection
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
todo!()
fn select(&self, _entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: &IoBuffer) -> Selection {
if buffer.len() < self.threshold {
Selection::Left

Check warning on line 94 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L92-L94

Added lines #L92 - L94 were not covered by tests
} else {
Selection::Right

Check warning on line 96 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L96

Added line #L96 was not covered by tests
}
}
}

enum LoadFuture<F1, F2, F3, F4, F5, F6, F7> {
enum StoreFuture<F1, F2, F3, F4, F5, F6, F7> {
Noop(F1),
Large(F2),
LargeRuntime(F3),
Expand All @@ -97,28 +108,28 @@
CombinedRuntime(F7),
}

impl<F1, F2, F3, F4, F5, F6, F7> LoadFuture<F1, F2, F3, F4, F5, F6, F7> {
impl<F1, F2, F3, F4, F5, F6, F7> StoreFuture<F1, F2, F3, F4, F5, F6, F7> {
// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
#[allow(clippy::type_complexity)]
pub fn as_pin_mut(
self: Pin<&mut Self>,
) -> LoadFuture<Pin<&mut F1>, Pin<&mut F2>, Pin<&mut F3>, Pin<&mut F4>, Pin<&mut F5>, Pin<&mut F6>, Pin<&mut F7>>
) -> StoreFuture<Pin<&mut F1>, Pin<&mut F2>, Pin<&mut F3>, Pin<&mut F4>, Pin<&mut F5>, Pin<&mut F6>, Pin<&mut F7>>
{
unsafe {
match *Pin::get_unchecked_mut(self) {
LoadFuture::Noop(ref mut inner) => LoadFuture::Noop(Pin::new_unchecked(inner)),
LoadFuture::Large(ref mut inner) => LoadFuture::Large(Pin::new_unchecked(inner)),
LoadFuture::LargeRuntime(ref mut inner) => LoadFuture::LargeRuntime(Pin::new_unchecked(inner)),
LoadFuture::Small(ref mut inner) => LoadFuture::Small(Pin::new_unchecked(inner)),
LoadFuture::SmallRuntime(ref mut inner) => LoadFuture::SmallRuntime(Pin::new_unchecked(inner)),
LoadFuture::Combined(ref mut inner) => LoadFuture::Combined(Pin::new_unchecked(inner)),
LoadFuture::CombinedRuntime(ref mut inner) => LoadFuture::CombinedRuntime(Pin::new_unchecked(inner)),
StoreFuture::Noop(ref mut inner) => StoreFuture::Noop(Pin::new_unchecked(inner)),

Check warning on line 120 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L120

Added line #L120 was not covered by tests
StoreFuture::Large(ref mut inner) => StoreFuture::Large(Pin::new_unchecked(inner)),
StoreFuture::LargeRuntime(ref mut inner) => StoreFuture::LargeRuntime(Pin::new_unchecked(inner)),
StoreFuture::Small(ref mut inner) => StoreFuture::Small(Pin::new_unchecked(inner)),
StoreFuture::SmallRuntime(ref mut inner) => StoreFuture::SmallRuntime(Pin::new_unchecked(inner)),
StoreFuture::Combined(ref mut inner) => StoreFuture::Combined(Pin::new_unchecked(inner)),
StoreFuture::CombinedRuntime(ref mut inner) => StoreFuture::CombinedRuntime(Pin::new_unchecked(inner)),

Check warning on line 126 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L123-L126

Added lines #L123 - L126 were not covered by tests
}
}
}
}

impl<F1, F2, F3, F4, F5, F6, F7> Future for LoadFuture<F1, F2, F3, F4, F5, F6, F7>
impl<F1, F2, F3, F4, F5, F6, F7> Future for StoreFuture<F1, F2, F3, F4, F5, F6, F7>
where
F1: Future,
F2: Future<Output = F1::Output>,
Expand All @@ -132,13 +143,13 @@

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_pin_mut() {
LoadFuture::Noop(future) => future.poll(cx),
LoadFuture::Large(future) => future.poll(cx),
LoadFuture::LargeRuntime(future) => future.poll(cx),
LoadFuture::Small(future) => future.poll(cx),
LoadFuture::SmallRuntime(future) => future.poll(cx),
LoadFuture::Combined(future) => future.poll(cx),
LoadFuture::CombinedRuntime(future) => future.poll(cx),
StoreFuture::Noop(future) => future.poll(cx),

Check warning on line 146 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L146

Added line #L146 was not covered by tests
StoreFuture::Large(future) => future.poll(cx),
StoreFuture::LargeRuntime(future) => future.poll(cx),
StoreFuture::Small(future) => future.poll(cx),
StoreFuture::SmallRuntime(future) => future.poll(cx),
StoreFuture::Combined(future) => future.poll(cx),
StoreFuture::CombinedRuntime(future) => future.poll(cx),

Check warning on line 152 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L149-L152

Added lines #L149 - L152 were not covered by tests
}
}
}
Expand All @@ -156,10 +167,10 @@
LargeRuntime(RuntimeStoreConfig<GenericLargeStorage<K, V, S>>),
Small(GenericSmallStorageConfig<K, V, S>),
SmallRuntime(RuntimeStoreConfig<GenericSmallStorage<K, V, S>>),
Combined(EitherConfig<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>),
Combined(EitherConfig<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>),
CombinedRuntime(
RuntimeStoreConfig<
Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>,
Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>,
>,
),
}
Expand Down Expand Up @@ -202,10 +213,10 @@
/// Small object disk cache with a dedicated runtime.
SmallRuntime(Runtime<GenericSmallStorage<K, V, S>>),
/// Combined large and small object disk cache.
Combined(Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>),
Combined(Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>),
/// Combined large and small object disk cache with a dedicated runtime.
CombinedRuntime(
Runtime<Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K>>>,
Runtime<Either<K, V, S, GenericSmallStorage<K, V, S>, GenericLargeStorage<K, V, S>, SizeSelector<K, V, S>>>,
),
}

Expand Down Expand Up @@ -306,29 +317,29 @@
Q: Hash + Eq + ?Sized + Send + Sync + 'static,
{
match self {
Engine::Noop(storage) => LoadFuture::Noop(storage.load(key)),
Engine::Large(storage) => LoadFuture::Large(storage.load(key)),
Engine::LargeRuntime(storage) => LoadFuture::LargeRuntime(storage.load(key)),
Engine::Small(storage) => LoadFuture::Small(storage.load(key)),
Engine::SmallRuntime(storage) => LoadFuture::SmallRuntime(storage.load(key)),
Engine::Combined(storage) => LoadFuture::Combined(storage.load(key)),
Engine::CombinedRuntime(storage) => LoadFuture::CombinedRuntime(storage.load(key)),
Engine::Noop(storage) => StoreFuture::Noop(storage.load(key)),

Check warning on line 320 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L320

Added line #L320 was not covered by tests
Engine::Large(storage) => StoreFuture::Large(storage.load(key)),
Engine::LargeRuntime(storage) => StoreFuture::LargeRuntime(storage.load(key)),
Engine::Small(storage) => StoreFuture::Small(storage.load(key)),
Engine::SmallRuntime(storage) => StoreFuture::SmallRuntime(storage.load(key)),
Engine::Combined(storage) => StoreFuture::Combined(storage.load(key)),
Engine::CombinedRuntime(storage) => StoreFuture::CombinedRuntime(storage.load(key)),

Check warning on line 326 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L323-L326

Added lines #L323 - L326 were not covered by tests
}
}

fn delete<Q>(&self, key: &Q) -> crate::EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
match self {
Engine::Noop(storage) => storage.delete(key),
Engine::Large(storage) => storage.delete(key),
Engine::LargeRuntime(storage) => storage.delete(key),
Engine::Small(storage) => storage.delete(key),
Engine::SmallRuntime(storage) => storage.delete(key),
Engine::Combined(storage) => storage.delete(key),
Engine::CombinedRuntime(storage) => storage.delete(key),
Engine::Noop(storage) => WaitHandle::new(StoreFuture::Noop(storage.delete(key))),

Check warning on line 336 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L336

Added line #L336 was not covered by tests
Engine::Large(storage) => WaitHandle::new(StoreFuture::Large(storage.delete(key))),
Engine::LargeRuntime(storage) => WaitHandle::new(StoreFuture::LargeRuntime(storage.delete(key))),
Engine::Small(storage) => WaitHandle::new(StoreFuture::Small(storage.delete(key))),
Engine::SmallRuntime(storage) => WaitHandle::new(StoreFuture::SmallRuntime(storage.delete(key))),
Engine::Combined(storage) => WaitHandle::new(StoreFuture::Combined(storage.delete(key))),
Engine::CombinedRuntime(storage) => WaitHandle::new(StoreFuture::CombinedRuntime(storage.delete(key))),

Check warning on line 342 in foyer-storage/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/engine.rs#L338-L342

Added lines #L338 - L342 were not covered by tests
}
}

Expand Down
25 changes: 14 additions & 11 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
metrics::Metrics,
};
use foyer_memory::{Cache, CacheEntry};
use futures::future::{join_all, try_join_all};
use futures::{
future::{join_all, try_join_all},
FutureExt,
};

use crate::{
compress::Compression,
Expand All @@ -41,7 +44,7 @@
region::RegionManager,
serde::{EntryDeserializer, KvInfo},
statistics::Statistics,
storage::{EnqueueHandle, Storage},
storage::{Storage, WaitHandle},
tombstone::{Tombstone, TombstoneLog, TombstoneLogConfig},
AtomicSequence,
};
Expand Down Expand Up @@ -361,20 +364,20 @@
.in_span(Span::enter_with_local_parent("foyer::storage::large::generic::load"))
}

fn delete<Q>(&self, key: &Q) -> EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();

let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let handle = WaitHandle::new(rx.map(|recv| recv.unwrap()));

if !self.inner.active.load(Ordering::Relaxed) {
tx.send(Err(anyhow::anyhow!("cannot delete entry after closed").into()))
.unwrap();
return future;
return handle;

Check warning on line 380 in foyer-storage/src/large/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/generic.rs#L380

Added line #L380 was not covered by tests
}

let hash = self.inner.memory.hash_builder().hash_one(key);
Expand All @@ -397,7 +400,7 @@
self.inner.metrics.storage_delete.increment(1);
self.inner.metrics.storage_miss_duration.record(now.elapsed());

future
handle
}

fn may_contains<Q>(&self, key: &Q) -> bool
Expand All @@ -417,13 +420,13 @@
// Write an tombstone to clear tombstone log by increase the max sequence.
let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let handle = WaitHandle::new(rx.map(|recv| recv.unwrap()));
self.inner.flushers[sequence as usize % self.inner.flushers.len()].submit(Submission::Tombstone {
tombstone: Tombstone { hash: 0, sequence },
stats: None,
tx,
});
future.await?;
handle.await?;

// Clear indices.
//
Expand Down Expand Up @@ -487,7 +490,7 @@
self.load(key)
}

fn delete<Q>(&self, key: &Q) -> EnqueueHandle
fn delete<Q>(&self, key: &Q) -> WaitHandle<impl Future<Output = Result<bool>> + Send + 'static>
where
Self::Key: Borrow<Q>,
Q: Hash + Eq + ?Sized,
Expand Down Expand Up @@ -630,7 +633,7 @@
fn enqueue(
store: &GenericLargeStorage<u64, Vec<u8>, RandomState>,
entry: CacheEntry<u64, Vec<u8>, RandomState>,
) -> EnqueueHandle {
) -> WaitHandle<impl Future<Output = Result<bool>>> {
let (tx, rx) = oneshot::channel();
let mut buffer = IoBuffer::new_in(&IO_BUFFER_ALLOCATOR);
let info = EntrySerializer::serialize(
Expand All @@ -641,7 +644,7 @@
)
.unwrap();
store.enqueue(entry, buffer, info, tx);
EnqueueHandle::new(rx)
WaitHandle::new(rx.map(|recv| recv.unwrap()))
}

#[test_log::test(tokio::test)]
Expand Down
6 changes: 3 additions & 3 deletions foyer-storage/src/large/reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{fmt::Debug, sync::Arc, time::Duration};

use foyer_common::code::{HashBuilder, StorageKey, StorageValue};
use futures::future::try_join_all;
use futures::{future::try_join_all, FutureExt};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot, Semaphore, SemaphorePermit},
Expand All @@ -32,7 +32,7 @@ use crate::{
picker::ReinsertionPicker,
region::{Region, RegionManager},
statistics::Statistics,
EnqueueHandle, Sequence,
Sequence, WaitHandle,
};

#[derive(Debug)]
Expand Down Expand Up @@ -198,7 +198,7 @@ where
};
let flusher = self.flushers[futures.len() % self.flushers.len()].clone();
let (tx, rx) = oneshot::channel();
let future = EnqueueHandle::new(rx);
let future = WaitHandle::new(rx.map(|recv| recv.unwrap()));
flusher.submit(Submission::Reinsertion {
reinsertion: Reinsertion {
hash: info.hash,
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use crate::{
statistics::Statistics,
storage::{
runtime::{RuntimeConfig, RuntimeConfigBuilder},
EnqueueHandle, Storage,
Storage, WaitHandle,
},
store::{CombinedConfig, DeviceConfig, Store, StoreBuilder},
tombstone::{TombstoneLogConfig, TombstoneLogConfigBuilder},
Expand Down
Loading
Loading