From af0b2f4e4696c05f0d23af1bb05e5ef14095dcfb Mon Sep 17 00:00:00 2001 From: koushiro Date: Fri, 30 Aug 2024 13:08:31 +0800 Subject: [PATCH 01/15] refactor(layers/prometheus): provide consistent APIs --- core/src/layers/observe/mod.rs | 35 ++++ core/src/layers/prometheus.rs | 306 ++++++++++++++++++++++----------- 2 files changed, 244 insertions(+), 97 deletions(-) diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs index 57e26f3c4b53..d9b7cb70326a 100644 --- a/core/src/layers/observe/mod.rs +++ b/core/src/layers/observe/mod.rs @@ -20,6 +20,7 @@ //! This module offers essential components to facilitate the implementation of observability in OpenDAL. mod metrics; + pub use metrics::MetricMetadata; pub use metrics::MetricsAccessor; pub use metrics::MetricsIntercept; @@ -33,3 +34,37 @@ pub use metrics::LABEL_SCHEME; pub use metrics::METRIC_OPERATION_BYTES; pub use metrics::METRIC_OPERATION_DURATION_SECONDS; pub use metrics::METRIC_OPERATION_ERRORS_TOTAL; + +pub(crate) fn path_label_value(path: &str, path_level: usize) -> Option<&str> { + if path.is_empty() { + return None; + } + + if path_level > 0 { + let label_value = path + .char_indices() + .filter(|&(_, c)| c == '/') + .nth(path_level - 1) + .map_or(path, |(i, _)| &path[..i]); + Some(label_value) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_label_value() { + let path = "abc/def/ghi"; + assert_eq!(path_label_value(path, 0), None); + assert_eq!(path_label_value(path, 1), Some("abc")); + assert_eq!(path_label_value(path, 2), Some("abc/def")); + assert_eq!(path_label_value(path, 3), Some("abc/def/ghi")); + assert_eq!(path_label_value(path, usize::MAX), Some("abc/def/ghi")); + + assert_eq!(path_label_value("", 1), None); + } +} diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index adbc436ceab0..8949901e15a1 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -20,12 +20,10 @@ use std::time::Duration; use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; -use prometheus::exponential_buckets; use prometheus::histogram_opts; -use prometheus::register_histogram_vec_with_registry; -use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; use prometheus::Registry; +use prometheus::{exponential_buckets, Opts}; use crate::layers::observe; use crate::raw::Access; @@ -53,26 +51,24 @@ use crate::*; /// /// # Examples /// -/// ```no_build -/// use log::debug; -/// use log::info; -/// use opendal::layers::PrometheusLayer; -/// use opendal::services; -/// use opendal::Operator; -/// use opendal::Result; -/// use prometheus::Encoder; +/// ```no_run +/// # use log::debug; +/// # use log::info; +/// # use opendal::layers::PrometheusLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// # use prometheus::Encoder; /// -/// /// Visit [`opendal::services`] for more service related config. -/// /// Visit [`opendal::Operator`] for more operator level APIs. -/// #[tokio::main] -/// async fn main() -> Result<()> { +/// # #[tokio::main] +/// # async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// /// let op = Operator::new(builder) /// .expect("must init") -/// .layer(PrometheusLayer::new(registry.clone())) +/// .layer(PrometheusLayer::new(registry)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -93,47 +89,123 @@ use crate::*; /// println!("## Prometheus Metrics"); /// println!("{}", String::from_utf8(buffer.clone()).unwrap()); /// Ok(()) -/// } +/// # } /// ``` -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct PrometheusLayer { - registry: Registry, - operation_duration_seconds_buckets: Vec, - operation_bytes_buckets: Vec, - path_label_level: usize, + interceptor: PrometheusInterceptor, } impl PrometheusLayer { /// Create a [`PrometheusLayer`] while registering itself to this registry. - pub fn new(registry: Registry) -> Self { - Self { - registry, - operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), - path_label_level: 0, - } + pub fn new(registry: &Registry) -> Self { + let interceptor = PrometheusInterceptor::default(); + Self { interceptor }.register(registry) + } + + /// Register the metrics into the registry. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder) + /// .expect("must init") + /// .layer(PrometheusLayer::default().register(registry)) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register(self, registry: &Registry) -> Self { + self.interceptor.register(registry); + self } /// Set buckets for `operation_duration_seconds` histogram. /// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let op = Operator::new(builder) + /// .expect("must init") + /// .layer( + /// PrometheusLayer::default() + /// .operation_duration_seconds_buckets(buckets) + /// .register(registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.operation_duration_seconds_buckets = buckets; + self.interceptor = self.interceptor.with_operation_duration_seconds(buckets); } self } /// Set buckets for `operation_bytes` histogram. /// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); + /// let op = Operator::new(builder) + /// .expect("must init") + /// .layer( + /// PrometheusLayer::default() + /// .operation_bytes_buckets(buckets) + /// .register(registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.operation_bytes_buckets = buckets; + self.interceptor = self.interceptor.with_operation_bytes(buckets); } self } @@ -143,8 +215,37 @@ impl PrometheusLayer { /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder) + /// .expect("must init") + /// .layer( + /// PrometheusLayer::default() + /// .enable_path_label(1) + /// .register(registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn enable_path_label(mut self, level: usize) -> Self { - self.path_label_level = level; + self.interceptor = self.interceptor.with_path_label_level(level); self } } @@ -153,13 +254,7 @@ impl Layer for PrometheusLayer { type LayeredAccess = observe::MetricsAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - let interceptor = PrometheusInterceptor::register( - self.registry.clone(), - self.operation_duration_seconds_buckets.clone(), - self.operation_bytes_buckets.clone(), - self.path_label_level, - ); - observe::MetricsLayer::new(interceptor).layer(inner) + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) } } @@ -171,43 +266,41 @@ pub struct PrometheusInterceptor { path_label_level: usize, } -impl PrometheusInterceptor { - fn register( - registry: Registry, - operation_duration_seconds_buckets: Vec, - operation_bytes_buckets: Vec, - path_label_level: usize, - ) -> Self { +impl Default for PrometheusInterceptor { + fn default() -> Self { + let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).unwrap(); + let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).unwrap(); + let path_label_level = 0; + let labels = OperationLabels::names(false, path_label_level); - let operation_duration_seconds = register_histogram_vec_with_registry!( + let operation_duration_seconds = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_DURATION_SECONDS.name(), observe::METRIC_OPERATION_DURATION_SECONDS.help(), operation_duration_seconds_buckets ), &labels, - registry ) - .unwrap(); - let operation_bytes = register_histogram_vec_with_registry!( + .unwrap(); + let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), observe::METRIC_OPERATION_BYTES.help(), operation_bytes_buckets ), &labels, - registry ) - .unwrap(); + .unwrap(); - let labels = OperationLabels::names(true, path_label_level); - let operation_errors_total = register_int_counter_vec_with_registry!( - observe::METRIC_OPERATION_ERRORS_TOTAL.name(), - observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + let labels = OperationLabels::names(false, path_label_level); + let operation_errors_total = GenericCounterVec::new( + Opts::new( + observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + ), &labels, - registry ) - .unwrap(); + .unwrap(); Self { operation_duration_seconds, @@ -218,6 +311,53 @@ impl PrometheusInterceptor { } } +impl PrometheusInterceptor { + fn with_operation_duration_seconds(mut self, buckets: Vec) -> Self { + let labels = OperationLabels::names(false, self.path_label_level); + self.operation_duration_seconds = HistogramVec::new( + histogram_opts!( + observe::METRIC_OPERATION_DURATION_SECONDS.name(), + observe::METRIC_OPERATION_DURATION_SECONDS.help(), + buckets + ), + &labels, + ) + .unwrap(); + self + } + + fn with_operation_bytes(mut self, buckets: Vec) -> Self { + let labels = OperationLabels::names(false, self.path_label_level); + self.operation_bytes = HistogramVec::new( + histogram_opts!( + observe::METRIC_OPERATION_BYTES.name(), + observe::METRIC_OPERATION_BYTES.help(), + buckets + ), + &labels, + ) + .unwrap(); + self + } + + fn with_path_label_level(mut self, level: usize) -> Self { + self.path_label_level = level; + self + } + + fn register(&self, registry: &Registry) { + registry + .register(Box::new(self.operation_duration_seconds.clone())) + .unwrap(); + registry + .register(Box::new(self.operation_bytes.clone())) + .unwrap(); + registry + .register(Box::new(self.operation_errors_total.clone())) + .unwrap(); + } +} + impl observe::MetricsIntercept for PrometheusInterceptor { fn observe_operation_duration_seconds( &self, @@ -236,7 +376,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_duration_seconds .with_label_values(&labels) @@ -260,7 +400,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_bytes .with_label_values(&labels) @@ -284,7 +424,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: Some(error), path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_errors_total.with_label_values(&labels).inc(); } @@ -337,8 +477,8 @@ impl<'a> OperationLabels<'a> { self.op.into_static(), ]); - if path_label_level > 0 { - labels.push(get_path_label(self.path, path_label_level)); + if let Some(path) = observe::path_label_value(self.path, path_label_level) { + labels.push(path); } if let Some(error) = self.error { @@ -348,31 +488,3 @@ impl<'a> OperationLabels<'a> { labels } } - -fn get_path_label(path: &str, path_level: usize) -> &str { - if path_level > 0 { - return path - .char_indices() - .filter(|&(_, c)| c == '/') - .nth(path_level - 1) - .map_or(path, |(i, _)| &path[..i]); - } - "" -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_get_path_label() { - let path = "abc/def/ghi"; - assert_eq!(get_path_label(path, 0), ""); - assert_eq!(get_path_label(path, 1), "abc"); - assert_eq!(get_path_label(path, 2), "abc/def"); - assert_eq!(get_path_label(path, 3), "abc/def/ghi"); - assert_eq!(get_path_label(path, usize::MAX), "abc/def/ghi"); - - assert_eq!(get_path_label("", 0), ""); - } -} From af4dd4de6c1b89d90b5948e1756292ce18c392b7 Mon Sep 17 00:00:00 2001 From: koushiro Date: Fri, 30 Aug 2024 13:11:38 +0800 Subject: [PATCH 02/15] fix fmt --- core/src/layers/prometheus.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 8949901e15a1..dff2954e9d9b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -281,7 +281,7 @@ impl Default for PrometheusInterceptor { ), &labels, ) - .unwrap(); + .unwrap(); let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -290,7 +290,7 @@ impl Default for PrometheusInterceptor { ), &labels, ) - .unwrap(); + .unwrap(); let labels = OperationLabels::names(false, path_label_level); let operation_errors_total = GenericCounterVec::new( @@ -300,7 +300,7 @@ impl Default for PrometheusInterceptor { ), &labels, ) - .unwrap(); + .unwrap(); Self { operation_duration_seconds, @@ -322,7 +322,7 @@ impl PrometheusInterceptor { ), &labels, ) - .unwrap(); + .unwrap(); self } @@ -336,7 +336,7 @@ impl PrometheusInterceptor { ), &labels, ) - .unwrap(); + .unwrap(); self } @@ -376,7 +376,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_duration_seconds .with_label_values(&labels) @@ -400,7 +400,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_bytes .with_label_values(&labels) @@ -424,7 +424,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: Some(error), path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_errors_total.with_label_values(&labels).inc(); } From 2e43e31c5eac438ee18230932f219a6072f7255d Mon Sep 17 00:00:00 2001 From: koushiro Date: Fri, 30 Aug 2024 13:26:52 +0800 Subject: [PATCH 03/15] add feature attr for observe mod --- core/src/layers/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 11a4f3993571..305194765b25 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -108,4 +108,5 @@ mod dtrace; #[cfg(all(target_os = "linux", feature = "layers-dtrace"))] pub use self::dtrace::DtraceLayer; +#[cfg(any(feature = "layers-prometheus", feature = "layers-prometheus-client"))] pub mod observe; From 415689767173944fb27e9eb677650b879b283dea Mon Sep 17 00:00:00 2001 From: koushiro Date: Fri, 30 Aug 2024 14:51:54 +0800 Subject: [PATCH 04/15] add some comments --- core/src/layers/mod.rs | 1 - core/src/layers/observe/mod.rs | 11 ++++++++--- core/src/layers/prometheus.rs | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 305194765b25..11a4f3993571 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -108,5 +108,4 @@ mod dtrace; #[cfg(all(target_os = "linux", feature = "layers-dtrace"))] pub use self::dtrace::DtraceLayer; -#[cfg(any(feature = "layers-prometheus", feature = "layers-prometheus-client"))] pub mod observe; diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs index d9b7cb70326a..16109bae1e45 100644 --- a/core/src/layers/observe/mod.rs +++ b/core/src/layers/observe/mod.rs @@ -35,16 +35,21 @@ pub use metrics::METRIC_OPERATION_BYTES; pub use metrics::METRIC_OPERATION_DURATION_SECONDS; pub use metrics::METRIC_OPERATION_ERRORS_TOTAL; -pub(crate) fn path_label_value(path: &str, path_level: usize) -> Option<&str> { +/// Return the path label value according to the given `path` and `level`. +/// +/// - level = 0: return `None`, which means we ignore the path label. +/// - level > 0: the path label will be the path split by "/" and get the last n level, +/// if n=1 and input path is "abc/def/ghi", and then we'll use "abc/" as the path label. +pub fn path_label_value(path: &str, level: usize) -> Option<&str> { if path.is_empty() { return None; } - if path_level > 0 { + if level > 0 { let label_value = path .char_indices() .filter(|&(_, c)| c == '/') - .nth(path_level - 1) + .nth(level - 1) .map_or(path, |(i, _)| &path[..i]); Some(label_value) } else { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index dff2954e9d9b..a4677456c893 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -214,7 +214,7 @@ impl PrometheusLayer { /// /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, - /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. + /// if n=1 and input path is "abc/def/ghi", and then we'll use "abc/" as the path label. /// /// # Example /// From 669c8504fb48e8f9b359cc0b3b0cb7d5740a74cc Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 18:03:28 +0800 Subject: [PATCH 05/15] update --- core/src/layers/prometheus.rs | 265 +++++++++++----------------------- 1 file changed, 84 insertions(+), 181 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index a4677456c893..5f174f3bc883 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -91,50 +91,25 @@ use crate::*; /// Ok(()) /// # } /// ``` -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct PrometheusLayer { interceptor: PrometheusInterceptor, } +impl Default for PrometheusLayer { + fn default() -> Self { + let register = prometheus::default_registry(); + Self::new(register) + } +} + impl PrometheusLayer { /// Create a [`PrometheusLayer`] while registering itself to this registry. pub fn new(registry: &Registry) -> Self { - let interceptor = PrometheusInterceptor::default(); - Self { interceptor }.register(registry) - } - - /// Register the metrics into the registry. - /// - /// # Example - /// - /// ```no_run - /// # use log::debug; - /// # use opendal::layers::PrometheusLayer; - /// # use opendal::services; - /// # use opendal::Operator; - /// # use opendal::Result; - /// # - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// // Pick a builder and configure it. - /// let builder = services::Memory::default(); - /// let registry = prometheus::default_registry(); - /// - /// let op = Operator::new(builder) - /// .expect("must init") - /// .layer(PrometheusLayer::default().register(registry)) - /// .finish(); - /// debug!("operator: {op:?}"); - /// - /// Ok(()) - /// # } - /// ``` - pub fn register(self, registry: &Registry) -> Self { - self.interceptor.register(registry); - self + PrometheusLayerBuilder::default().register(registry) } - /// Set buckets for `operation_duration_seconds` histogram. + /// Create a [`PrometheusLayerBuilder`]. /// /// # Example /// @@ -151,12 +126,15 @@ impl PrometheusLayer { /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// - /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); /// let op = Operator::new(builder) /// .expect("must init") /// .layer( - /// PrometheusLayer::default() - /// .operation_duration_seconds_buckets(buckets) + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(duration_seconds_buckets) + /// .operation_bytes_buckets(bytes_buckets) + /// .enable_path_label(1) /// .register(registry) /// ) /// .finish(); @@ -165,47 +143,49 @@ impl PrometheusLayer { /// Ok(()) /// # } /// ``` + pub fn builder() -> PrometheusLayerBuilder { + PrometheusLayerBuilder::default() + } +} + +impl Layer for PrometheusLayer { + type LayeredAccess = observe::MetricsAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + } +} + +/// [`PrometheusLayerBuilder`] is a config builder to build a [`PrometheusLayer`]. +pub struct PrometheusLayerBuilder { + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, +} + +impl Default for PrometheusLayerBuilder { + fn default() -> Self { + Self { + operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), + path_label_level: 0, + } + } +} + +impl PrometheusLayerBuilder { + /// Set buckets for `operation_duration_seconds` histogram. pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.interceptor = self.interceptor.with_operation_duration_seconds(buckets); + self.operation_duration_seconds_buckets = buckets; } self } /// Set buckets for `operation_bytes` histogram. - /// - /// # Example - /// - /// ```no_run - /// # use log::debug; - /// # use opendal::layers::PrometheusLayer; - /// # use opendal::services; - /// # use opendal::Operator; - /// # use opendal::Result; - /// # - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// // Pick a builder and configure it. - /// let builder = services::Memory::default(); - /// let registry = prometheus::default_registry(); - /// - /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); - /// let op = Operator::new(builder) - /// .expect("must init") - /// .layer( - /// PrometheusLayer::default() - /// .operation_bytes_buckets(buckets) - /// .register(registry) - /// ) - /// .finish(); - /// debug!("operator: {op:?}"); - /// - /// Ok(()) - /// # } - /// ``` pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.interceptor = self.interceptor.with_operation_bytes(buckets); + self.operation_bytes_buckets = buckets; } self } @@ -214,85 +194,35 @@ impl PrometheusLayer { /// /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, - /// if n=1 and input path is "abc/def/ghi", and then we'll use "abc/" as the path label. - /// - /// # Example - /// - /// ```no_run - /// # use log::debug; - /// # use opendal::layers::PrometheusLayer; - /// # use opendal::services; - /// # use opendal::Operator; - /// # use opendal::Result; - /// # - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// // Pick a builder and configure it. - /// let builder = services::Memory::default(); - /// let registry = prometheus::default_registry(); - /// - /// let op = Operator::new(builder) - /// .expect("must init") - /// .layer( - /// PrometheusLayer::default() - /// .enable_path_label(1) - /// .register(registry) - /// ) - /// .finish(); - /// debug!("operator: {op:?}"); - /// - /// Ok(()) - /// # } - /// ``` + /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. pub fn enable_path_label(mut self, level: usize) -> Self { - self.interceptor = self.interceptor.with_path_label_level(level); + self.path_label_level = level; self } -} - -impl Layer for PrometheusLayer { - type LayeredAccess = observe::MetricsAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) - } -} - -#[derive(Clone, Debug)] -pub struct PrometheusInterceptor { - operation_duration_seconds: HistogramVec, - operation_bytes: HistogramVec, - operation_errors_total: GenericCounterVec, - path_label_level: usize, -} - -impl Default for PrometheusInterceptor { - fn default() -> Self { - let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).unwrap(); - let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).unwrap(); - let path_label_level = 0; - let labels = OperationLabels::names(false, path_label_level); + /// Register the metrics into the registry and return a [`PrometheusLayer`]. + pub fn register(self, registry: &Registry) -> PrometheusLayer { + let labels = OperationLabels::names(false, self.path_label_level); let operation_duration_seconds = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_DURATION_SECONDS.name(), observe::METRIC_OPERATION_DURATION_SECONDS.help(), - operation_duration_seconds_buckets + self.operation_duration_seconds_buckets ), &labels, ) - .unwrap(); + .unwrap(); let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), observe::METRIC_OPERATION_BYTES.help(), - operation_bytes_buckets + self.operation_bytes_buckets ), &labels, ) - .unwrap(); + .unwrap(); - let labels = OperationLabels::names(false, path_label_level); + let labels = OperationLabels::names(true, self.path_label_level); let operation_errors_total = GenericCounterVec::new( Opts::new( observe::METRIC_OPERATION_ERRORS_TOTAL.name(), @@ -300,64 +230,37 @@ impl Default for PrometheusInterceptor { ), &labels, ) - .unwrap(); - - Self { - operation_duration_seconds, - operation_bytes, - operation_errors_total, - path_label_level, - } - } -} - -impl PrometheusInterceptor { - fn with_operation_duration_seconds(mut self, buckets: Vec) -> Self { - let labels = OperationLabels::names(false, self.path_label_level); - self.operation_duration_seconds = HistogramVec::new( - histogram_opts!( - observe::METRIC_OPERATION_DURATION_SECONDS.name(), - observe::METRIC_OPERATION_DURATION_SECONDS.help(), - buckets - ), - &labels, - ) - .unwrap(); - self - } - - fn with_operation_bytes(mut self, buckets: Vec) -> Self { - let labels = OperationLabels::names(false, self.path_label_level); - self.operation_bytes = HistogramVec::new( - histogram_opts!( - observe::METRIC_OPERATION_BYTES.name(), - observe::METRIC_OPERATION_BYTES.help(), - buckets - ), - &labels, - ) - .unwrap(); - self - } - - fn with_path_label_level(mut self, level: usize) -> Self { - self.path_label_level = level; - self - } + .unwrap(); - fn register(&self, registry: &Registry) { registry - .register(Box::new(self.operation_duration_seconds.clone())) + .register(Box::new(operation_duration_seconds.clone())) .unwrap(); registry - .register(Box::new(self.operation_bytes.clone())) + .register(Box::new(operation_bytes.clone())) .unwrap(); registry - .register(Box::new(self.operation_errors_total.clone())) + .register(Box::new(operation_errors_total.clone())) .unwrap(); + + PrometheusLayer { + interceptor: PrometheusInterceptor { + operation_duration_seconds, + operation_bytes, + operation_errors_total, + path_label_level: self.path_label_level, + }, + } } } +#[derive(Clone, Debug)] +pub struct PrometheusInterceptor { + operation_duration_seconds: HistogramVec, + operation_bytes: HistogramVec, + operation_errors_total: GenericCounterVec, + path_label_level: usize, +} + impl observe::MetricsIntercept for PrometheusInterceptor { fn observe_operation_duration_seconds( &self, @@ -376,7 +279,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_duration_seconds .with_label_values(&labels) @@ -400,7 +303,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_bytes .with_label_values(&labels) @@ -424,7 +327,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: Some(error), path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_errors_total.with_label_values(&labels).inc(); } From 0ab2369426ed2f38f0f7a4970626313930d53cd9 Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 18:07:45 +0800 Subject: [PATCH 06/15] fix fmt --- core/src/layers/prometheus.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 5f174f3bc883..5ecbb54f7237 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -211,7 +211,7 @@ impl PrometheusLayerBuilder { ), &labels, ) - .unwrap(); + .unwrap(); let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -220,7 +220,7 @@ impl PrometheusLayerBuilder { ), &labels, ) - .unwrap(); + .unwrap(); let labels = OperationLabels::names(true, self.path_label_level); let operation_errors_total = GenericCounterVec::new( @@ -230,7 +230,7 @@ impl PrometheusLayerBuilder { ), &labels, ) - .unwrap(); + .unwrap(); registry .register(Box::new(operation_duration_seconds.clone())) @@ -279,7 +279,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_duration_seconds .with_label_values(&labels) @@ -303,7 +303,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_bytes .with_label_values(&labels) @@ -327,7 +327,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { error: Some(error), path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_errors_total.with_label_values(&labels).inc(); } From 0657dd3d0ed99fa68dff0cf4f37dd59946ab392b Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 18:12:11 +0800 Subject: [PATCH 07/15] update doc --- core/src/layers/prometheus.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 5ecbb54f7237..8c3661d5fd17 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -104,12 +104,37 @@ impl Default for PrometheusLayer { } impl PrometheusLayer { - /// Create a [`PrometheusLayer`] while registering itself to this registry. + /// Create a [`PrometheusLayer`] and register its metrics to the given registry. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder) + /// .expect("must init") + /// .layer(PrometheusLayer::new(registry)) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn new(registry: &Registry) -> Self { PrometheusLayerBuilder::default().register(registry) } - /// Create a [`PrometheusLayerBuilder`]. + /// Create a [`PrometheusLayerBuilder`] to modify the default metric configuration. /// /// # Example /// @@ -200,7 +225,7 @@ impl PrometheusLayerBuilder { self } - /// Register the metrics into the registry and return a [`PrometheusLayer`]. + /// Register the metrics into the given registry and return a [`PrometheusLayer`]. pub fn register(self, registry: &Registry) -> PrometheusLayer { let labels = OperationLabels::names(false, self.path_label_level); let operation_duration_seconds = HistogramVec::new( From 4ac28e166237231bcac27655dd7ab99a1037e38e Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 21:45:39 +0800 Subject: [PATCH 08/15] apply review suggestions --- core/src/layers/mod.rs | 2 +- core/src/layers/prometheus.rs | 180 ++++++++++++++++++++++++---------- core/src/types/error.rs | 7 ++ 3 files changed, 136 insertions(+), 53 deletions(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 11a4f3993571..cb76475bc3bf 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -62,7 +62,7 @@ pub use self::mime_guess::MimeGuessLayer; #[cfg(feature = "layers-prometheus")] mod prometheus; #[cfg(feature = "layers-prometheus")] -pub use self::prometheus::PrometheusLayer; +pub use self::prometheus::{PrometheusLayer, PrometheusLayerBuilder}; #[cfg(feature = "layers-prometheus-client")] mod prometheus_client; diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 8c3661d5fd17..aea56a3b6fab 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -66,9 +66,8 @@ use crate::*; /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// -/// let op = Operator::new(builder) -/// .expect("must init") -/// .layer(PrometheusLayer::new(registry)) +/// let op = Operator::new(builder)? +/// .layer(PrometheusLayer::builder().register(registry)?) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -85,7 +84,7 @@ use crate::*; /// // Export prometheus metrics. /// let mut buffer = Vec::::new(); /// let encoder = prometheus::TextEncoder::new(); -/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); +/// encoder.encode(&prometheus::gather(), &mut buffer)?; /// println!("## Prometheus Metrics"); /// println!("{}", String::from_utf8(buffer.clone()).unwrap()); /// Ok(()) @@ -96,15 +95,8 @@ pub struct PrometheusLayer { interceptor: PrometheusInterceptor, } -impl Default for PrometheusLayer { - fn default() -> Self { - let register = prometheus::default_registry(); - Self::new(register) - } -} - impl PrometheusLayer { - /// Create a [`PrometheusLayer`] and register its metrics to the given registry. + /// Create a [`PrometheusLayer`] and register its metrics to the default registry. /// /// # Example /// @@ -119,22 +111,21 @@ impl PrometheusLayer { /// # async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); - /// let registry = prometheus::default_registry(); /// - /// let op = Operator::new(builder) - /// .expect("must init") - /// .layer(PrometheusLayer::new(registry)) + /// let op = Operator::new(builder)? + /// .layer(PrometheusLayer::register_default()) /// .finish(); /// debug!("operator: {op:?}"); /// /// Ok(()) /// # } /// ``` - pub fn new(registry: &Registry) -> Self { - PrometheusLayerBuilder::default().register(registry) + pub fn register_default() -> Result { + let registry = prometheus::default_registry(); + Self::builder().register(registry) } - /// Create a [`PrometheusLayerBuilder`] to modify the default metric configuration. + /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics. /// /// # Example /// @@ -151,16 +142,15 @@ impl PrometheusLayer { /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// - /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); - /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); - /// let op = Operator::new(builder) - /// .expect("must init") + /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16)?; + /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16)?; + /// let op = Operator::new(builder)? /// .layer( /// PrometheusLayer::builder() /// .operation_duration_seconds_buckets(duration_seconds_buckets) /// .operation_bytes_buckets(bytes_buckets) /// .enable_path_label(1) - /// .register(registry) + /// .register(registry)? /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -169,7 +159,14 @@ impl PrometheusLayer { /// # } /// ``` pub fn builder() -> PrometheusLayerBuilder { - PrometheusLayerBuilder::default() + let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).unwrap(); + let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).unwrap(); + let path_label_level = 0; + PrometheusLayerBuilder::new( + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, + ) } } @@ -188,18 +185,49 @@ pub struct PrometheusLayerBuilder { path_label_level: usize, } -impl Default for PrometheusLayerBuilder { - fn default() -> Self { +impl PrometheusLayerBuilder { + fn new( + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, + ) -> Self { Self { - operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), - path_label_level: 0, + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, } } -} -impl PrometheusLayerBuilder { /// Set buckets for `operation_duration_seconds` histogram. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16)?; + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(buckets) + /// .register(registry)? + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_duration_seconds_buckets = buckets; @@ -208,6 +236,35 @@ impl PrometheusLayerBuilder { } /// Set buckets for `operation_bytes` histogram. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16)?; + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_bytes_buckets(buckets) + /// .register(registry)? + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_bytes_buckets = buckets; @@ -220,13 +277,41 @@ impl PrometheusLayerBuilder { /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .enable_path_label(1) + /// .register(registry)? + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn enable_path_label(mut self, level: usize) -> Self { self.path_label_level = level; self } /// Register the metrics into the given registry and return a [`PrometheusLayer`]. - pub fn register(self, registry: &Registry) -> PrometheusLayer { + pub fn register(self, registry: &Registry) -> Result { let labels = OperationLabels::names(false, self.path_label_level); let operation_duration_seconds = HistogramVec::new( histogram_opts!( @@ -235,8 +320,7 @@ impl PrometheusLayerBuilder { self.operation_duration_seconds_buckets ), &labels, - ) - .unwrap(); + )?; let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -244,8 +328,7 @@ impl PrometheusLayerBuilder { self.operation_bytes_buckets ), &labels, - ) - .unwrap(); + )?; let labels = OperationLabels::names(true, self.path_label_level); let operation_errors_total = GenericCounterVec::new( @@ -254,27 +337,20 @@ impl PrometheusLayerBuilder { observe::METRIC_OPERATION_ERRORS_TOTAL.help(), ), &labels, - ) - .unwrap(); - - registry - .register(Box::new(operation_duration_seconds.clone())) - .unwrap(); - registry - .register(Box::new(operation_bytes.clone())) - .unwrap(); - registry - .register(Box::new(operation_errors_total.clone())) - .unwrap(); - - PrometheusLayer { + )?; + + registry.register(Box::new(operation_duration_seconds.clone()))?; + registry.register(Box::new(operation_bytes.clone()))?; + registry.register(Box::new(operation_errors_total.clone()))?; + + Ok(PrometheusLayer { interceptor: PrometheusInterceptor { operation_duration_seconds, operation_bytes, operation_errors_total, path_label_level: self.path_label_level, }, - } + }) } } diff --git a/core/src/types/error.rs b/core/src/types/error.rs index 4d55142732c6..ba25b5f7057a 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -414,6 +414,13 @@ impl From for io::Error { } } +#[cfg(feature = "layers-prometheus")] +impl From for Error { + fn from(err: prometheus::Error) -> Self { + Self::new(ErrorKind::Unexpected, err.to_string()) + } +} + #[cfg(test)] mod tests { use anyhow::anyhow; From c32510e9b2620068c1524e1455a31a82421169fe Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 21:46:48 +0800 Subject: [PATCH 09/15] adjust imports --- core/src/layers/prometheus.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index aea56a3b6fab..0876b258c7e3 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -20,10 +20,11 @@ use std::time::Duration; use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; +use prometheus::exponential_buckets; use prometheus::histogram_opts; use prometheus::HistogramVec; +use prometheus::Opts; use prometheus::Registry; -use prometheus::{exponential_buckets, Opts}; use crate::layers::observe; use crate::raw::Access; From a2b458c0abbd254478215e642977922fd002846e Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 23:12:51 +0800 Subject: [PATCH 10/15] fix doc --- core/src/layers/prometheus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 0876b258c7e3..1947f4f15bb9 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -114,7 +114,7 @@ impl PrometheusLayer { /// let builder = services::Memory::default(); /// /// let op = Operator::new(builder)? - /// .layer(PrometheusLayer::register_default()) + /// .layer(PrometheusLayer::register_default()?) /// .finish(); /// debug!("operator: {op:?}"); /// From 9b5c0968229d7d35970bf1ff05a303a75b61b49e Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 23:32:07 +0800 Subject: [PATCH 11/15] improve prometheus error handling --- core/src/layers/prometheus.rs | 47 ++++++++++++++++++++++------------- core/src/types/error.rs | 13 +++++----- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 1947f4f15bb9..ae6e3a1ba09b 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -68,7 +68,7 @@ use crate::*; /// let registry = prometheus::default_registry(); /// /// let op = Operator::new(builder)? -/// .layer(PrometheusLayer::builder().register(registry)?) +/// .layer(PrometheusLayer::builder().register(registry).expect("register metrics successfully")) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -114,7 +114,7 @@ impl PrometheusLayer { /// let builder = services::Memory::default(); /// /// let op = Operator::new(builder)? - /// .layer(PrometheusLayer::register_default()?) + /// .layer(PrometheusLayer::register_default().expect("register metrics successfully")) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -143,15 +143,16 @@ impl PrometheusLayer { /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// - /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16)?; - /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16)?; + /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); /// let op = Operator::new(builder)? /// .layer( /// PrometheusLayer::builder() /// .operation_duration_seconds_buckets(duration_seconds_buckets) /// .operation_bytes_buckets(bytes_buckets) /// .enable_path_label(1) - /// .register(registry)? + /// .register(registry) + /// .expect("register metrics successfully") /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -216,12 +217,13 @@ impl PrometheusLayerBuilder { /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// - /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16)?; + /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); /// let op = Operator::new(builder)? /// .layer( /// PrometheusLayer::builder() /// .operation_duration_seconds_buckets(buckets) - /// .register(registry)? + /// .register(registry) + /// .expect("register metrics successfully") /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -253,12 +255,13 @@ impl PrometheusLayerBuilder { /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// - /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16)?; + /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); /// let op = Operator::new(builder)? /// .layer( /// PrometheusLayer::builder() /// .operation_bytes_buckets(buckets) - /// .register(registry)? + /// .register(registry) + /// .expect("register metrics successfully") /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -298,7 +301,8 @@ impl PrometheusLayerBuilder { /// .layer( /// PrometheusLayer::builder() /// .enable_path_label(1) - /// .register(registry)? + /// .register(registry) + /// .expect("register metrics successfully") /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -321,7 +325,8 @@ impl PrometheusLayerBuilder { self.operation_duration_seconds_buckets ), &labels, - )?; + ) + .map_err(Error::parse_prometheus_error)?; let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -329,7 +334,8 @@ impl PrometheusLayerBuilder { self.operation_bytes_buckets ), &labels, - )?; + ) + .map_err(Error::parse_prometheus_error)?; let labels = OperationLabels::names(true, self.path_label_level); let operation_errors_total = GenericCounterVec::new( @@ -338,11 +344,18 @@ impl PrometheusLayerBuilder { observe::METRIC_OPERATION_ERRORS_TOTAL.help(), ), &labels, - )?; - - registry.register(Box::new(operation_duration_seconds.clone()))?; - registry.register(Box::new(operation_bytes.clone()))?; - registry.register(Box::new(operation_errors_total.clone()))?; + ) + .map_err(Error::parse_prometheus_error)?; + + registry + .register(Box::new(operation_duration_seconds.clone())) + .map_err(Error::parse_prometheus_error)?; + registry + .register(Box::new(operation_bytes.clone())) + .map_err(Error::parse_prometheus_error)?; + registry + .register(Box::new(operation_errors_total.clone())) + .map_err(Error::parse_prometheus_error)?; Ok(PrometheusLayer { interceptor: PrometheusInterceptor { diff --git a/core/src/types/error.rs b/core/src/types/error.rs index ba25b5f7057a..73c751e5d43b 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -400,6 +400,12 @@ impl Error { pub fn is_temporary(&self) -> bool { self.status == ErrorStatus::Temporary } + + /// Convert the [`prometheus::Error`] to [`Self`]. + #[cfg(feature = "layers-prometheus")] + pub fn parse_prometheus_error(err: prometheus::Error) -> Self { + Self::new(ErrorKind::Unexpected, err.to_string()).set_source(err) + } } impl From for io::Error { @@ -414,13 +420,6 @@ impl From for io::Error { } } -#[cfg(feature = "layers-prometheus")] -impl From for Error { - fn from(err: prometheus::Error) -> Self { - Self::new(ErrorKind::Unexpected, err.to_string()) - } -} - #[cfg(test)] mod tests { use anyhow::anyhow; From 6c49506f88e72ed6d5e0c5f32101ee2b3b1ede36 Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 23:42:45 +0800 Subject: [PATCH 12/15] move parse_prometheus_error --- core/src/layers/prometheus.rs | 17 +++++++++++------ core/src/types/error.rs | 6 ------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index ae6e3a1ba09b..474c8dd6f2ee 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -326,7 +326,7 @@ impl PrometheusLayerBuilder { ), &labels, ) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -335,7 +335,7 @@ impl PrometheusLayerBuilder { ), &labels, ) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; let labels = OperationLabels::names(true, self.path_label_level); let operation_errors_total = GenericCounterVec::new( @@ -345,17 +345,17 @@ impl PrometheusLayerBuilder { ), &labels, ) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; registry .register(Box::new(operation_duration_seconds.clone())) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; registry .register(Box::new(operation_bytes.clone())) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; registry .register(Box::new(operation_errors_total.clone())) - .map_err(Error::parse_prometheus_error)?; + .map_err(parse_prometheus_error)?; Ok(PrometheusLayer { interceptor: PrometheusInterceptor { @@ -368,6 +368,11 @@ impl PrometheusLayerBuilder { } } +/// Convert the [`prometheus::Error`] to [`Error`]. +fn parse_prometheus_error(err: prometheus::Error) -> Error { + Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err) +} + #[derive(Clone, Debug)] pub struct PrometheusInterceptor { operation_duration_seconds: HistogramVec, diff --git a/core/src/types/error.rs b/core/src/types/error.rs index 73c751e5d43b..4d55142732c6 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -400,12 +400,6 @@ impl Error { pub fn is_temporary(&self) -> bool { self.status == ErrorStatus::Temporary } - - /// Convert the [`prometheus::Error`] to [`Self`]. - #[cfg(feature = "layers-prometheus")] - pub fn parse_prometheus_error(err: prometheus::Error) -> Self { - Self::new(ErrorKind::Unexpected, err.to_string()).set_source(err) - } } impl From for io::Error { From ec545b4bd738333c01bc1a80d4d4203c8c1ea0cc Mon Sep 17 00:00:00 2001 From: koushiro Date: Tue, 3 Sep 2024 23:56:25 +0800 Subject: [PATCH 13/15] fix doc example --- core/src/layers/prometheus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 474c8dd6f2ee..bc89701111b5 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -85,7 +85,7 @@ use crate::*; /// // Export prometheus metrics. /// let mut buffer = Vec::::new(); /// let encoder = prometheus::TextEncoder::new(); -/// encoder.encode(&prometheus::gather(), &mut buffer)?; +/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); /// println!("## Prometheus Metrics"); /// println!("{}", String::from_utf8(buffer.clone()).unwrap()); /// Ok(()) From 7cae089f65b37b0b553bed6eee1051550d7b0b47 Mon Sep 17 00:00:00 2001 From: koushiro Date: Wed, 4 Sep 2024 01:06:56 +0800 Subject: [PATCH 14/15] move register_default and align path_label --- core/src/layers/prometheus.rs | 96 +++++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 32 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index bc89701111b5..88ffb5283b61 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -97,35 +97,6 @@ pub struct PrometheusLayer { } impl PrometheusLayer { - /// Create a [`PrometheusLayer`] and register its metrics to the default registry. - /// - /// # Example - /// - /// ```no_run - /// # use log::debug; - /// # use opendal::layers::PrometheusLayer; - /// # use opendal::services; - /// # use opendal::Operator; - /// # use opendal::Result; - /// # - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// // Pick a builder and configure it. - /// let builder = services::Memory::default(); - /// - /// let op = Operator::new(builder)? - /// .layer(PrometheusLayer::register_default().expect("register metrics successfully")) - /// .finish(); - /// debug!("operator: {op:?}"); - /// - /// Ok(()) - /// # } - /// ``` - pub fn register_default() -> Result { - let registry = prometheus::default_registry(); - Self::builder().register(registry) - } - /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics. /// /// # Example @@ -150,7 +121,7 @@ impl PrometheusLayer { /// PrometheusLayer::builder() /// .operation_duration_seconds_buckets(duration_seconds_buckets) /// .operation_bytes_buckets(bytes_buckets) - /// .enable_path_label(1) + /// .path_label(1) /// .register(registry) /// .expect("register metrics successfully") /// ) @@ -300,7 +271,7 @@ impl PrometheusLayerBuilder { /// let op = Operator::new(builder)? /// .layer( /// PrometheusLayer::builder() - /// .enable_path_label(1) + /// .path_label(1) /// .register(registry) /// .expect("register metrics successfully") /// ) @@ -310,12 +281,40 @@ impl PrometheusLayerBuilder { /// Ok(()) /// # } /// ``` - pub fn enable_path_label(mut self, level: usize) -> Self { + pub fn path_label(mut self, level: usize) -> Self { self.path_label_level = level; self } /// Register the metrics into the given registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn register(self, registry: &Registry) -> Result { let labels = OperationLabels::names(false, self.path_label_level); let operation_duration_seconds = HistogramVec::new( @@ -366,6 +365,39 @@ impl PrometheusLayerBuilder { }, }) } + + /// Register the metrics into the default registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .register_default() + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register_default(self) -> Result { + let registry = prometheus::default_registry(); + self.register(registry) + } } /// Convert the [`prometheus::Error`] to [`Error`]. From 04d64a56b97e164776783194d2adf49fa9bd325f Mon Sep 17 00:00:00 2001 From: koushiro Date: Wed, 4 Sep 2024 01:22:02 +0800 Subject: [PATCH 15/15] update doc --- core/src/layers/observe/mod.rs | 11 +++++++++++ core/src/layers/prometheus.rs | 32 +++++++++++++------------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs index 16109bae1e45..c7a9c2dd2887 100644 --- a/core/src/layers/observe/mod.rs +++ b/core/src/layers/observe/mod.rs @@ -18,6 +18,17 @@ //! OpenDAL Observability Layer //! //! This module offers essential components to facilitate the implementation of observability in OpenDAL. +//! +//! # Prometheus Metrics +//! +//! These metrics are essential for understanding the behavior and performance of our applications. +//! +//! | Metric Name | Type | Description | Labels | +//! |------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------| +//! | operation_duration_seconds | Histogram | Histogram of time spent during opendal operations | scheme, namespace, root, operation, path | +//! | operation_bytes. | Histogram | Histogram of the bytes transferred during opendal operations | scheme, operation, root, operation, path | +//! | operation_errors_total | Counter | Error counter during opendal operations | scheme, operation, root, operation, path, error | +//! mod metrics; diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 88ffb5283b61..458afd36b960 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -35,21 +35,9 @@ use crate::*; /// /// # Prometheus Metrics /// -/// In this section, we will introduce three metrics that are currently being exported by our project. These metrics are essential for understanding the behavior and performance of our applications. -/// -/// -/// | Metric Name | Type | Description | Labels | -/// |-------------------------|----------|---------------------------------------------------|---------------------| -/// | requests_total | Counter | Total times of 'create' operation being called | scheme, operation | -/// | requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation | -/// | bytes_total | Histogram | Total size | scheme, operation | -/// +/// We provide several metrics, please see the documentation of [`observe`] module. /// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/). /// -/// # Histogram Configuration -/// -/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration. -/// /// # Examples /// /// ```no_run @@ -99,6 +87,12 @@ pub struct PrometheusLayer { impl PrometheusLayer { /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics. /// + /// # Default Configuration + /// + /// - `operation_duration_seconds_buckets`: `exponential_buckets(0.01, 2.0, 16)` + /// - `operation_bytes_buckets`: `exponential_buckets(1.0, 2.0, 16)` + /// - `path_label`: `0` + /// /// # Example /// /// ```no_run @@ -121,7 +115,7 @@ impl PrometheusLayer { /// PrometheusLayer::builder() /// .operation_duration_seconds_buckets(duration_seconds_buckets) /// .operation_bytes_buckets(bytes_buckets) - /// .path_label(1) + /// .path_label(0) /// .register(registry) /// .expect("register metrics successfully") /// ) @@ -427,7 +421,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } @@ -451,7 +445,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } @@ -475,7 +469,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: Some(error), path, } @@ -489,7 +483,7 @@ struct OperationLabels<'a> { scheme: Scheme, namespace: &'a str, root: &'a str, - op: Operation, + operation: Operation, path: &'a str, error: Option, } @@ -529,7 +523,7 @@ impl<'a> OperationLabels<'a> { self.scheme.into_static(), self.namespace, self.root, - self.op.into_static(), + self.operation.into_static(), ]); if let Some(path) = observe::path_label_value(self.path, path_label_level) {