Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics-tracing-context/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
let snapshotter = recorder.snapshotter();
let recorder = layer.layer(recorder);

metrics::clear_recorder();
unsafe { metrics::clear_recorder() };
metrics::set_boxed_recorder(Box::new(recorder)).expect("failed to install recorder");

let test_guard =
Expand Down
2 changes: 1 addition & 1 deletion metrics-util/src/debugging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ mod tests {
let recorder = DebuggingRecorder::per_thread();
let snapshotter = recorder.snapshotter();

metrics::clear_recorder();
unsafe { metrics::clear_recorder() };
recorder.install().expect("installing debugging recorder should not fail");

let t1 = std::thread::spawn(|| {
Expand Down
4 changes: 2 additions & 2 deletions metrics-util/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@
//! let layered = layer.layer(recorder);
//! metrics::set_boxed_recorder(Box::new(layered)).expect("failed to install recorder");
//!
//! # metrics::clear_recorder();
//! # unsafe { metrics::clear_recorder() };
//!
//! // Working with layers directly is a bit cumbersome, though, so let's use a `Stack`.
//! let stack = Stack::new(BasicRecorder);
//! stack.push(StairwayDenyLayer::default()).install().expect("failed to install stack");
//!
//! # metrics::clear_recorder();
//! # unsafe { metrics::clear_recorder() };
//!
//! // `Stack` makes it easy to chain layers together, as well.
//! let stack = Stack::new(BasicRecorder);
Expand Down
22 changes: 16 additions & 6 deletions metrics/benches/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ impl Recorder for TestRecorder {
}

fn reset_recorder() {
let recorder = unsafe { &*Box::into_raw(Box::new(TestRecorder::default())) };
let recorder = Box::leak(Box::new(TestRecorder::default()));
unsafe { metrics::set_recorder_racy(recorder).unwrap() }
}

fn macro_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("macros");
group.bench_function("uninitialized/no_labels", |b| {
metrics::clear_recorder();
unsafe {
metrics::clear_recorder();
}
b.iter(|| {
counter!("counter_bench", 42);
})
});
group.bench_function("uninitialized/with_static_labels", |b| {
metrics::clear_recorder();
unsafe {
metrics::clear_recorder();
}
b.iter(|| {
counter!("counter_bench", 42, "request" => "http", "svc" => "admin");
})
Expand All @@ -47,14 +51,18 @@ fn macro_benchmark(c: &mut Criterion) {
b.iter(|| {
counter!("counter_bench", 42);
});
metrics::clear_recorder();
unsafe {
metrics::clear_recorder();
}
});
group.bench_function("initialized/with_static_labels", |b| {
reset_recorder();
b.iter(|| {
counter!("counter_bench", 42, "request" => "http", "svc" => "admin");
});
metrics::clear_recorder();
unsafe {
metrics::clear_recorder();
}
});
group.bench_function("initialized/with_dynamic_labels", |b| {
let label_val = thread_rng().gen::<u64>().to_string();
Expand All @@ -63,7 +71,9 @@ fn macro_benchmark(c: &mut Criterion) {
b.iter(move || {
counter!("counter_bench", 42, "request" => "http", "uid" => label_val.clone());
});
metrics::clear_recorder();
unsafe {
metrics::clear_recorder();
}
});
group.finish();
}
Expand Down
166 changes: 111 additions & 55 deletions metrics/src/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,106 @@
use self::cell::RecorderOnceCell;
use crate::{Counter, Gauge, Histogram, Key, KeyName, Unit};
use core::fmt;
use core::sync::atomic::{AtomicUsize, Ordering};

static mut RECORDER: &'static dyn Recorder = &NoopRecorder;
Comment thread
tobz marked this conversation as resolved.
static STATE: AtomicUsize = AtomicUsize::new(0);
mod cell {
use super::{Recorder, SetRecorderError};
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

const UNINITIALIZED: usize = 0;
const INITIALIZING: usize = 1;
const INITIALIZED: usize = 2;
// FIXME: This can't be a const new function because trait objects aren't allowed in const fns
// This was stabilized in 1.61, so it can be cleaned up when it becomes the MSRV
Comment thread
tobz marked this conversation as resolved.
#[allow(clippy::declare_interior_mutable_const)]
pub const INIT: RecorderOnceCell = RecorderOnceCell {
recorder: UnsafeCell::new(None),
state: AtomicUsize::new(UNINITIALIZED),
};

/// The global Recorder instance with a `once_cell`-like API.
pub struct RecorderOnceCell {
recorder: UnsafeCell<Option<&'static dyn Recorder>>,
state: AtomicUsize,
}

/// The recorder is uninit and can be set.
const UNINITIALIZED: usize = 0;
/// The recorder is currently being initialized.
const INITIALIZING: usize = 1;
/// The recorder has been initialized successfully and can be read.
const INITIALIZED: usize = 2;

impl RecorderOnceCell {
#[cfg(atomic_cas)]
pub fn set(&self, recorder: &'static dyn Recorder) -> Result<(), SetRecorderError> {
Comment thread
Noratrieb marked this conversation as resolved.
// Acquire the lock because the write below must not be reordered above the CAS.
match self.state.compare_exchange(
UNINITIALIZED,
INITIALIZING,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(UNINITIALIZED) => {
unsafe {
// SAFETY: Access is unique because we CASed the state to INITIALIZING above
self.recorder.get().write(Some(recorder));
}
// Release the lock, others can now read it - but not write
self.state.store(INITIALIZED, Ordering::Release);
Ok(())
}
_ => Err(SetRecorderError(())),
}
}

/// Clears the currently installed recorder, allowing a new writer to override it.
/// # Safety
/// The caller must guarantee that no reader has read the state before we do this and then
/// reads the recorder after another writer has written to it after us.
Comment thread
Noratrieb marked this conversation as resolved.
pub unsafe fn clear(&self) {
// Set the state to `UNINIT` to allow the next writer to write again.
// This is not a problem for readers since their `&'static` refs will remain
// valid forever.
self.state.store(UNINITIALIZED, Ordering::Relaxed);
}

pub fn try_load(&self) -> Option<&'static dyn Recorder> {
if self.state.load(Ordering::Acquire) != INITIALIZED {
None
} else {
// SAFETY: Thanks to `Acquire` above we make sure that this doesn't get
// reordered above this and therefore no writer is here
unsafe { self.recorder.get().read() }
}
}

pub unsafe fn set_racy(
&self,
recorder: &'static dyn Recorder,
) -> Result<(), SetRecorderError> {
match self.state.load(Ordering::Relaxed) {
UNINITIALIZED => {
// SAFETY: Caller guarantees that access is unique
self.recorder.get().write(Some(recorder));
self.state.store(INITIALIZED, Ordering::Release);
Ok(())
}
INITIALIZING => {
// This is just plain UB, since we were racing another initialization function
unreachable!(
"set_recorder_racy must not be used with other initialization functions"
)
}
_ => Err(SetRecorderError(())),
}
}
}

// SAFETY: We can only mutate through `set` - which is protected by the `state` and unsafe
// function where the caller has to guarantee synced-ness
unsafe impl Send for RecorderOnceCell {}
unsafe impl Sync for RecorderOnceCell {}
}

static RECORDER: RecorderOnceCell = cell::INIT;

static SET_RECORDER_ERROR: &str =
"attempted to set a recorder after the metrics system was already initialized";
Expand Down Expand Up @@ -85,7 +178,7 @@ impl Recorder for NoopRecorder {
/// An error is returned if a recorder has already been set.
#[cfg(atomic_cas)]
pub fn set_recorder(recorder: &'static dyn Recorder) -> Result<(), SetRecorderError> {
set_recorder_inner(|| recorder)
RECORDER.set(recorder)
}

/// Sets the global recorder to a `Box<Recorder>`.
Expand All @@ -101,33 +194,7 @@ pub fn set_recorder(recorder: &'static dyn Recorder) -> Result<(), SetRecorderEr
/// An error is returned if a recorder has already been set.
#[cfg(atomic_cas)]
pub fn set_boxed_recorder(recorder: Box<dyn Recorder>) -> Result<(), SetRecorderError> {
set_recorder_inner(|| unsafe { &*Box::into_raw(recorder) })
}

#[cfg(atomic_cas)]
fn set_recorder_inner<F>(make_recorder: F) -> Result<(), SetRecorderError>
where
F: FnOnce() -> &'static dyn Recorder,
{
unsafe {
match STATE.compare_exchange(
UNINITIALIZED,
INITIALIZING,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(UNINITIALIZED) => {
RECORDER = make_recorder();
STATE.store(INITIALIZED, Ordering::SeqCst);
Ok(())
}
Err(INITIALIZING) => {
while STATE.load(Ordering::SeqCst) == INITIALIZING {}
Err(SetRecorderError(()))
}
_ => Err(SetRecorderError(())),
}
}
RECORDER.set(Box::leak(recorder))
}

/// A thread-unsafe version of [`set_recorder`].
Expand All @@ -148,18 +215,7 @@ where
/// It is safe to use other metrics functions while this function runs (including all metrics
/// macros).
pub unsafe fn set_recorder_racy(recorder: &'static dyn Recorder) -> Result<(), SetRecorderError> {
match STATE.load(Ordering::SeqCst) {
UNINITIALIZED => {
RECORDER = recorder;
STATE.store(INITIALIZED, Ordering::SeqCst);
Ok(())
}
INITIALIZING => {
// This is just plain UB, since we were racing another initialization function
unreachable!("set_recorder_racy must not be used with other initialization functions")
}
_ => Err(SetRecorderError(())),
}
RECORDER.set_racy(recorder)
}

/// Clears the currently configured recorder.
Expand All @@ -168,9 +224,15 @@ pub unsafe fn set_recorder_racy(recorder: &'static dyn Recorder) -> Result<(), S
/// and drop the installed recorder when clearing. Thus, any existing recorder will stay leaked.
///
/// This method is typically only useful for testing or benchmarking.
///
/// # Safety
///
/// This function must not be called during any readers reading or writers writing.
/// The caller can cause readers and writers to race if they are in reading/writing while
/// this function is called.
#[doc(hidden)]
pub fn clear_recorder() {
STATE.store(UNINITIALIZED, Ordering::SeqCst);
pub unsafe fn clear_recorder() {
RECORDER.clear();
}

/// The type returned by [`set_recorder`] if [`set_recorder`] has already been called.
Expand Down Expand Up @@ -202,11 +264,5 @@ pub fn recorder() -> &'static dyn Recorder {
///
/// If a recorder has not been set, returns `None`.
pub fn try_recorder() -> Option<&'static dyn Recorder> {
unsafe {
if STATE.load(Ordering::Relaxed) != INITIALIZED {
None
} else {
Some(RECORDER)
}
}
RECORDER.try_load()
}