Skip to content

Commit

Permalink
feat: introduce submit queue size threshold (#749)
Browse files Browse the repository at this point in the history
* feat: introduce submit queue size threshold

Signed-off-by: MrCroxx <[email protected]>

* chore: update hint

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Sep 29, 2024
1 parent 7b2b9ce commit 7a0a35f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 5 deletions.
27 changes: 23 additions & 4 deletions foyer-storage/src/large/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::{
fmt::Debug,
future::Future,
sync::{atomic::Ordering, Arc},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use foyer_common::{
Expand Down Expand Up @@ -103,6 +106,7 @@ where
S: HashBuilder + Debug,
{
tx: flume::Sender<Submission<K, V, S>>,
submit_queue_size: Arc<AtomicUsize>,

metrics: Arc<Metrics>,
}
Expand All @@ -116,6 +120,7 @@ where
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
submit_queue_size: self.submit_queue_size.clone(),
metrics: self.metrics.clone(),
}
}
Expand All @@ -133,6 +138,7 @@ where
indexer: Indexer,
region_manager: RegionManager,
device: MonitoredDevice,
submit_queue_size: Arc<AtomicUsize>,
tombstone_log: Option<TombstoneLog>,
stats: Arc<Statistics>,
metrics: Arc<Metrics>,
Expand All @@ -153,6 +159,7 @@ where
rx,
batch,
flight: Arc::new(Semaphore::new(1)),
submit_queue_size: submit_queue_size.clone(),
region_manager,
indexer,
tombstone_log,
Expand All @@ -168,11 +175,18 @@ where
}
});

Ok(Self { tx, metrics })
Ok(Self {
tx,
submit_queue_size,
metrics,
})
}

pub fn submit(&self, submission: Submission<K, V, S>) {
tracing::trace!("[lodc flusher]: submit task: {submission:?}");
if let Submission::CacheEntry { estimated_size, .. } = &submission {
self.submit_queue_size.fetch_add(*estimated_size, Ordering::Relaxed);
}
if let Err(e) = self.tx.send(submission) {
tracing::error!("[lodc flusher]: error raised when submitting task, error: {e}");
}
Expand All @@ -196,6 +210,7 @@ where
rx: flume::Receiver<Submission<K, V, S>>,
batch: BatchMut<K, V, S>,
flight: Arc<Semaphore>,
submit_queue_size: Arc<AtomicUsize>,

region_manager: RegionManager,
indexer: Indexer,
Expand Down Expand Up @@ -245,9 +260,13 @@ where
match submission {
Submission::CacheEntry {
entry,
estimated_size: _,
estimated_size,
sequence,
} => report(self.batch.entry(entry, &self.compression, sequence)),
} => {
report(self.batch.entry(entry, &self.compression, sequence));
self.submit_queue_size.fetch_sub(estimated_size, Ordering::Relaxed);
}

Submission::Tombstone { tombstone, stats } => self.batch.tombstone(tombstone, stats),
Submission::Reinsertion { reinsertion } => report(self.batch.reinsertion(&reinsertion)),
Submission::Wait { tx } => self.batch.wait(tx),
Expand Down
23 changes: 22 additions & 1 deletion foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
marker::PhantomData,
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Instant,
Expand Down Expand Up @@ -75,6 +75,7 @@ where
pub flushers: usize,
pub reclaimers: usize,
pub buffer_pool_size: usize,
pub submit_queue_size_threshold: usize,
pub clean_region_threshold: usize,
pub eviction_pickers: Vec<Box<dyn EvictionPicker>>,
pub reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,
Expand Down Expand Up @@ -102,6 +103,7 @@ where
.field("flushers", &self.flushers)
.field("reclaimers", &self.reclaimers)
.field("buffer_pool_size", &self.buffer_pool_size)
.field("submit_queue_size_threshold", &self.submit_queue_size_threshold)
.field("clean_region_threshold", &self.clean_region_threshold)
.field("eviction_pickers", &self.eviction_pickers)
.field("reinsertion_pickers", &self.reinsertion_picker)
Expand Down Expand Up @@ -145,6 +147,9 @@ where
flushers: Vec<Flusher<K, V, S>>,
reclaimers: Vec<Reclaimer>,

submit_queue_size: Arc<AtomicUsize>,
submit_queue_size_threshold: usize,

statistics: Arc<Statistics>,

flush: bool,
Expand Down Expand Up @@ -213,6 +218,7 @@ where
metrics.clone(),
);
let sequence = AtomicSequence::default();
let submit_queue_size = Arc::<AtomicUsize>::default();

RecoverRunner::run(
&config,
Expand All @@ -232,6 +238,7 @@ where
indexer.clone(),
region_manager.clone(),
device.clone(),
submit_queue_size.clone(),
tombstone_log.clone(),
stats.clone(),
metrics.clone(),
Expand Down Expand Up @@ -262,6 +269,8 @@ where
region_manager,
flushers,
reclaimers,
submit_queue_size,
submit_queue_size_threshold: config.submit_queue_size_threshold,
statistics: stats,
flush: config.flush,
sequence,
Expand Down Expand Up @@ -294,7 +303,17 @@ where
return;
}

if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold {
tracing::warn!(
"[lodc] {} {}",
"submit queue overflow, new entry ignored.",
"Hint: set an appropriate rate limiter as the admission picker or scale out flushers."
);
return;
}

let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);

self.inner.flushers[sequence as usize % self.inner.flushers.len()].submit(Submission::CacheEntry {
entry,
estimated_size,
Expand Down Expand Up @@ -553,6 +572,7 @@ mod tests {
reinsertion_picker,
tombstone_log_config: None,
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
Expand Down Expand Up @@ -582,6 +602,7 @@ mod tests {
reinsertion_picker: Arc::<RejectAllPicker<u64>>::default(),
tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()),
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
Expand Down
15 changes: 15 additions & 0 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ where
reinsertion_picker: self.large.reinsertion_picker,
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
statistics: statistics.clone(),
runtime,
marker: PhantomData,
Expand Down Expand Up @@ -614,6 +615,7 @@ where
reinsertion_picker: self.large.reinsertion_picker,
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
statistics: statistics.clone(),
runtime,
marker: PhantomData,
Expand Down Expand Up @@ -656,6 +658,7 @@ where
flushers: usize,
reclaimers: usize,
buffer_pool_size: usize,
submit_queue_size_threshold: Option<usize>,
clean_region_threshold: Option<usize>,
eviction_pickers: Vec<Box<dyn EvictionPicker>>,
reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,
Expand Down Expand Up @@ -689,6 +692,7 @@ where
flushers: 1,
reclaimers: 1,
buffer_pool_size: 16 * 1024 * 1024, // 16 MiB
submit_queue_size_threshold: None,
clean_region_threshold: None,
eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
reinsertion_picker: Arc::<RejectAllPicker<K>>::default(),
Expand Down Expand Up @@ -745,6 +749,17 @@ where
self
}

/// Set the submit queue size threshold.
///
/// If the total entry estimated size in the submit queue exceeds the threshold, the further entries will be
/// ignored.
///
/// Default: `buffer_pool_size`` * 2.
pub fn with_submit_queue_size_threshold(mut self, buffer_pool_size: usize) -> Self {
self.buffer_pool_size = buffer_pool_size;
self
}

/// Set the clean region threshold for the disk cache store.
///
/// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold.
Expand Down

0 comments on commit 7a0a35f

Please sign in to comment.