Skip to content

Commit

Permalink
Add with_boundaries hint API for explicit bucket histograms (#2135)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
utpilla and cijothomas authored Sep 24, 2024
1 parent b244673 commit c8136d9
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 6 deletions.
4 changes: 4 additions & 0 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;
use std::vec;

fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
Expand Down Expand Up @@ -90,6 +91,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let histogram = meter
.f64_histogram("my_histogram")
.with_description("My histogram example description")
// Setting boundaries is optional. By default, the boundaries are set to
// [0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0]
.with_boundaries(vec![0.0, 5.0, 10.0, 15.0, 20.0, 25.0])
.init();

// Record measurements using the histogram instrument.
Expand Down
22 changes: 20 additions & 2 deletions opentelemetry-sdk/src/metrics/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| Counter::new(Arc::new(i)))
}
Expand All @@ -96,6 +97,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| Counter::new(Arc::new(i)))
}
Expand All @@ -111,6 +113,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new())));
Expand Down Expand Up @@ -138,6 +141,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new())));
Expand All @@ -164,6 +168,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| UpDownCounter::new(Arc::new(i)))
}
Expand All @@ -179,6 +184,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| UpDownCounter::new(Arc::new(i)))
}
Expand All @@ -194,6 +200,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableUpDownCounter::new(Arc::new(
Expand Down Expand Up @@ -223,6 +230,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableUpDownCounter::new(Arc::new(
Expand All @@ -249,6 +257,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| Gauge::new(Arc::new(i)))
}
Expand All @@ -261,6 +270,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| Gauge::new(Arc::new(i)))
}
Expand All @@ -273,6 +283,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)
.map(|i| Gauge::new(Arc::new(i)))
}
Expand All @@ -288,6 +299,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new())));
Expand Down Expand Up @@ -315,6 +327,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new())));
Expand Down Expand Up @@ -342,6 +355,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
None,
)?;
if ms.is_empty() {
return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new())));
Expand All @@ -366,6 +380,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
builder.boundaries,
)
.map(|i| Histogram::new(Arc::new(i)))
}
Expand All @@ -378,6 +393,7 @@ impl InstrumentProvider for SdkMeter {
builder.name,
builder.description,
builder.unit,
builder.boundaries,
)
.map(|i| Histogram::new(Arc::new(i)))
}
Expand Down Expand Up @@ -479,8 +495,9 @@ where
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
boundaries: Option<Vec<f64>>,
) -> Result<ResolvedMeasures<T>> {
let aggregators = self.measures(kind, name, description, unit)?;
let aggregators = self.measures(kind, name, description, unit, boundaries)?;
Ok(ResolvedMeasures {
measures: aggregators,
})
Expand All @@ -492,6 +509,7 @@ where
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
boundaries: Option<Vec<f64>>,
) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
let inst = Instrument {
name,
Expand All @@ -501,7 +519,7 @@ where
scope: self.meter.scope.clone(),
};

self.resolve.measures(inst)
self.resolve.measures(inst, boundaries)
}
}

