diff --git a/metrics-util/CHANGELOG.md b/metrics-util/CHANGELOG.md index 9fae5251..cfb680ce 100644 --- a/metrics-util/CHANGELOG.md +++ b/metrics-util/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added + +- A new per-thread mode for `DebuggingRecorder` that allows recording metrics on a per-thread basis to better supporting + the testing of metrics in user applications where many tests are concurrently emitting metrics. + ## [0.12.0] - 2022-03-10 ### Added diff --git a/metrics-util/src/debugging.rs b/metrics-util/src/debugging.rs index 0f75e058..f18a5dbb 100644 --- a/metrics-util/src/debugging.rs +++ b/metrics-util/src/debugging.rs @@ -5,6 +5,7 @@ //! metrics in some limited cases. use core::hash::Hash; +use std::cell::RefCell; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::{collections::HashMap, fmt::Debug}; @@ -16,6 +17,11 @@ use indexmap::IndexMap; use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, Unit}; use ordered_float::OrderedFloat; +thread_local! { + /// A per-thread version of the debugging registry/state used only when the debugging recorder is configured to run in per-thread mode. + static PER_THREAD_INNER: RefCell> = RefCell::new(None); +} + /// A composite key name that stores both the metric key name and the metric kind. #[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] struct CompositeKeyName(MetricKind, KeyName); @@ -59,11 +65,25 @@ pub enum DebugValue { Histogram(Vec>), } +struct Inner { + registry: Registry, + seen: Mutex>, + metadata: Mutex, &'static str)>>, +} + +impl Inner { + fn new() -> Self { + Self { + registry: Registry::atomic(), + seen: Mutex::new(IndexMap::new()), + metadata: Mutex::new(IndexMap::new()), + } + } +} + /// Captures point-in-time snapshots of [`DebuggingRecorder`]. pub struct Snapshotter { - registry: Arc>, - seen: Arc>>, - metadata: Arc, &'static str)>>>, + inner: Arc, } impl Snapshotter { @@ -71,12 +91,12 @@ impl Snapshotter { pub fn snapshot(&self) -> Snapshot { let mut snapshot = Vec::new(); - let counters = self.registry.get_counter_handles(); - let gauges = self.registry.get_gauge_handles(); - let histograms = self.registry.get_histogram_handles(); + let counters = self.inner.registry.get_counter_handles(); + let gauges = self.inner.registry.get_gauge_handles(); + let histograms = self.inner.registry.get_histogram_handles(); - let seen = self.seen.lock().expect("seen lock poisoned").clone(); - let metadata = self.metadata.lock().expect("metadata lock poisoned").clone(); + let seen = self.inner.seen.lock().expect("seen lock poisoned").clone(); + let metadata = self.inner.metadata.lock().expect("metadata lock poisoned").clone(); for (ck, _) in seen.into_iter() { let value = match ck.kind() { @@ -110,6 +130,59 @@ impl Snapshotter { Snapshot(snapshot) } + + /// Takes a snapshot of the recorder for the current thread only. + /// + /// If no registry exists for the current thread, `None` is returned. Otherwise, `Some(snapshot)` is returned. + pub fn current_thread_snapshot() -> Option { + PER_THREAD_INNER.with(|maybe_inner| match maybe_inner.borrow().as_ref() { + None => None, + Some(inner) => { + let mut snapshot = Vec::new(); + + let counters = inner.registry.get_counter_handles(); + let gauges = inner.registry.get_gauge_handles(); + let histograms = inner.registry.get_histogram_handles(); + + let seen = inner.seen.lock().expect("seen lock poisoned").clone(); + let metadata = inner.metadata.lock().expect("metadata lock poisoned").clone(); + + for (ck, _) in seen.into_iter() { + let value = match ck.kind() { + MetricKind::Counter => counters + .get(ck.key()) + .map(|c| DebugValue::Counter(c.load(Ordering::SeqCst))), + MetricKind::Gauge => gauges.get(ck.key()).map(|g| { + let value = f64::from_bits(g.load(Ordering::SeqCst)); + DebugValue::Gauge(value.into()) + }), + MetricKind::Histogram => histograms.get(ck.key()).map(|h| { + let mut values = Vec::new(); + h.clear_with(|xs| { + values.extend(xs.iter().map(|f| OrderedFloat::from(*f))) + }); + DebugValue::Histogram(values) + }), + }; + + let ckn = CompositeKeyName::new(ck.kind(), ck.key().name().to_string().into()); + let (unit, desc) = metadata + .get(&ckn) + .copied() + .map(|(u, d)| (u, Some(d))) + .unwrap_or_else(|| (None, None)); + + // If there's no value for the key, that means the metric was only ever described, and + // not registered, so don't emit it. + if let Some(value) = value { + snapshot.push((ck, unit, desc, value)); + } + } + + Some(Snapshot(snapshot)) + } + }) + } } /// A simplistic recorder that can be installed and used for debugging or testing. @@ -117,42 +190,81 @@ impl Snapshotter { /// Callers can easily take snapshots of the metrics at any given time and get access /// to the raw values. pub struct DebuggingRecorder { - registry: Arc>, - seen: Arc>>, - metadata: Arc, &'static str)>>>, + inner: Arc, + is_per_thread: bool, } impl DebuggingRecorder { /// Creates a new `DebuggingRecorder`. pub fn new() -> DebuggingRecorder { - DebuggingRecorder { - registry: Arc::new(Registry::atomic()), - seen: Arc::new(Mutex::new(IndexMap::new())), - metadata: Arc::new(Mutex::new(IndexMap::new())), - } + DebuggingRecorder { inner: Arc::new(Inner::new()), is_per_thread: false } + } + + /// Creates a new `DebuggingRecorder` in per-thread mode. + /// + /// This sends all metrics to a per-thread registry, such that the snapshotter will only see metrics emitted in the + /// thread that the `Snapshotter` is used from. Additionally, as keeping a reference to the original `Snapshotter` + /// around can be tricky, [`Snapshotter::current_thread_snapshot`] can be used to get all of the metrics currently + /// present in the registry for the calling thread, if any were emitted. + /// + /// Please note that this recorder must still be installed, but it can be installed multiple times (if the error + /// from `install` is ignored) without clearing or removing any of the existing per-thread metrics, so it's safe to + /// re-create and re-install multiple times in the same test binary if necessary. + pub fn per_thread() -> DebuggingRecorder { + DebuggingRecorder { inner: Arc::new(Inner::new()), is_per_thread: true } } /// Gets a `Snapshotter` attached to this recorder. pub fn snapshotter(&self) -> Snapshotter { - Snapshotter { - registry: self.registry.clone(), - seen: self.seen.clone(), - metadata: self.metadata.clone(), - } + Snapshotter { inner: Arc::clone(&self.inner) } } fn describe_metric(&self, rkey: CompositeKeyName, unit: Option, desc: &'static str) { - let mut metadata = self.metadata.lock().expect("metadata lock poisoned"); - let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc)); - if unit.is_some() { - *uentry = unit; + if self.is_per_thread { + PER_THREAD_INNER.with(|cell| { + // Create the inner state if it doesn't yet exist. + // + // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a + // per-thread invocation, so no other caller could possibly be holding a referenced, immutable or + // mutable, at the same time. + let mut maybe_inner = cell.borrow_mut(); + let inner = maybe_inner.get_or_insert_with(Inner::new); + + let mut metadata = inner.metadata.lock().expect("metadata lock poisoned"); + let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc)); + if unit.is_some() { + *uentry = unit; + } + *dentry = desc; + }); + } else { + let mut metadata = self.inner.metadata.lock().expect("metadata lock poisoned"); + let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc)); + if unit.is_some() { + *uentry = unit; + } + *dentry = desc; } - *dentry = desc; } fn track_metric(&self, ckey: CompositeKey) { - let mut seen = self.seen.lock().expect("seen lock poisoned"); - seen.insert(ckey, ()); + if self.is_per_thread { + PER_THREAD_INNER.with(|cell| { + // Create the inner state if it doesn't yet exist. + // + // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a + // per-thread invocation, so no other caller could possibly be holding a referenced, immutable or + // mutable, at the same time. + let mut maybe_inner = cell.borrow_mut(); + let inner = maybe_inner.get_or_insert_with(Inner::new); + + let mut seen = inner.seen.lock().expect("seen lock poisoned"); + seen.insert(ckey, ()); + }); + } else { + let mut seen = self.inner.seen.lock().expect("seen lock poisoned"); + seen.insert(ckey, ()); + } } /// Installs this recorder as the global recorder. @@ -180,19 +292,64 @@ impl Recorder for DebuggingRecorder { fn register_counter(&self, key: &Key) -> Counter { let ckey = CompositeKey::new(MetricKind::Counter, key.clone()); self.track_metric(ckey); - self.registry.get_or_create_counter(key, |c| Counter::from_arc(c.clone())) + + if self.is_per_thread { + PER_THREAD_INNER.with(move |cell| { + // Create the inner state if it doesn't yet exist. + // + // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a + // per-thread invocation, so no other caller could possibly be holding a referenced, immutable or + // mutable, at the same time. + let mut maybe_inner = cell.borrow_mut(); + let inner = maybe_inner.get_or_insert_with(Inner::new); + + inner.registry.get_or_create_counter(key, |c| Counter::from_arc(c.clone())) + }) + } else { + self.inner.registry.get_or_create_counter(key, |c| Counter::from_arc(c.clone())) + } } fn register_gauge(&self, key: &Key) -> Gauge { let ckey = CompositeKey::new(MetricKind::Gauge, key.clone()); self.track_metric(ckey); - self.registry.get_or_create_gauge(key, |g| Gauge::from_arc(g.clone())) + + if self.is_per_thread { + PER_THREAD_INNER.with(move |cell| { + // Create the inner state if it doesn't yet exist. + // + // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a + // per-thread invocation, so no other caller could possibly be holding a referenced, immutable or + // mutable, at the same time. + let mut maybe_inner = cell.borrow_mut(); + let inner = maybe_inner.get_or_insert_with(Inner::new); + + inner.registry.get_or_create_gauge(key, |g| Gauge::from_arc(g.clone())) + }) + } else { + self.inner.registry.get_or_create_gauge(key, |g| Gauge::from_arc(g.clone())) + } } fn register_histogram(&self, key: &Key) -> Histogram { let ckey = CompositeKey::new(MetricKind::Histogram, key.clone()); self.track_metric(ckey); - self.registry.get_or_create_histogram(key, |h| Histogram::from_arc(h.clone())) + + if self.is_per_thread { + PER_THREAD_INNER.with(move |cell| { + // Create the inner state if it doesn't yet exist. + // + // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a + // per-thread invocation, so no other caller could possibly be holding a referenced, immutable or + // mutable, at the same time. + let mut maybe_inner = cell.borrow_mut(); + let inner = maybe_inner.get_or_insert_with(Inner::new); + + inner.registry.get_or_create_histogram(key, |h| Histogram::from_arc(h.clone())) + }) + } else { + self.inner.registry.get_or_create_histogram(key, |h| Histogram::from_arc(h.clone())) + } } } @@ -201,3 +358,64 @@ impl Default for DebuggingRecorder { DebuggingRecorder::new() } } + +#[cfg(test)] +mod tests { + use metrics::counter; + + use crate::{CompositeKey, MetricKind}; + + use super::{DebugValue, DebuggingRecorder, Snapshotter}; + + #[test] + fn per_thread() { + // Create the recorder in per-thread mode, get the snapshotter, and then spawn two threads that record some + // metrics. Neither thread should see the metrics of the other, and this main thread running the test should see + // _any_ metrics. + let recorder = DebuggingRecorder::per_thread(); + let snapshotter = recorder.snapshotter(); + + metrics::clear_recorder(); + recorder.install().expect("installing debugging recorder should not fail"); + + let t1 = std::thread::spawn(|| { + counter!("test_counter", 43); + + Snapshotter::current_thread_snapshot() + }); + + let t2 = std::thread::spawn(|| { + counter!("test_counter", 47); + + Snapshotter::current_thread_snapshot() + }); + + let t1_result = + t1.join().expect("thread 1 should not fail").expect("thread 1 should have metrics"); + let t2_result = + t2.join().expect("thread 2 should not fail").expect("thread 2 should have metrics"); + + let main_result = snapshotter.snapshot().into_vec(); + assert!(main_result.is_empty()); + + assert_eq!( + t1_result.into_vec(), + vec![( + CompositeKey::new(MetricKind::Counter, "test_counter".into()), + None, + None, + DebugValue::Counter(43), + )] + ); + + assert_eq!( + t2_result.into_vec(), + vec![( + CompositeKey::new(MetricKind::Counter, "test_counter".into()), + None, + None, + DebugValue::Counter(47), + )] + ); + } +}