diff --git a/src/baggage.rs b/src/baggage.rs index a5591338eb..d70b729fb4 100644 --- a/src/baggage.rs +++ b/src/baggage.rs @@ -18,7 +18,7 @@ use hyper::{ http::HeaderValue, }; -#[derive(Default)] +#[derive(Debug, Default, PartialEq, Eq)] pub struct Baggage { pub cluster_id: Option, pub namespace: Option, @@ -29,20 +29,37 @@ pub struct Baggage { pub zone: Option, } -#[allow(clippy::too_many_arguments)] -pub fn baggage_header_val( - cluster: &str, - namespace: &str, - workload_type: &str, - workload_name: &str, - name: &str, - version: &str, - region: &str, - zone: &str, -) -> String { - format!( - "k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}", - ) +pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String { + let mut items = Vec::new(); + baggage + .cluster_id + .as_ref() + .inspect(|cluster| items.push(format!("k8s.cluster.name={cluster}"))); + baggage + .namespace + .as_ref() + .inspect(|namespace| items.push(format!("k8s.namespace.name={namespace}"))); + baggage + .workload_name + .as_ref() + .inspect(|workload_name| items.push(format!("k8s.{workload_type}.name={workload_name}"))); + baggage + .service_name + .as_ref() + .inspect(|service_name| items.push(format!("service.name={service_name}"))); + baggage + .revision + .as_ref() + .inspect(|revision| items.push(format!("service.version={revision}"))); + baggage + .region + .as_ref() + .inspect(|region| items.push(format!("cloud.region={region}"))); + baggage + .zone + .as_ref() + .inspect(|zone| items.push(format!("cloud.availability_zone={zone}"))); + items.join(",") } pub fn parse_baggage_header(headers: GetAll) -> Result { @@ -83,8 +100,9 @@ pub mod tests { use hyper::{HeaderMap, http::HeaderValue}; use crate::proxy::BAGGAGE_HEADER; + use crate::strng::Strng; - use super::parse_baggage_header; + use super::{Baggage, baggage_header_val, parse_baggage_header}; #[test] fn baggage_parser() -> anyhow::Result<()> { @@ -106,8 +124,8 @@ pub mod tests { let mut hm = HeaderMap::new(); let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version="; let header_value = HeaderValue::from_str(baggage_str)?; - let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; hm.append(BAGGAGE_HEADER, header_value); + let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; assert_eq!(baggage.cluster_id, None); assert_eq!(baggage.namespace, None); assert_eq!(baggage.workload_name, None); @@ -152,4 +170,37 @@ pub mod tests { assert_eq!(baggage.revision, None); Ok(()) } + + #[test] + fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> { + { + let baggage = Baggage { + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + { + let baggage = Baggage { + cluster_id: Some(Strng::from("cluster")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from("workload")), + service_name: Some(Strng::from("service")), + ..Default::default() + }; + let mut hm = HeaderMap::new(); + hm.append( + BAGGAGE_HEADER, + HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?, + ); + let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?; + assert_eq!(baggage, parsed); + } + Ok(()) + } } diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 1d1824eeac..dcb81e5593 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -313,7 +313,7 @@ impl Inbound { let send = req .send_response(build_response( StatusCode::OK, - Some((ri.destination_workload.as_ref(), pi.cfg.cluster_id.as_str())), + Some(ri.destination_workload.as_ref()), )) .and_then(|h2_stream| async { if let Some(TunnelRequest { @@ -453,7 +453,7 @@ impl Inbound { ConnectionOpen { reporter: Reporter::destination, source, - derived_source: Some(derived_source.clone()), + derived_source: Some(derived_source), destination: Some(destination_workload.clone()), connection_security_policy: metrics::SecurityPolicy::mutual_tls, destination_service: ds, @@ -739,22 +739,13 @@ pub fn parse_forwarded_host(req: &T) -> Option { } // Second argument is local workload and cluster name -fn build_response(status: StatusCode, local_wl: Option<(&Workload, &str)>) -> Response<()> { +fn build_response(status: StatusCode, local_wl: Option<&Workload>) -> Response<()> { let mut builder = Response::builder().status(status); - if let Some((local_wl, cluster)) = local_wl { + if let Some(local_wl) = local_wl { builder = builder.header( BAGGAGE_HEADER, - baggage_header_val( - cluster, - &local_wl.namespace, - &local_wl.workload_type, - &local_wl.workload_name, - &local_wl.canonical_name, - &local_wl.canonical_revision, - &local_wl.locality.region, - &local_wl.locality.zone, - ), + baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), ) } diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index b6d97e5b12..9a24d67a11 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -353,7 +353,7 @@ impl OutboundConnection { ) .method(hyper::Method::CONNECT) .version(hyper::Version::HTTP_2) - .header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone())) + .header(BAGGAGE_HEADER, baggage(req)) .header( FORWARDED, build_forwarded(remote_addr, &req.intended_destination_service), @@ -728,17 +728,8 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option) } } -fn baggage(r: &Request, cluster: String) -> String { - baggage::baggage_header_val( - &cluster, - &r.source.namespace, - &r.source.workload_type, - &r.source.workload_name, - &r.source.canonical_name, - &r.source.canonical_revision, - &r.source.locality.region, - &r.source.locality.zone, - ) +fn baggage(r: &Request) -> String { + baggage::baggage_header_val(&r.source.baggage(), &r.source.workload_type) } #[derive(Debug)] diff --git a/src/state/workload.rs b/src/state/workload.rs index 15c56979e3..a2aaad86dc 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -14,6 +14,7 @@ use crate::identity::Identity; +use crate::baggage::Baggage; use crate::state::WorkloadInfo; use crate::strng::Strng; use crate::xds::istio::workload::{Port, PortList}; @@ -301,6 +302,19 @@ impl Workload { service_account: self.service_account.clone(), } } + + pub fn baggage(&self) -> Baggage { + Baggage { + cluster_id: (!self.cluster_id.is_empty()).then_some(self.cluster_id.clone()), + namespace: (!self.namespace.is_empty()).then_some(self.namespace.clone()), + workload_name: (!self.workload_name.is_empty()).then_some(self.workload_name.clone()), + service_name: (!self.canonical_name.is_empty()).then_some(self.canonical_name.clone()), + revision: (!self.canonical_revision.is_empty()) + .then_some(self.canonical_revision.clone()), + region: (!self.locality.region.is_empty()).then_some(self.locality.region.clone()), + zone: (!self.locality.zone.is_empty()).then_some(self.locality.zone.clone()), + } + } } impl fmt::Display for Workload { diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index 0ec124cde6..6af6f3a155 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -31,9 +31,10 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::time::Instant; use tracing::{debug, error, info, trace}; -use crate::baggage::baggage_header_val; +use crate::baggage::{Baggage, baggage_header_val}; use crate::hyper_util::TokioExecutor; use crate::proxy::BAGGAGE_HEADER; +use crate::strng::Strng; use crate::{identity, tls}; #[derive(Copy, Clone, Debug)] @@ -283,20 +284,20 @@ impl HboneTestServer { } }); let mut resp = Response::new(Full::::from("streaming...")); + let baggage = Baggage { + cluster_id: Some(Strng::from("Kubernetes")), + namespace: Some(Strng::from("default")), + workload_name: Some(Strng::from(&name)), + service_name: Some(Strng::from(&name)), + revision: Some(Strng::from("v1")), + region: Some(Strng::from("r1")), + zone: Some(Strng::from("z1")), + }; resp.headers_mut().insert( BAGGAGE_HEADER, - baggage_header_val( - "Kubernetes", - "default", - "deployment", - &name, - &name, - "v1", - "r1", - "z1", - ) - .parse() - .expect("valid baggage header"), + baggage_header_val(&baggage, "deployment") + .parse() + .expect("valid baggage header"), ); Ok::<_, Infallible>(resp) }