Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics #17

Merged
merged 1 commit into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions foyer/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::BTreeMap;
use std::hash::Hasher;

use std::ptr::NonNull;
use std::sync::Arc;

use futures::future::try_join_all;
use itertools::Itertools;
Expand All @@ -24,7 +25,7 @@ use twox_hash::XxHash64;

use crate::policies::{Handle, Policy};
use crate::store::Store;
use crate::{Data, Index, WrappedNonNull};
use crate::{Data, Index, Metrics, WrappedNonNull};

// TODO(MrCroxx): wrap own result type
use crate::store::error::Result;
Expand Down Expand Up @@ -73,6 +74,8 @@ where
{
pool_count_bits: usize,
pools: Vec<Mutex<Pool<I, P, H, D, S>>>,

metrics: Arc<Metrics>,
}

impl<I, P, H, D, S> Container<I, P, H, D, S>
Expand All @@ -85,11 +88,20 @@ where
S: Store<I = I, D = D>,
{
pub async fn open(config: Config<I, P, H, S>) -> Result<Self> {
Self::open_with_registry(config, prometheus::Registry::new()).await
}

pub async fn open_with_registry(
config: Config<I, P, H, S>,
registry: prometheus::Registry,
) -> Result<Self> {
let pool_count = 1 << config.pool_count_bits;
let capacity = config.capacity >> config.pool_count_bits;

let metrics = Arc::new(Metrics::new(registry));

let stores = (0..pool_count)
.map(|pool| S::open(pool, config.store_config.clone()))
.map(|pool| S::open(pool, config.store_config.clone(), metrics.clone()))
.collect_vec();
let stores = try_join_all(stores).await?;

Expand All @@ -103,17 +115,21 @@ where
size: 0,
handles: BTreeMap::new(),
store,
_metrics: metrics.clone(),
})
.map(Mutex::new)
.collect_vec();

Ok(Self {
pool_count_bits: config.pool_count_bits,
pools,
metrics,
})
}

