From 70a10e15201ab5eaa1767e620455e598500179fc Mon Sep 17 00:00:00 2001 From: Mikhail Krinkin Date: Wed, 28 Jan 2026 12:44:28 +0000 Subject: [PATCH] Address review comments The fixes include: 1. Rework of the baggage_header_val to take Baggage structure and workload type as input 2. Adding some tests to make sure that the baggage value we generate can be parsed by our code correctly 3. Fix one of the tests for baggage parsing function Other than that, I reworked the code to not include items with empty values into baggage string. And finally, in a few places we took cluster from the proxy config when we already had workload available - I removed that and now cluster always comes from the workload structure. Signed-off-by: Mikhail Krinkin --- src/baggage.rs | 85 ++++++++++++++++++++++++++++++++--------- src/proxy/inbound.rs | 19 +++------ src/proxy/outbound.rs | 15 ++------ src/state/workload.rs | 14 +++++++ src/test_helpers/tcp.rs | 27 ++++++------- 5 files changed, 104 insertions(+), 56 deletions(-) diff --git a/src/baggage.rs b/src/baggage.rs index a5591338eb..d70b729fb4 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -18,7 +18,7 @@ use hyper::{ http::HeaderValue, }; -#[derive(Default)] +#[derive(Debug, Default, PartialEq, Eq)] pub struct Baggage { pub cluster_id: Option, pub namespace: Option, @@ -29,20 +29,37 @@ pub struct Baggage { pub zone: Option, } -#[allow(clippy::too_many_arguments)] -pub fn baggage_header_val( - cluster: &str, - namespace: &str, - workload_type: &str, - workload_name: &str, - name: &str, - version: &str, - region: &str, - zone: &str, -) -> String { - format!( - "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", - ) +pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { + let mut items = Vec::new(); + baggage + .cluster_id + .as_ref() + .inspect(|cluster| items.push(format!("k8s.cluster.name={cluster}"))); + baggage + .namespace + .as_ref() + .inspect(|namespace| items.push(format!("k8s.namespace.name={namespace}"))); + baggage + .workload_name + .as_ref() + .inspect(|workload_name| items.push(format!("k8s.{workload_type}.name={workload_name}"))); + baggage + .service_name + .as_ref() + .inspect(|service_name| items.push(format!("service.name={service_name}"))); + baggage + .revision + .as_ref() + .inspect(|revision| items.push(format!("service.version={revision}"))); + baggage + .region + .as_ref() + .inspect(|region| items.push(format!("cloud.region={region}"))); + baggage + .zone + .as_ref() + .inspect(|zone| items.push(format!("cloud.availability_zone={zone}"))); + items.join(",") } pub fn parse_baggage_header(headers: GetAll) -> Result { @@ -83,8 +100,9 @@ pub mod tests { use hyper::{HeaderMap, http::HeaderValue}; use crate::proxy::BAGGAGE_HEADER; + use crate::strng::Strng; - use super::parse_baggage_header; + use super::{Baggage, baggage_header_val, parse_baggage_header}; #[test] fn baggage_parser() -> anyhow::Result<()> { @@ -106,8 +124,8 @@ pub mod tests { let mut hm = HeaderMap::new(); let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version="; let header_value = HeaderValue::from_str(baggage_str)?; - let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; hm.append(BAGGAGE_HEADER, header_value); + let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; assert_eq!(baggage.cluster_id, None); assert_eq!(baggage.namespace, None); assert_eq!(baggage.workload_name, None); @@ -152,4 +170,37 @@ pub mod tests { assert_eq!(baggage.revision, None); Ok(()) } + + #[test] + fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> { + { + let baggage = Baggage { + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + { + let baggage = Baggage { + cluster_id: Some(Strng::from("cluster")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from("workload")), + service_name: Some(Strng::from("service")), + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + Ok(()) + } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 1d1824eeac..dcb81e5593 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -313,7 +313,7 @@ impl Inbound { let send = req .send_response(build_response( StatusCode::OK, - Some((ri.destination_workload.as_ref(), pi.cfg.cluster_id.as_str())), + Some(ri.destination_workload.as_ref()), )) .and_then(|h2_stream| async { if let Some(TunnelRequest { @@ -453,7 +453,7 @@ impl Inbound { ConnectionOpen { reporter: Reporter::destination, source, - derived_source: Some(derived_source.clone()), + derived_source: Some(derived_source), destination: Some(destination_workload.clone()), connection_security_policy: metrics::SecurityPolicy::mutual_tls, destination_service: ds, @@ -739,22 +739,13 @@ pub fn parse_forwarded_host(req: &T) -> Option { } // Second argument is local workload and cluster name -fn build_response(status: StatusCode, local_wl: Option<(&Workload, &str)>) -> Response<()> { +fn build_response(status: StatusCode, local_wl: Option<&Workload>) -> Response<()> { let mut builder = Response::builder().status(status); - if let Some((local_wl, cluster)) = local_wl { + if let Some(local_wl) = local_wl { builder = builder.header( BAGGAGE_HEADER, - baggage_header_val( - cluster, - &local_wl.namespace, - &local_wl.workload_type, - &local_wl.workload_name, - &local_wl.canonical_name, - &local_wl.canonical_revision, - &local_wl.locality.region, - &local_wl.locality.zone, - ), + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), ) } diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index b6d97e5b12..9a24d67a11 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -353,7 +353,7 @@ impl OutboundConnection { ) .method(hyper::Method::CONNECT) .version(hyper::Version::HTTP_2) - .header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone())) + .header(BAGGAGE_HEADER, baggage(req)) .header( FORWARDED, build_forwarded(remote_addr, &req.intended_destination_service), @@ -728,17 +728,8 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option) } } -fn baggage(r: &Request, cluster: String) -> String { - baggage::baggage_header_val( - &cluster, - &r.source.namespace, - &r.source.workload_type, - &r.source.workload_name, - &r.source.canonical_name, - &r.source.canonical_revision, - &r.source.locality.region, - &r.source.locality.zone, - ) +fn baggage(r: &Request) -> String { + baggage::baggage_header_val(&r.source.baggage(), &r.source.workload_type) } #[derive(Debug)] diff --git a/src/state/workload.rs b/src/state/workload.rs index 15c56979e3..a2aaad86dc 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -14,6 +14,7 @@ use crate::identity::Identity; +use crate::baggage::Baggage; use crate::state::WorkloadInfo; use crate::strng::Strng; use crate::xds::istio::workload::{Port, PortList}; @@ -301,6 +302,19 @@ impl Workload { service_account: self.service_account.clone(), } } + + pub fn baggage(&self) -> Baggage { + Baggage { + cluster_id: (!self.cluster_id.is_empty()).then_some(self.cluster_id.clone()), + namespace: (!self.namespace.is_empty()).then_some(self.namespace.clone()), + workload_name: (!self.workload_name.is_empty()).then_some(self.workload_name.clone()), + service_name: (!self.canonical_name.is_empty()).then_some(self.canonical_name.clone()), + revision: (!self.canonical_revision.is_empty()) + .then_some(self.canonical_revision.clone()), + region: (!self.locality.region.is_empty()).then_some(self.locality.region.clone()), + zone: (!self.locality.zone.is_empty()).then_some(self.locality.zone.clone()), + } + } } impl fmt::Display for Workload { diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index 0ec124cde6..6af6f3a155 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -31,9 +31,10 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::time::Instant; use tracing::{debug, error, info, trace}; -use crate::baggage::baggage_header_val; +use crate::baggage::{Baggage, baggage_header_val}; use crate::hyper_util::TokioExecutor; use crate::proxy::BAGGAGE_HEADER; +use crate::strng::Strng; use crate::{identity, tls}; #[derive(Copy, Clone, Debug)] @@ -283,20 +284,20 @@ impl HboneTestServer { } }); let mut resp = Response::new(Full::::from("streaming...")); + let baggage = Baggage { + cluster_id: Some(Strng::from("Kubernetes")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from(&name)), + service_name: Some(Strng::from(&name)), + revision: Some(Strng::from("v1")), + region: Some(Strng::from("r1")), + zone: Some(Strng::from("z1")), + }; resp.headers_mut().insert( BAGGAGE_HEADER, - baggage_header_val( - "Kubernetes", - "default", - "deployment", - &name, - &name, - "v1", - "r1", - "z1", - ) - .parse() - .expect("valid baggage header"), + baggage_header_val(&baggage, "deployment") + .parse() + .expect("valid baggage header"), ); Ok::<_, Infallible>(resp) }