Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions metrics-util/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate

### Added

- A new per-thread mode for `DebuggingRecorder` that allows recording metrics on a per-thread basis, to better support
the testing of metrics in user applications where many tests are concurrently emitting metrics.

## [0.12.0] - 2022-03-10

### Added
Expand Down
280 changes: 249 additions & 31 deletions metrics-util/src/debugging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! metrics in some limited cases.

use core::hash::Hash;
use std::cell::RefCell;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, fmt::Debug};
Expand All @@ -16,6 +17,11 @@ use indexmap::IndexMap;
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, Unit};
use ordered_float::OrderedFloat;

thread_local! {
/// A per-thread version of the debugging registry/state used only when the debugging recorder is configured to run in
/// per-thread mode.
///
/// Starts out as `None` on every thread. The recorder's per-thread code paths fill it in on first use via
/// `get_or_insert_with(Inner::new)`, and `Snapshotter::current_thread_snapshot` reads it back.
static PER_THREAD_INNER: RefCell<Option<Inner>> = RefCell::new(None);
}

/// A composite key name that stores both the metric key name and the metric kind.
///
/// The kind is the first field, so the derived ordering compares by kind first and by key name second.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
struct CompositeKeyName(MetricKind, KeyName);
Expand Down Expand Up @@ -59,24 +65,38 @@ pub enum DebugValue {
Histogram(Vec<OrderedFloat<f64>>),
}