Expand Down
59 changes: 59 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ mod tests {
histogram_aggregation_helper(Temporality::Delta);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn histogram_aggregation_with_custom_bounds() {
// Run this test with stdout enabled to see output.
// cargo test histogram_aggregation_with_custom_bounds --features=testing -- --nocapture
histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn updown_counter_aggregation_cumulative() {
// Run this test with stdout enabled to see output.
Expand Down Expand Up @@ -1790,6 +1798,57 @@ mod tests {
}
}

fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
let mut test_context = TestContext::new(temporality);
let histogram = test_context
.meter()
.u64_histogram("test_histogram")
.with_boundaries(vec![1.0, 2.5, 5.5])
.init();
histogram.record(1, &[KeyValue::new("key1", "value1")]);
histogram.record(2, &[KeyValue::new("key1", "value1")]);
histogram.record(3, &[KeyValue::new("key1", "value1")]);
histogram.record(4, &[KeyValue::new("key1", "value1")]);
histogram.record(5, &[KeyValue::new("key1", "value1")]);

test_context.flush_metrics();

// Assert
let histogram_data =
test_context.get_aggregation::<data::Histogram<u64>>("test_histogram", None);
// Expecting 2 time-series.
assert_eq!(histogram_data.data_points.len(), 1);
if let Temporality::Cumulative = temporality {
assert_eq!(
histogram_data.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
} else {
assert_eq!(
histogram_data.temporality,
Temporality::Delta,
"Should produce delta"
);
}

// find and validate key1=value1 datapoint
let data_point =
find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");

assert_eq!(data_point.count, 5);
assert_eq!(data_point.sum, 15);

// Check the bucket counts
// -∞ to 1.0: 1
// 1.0 to 2.5: 1
// 2.5 to 5.5: 3
// 5.5 to +∞: 0

assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
}
fn gauge_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down
24 changes: 20 additions & 4 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ where
///
/// If an instrument is determined to use a [aggregation::Aggregation::Drop],
/// that instrument is not inserted nor returned.
fn instrument(&self, inst: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
fn instrument(
&self,
inst: Instrument,
boundaries: Option<&[f64]>,
) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
let mut matched = false;
let mut measures = vec![];
let mut errs = vec![];
Expand Down Expand Up @@ -288,14 +292,22 @@ where
}

// Apply implicit default view if no explicit matched.
let stream = Stream {
let mut stream = Stream {
name: inst.name,
description: inst.description,
unit: inst.unit,
aggregation: None,
allowed_attribute_keys: None,
};

// Override default histogram boundaries if provided.
if let Some(boundaries) = boundaries {
stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
boundaries: boundaries.to_vec(),
record_min_max: true,
});
}

match self.cached_aggregator(&inst.scope, kind, stream) {
Ok(agg) => {
if errs.is_empty() {
Expand Down Expand Up @@ -682,11 +694,15 @@ where
}

/// The measures that must be updated by the instrument defined by key.
pub(crate) fn measures(&self, id: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
pub(crate) fn measures(
&self,
id: Instrument,
boundaries: Option<Vec<f64>>,
) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
let (mut measures, mut errs) = (vec![], vec![]);

for inserter in &self.inserters {
match inserter.instrument(id.clone()) {
match inserter.instrument(id.clone(), boundaries.as_deref()) {
Ok(ms) => measures.extend(ms),
Err(err) => errs.push(err),
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

- **Modified**: `MeterProvider.meter()` and `MeterProvider.versioned_meter()` argument types have been updated to `&'static str` instead of `impl Into<Cow<'static, str>>>` [#2112](https://github.com/open-telemetry/opentelemetry-rust/pull/2112). These APIs were modified to enforce the Meter `name`, `version`, and `schema_url` to be `&'static str`.

- Added `with_boundaries` API to allow users to provide custom bounds for Histogram instruments. [#2135](https://github.com/open-telemetry/opentelemetry-rust/pull/2135)

## v0.25.0

- **BREAKING** [#1993](https://github.com/open-telemetry/opentelemetry-rust/pull/1993) Box complex types in AnyValue enum
Expand Down
11 changes: 11 additions & 0 deletions opentelemetry/src/metrics/instruments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub struct HistogramBuilder<'a, T> {
/// Unit of the Histogram.
pub unit: Option<Cow<'static, str>>,

/// Bucket boundaries for the histogram.
pub boundaries: Option<Vec<f64>>,

// boundaries: Vec<T>,
_marker: marker::PhantomData<T>,
}
Expand All @@ -51,6 +54,7 @@ impl<'a, T> HistogramBuilder<'a, T> {
name,
description: None,
unit: None,
boundaries: None,
_marker: marker::PhantomData,
}
}
Expand All @@ -72,6 +76,12 @@ impl<'a, T> HistogramBuilder<'a, T> {
self.unit = Some(unit.into());
self
}

/// Set the boundaries for this histogram.
pub fn with_boundaries(mut self, boundaries: Vec<f64>) -> Self {
self.boundaries = Some(boundaries);
self
}
}

impl<'a> HistogramBuilder<'a, f64> {
Expand Down Expand Up @@ -198,6 +208,7 @@ impl<T> fmt::Debug for HistogramBuilder<'_, T> {
.field("name", &self.name)
.field("description", &self.description)
.field("unit", &self.unit)
.field("boundaries", &self.boundaries)
.field(
"kind",
&format!("Histogram<{}>", &std::any::type_name::<T>()),
Expand Down

0 comments on commit c8136d9

Please sign in to comment.