Skip to content

Commit

Permalink
feat: add metrics (#17)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored May 29, 2023
1 parent 80c40de commit b5e3185
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 26 deletions.
34 changes: 31 additions & 3 deletions foyer/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::BTreeMap;
use std::hash::Hasher;

use std::ptr::NonNull;
use std::sync::Arc;

use futures::future::try_join_all;
use itertools::Itertools;
Expand All @@ -24,7 +25,7 @@ use twox_hash::XxHash64;

use crate::policies::{Handle, Policy};
use crate::store::Store;
use crate::{Data, Index, WrappedNonNull};
use crate::{Data, Index, Metrics, WrappedNonNull};

// TODO(MrCroxx): wrap own result type
use crate::store::error::Result;
Expand Down Expand Up @@ -73,6 +74,8 @@ where
{
pool_count_bits: usize,
pools: Vec<Mutex<Pool<I, P, H, D, S>>>,

metrics: Arc<Metrics>,
}

impl<I, P, H, D, S> Container<I, P, H, D, S>
Expand All @@ -85,11 +88,20 @@ where
S: Store<I = I, D = D>,
{
pub async fn open(config: Config<I, P, H, S>) -> Result<Self> {
Self::open_with_registry(config, prometheus::Registry::new()).await
}

pub async fn open_with_registry(
config: Config<I, P, H, S>,
registry: prometheus::Registry,
) -> Result<Self> {
let pool_count = 1 << config.pool_count_bits;
let capacity = config.capacity >> config.pool_count_bits;

let metrics = Arc::new(Metrics::new(registry));

let stores = (0..pool_count)
.map(|pool| S::open(pool, config.store_config.clone()))
.map(|pool| S::open(pool, config.store_config.clone(), metrics.clone()))
.collect_vec();
let stores = try_join_all(stores).await?;

Expand All @@ -103,17 +115,21 @@ where
size: 0,
handles: BTreeMap::new(),
store,
_metrics: metrics.clone(),
})
.map(Mutex::new)
.collect_vec();

Ok(Self {
pool_count_bits: config.pool_count_bits,
pools,
metrics,
})
}

