Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions src/baggage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use hyper::{
http::HeaderValue,
};

#[derive(Default)]
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Baggage {
pub cluster_id: Option<Strng>,
pub namespace: Option<Strng>,
Expand All @@ -29,6 +29,43 @@ pub struct Baggage {
pub zone: Option<Strng>,
}

pub fn baggage_header_val(baggage: &Baggage, workload_type: &str) -> String {
[
baggage
.cluster_id
.as_ref()
.map(|cluster| format!("k8s.cluster.name={cluster}")),
baggage
.namespace
.as_ref()
.map(|namespace| format!("k8s.namespace.name={namespace}")),
baggage
.workload_name
.as_ref()
.map(|workload| format!("k8s.{workload_type}.name={workload}")),
baggage
.service_name
.as_ref()
.map(|service| format!("service.name={service}")),
baggage
.revision
.as_ref()
.map(|revision| format!("service.version={revision}")),
baggage
.region
.as_ref()
.map(|region| format!("cloud.region={region}")),
baggage
.zone
.as_ref()
.map(|zone| format!("cloud.availability_zone={zone}")),
]
.into_iter()
.flatten()
.collect::<Vec<_>>()
.join(",")
}

pub fn parse_baggage_header(headers: GetAll<HeaderValue>) -> Result<Baggage, ToStrError> {
let mut baggage = Baggage {
..Default::default()
Expand Down Expand Up @@ -67,8 +104,9 @@ pub mod tests {
use hyper::{HeaderMap, http::HeaderValue};

use crate::proxy::BAGGAGE_HEADER;
use crate::strng::Strng;

use super::parse_baggage_header;
use super::{Baggage, baggage_header_val, parse_baggage_header};

#[test]
fn baggage_parser() -> anyhow::Result<()> {
Expand All @@ -90,8 +128,8 @@ pub mod tests {
let mut hm = HeaderMap::new();
let baggage_str = "k8s.cluster.name=,k8s.namespace.name=,k8s.deployment.name=,service.name=,service.version=";
let header_value = HeaderValue::from_str(baggage_str)?;
let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
hm.append(BAGGAGE_HEADER, header_value);
let baggage = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage.cluster_id, None);
assert_eq!(baggage.namespace, None);
assert_eq!(baggage.workload_name, None);
Expand Down Expand Up @@ -136,4 +174,37 @@ pub mod tests {
assert_eq!(baggage.revision, None);
Ok(())
}

#[test]
fn baggage_header_val_can_be_parsed() -> anyhow::Result<()> {
{
let baggage = Baggage {
..Default::default()
};
let mut hm = HeaderMap::new();
hm.append(
BAGGAGE_HEADER,
HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?,
);
let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage, parsed);
}
{
let baggage = Baggage {
cluster_id: Some(Strng::from("cluster")),
namespace: Some(Strng::from("default")),
workload_name: Some(Strng::from("workload")),
service_name: Some(Strng::from("service")),
..Default::default()
};
let mut hm = HeaderMap::new();
hm.append(
BAGGAGE_HEADER,
HeaderValue::from_str(&baggage_header_val(&baggage, "deployment"))?,
);
let parsed = parse_baggage_header(hm.get_all(BAGGAGE_HEADER))?;
assert_eq!(baggage, parsed);
}
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ const PROXY_MODE_DEDICATED: &str = "dedicated";
const PROXY_MODE_SHARED: &str = "shared";

const LOCALHOST_APP_TUNNEL: &str = "LOCALHOST_APP_TUNNEL";
const ENABLE_ENHANCED_BAGGAGE: &str = "ENABLE_RESPONSE_BAGGAGE";

#[derive(serde::Serialize, Clone, Debug, PartialEq, Eq)]
pub enum RootCert {
Expand Down Expand Up @@ -316,6 +317,7 @@ pub struct Config {

// path to CRL file; if set, enables CRL checking
pub crl_path: Option<PathBuf>,
pub enable_enhanced_baggage: bool,
}

#[derive(serde::Serialize, Clone, Copy, Debug)]
Expand Down Expand Up @@ -875,6 +877,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
.ok()
.filter(|s| !s.is_empty())
.map(PathBuf::from),
enable_enhanced_baggage: parse_default(ENABLE_ENHANCED_BAGGAGE, true)?,
})
}

Expand Down
21 changes: 11 additions & 10 deletions src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::proxy;
use crate::proxy::ConnectionResult;
use crate::proxy::Error::{BackendDisconnected, ClientDisconnected, ReceiveError, SendError};
use crate::proxy::{self, ConnectionResult};
use bytes::{Buf, Bytes, BytesMut};
use pin_project_lite::pin_project;
use std::future::Future;
Expand Down Expand Up @@ -416,9 +415,9 @@ mod tests {
let metrics = std::sync::Arc::new(crate::proxy::Metrics::new(
crate::metrics::sub_registry(&mut registry),
));
let source_addr = "127.0.0.1:12345".parse().unwrap();
let dest_addr = "127.0.0.1:34567".parse().unwrap();
let cr = ConnectionResult::new(
let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap();
let cr = crate::proxy::metrics::ConnectionResultBuilder::new(
source_addr,
dest_addr,
None,
Expand All @@ -432,7 +431,8 @@ mod tests {
destination_service: None,
},
metrics.clone(),
);
)
.build();
copy_bidirectional(ztunnel_downsteam, ztunnel_upsteam, &cr).await
});
const ITERS: usize = 1000;
Expand Down Expand Up @@ -461,9 +461,9 @@ mod tests {
let metrics = std::sync::Arc::new(crate::proxy::Metrics::new(
crate::metrics::sub_registry(&mut registry),
));
let source_addr = "127.0.0.1:12345".parse().unwrap();
let dest_addr = "127.0.0.1:34567".parse().unwrap();
let cr = ConnectionResult::new(
let source_addr: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
let dest_addr: std::net::SocketAddr = "127.0.0.1:34567".parse().unwrap();
let cr = crate::proxy::metrics::ConnectionResultBuilder::new(
source_addr,
dest_addr,
None,
Expand All @@ -477,7 +477,8 @@ mod tests {
destination_service: None,
},
metrics.clone(),
);
)
.build();
copy_bidirectional(WeirdIO(ztunnel_downsteam), WeirdIO(ztunnel_upsteam), &cr).await
});
const WRITES: usize = 2560;
Expand Down
14 changes: 8 additions & 6 deletions src/proxy/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::baggage::{Baggage, parse_baggage_header};
use crate::config;
use crate::identity::Identity;
use crate::proxy::Error;
use crate::proxy::{BAGGAGE_HEADER, Error};
use bytes::{Buf, Bytes};
use h2::SendStream;
use h2::client::{Connection, SendRequest};
Expand Down Expand Up @@ -101,10 +102,10 @@ impl H2ConnectClient {
pub async fn send_request(
&mut self,
req: http::Request<()>,
) -> Result<crate::proxy::h2::H2Stream, Error> {
) -> Result<(crate::proxy::h2::H2Stream, Option<Baggage>), Error> {
let cur = self.stream_count.fetch_add(1, Ordering::SeqCst);
trace!(current_streams = cur, "sending request");
let (send, recv) = match self.internal_send(req).await {
let (send, recv, baggage) = match self.internal_send(req).await {
Ok(r) => r,
Err(e) => {
// Request failed, so drop the stream now
Expand All @@ -123,14 +124,14 @@ impl H2ConnectClient {
_dropped: dropped2,
};
let h2 = crate::proxy::h2::H2Stream { read, write };
Ok(h2)
Ok((h2, baggage))
}

// helper to allow us to handle errors once
async fn internal_send(
&mut self,
req: Request<()>,
) -> Result<(SendStream<Bytes>, h2::RecvStream), Error> {
) -> Result<(SendStream<Bytes>, h2::RecvStream, Option<Baggage>), Error> {
// "This function must return `Ready` before `send_request` is called"
// We should always be ready though, because we make sure we don't go over the max stream limit out of band.
futures::future::poll_fn(|cx| self.sender.poll_ready(cx)).await?;
Expand All @@ -139,7 +140,8 @@ impl H2ConnectClient {
if response.status() != 200 {
return Err(Error::HttpStatus(response.status()));
}
Ok((stream, response.into_body()))
let baggage = parse_baggage_header(response.headers().get_all(BAGGAGE_HEADER)).ok();
Ok((stream, response.into_body(), baggage))
}
}

Expand Down
Loading