From 33b7c17b3a7907d525ef01229291b8950cfa0f71 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 16 Aug 2023 11:31:10 +0100 Subject: [PATCH 01/32] Refactor prometheus remote write sink Signed-off-by: Stephen Wakely --- src/sinks/prometheus/remote_write.rs | 791 ------------------ src/sinks/prometheus/remote_write/config.rs | 206 +++++ .../remote_write/integration_tests.rs | 111 +++ src/sinks/prometheus/remote_write/mod.rs | 55 ++ .../remote_write/request_builder.rs | 198 +++++ src/sinks/prometheus/remote_write/service.rs | 120 +++ src/sinks/prometheus/remote_write/sink.rs | 147 ++++ src/sinks/prometheus/remote_write/tests.rs | 230 +++++ 8 files changed, 1067 insertions(+), 791 deletions(-) delete mode 100644 src/sinks/prometheus/remote_write.rs create mode 100644 src/sinks/prometheus/remote_write/config.rs create mode 100644 src/sinks/prometheus/remote_write/integration_tests.rs create mode 100644 src/sinks/prometheus/remote_write/mod.rs create mode 100644 src/sinks/prometheus/remote_write/request_builder.rs create mode 100644 src/sinks/prometheus/remote_write/service.rs create mode 100644 src/sinks/prometheus/remote_write/sink.rs create mode 100644 src/sinks/prometheus/remote_write/tests.rs diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs deleted file mode 100644 index a1e9b792fed4d..0000000000000 --- a/src/sinks/prometheus/remote_write.rs +++ /dev/null @@ -1,791 +0,0 @@ -use std::io::Read; -use std::sync::Arc; -use std::task; - -use aws_credential_types::provider::SharedCredentialsProvider; -use aws_types::region::Region; -use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream, FutureExt, SinkExt}; -use http::{Request, Uri}; -use prost::Message; -use snafu::{ResultExt, Snafu}; -use tower::Service; -use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; - -use super::collector::{self, MetricCollector as _}; -use crate::{ - aws::RegionOrEndpoint, - config::{self, AcknowledgementsConfig, Input, SinkConfig}, - event::{Event, Metric}, - http::{Auth, HttpClient}, - internal_events::{EndpointBytesSent, TemplateRenderingError}, - sinks::{ - self, - prometheus::PrometheusRemoteWriteAuth, - util::{ - batch::BatchConfig, - buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, - http::HttpRetryLogic, - uri, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings, - TowerRequestConfig, - }, - }, - template::Template, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Clone, Copy, Debug, Default)] -pub struct PrometheusRemoteWriteDefaultBatchSettings; - -impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1_000); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} - -#[derive(Debug, Snafu)] -enum Errors { - #[snafu(display(r#"Prometheus remote_write sink cannot accept "set" metrics"#))] - SetMetricInvalid, - #[snafu(display("aws.region required when AWS authentication is in use"))] - AwsRegionRequired, -} - -/// Configuration for the `prometheus_remote_write` sink. -#[configurable_component(sink( - "prometheus_remote_write", - "Deliver metric data to a Prometheus remote write endpoint." -))] -#[derive(Clone, Debug, Default)] -#[serde(deny_unknown_fields)] -pub struct RemoteWriteConfig { - /// The endpoint to send data to. - /// - /// The endpoint should include the scheme and the path to write to. - #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))] - pub endpoint: String, - - /// The default namespace for any metrics sent. - /// - /// This namespace is only used if a metric has no existing namespace. When a namespace is - /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`). - /// - /// It should follow the Prometheus [naming conventions][prom_naming_docs]. - /// - /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names - #[configurable(metadata(docs::examples = "service"))] - #[configurable(metadata(docs::advanced))] - pub default_namespace: Option, - - /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_histogram_buckets")] - #[configurable(metadata(docs::advanced))] - pub buckets: Vec, - - /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_summary_quantiles")] - #[configurable(metadata(docs::advanced))] - pub quantiles: Vec, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - /// The tenant ID to send. - /// - /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting. - /// - /// This may be used by Cortex or other remote services to identify the tenant making the request. - #[serde(default)] - #[configurable(metadata(docs::examples = "my-domain"))] - #[configurable(metadata(docs::advanced))] - pub tenant_id: Option