diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index 4a789540..c3e1053d 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -15,7 +15,10 @@ use std::{ fmt::Debug, future::Future, - sync::{atomic::Ordering, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use foyer_common::{ @@ -103,6 +106,7 @@ where S: HashBuilder + Debug, { tx: flume::Sender>, + submit_queue_size: Arc, metrics: Arc, } @@ -116,6 +120,7 @@ where fn clone(&self) -> Self { Self { tx: self.tx.clone(), + submit_queue_size: self.submit_queue_size.clone(), metrics: self.metrics.clone(), } } @@ -133,6 +138,7 @@ where indexer: Indexer, region_manager: RegionManager, device: MonitoredDevice, + submit_queue_size: Arc, tombstone_log: Option, stats: Arc, metrics: Arc, @@ -153,6 +159,7 @@ where rx, batch, flight: Arc::new(Semaphore::new(1)), + submit_queue_size: submit_queue_size.clone(), region_manager, indexer, tombstone_log, @@ -168,11 +175,18 @@ where } }); - Ok(Self { tx, metrics }) + Ok(Self { + tx, + submit_queue_size, + metrics, + }) } pub fn submit(&self, submission: Submission) { tracing::trace!("[lodc flusher]: submit task: {submission:?}"); + if let Submission::CacheEntry { estimated_size, .. } = &submission { + self.submit_queue_size.fetch_add(*estimated_size, Ordering::Relaxed); + } if let Err(e) = self.tx.send(submission) { tracing::error!("[lodc flusher]: error raised when submitting task, error: {e}"); } @@ -196,6 +210,7 @@ where rx: flume::Receiver>, batch: BatchMut, flight: Arc, + submit_queue_size: Arc, region_manager: RegionManager, indexer: Indexer, @@ -245,9 +260,13 @@ where match submission { Submission::CacheEntry { entry, - estimated_size: _, + estimated_size, sequence, - } => report(self.batch.entry(entry, &self.compression, sequence)), + } => { + report(self.batch.entry(entry, &self.compression, sequence)); + self.submit_queue_size.fetch_sub(estimated_size, Ordering::Relaxed); + } + Submission::Tombstone { tombstone, stats } => self.batch.tombstone(tombstone, stats), Submission::Reinsertion { reinsertion } => report(self.batch.reinsertion(&reinsertion)), Submission::Wait { tx } => self.batch.wait(tx), diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 5c8fce90..047d96a7 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -18,7 +18,7 @@ use std::{ marker::PhantomData, ops::Range, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, time::Instant, @@ -75,6 +75,7 @@ where pub flushers: usize, pub reclaimers: usize, pub buffer_pool_size: usize, + pub submit_queue_size_threshold: usize, pub clean_region_threshold: usize, pub eviction_pickers: Vec>, pub reinsertion_picker: Arc>, @@ -102,6 +103,7 @@ where .field("flushers", &self.flushers) .field("reclaimers", &self.reclaimers) .field("buffer_pool_size", &self.buffer_pool_size) + .field("submit_queue_size_threshold", &self.submit_queue_size_threshold) .field("clean_region_threshold", &self.clean_region_threshold) .field("eviction_pickers", &self.eviction_pickers) .field("reinsertion_pickers", &self.reinsertion_picker) @@ -145,6 +147,9 @@ where flushers: Vec>, reclaimers: Vec, + submit_queue_size: Arc, + submit_queue_size_threshold: usize, + statistics: Arc, flush: bool, @@ -213,6 +218,7 @@ where metrics.clone(), ); let sequence = AtomicSequence::default(); + let submit_queue_size = Arc::::default(); RecoverRunner::run( &config, @@ -232,6 +238,7 @@ where indexer.clone(), region_manager.clone(), device.clone(), + submit_queue_size.clone(), tombstone_log.clone(), stats.clone(), metrics.clone(), @@ -262,6 +269,8 @@ where region_manager, flushers, reclaimers, + submit_queue_size, + submit_queue_size_threshold: config.submit_queue_size_threshold, statistics: stats, flush: config.flush, sequence, @@ -294,7 +303,17 @@ where return; } + if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold { + tracing::warn!( + "[lodc] {} {}", + "submit queue overflow, new entry ignored.", + "Hint: set an appropriate rate limiter as the admission picker or scale out flushers." + ); + return; + } + let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed); + self.inner.flushers[sequence as usize % self.inner.flushers.len()].submit(Submission::CacheEntry { entry, estimated_size, @@ -553,6 +572,7 @@ mod tests { reinsertion_picker, tombstone_log_config: None, buffer_pool_size: 16 * 1024 * 1024, + submit_queue_size_threshold: 16 * 1024 * 1024 * 2, statistics: Arc::::default(), runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, @@ -582,6 +602,7 @@ mod tests { reinsertion_picker: Arc::>::default(), tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()), buffer_pool_size: 16 * 1024 * 1024, + submit_queue_size_threshold: 16 * 1024 * 1024 * 2, statistics: Arc::::default(), runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index ac49db59..741dd9c6 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -558,6 +558,7 @@ where reinsertion_picker: self.large.reinsertion_picker, tombstone_log_config: self.large.tombstone_log_config, buffer_pool_size: self.large.buffer_pool_size, + submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2), statistics: statistics.clone(), runtime, marker: PhantomData, @@ -614,6 +615,7 @@ where reinsertion_picker: self.large.reinsertion_picker, tombstone_log_config: self.large.tombstone_log_config, buffer_pool_size: self.large.buffer_pool_size, + submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2), statistics: statistics.clone(), runtime, marker: PhantomData, @@ -656,6 +658,7 @@ where flushers: usize, reclaimers: usize, buffer_pool_size: usize, + submit_queue_size_threshold: Option, clean_region_threshold: Option, eviction_pickers: Vec>, reinsertion_picker: Arc>, @@ -689,6 +692,7 @@ where flushers: 1, reclaimers: 1, buffer_pool_size: 16 * 1024 * 1024, // 16 MiB + submit_queue_size_threshold: None, clean_region_threshold: None, eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::::default()], reinsertion_picker: Arc::>::default(), @@ -745,6 +749,17 @@ where self } + /// Set the submit queue size threshold. + /// + /// If the total entry estimated size in the submit queue exceeds the threshold, the further entries will be + /// ignored. + /// + /// Default: `buffer_pool_size`` * 2. + pub fn with_submit_queue_size_threshold(mut self, buffer_pool_size: usize) -> Self { + self.buffer_pool_size = buffer_pool_size; + self + } + /// Set the clean region threshold for the disk cache store. /// /// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold.