diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 51a41615af..14078c6193 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -108,7 +108,18 @@ impl Inbound { let dst = to_canonical(raw_socket.local_addr().expect("local_addr available")); let network = pi.cfg.network.clone(); let acceptor = crate::tls::InboundAcceptor::new(acceptor.clone()); + + let socket_labels = metrics::SocketLabels { + reporter: Reporter::destination, + }; + pi.metrics.record_socket_open(&socket_labels); + let metrics_for_socket_close = pi.metrics.clone(); + let serve_client = async move { + let _socket_guard = metrics::SocketCloseGuard::new( + metrics_for_socket_close, + Reporter::destination, + ); let tls = match acceptor.accept(raw_socket).await { Ok(tls) => tls, Err(e) => { diff --git a/src/proxy/inbound_passthrough.rs b/src/proxy/inbound_passthrough.rs index e2b8b41464..f7daad4a01 100644 --- a/src/proxy/inbound_passthrough.rs +++ b/src/proxy/inbound_passthrough.rs @@ -76,7 +76,17 @@ impl InboundPassthrough { let pi = self.pi.clone(); match socket { Ok((stream, remote)) => { + let socket_labels = metrics::SocketLabels { + reporter: Reporter::destination, + }; + pi.metrics.record_socket_open(&socket_labels); + + let metrics_for_socket_close = pi.metrics.clone(); let serve_client = async move { + let _socket_guard = metrics::SocketCloseGuard::new( + metrics_for_socket_close, + Reporter::destination, + ); debug!(component="inbound passthrough", "connection started"); // Since this task is spawned, make sure we are guaranteed to terminate tokio::select! { diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index 5c0f0ab03f..c7c6d89bae 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -23,6 +23,7 @@ use prometheus_client::encoding::{ }; use prometheus_client::metrics::counter::{Atomic, Counter}; use prometheus_client::metrics::family::Family; +use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; use tracing::event; @@ -40,6 +41,8 @@ use crate::strng::{RichStrng, Strng}; pub struct Metrics { pub connection_opens: Family, pub connection_close: Family, + pub connection_failures: Family, + pub open_sockets: Family, pub received_bytes: Family, pub sent_bytes: Family, @@ -71,6 +74,14 @@ pub enum ResponseFlags { AuthorizationPolicyDenied, // connection denied because we could not establish an upstream connection ConnectionFailure, + // TLS handshake failure + TlsFailure, + // HTTP/2 handshake failure + Http2HandshakeFailure, + // Network policy blocking connection + NetworkPolicyError, + // Identity/certificate error + IdentityError, } impl EncodeLabelValue for ResponseFlags { @@ -79,6 +90,10 @@ impl EncodeLabelValue for ResponseFlags { ResponseFlags::None => writer.write_str("-"), ResponseFlags::AuthorizationPolicyDenied => writer.write_str("DENY"), ResponseFlags::ConnectionFailure => writer.write_str("CONNECT"), + ResponseFlags::TlsFailure => writer.write_str("TLS_FAILURE"), + ResponseFlags::Http2HandshakeFailure => writer.write_str("H2_HANDSHAKE_FAILURE"), + ResponseFlags::NetworkPolicyError => writer.write_str("NETWORK_POLICY"), + ResponseFlags::IdentityError => writer.write_str("IDENTITY_ERROR"), } } } @@ -202,6 +217,12 @@ impl From for CommonTrafficLabels { } } +/// Minimal labels for socket metrics (without direction) +#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)] +pub struct SocketLabels { + pub reporter: Reporter, +} + #[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)] pub struct CommonTrafficLabels { reporter: Reporter, @@ -330,14 +351,63 @@ impl Metrics { on_demand_dns.clone(), ); + let connection_failures = Family::default(); + registry.register( + "tcp_connections_failed", + "The total number of TCP connections that failed to establish (unstable)", + connection_failures.clone(), + ); + + let open_sockets = Family::default(); + registry.register( + "tcp_sockets_open", + "The current number of open TCP sockets (unstable)", + open_sockets.clone(), + ); + Self { connection_opens, connection_close, received_bytes, sent_bytes, on_demand_dns, + connection_failures, + open_sockets, } } + + pub fn record_socket_open(&self, labels: &SocketLabels) { + self.open_sockets.get_or_create(labels).inc(); + } + + pub fn record_socket_close(&self, labels: &SocketLabels) { + self.open_sockets.get_or_create(labels).dec(); + } +} + +/// Guard to ensure socket close is recorded even if task is cancelled +/// This should be created at the start of an async block that handles a socket +/// Stores only the minimal information needed to reconstruct labels, avoiding +/// cloning the large CommonTrafficLabels struct +pub struct SocketCloseGuard { + metrics: Arc, + reporter: Reporter, +} + +impl Drop for SocketCloseGuard { + fn drop(&mut self) { + let labels = SocketLabels { + reporter: self.reporter, + }; + self.metrics.record_socket_close(&labels); + } +} + +impl SocketCloseGuard { + /// Create a new socket close guard + pub fn new(metrics: Arc, reporter: Reporter) -> Self { + Self { metrics, reporter } + } } #[derive(Debug)] @@ -508,7 +578,7 @@ impl ConnectionResult { } // Record our final result, with more details as a response flag. - pub fn record_with_flag( + pub fn record_with_flag( mut self, res: Result<(), E>, flag: ResponseFlags, @@ -518,12 +588,44 @@ impl ConnectionResult { } // Record our final result. - pub fn record(mut self, res: Result<(), E>) { + pub fn record(mut self, res: Result<(), E>) { + // If no specific flag was set and we have an error, try to infer the failure reason + if self.tl.response_flags == ResponseFlags::None + && let Err(ref err) = res + { + self.tl.response_flags = Self::extract_failure_reason(err); + } self.record_internal(res) } + // Extract failure reason from error type using downcasting + fn extract_failure_reason(err: &E) -> ResponseFlags { + use std::any::Any; + + // Try to downcast the error itself to proxy::Error + if let Some(proxy_err) = (err as &dyn Any).downcast_ref::() { + return match proxy_err { + proxy::Error::Tls(_) => ResponseFlags::TlsFailure, + proxy::Error::Http2Handshake(_) | proxy::Error::H2(_) => { + ResponseFlags::Http2HandshakeFailure + } + proxy::Error::MaybeHBONENetworkPolicyError(_) => ResponseFlags::NetworkPolicyError, + proxy::Error::Identity(_) => ResponseFlags::IdentityError, + proxy::Error::AuthorizationPolicyRejection(_) + | proxy::Error::AuthorizationPolicyLateRejection => { + ResponseFlags::AuthorizationPolicyDenied + } + proxy::Error::ConnectionFailed(_) => ResponseFlags::ConnectionFailure, + _ => ResponseFlags::ConnectionFailure, + }; + } + + // Default to generic connection failure if we can't identify the error type + ResponseFlags::ConnectionFailure + } + // Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions. - fn record_internal(&mut self, res: Result<(), E>) { + fn record_internal(&mut self, res: Result<(), E>) { debug_assert!(!self.recorded, "record called multiple times"); if self.recorded { return; @@ -534,6 +636,18 @@ impl ConnectionResult { // Unconditionally record the connection was closed self.metrics.connection_close.get_or_create(tl).inc(); + if matches!( + tl.response_flags, + ResponseFlags::ConnectionFailure + | ResponseFlags::AuthorizationPolicyDenied + | ResponseFlags::TlsFailure + | ResponseFlags::Http2HandshakeFailure + | ResponseFlags::NetworkPolicyError + | ResponseFlags::IdentityError + ) { + self.metrics.connection_failures.get_or_create(tl).inc(); + } + // Unconditionally write out an access log let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls; let bytes = ( diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index e468ed7f6e..03777a0e48 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -90,6 +90,11 @@ impl Outbound { let mut force_shutdown = force_shutdown.clone(); match socket { Ok((stream, _remote)) => { + let socket_labels = metrics::SocketLabels { + reporter: Reporter::source, + }; + self.pi.metrics.record_socket_open(&socket_labels); + let mut oc = OutboundConnection { pi: self.pi.clone(), id: TraceParent::new(), @@ -97,7 +102,12 @@ impl Outbound { hbone_port: self.pi.cfg.inbound_addr.port(), }; let span = info_span!("outbound", id=%oc.id); + let metrics_for_socket_close = self.pi.metrics.clone(); let serve_outbound_connection = async move { + let _socket_guard = metrics::SocketCloseGuard::new( + metrics_for_socket_close, + Reporter::source, + ); debug!(component="outbound", "connection started"); // Since this task is spawned, make sure we are guaranteed to terminate tokio::select! { diff --git a/src/proxy/socks5.rs b/src/proxy/socks5.rs index 9a9eeab3ca..ddbecbf00c 100644 --- a/src/proxy/socks5.rs +++ b/src/proxy/socks5.rs @@ -86,6 +86,11 @@ impl Socks5 { let mut force_shutdown = force_shutdown.clone(); match socket { Ok((stream, _remote)) => { + let socket_labels = crate::proxy::metrics::SocketLabels { + reporter: crate::proxy::metrics::Reporter::source, + }; + self.pi.metrics.record_socket_open(&socket_labels); + let oc = OutboundConnection { pi: self.pi.clone(), id: TraceParent::new(), @@ -93,7 +98,12 @@ impl Socks5 { hbone_port: self.pi.cfg.inbound_addr.port(), }; let span = info_span!("socks5", id=%oc.id); + let metrics_for_socket_close = self.pi.metrics.clone(); let serve = (async move { + let _socket_guard = crate::proxy::metrics::SocketCloseGuard::new( + metrics_for_socket_close, + crate::proxy::metrics::Reporter::source, + ); debug!(component="socks5", "connection started"); // Since this task is spawned, make sure we are guaranteed to terminate tokio::select! { diff --git a/src/state/workload.rs b/src/state/workload.rs index 509f20bc0e..a64dc2d207 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -36,7 +36,7 @@ use std::sync::Arc; use std::{fmt, net}; use thiserror::Error; use tokio::sync::watch::{Receiver, Sender}; -use tracing::{error, trace}; +use tracing::trace; use xds::istio::workload::ApplicationTunnel as XdsApplicationTunnel; use xds::istio::workload::GatewayAddress as XdsGatewayAddress; use xds::istio::workload::Workload as XdsWorkload; diff --git a/src/telemetry.rs b/src/telemetry.rs index 0adcd9967e..93c3b00788 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -24,7 +24,7 @@ use serde::Serializer; use serde::ser::SerializeMap; use thiserror::Error; -use tracing::{Event, Subscriber, error, field, info, warn}; +use tracing::{Event, Subscriber, field, info, warn}; use tracing_appender::non_blocking::NonBlocking; use tracing_core::Field; use tracing_core::field::Visit; diff --git a/src/tls/lib.rs b/src/tls/lib.rs index 5f0426813f..f6b73a755d 100644 --- a/src/tls/lib.rs +++ b/src/tls/lib.rs @@ -28,8 +28,6 @@ use rustls::crypto::CryptoProvider; use rustls::ClientConfig; use rustls::ServerConfig; -use tracing::error; - #[async_trait::async_trait] pub trait ControlPlaneClientCertProvider: Send + Sync { async fn fetch_cert(&self, alt_hostname: Option) -> Result;