diff --git a/src/baggage.rs b/src/baggage.rs index 375405dd0d..33d6c2b554 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -18,7 +18,7 @@ use hyper::{ http::HeaderValue, }; -#[derive(Default)] +#[derive(Debug, Default, PartialEq, Eq)] pub struct Baggage { pub cluster_id: Option, pub namespace: Option, @@ -29,6 +29,43 @@ pub struct Baggage { pub zone: Option, } +pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { + [ + baggage + .cluster_id + .as_ref() + .map(|cluster| format!("k8s.cluster.name={cluster}")), + baggage + .namespace + .as_ref() + .map(|namespace| format!("k8s.namespace.name={namespace}")), + baggage + .workload_name + .as_ref() + .map(|workload| format!("k8s.{workload_type}.name={workload}")), + baggage + .service_name + .as_ref() + .map(|service| format!("service.name={service}")), + baggage + .revision + .as_ref() + .map(|revision| format!("service.version={revision}")), + baggage + .region + .as_ref() + .map(|region| format!("cloud.region={region}")), + baggage + .zone + .as_ref() + .map(|zone| format!("cloud.availability_zone={zone}")), + ] + .into_iter() + .flatten() + .collect::>() + .join(",") +} + pub fn parse_baggage_header(headers: GetAll) -> Result { let mut baggage = Baggage { ..Default::default() @@ -67,8 +104,9 @@ pub mod tests { use hyper::{HeaderMap, http::HeaderValue}; use crate::proxy::BAGGAGE_HEADER; + use crate::strng::Strng; - use super::parse_baggage_header; + use super::{Baggage, baggage_header_val, parse_baggage_header}; #[test] fn baggage_parser() -> anyhow::Result<()> { @@ -90,8 +128,8 @@ pub mod tests { let mut hm = HeaderMap::new(); let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version="; let header_value = HeaderValue::from_str(baggage_str)?; - let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; hm.append(BAGGAGE_HEADER, header_value); + let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; 
assert_eq!(baggage.cluster_id, None); assert_eq!(baggage.namespace, None); assert_eq!(baggage.workload_name, None); @@ -136,4 +174,37 @@ pub mod tests { assert_eq!(baggage.revision, None); Ok(()) } + + #[test] + fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> { + { + let baggage = Baggage { + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + { + let baggage = Baggage { + cluster_id: Some(Strng::from("cluster")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from("workload")), + service_name: Some(Strng::from("service")), + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + Ok(()) + } } diff --git a/src/config.rs b/src/config.rs index 2918f143cf..7fba5455a1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -115,6 +115,7 @@ const PROXY_MODE_DEDICATED: &str = "dedicated"; const PROXY_MODE_SHARED: &str = "shared"; const LOCALHOST_APP_TUNNEL: &str = "LOCALHOST_APP_TUNNEL"; +const ENABLE_ENHANCED_BAGGAGE: &str = "ENABLE_RESPONSE_BAGGAGE"; #[derive(serde::Serialize, Clone, Debug, PartialEq, Eq)] pub enum RootCert { @@ -316,6 +317,7 @@ pub struct Config { // path to CRL file; if set, enables CRL checking pub crl_path: Option, + pub enable_enhanced_baggage: bool, } #[derive(serde::Serialize, Clone, Copy, Debug)] @@ -875,6 +877,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result { .ok() .filter(|s| !s.is_empty()) .map(PathBuf::from), + enable_enhanced_baggage: parse_default(ENABLE_ENHANCED_BAGGAGE, true)?, }) } diff --git a/src/copy.rs b/src/copy.rs index 13e7fe9102..b660512580 100644 --- a/src/copy.rs 
+++ b/src/copy.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::proxy; -use crate::proxy::ConnectionResult; use crate::proxy::Error::{BackendDisconnected, ClientDisconnected, ReceiveError, SendError}; +use crate::proxy::{self, ConnectionResult}; use bytes::{Buf, Bytes, BytesMut}; use pin_project_lite::pin_project; use std::future::Future; @@ -416,9 +415,9 @@ mod tests { let metrics = std::sync::Arc::new(crate::proxy::Metrics::new( crate::metrics::sub_registry(&mut registry), )); - let source_addr = "127.0.0.1:12345".parse().unwrap(); - let dest_addr = "127.0.0.1:34567".parse().unwrap(); - let cr = ConnectionResult::new( + let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap(); + let cr = crate::proxy::metrics::ConnectionResultBuilder::new( source_addr, dest_addr, None, @@ -432,7 +431,8 @@ mod tests { destination_service: None, }, metrics.clone(), - ); + ) + .build(); copy_bidirectional(ztunnel_downsteam, ztunnel_upsteam, &cr).await }); const ITERS: usize = 1000; @@ -461,9 +461,9 @@ mod tests { let metrics = std::sync::Arc::new(crate::proxy::Metrics::new( crate::metrics::sub_registry(&mut registry), )); - let source_addr = "127.0.0.1:12345".parse().unwrap(); - let dest_addr = "127.0.0.1:34567".parse().unwrap(); - let cr = ConnectionResult::new( + let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap(); + let cr = crate::proxy::metrics::ConnectionResultBuilder::new( source_addr, dest_addr, None, @@ -477,7 +477,8 @@ mod tests { destination_service: None, }, metrics.clone(), - ); + ) + .build(); copy_bidirectional(WeirdIO(ztunnel_downsteam), WeirdIO(ztunnel_upsteam), &cr).await }); const WRITES: usize = 2560; diff --git a/src/proxy/h2/client.rs b/src/proxy/h2/client.rs index 
4c768d537e..37471535fe 100644 --- a/src/proxy/h2/client.rs +++ b/src/proxy/h2/client.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::baggage::{Baggage, parse_baggage_header}; use crate::config; use crate::identity::Identity; -use crate::proxy::Error; +use crate::proxy::{BAGGAGE_HEADER, Error}; use bytes::{Buf, Bytes}; use h2::SendStream; use h2::client::{Connection, SendRequest}; @@ -101,10 +102,10 @@ impl H2ConnectClient { pub async fn send_request( &mut self, req: http::Request<()>, - ) -> Result { + ) -> Result<(crate::proxy::h2::H2Stream, Option), Error> { let cur = self.stream_count.fetch_add(1, Ordering::SeqCst); trace!(current_streams = cur, "sending request"); - let (send, recv) = match self.internal_send(req).await { + let (send, recv, baggage) = match self.internal_send(req).await { Ok(r) => r, Err(e) => { // Request failed, so drop the stream now @@ -123,14 +124,14 @@ impl H2ConnectClient { _dropped: dropped2, }; let h2 = crate::proxy::h2::H2Stream { read, write }; - Ok(h2) + Ok((h2, baggage)) } // helper to allow us to handle errors once async fn internal_send( &mut self, req: Request<()>, - ) -> Result<(SendStream, h2::RecvStream), Error> { + ) -> Result<(SendStream, h2::RecvStream, Option), Error> { // "This function must return `Ready` before `send_request` is called" // We should always be ready though, because we make sure we don't go over the max stream limit out of band. 
futures::future::poll_fn(|cx| self.sender.poll_ready(cx)).await?; @@ -139,7 +140,8 @@ impl H2ConnectClient { if response.status() != 200 { return Err(Error::HttpStatus(response.status())); } - Ok((stream, response.into_body())) + let baggage = parse_baggage_header(response.headers().get_all(BAGGAGE_HEADER)).ok(); + Ok((stream, response.into_body(), baggage)) } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 4df849fc62..a4f6255eb4 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -22,8 +22,11 @@ use tokio::sync::watch; use tracing::{Instrument, debug, error, info, info_span, trace_span}; -use super::{ConnectionResult, Error, HboneAddress, LocalWorkloadInformation, ResponseFlags, util}; -use crate::baggage::parse_baggage_header; +use super::{ + ConnectionResult, ConnectionResultBuilder, Error, HboneAddress, LocalWorkloadInformation, + ResponseFlags, util, +}; +use crate::baggage::{baggage_header_val, parse_baggage_header}; use crate::identity::Identity; use crate::config::Config; @@ -217,7 +220,9 @@ impl Inbound { // At this point in processing, we never built up full context to log a complete access log. // Instead, just log a minimal error line. 
metrics::log_early_deny(src, dst, Reporter::destination, e); - if let Err(err) = req.send_error(build_response(code)) { + if let Err(err) = + req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) + { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -291,7 +296,9 @@ impl Inbound { Ok(res) => res, Err(InboundFlagError(err, flag, code)) => { ri.result_tracker.record_with_flag(Err(err), flag); - if let Err(err) = req.send_error(build_response(code)) { + if let Err(err) = + req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) + { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -308,7 +315,11 @@ impl Inbound { // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information about the // proxy protocol. let send = req - .send_response(build_response(StatusCode::OK)) + .send_response(build_response( + StatusCode::OK, + Some(ri.destination_workload.as_ref()), + pi.cfg.enable_enhanced_baggage, + )) .and_then(|h2_stream| async { if let Some(TunnelRequest { protocol: Protocol::PROXY, @@ -389,8 +400,11 @@ impl Inbound { }; let for_host = parse_forwarded_host(req); - let baggage = - parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default(); + let baggage = if pi.cfg.enable_enhanced_baggage { + parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Default::default() + }; // We assume it is from gateway if it's a hostname request. // We may need a more explicit indicator in the future. @@ -407,7 +421,9 @@ impl Inbound { debug!("request from gateway"); } let source = match from_gateway { - true => None, // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515 + // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515. 
+ // Instead, we will use baggage + true => None, false => { let src_network_addr = NetworkAddress { // we can assume source network is our network because we did not traverse a gateway @@ -419,15 +435,22 @@ impl Inbound { } }; - let derived_source = metrics::DerivedWorkload { - identity: rbac_ctx.conn.src_identity.clone(), - cluster_id: baggage.cluster_id, - region: baggage.region, - zone: baggage.zone, - namespace: baggage.namespace, - app: baggage.service_name, - workload_name: baggage.workload_name, - revision: baggage.revision, + let derived_source = if pi.cfg.enable_enhanced_baggage { + metrics::DerivedWorkload { + identity: rbac_ctx.conn.src_identity.clone(), + cluster_id: baggage.cluster_id, + region: baggage.region, + zone: baggage.zone, + namespace: baggage.namespace, + app: baggage.service_name, + workload_name: baggage.workload_name, + revision: baggage.revision, + } + } else { + metrics::DerivedWorkload { + identity: rbac_ctx.conn.src_identity.clone(), + ..Default::default() + } }; let ds = proxy::guess_inbound_service( &rbac_ctx.conn, @@ -435,7 +458,7 @@ impl Inbound { upstream_service, &destination_workload, ); - let result_tracker = Box::new(metrics::ConnectionResult::new( + let connection_result_builder = ConnectionResultBuilder::new( rbac_ctx.conn.src, // For consistency with outbound logs, report the original destination (with 15008 port) // as dst.addr, and the target address as dst.hbone_addr @@ -446,18 +469,21 @@ impl Inbound { reporter: Reporter::destination, source, derived_source: Some(derived_source), - destination: Some(destination_workload), + destination: Some(destination_workload.clone()), connection_security_policy: metrics::SecurityPolicy::mutual_tls, destination_service: ds, }, pi.metrics.clone(), - )); + ); + + let result_tracker = Box::new(connection_result_builder.build()); Ok(InboundRequest { for_host, rbac_ctx, result_tracker, upstream_addr, tunnel_request, + destination_workload, }) } @@ -683,6 +709,7 @@ struct InboundRequest 
{ result_tracker: Box, upstream_addr: SocketAddr, tunnel_request: Option, + destination_workload: Arc, } /// InboundError represents an error with an associated status code. @@ -726,9 +753,24 @@ pub fn parse_forwarded_host(req: &T) -> Option { .and_then(proxy::parse_forwarded_host) } -fn build_response(status: StatusCode) -> Response<()> { - Response::builder() - .status(status) +// Second argument is local workload and cluster name +fn build_response( + status: StatusCode, + local_wl: Option<&Workload>, + enable_response_baggage: bool, +) -> Response<()> { + let mut builder = Response::builder().status(status); + + if let Some(local_wl) = local_wl + && enable_response_baggage + { + builder = builder.header( + BAGGAGE_HEADER, + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), + ) + } + + builder .body(()) .expect("builder with known status code should not fail") } @@ -1072,4 +1114,87 @@ mod tests { }) } } + + #[test] + fn test_build_response_baggage_feature_gate() { + use super::build_response; + use crate::proxy::BAGGAGE_HEADER; + use crate::test_helpers; + use http::StatusCode; + + // Create a test workload + let workload = test_helpers::test_default_workload(); + + // Test with baggage enabled + let mut config_enabled = test_helpers::test_config(); + config_enabled.enable_enhanced_baggage = true; + + let response_enabled = build_response( + StatusCode::OK, + Some(&workload), + config_enabled.enable_enhanced_baggage, + ); + assert!(response_enabled.headers().contains_key(BAGGAGE_HEADER)); + + let baggage_header = response_enabled.headers().get(BAGGAGE_HEADER).unwrap(); + let baggage_value = baggage_header.to_str().unwrap(); + // Check that baggage header contains cluster_id from the test workload + assert!(baggage_value.contains("k8s.cluster.name=Kubernetes")); + + // Test with baggage disabled + let mut config_disabled = test_helpers::test_config(); + config_disabled.enable_enhanced_baggage = false; + + let response_disabled = build_response( + 
StatusCode::OK, + Some(&workload), + config_disabled.enable_enhanced_baggage, + ); + assert!(!response_disabled.headers().contains_key(BAGGAGE_HEADER)); + + // Test with None workload (should not have baggage regardless of config) + let response_no_workload = + build_response(StatusCode::OK, None, config_enabled.enable_enhanced_baggage); + assert!(!response_no_workload.headers().contains_key(BAGGAGE_HEADER)); + } + + #[test] + fn test_incoming_baggage_parsing_feature_gate() { + use crate::baggage::{Baggage, parse_baggage_header}; + use crate::proxy::BAGGAGE_HEADER; + use crate::test_helpers; + use http::{HeaderMap, HeaderValue}; + + // Create mock baggage header + let mut headers = HeaderMap::new(); + headers.insert(BAGGAGE_HEADER, HeaderValue::from_str("k8s.cluster.name=test-cluster,k8s.namespace.name=test-ns,k8s.deployment.name=test-app").unwrap()); + + // Test with baggage enabled + let config_enabled = test_helpers::test_config(); + assert!(config_enabled.enable_enhanced_baggage); // Default should be true + + let baggage_enabled = if config_enabled.enable_enhanced_baggage { + parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Baggage::default() + }; + + assert_eq!(baggage_enabled.cluster_id, Some("test-cluster".into())); + assert_eq!(baggage_enabled.namespace, Some("test-ns".into())); + assert_eq!(baggage_enabled.workload_name, Some("test-app".into())); + + // Test with baggage disabled + let mut config_disabled = test_helpers::test_config(); + config_disabled.enable_enhanced_baggage = false; + + let baggage_disabled = if config_disabled.enable_enhanced_baggage { + parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Baggage::default() + }; + + assert_eq!(baggage_disabled.cluster_id, None); + assert_eq!(baggage_disabled.namespace, None); + assert_eq!(baggage_disabled.workload_name, None); + } } diff --git a/src/proxy/inbound_passthrough.rs b/src/proxy/inbound_passthrough.rs index 
f7daad4a01..75629b16b5 100644 --- a/src/proxy/inbound_passthrough.rs +++ b/src/proxy/inbound_passthrough.rs @@ -187,21 +187,24 @@ impl InboundPassthrough { upstream_services, &upstream_workload, ); - let result_tracker = Box::new(metrics::ConnectionResult::new( - source_addr, - dest_addr, - None, - start, - metrics::ConnectionOpen { - reporter: Reporter::destination, - source: source_workload, - derived_source: Some(derived_source), - destination: Some(upstream_workload), - connection_security_policy: metrics::SecurityPolicy::unknown, - destination_service: ds, - }, - pi.metrics.clone(), - )); + let result_tracker = Box::new( + metrics::ConnectionResultBuilder::new( + source_addr, + dest_addr, + None, + start, + metrics::ConnectionOpen { + reporter: Reporter::destination, + source: source_workload, + derived_source: Some(derived_source), + destination: Some(upstream_workload), + connection_security_policy: metrics::SecurityPolicy::unknown, + destination_service: ds, + }, + pi.metrics.clone(), + ) + .build(), + ); let mut conn_guard = match pi .connection_manager diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index c7c6d89bae..1231be98c6 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -198,6 +198,26 @@ impl CommonTrafficLabels { self.destination_service_namespace = w.namespace.clone().into(); self } + + fn with_derived_destination(mut self, w: Option<&DerivedWorkload>) -> Self { + let Some(w) = w else { return self }; + self.destination_workload = w.workload_name.clone().into(); + self.destination_canonical_service = w.app.clone().into(); + self.destination_canonical_revision = w.revision.clone().into(); + self.destination_workload_namespace = w.namespace.clone().into(); + self.destination_app = w.workload_name.clone().into(); + self.destination_version = w.revision.clone().into(); + self.destination_cluster = w.cluster_id.clone().into(); + // This is the identity from the TLS handshake; this is the most trustworthy source so use it + 
self.destination_principal = w.identity.clone().into(); + + let mut local = self.locality.0.unwrap_or_default(); + local.destination_region = w.region.clone().into(); + local.destination_zone = w.zone.clone().into(); + self.locality = OptionallyEncode(Some(local)); + + self + } } impl From for CommonTrafficLabels { @@ -414,29 +434,29 @@ impl SocketCloseGuard { /// ConnectionResult abstracts recording a metric and emitting an access log upon a connection completion pub struct ConnectionResult { // Src address and name - src: (SocketAddr, Option), + pub(crate) src: (SocketAddr, Option), // Dst address and name - dst: (SocketAddr, Option), - hbone_target: Option, - start: Instant, + pub(crate) dst: (SocketAddr, Option), + pub(crate) hbone_target: Option, + pub(crate) start: Instant, // TODO: storing CommonTrafficLabels adds ~600 bytes retained throughout a connection life time. // We can pre-fetch the metrics we need at initialization instead of storing this, then keep a more // efficient representation for the fields we need to log. Ideally, this would even be optional // in case logs were disabled. - tl: CommonTrafficLabels, - metrics: Arc, + pub(crate) tl: CommonTrafficLabels, + pub(crate) metrics: Arc, // sent records the number of bytes sent on this connection - sent: AtomicU64, + pub(crate) sent: AtomicU64, // sent_metric records the number of bytes sent on this connection to the aggregated metric counter - sent_metric: Counter, + pub(crate) sent_metric: Counter, // recv records the number of bytes received on this connection - recv: AtomicU64, + pub(crate) recv: AtomicU64, // recv_metric records the number of bytes received on this connection to the aggregated metric counter - recv_metric: Counter, + pub(crate) recv_metric: Counter, // Have we recorded yet? - recorded: bool, + pub(crate) recorded: bool, } // log_early_deny allows logging a connection is denied before we have enough information to emit proper @@ -493,7 +513,17 @@ macro_rules! 
access_log { } }; } -impl ConnectionResult { + +pub struct ConnectionResultBuilder { + src: (SocketAddr, Option), + dst: (SocketAddr, Option), + hbone_target: Option, + start: Instant, + tl: CommonTrafficLabels, + metrics: Arc, +} + +impl ConnectionResultBuilder { pub fn new( src: SocketAddr, dst: SocketAddr, @@ -511,30 +541,61 @@ impl ConnectionResult { conn.destination.as_ref().map(|wl| wl.name.clone().into()), ); let tl = CommonTrafficLabels::from(conn); - metrics.connection_opens.get_or_create(&tl).inc(); - - let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls; src.1 = src.1.or(tl.source_canonical_service.clone().inner()); dst.1 = dst.1.or(tl.destination_canonical_service.clone().inner()); + Self { + src, + dst, + hbone_target, + start, + tl, + metrics, + } + } + + pub fn with_derived_source(mut self, w: &DerivedWorkload) -> Self { + self.tl = self.tl.with_derived_source(Some(w)); + self.src.1 = w.workload_name.clone().map(RichStrng::from); + self + } + + pub fn with_derived_destination(mut self, w: &DerivedWorkload) -> Self { + self.tl = self.tl.with_derived_destination(Some(w)); + self.dst.1 = w.workload_name.clone().map(RichStrng::from); + self + } + + pub fn build(self) -> ConnectionResult { + // Grab the metrics with our labels now, so we don't need to fetch them each time. + // The inner metric is an Arc so clone is fine/cheap. + // With the raw Counter, we increment is a simple atomic add operation (~1ns). + // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would + // add up. 
+ let sent_metric = self.metrics.sent_bytes.get_or_create(&self.tl).clone(); + let recv_metric = self.metrics.received_bytes.get_or_create(&self.tl).clone(); + let sent = atomic::AtomicU64::new(0); + let recv = atomic::AtomicU64::new(0); + + let mtls = self.tl.connection_security_policy == SecurityPolicy::mutual_tls; event!( target: "access", parent: None, tracing::Level::DEBUG, - src.addr = %src.0, - src.workload = src.1.as_deref().map(to_value), - src.namespace = tl.source_workload_namespace.to_value(), - src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), + src.addr = %self.src.0, + src.workload = self.src.1.as_deref().map(to_value), + src.namespace = self.tl.source_workload_namespace.to_value(), + src.identity = self.tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), - dst.addr = %dst.0, - dst.hbone_addr = hbone_target.as_ref().map(display), - dst.service = tl.destination_service.to_value(), - dst.workload = dst.1.as_deref().map(to_value), - dst.namespace = tl.destination_workload_namespace.to_value(), - dst.identity = tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), + dst.addr = %self.dst.0, + dst.hbone_addr = self.hbone_target.as_ref().map(display), + dst.service = self.tl.destination_service.to_value(), + dst.workload = self.dst.1.as_deref().map(to_value), + dst.namespace = self.tl.destination_workload_namespace.to_value(), + dst.identity = self.tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), - direction = if tl.reporter == Reporter::source { + direction = if self.tl.reporter == Reporter::source { "outbound" } else { "inbound" @@ -542,23 +603,15 @@ impl ConnectionResult { "connection opened" ); - // Grab the metrics with our labels now, so we don't need to fetch them each time. - // The inner metric is an Arc so clone is fine/cheap. - // With the raw Counter, we increment is a simple atomic add operation (~1ns). 
- // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would - // add up. - let sent_metric = metrics.sent_bytes.get_or_create(&tl).clone(); - let recv_metric = metrics.received_bytes.get_or_create(&tl).clone(); - let sent = atomic::AtomicU64::new(0); - let recv = atomic::AtomicU64::new(0); - Self { - src, - dst, - hbone_target, - start, - tl, - metrics, + self.metrics.connection_opens.get_or_create(&self.tl).inc(); + ConnectionResult { + src: self.src, + dst: self.dst, + hbone_target: self.hbone_target, + start: self.start, + tl: self.tl, + metrics: self.metrics, sent, sent_metric, recv, @@ -566,7 +619,9 @@ impl ConnectionResult { recorded: false, } } +} +impl ConnectionResult { pub fn increment_send(&self, res: u64) { self.sent.inc_by(res); self.sent_metric.inc_by(res); diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 2658f60e62..644b6c132b 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -32,8 +32,9 @@ use crate::proxy::{ BAGGAGE_HEADER, Error, HboneAddress, ProxyInputs, TRACEPARENT_HEADER, TraceParent, X_FORWARDED_NETWORK_HEADER, util, }; -use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics}; +use crate::proxy::{ConnectionOpen, ConnectionResultBuilder, DerivedWorkload, metrics}; +use crate::baggage::{self, Baggage}; use crate::drain::DrainWatcher; use crate::drain::run_with_drain; use crate::proxy::h2::{H2Stream, client::WorkloadKey}; @@ -41,6 +42,7 @@ use crate::state::service::{LoadBalancerMode, Service, ServiceDescription}; use crate::state::workload::OutboundProtocol; use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address}; use crate::state::{ServiceResolutionMode, Upstream}; +use crate::tls::identity_from_connection; use crate::{assertions, copy, proxy, socket}; use super::h2::TokioH2Stream; @@ -200,7 +202,7 @@ impl OutboundConnection { let metrics = self.pi.metrics.clone(); let hbone_target = req.hbone_target_destination.clone(); 
- let result_tracker = Box::new(ConnectionResult::new( + let connection_result_builder = Box::new(ConnectionResultBuilder::new( source_addr, req.actual_destination, hbone_target, @@ -209,27 +211,26 @@ impl OutboundConnection { metrics, )); - let res = match req.protocol { + match req.protocol { OutboundProtocol::DOUBLEHBONE => { // We box this since its not a common path and it would make the future really big. Box::pin(self.proxy_to_double_hbone( source_stream, source_addr, &req, - &result_tracker, + connection_result_builder, )) .await } OutboundProtocol::HBONE => { - self.proxy_to_hbone(source_stream, source_addr, &req, &result_tracker) + self.proxy_to_hbone(source_stream, source_addr, &req, connection_result_builder) .await } OutboundProtocol::TCP => { - self.proxy_to_tcp(source_stream, &req, &result_tracker) + self.proxy_to_tcp(source_stream, &req, connection_result_builder) .await } }; - result_tracker.record(res) } async fn proxy_to_double_hbone( @@ -237,52 +238,87 @@ impl OutboundConnection { stream: TcpStream, remote_addr: SocketAddr, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - // Create the outer HBONE stream - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; - // Wrap upgraded to implement tokio's Async{Write,Read} - let upgraded = TokioH2Stream::new(upgraded); - - // For the inner one, we do it manually to avoid connection pooling. - // Otherwise, we would only ever reach one workload in the remote cluster. - // We also need to abort tasks the right way to get graceful terminations. - let wl_key = WorkloadKey { - src_id: req.source.identity(), - dst_id: req.final_sans.clone(), - src: remote_addr.ip(), - dst: req.actual_destination, - }; - - // Fetch certs and establish inner TLS connection. 
- let cert = self - .pi - .local_workload_information - .fetch_certificate() - .await?; - let connector = cert.outbound_connector(wl_key.dst_id.clone())?; - let tls_stream = connector.connect(upgraded).await?; + mut connection_stats_builder: Box, + ) { + // async move block allows use of ? operator + let res = (async move { + // Create the outer HBONE stream + let (upgraded, _) = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + // Wrap upgraded to implement tokio's Async{Write,Read} + let upgraded = TokioH2Stream::new(upgraded); + + // For the inner one, we do it manually to avoid connection pooling. + // Otherwise, we would only ever reach one workload in the remote cluster. + // We also need to abort tasks the right way to get graceful terminations. + let wl_key = WorkloadKey { + src_id: req.source.identity(), + dst_id: req.final_sans.clone(), + src: remote_addr.ip(), + dst: req.actual_destination, + }; - // Spawn inner CONNECT tunnel - let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); - let mut sender = - super::h2::client::spawn_connection(self.pi.cfg.clone(), tls_stream, drain_rx, wl_key) + // Fetch certs and establish inner TLS connection. 
+ let cert = self + .pi + .local_workload_information + .fetch_certificate() .await?; - let origin_network = &self.pi.cfg.network; - let http_request = self.create_hbone_request(remote_addr, req, Some(origin_network)); - let inner_upgraded = sender.send_request(http_request).await?; - - // Proxy - let res = copy::copy_bidirectional( - copy::TcpStreamSplitter(stream), - inner_upgraded, - connection_stats, - ) + let connector = cert.outbound_connector(wl_key.dst_id.clone())?; + let tls_stream = connector.connect(upgraded).await?; + let (_, ssl) = tls_stream.get_ref(); + let peer_identity = identity_from_connection(ssl); + + // Spawn inner CONNECT tunnel + let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); + let mut sender = super::h2::client::spawn_connection( + self.pi.cfg.clone(), + tls_stream, + drain_rx, + wl_key, + ) + .await?; + let origin_network = &self.pi.cfg.network; + let http_request = self.create_hbone_request(remote_addr, req, Some(origin_network)); + let (inner_upgraded, baggage) = sender.send_request(http_request).await?; + + // Proxy + let derived_workload = baggage.map(|baggage| DerivedWorkload { + workload_name: baggage.workload_name, + app: baggage.service_name, + namespace: baggage.namespace, + identity: peer_identity, + cluster_id: baggage.cluster_id, + region: baggage.region, + zone: baggage.zone, + revision: baggage.revision, + }); + Result::<_, Error>::Ok((derived_workload, drain_tx, inner_upgraded)) + }) .await; - let _ = drain_tx.send(true); + match res { + Err(e) => { + let connection_stats = connection_stats_builder.build(); + connection_stats.record(Err(e)); + } + Ok((derived_workload, drain_tx, inner_upgraded)) => { + if let Some(derived_workload) = derived_workload { + *connection_stats_builder = + connection_stats_builder.with_derived_destination(&derived_workload); + } + + let connection_stats = connection_stats_builder.build(); + let res = copy::copy_bidirectional( + copy::TcpStreamSplitter(stream), + inner_upgraded, + 
&connection_stats, + ) + .await; + let _ = drain_tx.send(true); - res + connection_stats.record(res); + } + } } async fn proxy_to_hbone( @@ -290,10 +326,16 @@ impl OutboundConnection { stream: TcpStream, remote_addr: SocketAddr, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; - copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await + connection_stats_builder: Box, + ) { + let connection_stats = Box::new(connection_stats_builder.build()); + let res = (async { + let (upgraded, _) = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, &connection_stats) + .await + }) + .await; + connection_stats.record(res); } fn create_hbone_request( @@ -311,7 +353,7 @@ impl OutboundConnection { ) .method(hyper::Method::CONNECT) .version(hyper::Version::HTTP_2) - .header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone())) + .header(BAGGAGE_HEADER, baggage(req)) .header( FORWARDED, build_forwarded(remote_addr, &req.intended_destination_service), @@ -328,11 +370,12 @@ impl OutboundConnection { .expect("builder with known status code should not fail") } + /// returns upgraded stream and peer's baggage async fn send_hbone_request( &mut self, remote_addr: SocketAddr, req: &Request, - ) -> Result { + ) -> Result<(H2Stream, Option), Error> { // This is the single cluster/single-HBONE codepath (and also the outer tunnel // for double HBONE). 
We don't need the x-istio-origin-network header here because: // - For single HBONE: both source and destination are in the same network @@ -345,32 +388,38 @@ impl OutboundConnection { src: remote_addr.ip(), dst: req.actual_destination, }); - let upgraded = Box::pin(self.pool.send_request_pooled(&pool_key, request)) + let (upgraded, baggage) = Box::pin(self.pool.send_request_pooled(&pool_key, request)) .instrument(trace_span!("outbound connect")) .await?; - Ok(upgraded) + Ok((upgraded, baggage)) } async fn proxy_to_tcp( &mut self, stream: TcpStream, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let outbound = super::freebind_connect( - None, // No need to spoof source IP on outbound - req.actual_destination, - self.pi.socket_factory.as_ref(), - ) - .await?; + connection_stats_builder: Box, + ) { + let connection_stats = Box::new(connection_stats_builder.build()); - // Proxying data between downstream and upstream - copy::copy_bidirectional( - copy::TcpStreamSplitter(stream), - copy::TcpStreamSplitter(outbound), - connection_stats, - ) - .await + let res = (async { + let outbound = super::freebind_connect( + None, // No need to spoof source IP on outbound + req.actual_destination, + self.pi.socket_factory.as_ref(), + ) + .await?; + + // Proxying data between downstream and upstream + copy::copy_bidirectional( + copy::TcpStreamSplitter(stream), + copy::TcpStreamSplitter(outbound), + &connection_stats, + ) + .await + }) + .await; + connection_stats.record(res); } fn conn_metrics_from_request(req: &Request) -> ConnectionOpen { @@ -679,17 +728,8 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option) } } -fn baggage(r: &Request, cluster: String) -> String { - format!( - "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", - namespace = r.source.namespace, - workload_type = 
r.source.workload_type,
-        workload_name = r.source.workload_name,
-        name = r.source.canonical_name,
-        version = r.source.canonical_revision,
-        region = r.source.locality.region,
-        zone = r.source.locality.zone,
-    )
+fn baggage(r: &Request) -> String {
+    baggage::baggage_header_val(&r.source.baggage(), &r.source.workload_type)
}

#[derive(Debug)]
diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs
index d19ef0b419..e69d92797d 100644
--- a/src/proxy/pool.rs
+++ b/src/proxy/pool.rs
@@ -29,6 +29,7 @@ use tokio::sync::watch;
use tokio::sync::Mutex;
use tracing::{Instrument, debug, trace};

+use crate::baggage::Baggage;
use crate::config;

use flurry;
@@ -370,7 +371,7 @@ impl WorkloadHBONEPool {
        &mut self,
        workload_key: &WorkloadKey,
        request: http::Request<()>,
-    ) -> Result<H2Stream, Error> {
+    ) -> Result<(H2Stream, Option<Baggage>), Error> {
        let mut connection = self.connect(workload_key).await?;

        connection.send_request(request).await
@@ -610,7 +611,7 @@ mod test {
                .unwrap()
        };

-        let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
+        let (c, _baggage) = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
        let mut c = TokioH2Stream::new(c);

        c.write_all(b"abcde").await.unwrap();
        let mut b = [0u8; 100];
diff --git a/src/state/workload.rs b/src/state/workload.rs
index 77c1742365..dc6be70bc0 100644
--- a/src/state/workload.rs
+++ b/src/state/workload.rs
@@ -14,6 +14,7 @@

use crate::identity::Identity;
+use crate::baggage::Baggage;
use crate::state::WorkloadInfo;
use crate::strng::Strng;
use crate::xds::istio::workload::{Port, PortList};
@@ -301,6 +302,19 @@ impl Workload {
            service_account: self.service_account.clone(),
        }
    }
+
+    pub fn baggage(&self) -> Baggage {
+        Baggage {
+            cluster_id: (!self.cluster_id.is_empty()).then_some(self.cluster_id.clone()),
+            namespace: (!self.namespace.is_empty()).then_some(self.namespace.clone()),
+            workload_name: (!self.workload_name.is_empty()).then_some(self.workload_name.clone()),
+            service_name:
(!self.canonical_name.is_empty()).then_some(self.canonical_name.clone()), + revision: (!self.canonical_revision.is_empty()) + .then_some(self.canonical_revision.clone()), + region: (!self.locality.region.is_empty()).then_some(self.locality.region.clone()), + zone: (!self.locality.zone.is_empty()).then_some(self.locality.zone.clone()), + } + } } impl fmt::Display for Workload { diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index 77f7cfc2e2..6af6f3a155 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -31,7 +31,10 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::time::Instant; use tracing::{debug, error, info, trace}; +use crate::baggage::{Baggage, baggage_header_val}; use crate::hyper_util::TokioExecutor; +use crate::proxy::BAGGAGE_HEADER; +use crate::strng::Strng; use crate::{identity, tls}; #[derive(Copy, Clone, Debug)] @@ -247,7 +250,7 @@ impl HboneTestServer { &identity::Identity::Spiffe { trust_domain: "cluster.local".into(), namespace: "default".into(), - service_account: self.name.into(), + service_account: self.name.clone().into(), } .into(), Duration::from_secs(0), @@ -256,13 +259,16 @@ impl HboneTestServer { let acceptor = tls::mock::MockServerCertProvider::new(certs); let mut tls_stream = crate::hyper_util::tls_server(acceptor, self.listener); let mode = self.mode; + let name = self.name.clone(); while let Some(socket) = tls_stream.next().await { let waypoint_message = self.waypoint_message.clone(); + let name = name.clone(); if let Err(err) = http2::Builder::new(TokioExecutor) .serve_connection( TokioIo::new(socket), service_fn(move |req| { let waypoint_message = waypoint_message.clone(); + let name = name.clone(); async move { info!("waypoint: received request"); tokio::task::spawn(async move { @@ -277,7 +283,23 @@ impl HboneTestServer { Err(e) => error!("No upgrade {e}"), } }); - Ok::<_, Infallible>(Response::new(Full::::from("streaming..."))) + let mut resp = Response::new(Full::::from("streaming...")); + 
let baggage = Baggage {
+                            cluster_id: Some(Strng::from("Kubernetes")),
+                            namespace: Some(Strng::from("default")),
+                            workload_name: Some(Strng::from(&name)),
+                            service_name: Some(Strng::from(&name)),
+                            revision: Some(Strng::from("v1")),
+                            region: Some(Strng::from("r1")),
+                            zone: Some(Strng::from("z1")),
+                        };
+                        resp.headers_mut().insert(
+                            BAGGAGE_HEADER,
+                            baggage_header_val(&baggage, "deployment")
+                                .parse()
+                                .expect("valid baggage header"),
+                        );
+                        Ok::<_, Infallible>(resp)
                        }
                    }),
                )
diff --git a/src/tls/certificate.rs b/src/tls/certificate.rs
index 8b00c382b3..9d565e52ce 100644
--- a/src/tls/certificate.rs
+++ b/src/tls/certificate.rs
@@ -23,7 +23,7 @@ use rustls::client::Resumption;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::server::WebPkiClientVerifier;
-use rustls::{ClientConfig, RootCertStore, ServerConfig, server};
+use rustls::{ClientConfig, CommonState, RootCertStore, ServerConfig};
use rustls_pemfile::Item;
use std::io::Cursor;
use std::str::FromStr;
@@ -60,7 +60,7 @@ pub struct WorkloadCertificate {
    pub roots: Vec,
}

-pub fn identity_from_connection(conn: &server::ServerConnection) -> Option<Identity> {
+pub fn identity_from_connection(conn: &CommonState) -> Option<Identity> {
    use x509_parser::prelude::*;
    conn.peer_certificates()
        .and_then(|certs| certs.first())
diff --git a/tests/namespaced.rs b/tests/namespaced.rs
index c94badd612..43375e00d7 100644
--- a/tests/namespaced.rs
+++ b/tests/namespaced.rs
@@ -296,7 +296,7 @@ mod namespaced {
        let want = HashMap::from([
            ("scope", "access"),
            ("src.workload", "client"),
-            ("dst.workload", "actual-ew-gtw"),
+            ("dst.workload", "echo"),
            ("dst.hbone_addr", "remote.default.svc.cluster.local:8080"),
            ("dst.addr", &dst_addr),
            ("bytes_sent", &sent),
@@ -307,10 +307,7 @@
            (
                "src.identity",
                "spiffe://cluster.local/ns/default/sa/client",
            ),
-            (
-                "dst.identity",
-                "spiffe://cluster.local/ns/default/sa/actual-ew-gtw",
-            ),
+            ("dst.identity", "spiffe://cluster.local/ns/default/sa/echo"),
        ]);
telemetry::testing::assert_contains(want); Ok(())