pub async fn insert(&self, index: I, data: D) -> Result<bool> {
let _timer = self.metrics.latency_insert.start_timer();

let mut pool = self.pool(&index).await;

if pool.handles.get(&index).is_some() {
Expand All @@ -131,6 +147,8 @@ where
}

pub async fn remove(&self, index: &I) -> Result<bool> {
let _timer = self.metrics.latency_remove.start_timer();

let mut pool = self.pool(index).await;

if pool.handles.get(index).is_none() {
Expand All @@ -144,9 +162,17 @@ where
}

pub async fn get(&self, index: &I) -> Result<Option<D>> {
let _timer = self.metrics.latency_get.start_timer();

let mut pool = self.pool(index).await;

pool.get(index).await
let res = pool.get(index).await?;

if res.is_none() {
self.metrics.miss.inc();
}

Ok(res)
}

// TODO(MrCroxx): optimize this
Expand Down Expand Up @@ -186,6 +212,8 @@ where
handles: BTreeMap<I, PoolHandle<I, H>>,

store: S,

_metrics: Arc<Metrics>,
}

impl<I, P, H, D, S> Pool<I, P, H, D, S>
Expand Down
68 changes: 58 additions & 10 deletions foyer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,30 @@
// limitations under the License.

use prometheus::{
register_counter_vec_with_registry, register_histogram_vec_with_registry,
register_int_counter_with_registry, CounterVec, HistogramVec, IntCounter, Registry,
register_counter_vec_with_registry, register_gauge_with_registry,
register_histogram_vec_with_registry, register_int_counter_with_registry, Counter, CounterVec,
Gauge, Histogram, HistogramVec, IntCounter, Registry,
};

pub struct Metrics {
pub latency_insert: Histogram,
pub latency_get: Histogram,
pub latency_remove: Histogram,

pub latency_store: Histogram,
pub latency_load: Histogram,
pub latency_delete: Histogram,

pub bytes_store: Counter,
pub bytes_load: Counter,
pub bytes_delete: Counter,

pub cache_data_size: Gauge,

pub miss: IntCounter,

pub latency: HistogramVec,
pub bytes: CounterVec,
_latency: HistogramVec,
_bytes: CounterVec,
}

impl Default for Metrics {
Expand All @@ -32,10 +47,6 @@ impl Default for Metrics {

impl Metrics {
pub fn new(registry: Registry) -> Self {
let miss =
register_int_counter_with_registry!("foyer_cache_miss", "file cache miss", registry)
.unwrap();

let latency = register_histogram_vec_with_registry!(
"foyer_latency",
"foyer latency",
Expand All @@ -52,10 +63,47 @@ impl Metrics {
register_counter_vec_with_registry!("foyer_bytes", "foyer bytes", &["op"], registry)
.unwrap();

let latency_insert = latency.with_label_values(&["insert"]);
let latency_get = latency.with_label_values(&["get"]);
let latency_remove = latency.with_label_values(&["remove"]);

let latency_store = latency.with_label_values(&["store"]);
let latency_load = latency.with_label_values(&["load"]);
let latency_delete = latency.with_label_values(&["delete"]);

let bytes_store = bytes.with_label_values(&["store"]);
let bytes_load = bytes.with_label_values(&["load"]);
let bytes_delete = bytes.with_label_values(&["delete"]);

let miss =
register_int_counter_with_registry!("foyer_cache_miss", "foyer cache miss", registry)
.unwrap();
let cache_data_size = register_gauge_with_registry!(
"foyer_cache_data_size",
"foyer cache data size",
registry
)
.unwrap();

Self {
latency_insert,
latency_get,
latency_remove,

latency_store,
latency_load,
latency_delete,

bytes_store,
bytes_load,
bytes_delete,

cache_data_size,

miss,
latency,
bytes,

_latency: latency,
_bytes: bytes,
}
}
}
8 changes: 5 additions & 3 deletions foyer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub mod error;
pub mod file;
pub mod read_only_file_store;

use crate::{Data, Index};
use std::sync::Arc;

use crate::{Data, Index, Metrics};
use async_trait::async_trait;

use error::Result;
Expand All @@ -28,7 +30,7 @@ pub trait Store: Send + Sync + Sized + 'static {
type D: Data;
type C: Send + Sync + Clone + std::fmt::Debug + 'static;

async fn open(pool: usize, config: Self::C) -> Result<Self>;
async fn open(pool: usize, config: Self::C, metrics: Arc<Metrics>) -> Result<Self>;

async fn store(&self, index: Self::I, data: Self::D) -> Result<()>;

Expand Down Expand Up @@ -81,7 +83,7 @@ pub mod tests {

type C = ();

async fn open(_pool: usize, _: Self::C) -> Result<Self> {
async fn open(_pool: usize, _: Self::C, _metrics: Arc<Metrics>) -> Result<Self> {
Ok(Self::new())
}

Expand Down
49 changes: 39 additions & 10 deletions foyer/src/store/read_only_file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use itertools::Itertools;
use rand::{thread_rng, Rng};
use tokio::sync::{RwLock, RwLockWriteGuard};

use crate::{Data, Index};
use crate::{Data, Index, Metrics};

use super::error::Result;
use super::file::{AppendableFile, Location, ReadableFile, WritableFile};
Expand Down Expand Up @@ -102,6 +102,8 @@ where

size: Arc<AtomicUsize>,

metrics: Arc<Metrics>,

_marker: PhantomData<D>,
}

Expand All @@ -117,6 +119,7 @@ where
indices: Arc::clone(&self.indices),
files: Arc::clone(&self.files),
size: Arc::clone(&self.size),
metrics: Arc::clone(&self.metrics),
_marker: PhantomData,
}
}
Expand All @@ -134,7 +137,7 @@ where

type C = Config;

async fn open(pool: usize, mut config: Self::C) -> Result<Self> {
async fn open(pool: usize, mut config: Self::C, metrics: Arc<Metrics>) -> Result<Self> {
config.dir = config.dir.join(format!("{:04}", pool));

let ids = asyncify({
Expand Down Expand Up @@ -182,12 +185,15 @@ where
indices: Arc::new(RwLock::new(indices)),
files: Arc::new(RwLock::new(files)),
size: Arc::new(AtomicUsize::new(size)),
metrics,
_marker: PhantomData,
})
}

#[allow(clippy::uninit_vec)]
async fn store(&self, index: Self::I, data: Self::D) -> Result<()> {
let _timer = self.metrics.latency_store.start_timer();

// append cache file and meta file
let (fid, sid, location) = {
// randomly drop if size exceeds the threshold
Expand Down Expand Up @@ -234,8 +240,13 @@ where
drop(indices);
}

self.size
.fetch_add(location.len as usize, Ordering::Relaxed);
let cache_data_size = self
.size
.fetch_add(location.len as usize, Ordering::Relaxed)
+ location.len as usize;

self.metrics.bytes_store.inc_by(location.len as f64);
self.metrics.cache_data_size.set(cache_data_size as f64);

if active_file_size >= self.config.max_file_size {
let files = self.files.write().await;
Expand All @@ -251,6 +262,8 @@ where
}

async fn load(&self, index: &Self::I) -> Result<Option<Self::D>> {
let _timer = self.metrics.latency_load.start_timer();

// TODO(MrCroxx): add bloom filters ?
let (fid, _sid, location) = {
let indices = self.indices.read().await;
Expand Down Expand Up @@ -283,12 +296,16 @@ where
}
};

self.metrics.bytes_load.inc_by(location.len as f64);

self.maybe_trigger_reclaim().await?;

Ok(Some(buf.into()))
}

async fn delete(&self, index: &Self::I) -> Result<()> {
let _timer = self.metrics.latency_delete.start_timer();

let (fid, sid, location) = {
let indices = self.indices.read().await;
let (fid, sid, location) = match indices.get(index) {
Expand Down Expand Up @@ -318,8 +335,13 @@ where
}
}

self.size
.fetch_sub(location.len as usize, Ordering::Relaxed);
let cache_data_size = self
.size
.fetch_sub(location.len as usize, Ordering::Relaxed)
- location.len as usize;

self.metrics.bytes_delete.inc_by(location.len as f64);
self.metrics.cache_data_size.set(cache_data_size as f64);

self.maybe_trigger_reclaim().await?;

Expand Down Expand Up @@ -376,7 +398,8 @@ where
size as usize
};

self.size.fetch_sub(size, Ordering::Relaxed);
let cache_data_size = self.size.fetch_sub(size, Ordering::Relaxed) - size;
self.metrics.cache_data_size.set(cache_data_size as f64);

meta_file.reclaim().await?;
cache_file.reclaim().await?;
Expand Down Expand Up @@ -520,7 +543,9 @@ mod tests {
};

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config).await.unwrap();
ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
.await
.unwrap();

store.store(1, data(1, 1024)).await.unwrap();
assert_eq!(store.load(&1).await.unwrap(), Some(data(1, 1024)));
Expand Down Expand Up @@ -572,7 +597,9 @@ mod tests {
};

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config.clone()).await.unwrap();
ReadOnlyFileStore::open(0, config.clone(), Arc::new(Metrics::default()))
.await
.unwrap();

for i in 0..20 {
store.store(i, data(i as u8, 1024)).await.unwrap();
Expand All @@ -589,7 +616,9 @@ mod tests {
drop(store);

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config).await.unwrap();
ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
.await
.unwrap();

assert_eq!(store.files.read().await.frozens.len(), 3);
for i in 0..12 {
Expand Down

0 comments on commit b5e3185

Please sign in to comment.