Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/workload.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ message LoadBalancing {
// 3. Endpoints matching `[NETWORK]`
// 4. Any endpoints
FAILOVER = 2;
// In PASSTHROUGH mode, endpoint selection will not be done and traffic passes directly through to the original
// desitnation address.
PASSTHROUGH = 3;
}
enum HealthPolicy {
// Only select healthy endpoints
Expand Down
99 changes: 95 additions & 4 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics};
use crate::drain::DrainWatcher;
use crate::drain::run_with_drain;
use crate::proxy::h2::{H2Stream, client::WorkloadKey};
use crate::state::service::{Service, ServiceDescription};
use crate::state::service::{LoadBalancerMode, Service, ServiceDescription};
use crate::state::workload::OutboundProtocol;
use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address};
use crate::state::{ServiceResolutionMode, Upstream};
Expand Down Expand Up @@ -517,8 +517,17 @@ impl OutboundConnection {
)
.await?
else {
if service.is_some() {
return Err(Error::NoHealthyUpstream(target));
if let Some(service) = service {
if service.
load_balancer.
as_ref().
// If we are not a passthrough service, we should have an upstream
map(|lb| lb.mode != LoadBalancerMode::Passthrough).
// If the service had no lb, we should have an upstream
unwrap_or(true)
{
return Err(Error::NoHealthyUpstream(target));
}
}
debug!("built request as passthrough; no upstream found");
return Ok(Request {
Expand Down Expand Up @@ -702,10 +711,10 @@ mod tests {
use crate::state::WorkloadInfo;
use crate::test_helpers::helpers::{initialize_telemetry, test_proxy_metrics};
use crate::test_helpers::new_proxy_state;
use crate::xds::istio::workload::TunnelProtocol as XdsProtocol;
use crate::xds::istio::workload::Workload as XdsWorkload;
use crate::xds::istio::workload::address::Type as XdsAddressType;
use crate::xds::istio::workload::{IpFamilies, Port};
use crate::xds::istio::workload::{LoadBalancing, TunnelProtocol as XdsProtocol};
use crate::xds::istio::workload::{
NamespacedHostname as XdsNamespacedHostname, NetworkAddress as XdsNetworkAddress, PortList,
};
Expand Down Expand Up @@ -1751,6 +1760,88 @@ mod tests {
.await;
}

#[tokio::test]
async fn build_request_passthrough_svc() {
run_build_request(
"127.0.0.1",
"1.2.3.4:80",
XdsAddressType::Service(XdsService {
hostname: "example.com".to_string(),
waypoint: None,
load_balancing: Some(LoadBalancing {
mode: xds::istio::workload::load_balancing::Mode::Passthrough.into(),
..Default::default()
}),
addresses: vec![
XdsNetworkAddress {
network: "".to_string(),
address: vec![1, 2, 3, 4],
},
XdsNetworkAddress {
network: "".to_string(),
address: vec![1, 5, 6, 7],
},
],
ports: vec![Port {
service_port: 80,
target_port: 80,
}],
..Default::default()
}),
Some(ExpectedRequest {
protocol: OutboundProtocol::TCP,
hbone_destination: "",
destination: "1.2.3.4:80",
}),
)
.await;
}

#[tokio::test]
async fn build_request_passthrough_svc_with_waypoint() {
run_build_request(
"127.0.0.1",
"1.2.3.4:80",
XdsAddressType::Service(XdsService {
hostname: "example.com".to_string(),
waypoint: Some(xds::istio::workload::GatewayAddress {
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
XdsNetworkAddress {
network: "".to_string(),
address: [127, 0, 0, 10].to_vec(),
},
)),
hbone_mtls_port: 15008,
}),
load_balancing: Some(LoadBalancing {
mode: xds::istio::workload::load_balancing::Mode::Passthrough.into(),
..Default::default()
}),
addresses: vec![
XdsNetworkAddress {
network: "".to_string(),
address: vec![1, 2, 3, 4],
},
XdsNetworkAddress {
network: "".to_string(),
address: vec![1, 5, 6, 7],
},
],
ports: vec![Port {
service_port: 80,
target_port: 80,
}],
..Default::default()
}),
Some(ExpectedRequest {
protocol: OutboundProtocol::HBONE,
destination: "127.0.0.10:15008",
hbone_destination: "1.2.3.4:80",
}),
)
.await;
}

#[test]
fn build_forwarded() {
assert_eq!(
Expand Down
50 changes: 38 additions & 12 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub struct Upstream {
pub destination_service: Option<ServiceDescription>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
enum UpstreamDestination {
UpstreamParts(Arc<Workload>, u16, Option<Arc<Service>>),
OriginalDestination,
}

impl Upstream {
pub fn workload_socket_addr(&self) -> Option<SocketAddr> {
self.selected_workload_ip
Expand Down Expand Up @@ -288,11 +294,16 @@ impl ProxyState {
source_workload: &Workload,
addr: SocketAddr,
resolution_mode: ServiceResolutionMode,
) -> Option<(Arc<Workload>, u16, Option<Arc<Service>>)> {
) -> Option<UpstreamDestination> {
if let Some(svc) = self
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
if let Some(lb) = &svc.load_balancer {
if lb.mode == LoadBalancerMode::Passthrough {
return Some(UpstreamDestination::OriginalDestination);
}
}
return self.find_upstream_from_service(
source_workload,
addr.port(),
Expand All @@ -304,7 +315,7 @@ impl ProxyState {
.workloads
.find_address(&network_addr(network, addr.ip()))
{
return Some((wl, addr.port(), None));
return Some(UpstreamDestination::UpstreamParts(wl, addr.port(), None));
}
None
}
Expand All @@ -315,7 +326,7 @@ impl ProxyState {
svc_port: u16,
resolution_mode: ServiceResolutionMode,
svc: Arc<Service>,
) -> Option<(Arc<Workload>, u16, Option<Arc<Service>>)> {
) -> Option<UpstreamDestination> {
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode)
Expand Down Expand Up @@ -343,7 +354,11 @@ impl ProxyState {
return None;
};

Some((wl, target_port, Some(svc)))
Some(UpstreamDestination::UpstreamParts(
wl,
target_port,
Some(svc),
))
}

fn load_balance<'a>(
Expand Down Expand Up @@ -789,10 +804,11 @@ impl DemandProxyState {
&self,
source_workload: &Workload,
original_target_address: SocketAddr,
upstream: Option<(Arc<Workload>, u16, Option<Arc<Service>>)>,
upstream: Option<UpstreamDestination>,
) -> Result<Option<Upstream>, Error> {
let Some((wl, port, svc)) = upstream else {
return Ok(None);
let (wl, port, svc) = match upstream {
Some(UpstreamDestination::UpstreamParts(wl, port, svc)) => (wl, port, svc),
None | Some(UpstreamDestination::OriginalDestination) => return Ok(None),
};
let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref()));
let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families);
Expand Down Expand Up @@ -855,7 +871,11 @@ impl DemandProxyState {
(us, original_destination_address)
}
Some(Address::Workload(w)) => {
let us = Some((w, gw_address.hbone_mtls_port, None));
let us = Some(UpstreamDestination::UpstreamParts(
w,
gw_address.hbone_mtls_port,
None,
));
(us, original_destination_address)
}
None => {
Expand Down Expand Up @@ -912,7 +932,11 @@ impl DemandProxyState {
(us, original_destination_address)
}
Some(Address::Workload(w)) => {
let us = Some((w, gw_address.hbone_mtls_port, None));
let us = Some(UpstreamDestination::UpstreamParts(
w,
gw_address.hbone_mtls_port,
None,
));
(us, original_destination_address)
}
None => {
Expand Down Expand Up @@ -1369,9 +1393,11 @@ mod tests {
_ => ServiceResolutionMode::Standard,
};

let (_, port, _) = state
.find_upstream("".into(), &wl, "10.0.0.1:80".parse().unwrap(), mode)
.expect("upstream to be found");
let port = match state.find_upstream("".into(), &wl, "10.0.0.1:80".parse().unwrap(), mode) {
Some(UpstreamDestination::UpstreamParts(_, port, _)) => port,
_ => panic!("upstream to be found"),
};

assert_eq!(port, tc.expected_port());
}

Expand Down
6 changes: 6 additions & 0 deletions src/state/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub enum LoadBalancerMode {
Strict,
// Prefer select endpoints matching all LoadBalancerScopes when picking endpoints but allow mismatches
Failover,
// In PASSTHROUGH mode, endpoint selection will not be done and traffic passes directly through to the original
// desitnation address.
Passthrough,
}

impl From<xds::istio::workload::load_balancing::Mode> for LoadBalancerMode {
Expand All @@ -133,6 +136,9 @@ impl From<xds::istio::workload::load_balancing::Mode> for LoadBalancerMode {
xds::istio::workload::load_balancing::Mode::UnspecifiedMode => {
LoadBalancerMode::Standard
}
xds::istio::workload::load_balancing::Mode::Passthrough => {
LoadBalancerMode::Passthrough
}
}
}
}
Expand Down
57 changes: 28 additions & 29 deletions src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ pub enum WorkloadError {
mod tests {
use super::*;
use crate::config::ConfigSource;
use crate::state::{DemandProxyState, ProxyState, ServiceResolutionMode};
use crate::state::{DemandProxyState, ProxyState, ServiceResolutionMode, UpstreamDestination};
use crate::test_helpers::helpers::initialize_telemetry;
use crate::xds::istio::workload::PortList as XdsPortList;
use crate::xds::istio::workload::Service as XdsService;
Expand Down Expand Up @@ -1828,12 +1828,14 @@ mod tests {
.try_into()
.unwrap();
for _ in 0..1000 {
if let Some((workload, _, _)) = state.state.read().unwrap().find_upstream(
strng::EMPTY,
&wl,
"127.0.1.1:80".parse().unwrap(),
ServiceResolutionMode::Standard,
) {
if let Some(UpstreamDestination::UpstreamParts(workload, _, _)) =
state.state.read().unwrap().find_upstream(
strng::EMPTY,
&wl,
"127.0.1.1:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
{
let n = &workload.name; // borrow name instead of cloning
found.insert(n.to_string()); // insert an owned copy of the borrowed n
wants.remove(&n.to_string()); // remove using the borrow
Expand Down Expand Up @@ -1871,17 +1873,16 @@ mod tests {
// Make sure we get a valid workload
assert!(wl.is_some());
assert_eq!(wl.as_ref().unwrap().service_account, "default");
let (_, port, svc) = demand
.state
.read()
.unwrap()
.find_upstream(
strng::EMPTY,
wl.as_ref().unwrap(),
"127.10.0.1:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
.expect("should get");

let (port, svc) = match demand.state.read().unwrap().find_upstream(
strng::EMPTY,
wl.as_ref().unwrap(),
"127.10.0.1:80".parse().unwrap(),
ServiceResolutionMode::Standard,
) {
Some(UpstreamDestination::UpstreamParts(_, port, svc)) => (port, svc),
_ => panic!("should get"),
};
// Make sure we get a valid VIP
assert_eq!(port, 8080);
assert_eq!(
Expand All @@ -1890,17 +1891,15 @@ mod tests {
);

// test that we can have a service in another network than workloads it selects
let (_, port, _) = demand
.state
.read()
.unwrap()
.find_upstream(
"remote".into(),
wl.as_ref().unwrap(),
"127.10.0.2:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
.expect("should get");
let port = match demand.state.read().unwrap().find_upstream(
"remote".into(),
wl.as_ref().unwrap(),
"127.10.0.2:80".parse().unwrap(),
ServiceResolutionMode::Standard,
) {
Some(UpstreamDestination::UpstreamParts(_, port, _)) => port,
_ => panic!("should get"),
};
// Make sure we get a valid VIP
assert_eq!(port, 8080);
}
Expand Down