diff --git a/foyer/src/container.rs b/foyer/src/container.rs
index 786ac6e4..583080a6 100644
--- a/foyer/src/container.rs
+++ b/foyer/src/container.rs
@@ -16,6 +16,7 @@
 use std::collections::BTreeMap;
 use std::hash::Hasher;
 use std::ptr::NonNull;
+use std::sync::Arc;
 
 use futures::future::try_join_all;
 use itertools::Itertools;
@@ -24,7 +25,7 @@ use twox_hash::XxHash64;
 
 use crate::policies::{Handle, Policy};
 use crate::store::Store;
-use crate::{Data, Index, WrappedNonNull};
+use crate::{Data, Index, Metrics, WrappedNonNull};
 
 // TODO(MrCroxx): wrap own result type
 use crate::store::error::Result;
@@ -73,6 +74,8 @@ where
 {
     pool_count_bits: usize,
     pools: Vec<Mutex<Pool<I, P, H, D, S>>>,
+
+    metrics: Arc<Metrics>,
 }
 
 impl<I, P, H, D, S> Container<I, P, H, D, S>
@@ -85,11 +88,20 @@ where
     S: Store<I = I, D = D>,
 {
     pub async fn open(config: Config<I, P, H, D, S>) -> Result<Self> {
+        Self::open_with_registry(config, prometheus::Registry::new()).await
+    }
+
+    pub async fn open_with_registry(
+        config: Config<I, P, H, D, S>,
+        registry: prometheus::Registry,
+    ) -> Result<Self> {
         let pool_count = 1 << config.pool_count_bits;
         let capacity = config.capacity >> config.pool_count_bits;
 
+        let metrics = Arc::new(Metrics::new(registry));
+
         let stores = (0..pool_count)
-            .map(|pool| S::open(pool, config.store_config.clone()))
+            .map(|pool| S::open(pool, config.store_config.clone(), metrics.clone()))
             .collect_vec();
         let stores = try_join_all(stores).await?;
 
@@ -103,6 +115,7 @@
                 size: 0,
                 handles: BTreeMap::new(),
                 store,
+                _metrics: metrics.clone(),
             })
             .map(Mutex::new)
             .collect_vec();
@@ -110,10 +123,13 @@
         Ok(Self {
             pool_count_bits: config.pool_count_bits,
             pools,
+            metrics,
         })
     }
 
     pub async fn insert(&self, index: I, data: D) -> Result<bool> {
+        let _timer = self.metrics.latency_insert.start_timer();
+
         let mut pool = self.pool(&index).await;
 
         if pool.handles.get(&index).is_some() {
@@ -131,6 +147,8 @@
     }
 
     pub async fn remove(&self, index: &I) -> Result<bool> {
+        let _timer = self.metrics.latency_remove.start_timer();
+
         let mut pool = self.pool(index).await;
 
         if pool.handles.get(index).is_none() {
@@ -144,9 +162,17 @@
     }
 
     pub async fn get(&self, index: &I) -> Result<Option<D>> {
+        let _timer = self.metrics.latency_get.start_timer();
+
         let mut pool = self.pool(index).await;
 
-        pool.get(index).await
+        let res = pool.get(index).await?;
+
+        if res.is_none() {
+            self.metrics.miss.inc();
+        }
+
+        Ok(res)
     }
 
     // TODO(MrCroxx): optimize this
@@ -186,6 +212,8 @@
     handles: BTreeMap<I, WrappedNonNull<H>>,
 
     store: S,
+
+    _metrics: Arc<Metrics>,
 }
 
 impl<I, P, H, D, S> Pool<I, P, H, D, S>
diff --git a/foyer/src/metrics.rs b/foyer/src/metrics.rs
index aa84a4cb..09f92107 100644
--- a/foyer/src/metrics.rs
+++ b/foyer/src/metrics.rs
@@ -13,15 +13,30 @@
 // limitations under the License.
 
 use prometheus::{
-    register_counter_vec_with_registry, register_histogram_vec_with_registry,
-    register_int_counter_with_registry, CounterVec, HistogramVec, IntCounter, Registry,
+    register_counter_vec_with_registry, register_gauge_with_registry,
+    register_histogram_vec_with_registry, register_int_counter_with_registry, Counter, CounterVec,
+    Gauge, Histogram, HistogramVec, IntCounter, Registry,
 };
 
 pub struct Metrics {
+    pub latency_insert: Histogram,
+    pub latency_get: Histogram,
+    pub latency_remove: Histogram,
+
+    pub latency_store: Histogram,
+    pub latency_load: Histogram,
+    pub latency_delete: Histogram,
+
+    pub bytes_store: Counter,
+    pub bytes_load: Counter,
+    pub bytes_delete: Counter,
+
+    pub cache_data_size: Gauge,
+
     pub miss: IntCounter,
-    pub latency: HistogramVec,
-    pub bytes: CounterVec,
+    _latency: HistogramVec,
+    _bytes: CounterVec,
 }
 
 impl Default for Metrics {
@@ -32,10 +47,6 @@
 impl Metrics {
     pub fn new(registry: Registry) -> Self {
-        let miss =
-            register_int_counter_with_registry!("foyer_cache_miss", "file cache miss", registry)
-                .unwrap();
-
         let latency = register_histogram_vec_with_registry!(
             "foyer_latency",
             "foyer latency",
@@ -52,10 +63,47 @@
             register_counter_vec_with_registry!("foyer_bytes", "foyer bytes", &["op"], registry)
                 .unwrap();
 
+        let latency_insert = latency.with_label_values(&["insert"]);
+        let latency_get = latency.with_label_values(&["get"]);
+        let latency_remove = latency.with_label_values(&["remove"]);
+
+        let latency_store = latency.with_label_values(&["store"]);
+        let latency_load = latency.with_label_values(&["load"]);
+        let latency_delete = latency.with_label_values(&["delete"]);
+
+        let bytes_store = bytes.with_label_values(&["store"]);
+        let bytes_load = bytes.with_label_values(&["load"]);
+        let bytes_delete = bytes.with_label_values(&["delete"]);
+
+        let miss =
+            register_int_counter_with_registry!("foyer_cache_miss", "foyer cache miss", registry)
+                .unwrap();
+        let cache_data_size = register_gauge_with_registry!(
+            "foyer_cache_data_size",
+            "foyer cache data size",
+            registry
+        )
+        .unwrap();
+
         Self {
+            latency_insert,
+            latency_get,
+            latency_remove,
+
+            latency_store,
+            latency_load,
+            latency_delete,
+
+            bytes_store,
+            bytes_load,
+            bytes_delete,
+
+            cache_data_size,
+
             miss,
-            latency,
-            bytes,
+
+            _latency: latency,
+            _bytes: bytes,
         }
     }
 }
diff --git a/foyer/src/store/mod.rs b/foyer/src/store/mod.rs
index e5d46e20..69a861f7 100644
--- a/foyer/src/store/mod.rs
+++ b/foyer/src/store/mod.rs
@@ -17,7 +17,9 @@
 pub mod error;
 pub mod file;
 pub mod read_only_file_store;
 
-use crate::{Data, Index};
+use std::sync::Arc;
+
+use crate::{Data, Index, Metrics};
 use async_trait::async_trait;
 use error::Result;
@@ -28,7 +30,7 @@ pub trait Store: Send + Sync + Sized + 'static {
     type D: Data;
 
     type C: Send + Sync + Clone + std::fmt::Debug + 'static;
 
-    async fn open(pool: usize, config: Self::C) -> Result<Self>;
+    async fn open(pool: usize, config: Self::C, metrics: Arc<Metrics>) -> Result<Self>;
 
     async fn store(&self, index: Self::I, data: Self::D) -> Result<()>;
 
@@ -81,7 +83,7 @@ pub mod tests {
 
         type C = ();
 
-        async fn open(_pool: usize, _: Self::C) -> Result<Self> {
+        async fn open(_pool: usize, _: Self::C, _metrics: Arc<Metrics>) -> Result<Self> {
             Ok(Self::new())
         }
 
diff --git a/foyer/src/store/read_only_file_store.rs b/foyer/src/store/read_only_file_store.rs
index c75c5ed8..e68d72cf 100644
--- a/foyer/src/store/read_only_file_store.rs
+++ b/foyer/src/store/read_only_file_store.rs
@@ -28,7 +28,7 @@
 use itertools::Itertools;
 use rand::{thread_rng, Rng};
 use tokio::sync::{RwLock, RwLockWriteGuard};
 
-use crate::{Data, Index};
+use crate::{Data, Index, Metrics};
 use super::error::Result;
 use super::file::{AppendableFile, Location, ReadableFile, WritableFile};
@@ -102,6 +104,8 @@
 
     size: Arc<AtomicUsize>,
 
+    metrics: Arc<Metrics>,
+
     _marker: PhantomData<D>,
 }
 
@@ -117,6 +119,7 @@
             indices: Arc::clone(&self.indices),
             files: Arc::clone(&self.files),
             size: Arc::clone(&self.size),
+            metrics: Arc::clone(&self.metrics),
             _marker: PhantomData,
         }
     }
@@ -134,7 +137,7 @@
 
     type C = Config;
 
-    async fn open(pool: usize, mut config: Self::C) -> Result<Self> {
+    async fn open(pool: usize, mut config: Self::C, metrics: Arc<Metrics>) -> Result<Self> {
         config.dir = config.dir.join(format!("{:04}", pool));
 
         let ids = asyncify({
@@ -182,12 +185,15 @@
             indices: Arc::new(RwLock::new(indices)),
             files: Arc::new(RwLock::new(files)),
             size: Arc::new(AtomicUsize::new(size)),
+            metrics,
             _marker: PhantomData,
         })
     }
 
     #[allow(clippy::uninit_vec)]
     async fn store(&self, index: Self::I, data: Self::D) -> Result<()> {
+        let _timer = self.metrics.latency_store.start_timer();
+
         // append cache file and meta file
         let (fid, sid, location) = {
             // randomly drop if size exceeds the threshold
@@ -234,8 +240,13 @@
             drop(indices);
         }
 
-        self.size
-            .fetch_add(location.len as usize, Ordering::Relaxed);
+        let cache_data_size = self
+            .size
+            .fetch_add(location.len as usize, Ordering::Relaxed)
+            + location.len as usize;
+
+        self.metrics.bytes_store.inc_by(location.len as f64);
+        self.metrics.cache_data_size.set(cache_data_size as f64);
 
         if active_file_size >= self.config.max_file_size {
             let files = self.files.write().await;
@@ -251,6 +262,8 @@
     }
 
     async fn load(&self, index: &Self::I) -> Result<Option<Self::D>> {
+        let _timer = self.metrics.latency_load.start_timer();
+
         // TODO(MrCroxx): add bloom filters ?
         let (fid, _sid, location) = {
             let indices = self.indices.read().await;
@@ -283,12 +296,16 @@
             }
         };
 
+        self.metrics.bytes_load.inc_by(location.len as f64);
+
         self.maybe_trigger_reclaim().await?;
 
         Ok(Some(buf.into()))
     }
 
     async fn delete(&self, index: &Self::I) -> Result<()> {
+        let _timer = self.metrics.latency_delete.start_timer();
+
         let (fid, sid, location) = {
             let indices = self.indices.read().await;
             let (fid, sid, location) = match indices.get(index) {
@@ -318,8 +335,13 @@
             }
         }
 
-        self.size
-            .fetch_sub(location.len as usize, Ordering::Relaxed);
+        let cache_data_size = self
+            .size
+            .fetch_sub(location.len as usize, Ordering::Relaxed)
+            - location.len as usize;
+
+        self.metrics.bytes_delete.inc_by(location.len as f64);
+        self.metrics.cache_data_size.set(cache_data_size as f64);
 
         self.maybe_trigger_reclaim().await?;
 
@@ -376,7 +398,8 @@
             size as usize
         };
 
-        self.size.fetch_sub(size, Ordering::Relaxed);
+        let cache_data_size = self.size.fetch_sub(size, Ordering::Relaxed) - size;
+        self.metrics.cache_data_size.set(cache_data_size as f64);
 
         meta_file.reclaim().await?;
         cache_file.reclaim().await?;
@@ -520,7 +543,9 @@ mod tests {
         };
 
         let store: ReadOnlyFileStore<u64, Vec<u8>> =
-            ReadOnlyFileStore::open(0, config).await.unwrap();
+            ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
+                .await
+                .unwrap();
 
         store.store(1, data(1, 1024)).await.unwrap();
         assert_eq!(store.load(&1).await.unwrap(), Some(data(1, 1024)));
@@ -572,7 +597,9 @@
         };
 
         let store: ReadOnlyFileStore<u64, Vec<u8>> =
-            ReadOnlyFileStore::open(0, config.clone()).await.unwrap();
+            ReadOnlyFileStore::open(0, config.clone(), Arc::new(Metrics::default()))
+                .await
+                .unwrap();
 
         for i in 0..20 {
             store.store(i, data(i as u8, 1024)).await.unwrap();
@@ -589,7 +616,9 @@
         drop(store);
 
         let store: ReadOnlyFileStore<u64, Vec<u8>> =
-            ReadOnlyFileStore::open(0, config).await.unwrap();
+            ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
+                .await
+                .unwrap();
 
         assert_eq!(store.files.read().await.frozens.len(), 3);
         for i in 0..12 {
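
Note on the `let _timer = ...start_timer();` pattern used throughout: `start_timer()` returns a guard that observes the elapsed time into the histogram when it is dropped, so binding it to `_timer` keeps it alive for the whole method body (binding to `_` would drop it immediately and record ~0). Below is a minimal caller-side sketch of the new `open_with_registry` entry point; `make_config()` is a hypothetical placeholder for building the real `Config`, and the scrape plumbing is illustrative only:

```rust
use prometheus::{Encoder, Registry, TextEncoder};

// Hypothetical wiring: share one registry between the cache and a scrape
// endpoint. `prometheus::Registry` is Arc-backed and `Clone`, so both sides
// refer to the same underlying metric store.
async fn open_and_scrape() {
    let registry = Registry::new();
    let container = Container::open_with_registry(make_config(), registry.clone())
        .await
        .unwrap();

    // ... insert/get/remove through `container` as usual ...
    let _ = container;

    // Render everything foyer registered, in the Prometheus text format.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf).unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}
```

The default `open` path keeps the old signature by creating a private `Registry::new()` internally, so existing callers compile unchanged while exporters opt in via the new constructor.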