Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions src/baggage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use hyper::{
http::HeaderValue,
};

#[derive(Default)]
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Baggage {
pub cluster_id: Option<Strng>,
pub namespace: Option<Strng>,
Expand All @@ -29,20 +29,37 @@ pub struct Baggage {
pub zone: Option<Strng>,
}

#[allow(clippy::too_many_arguments)]
pub fn baggage_header_val(
cluster: &str,
namespace: &str,
workload_type: &str,
workload_name: &str,
name: &str,
version: &str,
region: &str,
zone: &str,
) -> String {
format!(
"k8s.cluster.name={cluster},k8s.namespace.name={namespace},k8s.{workload_type}.name={workload_name},service.name={name},service.version={version},cloud.region={region},cloud.availability_zone={zone}",
)
pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String {
let mut items = Vec::new();
baggage
.cluster_id
.as_ref()
.inspect(|cluster| items.push(format!("k8s.cluster.name={cluster}")));
baggage
.namespace
.as_ref()
.inspect(|namespace| items.push(format!("k8s.namespace.name={namespace}")));
baggage
.workload_name
.as_ref()
.inspect(|workload_name| items.push(format!("k8s.{workload_type}.name={workload_name}")));
baggage
.service_name
.as_ref()
.inspect(|service_name| items.push(format!("service.name={service_name}")));
baggage
.revision
.as_ref()
.inspect(|revision| items.push(format!("service.version={revision}")));
baggage
.region
.as_ref()
.inspect(|region| items.push(format!("cloud.region={region}")));
baggage
.zone
.as_ref()
.inspect(|zone| items.push(format!("cloud.availability_zone={zone}")));
items.join(",")
}

pub fn parse_baggage_header(headers: GetAll<HeaderValue>) -> Result<Baggage, ToStrError> {
Expand Down Expand Up @@ -83,8 +100,9 @@ pub mod tests {
use hyper::{HeaderMap, http::HeaderValue};

use crate::proxy::BAGGAGE_HEADER;
use crate::strng::Strng;

use super::parse_baggage_header;
use super::{Baggage, baggage_header_val, parse_baggage_header};

#[test]
fn baggage_parser() -> anyhow::Result<()> {
Expand All @@ -106,8 +124,8 @@ pub mod tests {
let mut hm = HeaderMap::new();
let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version=";
let header_value = HeaderValue::from_str(baggage_str)?;
let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
hm.append(BAGGAGE_HEADER, header_value);
let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage.cluster_id, None);
assert_eq!(baggage.namespace, None);
assert_eq!(baggage.workload_name, None);
Expand Down Expand Up @@ -152,4 +170,37 @@ pub mod tests {
assert_eq!(baggage.revision, None);
Ok(())
}

#[test]
fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> {
{
let baggage = Baggage {
..Default::default()
};
let mut hm = HeaderMap::new();
hm.append(
BAGGAGE_HEADER,
HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?,
);
let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage, parsed);
}
{
let baggage = Baggage {
cluster_id: Some(Strng::from("cluster")),
namespace: Some(Strng::from("default")),
workload_name: Some(Strng::from("workload")),
service_name: Some(Strng::from("service")),
..Default::default()
};
let mut hm = HeaderMap::new();
hm.append(
BAGGAGE_HEADER,
HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?,
);
let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage, parsed);
}
Ok(())
}
}
19 changes: 5 additions & 14 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl Inbound {
let send = req
.send_response(build_response(
StatusCode::OK,
Some((ri.destination_workload.as_ref(), pi.cfg.cluster_id.as_str())),
Some(ri.destination_workload.as_ref()),
))
.and_then(|h2_stream| async {
if let Some(TunnelRequest {
Expand Down Expand Up @@ -453,7 +453,7 @@ impl Inbound {
ConnectionOpen {
reporter: Reporter::destination,
source,
derived_source: Some(derived_source.clone()),
derived_source: Some(derived_source),
destination: Some(destination_workload.clone()),
connection_security_policy: metrics::SecurityPolicy::mutual_tls,
destination_service: ds,
Expand Down Expand Up @@ -739,22 +739,13 @@ pub fn parse_forwarded_host<T: RequestParts>(req: &T) -> Option<String> {
}

// Second argument is local workload and cluster name
fn build_response(status: StatusCode, local_wl: Option<(&Workload, &str)>) -> Response<()> {
fn build_response(status: StatusCode, local_wl: Option<&Workload>) -> Response<()> {
let mut builder = Response::builder().status(status);

if let Some((local_wl, cluster)) = local_wl {
if let Some(local_wl) = local_wl {
builder = builder.header(
BAGGAGE_HEADER,
baggage_header_val(
cluster,
&local_wl.namespace,
&local_wl.workload_type,
&local_wl.workload_name,
&local_wl.canonical_name,
&local_wl.canonical_revision,
&local_wl.locality.region,
&local_wl.locality.zone,
),
baggage_header_val(&local_wl.baggage(), &local_wl.workload_type),
)
}

Expand Down
15 changes: 3 additions & 12 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl OutboundConnection {
)
.method(hyper::Method::CONNECT)
.version(hyper::Version::HTTP_2)
.header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone()))
.header(BAGGAGE_HEADER, baggage(req))
.header(
FORWARDED,
build_forwarded(remote_addr, &req.intended_destination_service),
Expand Down Expand Up @@ -728,17 +728,8 @@ fn build_forwarded(remote_addr: SocketAddr, server: &Option<ServiceDescription>)
}
}

fn baggage(r: &Request, cluster: String) -> String {
baggage::baggage_header_val(
&cluster,
&r.source.namespace,
&r.source.workload_type,
&r.source.workload_name,
&r.source.canonical_name,
&r.source.canonical_revision,
&r.source.locality.region,
&r.source.locality.zone,
)
fn baggage(r: &Request) -> String {
baggage::baggage_header_val(&r.source.baggage(), &r.source.workload_type)
}

#[derive(Debug)]
Expand Down
14 changes: 14 additions & 0 deletions src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::identity::Identity;

use crate::baggage::Baggage;
use crate::state::WorkloadInfo;
use crate::strng::Strng;
use crate::xds::istio::workload::{Port, PortList};
Expand Down Expand Up @@ -301,6 +302,19 @@ impl Workload {
service_account: self.service_account.clone(),
}
}

pub fn baggage(&self) -> Baggage {
Baggage {
cluster_id: (!self.cluster_id.is_empty()).then_some(self.cluster_id.clone()),
namespace: (!self.namespace.is_empty()).then_some(self.namespace.clone()),
workload_name: (!self.workload_name.is_empty()).then_some(self.workload_name.clone()),
service_name: (!self.canonical_name.is_empty()).then_some(self.canonical_name.clone()),
revision: (!self.canonical_revision.is_empty())
.then_some(self.canonical_revision.clone()),
region: (!self.locality.region.is_empty()).then_some(self.locality.region.clone()),
zone: (!self.locality.zone.is_empty()).then_some(self.locality.zone.clone()),
}
}
}

impl fmt::Display for Workload {
Expand Down
27 changes: 14 additions & 13 deletions src/test_helpers/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::time::Instant;
use tracing::{debug, error, info, trace};

use crate::baggage::baggage_header_val;
use crate::baggage::{Baggage, baggage_header_val};
use crate::hyper_util::TokioExecutor;
use crate::proxy::BAGGAGE_HEADER;
use crate::strng::Strng;
use crate::{identity, tls};

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -283,20 +284,20 @@ impl HboneTestServer {
}
});
let mut resp = Response::new(Full::<Bytes>::from("streaming..."));
let baggage = Baggage {
cluster_id: Some(Strng::from("Kubernetes")),
namespace: Some(Strng::from("default")),
workload_name: Some(Strng::from(&name)),
service_name: Some(Strng::from(&name)),
revision: Some(Strng::from("v1")),
region: Some(Strng::from("r1")),
zone: Some(Strng::from("z1")),
};
resp.headers_mut().insert(
BAGGAGE_HEADER,
baggage_header_val(
"Kubernetes",
"default",
"deployment",
&name,
&name,
"v1",
"r1",
"z1",
)
.parse()
.expect("valid baggage header"),
baggage_header_val(&baggage, "deployment")
.parse()
.expect("valid baggage header"),
);
Ok::<_, Infallible>(resp)
}
Expand Down