diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 9c443ec013dc..dc61d910d422 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -16,7 +16,6 @@ // under the License. use std::fmt; -use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; @@ -25,10 +24,10 @@ use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::encoding::LabelSetEncoder; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; -use prometheus_client::metrics::histogram; +use prometheus_client::metrics::family::MetricConstructor; +use prometheus_client::metrics::histogram::exponential_buckets; use prometheus_client::metrics::histogram::Histogram; use prometheus_client::registry::Registry; -use prometheus_client::registry::Unit; use crate::layers::observe; use crate::raw::*; @@ -36,28 +35,29 @@ use crate::*; /// Add [prometheus-client](https://docs.rs/prometheus-client) for every operation. /// +/// # Prometheus Metrics +/// +/// We provide several metrics, please see the documentation of [`observe`] module. +/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/). +/// /// # Examples /// -/// ```no_build -/// use log::debug; -/// use log::info; -/// use opendal::layers::PrometheusClientLayer; -/// use opendal::layers::PrometheusClientInterceptor; -/// use opendal::services; -/// use opendal::Operator; -/// use opendal::Result; +/// ```no_run +/// # use log::debug; +/// # use log::info; +/// # use opendal::layers::PrometheusClientLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; /// -/// /// Visit [`opendal::services`] for more service related config. -/// /// Visit [`opendal::Operator`] for more operator level APIs. 
-/// #[tokio::main] -/// async fn main() -> Result<()> { +/// # #[tokio::main] +/// # async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); /// let mut registry = prometheus_client::registry::Registry::default(); /// -/// let op = Operator::new(builder) -/// .expect("must init") -/// .layer(PrometheusClientLayer::new(&mut registry)) +/// let op = Operator::new(builder)? +/// .layer(PrometheusClientLayer::builder().register(&mut registry)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -77,68 +77,286 @@ use crate::*; /// println!("## Prometheus Metrics"); /// println!("{}", buf); /// Ok(()) -/// } +/// # } /// ``` #[derive(Clone, Debug)] -pub struct PrometheusClientLayer(observe::MetricsLayer); +pub struct PrometheusClientLayer { + interceptor: PrometheusClientInterceptor, +} impl PrometheusClientLayer { - /// Create a new [`PrometheusClientLayer`]. - pub fn new(registry: &mut Registry) -> Self { - let operation_duration_seconds = Family::::new_with_constructor(|| { - let buckets = histogram::exponential_buckets(0.01, 2.0, 16); - Histogram::new(buckets) - }); - registry.register_with_unit( + /// Create a [`PrometheusClientLayerBuilder`] to set the configuration of metrics. + /// + /// # Default Configuration + /// + /// - `operation_duration_seconds_buckets`: `exponential_buckets(0.01, 2.0, 16)` + /// - `operation_bytes_buckets`: `exponential_buckets(1.0, 2.0, 16)` + /// - `path_label`: `0` + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusClientLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. 
+ /// let builder = services::Memory::default(); + /// let mut registry = prometheus_client::registry::Registry::default(); + /// + /// let duration_seconds_buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16).collect(); + /// let bytes_buckets = prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16).collect(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusClientLayer::builder() + /// .operation_duration_seconds_buckets(duration_seconds_buckets) + /// .operation_bytes_buckets(bytes_buckets) + /// .path_label(0) + /// .register(&mut registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn builder() -> PrometheusClientLayerBuilder { + let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).collect(); + let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).collect(); + PrometheusClientLayerBuilder::new( + operation_duration_seconds_buckets, + operation_bytes_buckets, + 0, + ) + } +} + +impl Layer for PrometheusClientLayer { + type LayeredAccess = observe::MetricsAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + } +} + +/// [`PrometheusClientLayerBuilder`] is a config builder to build a [`PrometheusClientLayer`]. +pub struct PrometheusClientLayerBuilder { + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, +} + +impl PrometheusClientLayerBuilder { + fn new( + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, + ) -> Self { + Self { + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, + } + } + + /// Set buckets for `operation_duration_seconds` histogram. 
+ /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusClientLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let mut registry = prometheus_client::registry::Registry::default(); + /// + /// let buckets = prometheus_client::metrics::histogram::exponential_buckets(0.01, 2.0, 16).collect(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusClientLayer::builder() + /// .operation_duration_seconds_buckets(buckets) + /// .register(&mut registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { + if !buckets.is_empty() { + self.operation_duration_seconds_buckets = buckets; + } + self + } + + /// Set buckets for `operation_bytes` histogram. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusClientLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let mut registry = prometheus_client::registry::Registry::default(); + /// + /// let buckets = prometheus_client::metrics::histogram::exponential_buckets(1.0, 2.0, 16).collect(); + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusClientLayer::builder() + /// .operation_bytes_buckets(buckets) + /// .register(&mut registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { + if !buckets.is_empty() { + self.operation_bytes_buckets = buckets; + } + self + } + + /// Set the level of path label. + /// + /// - level = 0: we will ignore the path label. + /// - level > 0: the path label keeps only the first n levels of the path when it is split by "/", + /// e.g. if n=1 and the input path is "abc/def/ghi", we will get "abc/" as the path label. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusClientLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let mut registry = prometheus_client::registry::Registry::default(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusClientLayer::builder() + /// .path_label(1) + /// .register(&mut registry) + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn path_label(mut self, level: usize) -> Self { + self.path_label_level = level; + self + } + + /// Register the metrics into the registry and return a [`PrometheusClientLayer`]. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusClientLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. 
+ /// let builder = services::Memory::default(); + /// let mut registry = prometheus_client::registry::Registry::default(); + /// + /// let op = Operator::new(builder)? + /// .layer(PrometheusClientLayer::builder().register(&mut registry)) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer { + let operation_duration_seconds = + Family::::new_with_constructor(HistogramConstructor { + buckets: self.operation_duration_seconds_buckets, + }); + let operation_bytes = + Family::::new_with_constructor(HistogramConstructor { + buckets: self.operation_bytes_buckets, + }); + let operation_errors_total = Family::::default(); + + registry.register( observe::METRIC_OPERATION_DURATION_SECONDS.name(), observe::METRIC_OPERATION_DURATION_SECONDS.help(), - Unit::Seconds, operation_duration_seconds.clone(), ); - - let operation_bytes = Family::::new_with_constructor(|| { - let buckets = histogram::exponential_buckets(1.0, 2.0, 16); - Histogram::new(buckets) - }); - registry.register_with_unit( + registry.register( observe::METRIC_OPERATION_BYTES.name(), observe::METRIC_OPERATION_BYTES.help(), - Unit::Bytes, operation_bytes.clone(), ); - - let operation_errors_total = - Family::::new_with_constructor(|| { - Counter::default() - }); + // `prometheus-client` will automatically add `_total` suffix into the name of counter + // metrics, so we can't use `METRIC_OPERATION_ERRORS_TOTAL.name()` here. 
registry.register( - observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + "opendal_operation_errors", observe::METRIC_OPERATION_ERRORS_TOTAL.help(), operation_errors_total.clone(), ); - let interceptor = PrometheusClientInterceptor { - operation_duration_seconds, - operation_bytes, - operation_errors_total, - }; - Self(observe::MetricsLayer::new(interceptor)) + PrometheusClientLayer { + interceptor: PrometheusClientInterceptor { + operation_duration_seconds, + operation_bytes, + operation_errors_total, + path_label_level: self.path_label_level, + }, + } } } -impl Layer for PrometheusClientLayer { - type LayeredAccess = observe::MetricsAccessor; +#[derive(Clone)] +struct HistogramConstructor { + buckets: Vec, +} - fn layer(&self, inner: A) -> Self::LayeredAccess { - self.0.layer(inner) +impl MetricConstructor for HistogramConstructor { + fn new_metric(&self) -> Histogram { + Histogram::new(self.buckets.iter().cloned()) } } -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub struct PrometheusClientInterceptor { - operation_duration_seconds: Family, - operation_bytes: Family, - operation_errors_total: Family, + operation_duration_seconds: Family, + operation_bytes: Family, + operation_errors_total: Family, + path_label_level: usize, } impl observe::MetricsIntercept for PrometheusClientInterceptor { @@ -147,7 +365,7 @@ impl observe::MetricsIntercept for PrometheusClientInterceptor { scheme: Scheme, namespace: Arc, root: Arc, - _: &str, + path: &str, op: Operation, duration: Duration, ) { @@ -156,8 +374,9 @@ impl observe::MetricsIntercept for PrometheusClientInterceptor { scheme, namespace, root, - path: None, - op, + operation: op, + path: observe::path_label_value(path, self.path_label_level).map(Into::into), + error: None, }) .observe(duration.as_secs_f64()) } @@ -167,7 +386,7 @@ impl observe::MetricsIntercept for PrometheusClientInterceptor { scheme: Scheme, namespace: Arc, root: Arc, - _: &str, + path: &str, op: Operation, bytes: usize, ) { @@ -176,8 +395,9 @@ impl 
observe::MetricsIntercept for PrometheusClientInterceptor { scheme, namespace, root, - path: None, - op, + operation: op, + path: observe::path_label_value(path, self.path_label_level).map(Into::into), + error: None, }) .observe(bytes as f64) } @@ -187,18 +407,18 @@ impl observe::MetricsIntercept for PrometheusClientInterceptor { scheme: Scheme, namespace: Arc, root: Arc, - _: &str, + path: &str, op: Operation, error: ErrorKind, ) { self.operation_errors_total - .get_or_create(&OperationErrorsTotalLabels { + .get_or_create(&OperationLabels { scheme, namespace, root, - path: None, - op, - error: error.into_static(), + operation: op, + path: observe::path_label_value(path, self.path_label_level).map(Into::into), + error: Some(error.into_static()), }) .inc(); } @@ -209,8 +429,9 @@ struct OperationLabels { scheme: Scheme, namespace: Arc, root: Arc, + operation: Operation, path: Option, - op: Operation, + error: Option<&'static str>, } impl EncodeLabelSet for OperationLabels { @@ -218,34 +439,13 @@ impl EncodeLabelSet for OperationLabels { (observe::LABEL_SCHEME, self.scheme.into_static()).encode(encoder.encode_label())?; (observe::LABEL_NAMESPACE, self.namespace.as_str()).encode(encoder.encode_label())?; (observe::LABEL_ROOT, self.root.as_str()).encode(encoder.encode_label())?; + (observe::LABEL_OPERATION, self.operation.into_static()).encode(encoder.encode_label())?; if let Some(path) = &self.path { (observe::LABEL_PATH, path.as_str()).encode(encoder.encode_label())?; } - (observe::LABEL_OPERATION, self.op.into_static()).encode(encoder.encode_label())?; - Ok(()) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -struct OperationErrorsTotalLabels { - scheme: Scheme, - namespace: Arc, - root: Arc, - path: Option, - op: Operation, - error: &'static str, -} - -impl EncodeLabelSet for OperationErrorsTotalLabels { - fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> { - (observe::LABEL_SCHEME, 
self.scheme.into_static()).encode(encoder.encode_label())?; - (observe::LABEL_NAMESPACE, self.namespace.as_str()).encode(encoder.encode_label())?; - (observe::LABEL_ROOT, self.root.as_str()).encode(encoder.encode_label())?; - if let Some(path) = &self.path { - (observe::LABEL_PATH, path.as_str()).encode(encoder.encode_label())?; + if let Some(error) = self.error { + (observe::LABEL_ERROR, error).encode(encoder.encode_label())?; } - (observe::LABEL_OPERATION, self.op.into_static()).encode(encoder.encode_label())?; - (observe::LABEL_ERROR, self.error).encode(encoder.encode_label())?; Ok(()) } }