Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,18 @@ impl Inbound {
let dst = to_canonical(raw_socket.local_addr().expect("local_addr available"));
let network = pi.cfg.network.clone();
let acceptor = crate::tls::InboundAcceptor::new(acceptor.clone());

let socket_labels = metrics::SocketLabels {
reporter: Reporter::destination,
};
pi.metrics.record_socket_open(&socket_labels);
let metrics_for_socket_close = pi.metrics.clone();

let serve_client = async move {
let _socket_guard = metrics::SocketCloseGuard::new(
metrics_for_socket_close,
Reporter::destination,
);
let tls = match acceptor.accept(raw_socket).await {
Ok(tls) => tls,
Err(e) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll drop the guard here, so the close is still tracked, but in error cases we could also record something that indicates why the socket closed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the socket metric to a gauge, so I'm not sure it still makes sense to include error codes as metric labels.

Expand Down
10 changes: 10 additions & 0 deletions src/proxy/inbound_passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,17 @@ impl InboundPassthrough {
let pi = self.pi.clone();
match socket {
Ok((stream, remote)) => {
let socket_labels = metrics::SocketLabels {
reporter: Reporter::destination,
};
pi.metrics.record_socket_open(&socket_labels);

let metrics_for_socket_close = pi.metrics.clone();
let serve_client = async move {
let _socket_guard = metrics::SocketCloseGuard::new(
metrics_for_socket_close,
Reporter::destination,
);
debug!(component="inbound passthrough", "connection started");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
Expand Down
120 changes: 117 additions & 3 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use prometheus_client::encoding::{
};
use prometheus_client::metrics::counter::{Atomic, Counter};
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

use tracing::event;
Expand All @@ -40,6 +41,8 @@ use crate::strng::{RichStrng, Strng};
pub struct Metrics {
pub connection_opens: Family<CommonTrafficLabels, Counter>,
pub connection_close: Family<CommonTrafficLabels, Counter>,
pub connection_failures: Family<CommonTrafficLabels, Counter>,
pub open_sockets: Family<SocketLabels, Gauge>,
pub received_bytes: Family<CommonTrafficLabels, Counter>,
pub sent_bytes: Family<CommonTrafficLabels, Counter>,

Expand Down Expand Up @@ -71,6 +74,14 @@ pub enum ResponseFlags {
AuthorizationPolicyDenied,
// connection denied because we could not establish an upstream connection
ConnectionFailure,
// TLS handshake failure
TlsFailure,
// HTTP/2 handshake failure
Http2HandshakeFailure,
// Network policy blocking connection
NetworkPolicyError,
// Identity/certificate error
IdentityError,
}

impl EncodeLabelValue for ResponseFlags {
Expand All @@ -79,6 +90,10 @@ impl EncodeLabelValue for ResponseFlags {
ResponseFlags::None => writer.write_str("-"),
ResponseFlags::AuthorizationPolicyDenied => writer.write_str("DENY"),
ResponseFlags::ConnectionFailure => writer.write_str("CONNECT"),
ResponseFlags::TlsFailure => writer.write_str("TLS_FAILURE"),
ResponseFlags::Http2HandshakeFailure => writer.write_str("H2_HANDSHAKE_FAILURE"),
ResponseFlags::NetworkPolicyError => writer.write_str("NETWORK_POLICY"),
ResponseFlags::IdentityError => writer.write_str("IDENTITY_ERROR"),
}
}
}
Expand Down Expand Up @@ -202,6 +217,12 @@ impl From<ConnectionOpen> for CommonTrafficLabels {
}
}

/// Minimal label set for socket-level metrics (no direction label).
///
/// Used as the key of the `open_sockets` gauge family. The visible call sites
/// build it with either `Reporter::destination` or `Reporter::source`.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct SocketLabels {
/// Reporter value the socket is attributed to.
pub reporter: Reporter,
}

#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct CommonTrafficLabels {
reporter: Reporter,
Expand Down Expand Up @@ -330,14 +351,63 @@ impl Metrics {
on_demand_dns.clone(),
);

let connection_failures = Family::default();
registry.register(
"tcp_connections_failed",
"The total number of TCP connections that failed to establish (unstable)",
connection_failures.clone(),
);

let open_sockets = Family::default();
registry.register(
"tcp_sockets_open",
"The current number of open TCP sockets (unstable)",
open_sockets.clone(),
);

Self {
connection_opens,
connection_close,
received_bytes,
sent_bytes,
on_demand_dns,
connection_failures,
open_sockets,
}
}

/// Bumps the open-socket gauge for `labels` by one.
///
/// The gauge series for the label set is created on first use.
pub fn record_socket_open(&self, labels: &SocketLabels) {
    let gauge = self.open_sockets.get_or_create(labels);
    gauge.inc();
}

/// Lowers the open-socket gauge for `labels` by one.
///
/// Intended to be paired with an earlier `record_socket_open` call using the
/// same label set.
pub fn record_socket_close(&self, labels: &SocketLabels) {
    let gauge = self.open_sockets.get_or_create(labels);
    gauge.dec();
}
}

/// Guard that records a socket close on the open-sockets gauge when dropped.
///
/// Create it at the start of the async block that handles a socket and keep it
/// bound to a named variable (e.g. `_socket_guard`) for the lifetime of that
/// block, so the close is recorded on every exit path, including task
/// cancellation.
///
/// The guard only records the *close*: callers are expected to have called
/// [`Metrics::record_socket_open`] with the same `reporter` beforehand.
///
/// Only the `reporter` is stored, which is all that is needed to rebuild the
/// [`SocketLabels`]; this avoids cloning the larger `CommonTrafficLabels`.
///
/// NOTE(review): the visible call sites record the open *before* the async
/// block and construct the guard *inside* it. If that future were dropped
/// before its first poll the guard would never exist and the gauge would stay
/// incremented — confirm the spawn path always polls, or build the guard
/// before the async block and move it in.
#[must_use = "the socket close is recorded as soon as the guard is dropped; bind it to a variable"]
pub struct SocketCloseGuard {
    /// Shared metrics handle used to decrement the gauge on drop.
    metrics: Arc<Metrics>,
    /// Reporter label of the socket being tracked.
    reporter: Reporter,
}

impl SocketCloseGuard {
    /// Creates a guard that will record a socket close for `reporter` on drop.
    ///
    /// Construction itself does not touch any metric.
    pub fn new(metrics: Arc<Metrics>, reporter: Reporter) -> Self {
        Self { metrics, reporter }
    }
}

impl Drop for SocketCloseGuard {
    /// Rebuilds the socket labels from the stored reporter and records the close.
    fn drop(&mut self) {
        let labels = SocketLabels {
            reporter: self.reporter,
        };
        self.metrics.record_socket_close(&labels);
    }
}

#[derive(Debug)]
Expand Down Expand Up @@ -508,7 +578,7 @@ impl ConnectionResult {
}

// Record our final result, with more details as a response flag.
pub fn record_with_flag<E: std::error::Error>(
pub fn record_with_flag<E: std::error::Error + 'static>(
mut self,
res: Result<(), E>,
flag: ResponseFlags,
Expand All @@ -518,12 +588,44 @@ impl ConnectionResult {
}

// Record our final result.
pub fn record<E: std::error::Error>(mut self, res: Result<(), E>) {
pub fn record<E: std::error::Error + 'static>(mut self, res: Result<(), E>) {
// If no specific flag was set and we have an error, try to infer the failure reason
if self.tl.response_flags == ResponseFlags::None
&& let Err(ref err) = res
{
self.tl.response_flags = Self::extract_failure_reason(err);
}
self.record_internal(res)
}

// Map an error onto the response flag that best describes why the connection failed.
// Only the error value itself is inspected (its source chain is not walked); anything
// that is not a proxy::Error is reported as a generic connection failure.
fn extract_failure_reason<E: std::error::Error + 'static>(err: &E) -> ResponseFlags {
    // View the concrete error as a trait object so its type can be tested at runtime.
    let erased: &(dyn std::error::Error + 'static) = err;

    let Some(proxy_err) = erased.downcast_ref::<proxy::Error>() else {
        // Unknown error type: fall back to the generic flag.
        return ResponseFlags::ConnectionFailure;
    };

    match proxy_err {
        proxy::Error::Tls(_) => ResponseFlags::TlsFailure,
        proxy::Error::Http2Handshake(_) | proxy::Error::H2(_) => {
            ResponseFlags::Http2HandshakeFailure
        }
        proxy::Error::MaybeHBONENetworkPolicyError(_) => ResponseFlags::NetworkPolicyError,
        proxy::Error::Identity(_) => ResponseFlags::IdentityError,
        proxy::Error::AuthorizationPolicyRejection(_)
        | proxy::Error::AuthorizationPolicyLateRejection => {
            ResponseFlags::AuthorizationPolicyDenied
        }
        proxy::Error::ConnectionFailed(_) => ResponseFlags::ConnectionFailure,
        // Every other proxy error is treated as a generic connection failure.
        _ => ResponseFlags::ConnectionFailure,
    }
}

// Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions.
fn record_internal<E: std::error::Error>(&mut self, res: Result<(), E>) {
fn record_internal<E: std::error::Error + 'static>(&mut self, res: Result<(), E>) {
debug_assert!(!self.recorded, "record called multiple times");
if self.recorded {
return;
Expand All @@ -534,6 +636,18 @@ impl ConnectionResult {
// Unconditionally record the connection was closed
self.metrics.connection_close.get_or_create(tl).inc();

if matches!(
tl.response_flags,
ResponseFlags::ConnectionFailure
| ResponseFlags::AuthorizationPolicyDenied
| ResponseFlags::TlsFailure
| ResponseFlags::Http2HandshakeFailure
| ResponseFlags::NetworkPolicyError
| ResponseFlags::IdentityError
) {
self.metrics.connection_failures.get_or_create(tl).inc();
}

// Unconditionally write out an access log
let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls;
let bytes = (
Expand Down
10 changes: 10 additions & 0 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,24 @@ impl Outbound {
let mut force_shutdown = force_shutdown.clone();
match socket {
Ok((stream, _remote)) => {
let socket_labels = metrics::SocketLabels {
reporter: Reporter::source,
};
self.pi.metrics.record_socket_open(&socket_labels);

let mut oc = OutboundConnection {
pi: self.pi.clone(),
id: TraceParent::new(),
pool: pool.clone(),
hbone_port: self.pi.cfg.inbound_addr.port(),
};
let span = info_span!("outbound", id=%oc.id);
let metrics_for_socket_close = self.pi.metrics.clone();
let serve_outbound_connection = async move {
let _socket_guard = metrics::SocketCloseGuard::new(
metrics_for_socket_close,
Reporter::source,
);
debug!(component="outbound", "connection started");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
Expand Down
10 changes: 10 additions & 0 deletions src/proxy/socks5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,24 @@ impl Socks5 {
let mut force_shutdown = force_shutdown.clone();
match socket {
Ok((stream, _remote)) => {
let socket_labels = crate::proxy::metrics::SocketLabels {
reporter: crate::proxy::metrics::Reporter::source,
};
self.pi.metrics.record_socket_open(&socket_labels);

let oc = OutboundConnection {
pi: self.pi.clone(),
id: TraceParent::new(),
pool: pool.clone(),
hbone_port: self.pi.cfg.inbound_addr.port(),
};
let span = info_span!("socks5", id=%oc.id);
let metrics_for_socket_close = self.pi.metrics.clone();
let serve = (async move {
let _socket_guard = crate::proxy::metrics::SocketCloseGuard::new(
metrics_for_socket_close,
crate::proxy::metrics::Reporter::source,
);
debug!(component="socks5", "connection started");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
Expand Down
2 changes: 1 addition & 1 deletion src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::sync::Arc;
use std::{fmt, net};
use thiserror::Error;
use tokio::sync::watch::{Receiver, Sender};
use tracing::{error, trace};
use tracing::trace;
use xds::istio::workload::ApplicationTunnel as XdsApplicationTunnel;
use xds::istio::workload::GatewayAddress as XdsGatewayAddress;
use xds::istio::workload::Workload as XdsWorkload;
Expand Down
2 changes: 1 addition & 1 deletion src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::Serializer;
use serde::ser::SerializeMap;

use thiserror::Error;
use tracing::{Event, Subscriber, error, field, info, warn};
use tracing::{Event, Subscriber, field, info, warn};
use tracing_appender::non_blocking::NonBlocking;
use tracing_core::Field;
use tracing_core::field::Visit;
Expand Down
2 changes: 0 additions & 2 deletions src/tls/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use rustls::crypto::CryptoProvider;
use rustls::ClientConfig;
use rustls::ServerConfig;

use tracing::error;

#[async_trait::async_trait]
pub trait ControlPlaneClientCertProvider: Send + Sync {
async fn fetch_cert(&self, alt_hostname: Option<String>) -> Result<ClientConfig, Error>;
Expand Down