From baea69cb39d379e9fbb0db8c85d468fe915aece7 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Thu, 19 Oct 2023 15:08:28 -0400 Subject: [PATCH 1/8] enhancement(sources, sinks): add telemetry to http and grpc servers --- Cargo.toml | 2 +- lib/vector-core/src/metrics/label_filter.rs | 27 +++++--- src/api/server.rs | 46 ++++++++++---- src/http.rs | 48 ++++++++++++++- src/internal_events/grpc.rs | 61 ++++++++++++++++++- src/internal_events/http.rs | 31 +++++++++- src/sinks/prometheus/exporter.rs | 27 ++++---- src/sources/aws_kinesis_firehose/mod.rs | 28 ++++++--- src/sources/datadog_agent/mod.rs | 48 +++++++++------ src/sources/opentelemetry/http.rs | 25 +++++--- src/sources/splunk_hec/mod.rs | 25 ++++++-- src/sources/util/grpc/mod.rs | 51 ++++++++++++++-- src/sources/util/http/prelude.rs | 52 ++++++++++------ .../components/sources/internal_metrics.cue | 48 +++++++++++++++ website/cue/reference/urls.cue | 1 + 15 files changed, 419 insertions(+), 101 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ef08f56229f77..baa5d11e744b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,7 @@ opendal = {version = "0.38", default-features = false, features = ["native-tls", # Tower tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] } -tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip"]} +tower-http = { version = "0.4.4", default-features = false, features = ["decompression-gzip", "trace"]} # Serde serde = { version = "1.0.189", default-features = false, features = ["derive"] } serde-toml-merge = { version = "0.3.3", default-features = false } diff --git a/lib/vector-core/src/metrics/label_filter.rs b/lib/vector-core/src/metrics/label_filter.rs index 1f6d5283e548f..5f6acb40c7371 100644 --- a/lib/vector-core/src/metrics/label_filter.rs +++ b/lib/vector-core/src/metrics/label_filter.rs @@ -5,12 +5,25 @@ use metrics_tracing_context::LabelFilter; pub(crate) struct VectorLabelFilter; impl LabelFilter for VectorLabelFilter { - fn should_include_label(&self, _key: &KeyName, label: &Label) -> bool { - let key = label.key(); - key == "component_id" - || key == "component_type" - || key == "component_kind" - || key == "component_name" - || key == "buffer_type" + fn should_include_label(&self, metric_key: &KeyName, label: &Label) -> bool { + let label_key = label.key(); + // HTTP Server-specific labels + if metric_key.as_str().starts_with("http_server_") + && (label_key == "method" || label_key == "path") + { + return true; + } + // GRPC Server-specific labels + if metric_key.as_str().starts_with("grpc_server_") + && (label_key == "grpc_method" || label_key == "grpc_service") + { + return true; + } + // Global labels + label_key == "component_id" + || label_key == "component_type" + || label_key == "component_kind" + || label_key == "component_name" + || label_key == "buffer_type" } } diff --git a/src/api/server.rs b/src/api/server.rs index 935c0b03ecaa6..e41596e4f67b6 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -9,13 +9,17 @@ use async_graphql::{ Data, Request, Schema, }; use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket}; +use hyper::{server::conn::AddrIncoming, service::make_service_fn, Server as HyperServer}; use tokio::runtime::Handle; use tokio::sync::oneshot; +use tower::ServiceBuilder; +use tracing::Span; use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply}; use super::{handler, schema, ShutdownTx}; use crate::{ config, + http::build_http_trace_layer, internal_events::{SocketBindError, SocketMode}, topology, }; @@ -39,20 +43,36 @@ impl Server { let (_shutdown, rx) = oneshot::channel(); // warp uses `tokio::spawn` and so needs us to enter the runtime context. let _guard = handle.enter(); - let (addr, server) = warp::serve(routes) - .try_bind_with_graceful_shutdown( - config.api.address.expect("No socket address"), - async { + + let addr = config.api.address.expect("No socket address"); + let incoming = AddrIncoming::bind(&addr).map_err(|error| { + emit!(SocketBindError { + mode: SocketMode::Tcp, + error: &error, + }); + error + })?; + + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + let server = async move { + HyperServer::builder(incoming) + .serve(make_svc) + .with_graceful_shutdown(async { rx.await.ok(); - }, - ) - .map_err(|error| { - emit!(SocketBindError { - mode: SocketMode::Tcp, - error: &error, - }); - error - })?; + }) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + () + }) + }; // Update component schema with the config before starting the server. schema::components::update_config(config); diff --git a/src/http.rs b/src/http.rs index 84e0364c00515..869b49b6efb09 100644 --- a/src/http.rs +++ b/src/http.rs @@ -2,11 +2,14 @@ use std::{ fmt, task::{Context, Poll}, + time::Duration, }; use futures::future::BoxFuture; use headers::{Authorization, HeaderMapExt}; -use http::{header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Uri}; +use http::{ + header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request, Response, Uri, +}; use hyper::{ body::{Body, HttpBody}, client, @@ -16,13 +19,17 @@ use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; use snafu::{ResultExt, Snafu}; use tower::Service; -use tracing::Instrument; +use tower_http::{ + classify::{ServerErrorsAsFailures, SharedClassifier}, + trace::TraceLayer, +}; +use tracing::{Instrument, Span}; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; use crate::{ config::ProxyConfig, - internal_events::http_client, + internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent}, tls::{tls_connector_builder, MaybeTlsSettings, TlsError}, }; @@ -338,6 +345,41 @@ pub fn get_http_scheme_from_uri(uri: &Uri) -> &'static str { }) } +/// Builds a [TraceLayer] configured for a HTTP server. +pub fn build_http_trace_layer( + span: Span, +) -> TraceLayer< + SharedClassifier, + impl Fn(&Request) -> Span + Clone, + impl Fn(&Request, &Span) + Clone, + impl Fn(&Response, Duration, &Span) + Clone, + (), + (), + (), +> { + TraceLayer::new_for_http() + .make_span_with(move |request: &Request| { + // This is an error span so that the labels are always present for metrics. + error_span!( + parent: &span, + "http-request", + method = %request.method(), + path = %request.uri().path(), + ) + }) + .on_request(Box::new(|_request: &Request, _span: &Span| { + emit!(HttpServerRequestReceived); + })) + .on_response( + |response: &Response, latency: Duration, _span: &Span| { + emit!(HttpServerResponseSent { response, latency }); + }, + ) + .on_failure(()) + .on_body_chunk(()) + .on_eos(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/internal_events/grpc.rs b/src/internal_events/grpc.rs index e30388ed4710b..556882db993ed 100644 --- a/src/internal_events/grpc.rs +++ b/src/internal_events/grpc.rs @@ -1,7 +1,44 @@ -use metrics::counter; +use std::time::Duration; + +use http::response::Response; +use metrics::{counter, histogram}; +use tonic::Code; use vector_common::internal_event::{error_stage, error_type}; use vector_core::internal_event::InternalEvent; +const GRPC_STATUS_LABEL: &'static str = "grpc_status"; + +#[derive(Debug)] +pub struct GrpcServerRequestReceived; + +impl InternalEvent for GrpcServerRequestReceived { + fn emit(self) { + counter!("grpc_server_messages_received_total", 1); + } +} + +#[derive(Debug)] +pub struct GrpcServerResponseSent<'a, B> { + pub response: &'a Response, + pub latency: Duration, +} + +impl<'a, B> InternalEvent for GrpcServerResponseSent<'a, B> { + fn emit(self) { + let grpc_code = self + .response + .headers() + .get("grpc-status") + // The header value is missing on success. + .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes())); + let grpc_code = grpc_code_to_name(grpc_code); + + let labels = &[(GRPC_STATUS_LABEL, grpc_code)]; + counter!("grpc_server_messages_sent_total", 1, labels); + histogram!("grpc_server_handler_latency_seconds", self.latency, labels); + } +} + #[derive(Debug)] pub struct GrpcInvalidCompressionSchemeError<'a> { pub status: &'a tonic::Status, @@ -48,3 +85,25 @@ where ); } } + +fn grpc_code_to_name(code: Code) -> &'static str { + match code { + Code::Ok => "Ok", + Code::Cancelled => "Cancelled", + Code::Unknown => "Unknown", + Code::InvalidArgument => "InvalidArgument", + Code::DeadlineExceeded => "DeadlineExceeded", + Code::NotFound => "NotFound", + Code::AlreadyExists => "AlreadyExists", + Code::PermissionDenied => "PermissionDenied", + Code::ResourceExhausted => "ResourceExhausted", + Code::FailedPrecondition => "FailedPrecondition", + Code::Aborted => "Aborted", + Code::OutOfRange => "OutOfRange", + Code::Unimplemented => "Unimplemented", + Code::Internal => "Internal", + Code::Unavailable => "Unavailable", + Code::DataLoss => "DataLoss", + Code::Unauthenticated => "Unauthenticated", + } +} diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 5243cf47628f5..feabbd2bdd1fb 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -1,5 +1,6 @@ -use std::error::Error; +use std::{error::Error, time::Duration}; +use http::Response; use metrics::{counter, histogram}; use vector_core::internal_event::InternalEvent; @@ -8,6 +9,34 @@ use vector_common::{ json_size::JsonSize, }; +const HTTP_STATUS_LABEL: &'static str = "status"; + +#[derive(Debug)] +pub struct HttpServerRequestReceived; + +impl InternalEvent for HttpServerRequestReceived { + fn emit(self) { + counter!("http_server_requests_received_total", 1); + } +} + +#[derive(Debug)] +pub struct HttpServerResponseSent<'a, B> { + pub response: &'a Response, + pub latency: Duration, +} + +impl<'a, B> InternalEvent for HttpServerResponseSent<'a, B> { + fn emit(self) { + let labels = &[( + HTTP_STATUS_LABEL, + self.response.status().as_u16().to_string(), + )]; + counter!("http_server_responses_sent_total", 1, labels); + histogram!("http_server_handler_latency_seconds", self.latency, labels); + } +} + #[derive(Debug)] pub struct HttpBytesReceived<'a> { pub byte_size: usize, diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 63a882e8dfa98..c4711de68fad5 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -19,6 +19,7 @@ use indexmap::{map::Entry, IndexMap}; use serde_with::serde_as; use snafu::Snafu; use stream_cancel::{Trigger, Tripwire}; +use tower::ServiceBuilder; use tracing::{Instrument, Span}; use vector_config::configurable_component; use vector_core::{ @@ -36,7 +37,7 @@ use crate::{ metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue}, Event, EventStatus, Finalizable, }, - http::Auth, + http::{build_http_trace_layer, Auth}, internal_events::{PrometheusNormalizationError, PrometheusServerRequestComplete}, sinks::{ util::{ @@ -485,19 +486,21 @@ impl PrometheusExporter { let metrics = Arc::clone(&metrics); let handler = handler.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - span.in_scope(|| { - let response = handler.handle(req, &metrics); + let inner = service_fn(move |req| { + let response = handler.handle(req, &metrics); - emit!(PrometheusServerRequestComplete { - status_code: response.status(), - }); + emit!(PrometheusServerRequestComplete { + status_code: response.status(), + }); - future::ok::<_, Infallible>(response) - }) - })) - } + future::ok::<_, Infallible>(response) + }); + + let service = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(inner); + + async move { Ok::<_, Infallible>(service) } }); let (trigger, tripwire) = Tripwire::new(); diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index add2787d48037..6ed1f9bf93f57 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -1,14 +1,15 @@ -use std::{fmt, net::SocketAddr}; +use std::{convert::Infallible, fmt, net::SocketAddr}; use codecs::decoding::{DeserializerConfig, FramingConfig}; use futures::FutureExt; +use hyper::{service::make_service_fn, Server}; use lookup::owned_value_path; +use tower::ServiceBuilder; use tracing::Span; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; use vector_core::config::{LegacyKey, LogNamespace}; use vrl::value::Kind; -use warp::Filter; use crate::{ codecs::DecodingConfig, @@ -16,6 +17,7 @@ use crate::{ GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, }, + http::build_http_trace_layer, serde::{bool_or_struct, default_decoding, default_framing_message_based}, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; @@ -175,12 +177,22 @@ impl SourceConfig for AwsKinesisFirehoseConfig { let shutdown = cx.shutdown; Ok(Box::pin(async move { let span = Span::current(); - warp::serve(svc.with(warp::trace(move |_info| span.clone()))) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(svc.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + () + })?; + Ok(()) })) } diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index becaf033a9bb1..1ea9436583398 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -17,6 +17,7 @@ pub(crate) mod ddtrace_proto { include!(concat!(env!("OUT_DIR"), "/dd_trace.rs")); } +use std::convert::Infallible; use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc}; use bytes::{Buf, Bytes}; @@ -25,10 +26,13 @@ use codecs::decoding::{DeserializerConfig, FramingConfig}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; use futures::FutureExt; use http::StatusCode; +use hyper::service::make_service_fn; +use hyper::Server; use lookup::owned_value_path; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_common::internal_event::{EventsReceived, Registered}; use vector_config::configurable_component; @@ -38,6 +42,7 @@ use vrl::path::OwnedTargetPath; use vrl::value::Kind; use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply}; +use crate::http::build_http_trace_layer; use crate::{ codecs::{Decoder, DecodingConfig}, config::{ @@ -175,25 +180,32 @@ impl SourceConfig for DatadogAgentConfig { info!(message = "Building HTTP server.", address = %self.address); Ok(Box::pin(async move { + let routes = filters.recover(|r: Rejection| async move { + if let Some(e_msg) = r.find::() { + let json = warp::reply::json(e_msg); + Ok(warp::reply::with_status(json, e_msg.status_code())) + } else { + // other internal error - will return 500 internal server error + Err(r) + } + }); + let span = Span::current(); - let routes = filters - .with(warp::trace(move |_info| span.clone())) - .recover(|r: Rejection| async move { - if let Some(e_msg) = r.find::() { - let json = warp::reply::json(e_msg); - Ok(warp::reply::with_status(json, e_msg.status_code())) - } else { - // other internal error - will return 500 internal server error - Err(r) - } - }); - - warp::serve(routes) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + () + })?; Ok(()) })) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index be7ebef83ef7a..e36248f61f022 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -1,13 +1,15 @@ -use std::net::SocketAddr; +use std::{convert::Infallible, net::SocketAddr}; use bytes::Bytes; use futures_util::FutureExt; use http::StatusCode; +use hyper::{service::make_service_fn, Server}; use opentelemetry_proto::proto::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, }; use prost::Message; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_common::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered, @@ -21,6 +23,7 @@ use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Rep use crate::{ event::Event, + http::build_http_trace_layer, internal_events::{EventsReceived, StreamClosedError}, shutdown::ShutdownSignal, sources::util::{decode, ErrorMessage}, @@ -44,17 +47,23 @@ pub(crate) async fn run_http_server( filters: BoxedFilter<(Response,)>, shutdown: ShutdownSignal, ) -> crate::Result<()> { - let span = Span::current(); let listener = tls_settings.bind(&address).await?; - let routes = filters - .with(warp::trace(move |_info| span.clone())) - .recover(handle_rejection); + let routes = filters.recover(handle_rejection); info!(message = "Building HTTP server.", address = %address); - warp::serve(routes) - .serve_incoming_with_graceful_shutdown(listener.accept_stream(), shutdown.map(|_| ())) - .await; + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await?; Ok(()) } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index ac597caf2be06..e711e7a819859 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + convert::Infallible, io::Read, net::{Ipv4Addr, SocketAddr}, sync::Arc, @@ -10,11 +11,13 @@ use chrono::{DateTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; use futures::FutureExt; use http::StatusCode; +use hyper::{service::make_service_fn, Server}; use lookup::lookup_v2::OptionalValuePath; use lookup::{event_path, owned_value_path}; use serde::Serialize; use serde_json::{de::Read as JsonRead, Deserializer, Value as JsonValue}; use snafu::Snafu; +use tower::ServiceBuilder; use tracing::Span; use vector_common::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; use vector_common::sensitive_string::SensitiveString; @@ -38,6 +41,7 @@ use self::{ use crate::{ config::{log_schema, DataType, Resource, SourceConfig, SourceContext, SourceOutput}, event::{Event, LogEvent, Value}, + http::build_http_trace_layer, internal_events::{ EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError, SplunkHecRequestReceived, @@ -166,12 +170,21 @@ impl SourceConfig for SplunkConfig { Ok(Box::pin(async move { let span = Span::current(); - warp::serve(services.with(warp::trace(move |_info| span.clone()))) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - shutdown.map(|_| ()), - ) - .await; + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(services.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + () + })?; Ok(()) })) diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index 68c54c5d2a0d6..bb8bfedf2e30d 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -1,17 +1,22 @@ use crate::{ + internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent}, shutdown::{ShutdownSignal, ShutdownSignalToken}, tls::MaybeTlsSettings, }; use futures::FutureExt; use http::{Request, Response}; use hyper::Body; -use std::{convert::Infallible, net::SocketAddr}; +use std::{convert::Infallible, net::SocketAddr, time::Duration}; use tonic::{ body::BoxBody, transport::server::{NamedService, Server}, }; use tower::Service; -use tracing::{Instrument, Span}; +use tower_http::{ + classify::{GrpcErrorsAsFailures, SharedClassifier}, + trace::TraceLayer, +}; +use tracing::Span; mod decompression; pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer}; @@ -38,7 +43,7 @@ where info!(%address, "Building gRPC server."); Server::builder() - .trace_fn(move |_| span.clone()) + .layer(build_grpc_trace_layer(span.clone())) // This layer explicitly decompresses payloads, if compressed, and reports the number of message bytes we've // received if the message is processed successfully, aka `BytesReceived`. We do this because otherwise the only // access we have is either the event-specific bytes (the in-memory representation) or the raw bytes over the @@ -51,10 +56,48 @@ where .layer(DecompressionAndMetricsLayer) .add_service(service) .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap())) - .in_current_span() .await?; drop(rx.await); Ok(()) } + +pub fn build_grpc_trace_layer( + span: Span, +) -> TraceLayer< + SharedClassifier, + impl Fn(&Request) -> Span + Clone, + impl Fn(&Request, &Span) + Clone, + impl Fn(&Response, Duration, &Span) + Clone, + (), + (), + (), +> { + TraceLayer::new_for_grpc() + .make_span_with(move |request: &Request| { + // The path is defined as “/” {service name} “/” {method name}. + let mut path = request.uri().path().split("/"); + let service = path.nth(1).unwrap_or("_unknown"); + let method = path.next().unwrap_or("_unknown"); + + // This is an error span so that the labels are always present for metrics. + error_span!( + parent: &span, + "grpc-request", + grpc_service = service, + grpc_method = method, + ) + }) + .on_request(Box::new(|_request: &Request, _span: &Span| { + emit!(GrpcServerRequestReceived); + })) + .on_response( + |response: &Response, latency: Duration, _span: &Span| { + emit!(GrpcServerResponseSent { response, latency }); + }, + ) + .on_failure(()) + .on_body_chunk(()) + .on_eos(()) +} diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 3d96d1971c2f2..b688549374670 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -1,10 +1,17 @@ -use std::{collections::HashMap, convert::TryFrom, fmt, net::SocketAddr}; -use vector_core::EstimatedJsonEncodedSizeOf; +use std::{ + collections::HashMap, + convert::{Infallible, TryFrom}, + fmt, + net::SocketAddr, +}; use async_trait::async_trait; use bytes::Bytes; use futures::{FutureExt, TryFutureExt}; +use hyper::{service::make_service_fn, Server}; +use tower::ServiceBuilder; use tracing::Span; +use vector_core::EstimatedJsonEncodedSizeOf; use vector_core::{ config::SourceAcknowledgementsConfig, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event}, @@ -21,6 +28,7 @@ use warp::{ use crate::{ config::SourceContext, + http::build_http_trace_layer, internal_events::{ HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError, }, @@ -76,7 +84,6 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let path = path.to_owned(); let acknowledgements = cx.do_acknowledgements(acknowledgements); Ok(Box::pin(async move { - let span = Span::current(); let mut filter: BoxedFilter<()> = match method { HttpMethod::Head => warp::head().boxed(), HttpMethod::Get => warp::get().boxed(), @@ -155,8 +162,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { handle_request(events, acknowledgements, response_code, cx.out.clone()) }, - ) - .with(warp::trace(move |_info| span.clone())); + ); let ping = warp::get().and(warp::path("ping")).map(|| "pong"); let routes = svc.or(ping).recover(|r: Rejection| async move { @@ -172,22 +178,30 @@ pub trait HttpSource: Clone + Send + Sync + 'static { } }); + let span = Span::current(); + let make_svc = make_service_fn(move |_conn| { + let svc = ServiceBuilder::new() + .layer(build_http_trace_layer(span.clone())) + .service(warp::service(routes.clone())); + futures_util::future::ok::<_, Infallible>(svc) + }); + info!(message = "Building HTTP server.", address = %address); - match tls.bind(&address).await { - Ok(listener) => { - warp::serve(routes) - .serve_incoming_with_graceful_shutdown( - listener.accept_stream(), - cx.shutdown.map(|_| ()), - ) - .await; - } - Err(error) => { - error!("An error occurred: {:?}.", error); - return Err(()); - } - } + let listener = tls.bind(&address).await.map_err(|err| { + error!("An error occurred: {:?}.", err); + () + })?; + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(make_svc) + .with_graceful_shutdown(cx.shutdown.map(|_| ())) + .await + .map_err(|err| { + error!("An error occurred: {:?}.", err); + () + })?; + Ok(()) })) } diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index e399573e7d91a..f47de7fdcaabb 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -721,6 +721,24 @@ components: sources: internal_metrics: { path: _path } } + grpc_server_messages_received_total: { + description: "The total number of GRPC messages received." + type: "counter" + default_namespace: "vector" + tags: _component_tags & _method & _path + } + grpc_server_messages_sent_total: { + description: "The total number of GRPC messages sent." + type: "counter" + default_namespace: "vector" + tags: _component_tags & _method & _path + } + grpc_server_handler_latency_seconds: { + description: "The duration spent handling a GRPC request." + type: "histogram" + default_namespace: "vector" + tags: _component_tags & _method & _path & _status + } http_bad_requests_total: { description: "The total number of HTTP `400 Bad Request` errors encountered." type: "counter" @@ -775,6 +793,24 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + http_server_requests_received_total { + description: "The total number of HTTP requests received." + type: "counter" + default_namespace: "vector" + tags: _component_tags & _grpc_method & _grpc_service + } + http_server_responses_sent_total { + description: "The total number of HTTP responses sent." + type: "counter" + default_namespace: "vector" + tags: _component_tags & _grpc_method & _grpc_service & _grpc_status + } + http_server_handler_latency_seconds { + description: "The duration spent handling a HTTP request." + type: "histogram" + default_namespace: "vector" + tags: _component_tags & _grpc_method & _grpc_service & _grpc_status + } invalid_record_total: { description: "The total number of invalid records that have been discarded." type: "counter" @@ -1165,6 +1201,18 @@ components: sources: internal_metrics: { description: "The file that produced the error" required: false } + _grpc_method: { + description: "The name of the method called on the gRPC service." + required: true + } + _grpc_service: { + description: "The gRPC service name." + required: true + } + _grpc_status: { + description: "The human-readable [gRPC status code](\(urls.grpc_status_code))." + required: true + } _host: { description: "The hostname of the originating system." required: true diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index bba6a6d352126..eced20ef2fb51 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -239,6 +239,7 @@ urls: { greptimecloud: "https://greptime.cloud" greptimedb: "https://github.com/greptimeteam/greptimedb" greptimedb_grpc: "https://docs.greptime.com/" + grpc_status_code: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html" grok: "https://github.com/daschl/grok/tree/master/patterns" grok_debugger: "https://grokdebug.herokuapp.com/" grok_patterns: "\(github)/daschl/grok/tree/master/patterns" From 224b1446d93da6f027bf679c0605b94600433f25 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Thu, 19 Oct 2023 16:40:28 -0400 Subject: [PATCH 2/8] fix syntax --- .../cue/reference/components/sources/internal_metrics.cue | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index f47de7fdcaabb..d046f6079735d 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -793,19 +793,19 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } - http_server_requests_received_total { + http_server_requests_received_total: { description: "The total number of HTTP requests received." type: "counter" default_namespace: "vector" tags: _component_tags & _grpc_method & _grpc_service } - http_server_responses_sent_total { + http_server_responses_sent_total: { description: "The total number of HTTP responses sent." type: "counter" default_namespace: "vector" tags: _component_tags & _grpc_method & _grpc_service & _grpc_status } - http_server_handler_latency_seconds { + http_server_handler_latency_seconds: { description: "The duration spent handling a HTTP request." type: "histogram" default_namespace: "vector" From a376dcda3147280e894486ceba72f1a9e9d52828 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 20 Oct 2023 09:46:49 -0400 Subject: [PATCH 3/8] clippy --- src/api/server.rs | 1 - src/internal_events/grpc.rs | 4 ++-- src/internal_events/http.rs | 2 +- src/sources/aws_kinesis_firehose/mod.rs | 1 - src/sources/datadog_agent/mod.rs | 1 - src/sources/splunk_hec/mod.rs | 1 - src/sources/util/grpc/mod.rs | 2 +- src/sources/util/http/prelude.rs | 2 -- 8 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/api/server.rs b/src/api/server.rs index e41596e4f67b6..244b2c25c9b00 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -70,7 +70,6 @@ impl Server { .await .map_err(|err| { error!("An error occurred: {:?}.", err); - () }) }; diff --git a/src/internal_events/grpc.rs b/src/internal_events/grpc.rs index 556882db993ed..e5d39ba482fba 100644 --- a/src/internal_events/grpc.rs +++ b/src/internal_events/grpc.rs @@ -6,7 +6,7 @@ use tonic::Code; use vector_common::internal_event::{error_stage, error_type}; use vector_core::internal_event::InternalEvent; -const GRPC_STATUS_LABEL: &'static str = "grpc_status"; +const GRPC_STATUS_LABEL: &str = "grpc_status"; #[derive(Debug)] pub struct GrpcServerRequestReceived; @@ -86,7 +86,7 @@ where } } -fn grpc_code_to_name(code: Code) -> &'static str { +const fn grpc_code_to_name(code: Code) -> &'static str { match code { Code::Ok => "Ok", Code::Cancelled => "Cancelled", diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index feabbd2bdd1fb..8948f1787f79f 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -9,7 +9,7 @@ use vector_common::{ json_size::JsonSize, }; -const HTTP_STATUS_LABEL: &'static str = "status"; +const HTTP_STATUS_LABEL: &str = "status"; #[derive(Debug)] pub struct HttpServerRequestReceived; diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index 6ed1f9bf93f57..5ce755b60df25 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -190,7 +190,6 @@ impl SourceConfig for AwsKinesisFirehoseConfig { .await .map_err(|err| { error!("An error occurred: {:?}.", err); - () })?; Ok(()) diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 1ea9436583398..aba5caddc791c 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -204,7 +204,6 @@ impl SourceConfig for DatadogAgentConfig { .await .map_err(|err| { error!("An error occurred: {:?}.", err); - () })?; Ok(()) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index e711e7a819859..5b6cac84a9c3c 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -183,7 +183,6 @@ impl SourceConfig for SplunkConfig { .await .map_err(|err| { error!("An error occurred: {:?}.", err); - () })?; Ok(()) diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index bb8bfedf2e30d..b88148ed57091 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -77,7 +77,7 @@ pub fn build_grpc_trace_layer( TraceLayer::new_for_grpc() .make_span_with(move |request: &Request| { // The path is defined as “/” {service name} “/” {method name}. - let mut path = request.uri().path().split("/"); + let mut path = request.uri().path().split('/'); let service = path.nth(1).unwrap_or("_unknown"); let method = path.next().unwrap_or("_unknown"); diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index b688549374670..aa95ab2da074a 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -190,7 +190,6 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let listener = tls.bind(&address).await.map_err(|err| { error!("An error occurred: {:?}.", err); - () })?; Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) @@ -199,7 +198,6 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .await .map_err(|err| { error!("An error occurred: {:?}.", err); - () })?; Ok(()) From 1ad7afde9de3a3ba86460502fa5fd4dcbfa9125c Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 20 Oct 2023 15:35:22 -0400 Subject: [PATCH 4/8] respond to feedback and improve docs --- lib/vector-core/src/metrics/label_filter.rs | 2 +- src/http.rs | 2 ++ src/internal_events/grpc.rs | 2 +- src/internal_events/http.rs | 2 +- src/sources/util/grpc/mod.rs | 3 +++ .../components/sinks/prometheus_exporter.cue | 6 ++++++ .../components/sources/aws_kinesis_firehose.cue | 3 +++ .../reference/components/sources/datadog_agent.cue | 6 ++++++ .../cue/reference/components/sources/heroku_logs.cue | 7 +++++-- .../cue/reference/components/sources/http_server.cue | 7 +++++-- .../reference/components/sources/internal_metrics.cue | 10 +++++----- .../reference/components/sources/opentelemetry.cue | 9 +++++++++ .../components/sources/prometheus_remote_write.cue | 11 +++++++---- .../cue/reference/components/sources/splunk_hec.cue | 7 +++++-- website/cue/reference/components/sources/vector.cue | 5 ++++- 15 files changed, 63 insertions(+), 19 deletions(-) diff --git a/lib/vector-core/src/metrics/label_filter.rs b/lib/vector-core/src/metrics/label_filter.rs index 5f6acb40c7371..7b677c04901e9 100644 --- a/lib/vector-core/src/metrics/label_filter.rs +++ b/lib/vector-core/src/metrics/label_filter.rs @@ -13,7 +13,7 @@ impl LabelFilter for VectorLabelFilter { { return true; } - // GRPC Server-specific labels + // gRPC Server-specific labels if metric_key.as_str().starts_with("grpc_server_") && (label_key == "grpc_method" || label_key == "grpc_service") { diff --git a/src/http.rs b/src/http.rs index 869b49b6efb09..c6e3782dbd2b2 100644 --- a/src/http.rs +++ b/src/http.rs @@ -346,6 +346,8 @@ pub fn get_http_scheme_from_uri(uri: &Uri) -> &'static str { } /// Builds a [TraceLayer] configured for a HTTP server. +/// +/// This layer emits HTTP specific telemetry for requests received, responses sent, and handler duration. pub fn build_http_trace_layer( span: Span, ) -> TraceLayer< diff --git a/src/internal_events/grpc.rs b/src/internal_events/grpc.rs index e5d39ba482fba..3b8dbe71a401b 100644 --- a/src/internal_events/grpc.rs +++ b/src/internal_events/grpc.rs @@ -35,7 +35,7 @@ impl<'a, B> InternalEvent for GrpcServerResponseSent<'a, B> { let labels = &[(GRPC_STATUS_LABEL, grpc_code)]; counter!("grpc_server_messages_sent_total", 1, labels); - histogram!("grpc_server_handler_latency_seconds", self.latency, labels); + histogram!("grpc_server_handler_duration_seconds", self.latency, labels); } } diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 8948f1787f79f..532d02de4e622 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -33,7 +33,7 @@ impl<'a, B> InternalEvent for HttpServerResponseSent<'a, B> { self.response.status().as_u16().to_string(), )]; counter!("http_server_responses_sent_total", 1, labels); - histogram!("http_server_handler_latency_seconds", self.latency, labels); + histogram!("http_server_handler_duration_seconds", self.latency, labels); } } diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index b88148ed57091..66d1036802edf 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -63,6 +63,9 @@ where Ok(()) } +/// Builds a [TraceLayer] configured for a gRPC server. +/// +/// This layer emits gPRC specific telemetry for messages received/sent and handler duration. pub fn build_grpc_trace_layer( span: Span, ) -> TraceLayer< diff --git a/website/cue/reference/components/sinks/prometheus_exporter.cue b/website/cue/reference/components/sinks/prometheus_exporter.cue index d4b79f88f23c3..7e7916ae2c47b 100644 --- a/website/cue/reference/components/sinks/prometheus_exporter.cue +++ b/website/cue/reference/components/sinks/prometheus_exporter.cue @@ -314,4 +314,10 @@ components: sinks: prometheus_exporter: { """ } } + + telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/aws_kinesis_firehose.cue index 22adc0ecb28e4..32c446c847887 100644 --- a/website/cue/reference/components/sources/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/aws_kinesis_firehose.cue @@ -187,6 +187,9 @@ components: sources: aws_kinesis_firehose: { } telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total request_automatic_decode_errors_total: components.sources.internal_metrics.output.metrics.request_automatic_decode_errors_total diff --git a/website/cue/reference/components/sources/datadog_agent.cue b/website/cue/reference/components/sources/datadog_agent.cue index 88e5cd9cd429f..9c6b220af87a8 100644 --- a/website/cue/reference/components/sources/datadog_agent.cue +++ b/website/cue/reference/components/sources/datadog_agent.cue @@ -217,4 +217,10 @@ components: sources: datadog_agent: { """ } } + + telemetry: metrics: { + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/heroku_logs.cue b/website/cue/reference/components/sources/heroku_logs.cue index 038fe51b8164b..5f049a08193d5 100644 --- a/website/cue/reference/components/sources/heroku_logs.cue +++ b/website/cue/reference/components/sources/heroku_logs.cue @@ -101,7 +101,10 @@ components: sources: heroku_logs: { } telemetry: metrics: { - request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + request_read_errors_total: components.sources.internal_metrics.output.metrics.request_read_errors_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total } } diff --git a/website/cue/reference/components/sources/http_server.cue b/website/cue/reference/components/sources/http_server.cue index 87055622b5ec7..36edbaece2683 100644 --- a/website/cue/reference/components/sources/http_server.cue +++ b/website/cue/reference/components/sources/http_server.cue @@ -178,8 +178,11 @@ components: sources: http_server: { ] telemetry: metrics: { - http_bad_requests_total: components.sources.internal_metrics.output.metrics.http_bad_requests_total - parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + http_bad_requests_total: components.sources.internal_metrics.output.metrics.http_bad_requests_total + parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total } how_it_works: { diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index d046f6079735d..9d2e9799186c9 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -722,19 +722,19 @@ components: sources: internal_metrics: { } } grpc_server_messages_received_total: { - description: "The total number of GRPC messages received." + description: "The total number of gRPC messages received." type: "counter" default_namespace: "vector" tags: _component_tags & _method & _path } grpc_server_messages_sent_total: { - description: "The total number of GRPC messages sent." + description: "The total number of gRPC messages sent." type: "counter" default_namespace: "vector" tags: _component_tags & _method & _path } - grpc_server_handler_latency_seconds: { - description: "The duration spent handling a GRPC request." + grpc_server_handler_duration_seconds: { + description: "The duration spent handling a gRPC request." type: "histogram" default_namespace: "vector" tags: _component_tags & _method & _path & _status @@ -805,7 +805,7 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags & _grpc_method & _grpc_service & _grpc_status } - http_server_handler_latency_seconds: { + http_server_handler_duration_seconds: { description: "The duration spent handling a HTTP request." type: "histogram" default_namespace: "vector" diff --git a/website/cue/reference/components/sources/opentelemetry.cue b/website/cue/reference/components/sources/opentelemetry.cue index d86d43faf5ad2..415039485ced6 100644 --- a/website/cue/reference/components/sources/opentelemetry.cue +++ b/website/cue/reference/components/sources/opentelemetry.cue @@ -205,4 +205,13 @@ components: sources: opentelemetry: { """ } } + + telemetry: metrics: { + grpc_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.grpc_server_handler_duration_seconds + grpc_server_messages_received_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_received_total + grpc_server_messages_sent_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_sent_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + } } diff --git a/website/cue/reference/components/sources/prometheus_remote_write.cue b/website/cue/reference/components/sources/prometheus_remote_write.cue index ce004a3595c95..9c6362e6ecf6e 100644 --- a/website/cue/reference/components/sources/prometheus_remote_write.cue +++ b/website/cue/reference/components/sources/prometheus_remote_write.cue @@ -84,9 +84,12 @@ components: sources: prometheus_remote_write: { } telemetry: metrics: { - parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total - requests_completed_total: components.sources.internal_metrics.output.metrics.requests_completed_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total - request_duration_seconds: components.sources.internal_metrics.output.metrics.request_duration_seconds + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + parse_errors_total: components.sources.internal_metrics.output.metrics.parse_errors_total + requests_completed_total: components.sources.internal_metrics.output.metrics.requests_completed_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + request_duration_seconds: components.sources.internal_metrics.output.metrics.request_duration_seconds } } diff --git a/website/cue/reference/components/sources/splunk_hec.cue b/website/cue/reference/components/sources/splunk_hec.cue index c1334cb3792da..5671387a87fbd 100644 --- a/website/cue/reference/components/sources/splunk_hec.cue +++ b/website/cue/reference/components/sources/splunk_hec.cue @@ -79,8 +79,11 @@ components: sources: splunk_hec: { } telemetry: metrics: { - http_request_errors_total: components.sources.internal_metrics.output.metrics.http_request_errors_total - requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total + http_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.http_server_handler_duration_seconds + http_server_requests_received_total: components.sources.internal_metrics.output.metrics.http_server_requests_received_total + http_server_responses_sent_total: components.sources.internal_metrics.output.metrics.http_server_responses_sent_total + http_request_errors_total: components.sources.internal_metrics.output.metrics.http_request_errors_total + requests_received_total: components.sources.internal_metrics.output.metrics.requests_received_total } how_it_works: { diff --git a/website/cue/reference/components/sources/vector.cue b/website/cue/reference/components/sources/vector.cue index 87737a272480f..bccc77d956b66 100644 --- a/website/cue/reference/components/sources/vector.cue +++ b/website/cue/reference/components/sources/vector.cue @@ -100,6 +100,9 @@ components: sources: vector: { } telemetry: metrics: { - protobuf_decode_errors_total: components.sources.internal_metrics.output.metrics.protobuf_decode_errors_total + grpc_server_handler_duration_seconds: components.sources.internal_metrics.output.metrics.grpc_server_handler_duration_seconds + grpc_server_messages_received_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_received_total + grpc_server_messages_sent_total: components.sources.internal_metrics.output.metrics.grpc_server_messages_sent_total + protobuf_decode_errors_total: components.sources.internal_metrics.output.metrics.protobuf_decode_errors_total } } From 5228143b01666acfa06315d97797f1f2f76292f2 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 23 Oct 2023 10:43:38 -0400 Subject: [PATCH 5/8] fmt --- src/sources/util/grpc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index 66d1036802edf..c1976a4ac8fb5 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -64,7 +64,7 @@ where } /// Builds a [TraceLayer] configured for a gRPC server. -/// +/// /// This layer emits gPRC specific telemetry for messages received/sent and handler duration. pub fn build_grpc_trace_layer( span: Span, From 1279d03c5cbcbb50842d4ff30596bf738ed3e88a Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 23 Oct 2023 11:25:59 -0400 Subject: [PATCH 6/8] fmt again --- website/cue/reference/components/sources/internal_metrics.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 9d2e9799186c9..a99eeb01d80d0 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -737,7 +737,7 @@ components: sources: internal_metrics: { description: "The duration spent handling a gRPC request." type: "histogram" default_namespace: "vector" - tags: _component_tags & _method & _path & _status + tags: _component_tags & _method & _path & _status } http_bad_requests_total: { description: "The total number of HTTP `400 Bad Request` errors encountered." From 3a29425b5423fb598d95c3a4293e077b31a52473 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 13:31:31 -0400 Subject: [PATCH 7/8] fix cue --- .../components/sources/internal_metrics.cue | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index a99eeb01d80d0..e719c0dd765fb 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -725,19 +725,30 @@ components: sources: internal_metrics: { description: "The total number of gRPC messages received." type: "counter" default_namespace: "vector" - tags: _component_tags & _method & _path + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + } } grpc_server_messages_sent_total: { description: "The total number of gRPC messages sent." type: "counter" default_namespace: "vector" - tags: _component_tags & _method & _path + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + grpc_status: _grpc_status + } } grpc_server_handler_duration_seconds: { description: "The duration spent handling a gRPC request." type: "histogram" default_namespace: "vector" - tags: _component_tags & _method & _path & _status + tags: _component_tags & { + grpc_method: _grpc_method + grpc_service: _grpc_service + grpc_status: _grpc_status + } } http_bad_requests_total: { description: "The total number of HTTP `400 Bad Request` errors encountered." @@ -797,19 +808,30 @@ components: sources: internal_metrics: { description: "The total number of HTTP requests received." type: "counter" default_namespace: "vector" - tags: _component_tags & _grpc_method & _grpc_service + tags: _component_tags & { + method: _method + path: _path + } } http_server_responses_sent_total: { description: "The total number of HTTP responses sent." type: "counter" default_namespace: "vector" - tags: _component_tags & _grpc_method & _grpc_service & _grpc_status + tags: _component_tags & { + method: _method + path: _path + status: _status + } } http_server_handler_duration_seconds: { description: "The duration spent handling a HTTP request." type: "histogram" default_namespace: "vector" - tags: _component_tags & _grpc_method & _grpc_service & _grpc_status + tags: _component_tags & { + method: _method + path: _path + status: _status + } } invalid_record_total: { description: "The total number of invalid records that have been discarded." From 040f6f05af36615f0e8093ac57be74a133b5a4d2 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 25 Oct 2023 16:41:46 -0400 Subject: [PATCH 8/8] remove feature flags from http internal events --- src/internal_events/mod.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 64b8664aeef0d..cbc7b603295ab 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -197,13 +197,6 @@ pub(crate) use self::gcp_pubsub::*; pub(crate) use self::grpc::*; #[cfg(feature = "sources-host_metrics")] pub(crate) use self::host_metrics::*; -#[cfg(any( - feature = "sources-utils-http", - feature = "sources-utils-http-encoding", - feature = "sources-datadog_agent", - feature = "sources-splunk_hec", -))] -pub(crate) use self::http::*; #[cfg(feature = "sources-utils-http-client")] pub(crate) use self::http_client_source::*; #[cfg(feature = "sinks-influxdb")] @@ -268,7 +261,7 @@ pub(crate) use self::websocket::*; pub(crate) use self::windows::*; pub use self::{ adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*, - heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, + heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, }; // this version won't be needed once all `InternalEvent`s implement `name()`