From 2b22cc143d7e5e44a3e08c5b30be21be903ffbc2 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 22 Jan 2026 20:28:30 -0600 Subject: [PATCH 1/7] Add addtl codeowners for experimental (#1732) Signed-off-by: Keith Mattix II --- CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index 9aae9ebf0e..648a37ee9c 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,4 +1,4 @@ -* @istio/wg-networking-maintainers-ztunnel +* @istio/wg-networking-maintainers-ztunnel @Stevenjin8 @keithmattix @grnmeira @krinkinmu /Makefile* @istio/wg-test-and-release-maintainers /*.md @istio/wg-test-and-release-maintainers /common/ @istio/wg-test-and-release-maintainers From 2740d4bfa6bb6579a0bea3c03b97289fc61ec4a1 Mon Sep 17 00:00:00 2001 From: Steven Jin Date: Thu, 22 Jan 2026 21:51:29 -0500 Subject: [PATCH 2/7] More baggage support (#1731) * baggage * Use baggage for cross-cluster * fix unit tests * Fix namespaced tests. Remove extra code. * cleanup a bit * Initial plan * Add x-origin-source header to inner CONNECT requests in double HBONE Co-authored-by: keithmattix <1531662+keithmattix@users.noreply.github.com> * Add comment explaining single HBONE codepath and tests for x-origin-source header Co-authored-by: keithmattix <1531662+keithmattix@users.noreply.github.com> * Rename header from x-origin-source to x-istio-origin-source Co-authored-by: keithmattix <1531662+keithmattix@users.noreply.github.com> * Rename header to x-istio-origin-network and refocus test on double HBONE Co-authored-by: keithmattix <1531662+keithmattix@users.noreply.github.com> * Add test back Signed-off-by: Keith Mattix II * Use inbound x-istio-origin-network to know whether traffic originates from the gateway Signed-off-by: Keith Mattix II * fmt Signed-off-by: Keith Mattix II * lint * Codeowners * Fix rebase --------- Signed-off-by: Keith Mattix II Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: keithmattix <1531662+keithmattix@users.noreply.github.com> Co-authored-by: Keith Mattix II --- src/baggage.rs | 16 ++ src/copy.rs | 21 +- src/proxy.rs | 1 + src/proxy/h2/client.rs | 14 +- src/proxy/inbound.rs | 66 ++++-- src/proxy/inbound_passthrough.rs | 33 +-- src/proxy/metrics.rs | 139 +++++++++---- src/proxy/outbound.rs | 334 +++++++++++++++++++++++-------- src/proxy/pool.rs | 5 +- src/test_helpers/tcp.rs | 25 ++- src/tls/certificate.rs | 4 +- tests/namespaced.rs | 4 +- 12 files changed, 480 insertions(+), 182 deletions(-) diff --git a/src/baggage.rs b/src/baggage.rs index 375405dd0d..a5591338eb 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -29,6 +29,22 @@ pub struct Baggage { pub zone: Option, } +#[allow(clippy::too_many_arguments)] +pub fn baggage_header_val( + cluster: &str, + namespace: &str, + workload_type: &str, + workload_name: &str, + name: &str, + version: &str, + region: &str, + zone: &str, +) -> String { + format!( + "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", + ) +} + pub fn parse_baggage_header(headers: GetAll) -> Result { let mut baggage = Baggage { ..Default::default() diff --git a/src/copy.rs b/src/copy.rs index 13e7fe9102..b660512580 100644 --- a/src/copy.rs +++ b/src/copy.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::proxy; -use crate::proxy::ConnectionResult; use crate::proxy::Error::{BackendDisconnected, ClientDisconnected, ReceiveError, SendError}; +use crate::proxy::{self, ConnectionResult}; use bytes::{Buf, Bytes, BytesMut}; use pin_project_lite::pin_project; use std::future::Future; @@ -416,9 +415,9 @@ mod tests { let metrics = std::sync::Arc::new(crate::proxy::Metrics::new( crate::metrics::sub_registry(&mut registry), )); - let source_addr = "127.0.0.1:12345".parse().unwrap(); - let dest_addr = "127.0.0.1:34567".parse().unwrap(); - let cr = ConnectionResult::new( + let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap(); + let cr = crate::proxy::metrics::ConnectionResultBuilder::new( source_addr, dest_addr, None, @@ -432,7 +431,8 @@ mod tests { destination_service: None, }, metrics.clone(), - ); + ) + .build(); copy_bidirectional(ztunnel_downsteam, ztunnel_upsteam, &cr).await }); const ITERS: usize = 1000; @@ -461,9 +461,9 @@ mod tests { let metrics = std::sync::Arc::new(crate::proxy::Metrics::new( crate::metrics::sub_registry(&mut registry), )); - let source_addr = "127.0.0.1:12345".parse().unwrap(); - let dest_addr = "127.0.0.1:34567".parse().unwrap(); - let cr = ConnectionResult::new( + let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap(); + let cr = crate::proxy::metrics::ConnectionResultBuilder::new( source_addr, dest_addr, None, @@ -477,7 +477,8 @@ mod tests { destination_service: None, }, metrics.clone(), - ); + ) + .build(); copy_bidirectional(WeirdIO(ztunnel_downsteam), WeirdIO(ztunnel_upsteam), &cr).await }); const WRITES: usize = 2560; diff --git a/src/proxy.rs b/src/proxy.rs index b780769858..cb6461decc 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -558,6 +558,7 @@ pub struct TraceParent { pub const BAGGAGE_HEADER: &str = "baggage"; pub const TRACEPARENT_HEADER: &str = "traceparent"; +pub const X_ORIGIN_SOURCE_HEADER: &str = "x-istio-origin-network"; impl TraceParent { pub fn header(&self) -> hyper::header::HeaderValue { diff --git a/src/proxy/h2/client.rs b/src/proxy/h2/client.rs index 4c768d537e..37471535fe 100644 --- a/src/proxy/h2/client.rs +++ b/src/proxy/h2/client.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::baggage::{Baggage, parse_baggage_header}; use crate::config; use crate::identity::Identity; -use crate::proxy::Error; +use crate::proxy::{BAGGAGE_HEADER, Error}; use bytes::{Buf, Bytes}; use h2::SendStream; use h2::client::{Connection, SendRequest}; @@ -101,10 +102,10 @@ impl H2ConnectClient { pub async fn send_request( &mut self, req: http::Request<()>, - ) -> Result { + ) -> Result<(crate::proxy::h2::H2Stream, Option), Error> { let cur = self.stream_count.fetch_add(1, Ordering::SeqCst); trace!(current_streams = cur, "sending request"); - let (send, recv) = match self.internal_send(req).await { + let (send, recv, baggage) = match self.internal_send(req).await { Ok(r) => r, Err(e) => { // Request failed, so drop the stream now @@ -123,14 +124,14 @@ impl H2ConnectClient { _dropped: dropped2, }; let h2 = crate::proxy::h2::H2Stream { read, write }; - Ok(h2) + Ok((h2, baggage)) } // helper to allow us to handle errors once async fn internal_send( &mut self, req: Request<()>, - ) -> Result<(SendStream, h2::RecvStream), Error> { + ) -> Result<(SendStream, h2::RecvStream, Option), Error> { // "This function must return `Ready` before `send_request` is called" // We should always be ready though, because we make sure we don't go over the max stream limit out of band. futures::future::poll_fn(|cx| self.sender.poll_ready(cx)).await?; @@ -139,7 +140,8 @@ impl H2ConnectClient { if response.status() != 200 { return Err(Error::HttpStatus(response.status())); } - Ok((stream, response.into_body())) + let baggage = parse_baggage_header(response.headers().get_all(BAGGAGE_HEADER)).ok(); + Ok((stream, response.into_body(), baggage)) } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 8f7a274bf2..2f64ff7d78 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -22,15 +22,20 @@ use tokio::sync::watch; use tracing::{Instrument, debug, error, info, info_span, trace_span}; -use super::{ConnectionResult, Error, HboneAddress, LocalWorkloadInformation, ResponseFlags, util}; -use crate::baggage::parse_baggage_header; +use super::{ + ConnectionResult, ConnectionResultBuilder, Error, HboneAddress, LocalWorkloadInformation, + ResponseFlags, util, +}; +use crate::baggage::{baggage_header_val, parse_baggage_header}; use crate::identity::Identity; use crate::config::Config; use crate::drain::DrainWatcher; use crate::proxy::h2::server::{H2Request, RequestParts}; use crate::proxy::metrics::{ConnectionOpen, Reporter}; -use crate::proxy::{BAGGAGE_HEADER, ProxyInputs, TRACEPARENT_HEADER, TraceParent, metrics}; +use crate::proxy::{ + BAGGAGE_HEADER, ProxyInputs, TRACEPARENT_HEADER, TraceParent, X_ORIGIN_SOURCE_HEADER, metrics, +}; use crate::rbac::Connection; use crate::socket::to_canonical; use crate::state::service::Service; @@ -214,7 +219,7 @@ impl Inbound { // At this point in processing, we never built up full context to log a complete access log. // Instead, just log a minimal error line. metrics::log_early_deny(src, dst, Reporter::destination, e); - if let Err(err) = req.send_error(build_response(code)) { + if let Err(err) = req.send_error(build_response(code, None)) { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -288,7 +293,7 @@ impl Inbound { Ok(res) => res, Err(InboundFlagError(err, flag, code)) => { ri.result_tracker.record_with_flag(Err(err), flag); - if let Err(err) = req.send_error(build_response(code)) { + if let Err(err) = req.send_error(build_response(code, None)) { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -305,7 +310,10 @@ impl Inbound { // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information about the // proxy protocol. let send = req - .send_response(build_response(StatusCode::OK)) + .send_response(build_response( + StatusCode::OK, + Some((ri.destination_workload.as_ref(), pi.cfg.cluster_id.as_str())), + )) .and_then(|h2_stream| async { if let Some(TunnelRequest { protocol: Protocol::PROXY, @@ -393,12 +401,12 @@ impl Inbound { // We may need a more explicit indicator in the future. // Note: previously this attempted to check that the src identity was equal to the Gateway; // this check is broken as the gateway only forwards an HBONE request, it doesn't initiate it itself. - let from_gateway = matches!(hbone_addr, HboneAddress::SvcHostname(_, _)); - if from_gateway { - debug!("request from gateway"); - } + let from_gateway = req.headers().get(X_ORIGIN_SOURCE_HEADER).is_some(); + let source = match from_gateway { - true => None, // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515 + // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515. + // Instead, we will use baggage + true => None, false => { let src_network_addr = NetworkAddress { // we can assume source network is our network because we did not traverse a gateway @@ -426,7 +434,7 @@ impl Inbound { upstream_service, &destination_workload, ); - let result_tracker = Box::new(metrics::ConnectionResult::new( + let connection_result_builder = ConnectionResultBuilder::new( rbac_ctx.conn.src, // For consistency with outbound logs, report the original destination (with 15008 port) // as dst.addr, and the target address as dst.hbone_addr @@ -436,19 +444,22 @@ impl Inbound { ConnectionOpen { reporter: Reporter::destination, source, - derived_source: Some(derived_source), - destination: Some(destination_workload), + derived_source: Some(derived_source.clone()), + destination: Some(destination_workload.clone()), connection_security_policy: metrics::SecurityPolicy::mutual_tls, destination_service: ds, }, pi.metrics.clone(), - )); + ); + + let result_tracker = Box::new(connection_result_builder.build()); Ok(InboundRequest { for_host, rbac_ctx, result_tracker, upstream_addr, tunnel_request, + destination_workload, }) } @@ -674,6 +685,7 @@ struct InboundRequest { result_tracker: Box, upstream_addr: SocketAddr, tunnel_request: Option, + destination_workload: Arc, } /// InboundError represents an error with an associated status code. @@ -717,9 +729,27 @@ pub fn parse_forwarded_host(req: &T) -> Option { .and_then(proxy::parse_forwarded_host) } -fn build_response(status: StatusCode) -> Response<()> { - Response::builder() - .status(status) +// Second argument is local workload and cluster name +fn build_response(status: StatusCode, local_wl: Option<(&Workload, &str)>) -> Response<()> { + let mut builder = Response::builder().status(status); + + if let Some((local_wl, cluster)) = local_wl { + builder = builder.header( + BAGGAGE_HEADER, + baggage_header_val( + cluster, + &local_wl.namespace, + &local_wl.workload_type, + &local_wl.workload_name, + &local_wl.canonical_name, + &local_wl.canonical_revision, + &local_wl.locality.region, + &local_wl.locality.zone, + ), + ) + } + + builder .body(()) .expect("builder with known status code should not fail") } diff --git a/src/proxy/inbound_passthrough.rs b/src/proxy/inbound_passthrough.rs index f7daad4a01..75629b16b5 100644 --- a/src/proxy/inbound_passthrough.rs +++ b/src/proxy/inbound_passthrough.rs @@ -187,21 +187,24 @@ impl InboundPassthrough { upstream_services, &upstream_workload, ); - let result_tracker = Box::new(metrics::ConnectionResult::new( - source_addr, - dest_addr, - None, - start, - metrics::ConnectionOpen { - reporter: Reporter::destination, - source: source_workload, - derived_source: Some(derived_source), - destination: Some(upstream_workload), - connection_security_policy: metrics::SecurityPolicy::unknown, - destination_service: ds, - }, - pi.metrics.clone(), - )); + let result_tracker = Box::new( + metrics::ConnectionResultBuilder::new( + source_addr, + dest_addr, + None, + start, + metrics::ConnectionOpen { + reporter: Reporter::destination, + source: source_workload, + derived_source: Some(derived_source), + destination: Some(upstream_workload), + connection_security_policy: metrics::SecurityPolicy::unknown, + destination_service: ds, + }, + pi.metrics.clone(), + ) + .build(), + ); let mut conn_guard = match pi .connection_manager diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index c7c6d89bae..1231be98c6 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -198,6 +198,26 @@ impl CommonTrafficLabels { self.destination_service_namespace = w.namespace.clone().into(); self } + + fn with_derived_destination(mut self, w: Option<&DerivedWorkload>) -> Self { + let Some(w) = w else { return self }; + self.destination_workload = w.workload_name.clone().into(); + self.destination_canonical_service = w.app.clone().into(); + self.destination_canonical_revision = w.revision.clone().into(); + self.destination_workload_namespace = w.namespace.clone().into(); + self.destination_app = w.workload_name.clone().into(); + self.destination_version = w.revision.clone().into(); + self.destination_cluster = w.cluster_id.clone().into(); + // This is the identity from the TLS handshake; this is the most trustworthy source so use it + self.destination_principal = w.identity.clone().into(); + + let mut local = self.locality.0.unwrap_or_default(); + local.destination_region = w.region.clone().into(); + local.destination_zone = w.zone.clone().into(); + self.locality = OptionallyEncode(Some(local)); + + self + } } impl From for CommonTrafficLabels { @@ -414,29 +434,29 @@ impl SocketCloseGuard { /// ConnectionResult abstracts recording a metric and emitting an access log upon a connection completion pub struct ConnectionResult { // Src address and name - src: (SocketAddr, Option), + pub(crate) src: (SocketAddr, Option), // Dst address and name - dst: (SocketAddr, Option), - hbone_target: Option, - start: Instant, + pub(crate) dst: (SocketAddr, Option), + pub(crate) hbone_target: Option, + pub(crate) start: Instant, // TODO: storing CommonTrafficLabels adds ~600 bytes retained throughout a connection life time. // We can pre-fetch the metrics we need at initialization instead of storing this, then keep a more // efficient representation for the fields we need to log. Ideally, this would even be optional // in case logs were disabled. - tl: CommonTrafficLabels, - metrics: Arc, + pub(crate) tl: CommonTrafficLabels, + pub(crate) metrics: Arc, // sent records the number of bytes sent on this connection - sent: AtomicU64, + pub(crate) sent: AtomicU64, // sent_metric records the number of bytes sent on this connection to the aggregated metric counter - sent_metric: Counter, + pub(crate) sent_metric: Counter, // recv records the number of bytes received on this connection - recv: AtomicU64, + pub(crate) recv: AtomicU64, // recv_metric records the number of bytes received on this connection to the aggregated metric counter - recv_metric: Counter, + pub(crate) recv_metric: Counter, // Have we recorded yet? - recorded: bool, + pub(crate) recorded: bool, } // log_early_deny allows logging a connection is denied before we have enough information to emit proper @@ -493,7 +513,17 @@ macro_rules! access_log { } }; } -impl ConnectionResult { + +pub struct ConnectionResultBuilder { + src: (SocketAddr, Option), + dst: (SocketAddr, Option), + hbone_target: Option, + start: Instant, + tl: CommonTrafficLabels, + metrics: Arc, +} + +impl ConnectionResultBuilder { pub fn new( src: SocketAddr, dst: SocketAddr, @@ -511,30 +541,61 @@ impl ConnectionResult { conn.destination.as_ref().map(|wl| wl.name.clone().into()), ); let tl = CommonTrafficLabels::from(conn); - metrics.connection_opens.get_or_create(&tl).inc(); - - let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls; src.1 = src.1.or(tl.source_canonical_service.clone().inner()); dst.1 = dst.1.or(tl.destination_canonical_service.clone().inner()); + Self { + src, + dst, + hbone_target, + start, + tl, + metrics, + } + } + + pub fn with_derived_source(mut self, w: &DerivedWorkload) -> Self { + self.tl = self.tl.with_derived_source(Some(w)); + self.src.1 = w.workload_name.clone().map(RichStrng::from); + self + } + + pub fn with_derived_destination(mut self, w: &DerivedWorkload) -> Self { + self.tl = self.tl.with_derived_destination(Some(w)); + self.dst.1 = w.workload_name.clone().map(RichStrng::from); + self + } + + pub fn build(self) -> ConnectionResult { + // Grab the metrics with our labels now, so we don't need to fetch them each time. + // The inner metric is an Arc so clone is fine/cheap. + // With the raw Counter, we increment is a simple atomic add operation (~1ns). + // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would + // add up. + let sent_metric = self.metrics.sent_bytes.get_or_create(&self.tl).clone(); + let recv_metric = self.metrics.received_bytes.get_or_create(&self.tl).clone(); + let sent = atomic::AtomicU64::new(0); + let recv = atomic::AtomicU64::new(0); + + let mtls = self.tl.connection_security_policy == SecurityPolicy::mutual_tls; event!( target: "access", parent: None, tracing::Level::DEBUG, - src.addr = %src.0, - src.workload = src.1.as_deref().map(to_value), - src.namespace = tl.source_workload_namespace.to_value(), - src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), + src.addr = %self.src.0, + src.workload = self.src.1.as_deref().map(to_value), + src.namespace = self.tl.source_workload_namespace.to_value(), + src.identity = self.tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), - dst.addr = %dst.0, - dst.hbone_addr = hbone_target.as_ref().map(display), - dst.service = tl.destination_service.to_value(), - dst.workload = dst.1.as_deref().map(to_value), - dst.namespace = tl.destination_workload_namespace.to_value(), - dst.identity = tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), + dst.addr = %self.dst.0, + dst.hbone_addr = self.hbone_target.as_ref().map(display), + dst.service = self.tl.destination_service.to_value(), + dst.workload = self.dst.1.as_deref().map(to_value), + dst.namespace = self.tl.destination_workload_namespace.to_value(), + dst.identity = self.tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), - direction = if tl.reporter == Reporter::source { + direction = if self.tl.reporter == Reporter::source { "outbound" } else { "inbound" @@ -542,23 +603,15 @@ impl ConnectionResult { "connection opened" ); - // Grab the metrics with our labels now, so we don't need to fetch them each time. - // The inner metric is an Arc so clone is fine/cheap. - // With the raw Counter, we increment is a simple atomic add operation (~1ns). - // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would - // add up. - let sent_metric = metrics.sent_bytes.get_or_create(&tl).clone(); - let recv_metric = metrics.received_bytes.get_or_create(&tl).clone(); - let sent = atomic::AtomicU64::new(0); - let recv = atomic::AtomicU64::new(0); - Self { - src, - dst, - hbone_target, - start, - tl, - metrics, + self.metrics.connection_opens.get_or_create(&self.tl).inc(); + ConnectionResult { + src: self.src, + dst: self.dst, + hbone_target: self.hbone_target, + start: self.start, + tl: self.tl, + metrics: self.metrics, sent, sent_metric, recv, @@ -566,7 +619,9 @@ impl ConnectionResult { recorded: false, } } +} +impl ConnectionResult { pub fn increment_send(&self, res: u64) { self.sent.inc_by(res); self.sent_metric.inc_by(res); diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index c1f045f13a..c4bdcc20ee 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -25,13 +25,16 @@ use tokio::sync::watch; use tracing::{Instrument, debug, error, info, info_span, trace_span}; use crate::identity::Identity; +use crate::strng::Strng; use crate::proxy::metrics::Reporter; use crate::proxy::{ - BAGGAGE_HEADER, Error, HboneAddress, ProxyInputs, TRACEPARENT_HEADER, TraceParent, util, + BAGGAGE_HEADER, Error, HboneAddress, ProxyInputs, TRACEPARENT_HEADER, TraceParent, + X_ORIGIN_SOURCE_HEADER, util, }; -use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics}; +use crate::proxy::{ConnectionOpen, ConnectionResultBuilder, DerivedWorkload, metrics}; +use crate::baggage::{self, Baggage}; use crate::drain::DrainWatcher; use crate::drain::run_with_drain; use crate::proxy::h2::{H2Stream, client::WorkloadKey}; @@ -39,6 +42,7 @@ use crate::state::service::{LoadBalancerMode, Service, ServiceDescription}; use crate::state::workload::OutboundProtocol; use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address}; use crate::state::{ServiceResolutionMode, Upstream}; +use crate::tls::identity_from_connection; use crate::{assertions, copy, proxy, socket}; use super::h2::TokioH2Stream; @@ -198,7 +202,7 @@ impl OutboundConnection { let metrics = self.pi.metrics.clone(); let hbone_target = req.hbone_target_destination.clone(); - let result_tracker = Box::new(ConnectionResult::new( + let connection_result_builder = Box::new(ConnectionResultBuilder::new( source_addr, req.actual_destination, hbone_target, @@ -207,27 +211,26 @@ impl OutboundConnection { metrics, )); - let res = match req.protocol { + match req.protocol { OutboundProtocol::DOUBLEHBONE => { // We box this since its not a common path and it would make the future really big. Box::pin(self.proxy_to_double_hbone( source_stream, source_addr, &req, - &result_tracker, + connection_result_builder, )) .await } OutboundProtocol::HBONE => { - self.proxy_to_hbone(source_stream, source_addr, &req, &result_tracker) + self.proxy_to_hbone(source_stream, source_addr, &req, connection_result_builder) .await } OutboundProtocol::TCP => { - self.proxy_to_tcp(source_stream, &req, &result_tracker) + self.proxy_to_tcp(source_stream, &req, connection_result_builder) .await } }; - result_tracker.record(res) } async fn proxy_to_double_hbone( @@ -235,51 +238,87 @@ impl OutboundConnection { stream: TcpStream, remote_addr: SocketAddr, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - // Create the outer HBONE stream - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; - // Wrap upgraded to implement tokio's Async{Write,Read} - let upgraded = TokioH2Stream::new(upgraded); - - // For the inner one, we do it manually to avoid connection pooling. - // Otherwise, we would only ever reach one workload in the remote cluster. - // We also need to abort tasks the right way to get graceful terminations. - let wl_key = WorkloadKey { - src_id: req.source.identity(), - dst_id: req.final_sans.clone(), - src: remote_addr.ip(), - dst: req.actual_destination, - }; - - // Fetch certs and establish inner TLS connection. - let cert = self - .pi - .local_workload_information - .fetch_certificate() - .await?; - let connector = cert.outbound_connector(wl_key.dst_id.clone())?; - let tls_stream = connector.connect(upgraded).await?; + mut connection_stats_builder: Box, + ) { + // async move block allows use of ? operator + let res = (async move { + // Create the outer HBONE stream + let (upgraded, _) = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + // Wrap upgraded to implement tokio's Async{Write,Read} + let upgraded = TokioH2Stream::new(upgraded); + + // For the inner one, we do it manually to avoid connection pooling. + // Otherwise, we would only ever reach one workload in the remote cluster. + // We also need to abort tasks the right way to get graceful terminations. + let wl_key = WorkloadKey { + src_id: req.source.identity(), + dst_id: req.final_sans.clone(), + src: remote_addr.ip(), + dst: req.actual_destination, + }; - // Spawn inner CONNECT tunnel - let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); - let mut sender = - super::h2::client::spawn_connection(self.pi.cfg.clone(), tls_stream, drain_rx, wl_key) + // Fetch certs and establish inner TLS connection. + let cert = self + .pi + .local_workload_information + .fetch_certificate() .await?; - let http_request = self.create_hbone_request(remote_addr, req); - let inner_upgraded = sender.send_request(http_request).await?; - - // Proxy - let res = copy::copy_bidirectional( - copy::TcpStreamSplitter(stream), - inner_upgraded, - connection_stats, - ) + let connector = cert.outbound_connector(wl_key.dst_id.clone())?; + let tls_stream = connector.connect(upgraded).await?; + let (_, ssl) = tls_stream.get_ref(); + let peer_identity = identity_from_connection(ssl); + + // Spawn inner CONNECT tunnel + let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); + let mut sender = super::h2::client::spawn_connection( + self.pi.cfg.clone(), + tls_stream, + drain_rx, + wl_key, + ) + .await?; + let origin_network = &self.pi.cfg.network; + let http_request = self.create_hbone_request(remote_addr, req, Some(origin_network)); + let (inner_upgraded, baggage) = sender.send_request(http_request).await?; + + // Proxy + let derived_workload = baggage.map(|baggage| DerivedWorkload { + workload_name: baggage.workload_name, + app: baggage.service_name, + namespace: baggage.namespace, + identity: peer_identity, + cluster_id: baggage.cluster_id, + region: baggage.region, + zone: baggage.zone, + revision: baggage.revision, + }); + Result::<_, Error>::Ok((derived_workload, drain_tx, inner_upgraded)) + }) .await; - let _ = drain_tx.send(true); + match res { + Err(e) => { + let connection_stats = Box::new(connection_stats_builder.build()); + connection_stats.record(Err(e)); + } + Ok((derived_workload, drain_tx, inner_upgraded)) => { + if let Some(derived_workload) = derived_workload { + *connection_stats_builder = + connection_stats_builder.with_derived_destination(&derived_workload); + } - res + let connection_stats = connection_stats_builder.build(); + let res = copy::copy_bidirectional( + copy::TcpStreamSplitter(stream), + inner_upgraded, + &connection_stats, + ) + .await; + let _ = drain_tx.send(true); + + connection_stats.record(res); + } + } } async fn proxy_to_hbone( @@ -287,18 +326,25 @@ impl OutboundConnection { stream: TcpStream, remote_addr: SocketAddr, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; - copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await + connection_stats_builder: Box, + ) { + let connection_stats = Box::new(connection_stats_builder.build()); + let res = (async { + let (upgraded, _) = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, &connection_stats) + .await + }) + .await; + connection_stats.record(res); } fn create_hbone_request( - &mut self, + &self, remote_addr: SocketAddr, req: &Request, + origin_network: Option<&Strng>, ) -> http::Request<()> { - http::Request::builder() + let mut builder = http::Request::builder() .uri( req.hbone_target_destination .as_ref() @@ -312,17 +358,29 @@ impl OutboundConnection { FORWARDED, build_forwarded(remote_addr, &req.intended_destination_service), ) - .header(TRACEPARENT_HEADER, self.id.header()) + .header(TRACEPARENT_HEADER, self.id.header()); + + // Add x-istio-origin-network header for inner CONNECT requests in double HBONE + if let Some(network) = origin_network { + builder = builder.header(X_ORIGIN_SOURCE_HEADER, network.as_str()); + } + + builder .body(()) .expect("builder with known status code should not fail") } + /// returns upgraded stream and peer's baggage async fn send_hbone_request( &mut self, remote_addr: SocketAddr, req: &Request, - ) -> Result { - let request = self.create_hbone_request(remote_addr, req); + ) -> Result<(H2Stream, Option), Error> { + // This is the single cluster/single-HBONE codepath (and also the outer tunnel + // for double HBONE). We don't need the x-istio-origin-network header here because: + // - For single HBONE: both source and destination are in the same network + // - For double HBONE outer: the gateway doesn't need origin network info + let request = self.create_hbone_request(remote_addr, req, None); let pool_key = Box::new(WorkloadKey { src_id: req.source.identity(), // Clone here shouldn't be needed ideally, we could just take ownership of Request. @@ -330,32 +388,38 @@ impl OutboundConnection { src: remote_addr.ip(), dst: req.actual_destination, }); - let upgraded = Box::pin(self.pool.send_request_pooled(&pool_key, request)) + let (upgraded, baggage) = Box::pin(self.pool.send_request_pooled(&pool_key, request)) .instrument(trace_span!("outbound connect")) .await?; - Ok(upgraded) + Ok((upgraded, baggage)) } async fn proxy_to_tcp( &mut self, stream: TcpStream, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let outbound = super::freebind_connect( - None, // No need to spoof source IP on outbound - req.actual_destination, - self.pi.socket_factory.as_ref(), - ) - .await?; + connection_stats_builder: Box, + ) { + let connection_stats = Box::new(connection_stats_builder.build()); - // Proxying data between downstream and upstream - copy::copy_bidirectional( - copy::TcpStreamSplitter(stream), - copy::TcpStreamSplitter(outbound), - connection_stats, - ) - .await + let res = (async { + let outbound = super::freebind_connect( + None, // No need to spoof source IP on outbound + req.actual_destination, + self.pi.socket_factory.as_ref(), + ) + .await?; + + // Proxying data between downstream and upstream + copy::copy_bidirectional( + copy::TcpStreamSplitter(stream), + copy::TcpStreamSplitter(outbound), + &connection_stats, + ) + .await + }) + .await; + connection_stats.record(res); } fn conn_metrics_from_request(req: &Request) -> ConnectionOpen { @@ -665,15 +729,15 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option) } fn baggage(r: &Request, cluster: String) -> String { - format!( - "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", - namespace = r.source.namespace, - workload_type = r.source.workload_type, - workload_name = r.source.workload_name, - name = r.source.canonical_name, - version = r.source.canonical_revision, - region = r.source.locality.region, - zone = r.source.locality.zone, + baggage::baggage_header_val( + &cluster, + &r.source.namespace, + &r.source.workload_type, + &r.source.workload_name, + &r.source.canonical_name, + &r.source.canonical_revision, + &r.source.locality.region, + &r.source.locality.zone, ) } @@ -1876,6 +1940,110 @@ mod tests { ); } + #[tokio::test] + async fn test_x_origin_source_header() { + initialize_telemetry(); + + // Create a test config with a specific network + let cfg = Arc::new(Config { + network: "test-network".into(), + local_node: Some("local-node".to_string()), + ..crate::config::parse_config().unwrap() + }); + + // Create a source workload and add it to state + let source = XdsWorkload { + uid: "cluster1//v1/Pod/ns/source-workload".to_string(), + name: "source-workload".to_string(), + namespace: "ns".to_string(), + addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 1])], + node: "local-node".to_string(), + ..Default::default() + }; + + let state = new_proxy_state(&[source], &[], &[]); + let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory::default()); + + let wi = WorkloadInfo { + name: "source-workload".to_string(), + namespace: "ns".to_string(), + service_account: "default".to_string(), + }; + let local_workload_information = Arc::new(LocalWorkloadInformation::new( + Arc::new(wi.clone()), + state.clone(), + identity::mock::new_secret_manager(Duration::from_secs(10)), + )); + + let outbound = OutboundConnection { + pi: Arc::new(ProxyInputs { + state: state.clone(), + cfg: cfg.clone(), + metrics: test_proxy_metrics(), + socket_factory: sock_fact.clone(), + local_workload_information: local_workload_information.clone(), + connection_manager: ConnectionManager::default(), + resolver: None, + disable_inbound_freebind: false, + crl_manager: None, + }), + id: TraceParent::new(), + pool: WorkloadHBONEPool::new( + cfg.clone(), + sock_fact, + local_workload_information.clone(), + ), + hbone_port: cfg.inbound_addr.port(), + }; + + // Get the source workload from state + let source_workload = outbound + .pi + .local_workload_information + .get_workload() + .await + .unwrap(); + + // Create a minimal test request with required fields + let req = Request { + protocol: OutboundProtocol::HBONE, + source: source_workload, + hbone_target_destination: Some(HboneAddress::SocketAddr( + "10.0.0.1:8080".parse().unwrap(), + )), + actual_destination_workload: None, + intended_destination_service: None, + actual_destination: "10.0.0.1:8080".parse().unwrap(), + upstream_sans: vec![], + final_sans: vec![], + }; + + let remote_addr = "127.0.0.1:12345".parse().unwrap(); + + // Test the single HBONE case - header should NOT be added when origin_network is None + let http_request_no_header = outbound.create_hbone_request(remote_addr, &req, None); + assert!( + http_request_no_header + .headers() + .get(X_ORIGIN_SOURCE_HEADER) + .is_none(), + "x-istio-origin-network header should not be present when origin_network is None (single HBONE)" + ); + + // Test the double HBONE inner request case - header should be added when network is specified + let network = crate::strng::Strng::from("test-network"); + let http_request_with_header = + outbound.create_hbone_request(remote_addr, &req, Some(&network)); + assert_eq!( + http_request_with_header + .headers() + .get(X_ORIGIN_SOURCE_HEADER) + .unwrap(), + "test-network", + "x-istio-origin-network header should contain the network name for double HBONE inner request" + ); + } + #[derive(PartialEq, Debug)] struct ExpectedRequest<'a> { protocol: OutboundProtocol, diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs index d19ef0b419..e69d92797d 100644 --- a/src/proxy/pool.rs +++ b/src/proxy/pool.rs @@ -29,6 +29,7 @@ use tokio::sync::watch; use tokio::sync::Mutex; use tracing::{Instrument, debug, trace}; +use crate::baggage::Baggage; use crate::config; use flurry; @@ -370,7 +371,7 @@ impl WorkloadHBONEPool { &mut self, workload_key: &WorkloadKey, request: http::Request<()>, - ) -> Result { + ) -> Result<(H2Stream, Option), Error> { let mut connection = self.connect(workload_key).await?; connection.send_request(request).await @@ -610,7 +611,7 @@ mod test { .unwrap() }; - let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap(); + let (c, _baggage) = pool.send_request_pooled(&key.clone(), req()).await.unwrap(); let mut c = TokioH2Stream::new(c); c.write_all(b"abcde").await.unwrap(); let mut b = [0u8; 100]; diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index 77f7cfc2e2..0ec124cde6 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -31,7 +31,9 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::time::Instant; use tracing::{debug, error, info, trace}; +use crate::baggage::baggage_header_val; use crate::hyper_util::TokioExecutor; +use crate::proxy::BAGGAGE_HEADER; use crate::{identity, tls}; #[derive(Copy, Clone, Debug)] @@ -247,7 +249,7 @@ impl HboneTestServer { &identity::Identity::Spiffe { trust_domain: "cluster.local".into(), namespace: "default".into(), - service_account: self.name.into(), + service_account: self.name.clone().into(), } .into(), Duration::from_secs(0), @@ -256,13 +258,16 @@ impl HboneTestServer { let acceptor = tls::mock::MockServerCertProvider::new(certs); let mut tls_stream = crate::hyper_util::tls_server(acceptor, self.listener); let mode = self.mode; + let name = self.name.clone(); while let Some(socket) = tls_stream.next().await { let waypoint_message = self.waypoint_message.clone(); + let name = name.clone(); if let Err(err) = http2::Builder::new(TokioExecutor) .serve_connection( TokioIo::new(socket), service_fn(move |req| { let waypoint_message = waypoint_message.clone(); + let name = name.clone(); async move { info!("waypoint: received request"); tokio::task::spawn(async move { @@ -277,7 +282,23 @@ impl HboneTestServer { Err(e) => error!("No upgrade {e}"), } }); - Ok::<_, Infallible>(Response::new(Full::::from("streaming..."))) + let mut resp = Response::new(Full::::from("streaming...")); + resp.headers_mut().insert( + BAGGAGE_HEADER, + baggage_header_val( + "Kubernetes", + "default", + "deployment", + &name, + &name, + "v1", + "r1", + "z1", + ) + .parse() + .expect("valid baggage header"), + ); + Ok::<_, Infallible>(resp) } }), ) diff --git a/src/tls/certificate.rs b/src/tls/certificate.rs index a09666c871..24749309ed 100644 --- a/src/tls/certificate.rs +++ b/src/tls/certificate.rs @@ -23,7 +23,7 @@ use rustls::client::Resumption; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls::server::WebPkiClientVerifier; -use rustls::{ClientConfig, RootCertStore, ServerConfig, server}; +use rustls::{ClientConfig, CommonState, RootCertStore, ServerConfig}; use rustls_pemfile::Item; use std::io::Cursor; use std::str::FromStr; @@ -60,7 +60,7 @@ pub struct WorkloadCertificate { pub roots: Vec, } -pub fn identity_from_connection(conn: &server::ServerConnection) -> Option { +pub fn identity_from_connection(conn: &CommonState) -> Option { use x509_parser::prelude::*; conn.peer_certificates() .and_then(|certs| certs.first()) diff --git a/tests/namespaced.rs b/tests/namespaced.rs index c94badd612..ebef445731 100644 --- a/tests/namespaced.rs +++ b/tests/namespaced.rs @@ -296,7 +296,7 @@ mod namespaced { let want = HashMap::from([ ("scope", "access"), ("src.workload", "client"), - ("dst.workload", "actual-ew-gtw"), + ("dst.workload", "echo"), ("dst.hbone_addr", "remote.default.svc.cluster.local:8080"), ("dst.addr", &dst_addr), ("bytes_sent", &sent), @@ -309,7 +309,7 @@ mod namespaced { ), ( "dst.identity", - "spiffe://cluster.local/ns/default/sa/actual-ew-gtw", + "spiffe://cluster.local/ns/default/sa/echo", ), ]); telemetry::testing::assert_contains(want); From 3b30c184f3a0e5d60883648af0a92e27bd840aa1 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Tue, 27 Jan 2026 00:21:19 +0000 Subject: [PATCH 3/7] Cargo fmt Signed-off-by: Keith Mattix II --- tests/namespaced.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/namespaced.rs b/tests/namespaced.rs index ebef445731..43375e00d7 100644 --- a/tests/namespaced.rs +++ b/tests/namespaced.rs @@ -307,10 +307,7 @@ mod namespaced { "src.identity", "spiffe://cluster.local/ns/default/sa/client", ), - ( - "dst.identity", - "spiffe://cluster.local/ns/default/sa/echo", - ), + ("dst.identity", "spiffe://cluster.local/ns/default/sa/echo"), ]); telemetry::testing::assert_contains(want); Ok(()) From 2cabee40dc4a7c6a3cf94bebf509ce54e3f8cb26 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Tue, 27 Jan 2026 02:41:46 +0000 Subject: [PATCH 4/7] Remove experimental codeowners Signed-off-by: Keith Mattix II --- CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index 648a37ee9c..9aae9ebf0e 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,4 +1,4 @@ -* @istio/wg-networking-maintainers-ztunnel @Stevenjin8 @keithmattix @grnmeira @krinkinmu +* @istio/wg-networking-maintainers-ztunnel /Makefile* @istio/wg-test-and-release-maintainers /*.md @istio/wg-test-and-release-maintainers /common/ @istio/wg-test-and-release-maintainers From 3437b1e8d7dda64f4e8415ec1a3d89503ee5f7e5 Mon Sep 17 00:00:00 2001 From: "Krinkin, Mike" Date: Wed, 28 Jan 2026 13:16:37 +0000 Subject: [PATCH 5/7] Address review comments (#1738) --- src/baggage.rs | 85 ++++++++++++++++++++++++++++++++--------- src/proxy/inbound.rs | 19 +++------ src/proxy/outbound.rs | 15 ++------ src/state/workload.rs | 14 +++++++ src/test_helpers/tcp.rs | 27 ++++++------- 5 files changed, 104 insertions(+), 56 deletions(-) diff --git a/src/baggage.rs b/src/baggage.rs index a5591338eb..d70b729fb4 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -18,7 +18,7 @@ use hyper::{ http::HeaderValue, }; -#[derive(Default)] +#[derive(Debug, Default, PartialEq, Eq)] pub struct Baggage { pub cluster_id: Option, pub namespace: Option, @@ -29,20 +29,37 @@ pub struct Baggage { pub zone: Option, } -#[allow(clippy::too_many_arguments)] -pub fn baggage_header_val( - cluster: &str, - namespace: &str, - workload_type: &str, - workload_name: &str, - name: &str, - version: &str, - region: &str, - zone: &str, -) -> String { - format!( - "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", - ) +pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { + let mut items = Vec::new(); + baggage + .cluster_id + .as_ref() + .inspect(|cluster| items.push(format!("k8s.cluster.name={cluster}"))); + baggage + .namespace + .as_ref() + .inspect(|namespace| items.push(format!("k8s.namespace.name={namespace}"))); + baggage + .workload_name + .as_ref() + .inspect(|workload_name| items.push(format!("k8s.{workload_type}.name={workload_name}"))); + baggage + .service_name + .as_ref() + .inspect(|service_name| items.push(format!("service.name={service_name}"))); + baggage + .revision + .as_ref() + .inspect(|revision| items.push(format!("service.version={revision}"))); + baggage + .region + .as_ref() + .inspect(|region| items.push(format!("cloud.region={region}"))); + baggage + .zone + .as_ref() + .inspect(|zone| items.push(format!("cloud.availability_zone={zone}"))); + items.join(",") } pub fn parse_baggage_header(headers: GetAll) -> Result { @@ -83,8 +100,9 @@ pub mod tests { use hyper::{HeaderMap, http::HeaderValue}; use crate::proxy::BAGGAGE_HEADER; + use crate::strng::Strng; - use super::parse_baggage_header; + use super::{Baggage, baggage_header_val, parse_baggage_header}; #[test] fn baggage_parser() -> anyhow::Result<()> { @@ -106,8 +124,8 @@ pub mod tests { let mut hm = HeaderMap::new(); let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version="; let header_value = HeaderValue::from_str(baggage_str)?; - let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; hm.append(BAGGAGE_HEADER, header_value); + let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; assert_eq!(baggage.cluster_id, None); assert_eq!(baggage.namespace, None); assert_eq!(baggage.workload_name, None); @@ -152,4 +170,37 @@ pub mod tests { assert_eq!(baggage.revision, None); Ok(()) } + + #[test] + fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> { + { + let baggage = Baggage { + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + { + let baggage = Baggage { + cluster_id: Some(Strng::from("cluster")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from("workload")), + service_name: Some(Strng::from("service")), + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + Ok(()) + } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 1d1824eeac..dcb81e5593 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -313,7 +313,7 @@ impl Inbound { let send = req .send_response(build_response( StatusCode::OK, - Some((ri.destination_workload.as_ref(), pi.cfg.cluster_id.as_str())), + Some(ri.destination_workload.as_ref()), )) .and_then(|h2_stream| async { if let Some(TunnelRequest { @@ -453,7 +453,7 @@ impl Inbound { ConnectionOpen { reporter: Reporter::destination, source, - derived_source: Some(derived_source.clone()), + derived_source: Some(derived_source), destination: Some(destination_workload.clone()), connection_security_policy: metrics::SecurityPolicy::mutual_tls, destination_service: ds, @@ -739,22 +739,13 @@ pub fn parse_forwarded_host(req: &T) -> Option { } // Second argument is local workload and cluster name -fn build_response(status: StatusCode, local_wl: Option<(&Workload, &str)>) -> Response<()> { +fn build_response(status: StatusCode, local_wl: Option<&Workload>) -> Response<()> { let mut builder = Response::builder().status(status); - if let Some((local_wl, cluster)) = local_wl { + if let Some(local_wl) = local_wl { builder = builder.header( BAGGAGE_HEADER, - baggage_header_val( - cluster, - &local_wl.namespace, - &local_wl.workload_type, - &local_wl.workload_name, - &local_wl.canonical_name, - &local_wl.canonical_revision, - &local_wl.locality.region, - &local_wl.locality.zone, - ), + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), ) } diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index b6d97e5b12..9a24d67a11 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -353,7 +353,7 @@ impl OutboundConnection { ) .method(hyper::Method::CONNECT) .version(hyper::Version::HTTP_2) - .header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone())) + .header(BAGGAGE_HEADER, baggage(req)) .header( FORWARDED, build_forwarded(remote_addr, &req.intended_destination_service), @@ -728,17 +728,8 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option) } } -fn baggage(r: &Request, cluster: String) -> String { - baggage::baggage_header_val( - &cluster, - &r.source.namespace, - &r.source.workload_type, - &r.source.workload_name, - &r.source.canonical_name, - &r.source.canonical_revision, - &r.source.locality.region, - &r.source.locality.zone, - ) +fn baggage(r: &Request) -> String { + baggage::baggage_header_val(&r.source.baggage(), &r.source.workload_type) } #[derive(Debug)] diff --git a/src/state/workload.rs b/src/state/workload.rs index 15c56979e3..a2aaad86dc 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -14,6 +14,7 @@ use crate::identity::Identity; +use crate::baggage::Baggage; use crate::state::WorkloadInfo; use crate::strng::Strng; use crate::xds::istio::workload::{Port, PortList}; @@ -301,6 +302,19 @@ impl Workload { service_account: self.service_account.clone(), } } + + pub fn baggage(&self) -> Baggage { + Baggage { + cluster_id: (!self.cluster_id.is_empty()).then_some(self.cluster_id.clone()), + namespace: (!self.namespace.is_empty()).then_some(self.namespace.clone()), + workload_name: (!self.workload_name.is_empty()).then_some(self.workload_name.clone()), + service_name: (!self.canonical_name.is_empty()).then_some(self.canonical_name.clone()), + revision: (!self.canonical_revision.is_empty()) + .then_some(self.canonical_revision.clone()), + region: (!self.locality.region.is_empty()).then_some(self.locality.region.clone()), + zone: (!self.locality.zone.is_empty()).then_some(self.locality.zone.clone()), + } + } } impl fmt::Display for Workload { diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index 0ec124cde6..6af6f3a155 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -31,9 +31,10 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::time::Instant; use tracing::{debug, error, info, trace}; -use crate::baggage::baggage_header_val; +use crate::baggage::{Baggage, baggage_header_val}; use crate::hyper_util::TokioExecutor; use crate::proxy::BAGGAGE_HEADER; +use crate::strng::Strng; use crate::{identity, tls}; #[derive(Copy, Clone, Debug)] @@ -283,20 +284,20 @@ impl HboneTestServer { } }); let mut resp = Response::new(Full::::from("streaming...")); + let baggage = Baggage { + cluster_id: Some(Strng::from("Kubernetes")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from(&name)), + service_name: Some(Strng::from(&name)), + revision: Some(Strng::from("v1")), + region: Some(Strng::from("r1")), + zone: Some(Strng::from("z1")), + }; resp.headers_mut().insert( BAGGAGE_HEADER, - baggage_header_val( - "Kubernetes", - "default", - "deployment", - &name, - &name, - "v1", - "r1", - "z1", - ) - .parse() - .expect("valid baggage header"), + baggage_header_val(&baggage, "deployment") + .parse() + .expect("valid baggage header"), ); Ok::<_, Infallible>(resp) } From 08f5bd418b768db5d621296cca2c53bd177305da Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Wed, 28 Jan 2026 19:11:51 +0000 Subject: [PATCH 6/7] Address PR comments Signed-off-by: Keith Mattix II --- src/baggage.rs | 43 +++++--------- src/config.rs | 3 + src/proxy/inbound.rs | 126 +++++++++++++++++++++++++++++++++++------- src/proxy/outbound.rs | 2 +- 4 files changed, 123 insertions(+), 51 deletions(-) diff --git a/src/baggage.rs b/src/baggage.rs index d70b729fb4..c4d4ea2f98 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -30,36 +30,19 @@ pub struct Baggage { } pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { - let mut items = Vec::new(); - baggage - .cluster_id - .as_ref() - .inspect(|cluster| items.push(format!("k8s.cluster.name={cluster}"))); - baggage - .namespace - .as_ref() - .inspect(|namespace| items.push(format!("k8s.namespace.name={namespace}"))); - baggage - .workload_name - .as_ref() - .inspect(|workload_name| items.push(format!("k8s.{workload_type}.name={workload_name}"))); - baggage - .service_name - .as_ref() - .inspect(|service_name| items.push(format!("service.name={service_name}"))); - baggage - .revision - .as_ref() - .inspect(|revision| items.push(format!("service.version={revision}"))); - baggage - .region - .as_ref() - .inspect(|region| items.push(format!("cloud.region={region}"))); - baggage - .zone - .as_ref() - .inspect(|zone| items.push(format!("cloud.availability_zone={zone}"))); - items.join(",") + [ + baggage.cluster_id.as_ref().map(|cluster| format!("k8s.cluster.name={cluster}")), + baggage.namespace.as_ref().map(|namespace| format!("k8s.namespace.name={namespace}")), + baggage.workload_name.as_ref().map(|workload| format!("k8s.{workload_type}.name={workload}")), + baggage.service_name.as_ref().map(|service| format!("service.name={service}")), + baggage.revision.as_ref().map(|revision| format!("service.version={revision}")), + baggage.region.as_ref().map(|region| format!("cloud.region={region}")), + baggage.zone.as_ref().map(|zone| format!("cloud.availability_zone={zone}")), + ] + .into_iter() + .flatten() + .collect::>() + .join(",") } pub fn parse_baggage_header(headers: GetAll) -> Result { diff --git a/src/config.rs b/src/config.rs index 926355d360..01033d438e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -115,6 +115,7 @@ const PROXY_MODE_DEDICATED: &str = "dedicated"; const PROXY_MODE_SHARED: &str = "shared"; const LOCALHOST_APP_TUNNEL: &str = "LOCALHOST_APP_TUNNEL"; +const ENABLE_ENHANCED_BAGGAGE: &str = "ENABLE_RESPONSE_BAGGAGE"; #[derive(serde::Serialize, Clone, Debug, PartialEq, Eq)] pub enum RootCert { @@ -316,6 +317,7 @@ pub struct Config { // path to CRL file; if set, enables CRL checking pub crl_path: Option, + pub enable_enhanced_baggage: bool, } #[derive(serde::Serialize, Clone, Copy, Debug)] @@ -875,6 +877,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result { .ok() .filter(|s| !s.is_empty()) .map(PathBuf::from), + enable_enhanced_baggage: parse_default(ENABLE_ENHANCED_BAGGAGE, true)?, }) } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index dcb81e5593..6e09cab6df 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -220,7 +220,7 @@ impl Inbound { // At this point in processing, we never built up full context to log a complete access log. // Instead, just log a minimal error line. metrics::log_early_deny(src, dst, Reporter::destination, e); - if let Err(err) = req.send_error(build_response(code, None)) { + if let Err(err) = req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -294,7 +294,7 @@ impl Inbound { Ok(res) => res, Err(InboundFlagError(err, flag, code)) => { ri.result_tracker.record_with_flag(Err(err), flag); - if let Err(err) = req.send_error(build_response(code, None)) { + if let Err(err) = req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -314,6 +314,7 @@ impl Inbound { .send_response(build_response( StatusCode::OK, Some(ri.destination_workload.as_ref()), + pi.cfg.enable_enhanced_baggage, )) .and_then(|h2_stream| async { if let Some(TunnelRequest { @@ -395,8 +396,11 @@ impl Inbound { }; let for_host = parse_forwarded_host(req); - let baggage = - parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default(); + let baggage = if pi.cfg.enable_enhanced_baggage { + parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Default::default() + }; // We assume it is from gateway if it's a hostname request. // We may need a more explicit indicator in the future. @@ -427,15 +431,22 @@ impl Inbound { } }; - let derived_source = metrics::DerivedWorkload { - identity: rbac_ctx.conn.src_identity.clone(), - cluster_id: baggage.cluster_id, - region: baggage.region, - zone: baggage.zone, - namespace: baggage.namespace, - app: baggage.service_name, - workload_name: baggage.workload_name, - revision: baggage.revision, + let derived_source = if pi.cfg.enable_enhanced_baggage { + metrics::DerivedWorkload { + identity: rbac_ctx.conn.src_identity.clone(), + cluster_id: baggage.cluster_id, + region: baggage.region, + zone: baggage.zone, + namespace: baggage.namespace, + app: baggage.service_name, + workload_name: baggage.workload_name, + revision: baggage.revision, + } + } else { + metrics::DerivedWorkload { + identity: rbac_ctx.conn.src_identity.clone(), + ..Default::default() + } }; let ds = proxy::guess_inbound_service( &rbac_ctx.conn, @@ -739,15 +750,16 @@ pub fn parse_forwarded_host(req: &T) -> Option { } // Second argument is local workload and cluster name -fn build_response(status: StatusCode, local_wl: Option<&Workload>) -> Response<()> { +fn build_response(status: StatusCode, local_wl: Option<&Workload>, enable_response_baggage: bool) -> Response<()> { let mut builder = Response::builder().status(status); - if let Some(local_wl) = local_wl { - builder = builder.header( - BAGGAGE_HEADER, - baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), - ) - } + if let Some(local_wl) = local_wl + && enable_response_baggage { + builder = builder.header( + BAGGAGE_HEADER, + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), + ) + } builder .body(()) @@ -1093,4 +1105,78 @@ mod tests { }) } } + + #[test] + fn test_build_response_baggage_feature_gate() { + use http::StatusCode; + use crate::proxy::BAGGAGE_HEADER; + use crate::test_helpers; + use super::build_response; + + // Create a test workload + let workload = test_helpers::test_default_workload(); + + // Test with baggage enabled + let mut config_enabled = test_helpers::test_config(); + config_enabled.enable_enhanced_baggage = true; + + let response_enabled = build_response(StatusCode::OK, Some(&workload), config_enabled.enable_enhanced_baggage); + assert!(response_enabled.headers().contains_key(BAGGAGE_HEADER)); + + let baggage_header = response_enabled.headers().get(BAGGAGE_HEADER).unwrap(); + let baggage_value = baggage_header.to_str().unwrap(); + // Check that baggage header contains cluster_id from the test workload + assert!(baggage_value.contains("k8s.cluster.name=Kubernetes")); + + // Test with baggage disabled + let mut config_disabled = test_helpers::test_config(); + config_disabled.enable_enhanced_baggage = false; + + let response_disabled = build_response(StatusCode::OK, Some(&workload), config_disabled.enable_enhanced_baggage); + assert!(!response_disabled.headers().contains_key(BAGGAGE_HEADER)); + + // Test with None workload (should not have baggage regardless of config) + let response_no_workload = build_response(StatusCode::OK, None, config_enabled.enable_enhanced_baggage); + assert!(!response_no_workload.headers().contains_key(BAGGAGE_HEADER)); + } + + #[test] + fn test_incoming_baggage_parsing_feature_gate() { + use http::{HeaderMap, HeaderValue}; + use crate::proxy::BAGGAGE_HEADER; + use crate::baggage::{parse_baggage_header, Baggage}; + use crate::test_helpers; + + // Create mock baggage header + let mut headers = HeaderMap::new(); + headers.insert(BAGGAGE_HEADER, HeaderValue::from_str("k8s.cluster.name=test-cluster,k8s.namespace.name=test-ns,k8s.deployment.name=test-app").unwrap()); + + // Test with baggage enabled + let config_enabled = test_helpers::test_config(); + assert!(config_enabled.enable_enhanced_baggage); // Default should be true + + let baggage_enabled = if config_enabled.enable_enhanced_baggage { + parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Baggage::default() + }; + + assert_eq!(baggage_enabled.cluster_id, Some("test-cluster".into())); + assert_eq!(baggage_enabled.namespace, Some("test-ns".into())); + assert_eq!(baggage_enabled.workload_name, Some("test-app".into())); + + // Test with baggage disabled + let mut config_disabled = test_helpers::test_config(); + config_disabled.enable_enhanced_baggage = false; + + let baggage_disabled = if config_disabled.enable_enhanced_baggage { + parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() + } else { + Baggage::default() + }; + + assert_eq!(baggage_disabled.cluster_id, None); + assert_eq!(baggage_disabled.namespace, None); + assert_eq!(baggage_disabled.workload_name, None); + } } diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 9a24d67a11..644b6c132b 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -298,7 +298,7 @@ impl OutboundConnection { match res { Err(e) => { - let connection_stats = Box::new(connection_stats_builder.build()); + let connection_stats = connection_stats_builder.build(); connection_stats.record(Err(e)); } Ok((derived_workload, drain_tx, inner_upgraded)) => { From 46f596ac40b531fdd15935bd5390a42c2c602418 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Wed, 28 Jan 2026 19:17:46 +0000 Subject: [PATCH 7/7] Cargo fmt Signed-off-by: Keith Mattix II --- src/baggage.rs | 35 ++++++++++++++++++++++++------- src/proxy/inbound.rs | 50 ++++++++++++++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/baggage.rs b/src/baggage.rs index c4d4ea2f98..33d6c2b554 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -31,13 +31,34 @@ pub struct Baggage { pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { [ - baggage.cluster_id.as_ref().map(|cluster| format!("k8s.cluster.name={cluster}")), - baggage.namespace.as_ref().map(|namespace| format!("k8s.namespace.name={namespace}")), - baggage.workload_name.as_ref().map(|workload| format!("k8s.{workload_type}.name={workload}")), - baggage.service_name.as_ref().map(|service| format!("service.name={service}")), - baggage.revision.as_ref().map(|revision| format!("service.version={revision}")), - baggage.region.as_ref().map(|region| format!("cloud.region={region}")), - baggage.zone.as_ref().map(|zone| format!("cloud.availability_zone={zone}")), + baggage + .cluster_id + .as_ref() + .map(|cluster| format!("k8s.cluster.name={cluster}")), + baggage + .namespace + .as_ref() + .map(|namespace| format!("k8s.namespace.name={namespace}")), + baggage + .workload_name + .as_ref() + .map(|workload| format!("k8s.{workload_type}.name={workload}")), + baggage + .service_name + .as_ref() + .map(|service| format!("service.name={service}")), + baggage + .revision + .as_ref() + .map(|revision| format!("service.version={revision}")), + baggage + .region + .as_ref() + .map(|region| format!("cloud.region={region}")), + baggage + .zone + .as_ref() + .map(|zone| format!("cloud.availability_zone={zone}")), ] .into_iter() .flatten() diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 6e09cab6df..a4f6255eb4 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -220,7 +220,9 @@ impl Inbound { // At this point in processing, we never built up full context to log a complete access log. // Instead, just log a minimal error line. metrics::log_early_deny(src, dst, Reporter::destination, e); - if let Err(err) = req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) { + if let Err(err) = + req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) + { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -294,7 +296,9 @@ impl Inbound { Ok(res) => res, Err(InboundFlagError(err, flag, code)) => { ri.result_tracker.record_with_flag(Err(err), flag); - if let Err(err) = req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) { + if let Err(err) = + req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) + { tracing::warn!("failed to send HTTP response: {err}"); } return; @@ -750,16 +754,21 @@ pub fn parse_forwarded_host(req: &T) -> Option { } // Second argument is local workload and cluster name -fn build_response(status: StatusCode, local_wl: Option<&Workload>, enable_response_baggage: bool) -> Response<()> { +fn build_response( + status: StatusCode, + local_wl: Option<&Workload>, + enable_response_baggage: bool, +) -> Response<()> { let mut builder = Response::builder().status(status); if let Some(local_wl) = local_wl - && enable_response_baggage { - builder = builder.header( - BAGGAGE_HEADER, - baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), - ) - } + && enable_response_baggage + { + builder = builder.header( + BAGGAGE_HEADER, + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), + ) + } builder .body(()) @@ -1108,10 +1117,10 @@ mod tests { #[test] fn test_build_response_baggage_feature_gate() { - use http::StatusCode; + use super::build_response; use crate::proxy::BAGGAGE_HEADER; use crate::test_helpers; - use super::build_response; + use http::StatusCode; // Create a test workload let workload = test_helpers::test_default_workload(); @@ -1120,7 +1129,11 @@ mod tests { let mut config_enabled = test_helpers::test_config(); config_enabled.enable_enhanced_baggage = true; - let response_enabled = build_response(StatusCode::OK, Some(&workload), config_enabled.enable_enhanced_baggage); + let response_enabled = build_response( + StatusCode::OK, + Some(&workload), + config_enabled.enable_enhanced_baggage, + ); assert!(response_enabled.headers().contains_key(BAGGAGE_HEADER)); let baggage_header = response_enabled.headers().get(BAGGAGE_HEADER).unwrap(); @@ -1132,20 +1145,25 @@ mod tests { let mut config_disabled = test_helpers::test_config(); config_disabled.enable_enhanced_baggage = false; - let response_disabled = build_response(StatusCode::OK, Some(&workload), config_disabled.enable_enhanced_baggage); + let response_disabled = build_response( + StatusCode::OK, + Some(&workload), + config_disabled.enable_enhanced_baggage, + ); assert!(!response_disabled.headers().contains_key(BAGGAGE_HEADER)); // Test with None workload (should not have baggage regardless of config) - let response_no_workload = build_response(StatusCode::OK, None, config_enabled.enable_enhanced_baggage); + let response_no_workload = + build_response(StatusCode::OK, None, config_enabled.enable_enhanced_baggage); assert!(!response_no_workload.headers().contains_key(BAGGAGE_HEADER)); } #[test] fn test_incoming_baggage_parsing_feature_gate() { - use http::{HeaderMap, HeaderValue}; + use crate::baggage::{Baggage, parse_baggage_header}; use crate::proxy::BAGGAGE_HEADER; - use crate::baggage::{parse_baggage_header, Baggage}; use crate::test_helpers; + use http::{HeaderMap, HeaderValue}; // Create mock baggage header let mut headers = HeaderMap::new();