pub async fn insert(&self, index: I, data: D) -> Result<bool> {
let _timer = self.metrics.latency_insert.start_timer();

let mut pool = self.pool(&index).await;

if pool.handles.get(&index).is_some() {
Expand All @@ -131,6 +147,8 @@ where
}

pub async fn remove(&self, index: &I) -> Result<bool> {
let _timer = self.metrics.latency_remove.start_timer();

let mut pool = self.pool(index).await;

if pool.handles.get(index).is_none() {
Expand All @@ -144,9 +162,17 @@ where
}

/// Looks up `index` in the owning pool, recording get latency and
/// counting misses.
///
/// Returns `Ok(Some(data))` on a hit, `Ok(None)` on a miss (after
/// incrementing the miss counter), and propagates store errors.
pub async fn get(&self, index: &I) -> Result<Option<D>> {
    // Dropped at scope exit, so the timer observes the full get
    // latency including the pool lock wait.
    let _timer = self.metrics.latency_get.start_timer();

    let mut pool = self.pool(index).await;

    let res = pool.get(index).await?;

    // Only an absent entry counts as a miss; errors already returned
    // via `?` above and are not counted here.
    if res.is_none() {
        self.metrics.miss.inc();
    }

    Ok(res)
}

// TODO(MrCroxx): optimize this
Expand Down Expand Up @@ -186,6 +212,8 @@ where
handles: BTreeMap<I, PoolHandle<I, H>>,

store: S,

_metrics: Arc<Metrics>,
}

impl<I, P, H, D, S> Pool<I, P, H, D, S>
Expand Down
68 changes: 58 additions & 10 deletions foyer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,30 @@
// limitations under the License.

use prometheus::{
register_counter_vec_with_registry, register_histogram_vec_with_registry,
register_int_counter_with_registry, CounterVec, HistogramVec, IntCounter, Registry,
register_counter_vec_with_registry, register_gauge_with_registry,
register_histogram_vec_with_registry, register_int_counter_with_registry, Counter, CounterVec,
Gauge, Histogram, HistogramVec, IntCounter, Registry,
};

/// Prometheus metrics for the foyer cache.
///
/// Per-operation histograms and counters are resolved once from the
/// underlying labelled vectors (see `Metrics::new`) so hot paths avoid
/// a label lookup on every observation.
pub struct Metrics {
    // Cache-level operation latencies.
    pub latency_insert: Histogram,
    pub latency_get: Histogram,
    pub latency_remove: Histogram,

    // Store-level operation latencies.
    pub latency_store: Histogram,
    pub latency_load: Histogram,
    pub latency_delete: Histogram,

    // Bytes moved per store-level operation.
    pub bytes_store: Counter,
    pub bytes_load: Counter,
    pub bytes_delete: Counter,

    // Current total size of cached data, in bytes.
    pub cache_data_size: Gauge,

    // Cache miss count.
    pub miss: IntCounter,

    // Backing labelled vectors the fields above were resolved from;
    // retained but not accessed directly.
    _latency: HistogramVec,
    _bytes: CounterVec,
}

impl Default for Metrics {
Expand All @@ -32,10 +47,6 @@ impl Default for Metrics {

impl Metrics {
pub fn new(registry: Registry) -> Self {
let miss =
register_int_counter_with_registry!("foyer_cache_miss", "file cache miss", registry)
.unwrap();

let latency = register_histogram_vec_with_registry!(
"foyer_latency",
"foyer latency",
Expand All @@ -52,10 +63,47 @@ impl Metrics {
register_counter_vec_with_registry!("foyer_bytes", "foyer bytes", &["op"], registry)
.unwrap();

let latency_insert = latency.with_label_values(&["insert"]);
let latency_get = latency.with_label_values(&["get"]);
let latency_remove = latency.with_label_values(&["remove"]);

let latency_store = latency.with_label_values(&["store"]);
let latency_load = latency.with_label_values(&["load"]);
let latency_delete = latency.with_label_values(&["delete"]);

let bytes_store = bytes.with_label_values(&["store"]);
let bytes_load = bytes.with_label_values(&["load"]);
let bytes_delete = bytes.with_label_values(&["delete"]);

let miss =
register_int_counter_with_registry!("foyer_cache_miss", "foyer cache miss", registry)
.unwrap();
let cache_data_size = register_gauge_with_registry!(
"foyer_cache_data_size",
"foyer cache data size",
registry
)
.unwrap();

Self {
latency_insert,
latency_get,
latency_remove,

latency_store,
latency_load,
latency_delete,

bytes_store,
bytes_load,
bytes_delete,

cache_data_size,

miss,
latency,
bytes,

_latency: latency,
_bytes: bytes,
}
}
}
8 changes: 5 additions & 3 deletions foyer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub mod error;
pub mod file;
pub mod read_only_file_store;

use crate::{Data, Index};
use std::sync::Arc;

use crate::{Data, Index, Metrics};
use async_trait::async_trait;

use error::Result;
Expand All @@ -28,7 +30,7 @@ pub trait Store: Send + Sync + Sized + 'static {
type D: Data;
type C: Send + Sync + Clone + std::fmt::Debug + 'static;

async fn open(pool: usize, config: Self::C) -> Result<Self>;
async fn open(pool: usize, config: Self::C, metrics: Arc<Metrics>) -> Result<Self>;

async fn store(&self, index: Self::I, data: Self::D) -> Result<()>;

Expand Down Expand Up @@ -81,7 +83,7 @@ pub mod tests {

type C = ();

async fn open(_pool: usize, _: Self::C) -> Result<Self> {
async fn open(_pool: usize, _: Self::C, _metrics: Arc<Metrics>) -> Result<Self> {
Ok(Self::new())
}

Expand Down
49 changes: 39 additions & 10 deletions foyer/src/store/read_only_file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use itertools::Itertools;
use rand::{thread_rng, Rng};
use tokio::sync::{RwLock, RwLockWriteGuard};

use crate::{Data, Index};
use crate::{Data, Index, Metrics};

use super::error::Result;
use super::file::{AppendableFile, Location, ReadableFile, WritableFile};
Expand Down Expand Up @@ -102,6 +102,8 @@ where

size: Arc<AtomicUsize>,

metrics: Arc<Metrics>,

_marker: PhantomData<D>,
}

Expand All @@ -117,6 +119,7 @@ where
indices: Arc::clone(&self.indices),
files: Arc::clone(&self.files),
size: Arc::clone(&self.size),
metrics: Arc::clone(&self.metrics),
_marker: PhantomData,
}
}
Expand All @@ -134,7 +137,7 @@ where

type C = Config;

async fn open(pool: usize, mut config: Self::C) -> Result<Self> {
async fn open(pool: usize, mut config: Self::C, metrics: Arc<Metrics>) -> Result<Self> {
config.dir = config.dir.join(format!("{:04}", pool));

let ids = asyncify({
Expand Down Expand Up @@ -182,12 +185,15 @@ where
indices: Arc::new(RwLock::new(indices)),
files: Arc::new(RwLock::new(files)),
size: Arc::new(AtomicUsize::new(size)),
metrics,
_marker: PhantomData,
})
}

#[allow(clippy::uninit_vec)]
async fn store(&self, index: Self::I, data: Self::D) -> Result<()> {
let _timer = self.metrics.latency_store.start_timer();

// append cache file and meta file
let (fid, sid, location) = {
// randomly drop if size exceeds the threshold
Expand Down Expand Up @@ -234,8 +240,13 @@ where
drop(indices);
}

self.size
.fetch_add(location.len as usize, Ordering::Relaxed);
let cache_data_size = self
.size
.fetch_add(location.len as usize, Ordering::Relaxed)
+ location.len as usize;

self.metrics.bytes_store.inc_by(location.len as f64);
self.metrics.cache_data_size.set(cache_data_size as f64);

if active_file_size >= self.config.max_file_size {
let files = self.files.write().await;
Expand All @@ -251,6 +262,8 @@ where
}

async fn load(&self, index: &Self::I) -> Result<Option<Self::D>> {
let _timer = self.metrics.latency_load.start_timer();

// TODO(MrCroxx): add bloom filters ?
let (fid, _sid, location) = {
let indices = self.indices.read().await;
Expand Down Expand Up @@ -283,12 +296,16 @@ where
}
};

self.metrics.bytes_load.inc_by(location.len as f64);

self.maybe_trigger_reclaim().await?;

Ok(Some(buf.into()))
}

async fn delete(&self, index: &Self::I) -> Result<()> {
let _timer = self.metrics.latency_delete.start_timer();

let (fid, sid, location) = {
let indices = self.indices.read().await;
let (fid, sid, location) = match indices.get(index) {
Expand Down Expand Up @@ -318,8 +335,13 @@ where
}
}

self.size
.fetch_sub(location.len as usize, Ordering::Relaxed);
let cache_data_size = self
.size
.fetch_sub(location.len as usize, Ordering::Relaxed)
- location.len as usize;

self.metrics.bytes_delete.inc_by(location.len as f64);
self.metrics.cache_data_size.set(cache_data_size as f64);

self.maybe_trigger_reclaim().await?;

Expand Down Expand Up @@ -376,7 +398,8 @@ where
size as usize
};

self.size.fetch_sub(size, Ordering::Relaxed);
let cache_data_size = self.size.fetch_sub(size, Ordering::Relaxed) - size;
self.metrics.cache_data_size.set(cache_data_size as f64);

meta_file.reclaim().await?;
cache_file.reclaim().await?;
Expand Down Expand Up @@ -520,7 +543,9 @@ mod tests {
};

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config).await.unwrap();
ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
.await
.unwrap();

store.store(1, data(1, 1024)).await.unwrap();
assert_eq!(store.load(&1).await.unwrap(), Some(data(1, 1024)));
Expand Down Expand Up @@ -572,7 +597,9 @@ mod tests {
};

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config.clone()).await.unwrap();
ReadOnlyFileStore::open(0, config.clone(), Arc::new(Metrics::default()))
.await
.unwrap();

for i in 0..20 {
store.store(i, data(i as u8, 1024)).await.unwrap();
Expand All @@ -589,7 +616,9 @@ mod tests {
drop(store);

let store: ReadOnlyFileStore<u64, Vec<u8>> =
ReadOnlyFileStore::open(0, config).await.unwrap();
ReadOnlyFileStore::open(0, config, Arc::new(Metrics::default()))
.await
.unwrap();

assert_eq!(store.files.read().await.frozens.len(), 3);
for i in 0..12 {
Expand Down