diff --git a/proto/workload.proto b/proto/workload.proto index 7cf0693da8..74ba6ed9f3 100644 --- a/proto/workload.proto +++ b/proto/workload.proto @@ -143,6 +143,9 @@ message LoadBalancing { // 3. Endpoints matching `[NETWORK]` // 4. Any endpoints FAILOVER = 2; + // In PASSTHROUGH mode, endpoint selection will not be done and traffic passes directly through to the original + // desitnation address. + PASSTHROUGH = 3; } enum HealthPolicy { // Only select healthy endpoints diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 14b7903607..c241f6137d 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -35,7 +35,7 @@ use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics}; use crate::drain::DrainWatcher; use crate::drain::run_with_drain; use crate::proxy::h2::{H2Stream, client::WorkloadKey}; -use crate::state::service::{Service, ServiceDescription}; +use crate::state::service::{LoadBalancerMode, Service, ServiceDescription}; use crate::state::workload::OutboundProtocol; use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address}; use crate::state::{ServiceResolutionMode, Upstream}; @@ -517,8 +517,17 @@ impl OutboundConnection { ) .await? else { - if service.is_some() { - return Err(Error::NoHealthyUpstream(target)); + if let Some(service) = service { + if service. + load_balancer. + as_ref(). + // If we are not a passthrough service, we should have an upstream + map(|lb| lb.mode != LoadBalancerMode::Passthrough). + // If the service had no lb, we should have an upstream + unwrap_or(true) + { + return Err(Error::NoHealthyUpstream(target)); + } } debug!("built request as passthrough; no upstream found"); return Ok(Request { @@ -702,10 +711,10 @@ mod tests { use crate::state::WorkloadInfo; use crate::test_helpers::helpers::{initialize_telemetry, test_proxy_metrics}; use crate::test_helpers::new_proxy_state; - use crate::xds::istio::workload::TunnelProtocol as XdsProtocol; use crate::xds::istio::workload::Workload as XdsWorkload; use crate::xds::istio::workload::address::Type as XdsAddressType; use crate::xds::istio::workload::{IpFamilies, Port}; + use crate::xds::istio::workload::{LoadBalancing, TunnelProtocol as XdsProtocol}; use crate::xds::istio::workload::{ NamespacedHostname as XdsNamespacedHostname, NetworkAddress as XdsNetworkAddress, PortList, }; @@ -1751,6 +1760,88 @@ mod tests { .await; } + #[tokio::test] + async fn build_request_passthrough_svc() { + run_build_request( + "127.0.0.1", + "1.2.3.4:80", + XdsAddressType::Service(XdsService { + hostname: "example.com".to_string(), + waypoint: None, + load_balancing: Some(LoadBalancing { + mode: xds::istio::workload::load_balancing::Mode::Passthrough.into(), + ..Default::default() + }), + addresses: vec![ + XdsNetworkAddress { + network: "".to_string(), + address: vec![1, 2, 3, 4], + }, + XdsNetworkAddress { + network: "".to_string(), + address: vec![1, 5, 6, 7], + }, + ], + ports: vec![Port { + service_port: 80, + target_port: 80, + }], + ..Default::default() + }), + Some(ExpectedRequest { + protocol: OutboundProtocol::TCP, + hbone_destination: "", + destination: "1.2.3.4:80", + }), + ) + .await; + } + + #[tokio::test] + async fn build_request_passthrough_svc_with_waypoint() { + run_build_request( + "127.0.0.1", + "1.2.3.4:80", + XdsAddressType::Service(XdsService { + hostname: "example.com".to_string(), + waypoint: Some(xds::istio::workload::GatewayAddress { + destination: Some(xds::istio::workload::gateway_address::Destination::Address( + XdsNetworkAddress { + network: "".to_string(), + address: [127, 0, 0, 10].to_vec(), + }, + )), + hbone_mtls_port: 15008, + }), + load_balancing: Some(LoadBalancing { + mode: xds::istio::workload::load_balancing::Mode::Passthrough.into(), + ..Default::default() + }), + addresses: vec![ + XdsNetworkAddress { + network: "".to_string(), + address: vec![1, 2, 3, 4], + }, + XdsNetworkAddress { + network: "".to_string(), + address: vec![1, 5, 6, 7], + }, + ], + ports: vec![Port { + service_port: 80, + target_port: 80, + }], + ..Default::default() + }), + Some(ExpectedRequest { + protocol: OutboundProtocol::HBONE, + destination: "127.0.0.10:15008", + hbone_destination: "1.2.3.4:80", + }), + ) + .await; + } + #[test] fn build_forwarded() { assert_eq!( diff --git a/src/state.rs b/src/state.rs index a4086cee24..8611d3510d 100644 --- a/src/state.rs +++ b/src/state.rs @@ -73,6 +73,12 @@ pub struct Upstream { pub destination_service: Option, } +#[derive(Clone, Debug, Eq, PartialEq)] +enum UpstreamDestination { + UpstreamParts(Arc, u16, Option>), + OriginalDestination, +} + impl Upstream { pub fn workload_socket_addr(&self) -> Option { self.selected_workload_ip @@ -288,11 +294,16 @@ impl ProxyState { source_workload: &Workload, addr: SocketAddr, resolution_mode: ServiceResolutionMode, - ) -> Option<(Arc, u16, Option>)> { + ) -> Option { if let Some(svc) = self .services .get_by_vip(&network_addr(network.clone(), addr.ip())) { + if let Some(lb) = &svc.load_balancer { + if lb.mode == LoadBalancerMode::Passthrough { + return Some(UpstreamDestination::OriginalDestination); + } + } return self.find_upstream_from_service( source_workload, addr.port(), @@ -304,7 +315,7 @@ impl ProxyState { .workloads .find_address(&network_addr(network, addr.ip())) { - return Some((wl, addr.port(), None)); + return Some(UpstreamDestination::UpstreamParts(wl, addr.port(), None)); } None } @@ -315,7 +326,7 @@ impl ProxyState { svc_port: u16, resolution_mode: ServiceResolutionMode, svc: Arc, - ) -> Option<(Arc, u16, Option>)> { + ) -> Option { // Randomly pick an upstream // TODO: do this more efficiently, and not just randomly let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode) @@ -343,7 +354,11 @@ impl ProxyState { return None; }; - Some((wl, target_port, Some(svc))) + Some(UpstreamDestination::UpstreamParts( + wl, + target_port, + Some(svc), + )) } fn load_balance<'a>( @@ -789,10 +804,11 @@ impl DemandProxyState { &self, source_workload: &Workload, original_target_address: SocketAddr, - upstream: Option<(Arc, u16, Option>)>, + upstream: Option, ) -> Result, Error> { - let Some((wl, port, svc)) = upstream else { - return Ok(None); + let (wl, port, svc) = match upstream { + Some(UpstreamDestination::UpstreamParts(wl, port, svc)) => (wl, port, svc), + None | Some(UpstreamDestination::OriginalDestination) => return Ok(None), }; let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref())); let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families); @@ -855,7 +871,11 @@ impl DemandProxyState { (us, original_destination_address) } Some(Address::Workload(w)) => { - let us = Some((w, gw_address.hbone_mtls_port, None)); + let us = Some(UpstreamDestination::UpstreamParts( + w, + gw_address.hbone_mtls_port, + None, + )); (us, original_destination_address) } None => { @@ -912,7 +932,11 @@ impl DemandProxyState { (us, original_destination_address) } Some(Address::Workload(w)) => { - let us = Some((w, gw_address.hbone_mtls_port, None)); + let us = Some(UpstreamDestination::UpstreamParts( + w, + gw_address.hbone_mtls_port, + None, + )); (us, original_destination_address) } None => { @@ -1369,9 +1393,11 @@ mod tests { _ => ServiceResolutionMode::Standard, }; - let (_, port, _) = state - .find_upstream("".into(), &wl, "10.0.0.1:80".parse().unwrap(), mode) - .expect("upstream to be found"); + let port = match state.find_upstream("".into(), &wl, "10.0.0.1:80".parse().unwrap(), mode) { + Some(UpstreamDestination::UpstreamParts(_, port, _)) => port, + _ => panic!("upstream to be found"), + }; + assert_eq!(port, tc.expected_port()); } diff --git a/src/state/service.rs b/src/state/service.rs index a4fb8e9b01..ceca1705ac 100644 --- a/src/state/service.rs +++ b/src/state/service.rs @@ -123,6 +123,9 @@ pub enum LoadBalancerMode { Strict, // Prefer select endpoints matching all LoadBalancerScopes when picking endpoints but allow mismatches Failover, + // In PASSTHROUGH mode, endpoint selection will not be done and traffic passes directly through to the original + // desitnation address. + Passthrough, } impl From for LoadBalancerMode { @@ -133,6 +136,9 @@ impl From for LoadBalancerMode { xds::istio::workload::load_balancing::Mode::UnspecifiedMode => { LoadBalancerMode::Standard } + xds::istio::workload::load_balancing::Mode::Passthrough => { + LoadBalancerMode::Passthrough + } } } } diff --git a/src/state/workload.rs b/src/state/workload.rs index 0a85fc9dde..ab249bab23 100644 --- a/src/state/workload.rs +++ b/src/state/workload.rs @@ -906,7 +906,7 @@ pub enum WorkloadError { mod tests { use super::*; use crate::config::ConfigSource; - use crate::state::{DemandProxyState, ProxyState, ServiceResolutionMode}; + use crate::state::{DemandProxyState, ProxyState, ServiceResolutionMode, UpstreamDestination}; use crate::test_helpers::helpers::initialize_telemetry; use crate::xds::istio::workload::PortList as XdsPortList; use crate::xds::istio::workload::Service as XdsService; @@ -1828,12 +1828,14 @@ mod tests { .try_into() .unwrap(); for _ in 0..1000 { - if let Some((workload, _, _)) = state.state.read().unwrap().find_upstream( - strng::EMPTY, - &wl, - "127.0.1.1:80".parse().unwrap(), - ServiceResolutionMode::Standard, - ) { + if let Some(UpstreamDestination::UpstreamParts(workload, _, _)) = + state.state.read().unwrap().find_upstream( + strng::EMPTY, + &wl, + "127.0.1.1:80".parse().unwrap(), + ServiceResolutionMode::Standard, + ) + { let n = &workload.name; // borrow name instead of cloning found.insert(n.to_string()); // insert an owned copy of the borrowed n wants.remove(&n.to_string()); // remove using the borrow @@ -1871,17 +1873,16 @@ mod tests { // Make sure we get a valid workload assert!(wl.is_some()); assert_eq!(wl.as_ref().unwrap().service_account, "default"); - let (_, port, svc) = demand - .state - .read() - .unwrap() - .find_upstream( - strng::EMPTY, - wl.as_ref().unwrap(), - "127.10.0.1:80".parse().unwrap(), - ServiceResolutionMode::Standard, - ) - .expect("should get"); + + let (port, svc) = match demand.state.read().unwrap().find_upstream( + strng::EMPTY, + wl.as_ref().unwrap(), + "127.10.0.1:80".parse().unwrap(), + ServiceResolutionMode::Standard, + ) { + Some(UpstreamDestination::UpstreamParts(_, port, svc)) => (port, svc), + _ => panic!("should get"), + }; // Make sure we get a valid VIP assert_eq!(port, 8080); assert_eq!( @@ -1890,17 +1891,15 @@ mod tests { ); // test that we can have a service in another network than workloads it selects - let (_, port, _) = demand - .state - .read() - .unwrap() - .find_upstream( - "remote".into(), - wl.as_ref().unwrap(), - "127.10.0.2:80".parse().unwrap(), - ServiceResolutionMode::Standard, - ) - .expect("should get"); + let port = match demand.state.read().unwrap().find_upstream( + "remote".into(), + wl.as_ref().unwrap(), + "127.10.0.2:80".parse().unwrap(), + ServiceResolutionMode::Standard, + ) { + Some(UpstreamDestination::UpstreamParts(_, port, _)) => port, + _ => panic!("should get"), + }; // Make sure we get a valid VIP assert_eq!(port, 8080); }