From 31d8a2d1a292730f4f61add02fbc92f4d1a2dd5f Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Fri, 1 Dec 2023 14:23:20 +0000 Subject: [PATCH] tcproute: add support for round-robin load balancing Add support for round-robin load balancing for `TCPRoute`. This enables multiple backend references in a TCPRoute object, with traffic being distributed to each backend in a round-robin fashion. A new BPF map `TCP_CONNECTIONS` was introduced to help keep track of active TCP connections, with its key storing the client identifier and the value storing the backend along with the state of the connection. Signed-off-by: Sanskar Jaiswal --- dataplane/api-server/src/lib.rs | 5 +- dataplane/api-server/src/server.rs | 35 +++++++- dataplane/common/src/lib.rs | 38 ++++++++- dataplane/ebpf/src/egress/tcp.rs | 62 +++++++++++--- dataplane/ebpf/src/ingress/tcp.rs | 133 ++++++++++++++++++++++++----- dataplane/ebpf/src/main.rs | 6 +- dataplane/ebpf/src/utils.rs | 63 +++++++++++++- dataplane/loader/src/main.rs | 29 ++++++- internal/dataplane/client/utils.go | 86 +++++++++---------- 9 files changed, 366 insertions(+), 91 deletions(-) diff --git a/dataplane/api-server/src/lib.rs b/dataplane/api-server/src/lib.rs index 579913df..bcd51e86 100644 --- a/dataplane/api-server/src/lib.rs +++ b/dataplane/api-server/src/lib.rs @@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData}; use tonic::transport::Server; use backends::backends_server::BackendsServer; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; pub async fn start( addr: Ipv4Addr, port: u16, backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> Result<(), Error> { - let server = server::BackendService::new(backends_map, gateway_indexes_map); + let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map); // TODO: mTLS https://github.com/Kong/blixt/issues/50 Server::builder() .add_service(BackendsServer::new(server)) diff 
--git a/dataplane/api-server/src/server.rs b/dataplane/api-server/src/server.rs index 91d0bf53..8c26a1a2 100644 --- a/dataplane/api-server/src/server.rs +++ b/dataplane/api-server/src/server.rs @@ -15,21 +15,24 @@ use tonic::{Request, Response, Status}; use crate::backends::backends_server::Backends; use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip}; use crate::netutils::{if_name_for_routing_ip, if_nametoindex}; -use common::{Backend, BackendKey, BackendList, BACKENDS_ARRAY_CAPACITY}; +use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY}; pub struct BackendService { backends_map: Arc>>, gateway_indexes_map: Arc>>, + tcp_conns_map: Arc>>, } impl BackendService { pub fn new( backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> BackendService { BackendService { backends_map: Arc::new(Mutex::new(backends_map)), gateway_indexes_map: Arc::new(Mutex::new(gateway_indexes_map)), + tcp_conns_map: Arc::new(Mutex::new(tcp_conns_map)), } } @@ -51,6 +54,31 @@ impl BackendService { backends_map.remove(&key)?; let mut gateway_indexes_map = self.gateway_indexes_map.lock().await; gateway_indexes_map.remove(&key)?; + + // Delete all entries in our tcp connection tracking map that this backend + // key was related to. This is needed because the TCPRoute might have been + // deleted when there were TCP connections active, so without the below logic + // they'll hang around forever. + // It's better to do this rather than maintain a reverse index because the index + // would need to be updated with each new connection and delete is a less + // used operation. 
+ let mut tcp_conns_map = self.tcp_conns_map.lock().await; + let mut expired_client_keys: Vec = vec![]; + tcp_conns_map.iter().for_each(|val| { + if let Ok((client_key, tcp_backned)) = val { + let bk = BackendKey { + ip: u32::from_be(key.ip), + port: u16::from_be(key.port as u16) as u32, + }; + if tcp_backned.backend_key == bk { + expired_client_keys.push(client_key); + } + } + }); + + for ck in expired_client_keys { + tcp_conns_map.remove(&ck)?; + } Ok(()) } } @@ -144,9 +172,10 @@ impl Backends for BackendService { match self.insert_and_reset_index(key, backend_list).await { Ok(_) => Ok(Response::new(Confirmation { confirmation: format!( - "success, vip {}:{} was updated", + "success, vip {}:{} was updated with {} backends", Ipv4Addr::from(vip.ip), - vip.port + vip.port, + count, ), })), Err(err) => Err(Status::internal(format!("failure: {}", err))), diff --git a/dataplane/common/src/lib.rs b/dataplane/common/src/lib.rs index 10fd7b7e..8369f665 100644 --- a/dataplane/common/src/lib.rs +++ b/dataplane/common/src/lib.rs @@ -20,7 +20,7 @@ pub struct Backend { #[cfg(feature = "user")] unsafe impl aya::Pod for Backend {} -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[repr(C)] pub struct BackendKey { pub ip: u32, @@ -40,3 +40,39 @@ pub struct BackendList { #[cfg(feature = "user")] unsafe impl aya::Pod for BackendList {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct ClientKey { + pub ip: u32, + pub port: u32, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for ClientKey {} + +#[derive(Copy, Clone, Debug, Default)] +#[repr(C)] +pub enum TCPState { + #[default] + Established, + FinWait1, + FinWait2, + Closing, + TimeWait, + Closed, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPState {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct TCPBackend { + pub backend: Backend, + pub backend_key: BackendKey, + pub state: TCPState, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPBackend {} diff --git 
a/dataplane/ebpf/src/egress/tcp.rs b/dataplane/ebpf/src/egress/tcp.rs index 89829329..05490a12 100644 --- a/dataplane/ebpf/src/egress/tcp.rs +++ b/dataplane/ebpf/src/egress/tcp.rs @@ -12,11 +12,12 @@ use aya_bpf::{ programs::TcContext, }; use aya_log_ebpf::info; +use common::{ClientKey, TCPBackend, TCPState}; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BLIXT_CONNTRACK, + utils::{csum_fold_helper, handle_tcp_conn_close, ptr_at}, + TCP_CONNECTIONS, }; pub fn handle_tcp_egress(ctx: TcContext) -> Result { @@ -29,24 +30,26 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { // capture some IP and port information let client_addr = unsafe { (*ip_hdr).dst_addr }; - let dest_port = unsafe { (*tcp_hdr).dest.to_be() }; - let ip_port_tuple = unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?; - - // verify traffic destination - if ip_port_tuple.1 as u16 != dest_port { - return Ok(TC_ACT_PIPE); - } + let dest_port = unsafe { (*tcp_hdr).dest }; + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(client_addr), + port: u16::from_be(dest_port) as u32, + }; + let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?; info!( &ctx, - "Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}", + "Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}", u32::from_be(client_addr), - ip_port_tuple.1 as u16, - u32::from_be(ip_port_tuple.0), + u16::from_be(dest_port), + tcp_backend.backend_key.ip, + tcp_backend.backend_key.port, ); + // SNAT the ip address unsafe { - (*ip_hdr).src_addr = ip_port_tuple.0; + (*ip_hdr).src_addr = tcp_backend.backend_key.ip; }; if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() { @@ -68,6 +71,39 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { unsafe { (*tcp_hdr).check = 0 }; // TODO: connection tracking cleanup 
https://github.com/kubernetes-sigs/blixt/issues/85 + // SNAT the port + unsafe { (*tcp_hdr).source = tcp_backend.backend_key.port as u16 }; + + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. + if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + + let mut tcp_state = tcp_backend.state; + let moved = handle_tcp_conn_close(tcp_hdr_ref, &mut tcp_state); + // If the connection has moved to the Closed state, stop tracking it. + if let TCPState::Closed = tcp_state { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + // If the connection has not reached the Closed state yet, but it did advance to a new state, + // then record the new state. + } else if moved { + let bk = *tcp_backend; + let new_tcp_backend = TCPBackend { + backend: bk.backend, + backend_key: bk.backend_key, + state: tcp_state, + }; + unsafe { + TCP_CONNECTIONS.insert(&client_key, &new_tcp_backend, 0_u64)?; + } + } Ok(TC_ACT_PIPE) } diff --git a/dataplane/ebpf/src/ingress/tcp.rs b/dataplane/ebpf/src/ingress/tcp.rs index 3fb56c5b..6721358d 100644 --- a/dataplane/ebpf/src/ingress/tcp.rs +++ b/dataplane/ebpf/src/ingress/tcp.rs @@ -11,14 +11,14 @@ use aya_bpf::{ helpers::{bpf_csum_diff, bpf_redirect_neigh}, programs::TcContext, }; -use aya_log_ebpf::info; +use aya_log_ebpf::{debug, info}; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BACKENDS, BLIXT_CONNTRACK, + utils::{csum_fold_helper, handle_tcp_conn_close, ptr_at}, + BACKENDS, GATEWAY_INDEXES, TCP_CONNECTIONS, }; -use common::BackendKey; +use common::{Backend, BackendKey, ClientKey, TCPBackend, TCPState, BACKENDS_ARRAY_CAPACITY}; pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let ip_hdr: *mut Ipv4Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? 
}; @@ -29,14 +29,59 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let original_daddr = unsafe { (*ip_hdr).dst_addr }; - let key = BackendKey { - ip: u32::from_be(original_daddr), - port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(unsafe { (*ip_hdr).src_addr }), + port: (u16::from_be(unsafe { (*tcp_hdr).source })) as u32, }; - let backend_list = unsafe { BACKENDS.get(&key) }.ok_or(TC_ACT_OK)?; - // Only a single backend is supported for TCP connections. - // TODO(aryan9600): Add support for multiple backends (https://github.com/kubernetes-sigs/blixt/issues/119) - let backend = backend_list.backends[0]; + // The backend that is responsible for handling this TCP connection. + let mut backend: Backend; + // Flag to check whether this is a new connection. + let mut new_conn = false; + // The state of this TCP connection. + let mut tcp_state = TCPState::default(); + + // Try to find the backend previously used for this connection. If not found, it means that + // this is a new connection, so assign it the next backend in line. 
+ if let Some(val) = unsafe { TCP_CONNECTIONS.get(&client_key) } { + backend = val.backend; + tcp_state = val.state; + } else { + new_conn = true; + + let backend_key = BackendKey { + ip: u32::from_be(original_daddr), + port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + }; + let backend_list = unsafe { BACKENDS.get(&backend_key) }.ok_or(TC_ACT_OK)?; + let backend_index = unsafe { GATEWAY_INDEXES.get(&backend_key) }.ok_or(TC_ACT_OK)?; + + debug!(&ctx, "Destination backend index: {}", *backend_index); + debug!(&ctx, "Backends length: {}", backend_list.backends_len); + + // this check asserts that we don't use a "zero-value" Backend + if backend_list.backends_len <= *backend_index { + return Ok(TC_ACT_OK); + } + // this check is to make the verifier happy + if *backend_index as usize >= BACKENDS_ARRAY_CAPACITY { + return Ok(TC_ACT_OK); + } + + backend = backend_list.backends[0]; + if let Some(val) = backend_list.backends.get(*backend_index as usize) { + backend = *val; + } + + // move the index to the next backend in our list + let mut next = *backend_index + 1; + if next >= backend_list.backends_len { + next = 0; + } + unsafe { + GATEWAY_INDEXES.insert(&backend_key, &next, 0_u64)?; + } + } info!( &ctx, @@ -45,6 +90,7 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { u16::from_be(unsafe { (*tcp_hdr).dest }) ); + // DNAT the ip address unsafe { (*ip_hdr).dst_addr = backend.daddr.to_be(); } @@ -67,12 +113,13 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) } as u64; unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) }; - - // Update destination port - unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; // FIXME unsafe { (*tcp_hdr).check = 0 }; + let original_dport = unsafe { (*tcp_hdr).dest }; + // DNAT the port + unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; + let action = unsafe { bpf_redirect_neigh( backend.ifindex as u32, @@ -82,15 +129,57 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) }; - unsafe { - 
BLIXT_CONNTRACK.insert( - &(*ip_hdr).src_addr, - &(original_daddr, (*tcp_hdr).source.to_be() as u32), - 0 as u64, - )?; - }; + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the connection is new, then record it in our map for future tracking. + if new_conn { + let tcp_backend = TCPBackend { + backend, + backend_key: BackendKey { + ip: original_daddr, + port: original_dport as u32, + }, + state: tcp_state, + }; + unsafe { + TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?; + } + + // since this is a new connection, there is nothing else to do, so exit early + info!(&ctx, "redirect action: {}", action); + return Ok(action as i32); + } - info!(&ctx, "redirect action: {}", action); + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. + if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + let moved = handle_tcp_conn_close(tcp_hdr_ref, &mut tcp_state); + // If the connection has moved to the Closed state, stop tracking it. + if let TCPState::Closed = tcp_state { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + // If the connection has not reached the Closed state yet, but it did advance to a new state, + // then record the new state. 
+ } else if moved { + let tcp_backend = TCPBackend { + backend, + backend_key: BackendKey { + ip: original_daddr, + port: original_dport as u32, + }, + state: tcp_state, + }; + unsafe { + TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?; + } + } + + info!(&ctx, "redirect action: {}", action); Ok(action as i32) } diff --git a/dataplane/ebpf/src/main.rs b/dataplane/ebpf/src/main.rs index 4e050d9b..cbcdfea2 100644 --- a/dataplane/ebpf/src/main.rs +++ b/dataplane/ebpf/src/main.rs @@ -22,7 +22,7 @@ use aya_bpf::{ programs::TcContext, }; -use common::{BackendKey, BackendList, BPF_MAPS_CAPACITY}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend, BPF_MAPS_CAPACITY}; use egress::{icmp::handle_icmp_egress, tcp::handle_tcp_egress}; use ingress::{tcp::handle_tcp_ingress, udp::handle_udp_ingress}; @@ -48,6 +48,10 @@ static mut GATEWAY_INDEXES: HashMap = static mut BLIXT_CONNTRACK: HashMap = HashMap::::with_max_entries(BPF_MAPS_CAPACITY, 0); +#[map(name = "TCP_CONNECTIONS")] +static mut TCP_CONNECTIONS: HashMap = + HashMap::::with_max_entries(128, 0); + // ----------------------------------------------------------------------------- // Ingress // ----------------------------------------------------------------------------- diff --git a/dataplane/ebpf/src/utils.rs b/dataplane/ebpf/src/utils.rs index 4cc8aaf0..c588b729 100644 --- a/dataplane/ebpf/src/utils.rs +++ b/dataplane/ebpf/src/utils.rs @@ -4,8 +4,11 @@ Copyright 2023 The Kubernetes Authors. SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ -use core::mem; use aya_bpf::{bindings::TC_ACT_OK, programs::TcContext}; +use core::mem; +use network_types::tcp::TcpHdr; + +use common::TCPState; // ----------------------------------------------------------------------------- // Helper Functions @@ -34,3 +37,61 @@ pub fn csum_fold_helper(mut csum: u64) -> u16 { } return !(csum as u16); } + +// Handles the termination of connection. 
+// Ref: https://en.wikipedia.org/wiki/File:Tcp_state_diagram.png and +// http://www.tcpipguide.com/free/t_TCPConnectionTermination-2.htm +#[inline(always)] +pub fn handle_tcp_conn_close(hdr: &TcpHdr, state: &mut TCPState) -> bool { + let fin = hdr.fin() == 1; + let ack = hdr.ack() == 1; + match state { + TCPState::Established => { + // At the Established state, a FIN packet moves the state to FinWait1. + if fin { + *state = TCPState::FinWait1; + return true; + } + } + TCPState::FinWait1 => { + // At the FinWait1 state, a packet with both the FIN and ACK bits set + // moves the state to TimeWait. + if fin && ack { + *state = TCPState::TimeWait; + return true; + } + // At the FinWait1 state, a FIN packet moves the state to Closing. + if fin { + *state = TCPState::Closing; + return true; + } + // At the FinWait1 state, an ACK packet moves the state to FinWait2. + if ack { + *state = TCPState::FinWait2; + return true; + } + } + TCPState::FinWait2 => { + // At the FinWait2 state, an ACK packet moves the state to TimeWait. + if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::Closing => { + // At the Closing state, an ACK packet moves the state to TimeWait. 
+ if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::TimeWait => { + if ack { + *state = TCPState::Closed; + return true; + } + } + TCPState::Closed => {} + } + return false; +} diff --git a/dataplane/loader/src/main.rs b/dataplane/loader/src/main.rs index 88db3e08..f62fc4be 100644 --- a/dataplane/loader/src/main.rs +++ b/dataplane/loader/src/main.rs @@ -13,7 +13,7 @@ use aya::programs::{tc, SchedClassifier, TcAttachType}; use aya::{include_bytes_aligned, Bpf}; use aya_log::BpfLogger; use clap::Parser; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; use log::{info, warn}; #[derive(Debug, Parser)] @@ -46,9 +46,21 @@ async fn main() -> Result<(), anyhow::Error> { .expect("no maps named GATEWAY_INDEXES"), ) .try_into()?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = Map::HashMap( + MapData::from_pin(bpfd_maps.join("TCP_CONNECTIONS")) + .expect("no maps named TCP_CONNECTIONS"), + ) + .try_into()?; info!("starting api server"); - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } else { info!("loading ebpf programs"); @@ -90,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> { bpf.take_map("GATEWAY_INDEXES") .expect("no maps named GATEWAY_INDEXES"), )?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = HashMap::try_from( + bpf.take_map("TCP_CONNECTIONS") + .expect("no maps named TCP_CONNECTIONS"), + )?; - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } info!("Exiting..."); diff --git a/internal/dataplane/client/utils.go b/internal/dataplane/client/utils.go index 8977beeb..29cf4e0e 100644 --- a/internal/dataplane/client/utils.go +++ b/internal/dataplane/client/utils.go @@ 
-31,7 +31,6 @@ import ( // CompileUDPRouteToDataPlaneBackend takes a UDPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udproute *gatewayv1alpha2.UDPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -98,19 +97,8 @@ func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udp // CompileTCPRouteToDataPlaneBackend takes a TCPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. -func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(tcproute.Spec.Rules) != 1 { - return nil, fmt.Errorf("currently can only support 1 TCPRoute rule, received %d", len(tcproute.Spec.Rules)) - } - rule := tcproute.Spec.Rules[0] - - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(rule.BackendRefs) != 1 { - return nil, fmt.Errorf("expect 1 backendRef received %d", len(rule.BackendRefs)) - } - backendRef := rule.BackendRefs[0] - +func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, + tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -120,42 +108,49 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp if err != nil { return nil, err } - - // TODO only using one endpoint for now until https://github.com/Kong/blixt/issues/10 - var target *Target - if tcproute.DeletionTimestamp == nil { - endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) - if err != nil { - return 
nil, err - } - - for _, subset := range endpoints.Subsets { - if len(subset.Addresses) < 1 { - return nil, fmt.Errorf("addresses not ready for endpoints") - } - if len(subset.Ports) < 1 { - return nil, fmt.Errorf("ports not ready for endpoints") + var backendTargets []*Target + for _, rule := range tcproute.Spec.Rules { + for _, backendRef := range rule.BackendRefs { + endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) + if err != nil { + return nil, err } - if subset.Addresses[0].IP == "" { - return nil, fmt.Errorf("empty IP for endpoint subset") + if len(endpoints.Subsets) < 1 { + return nil, fmt.Errorf("endpoint has no subsets") } + for _, subset := range endpoints.Subsets { + if len(subset.Addresses) < 1 { + return nil, fmt.Errorf("addresses not ready for endpoints") + } + if len(subset.Ports) < 1 { + return nil, fmt.Errorf("ports not ready for endpoints") + } - ip := net.ParseIP(subset.Addresses[0].IP) - podip := binary.BigEndian.Uint32(ip.To4()) - podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) - if err != nil { - return nil, err - } + for _, addr := range subset.Addresses { + if addr.IP == "" { + return nil, fmt.Errorf("empty IP for endpoint subset") + } + + ip := net.ParseIP(addr.IP) + podip := binary.BigEndian.Uint32(ip.To4()) + podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) + if err != nil { + return nil, err + } - target = &Target{ - Daddr: podip, - Dport: uint32(podPort), + target := &Target{ + Daddr: podip, + Dport: uint32(podPort), + } + backendTargets = append(backendTargets, target) + } } } - if target == nil { - return nil, fmt.Errorf("endpoints not ready") - } + } + + if len(backendTargets) == 0 { + return nil, fmt.Errorf("no healthy backends") } ipint := binary.BigEndian.Uint32(gatewayIP.To4()) @@ -165,13 +160,14 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp Ip: ipint, Port: gatewayPort, }, - // 
TODO(aryan9600): Add support for multiple targets (https://github.com/kubernetes-sigs/blixt/issues/119) - Targets: []*Target{target}, + Targets: backendTargets, } return targets, nil } +// func makeBackendTargets (ctx context.Context, c client.Client, backendRefs []gatewayv1beta1.BackendRef, ns string) ([]*Target, error) + func endpointsFromBackendRef(ctx context.Context, c client.Client, namespace string, backendRef gatewayv1alpha2.BackendRef) (*corev1.Endpoints, error) { if backendRef.Namespace != nil { namespace = string(*backendRef.Namespace)