diff --git a/benches/throughput.rs b/benches/throughput.rs index c3c51dae72..d3fa15764d 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -35,7 +35,7 @@ use tokio::sync::Mutex; use tracing::info; use ztunnel::rbac::{Authorization, RbacMatch, StringMatch}; -use ztunnel::state::workload::{Protocol, Workload}; +use ztunnel::state::workload::{InboundProtocol, Workload}; use ztunnel::state::{DemandProxyState, ProxyRbacContext, ProxyState}; use ztunnel::test_helpers::app::{DestinationAddr, TestApp}; use ztunnel::test_helpers::linux::{TestMode, WorkloadManager}; @@ -457,7 +457,7 @@ fn hbone_connection_config() -> ztunnel::config::ConfigSource { let lwl = LocalWorkload { workload: Workload { workload_ips: vec![hbone_connection_ip(i)], - protocol: Protocol::HBONE, + protocol: InboundProtocol::HBONE, uid: strng::format!("cluster1//v1/Pod/default/remote{}", i), name: strng::format!("workload-{}", i), namespace: strng::format!("namespace-{}", i), @@ -471,7 +471,7 @@ fn hbone_connection_config() -> ztunnel::config::ConfigSource { let lwl = LocalWorkload { workload: Workload { workload_ips: vec![], - protocol: Protocol::HBONE, + protocol: InboundProtocol::HBONE, uid: "cluster1//v1/Pod/default/local-source".into(), name: "local-source".into(), namespace: "default".into(), diff --git a/src/cert_fetcher.rs b/src/cert_fetcher.rs index ab905b7d9d..df2a53a6c1 100644 --- a/src/cert_fetcher.rs +++ b/src/cert_fetcher.rs @@ -16,7 +16,7 @@ use crate::config; use crate::config::ProxyMode; use crate::identity::Priority::Warmup; use crate::identity::{Identity, Request, SecretManager}; -use crate::state::workload::{Protocol, Workload}; +use crate::state::workload::{InboundProtocol, Workload}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, error, info}; @@ -96,7 +96,7 @@ impl CertFetcherImpl { // We only get certs for our own node Some(w.node.as_ref()) == self.local_node.as_deref() && // If it doesn't support HBONE it *probably* doesn't need a cert. - (w.native_tunnel || w.protocol == Protocol::HBONE) + (w.native_tunnel || w.protocol == InboundProtocol::HBONE) } } diff --git a/src/proxy.rs b/src/proxy.rs index b6c6346e4e..5ccafb8515 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -446,6 +446,9 @@ pub enum Error { #[error("unknown waypoint: {0}")] UnknownWaypoint(String), + #[error("unknown network gateway: {0}")] + UnknownNetworkGateway(String), + #[error("no service or workload for hostname: {0}")] NoHostname(String), diff --git a/src/proxy/connection_manager.rs b/src/proxy/connection_manager.rs index 04e338ddc8..a02cfa0322 100644 --- a/src/proxy/connection_manager.rs +++ b/src/proxy/connection_manager.rs @@ -24,7 +24,7 @@ use std::net::SocketAddr; use crate::drain; use crate::drain::{DrainTrigger, DrainWatcher}; -use crate::state::workload::Protocol; +use crate::state::workload::{InboundProtocol, OutboundProtocol}; use std::sync::Arc; use std::sync::RwLock; use tracing::{debug, error, info, warn}; @@ -134,7 +134,7 @@ pub struct OutboundConnection { pub src: SocketAddr, pub original_dst: SocketAddr, pub actual_dst: SocketAddr, - pub protocol: Protocol, + pub protocol: OutboundProtocol, } #[derive(Debug, Clone, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)] @@ -143,7 +143,7 @@ pub struct InboundConnectionDump { pub src: SocketAddr, pub original_dst: Option, pub actual_dst: SocketAddr, - pub protocol: Protocol, + pub protocol: InboundProtocol, } #[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize)] @@ -160,7 +160,7 @@ impl ConnectionManager { src: SocketAddr, original_dst: SocketAddr, actual_dst: SocketAddr, - protocol: Protocol, + protocol: OutboundProtocol, ) -> OutboundConnectionGuard { let c = OutboundConnection { src, @@ -284,9 +284,9 @@ impl Serialize for ConnectionManager { original_dst: c.dest_service, actual_dst: c.ctx.conn.dst, protocol: if c.ctx.conn.src_identity.is_some() { - Protocol::HBONE + InboundProtocol::HBONE } else { - Protocol::TCP + InboundProtocol::TCP }, }) .collect(); diff --git a/src/proxy/h2.rs b/src/proxy/h2.rs index 257f75647d..b86ac4df59 100644 --- a/src/proxy/h2.rs +++ b/src/proxy/h2.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::copy; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use futures_core::ready; use h2::Reason; use std::io::Error; @@ -85,6 +85,8 @@ pub struct H2StreamWriteHalf { _dropped: Option, } +pub struct TokioH2Stream(H2Stream); + struct DropCounter { // Whether the other end of this shared counter has already dropped. // We only decrement if they have, so we do not double count @@ -138,6 +140,69 @@ impl Drop for DropCounter { } } +// We can't directly implement tokio::io::{AsyncRead, AsyncWrite} for H2Stream because +// then the specific implementation will conflict with the generic one. +impl TokioH2Stream { + pub fn new(stream: H2Stream) -> Self { + Self(stream) + } +} + +impl tokio::io::AsyncRead for TokioH2Stream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let pinned = std::pin::Pin::new(&mut self.0.read); + copy::ResizeBufRead::poll_bytes(pinned, cx).map(|r| match r { + Ok(bytes) => { + if buf.remaining() < bytes.len() { + Err(Error::new( + std::io::ErrorKind::Other, + format!( + "kould overflow buffer of with {} remaining", + buf.remaining() + ), + )) + } else { + buf.put(bytes); + Ok(()) + } + } + Err(e) => Err(e), + }) + } +} + +impl tokio::io::AsyncWrite for TokioH2Stream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let pinned = std::pin::Pin::new(&mut self.0.write); + let buf = Bytes::copy_from_slice(buf); + copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pinned = std::pin::Pin::new(&mut self.0.write); + copy::AsyncWriteBuf::poll_flush(pinned, cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let pinned = std::pin::Pin::new(&mut self.0.write); + copy::AsyncWriteBuf::poll_shutdown(pinned, cx) + } +} + impl copy::ResizeBufRead for H2StreamReadHalf { fn poll_bytes(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); diff --git a/src/proxy/h2/client.rs b/src/proxy/h2/client.rs index a847cf4ac3..4c768d537e 100644 --- a/src/proxy/h2/client.rs +++ b/src/proxy/h2/client.rs @@ -27,10 +27,8 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio::sync::watch::Receiver; -use tokio_rustls::client::TlsStream; use tracing::{Instrument, debug, error, trace, warn}; #[derive(Debug, Clone)] @@ -147,7 +145,7 @@ impl H2ConnectClient { pub async fn spawn_connection( cfg: Arc, - s: TlsStream, + s: impl AsyncRead + AsyncWrite + Unpin + Send + 'static, driver_drain: Receiver, wl_key: WorkloadKey, ) -> Result { diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 17c47c7214..0df26ffc67 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -665,7 +665,7 @@ mod tests { self, DemandProxyState, service::{Endpoint, EndpointSet, Service}, workload::{ - ApplicationTunnel, GatewayAddress, NetworkAddress, Protocol, Workload, + ApplicationTunnel, GatewayAddress, InboundProtocol, NetworkAddress, Workload, application_tunnel::Protocol as AppProtocol, gatewayaddress::Destination, }, }, @@ -922,7 +922,7 @@ mod tests { .map(|(name, ip, waypoint, app_tunnel)| Workload { workload_ips: vec![ip.parse().unwrap()], waypoint: waypoint.workload_attached(), - protocol: Protocol::HBONE, + protocol: InboundProtocol::HBONE, uid: strng::format!("cluster1//v1/Pod/default/{name}"), name: strng::format!("workload-{name}"), namespace: "default".into(), diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 7d0a95cb6e..91b1015ed5 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -37,9 +37,12 @@ use crate::drain::run_with_drain; use crate::proxy::h2::{H2Stream, client::WorkloadKey}; use crate::state::ServiceResolutionMode; use crate::state::service::ServiceDescription; -use crate::state::workload::{NetworkAddress, Protocol, Workload, address::Address}; +use crate::state::workload::OutboundProtocol; +use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address}; use crate::{assertions, copy, proxy, socket}; +use super::h2::TokioH2Stream; + pub struct Outbound { pi: Arc, drain: DrainWatcher, @@ -183,7 +186,7 @@ impl OutboundConnection { ); let metrics = self.pi.metrics.clone(); - let hbone_target = req.hbone_target_destination.map(HboneAddress::SocketAddr); + let hbone_target = req.hbone_target_destination.clone(); let result_tracker = Box::new(ConnectionResult::new( source_addr, req.actual_destination, @@ -194,11 +197,21 @@ impl OutboundConnection { )); let res = match req.protocol { - Protocol::HBONE => { + OutboundProtocol::DOUBLEHBONE => { + // We box this since its not a common path and it would make the future really big. + Box::pin(self.proxy_to_double_hbone( + source_stream, + source_addr, + &req, + &result_tracker, + )) + .await + } + OutboundProtocol::HBONE => { self.proxy_to_hbone(source_stream, source_addr, &req, &result_tracker) .await } - Protocol::TCP => { + OutboundProtocol::TCP => { self.proxy_to_tcp(source_stream, &req, &result_tracker) .await } @@ -206,6 +219,58 @@ impl OutboundConnection { result_tracker.record(res) } + async fn proxy_to_double_hbone( + &mut self, + stream: TcpStream, + remote_addr: SocketAddr, + req: &Request, + connection_stats: &ConnectionResult, + ) -> Result<(), Error> { + // Create the outer HBONE stream + let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; + // Wrap upgraded to implement tokio's Async{Write,Read} + let upgraded = TokioH2Stream::new(upgraded); + + // For the inner one, we do it manually to avoid connection pooling. + // Otherwise, we would only ever reach one workload in the remote cluster. + // We also need to abort tasks the right way to get graceful terminations. + let wl_key = WorkloadKey { + src_id: req.source.identity(), + dst_id: req.final_sans.clone(), + src: remote_addr.ip(), + dst: req.actual_destination, + }; + + // Fetch certs and establish inner TLS connection. + let cert = self + .pi + .local_workload_information + .fetch_certificate() + .await?; + let connector = cert.outbound_connector(wl_key.dst_id.clone())?; + let tls_stream = connector.connect(upgraded).await?; + + // Spawn inner CONNECT tunnel + let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); + let mut sender = + super::h2::client::spawn_connection(self.pi.cfg.clone(), tls_stream, drain_rx, wl_key) + .await?; + let http_request = self.create_hbone_request(remote_addr, req); + let inner_upgraded = sender.send_request(http_request).await?; + + // Proxy + let res = copy::copy_bidirectional( + copy::TcpStreamSplitter(stream), + inner_upgraded, + connection_stats, + ) + .await; + + let _ = drain_tx.send(true); + + res + } + async fn proxy_to_hbone( &mut self, stream: TcpStream, @@ -217,14 +282,15 @@ impl OutboundConnection { copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await } - async fn send_hbone_request( + fn create_hbone_request( &mut self, remote_addr: SocketAddr, req: &Request, - ) -> Result { - let request = http::Request::builder() + ) -> http::Request<()> { + http::Request::builder() .uri( req.hbone_target_destination + .as_ref() .expect("HBONE must have target") .to_string(), ) @@ -237,12 +303,18 @@ impl OutboundConnection { ) .header(TRACEPARENT_HEADER, self.id.header()) .body(()) - .expect("builder with known status code should not fail"); + .expect("builder with known status code should not fail") + } + async fn send_hbone_request( + &mut self, + remote_addr: SocketAddr, + req: &Request, + ) -> Result { + let request = self.create_hbone_request(remote_addr, req); let pool_key = Box::new(WorkloadKey { src_id: req.source.identity(), // Clone here shouldn't be needed ideally, we could just take ownership of Request. - // But that dst_id: req.upstream_sans.clone(), src: remote_addr.ip(), dst: req.actual_destination, @@ -276,25 +348,23 @@ impl OutboundConnection { } fn conn_metrics_from_request(req: &Request) -> ConnectionOpen { - let derived_source = if req.protocol == Protocol::HBONE { - Some(DerivedWorkload { - // We are going to do mTLS, so report our identity - identity: Some(req.source.as_ref().identity()), - ..Default::default() - }) - } else { - None + let (derived_source, security_policy) = match req.protocol { + OutboundProtocol::HBONE | OutboundProtocol::DOUBLEHBONE => ( + Some(DerivedWorkload { + // We are going to do mTLS, so report our identity + identity: Some(req.source.as_ref().identity()), + ..Default::default() + }), + metrics::SecurityPolicy::mutual_tls, + ), + OutboundProtocol::TCP => (None, metrics::SecurityPolicy::unknown), }; ConnectionOpen { reporter: Reporter::source, derived_source, source: Some(req.source.clone()), destination: req.actual_destination_workload.clone(), - connection_security_policy: if req.protocol == Protocol::HBONE { - metrics::SecurityPolicy::mutual_tls - } else { - metrics::SecurityPolicy::unknown - }, + connection_security_policy: security_policy, destination_service: req.intended_destination_service.clone(), } } @@ -324,16 +394,22 @@ impl OutboundConnection { .await? { let upstream_sans = waypoint.workload_and_services_san(); - let actual_destination = waypoint.workload_socket_addr(); + let actual_destination = + waypoint + .workload_socket_addr() + .ok_or(Error::NoValidDestination(Box::new( + (*waypoint.workload).clone(), + )))?; debug!("built request to service waypoint proxy"); return Ok(Request { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, source: source_workload, - hbone_target_destination: Some(target), + hbone_target_destination: Some(HboneAddress::SocketAddr(target)), actual_destination_workload: Some(waypoint.workload), intended_destination_service: Some(ServiceDescription::from(&*target_service)), actual_destination, upstream_sans, + final_sans: vec![], }); } // this was service addressed but we did not find a waypoint @@ -357,16 +433,53 @@ impl OutboundConnection { } debug!("built request as passthrough; no upstream found"); return Ok(Request { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, source: source_workload, hbone_target_destination: None, actual_destination_workload: None, intended_destination_service: None, actual_destination: target, upstream_sans: vec![], + final_sans: vec![], }); }; + // Check whether we are using an E/W gateway and sending cross network traffic + if us.workload.network != source_workload.network { + if let Some(ew_gtw) = &us.workload.network_gateway { + let gtw_us = { + self.pi + .state + .fetch_network_gateway(ew_gtw, &source_workload, target) + .await? + }; + + let svc = us + .destination_service + .as_ref() + .expect("Workloads with network gateways must be service addressed."); + let hbone_target_destination = + Some(HboneAddress::SvcHostname(svc.hostname.clone(), us.port)); + + return Ok(Request { + protocol: OutboundProtocol::DOUBLEHBONE, + source: source_workload, + hbone_target_destination, + actual_destination_workload: Some(gtw_us.workload.clone()), + intended_destination_service: us.destination_service.clone(), + actual_destination: gtw_us.workload_socket_addr().ok_or( + Error::NoValidDestination(Box::new((*gtw_us.workload).clone())), + )?, + upstream_sans: gtw_us.workload_and_services_san(), + final_sans: us.service_sans(), + }); + } else { + // Do not try to send cross-network traffic without network gateway. + return Err(Error::NoValidDestination(Box::new((*us.workload).clone()))); + } + } + + // We are not using a network gateway and there is no workload address. let from_waypoint = proxy::check_from_waypoint( state, &us.workload, @@ -384,45 +497,61 @@ impl OutboundConnection { .fetch_workload_waypoint(&us.workload, &source_workload, target) .await?; if let Some(waypoint) = waypoint { - let actual_destination = waypoint.workload_socket_addr(); + let actual_destination = + waypoint + .workload_socket_addr() + .ok_or(Error::NoValidDestination(Box::new( + (*waypoint.workload).clone(), + )))?; let upstream_sans = waypoint.workload_and_services_san(); debug!("built request to workload waypoint proxy"); return Ok(Request { // Always use HBONE here - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, source: source_workload, // Use the original VIP, not translated - hbone_target_destination: Some(target), + hbone_target_destination: Some(HboneAddress::SocketAddr(target)), actual_destination_workload: Some(waypoint.workload), intended_destination_service: us.destination_service.clone(), actual_destination, upstream_sans, + final_sans: vec![], }); } // Workload doesn't have a waypoint; send directly } + let selected_workload_ip = us + .selected_workload_ip + .ok_or(Error::NoValidDestination(Box::new((*us.workload).clone())))?; + // only change the port if we're sending HBONE let actual_destination = match us.workload.protocol { - Protocol::HBONE => SocketAddr::from((us.selected_workload_ip, self.hbone_port)), - Protocol::TCP => us.workload_socket_addr(), + InboundProtocol::HBONE => SocketAddr::from((selected_workload_ip, self.hbone_port)), + InboundProtocol::TCP => us + .workload_socket_addr() + .ok_or(Error::NoValidDestination(Box::new((*us.workload).clone())))?, }; let hbone_target_destination = match us.workload.protocol { - Protocol::HBONE => Some(us.workload_socket_addr()), - Protocol::TCP => None, + InboundProtocol::HBONE => Some(HboneAddress::SocketAddr( + us.workload_socket_addr() + .ok_or(Error::NoValidDestination(Box::new((*us.workload).clone())))?, + )), + InboundProtocol::TCP => None, }; // For case no waypoint for both side and direct to remote node proxy - let upstream_sans = us.workload_and_services_san(); + let (upstream_sans, final_sans) = (us.workload_and_services_san(), vec![]); debug!("built request to workload"); Ok(Request { - protocol: us.workload.protocol, + protocol: OutboundProtocol::from(us.workload.protocol), source: source_workload, hbone_target_destination, actual_destination_workload: Some(us.workload.clone()), intended_destination_service: us.destination_service.clone(), actual_destination, upstream_sans, + final_sans, }) } } @@ -453,7 +582,7 @@ fn baggage(r: &Request, cluster: String) -> String { #[derive(Debug)] struct Request { - protocol: Protocol, + protocol: OutboundProtocol, // Source workload sending the request source: Arc, // The actual destination workload we are targeting. When proxying through a waypoint, this is the waypoint, @@ -469,11 +598,16 @@ struct Request { // When using HBONE, the `hbone_target_destination` is the inner :authority and `actual_destination` is the TCP destination. actual_destination: SocketAddr, // If using HBONE, the inner (:authority) of the HBONE request. - hbone_target_destination: Option, + hbone_target_destination: Option, // The identity we will assert for the next hop; this may not be the same as actual_destination_workload // in the case of proxies along the path. upstream_sans: Vec, + + // The identity of workload that will ultimately process this request. + // This field only matters if we need to know both the identity of the next hop, as well as the + // final hop (currently, this is only double HBONE). + final_sans: Vec, } #[cfg(test)] @@ -494,7 +628,9 @@ mod tests { use crate::xds::istio::workload::Workload as XdsWorkload; use crate::xds::istio::workload::address::Type as XdsAddressType; use crate::xds::istio::workload::{IpFamilies, Port}; - use crate::xds::istio::workload::{NetworkAddress as XdsNetworkAddress, PortList}; + use crate::xds::istio::workload::{ + NamespacedHostname as XdsNamespacedHostname, NetworkAddress as XdsNetworkAddress, PortList, + }; use crate::xds::istio::workload::{NetworkMode, Service as XdsService}; use crate::{identity, xds}; @@ -607,6 +743,7 @@ mod tests { protocol: r.protocol, hbone_destination: &r .hbone_target_destination + .as_ref() .map(|s| s.to_string()) .unwrap_or_default(), destination: &r.actual_destination.to_string(), @@ -629,7 +766,7 @@ mod tests { ..Default::default() }), Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "1.2.3.4:80", }), @@ -637,6 +774,182 @@ mod tests { .await; } + #[tokio::test] + async fn build_request_wrong_network() { + run_build_request_multi( + "127.0.0.1", + "127.0.0.3:80", + vec![ + XdsAddressType::Service(XdsService { + hostname: "example.com".to_string(), + addresses: vec![XdsNetworkAddress { + network: "".to_string(), + address: vec![127, 0, 0, 3], + }], + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + ..Default::default() + }), + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/remote-pod".to_string(), + addresses: vec![Bytes::copy_from_slice(&[10, 0, 0, 2])], + network: "remote".to_string(), + services: std::collections::HashMap::from([( + "/example.com".to_string(), + PortList { + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + }, + )]), + ..Default::default() + }), + ], + None, + ) + .await; + } + + #[tokio::test] + async fn build_request_double_hbone() { + run_build_request_multi( + "127.0.0.1", + "127.0.0.3:80", + vec![ + XdsAddressType::Service(XdsService { + hostname: "example.com".to_string(), + addresses: vec![XdsNetworkAddress { + network: "".to_string(), + address: vec![127, 0, 0, 3], + }], + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + ..Default::default() + }), + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/remote-pod".to_string(), + addresses: vec![], + network: "remote".to_string(), + network_gateway: Some(xds::istio::workload::GatewayAddress { + destination: Some( + xds::istio::workload::gateway_address::Destination::Address( + XdsNetworkAddress { + network: "remote".to_string(), + address: vec![10, 22, 1, 1], + }, + ), + ), + hbone_mtls_port: 15009, + }), + services: std::collections::HashMap::from([( + "/example.com".to_string(), + PortList { + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + }, + )]), + ..Default::default() + }), + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/ew-gtw".to_string(), + addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])], + network: "remote".to_string(), + ..Default::default() + }), + ], + Some(ExpectedRequest { + protocol: OutboundProtocol::DOUBLEHBONE, + hbone_destination: "example.com:8080", + destination: "10.22.1.1:15009", + }), + ) + .await; + run_build_request_multi( + "127.0.0.1", + "127.0.0.3:80", + vec![ + XdsAddressType::Service(XdsService { + hostname: "example.com".to_string(), + addresses: vec![XdsNetworkAddress { + network: "".to_string(), + address: vec![127, 0, 0, 3], + }], + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + ..Default::default() + }), + XdsAddressType::Service(XdsService { + hostname: "ew-gtw".to_string(), + addresses: vec![XdsNetworkAddress { + network: "".to_string(), + address: vec![127, 0, 0, 4], + }], + ports: vec![Port { + service_port: 15009, + target_port: 15009, + }], + ..Default::default() + }), + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/remote-pod".to_string(), + addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 6])], + network: "remote".to_string(), + network_gateway: Some(xds::istio::workload::GatewayAddress { + hbone_mtls_port: 15009, + destination: Some( + xds::istio::workload::gateway_address::Destination::Hostname( + XdsNamespacedHostname { + namespace: Default::default(), + hostname: "ew-gtw".into(), + }, + ), + ), + }), + services: std::collections::HashMap::from([( + "/example.com".to_string(), + PortList { + ports: vec![Port { + service_port: 80, + target_port: 8080, + }], + }, + )]), + ..Default::default() + }), + XdsAddressType::Workload(XdsWorkload { + uid: "cluster1//v1/Pod/default/ew-gtw".to_string(), + addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 5])], + network: "remote".to_string(), + services: std::collections::HashMap::from([( + "/ew-gtw".to_string(), + PortList { + ports: vec![Port { + service_port: 15009, + target_port: 15008, + }], + }, + )]), + ..Default::default() + }), + ], + Some(ExpectedRequest { + protocol: OutboundProtocol::DOUBLEHBONE, + hbone_destination: "example.com:8080", + destination: "127.0.0.5:15008", + }), + ) + .await; + } + #[tokio::test] async fn build_request_known_dest_remote_node_tcp() { run_build_request( @@ -652,7 +965,7 @@ mod tests { ..Default::default() }), Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "127.0.0.2:80", }), @@ -675,7 +988,7 @@ mod tests { ..Default::default() }), Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.2:80", destination: "127.0.0.2:15008", }), @@ -698,7 +1011,7 @@ mod tests { ..Default::default() }), Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "127.0.0.2:80", }), @@ -721,7 +1034,7 @@ mod tests { ..Default::default() }), Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.2:80", destination: "127.0.0.2:15008", }), @@ -750,7 +1063,7 @@ mod tests { }), // Even though source has a waypoint, we don't use it Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "127.0.0.1:80", }), @@ -779,7 +1092,7 @@ mod tests { }), // Should use the waypoint Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.2:80", destination: "127.0.0.10:15008", }), @@ -813,7 +1126,7 @@ mod tests { }), // Should use the waypoint Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "[ff06::c3]:80", destination: "127.0.0.11:15008", }), @@ -848,7 +1161,7 @@ mod tests { }), // Should use the waypoint Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.3:80", destination: "127.0.0.10:15008", }), @@ -933,7 +1246,7 @@ mod tests { }), ], Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "127.0.0.2:1234", }), @@ -989,7 +1302,7 @@ mod tests { // Traffic to the service should go to the pod in the service Some(ExpectedRequest { destination: "127.0.0.2:80", - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", }), ) @@ -1010,7 +1323,7 @@ mod tests { // Traffic to the service should go to the pod in the service Some(ExpectedRequest { destination: "127.0.0.2:80", - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", }), ) @@ -1040,7 +1353,7 @@ mod tests { "127.0.0.2:80", workload.clone(), Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "127.0.0.2:80", }), @@ -1052,7 +1365,7 @@ mod tests { "[ff06::c3]:80", workload.clone(), Some(ExpectedRequest { - protocol: Protocol::TCP, + protocol: OutboundProtocol::TCP, hbone_destination: "", destination: "[ff06::c3]:80", }), @@ -1104,7 +1417,7 @@ mod tests { "127.0.0.3:80", vec![svc(IpFamilies::Ipv6Only), workload.clone()], Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "[ff06::c3]:80", destination: "[ff06::c3]:15008", }), @@ -1116,7 +1429,7 @@ mod tests { "127.0.0.3:80", vec![svc(IpFamilies::Ipv4Only), workload.clone()], Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.2:80", destination: "127.0.0.2:15008", }), @@ -1128,7 +1441,7 @@ mod tests { "127.0.0.3:80", vec![svc(IpFamilies::Dual), workload.clone()], Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "127.0.0.2:80", destination: "127.0.0.2:15008", }), @@ -1140,7 +1453,7 @@ mod tests { "[::3]:80", vec![svc(IpFamilies::Dual), workload.clone()], Some(ExpectedRequest { - protocol: Protocol::HBONE, + protocol: OutboundProtocol::HBONE, hbone_destination: "[ff06::c3]:80", destination: "[ff06::c3]:15008", }), @@ -1173,7 +1486,7 @@ mod tests { #[derive(PartialEq, Debug)] struct ExpectedRequest<'a> { - protocol: Protocol, + protocol: OutboundProtocol, hbone_destination: &'a str, destination: &'a str, } diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs index d603ebdce3..4b55b9dd6a 100644 --- a/src/proxy/pool.rs +++ b/src/proxy/pool.rs @@ -525,6 +525,7 @@ mod test { use std::sync::RwLock; use std::sync::atomic::AtomicU32; use std::time::Duration; + use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; @@ -537,6 +538,8 @@ mod test { use crate::identity::Identity; + use self::h2::TokioH2Stream; + use super::*; use crate::drain::DrainWatcher; use crate::state::workload; @@ -590,6 +593,34 @@ mod test { assert_opens_drops!(srv, 1, 1); } + /// This is really a test for TokioH2Stream, but its nicer here because we have access to + /// streams + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn small_reads() { + let (mut pool, srv) = setup_test(3).await; + + let key = key(&srv, 2); + let req = || { + http::Request::builder() + .uri(srv.addr.to_string()) + .method(http::Method::CONNECT) + .version(http::Version::HTTP_2) + .body(()) + .unwrap() + }; + + let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap(); + let mut c = TokioH2Stream::new(c); + c.write_all(b"abcde").await.unwrap(); + let mut b = [0u8; 0]; + // Crucially, this should error rather than panic. + if let Err(e) = c.read(&mut b).await { + assert_eq!(e.kind(), io::ErrorKind::Other); + } else { + panic!("Should have errored"); + } + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unique_keys_have_unique_connections() { let (pool, mut srv) = setup_test(3).await; diff --git a/src/state.rs b/src/state.rs index 9f605d4d92..756c13c08a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -62,7 +62,8 @@ pub struct Upstream { pub workload: Arc, /// selected_workload_ip defines the IP address we should actually use to connect to this workload /// This handles multiple IPs (dual stack) or Hostname destinations (DNS resolution) - pub selected_workload_ip: IpAddr, + /// The workload IP might be empty if we have to go through a network gateway. + pub selected_workload_ip: Option, /// Port is the port we should connect to pub port: u16, /// Service SANs defines SANs defined at the service level *only*. A complete view of things requires @@ -73,8 +74,9 @@ pub struct Upstream { } impl Upstream { - pub fn workload_socket_addr(&self) -> SocketAddr { - SocketAddr::new(self.selected_workload_ip, self.port) + pub fn workload_socket_addr(&self) -> Option { + self.selected_workload_ip + .map(|ip| SocketAddr::new(ip, self.port)) } pub fn workload_and_services_san(&self) -> Vec { self.service_sans @@ -89,6 +91,19 @@ impl Upstream { .chain(std::iter::once(self.workload.identity())) .collect() } + + pub fn service_sans(&self) -> Vec { + self.service_sans + .iter() + .flat_map(|san| match Identity::from_str(san) { + Ok(id) => Some(id), + Err(err) => { + warn!("ignoring invalid SAN {}: {}", san, err); + None + } + }) + .collect() + } } // Workload information that a specific proxy instance represents. This is used to cross check @@ -567,13 +582,13 @@ impl DemandProxyState { src_workload: &Workload, original_target_address: SocketAddr, ip_family_restriction: Option, - ) -> Result { + ) -> Result, Error> { // If the user requested the pod by a specific IP, use that directly. if dst_workload .workload_ips .contains(&original_target_address.ip()) { - return Ok(original_target_address.ip()); + return Ok(Some(original_target_address.ip())); } // They may have 1 or 2 IPs (single/dual stack) // Ensure we are meeting the Service family restriction (if any is defined). @@ -588,14 +603,19 @@ impl DemandProxyState { }) .find_or_first(|ip| ip.is_ipv6() == original_target_address.is_ipv6()) { - return Ok(*ip); + return Ok(Some(*ip)); } if dst_workload.hostname.is_empty() { - debug!( - "workload {} has no suitable workload IPs for routing", - dst_workload.name - ); - return Err(Error::NoValidDestination(Box::new(dst_workload.clone()))); + if dst_workload.network_gateway.is_none() { + debug!( + "workload {} has no suitable workload IPs for routing", + dst_workload.name + ); + return Err(Error::NoValidDestination(Box::new(dst_workload.clone()))); + } else { + // We can route through network gateway + return Ok(None); + } } let ip = Box::pin(self.resolve_workload_address( dst_workload, @@ -603,7 +623,7 @@ impl DemandProxyState { original_target_address, )) .await?; - Ok(ip) + Ok(Some(ip)) } async fn resolve_workload_address( @@ -781,6 +801,65 @@ impl DemandProxyState { Ok(Some(res)) } + /// Returns destination address, upstream sans, and final sans, for + /// connecting to a remote workload through a gateway. + /// Would be nice to return this as an Upstream, but gateways don't necessarily + /// have workloads. That is, they could just be IPs without a corresponding workload. + pub async fn fetch_network_gateway( + &self, + gw_address: &GatewayAddress, + source_workload: &Workload, + original_destination_address: SocketAddr, + ) -> Result { + let (res, target_address) = match &gw_address.destination { + Destination::Address(ip) => { + let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); + let us = self.state.read().unwrap().find_upstream( + ip.network.clone(), + source_workload, + addr, + ServiceResolutionMode::Standard, + ); + // If the workload references a network gateway by IP, use that IP as the destination. + // Note this means that an IPv6 call may be translated to IPv4 if the network + // gateway is specified as an IPv4 address. + // For this reason, the Hostname method is preferred which can adapt to the callers IP family. + (us, addr) + } + Destination::Hostname(host) => { + let state = self.read(); + match state.find_hostname(host) { + Some(Address::Service(s)) => { + let us = state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Standard, + s, + ); + // For hostname, use the original_destination_address as the target so we can + // adapt to the callers IP family. + (us, original_destination_address) + } + Some(Address::Workload(w)) => { + let us = Some((w, gw_address.hbone_mtls_port, None)); + (us, original_destination_address) + } + None => { + return Err(Error::UnknownNetworkGateway(format!( + "network gateway {} not found", + host.hostname + ))); + } + } + } + }; + self.finalize_upstream(source_workload, target_address, res) + .await? + .ok_or_else(|| { + Error::UnknownNetworkGateway(format!("network gateway {:?} not found", gw_address)) + }) + } + async fn fetch_waypoint( &self, gw_address: &GatewayAddress, diff --git a/src/state/workload.rs b/src/state/workload.rs index b2aa92b35e..f850357f2c 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -41,6 +41,7 @@ use xds::istio::workload::ApplicationTunnel as XdsApplicationTunnel; use xds::istio::workload::GatewayAddress as XdsGatewayAddress; use xds::istio::workload::Workload as XdsWorkload; +// The protocol that the final workload expects #[derive( Default, Debug, @@ -54,17 +55,48 @@ use xds::istio::workload::Workload as XdsWorkload; serde::Serialize, serde::Deserialize, )] -pub enum Protocol { +pub enum InboundProtocol { #[default] TCP, HBONE, } -impl From for Protocol { +impl From for InboundProtocol { fn from(value: xds::istio::workload::TunnelProtocol) -> Self { match value { - xds::istio::workload::TunnelProtocol::Hbone => Protocol::HBONE, - xds::istio::workload::TunnelProtocol::None => Protocol::TCP, + xds::istio::workload::TunnelProtocol::Hbone => InboundProtocol::HBONE, + xds::istio::workload::TunnelProtocol::None => InboundProtocol::TCP, + } + } +} + +// The protocol that the sender should use to send data. Can be different from ServerProtocol when there is a +// proxy in the middle (e.g. e/w gateway with double hbone). +#[derive( + Default, + Debug, + Hash, + Eq, + PartialEq, + Ord, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, +)] +pub enum OutboundProtocol { + #[default] + TCP, + HBONE, + DOUBLEHBONE, +} + +impl From for OutboundProtocol { + fn from(value: InboundProtocol) -> Self { + match value { + InboundProtocol::HBONE => OutboundProtocol::HBONE, + InboundProtocol::TCP => OutboundProtocol::TCP, } } } @@ -189,7 +221,7 @@ pub struct Workload { pub network_gateway: Option, #[serde(default)] - pub protocol: Protocol, + pub protocol: InboundProtocol, #[serde(default)] pub network_mode: NetworkMode, @@ -409,7 +441,7 @@ impl TryFrom for (Workload, HashMap) { waypoint: wp, network_gateway: network_gw, - protocol: Protocol::from(xds::istio::workload::TunnelProtocol::try_from( + protocol: InboundProtocol::from(xds::istio::workload::TunnelProtocol::try_from( resource.tunnel_protocol, )?), network_mode: NetworkMode::from(xds::istio::workload::NetworkMode::try_from( @@ -714,7 +746,7 @@ impl WorkloadByAddr { let is_pod = w.uid.contains("//Pod/"); // We fallback to looking for HBONE -- a resource marked as in the mesh is likely // to have more useful context than one not in the mesh. - let is_hbone = w.protocol == Protocol::HBONE; + let is_hbone = w.protocol == InboundProtocol::HBONE; match (is_pod, is_hbone) { (true, true) => 3, (true, false) => 2, diff --git a/src/test_helpers.rs b/src/test_helpers.rs index b11b595756..7ce762f0f8 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -15,11 +15,11 @@ use crate::config::ConfigSource; use crate::config::{self, RootCert}; use crate::state::service::{Endpoint, EndpointSet, Service}; -use crate::state::workload::Protocol::{HBONE, TCP}; +use crate::state::workload::InboundProtocol::{HBONE, TCP}; use crate::state::workload::{ GatewayAddress, NamespacedHostname, NetworkAddress, Workload, gatewayaddress, }; -use crate::state::workload::{HealthStatus, Protocol}; +use crate::state::workload::{HealthStatus, InboundProtocol}; use crate::state::{DemandProxyState, ProxyState}; use crate::xds::istio::security::Authorization as XdsAuthorization; use crate::xds::istio::workload::Address as XdsAddress; @@ -233,7 +233,7 @@ pub fn test_default_workload() -> Workload { fn test_custom_workload( ip_str: &str, name: &str, - protocol: Protocol, + protocol: InboundProtocol, echo_port: u16, services_vec: Vec<&Service>, hostname_only: bool, diff --git a/src/test_helpers/linux.rs b/src/test_helpers/linux.rs index eb90d76d00..ac0c4fd2c3 100644 --- a/src/test_helpers/linux.rs +++ b/src/test_helpers/linux.rs @@ -16,6 +16,7 @@ use crate::config::{ConfigSource, ProxyMode}; use crate::rbac::Authorization; use crate::state::service::{Endpoint, Service}; use crate::state::workload::{HealthStatus, Workload, gatewayaddress}; +use crate::strng::Strng; use crate::test_helpers::app::TestApp; use crate::test_helpers::netns::{Namespace, Resolver}; use crate::test_helpers::*; @@ -26,6 +27,7 @@ use crate::inpod::istio::zds::WorkloadInfo; use crate::signal::ShutdownTrigger; use crate::test_helpers::inpod::start_ztunnel_server; use crate::test_helpers::linux::TestMode::{Dedicated, Shared}; +use arcstr::ArcStr; use itertools::Itertools; use nix::unistd::mkdtemp; use std::net::IpAddr; @@ -350,6 +352,11 @@ impl<'a> TestServiceBuilder<'a> { self } + pub fn subject_alt_names(mut self, mut sans: Vec) -> Self { + self.s.subject_alt_names.append(&mut sans); + self + } + /// Set the service waypoint pub fn waypoint(mut self, waypoint: IpAddr) -> Self { self.s.waypoint = Some(GatewayAddress { @@ -414,6 +421,11 @@ impl<'a> TestWorkloadBuilder<'a> { self } + pub fn network(mut self, network: Strng) -> Self { + self.w.workload.network = network; + self + } + pub fn identity(mut self, identity: identity::Identity) -> Self { match identity { identity::Identity::Spiffe { @@ -453,12 +465,17 @@ impl<'a> TestWorkloadBuilder<'a> { self } - /// Set a waypoint to the workload + /// Mutate the workload pub fn mutate_workload(mut self, f: impl FnOnce(&mut Workload)) -> Self { f(&mut self.w.workload); self } + pub fn network_gateway(mut self, network_gateway: GatewayAddress) -> Self { + self.w.workload.network_gateway = Some(network_gateway); + self + } + /// Append a service to the workload pub fn service(mut self, service: &str, server_port: u16, target_port: u16) -> Self { self.w @@ -505,7 +522,13 @@ impl<'a> TestWorkloadBuilder<'a> { .namespaces .child(&self.w.workload.node, &self.w.workload.name)? }; - self.w.workload.workload_ips = vec![network_namespace.ip()]; + if self.w.workload.network_gateway.is_some() { + // This is a little inefficient, because we create the + // namespace, but never actually use it. + self.w.workload.workload_ips = vec![]; + } else { + self.w.workload.workload_ips = vec![network_namespace.ip()]; + } self.w.workload.uid = format!( "cluster1//v1/Pod/{}/{}", self.w.workload.namespace, self.w.workload.name, diff --git a/src/test_helpers/tcp.rs b/src/test_helpers/tcp.rs index d59317eb90..77f7cfc2e2 100644 --- a/src/test_helpers/tcp.rs +++ b/src/test_helpers/tcp.rs @@ -222,16 +222,19 @@ pub struct HboneTestServer { listener: TcpListener, mode: Mode, name: String, + /// Write this message when acting as waypoint to show that waypoint was hit. + waypoint_message: Vec, } impl HboneTestServer { - pub async fn new(mode: Mode, name: &str) -> Self { + pub async fn new(mode: Mode, name: &str, waypoint_message: Vec) -> Self { let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 15008); let listener = TcpListener::bind(addr).await.unwrap(); Self { listener, mode, name: name.to_string(), + waypoint_message, } } @@ -254,24 +257,28 @@ impl HboneTestServer { let mut tls_stream = crate::hyper_util::tls_server(acceptor, self.listener); let mode = self.mode; while let Some(socket) = tls_stream.next().await { + let waypoint_message = self.waypoint_message.clone(); if let Err(err) = http2::Builder::new(TokioExecutor) .serve_connection( TokioIo::new(socket), - service_fn(move |req| async move { - info!("waypoint: received request"); - tokio::task::spawn(async move { - match hyper::upgrade::on(req).await { - Ok(upgraded) => { - let mut io = TokioIo::new(upgraded); - // let (mut ri, mut wi) = tokio::io::split(TokioIo::new(upgraded)); - // Signal we are the waypoint so tests can validate this - io.write_all(b"waypoint\n").await.unwrap(); - handle_stream(mode, &mut io).await; + service_fn(move |req| { + let waypoint_message = waypoint_message.clone(); + async move { + info!("waypoint: received request"); + tokio::task::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + let mut io = TokioIo::new(upgraded); + // let (mut ri, mut wi) = tokio::io::split(TokioIo::new(upgraded)); + // Signal we are the waypoint so tests can validate this + io.write_all(&waypoint_message[..]).await.unwrap(); + handle_stream(mode, &mut io).await; + } + Err(e) => error!("No upgrade {e}"), } - Err(e) => error!("No upgrade {e}"), - } - }); - Ok::<_, Infallible>(Response::new(Full::::from("streaming..."))) + }); + Ok::<_, Infallible>(Response::new(Full::::from("streaming..."))) + } }), ) .await diff --git a/src/tls/workload.rs b/src/tls/workload.rs index f8ad23dc49..92e05d40c8 100644 --- a/src/tls/workload.rs +++ b/src/tls/workload.rs @@ -31,6 +31,7 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite}; use crate::strng::Strng; use crate::tls; @@ -163,19 +164,17 @@ pub struct OutboundConnector { } impl OutboundConnector { - pub async fn connect( - self, - stream: TcpStream, - ) -> Result, io::Error> { - let dest = ServerName::IpAddress( - stream - .peer_addr() - .expect("peer_addr must be set") - .ip() - .into(), - ); + pub async fn connect(self, stream: IO) -> Result, io::Error> + where + IO: AsyncRead + AsyncWrite + Unpin, + { let c = tokio_rustls::TlsConnector::from(self.client_config); - c.connect(dest, stream).await + // Use dummy value for domain because it doesn't matter. + c.connect( + ServerName::IpAddress(std::net::Ipv4Addr::new(0, 0, 0, 0).into()), + stream, + ) + .await } } diff --git a/tests/namespaced.rs b/tests/namespaced.rs index cf4de63065..7d7d0d4fe0 100644 --- a/tests/namespaced.rs +++ b/tests/namespaced.rs @@ -18,6 +18,8 @@ mod namespaced { use futures::future::poll_fn; use http_body_util::Empty; use std::collections::HashMap; + use ztunnel::state::workload::gatewayaddress::Destination; + use ztunnel::state::workload::{GatewayAddress, NamespacedHostname}; use std::net::{IpAddr, SocketAddr}; @@ -50,6 +52,8 @@ mod namespaced { use ztunnel::test_helpers::netns::{Namespace, Resolver}; use ztunnel::test_helpers::*; + const WAYPOINT_MESSAGE: &[u8] = b"waypoint\n"; + /// initialize_namespace_tests sets up the namespace tests. #[ctor::ctor] fn initialize_namespace_tests() { @@ -137,7 +141,12 @@ mod namespaced { let waypoint = manager.register_waypoint("waypoint", DEFAULT_NODE).await?; let waypoint_ip = waypoint.ip(); - run_hbone_server(waypoint, "waypoint")?; + run_hbone_server( + waypoint, + "waypoint", + tcp::Mode::ReadWrite, + WAYPOINT_MESSAGE.into(), + )?; manager .workload_builder("server", DEFAULT_NODE) @@ -187,6 +196,216 @@ mod namespaced { Ok(()) } + #[tokio::test] + async fn double_hbone1() -> anyhow::Result<()> { + let mut manager = setup_netns_test!(Shared); + + let zt = manager.deploy_ztunnel(DEFAULT_NODE).await?; + + // Service that resolves to workload with ew gateway + // The 8080 port mappings don't actually matter because the + // final ztunnel is actually an hbone echo server that doesn't + // forward anything. + manager + .service_builder("remote") + .addresses(vec![NetworkAddress { + network: strng::EMPTY, + address: TEST_VIP.parse::()?, + }]) + .subject_alt_names(vec!["spiffe://cluster.local/ns/default/sa/echo".into()]) + .ports(HashMap::from([(8080, 8080)])) + .register() + .await?; + + // This is the e/w gateway that is supposed to be in the remote cluster/network. + let actual_ew_gtw = manager + .workload_builder("actual-ew-gtw", "remote-node") + .hbone() + .network("remote".into()) + .register() + .await?; + + // This is the workload in the local cluster that represents the workloads in the remote cluster. + // Its local in the sense that the it shows up in the local cluster's xds, but it + // represents workloads in the remote cluster. + // Its a little weird because we do give it a namespaced/ip, + // but that's because of how the tests infra works. + let _local_remote_workload = manager + .workload_builder("local-remote-workload", "remote-node") + .hbone() + .network("remote".into()) + .network_gateway(GatewayAddress { + destination: Destination::Address(NetworkAddress { + network: "remote".into(), + address: actual_ew_gtw.ip(), + }), + hbone_mtls_port: 15008, + }) + .identity(identity::Identity::Spiffe { + trust_domain: "cluster.local".into(), + namespace: "default".into(), + service_account: "actual-ew-gtw".into(), + }) + .service("default/remote.default.svc.cluster.local", 8080, 8080) + .register() + .await?; + let echo = manager + .workload_builder("echo", "remote-node2") + .register() + .await?; + + let client = manager + .workload_builder("client", DEFAULT_NODE) + .register() + .await?; + + let echo_hbone_addr = SocketAddr::new(echo.ip(), 15008); + + // No need to run local_remote_workload, as it doesn't actually exist. + run_hbone_server( + echo.clone(), + "echo", + tcp::Mode::ReadWrite, + WAYPOINT_MESSAGE.into(), + )?; + run_hbone_server( + actual_ew_gtw.clone(), + "actual-ew-gtw", + tcp::Mode::Forward(echo_hbone_addr), + b"".into(), + )?; + + run_tcp_to_hbone_client( + client.clone(), + manager.resolver(), + &format!("{TEST_VIP}:8080"), + )?; + + let metrics = [ + (CONNECTIONS_OPENED, 1), + (CONNECTIONS_CLOSED, 1), + (BYTES_RECV, REQ_SIZE), + (BYTES_SENT, HBONE_REQ_SIZE), + ]; + verify_metrics(&zt, &metrics, &source_labels()).await; + + let sent = format!("{REQ_SIZE}"); + let recv = format!("{HBONE_REQ_SIZE}"); + let dst_addr = format!("{}:15008", actual_ew_gtw.ip()); + let want = HashMap::from([ + ("scope", "access"), + ("src.workload", "client"), + ("dst.workload", "actual-ew-gtw"), + ("dst.hbone_addr", "remote.default.svc.cluster.local:8080"), + ("dst.addr", &dst_addr), + ("bytes_sent", &sent), + ("bytes_recv", &recv), + ("direction", "outbound"), + ("message", "connection complete"), + ( + "src.identity", + "spiffe://cluster.local/ns/default/sa/client", + ), + ( + "dst.identity", + "spiffe://cluster.local/ns/default/sa/actual-ew-gtw", + ), + ]); + telemetry::testing::assert_contains(want); + Ok(()) + } + + #[tokio::test] + async fn double_hbone2() -> anyhow::Result<()> { + let mut manager = setup_netns_test!(Shared); + + let _zt = manager.deploy_ztunnel(DEFAULT_NODE).await?; + + // Service that resolves to workload with ew gateway that uses service addressing + manager + .service_builder("remote-svc-gtw") + .addresses(vec![NetworkAddress { + network: strng::EMPTY, + address: TEST_VIP2.parse::()?, + }]) + .subject_alt_names(vec!["spiffe://cluster.local/ns/default/sa/echo".into()]) + .ports(HashMap::from([(8080, 8080)])) + .register() + .await?; + + // Service that resolves to the ew gateway. + manager + .service_builder("ew-gtw-svc") + .addresses(vec![NetworkAddress { + network: strng::EMPTY, + address: TEST_VIP3.parse::()?, + }]) + .ports(HashMap::from([(15009u16, 15008u16)])) + .register() + .await?; + + // This is the e/w gateway that is supposed to be in the remote cluster/network. + let actual_ew_gtw = manager + .workload_builder("actual-ew-gtw", "remote-node") + .hbone() + .service( + "default/ew-gtw-svc.default.svc.cluster.local", + 15009u16, + 15008u16, + ) + .network("remote".into()) + .register() + .await?; + + // Like local_remote_workload, but the network gateway is service addressed + let _local_remote_workload_svc_gtw = manager + .workload_builder("local-remote-workload-svc-gtw", "remote-node") + .hbone() + .network("remote".into()) + .network_gateway(GatewayAddress { + destination: Destination::Hostname(NamespacedHostname { + namespace: "default".into(), + hostname: "ew-gtw-svc.default.svc.cluster.local".into(), + }), + hbone_mtls_port: 15009, + }) + .service( + "default/remote-svc-gtw.default.svc.cluster.local", + 8080, + 8080, + ) + .register() + .await?; + + let echo = manager + .workload_builder("echo", "remote-node2") + .register() + .await?; + + let client = manager + .workload_builder("client", DEFAULT_NODE) + .register() + .await?; + let echo_addr = SocketAddr::new(echo.ip(), 15008); + // No need to run local_remote_workload, as it doesn't actually exist. + + run_hbone_server(echo, "echo", tcp::Mode::ReadWrite, WAYPOINT_MESSAGE.into())?; + run_hbone_server( + actual_ew_gtw, + "actual-ew-gtw", + tcp::Mode::Forward(echo_addr), + b"".into(), + )?; + + run_tcp_to_hbone_client( + client.clone(), + manager.resolver(), + &format!("{TEST_VIP2}:8080"), + )?; + + Ok(()) + } + #[tokio::test] async fn service_waypoint() -> anyhow::Result<()> { let mut manager = setup_netns_test!(Shared); @@ -195,7 +414,12 @@ mod namespaced { let waypoint = manager.register_waypoint("waypoint", DEFAULT_NODE).await?; let waypoint_ip = waypoint.ip(); - run_hbone_server(waypoint, "waypoint")?; + run_hbone_server( + waypoint, + "waypoint", + tcp::Mode::ReadWrite, + WAYPOINT_MESSAGE.into(), + )?; let client = manager .workload_builder("client", DEFAULT_NODE) @@ -275,7 +499,12 @@ mod namespaced { ) .register() .await?; - run_hbone_server(waypoint, "waypoint")?; + run_hbone_server( + waypoint, + "waypoint", + tcp::Mode::ReadWrite, + WAYPOINT_MESSAGE.into(), + )?; manager .workload_builder("server", DEFAULT_NODE) @@ -338,7 +567,12 @@ mod namespaced { .mutate_workload(|w| w.hostname = "waypoint.example.com".into()) .register() .await?; - run_hbone_server(waypoint, "waypoint")?; + run_hbone_server( + waypoint, + "waypoint", + tcp::Mode::ReadWrite, + WAYPOINT_MESSAGE.into(), + )?; manager .workload_builder("server", DEFAULT_NODE) @@ -1156,6 +1390,8 @@ mod namespaced { } const TEST_VIP: &str = "10.10.0.1"; + const TEST_VIP2: &str = "10.10.0.2"; + const TEST_VIP3: &str = "10.10.0.3"; const SERVER_PORT: u16 = 8080; const SERVER_HOSTNAME: &str = "server.default.svc.cluster.local"; @@ -1478,10 +1714,15 @@ mod namespaced { } /// run_hbone_server deploys a simple echo server, deployed over HBONE, in the provided namespace - fn run_hbone_server(server: Namespace, name: &str) -> anyhow::Result<()> { + fn run_hbone_server( + server: Namespace, + name: &str, + mode: tcp::Mode, + waypoint_message: Vec, + ) -> anyhow::Result<()> { let name = name.to_string(); server.run_ready(move |ready| async move { - let echo = tcp::HboneTestServer::new(tcp::Mode::ReadWrite, &name).await; + let echo = tcp::HboneTestServer::new(mode, &name, waypoint_message).await; info!("Running hbone echo server at {}", echo.address()); ready.set_ready(); echo.run().await; @@ -1501,7 +1742,6 @@ mod namespaced { async fn hbone_read_write_stream(stream: &mut TcpStream) { const BODY: &[u8] = b"hello world"; - const WAYPOINT_MESSAGE: &[u8] = b"waypoint\n"; stream.write_all(BODY).await.unwrap(); let mut buf = [0; BODY.len() + WAYPOINT_MESSAGE.len()]; stream.read_exact(&mut buf).await.unwrap();