-
Notifications
You must be signed in to change notification settings - Fork 2.2k
chore(gcp_stackdriver_metrics sink): rewrite to stream based sink #18749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 4 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| use bytes::Bytes; | ||
| use goauth::scopes::Scope; | ||
| use http::{Request, Uri}; | ||
|
|
||
| use crate::{ | ||
| gcp::{GcpAuthConfig, GcpAuthenticator}, | ||
| http::HttpClient, | ||
| sinks::{ | ||
| gcp, | ||
| prelude::*, | ||
| util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder}, | ||
| }, | ||
| }; | ||
|
|
||
| use super::{ | ||
| request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder}, | ||
| sink::StackdriverMetricsSink, | ||
| StackdriverMetricsDefaultBatchSettings, | ||
| }; | ||
|
|
||
| /// Configuration for the `gcp_stackdriver_metrics` sink. | ||
| #[configurable_component(sink( | ||
| "gcp_stackdriver_metrics", | ||
| "Deliver metrics to GCP's Cloud Monitoring system." | ||
| ))] | ||
| #[derive(Clone, Debug, Default)] | ||
| pub struct StackdriverConfig { | ||
| #[serde(skip, default = "default_endpoint")] | ||
| pub(super) endpoint: String, | ||
|
|
||
| /// The project ID to which to publish metrics. | ||
| /// | ||
| /// See the [Google Cloud Platform project management documentation][project_docs] for more details. | ||
| /// | ||
| /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects | ||
| pub(super) project_id: String, | ||
|
|
||
| /// The monitored resource to associate the metrics with. | ||
| pub(super) resource: gcp::GcpTypedResource, | ||
|
|
||
| #[serde(flatten)] | ||
| pub(super) auth: GcpAuthConfig, | ||
|
|
||
| /// The default namespace to use for metrics that do not have one. | ||
| /// | ||
| /// Metrics with the same name can only be differentiated by their namespace, and not all | ||
| /// metrics have their own namespace. | ||
| #[serde(default = "default_metric_namespace_value")] | ||
| pub(super) default_namespace: String, | ||
|
|
||
| #[configurable(derived)] | ||
| #[serde(default)] | ||
| pub(super) request: TowerRequestConfig, | ||
|
|
||
| #[configurable(derived)] | ||
| #[serde(default)] | ||
| pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>, | ||
|
|
||
| #[configurable(derived)] | ||
| pub(super) tls: Option<TlsConfig>, | ||
|
|
||
| #[configurable(derived)] | ||
| #[serde( | ||
| default, | ||
| deserialize_with = "crate::serde::bool_or_struct", | ||
| skip_serializing_if = "crate::serde::skip_serializing_if_default" | ||
| )] | ||
| pub(super) acknowledgements: AcknowledgementsConfig, | ||
| } | ||
|
|
||
| fn default_metric_namespace_value() -> String { | ||
| "namespace".to_string() | ||
| } | ||
|
|
||
| fn default_endpoint() -> String { | ||
| "https://monitoring.googleapis.com".to_string() | ||
| } | ||
|
|
||
| impl_generate_config_from_default!(StackdriverConfig); | ||
|
|
||
| #[async_trait::async_trait] | ||
| #[typetag::serde(name = "gcp_stackdriver_metrics")] | ||
| impl SinkConfig for StackdriverConfig { | ||
| async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { | ||
| let auth = self.auth.build(Scope::MonitoringWrite).await?; | ||
|
|
||
| let healthcheck = healthcheck().boxed(); | ||
| let started = chrono::Utc::now(); | ||
| let tls_settings = TlsSettings::from_options(&self.tls)?; | ||
| let client = HttpClient::new(tls_settings, cx.proxy())?; | ||
|
|
||
| let batch_settings = self.batch.validate()?.into_batcher_settings()?; | ||
|
|
||
| let request_builder = StackdriverMetricsRequestBuilder { | ||
| encoder: StackdriverMetricsEncoder { | ||
| default_namespace: self.default_namespace.clone(), | ||
| started, | ||
| resource: self.resource.clone(), | ||
| }, | ||
| }; | ||
|
|
||
| let request_limits = self.request.unwrap_with( | ||
| &TowerRequestConfig::default() | ||
| .rate_limit_duration_secs(1) | ||
| .rate_limit_num(1000), | ||
| ); | ||
|
|
||
| let uri: Uri = format!( | ||
| "{}/v3/projects/{}/timeSeries", | ||
| self.endpoint, self.project_id | ||
| ) | ||
| .parse()?; | ||
|
|
||
| auth.spawn_regenerate_token(); | ||
|
|
||
| let stackdriver_logs_service_request_builder = | ||
| StackdriverMetricsServiceRequestBuilder { uri, auth }; | ||
|
|
||
| let service = HttpService::new(client, stackdriver_logs_service_request_builder); | ||
|
|
||
| let service = ServiceBuilder::new() | ||
| .settings(request_limits, http_response_retry_logic()) | ||
| .service(service); | ||
|
|
||
| let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder); | ||
|
|
||
| Ok((VectorSink::from_event_streamsink(sink), healthcheck)) | ||
| } | ||
|
|
||
| fn input(&self) -> Input { | ||
| Input::metric() | ||
| } | ||
|
|
||
| fn acknowledgements(&self) -> &AcknowledgementsConfig { | ||
| &self.acknowledgements | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| pub(super) struct StackdriverMetricsServiceRequestBuilder { | ||
| pub(super) uri: Uri, | ||
| pub(super) auth: GcpAuthenticator, | ||
| } | ||
|
|
||
| impl HttpServiceRequestBuilder for StackdriverMetricsServiceRequestBuilder { | ||
| fn build(&self, body: Bytes) -> Request<Bytes> { | ||
| let mut request = Request::post(self.uri.clone()) | ||
| .header("Content-Type", "application/json") | ||
| .body(body) | ||
| .unwrap(); | ||
|
|
||
| self.auth.apply(&mut request); | ||
|
|
||
| request | ||
| } | ||
| } | ||
|
|
||
| async fn healthcheck() -> crate::Result<()> { | ||
| Ok(()) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| // TODO: In order to correctly assert component specification compliance, we would have to do some more advanced mocking | ||
|
StephenWakely marked this conversation as resolved.
Outdated
|
||
| // off the endpoint, which would include also providing a mock OAuth2 endpoint to allow for generating a token from the | ||
| // mocked credentials. Let this TODO serve as a placeholder for doing that in the future. | ||
|
neuronull marked this conversation as resolved.
Outdated
|
||
|
|
||
| use crate::sinks::prelude::*; | ||
|
StephenWakely marked this conversation as resolved.
Outdated
|
||
|
|
||
| mod config; | ||
| mod request_builder; | ||
| mod sink; | ||
| #[cfg(test)] | ||
| mod tests; | ||
|
|
||
| #[derive(Clone, Copy, Debug, Default)] | ||
| pub struct StackdriverMetricsDefaultBatchSettings; | ||
|
|
||
| impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings { | ||
| const MAX_EVENTS: Option<usize> = Some(1); | ||
| const MAX_BYTES: Option<usize> = None; | ||
| const TIMEOUT_SECS: f64 = 1.0; | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| use std::io; | ||
|
|
||
| use bytes::Bytes; | ||
| use chrono::Utc; | ||
| use vector_core::event::{Metric, MetricValue}; | ||
|
|
||
| use crate::sinks::{gcp, prelude::*, util::http::HttpRequest}; | ||
|
|
||
| #[derive(Clone, Debug)] | ||
| pub(super) struct StackdriverMetricsRequestBuilder { | ||
| pub(super) encoder: StackdriverMetricsEncoder, | ||
| } | ||
|
|
||
| impl RequestBuilder<Vec<Metric>> for StackdriverMetricsRequestBuilder { | ||
| type Metadata = EventFinalizers; | ||
| type Events = Vec<Metric>; | ||
| type Encoder = StackdriverMetricsEncoder; | ||
| type Payload = Bytes; | ||
| type Request = HttpRequest; | ||
| type Error = io::Error; | ||
|
|
||
| fn compression(&self) -> Compression { | ||
| Compression::None | ||
| } | ||
|
|
||
| fn encoder(&self) -> &Self::Encoder { | ||
| &self.encoder | ||
| } | ||
|
|
||
| fn split_input( | ||
| &self, | ||
| mut events: Vec<Metric>, | ||
| ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { | ||
| let finalizers = events.take_finalizers(); | ||
| let builder = RequestMetadataBuilder::from_events(&events); | ||
| (finalizers, builder, events) | ||
| } | ||
|
|
||
| fn build_request( | ||
| &self, | ||
| metadata: Self::Metadata, | ||
| request_metadata: RequestMetadata, | ||
| payload: EncodeResult<Self::Payload>, | ||
| ) -> Self::Request { | ||
| HttpRequest::new(payload.into_payload(), metadata, request_metadata) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Clone, Debug)] | ||
| pub struct StackdriverMetricsEncoder { | ||
| pub(super) default_namespace: String, | ||
| pub(super) started: chrono::DateTime<Utc>, | ||
| pub(super) resource: gcp::GcpTypedResource, | ||
| } | ||
|
|
||
| impl encoding::Encoder<Vec<Metric>> for StackdriverMetricsEncoder { | ||
| /// Create the object defined [here][api_docs]. | ||
| /// | ||
| /// [api_docs]: https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create | ||
| fn encode_input( | ||
| &self, | ||
| input: Vec<Metric>, | ||
| writer: &mut dyn io::Write, | ||
| ) -> io::Result<(usize, GroupedCountByteSize)> { | ||
| let mut byte_size = telemetry().create_request_count_byte_size(); | ||
| let time_series = input | ||
| .into_iter() | ||
| .map(|metric| { | ||
| byte_size.add_event(&metric, metric.estimated_json_encoded_size_of()); | ||
|
|
||
| let (series, data, _metadata) = metric.into_parts(); | ||
| let namespace = series | ||
| .name | ||
| .namespace | ||
| .unwrap_or_else(|| self.default_namespace.clone()); | ||
| let metric_type = format!( | ||
| "custom.googleapis.com/{}/metrics/{}", | ||
| namespace, series.name.name | ||
| ); | ||
|
|
||
| let end_time = data.time.timestamp.unwrap_or_else(chrono::Utc::now); | ||
|
|
||
| let (point_value, interval, metric_kind) = match &data.value { | ||
| MetricValue::Counter { value } => { | ||
| let interval = gcp::GcpInterval { | ||
| start_time: Some(self.started), | ||
| end_time, | ||
| }; | ||
|
|
||
| (*value, interval, gcp::GcpMetricKind::Cumulative) | ||
| } | ||
| MetricValue::Gauge { value } => { | ||
| let interval = gcp::GcpInterval { | ||
| start_time: None, | ||
| end_time, | ||
| }; | ||
|
|
||
| (*value, interval, gcp::GcpMetricKind::Gauge) | ||
| } | ||
| _ => unreachable!(), | ||
|
StephenWakely marked this conversation as resolved.
Outdated
|
||
| }; | ||
| let metric_labels = series | ||
| .tags | ||
| .unwrap_or_default() | ||
| .into_iter_single() | ||
| .collect::<std::collections::HashMap<_, _>>(); | ||
|
|
||
| gcp::GcpSerie { | ||
| metric: gcp::GcpMetric { | ||
| r#type: metric_type, | ||
| labels: metric_labels, | ||
| }, | ||
| resource: gcp::GcpResource { | ||
| r#type: self.resource.r#type.clone(), | ||
| labels: self.resource.labels.clone(), | ||
| }, | ||
| metric_kind, | ||
| value_type: gcp::GcpValueType::Int64, | ||
| points: vec![gcp::GcpPoint { | ||
| interval, | ||
| value: gcp::GcpPointValue { | ||
| int64_value: Some(point_value as i64), | ||
| }, | ||
| }], | ||
| } | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| let series = gcp::GcpSeries { | ||
| time_series: &time_series, | ||
| }; | ||
|
|
||
| let body = crate::serde::json::to_bytes(&series).unwrap().freeze(); | ||
| writer.write_all(&body).map(|()| (body.len(), byte_size)) | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.