Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce submit queue size threshold #749

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions foyer-storage/src/large/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::{
fmt::Debug,
future::Future,
sync::{atomic::Ordering, Arc},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use foyer_common::{
Expand Down Expand Up @@ -103,6 +106,7 @@ where
S: HashBuilder + Debug,
{
tx: flume::Sender<Submission<K, V, S>>,
submit_queue_size: Arc<AtomicUsize>,

metrics: Arc<Metrics>,
}
Expand All @@ -116,6 +120,7 @@ where
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
submit_queue_size: self.submit_queue_size.clone(),
metrics: self.metrics.clone(),
}
}
Expand All @@ -133,6 +138,7 @@ where
indexer: Indexer,
region_manager: RegionManager,
device: MonitoredDevice,
submit_queue_size: Arc<AtomicUsize>,
tombstone_log: Option<TombstoneLog>,
stats: Arc<Statistics>,
metrics: Arc<Metrics>,
Expand All @@ -153,6 +159,7 @@ where
rx,
batch,
flight: Arc::new(Semaphore::new(1)),
submit_queue_size: submit_queue_size.clone(),
region_manager,
indexer,
tombstone_log,
Expand All @@ -168,11 +175,18 @@ where
}
});

Ok(Self { tx, metrics })
Ok(Self {
tx,
submit_queue_size,
metrics,
})
}

pub fn submit(&self, submission: Submission<K, V, S>) {
tracing::trace!("[lodc flusher]: submit task: {submission:?}");
if let Submission::CacheEntry { estimated_size, .. } = &submission {
self.submit_queue_size.fetch_add(*estimated_size, Ordering::Relaxed);
}
if let Err(e) = self.tx.send(submission) {
tracing::error!("[lodc flusher]: error raised when submitting task, error: {e}");
}
Expand All @@ -196,6 +210,7 @@ where
rx: flume::Receiver<Submission<K, V, S>>,
batch: BatchMut<K, V, S>,
flight: Arc<Semaphore>,
submit_queue_size: Arc<AtomicUsize>,

region_manager: RegionManager,
indexer: Indexer,
Expand Down Expand Up @@ -245,9 +260,13 @@ where
match submission {
Submission::CacheEntry {
entry,
estimated_size: _,
estimated_size,
sequence,
} => report(self.batch.entry(entry, &self.compression, sequence)),
} => {
report(self.batch.entry(entry, &self.compression, sequence));
self.submit_queue_size.fetch_sub(estimated_size, Ordering::Relaxed);
}

Submission::Tombstone { tombstone, stats } => self.batch.tombstone(tombstone, stats),
Submission::Reinsertion { reinsertion } => report(self.batch.reinsertion(&reinsertion)),
Submission::Wait { tx } => self.batch.wait(tx),
Expand Down
23 changes: 22 additions & 1 deletion foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
marker::PhantomData,
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Instant,
Expand Down Expand Up @@ -75,6 +75,7 @@
pub flushers: usize,
pub reclaimers: usize,
pub buffer_pool_size: usize,
pub submit_queue_size_threshold: usize,
pub clean_region_threshold: usize,
pub eviction_pickers: Vec<Box<dyn EvictionPicker>>,
pub reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,
Expand Down Expand Up @@ -102,6 +103,7 @@
.field("flushers", &self.flushers)
.field("reclaimers", &self.reclaimers)
.field("buffer_pool_size", &self.buffer_pool_size)
.field("submit_queue_size_threshold", &self.submit_queue_size_threshold)

Check warning on line 106 in foyer-storage/src/large/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/generic.rs#L106

Added line #L106 was not covered by tests
.field("clean_region_threshold", &self.clean_region_threshold)
.field("eviction_pickers", &self.eviction_pickers)
.field("reinsertion_pickers", &self.reinsertion_picker)
Expand Down Expand Up @@ -145,6 +147,9 @@
flushers: Vec<Flusher<K, V, S>>,
reclaimers: Vec<Reclaimer>,

submit_queue_size: Arc<AtomicUsize>,
submit_queue_size_threshold: usize,

statistics: Arc<Statistics>,

flush: bool,
Expand Down Expand Up @@ -213,6 +218,7 @@
metrics.clone(),
);
let sequence = AtomicSequence::default();
let submit_queue_size = Arc::<AtomicUsize>::default();

RecoverRunner::run(
&config,
Expand All @@ -232,6 +238,7 @@
indexer.clone(),
region_manager.clone(),
device.clone(),
submit_queue_size.clone(),
tombstone_log.clone(),
stats.clone(),
metrics.clone(),
Expand Down Expand Up @@ -262,6 +269,8 @@
region_manager,
flushers,
reclaimers,
submit_queue_size,
submit_queue_size_threshold: config.submit_queue_size_threshold,
statistics: stats,
flush: config.flush,
sequence,
Expand Down Expand Up @@ -294,7 +303,17 @@
return;
}

if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold {
tracing::warn!(
"[lodc] {} {}",
"submit queue overflow, new entry ignored.",
"Hint: set an appropriate rate limiter as the admission picker or scale out flushers."
);
return;
}

let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);

self.inner.flushers[sequence as usize % self.inner.flushers.len()].submit(Submission::CacheEntry {
entry,
estimated_size,
Expand Down Expand Up @@ -553,6 +572,7 @@
reinsertion_picker,
tombstone_log_config: None,
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
Expand Down Expand Up @@ -582,6 +602,7 @@
reinsertion_picker: Arc::<RejectAllPicker<u64>>::default(),
tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()),
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
Expand Down
15 changes: 15 additions & 0 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@
reinsertion_picker: self.large.reinsertion_picker,
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
statistics: statistics.clone(),
runtime,
marker: PhantomData,
Expand Down Expand Up @@ -614,6 +615,7 @@
reinsertion_picker: self.large.reinsertion_picker,
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
statistics: statistics.clone(),
runtime,
marker: PhantomData,
Expand Down Expand Up @@ -656,6 +658,7 @@
flushers: usize,
reclaimers: usize,
buffer_pool_size: usize,
submit_queue_size_threshold: Option<usize>,
clean_region_threshold: Option<usize>,
eviction_pickers: Vec<Box<dyn EvictionPicker>>,
reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,
Expand Down Expand Up @@ -689,6 +692,7 @@
flushers: 1,
reclaimers: 1,
buffer_pool_size: 16 * 1024 * 1024, // 16 MiB
submit_queue_size_threshold: None,
clean_region_threshold: None,
eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
reinsertion_picker: Arc::<RejectAllPicker<K>>::default(),
Expand Down Expand Up @@ -745,6 +749,17 @@
self
}

/// Set the submit queue size threshold.
///
/// If the total estimated size of the entries in the submit queue exceeds the threshold, further entries will
/// be ignored.
///
/// Default: `buffer_pool_size` * 2.
pub fn with_submit_queue_size_threshold(mut self, submit_queue_size_threshold: usize) -> Self {
    // Store the value in its own field. Assigning it to `buffer_pool_size` would silently resize the buffer
    // pool and leave the threshold unset, so the default would always be used.
    self.submit_queue_size_threshold = Some(submit_queue_size_threshold);
    self
}

Check warning on line 761 in foyer-storage/src/store.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/store.rs#L758-L761

Added lines #L758 - L761 were not covered by tests

/// Set the clean region threshold for the disk cache store.
///
/// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold.
Expand Down
Loading