/// State backing a debugging recorder: the metric registry plus the bookkeeping needed to build snapshots.
struct Inner {
/// Storage that the counter, gauge and histogram handles are created in and later read back from.
registry: Registry<Key, AtomicStorage>,
/// Every composite key handed to `track_metric`; snapshots iterate over this map to decide what to emit.
seen: Mutex<IndexMap<CompositeKey, ()>>,
/// Unit and description stored by `describe_metric`, keyed by metric kind and key name.
metadata: Mutex<IndexMap<CompositeKeyName, (Option<Unit>, &'static str)>>,
}

impl Inner {
    /// Builds an empty state: an atomic registry with no registered metrics, no seen keys and no metadata.
    fn new() -> Self {
        let registry = Registry::atomic();
        let seen = Mutex::new(IndexMap::new());
        let metadata = Mutex::new(IndexMap::new());

        Self { registry, seen, metadata }
    }
}

/// Captures point-in-time snapshots of [`DebuggingRecorder`].
pub struct Snapshotter {
registry: Arc<Registry<Key, AtomicStorage>>,
seen: Arc<Mutex<IndexMap<CompositeKey, ()>>>,
metadata: Arc<Mutex<IndexMap<CompositeKeyName, (Option<Unit>, &'static str)>>>,
inner: Arc<Inner>,
}

impl Snapshotter {
/// Takes a snapshot of the recorder.
pub fn snapshot(&self) -> Snapshot {
let mut snapshot = Vec::new();

let counters = self.registry.get_counter_handles();
let gauges = self.registry.get_gauge_handles();
let histograms = self.registry.get_histogram_handles();
let counters = self.inner.registry.get_counter_handles();
let gauges = self.inner.registry.get_gauge_handles();
let histograms = self.inner.registry.get_histogram_handles();

let seen = self.seen.lock().expect("seen lock poisoned").clone();
let metadata = self.metadata.lock().expect("metadata lock poisoned").clone();
let seen = self.inner.seen.lock().expect("seen lock poisoned").clone();
let metadata = self.inner.metadata.lock().expect("metadata lock poisoned").clone();

for (ck, _) in seen.into_iter() {
let value = match ck.kind() {
Expand Down Expand Up @@ -110,49 +130,141 @@ impl Snapshotter {

Snapshot(snapshot)
}

/// Takes a snapshot of the recorder for the current thread only.
///
/// Returns `None` when no per-thread registry exists on the calling thread, and `Some(snapshot)` otherwise.
///
/// NOTE(review): histogram values are read through `clear_with`, which presumably drains them, so they would not
/// show up again in a later snapshot -- confirm against the storage implementation.
pub fn current_thread_snapshot() -> Option<Snapshot> {
    PER_THREAD_INNER.with(|cell| -> Option<Snapshot> {
        // Bail out early if nothing has been recorded on this thread yet.
        let state = cell.borrow();
        let inner = state.as_ref()?;

        let counters = inner.registry.get_counter_handles();
        let gauges = inner.registry.get_gauge_handles();
        let histograms = inner.registry.get_histogram_handles();

        // Work on copies so that neither lock is held while the snapshot is assembled.
        let seen = inner.seen.lock().expect("seen lock poisoned").clone();
        let metadata = inner.metadata.lock().expect("metadata lock poisoned").clone();

        let entries = seen
            .into_iter()
            .filter_map(|(ck, _)| {
                // A key without a handle means the metric was only ever described, and not registered, so
                // the `?` below drops it from the snapshot.
                let value = match ck.kind() {
                    MetricKind::Counter => {
                        let counter = counters.get(ck.key())?;
                        DebugValue::Counter(counter.load(Ordering::SeqCst))
                    }
                    MetricKind::Gauge => {
                        let gauge = gauges.get(ck.key())?;
                        let value = f64::from_bits(gauge.load(Ordering::SeqCst));
                        DebugValue::Gauge(value.into())
                    }
                    MetricKind::Histogram => {
                        let histogram = histograms.get(ck.key())?;
                        let mut values = Vec::new();
                        histogram.clear_with(|xs| {
                            values.extend(xs.iter().map(|f| OrderedFloat::from(*f)))
                        });
                        DebugValue::Histogram(values)
                    }
                };

                let ckn = CompositeKeyName::new(ck.kind(), ck.key().name().to_string().into());
                let (unit, desc) = match metadata.get(&ckn) {
                    Some(&(unit, desc)) => (unit, Some(desc)),
                    None => (None, None),
                };

                Some((ck, unit, desc, value))
            })
            .collect::<Vec<_>>();

        Some(Snapshot(entries))
    })
}
}

/// A simplistic recorder that can be installed and used for debugging or testing.
///
/// Callers can easily take snapshots of the metrics at any given time and get access
/// to the raw values.
pub struct DebuggingRecorder {
registry: Arc<Registry<Key, AtomicStorage>>,
seen: Arc<Mutex<IndexMap<CompositeKey, ()>>>,
metadata: Arc<Mutex<IndexMap<CompositeKeyName, (Option<Unit>, &'static str)>>>,
inner: Arc<Inner>,
is_per_thread: bool,
}

impl DebuggingRecorder {
/// Creates a new `DebuggingRecorder`.
///
/// The recorder starts out with empty state and is not in per-thread mode, so every metric is recorded into the
/// state shared with its [`Snapshotter`]s.
pub fn new() -> DebuggingRecorder {
    DebuggingRecorder { inner: Arc::new(Inner::new()), is_per_thread: false }
}

/// Creates a new `DebuggingRecorder` in per-thread mode.
///
/// In this mode every metric is routed to a registry owned by the emitting thread, so a snapshot only ever contains
/// metrics emitted on the thread it is taken from. Because holding on to the original `Snapshotter` can be awkward,
/// [`Snapshotter::current_thread_snapshot`] is available to fetch whatever the calling thread has recorded so far,
/// if anything.
///
/// The recorder still has to be installed. Installing it more than once (ignoring the error returned by `install`)
/// does not clear or remove existing per-thread metrics, so it is safe to re-create and re-install it several times
/// within the same test binary.
pub fn per_thread() -> DebuggingRecorder {
    let inner = Arc::new(Inner::new());

    DebuggingRecorder { inner, is_per_thread: true }
}

/// Gets a `Snapshotter` attached to this recorder.
///
/// The returned snapshotter shares this recorder's state, so it observes metrics recorded after it was created.
pub fn snapshotter(&self) -> Snapshotter {
    Snapshotter { inner: Arc::clone(&self.inner) }
}

fn describe_metric(&self, rkey: CompositeKeyName, unit: Option<Unit>, desc: &'static str) {
let mut metadata = self.metadata.lock().expect("metadata lock poisoned");
let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc));
if unit.is_some() {
*uentry = unit;
if self.is_per_thread {
PER_THREAD_INNER.with(|cell| {
// Create the inner state if it doesn't yet exist.
//
// SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a
// per-thread invocation, so no other caller could possibly be holding a referenced, immutable or
// mutable, at the same time.
let mut maybe_inner = cell.borrow_mut();
let inner = maybe_inner.get_or_insert_with(Inner::new);

let mut metadata = inner.metadata.lock().expect("metadata lock poisoned");
let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc));
if unit.is_some() {
*uentry = unit;
}
*dentry = desc;
});
} else {
let mut metadata = self.inner.metadata.lock().expect("metadata lock poisoned");
let (uentry, dentry) = metadata.entry(rkey).or_insert((None, desc));
if unit.is_some() {
*uentry = unit;
}
*dentry = desc;
}
*dentry = desc;
}

/// Marks `ckey` as seen so that snapshots include it once a value has been registered for it.
///
/// In per-thread mode the key is recorded in the calling thread's state, which is created on first use; otherwise
/// it is recorded in the recorder's own state.
fn track_metric(&self, ckey: CompositeKey) {
    if self.is_per_thread {
        PER_THREAD_INNER.with(|cell| {
            // Create the inner state if it doesn't yet exist.
            //
            // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a
            // per-thread invocation, so no other caller could possibly be holding a reference, immutable or
            // mutable, at the same time.
            let mut maybe_inner = cell.borrow_mut();
            let inner = maybe_inner.get_or_insert_with(Inner::new);

            let mut seen = inner.seen.lock().expect("seen lock poisoned");
            seen.insert(ckey, ());
        });
    } else {
        let mut seen = self.inner.seen.lock().expect("seen lock poisoned");
        seen.insert(ckey, ());
    }
}

/// Installs this recorder as the global recorder.
Expand Down Expand Up @@ -180,19 +292,64 @@ impl Recorder for DebuggingRecorder {
/// Tracks `key` as a seen counter and returns a handle to it, creating the counter if it does not exist yet.
///
/// In per-thread mode the counter lives in the calling thread's registry; otherwise it lives in the recorder's own
/// registry.
fn register_counter(&self, key: &Key) -> Counter {
    let ckey = CompositeKey::new(MetricKind::Counter, key.clone());
    self.track_metric(ckey);

    if self.is_per_thread {
        PER_THREAD_INNER.with(move |cell| {
            // Create the inner state if it doesn't yet exist.
            //
            // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a
            // per-thread invocation, so no other caller could possibly be holding a reference, immutable or
            // mutable, at the same time.
            let mut maybe_inner = cell.borrow_mut();
            let inner = maybe_inner.get_or_insert_with(Inner::new);

            inner.registry.get_or_create_counter(key, |c| Counter::from_arc(c.clone()))
        })
    } else {
        self.inner.registry.get_or_create_counter(key, |c| Counter::from_arc(c.clone()))
    }
}

/// Tracks `key` as a seen gauge and returns a handle to it, creating the gauge if it does not exist yet.
///
/// In per-thread mode the gauge lives in the calling thread's registry; otherwise it lives in the recorder's own
/// registry.
fn register_gauge(&self, key: &Key) -> Gauge {
    let ckey = CompositeKey::new(MetricKind::Gauge, key.clone());
    self.track_metric(ckey);

    if self.is_per_thread {
        PER_THREAD_INNER.with(move |cell| {
            // Create the inner state if it doesn't yet exist.
            //
            // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a
            // per-thread invocation, so no other caller could possibly be holding a reference, immutable or
            // mutable, at the same time.
            let mut maybe_inner = cell.borrow_mut();
            let inner = maybe_inner.get_or_insert_with(Inner::new);

            inner.registry.get_or_create_gauge(key, |g| Gauge::from_arc(g.clone()))
        })
    } else {
        self.inner.registry.get_or_create_gauge(key, |g| Gauge::from_arc(g.clone()))
    }
}

/// Tracks `key` as a seen histogram and returns a handle to it, creating the histogram if it does not exist yet.
///
/// In per-thread mode the histogram lives in the calling thread's registry; otherwise it lives in the recorder's
/// own registry.
fn register_histogram(&self, key: &Key) -> Histogram {
    let ckey = CompositeKey::new(MetricKind::Histogram, key.clone());
    self.track_metric(ckey);

    if self.is_per_thread {
        PER_THREAD_INNER.with(move |cell| {
            // Create the inner state if it doesn't yet exist.
            //
            // SAFETY: It's safe to use `borrow_mut` here, even though the parent method is `&self`, as this is a
            // per-thread invocation, so no other caller could possibly be holding a reference, immutable or
            // mutable, at the same time.
            let mut maybe_inner = cell.borrow_mut();
            let inner = maybe_inner.get_or_insert_with(Inner::new);

            inner.registry.get_or_create_histogram(key, |h| Histogram::from_arc(h.clone()))
        })
    } else {
        self.inner.registry.get_or_create_histogram(key, |h| Histogram::from_arc(h.clone()))
    }
}
}

Expand All @@ -201,3 +358,64 @@ impl Default for DebuggingRecorder {
DebuggingRecorder::new()
}
}

#[cfg(test)]
mod tests {
use metrics::counter;

use crate::{CompositeKey, MetricKind};

use super::{DebugValue, DebuggingRecorder, Snapshotter};

#[test]
fn per_thread() {
// Create the recorder in per-thread mode, get the snapshotter, and then spawn two threads that record some
// metrics. Neither thread should see the metrics of the other, and this main thread running the test should not
// see _any_ metrics.
let recorder = DebuggingRecorder::per_thread();
let snapshotter = recorder.snapshotter();

metrics::clear_recorder();
recorder.install().expect("installing debugging recorder should not fail");

// Each thread emits to the same counter name with a different value and returns its own per-thread snapshot.
let t1 = std::thread::spawn(|| {
counter!("test_counter", 43);

Snapshotter::current_thread_snapshot()
});

let t2 = std::thread::spawn(|| {
counter!("test_counter", 47);

Snapshotter::current_thread_snapshot()
});

let t1_result =
t1.join().expect("thread 1 should not fail").expect("thread 1 should have metrics");
let t2_result =
t2.join().expect("thread 2 should not fail").expect("thread 2 should have metrics");

// In per-thread mode nothing is written to the state behind the recorder's own snapshotter, so it must be empty.
let main_result = snapshotter.snapshot().into_vec();
assert!(main_result.is_empty());

// Each thread sees only the value it emitted itself, with no unit or description attached.
assert_eq!(
t1_result.into_vec(),
vec![(
CompositeKey::new(MetricKind::Counter, "test_counter".into()),
None,
None,
DebugValue::Counter(43),
)]
);

assert_eq!(
t2_result.into_vec(),
vec![(
CompositeKey::new(MetricKind::Counter, "test_counter".into()),
None,
None,
DebugValue::Counter(47),
)]
);
}
}