Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce metrics for serde #666

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/grafana/dashboards/foyer.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion foyer-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ opentelemetry_sdk = { version = "0.24", features = [
parking_lot = "0.12"
rand = "0.8.5"
serde = { workspace = true }
serde_bytes = "0.11.14"
serde_bytes = "0.11.15"
tokio = { workspace = true }
tracing = "0.1"
tracing-opentelemetry = { version = "0.25", optional = true }
Expand Down
9 changes: 9 additions & 0 deletions foyer-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct Metrics {

pub storage_region_size_bytes: Gauge,

pub storage_entry_serialize_duration: Histogram,
pub storage_entry_deserialize_duration: Histogram,

/* hybrid cache metrics */
pub hybrid_insert: Counter,
pub hybrid_hit: Counter,
Expand Down Expand Up @@ -156,6 +159,10 @@ impl Metrics {

let storage_region_size_bytes = gauge!(format!("foyer_storage_region_size_bytes"), "name" => name.to_string());

let storage_entry_serialize_duration =
histogram!(format!("foyer_storage_entry_serde_duration"), "name" => name.to_string(), "op" => "serialize");
let storage_entry_deserialize_duration = histogram!(format!("foyer_storage_entry_serde_duration"), "name" => name.to_string(), "op" => "deserialize");

/* hybrid cache metrics */

let hybrid_insert = counter!(format!("foyer_hybrid_op_total"), "name" => name.to_string(), "op" => "insert");
Expand Down Expand Up @@ -210,6 +217,8 @@ impl Metrics {
storage_region_clean,
storage_region_evictable,
storage_region_size_bytes,
storage_entry_serialize_duration,
storage_entry_deserialize_duration,

hybrid_insert,
hybrid_hit,
Expand Down
14 changes: 12 additions & 2 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ where
&indexer,
&region_manager,
&tombstones,
metrics.clone(),
config.user_runtime_handle.clone(),
)
.await?;
Expand Down Expand Up @@ -256,6 +257,7 @@ where
flushers.clone(),
stats.clone(),
config.flush,
metrics.clone(),
config.write_runtime_handle.clone(),
)
.await
Expand Down Expand Up @@ -346,6 +348,7 @@ where
header.value_len as _,
header.compression,
Some(header.checksum),
&metrics,
) {
Ok(res) => res,
Err(e) => match e {
Expand Down Expand Up @@ -499,7 +502,7 @@ mod tests {
},
picker::utils::{FifoPicker, RejectAllPicker},
serde::EntrySerializer,
test_utils::BiasedPicker,
test_utils::{metrics_for_test, BiasedPicker},
IoBytesMut, TombstoneLogConfigBuilder,
};

Expand Down Expand Up @@ -594,7 +597,14 @@ mod tests {

fn enqueue(store: &GenericLargeStorage<u64, Vec<u8>, RandomState>, entry: CacheEntry<u64, Vec<u8>, RandomState>) {
let mut buffer = IoBytesMut::new();
let info = EntrySerializer::serialize(entry.key(), entry.value(), &Compression::None, &mut buffer).unwrap();
let info = EntrySerializer::serialize(
entry.key(),
entry.value(),
&Compression::None,
&mut buffer,
metrics_for_test(),
)
.unwrap();
let buffer = buffer.freeze();
store.enqueue(entry, buffer, info);
}
Expand Down
11 changes: 9 additions & 2 deletions foyer-storage/src/large/reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};

use foyer_common::code::{HashBuilder, StorageKey, StorageValue};
use foyer_common::{
code::{HashBuilder, StorageKey, StorageValue},
metrics::Metrics,
};
use futures::future::join_all;
use itertools::Itertools;
use tokio::{
Expand Down Expand Up @@ -53,6 +56,7 @@ impl Reclaimer {
flushers: Vec<Flusher<K, V, S>>,
stats: Arc<Statistics>,
flush: bool,
metrics: Arc<Metrics>,
runtime: Handle,
) -> Self
where
Expand All @@ -70,6 +74,7 @@ impl Reclaimer {
reinsertion_picker,
stats,
flush,
metrics,
wait_rx,
runtime: runtime.clone(),
};
Expand Down Expand Up @@ -108,6 +113,8 @@ where

flush: bool,

metrics: Arc<Metrics>,

wait_rx: mpsc::UnboundedReceiver<oneshot::Sender<()>>,

runtime: Handle,
Expand Down Expand Up @@ -167,7 +174,7 @@ where

tracing::debug!("[reclaimer]: Start reclaiming region {id}.");

let mut scanner = RegionScanner::new(region.clone());
let mut scanner = RegionScanner::new(region.clone(), self.metrics.clone());
let mut picked_count = 0;
let mut unpicked = vec![];
// The loop will ends when:
Expand Down
11 changes: 8 additions & 3 deletions foyer-storage/src/large/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use clap::ValueEnum;
use foyer_common::code::{HashBuilder, StorageKey, StorageValue};
use foyer_common::metrics::Metrics;
use futures::future::try_join_all;

use itertools::Itertools;
Expand Down Expand Up @@ -61,13 +62,16 @@ pub enum RecoverMode {
pub struct RecoverRunner;

impl RecoverRunner {
// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
#[allow(clippy::too_many_arguments)]
pub async fn run<K, V, S>(
config: &GenericLargeStorageConfig<K, V, S>,
regions: Range<RegionId>,
sequence: &AtomicSequence,
indexer: &Indexer,
region_manager: &RegionManager,
tombstones: &[Tombstone],
metrics: Arc<Metrics>,
runtime: Handle,
) -> Result<()>
where
Expand All @@ -81,9 +85,10 @@ impl RecoverRunner {
let handles = regions.map(|id| {
let semaphore = semaphore.clone();
let region = region_manager.region(id).clone();
let metrics = metrics.clone();
runtime.spawn(async move {
let permit = semaphore.acquire().await;
let res = RegionRecoverRunner::run(mode, region).await;
let res = RegionRecoverRunner::run(mode, region, metrics).await;
drop(permit);
res
})
Expand Down Expand Up @@ -194,15 +199,15 @@ impl RecoverRunner {
struct RegionRecoverRunner;

impl RegionRecoverRunner {
async fn run(mode: RecoverMode, region: Region) -> Result<Vec<EntryInfo>> {
async fn run(mode: RecoverMode, region: Region, metrics: Arc<Metrics>) -> Result<Vec<EntryInfo>> {
if mode == RecoverMode::None {
return Ok(vec![]);
}

let mut infos = vec![];

let id = region.id();
let mut iter = RegionScanner::new(region);
let mut iter = RegionScanner::new(region, metrics);
loop {
let r = iter.next().await;
match r {
Expand Down
8 changes: 7 additions & 1 deletion foyer-storage/src/large/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use foyer_common::{
bits,
code::{StorageKey, StorageValue},
metrics::Metrics,
strict_assert,
};

Expand Down Expand Up @@ -83,15 +86,17 @@
region: Region,
offset: u64,
cache: CachedDeviceReader,
metrics: Arc<Metrics>,
}

impl RegionScanner {
pub fn new(region: Region) -> Self {
pub fn new(region: Region, metrics: Arc<Metrics>) -> Self {
let cache = CachedDeviceReader::new(region.clone());
Self {
region,
offset: 0,
cache,
metrics,
}
}

Expand Down Expand Up @@ -216,6 +221,7 @@
header.value_len as _,
header.compression,
Some(header.checksum),
&self.metrics,

Check warning on line 224 in foyer-storage/src/large/scanner.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/scanner.rs#L224

Added line #L224 was not covered by tests
)?;

self.step(&header).await;
Expand Down
17 changes: 15 additions & 2 deletions foyer-storage/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt::Debug, hash::Hasher};
use std::{fmt::Debug, hash::Hasher, time::Instant};
use twox_hash::XxHash64;

use foyer_common::code::{StorageKey, StorageValue};
use foyer_common::{
code::{StorageKey, StorageValue},
metrics::Metrics,
};

use crate::{
compress::Compression,
Expand Down Expand Up @@ -51,11 +54,14 @@ impl EntrySerializer {
value: &'a V,
compression: &'a Compression,
mut buffer: &'a mut IoBytesMut,
metrics: &Metrics,
) -> Result<KvInfo>
where
K: StorageKey,
V: StorageValue,
{
let now = Instant::now();

let mut cursor = buffer.len();

// serialize value
Expand Down Expand Up @@ -85,6 +91,8 @@ impl EntrySerializer {
bincode::serialize_into(&mut buffer, &key).map_err(Error::from)?;
let key_len = buffer.len() - cursor;

metrics.storage_entry_serialize_duration.record(now.elapsed());

Ok(KvInfo { key_len, value_len })
}
}
Expand All @@ -100,11 +108,14 @@ impl EntryDeserializer {
value_len: usize,
compression: Compression,
checksum: Option<u64>,
metrics: &Metrics,
) -> Result<(K, V)>
where
K: StorageKey,
V: StorageValue,
{
let now = Instant::now();

// deserialize value
let buf = &buffer[..value_len];
let value = Self::deserialize_value(buf, compression)?;
Expand All @@ -121,6 +132,8 @@ impl EntryDeserializer {
}
}

metrics.storage_entry_deserialize_duration.record(now.elapsed());

Ok((key, value))
}

Expand Down
8 changes: 7 additions & 1 deletion foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ where
self.inner.write_runtime_handle.spawn(async move {
if force || this.pick(entry.key()) {
let mut buffer = IoBytesMut::new();
match EntrySerializer::serialize(entry.key(), entry.value(), &compression, &mut buffer) {
match EntrySerializer::serialize(
entry.key(),
entry.value(),
&compression,
&mut buffer,
&this.inner.metrics,
) {
Ok(info) => {
let buffer = buffer.freeze();
this.inner.engine.enqueue(entry, buffer, info);
Expand Down
19 changes: 17 additions & 2 deletions foyer-storage/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,31 @@

//! Test utils for the `foyer-storage` crate.

use std::{borrow::Borrow, collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
use std::{
borrow::Borrow,
collections::HashSet,
fmt::Debug,
hash::Hash,
marker::PhantomData,
sync::{Arc, OnceLock},
};

use foyer_common::code::StorageKey;
use foyer_common::{code::StorageKey, metrics::Metrics};
use parking_lot::Mutex;

use crate::{
picker::{AdmissionPicker, ReinsertionPicker},
statistics::Statistics,
};

/// A phantom metrics for test.
static METRICS_FOR_TEST: OnceLock<Metrics> = OnceLock::new();

/// Get a phantom metrics for test.
pub fn metrics_for_test() -> &'static Metrics {
METRICS_FOR_TEST.get_or_init(|| Metrics::new("test"))
}

/// A picker that only admits key from the given list.
pub struct BiasedPicker<K, Q> {
admits: HashSet<Q>,
Expand Down
Loading