tcproute: add support for round-robin load balancing
Add support for round-robin load balancing for `TCPRoute`. This enables
multiple backend references in a TCPRoute object, with traffic being
distributed to each backend in a round-robin fashion.

A new BPF map `TCP_CONNECTIONS` was introduced to help keep track of
active TCP connections, with its key storing the client <ip:port>
identifier and the value storing the backend's <ip:port> along with
the state of the connection.

Signed-off-by: Sanskar Jaiswal <[email protected]>
aryan9600 committed Dec 1, 2023
1 parent dc1a070 commit 31d8a2d
Showing 9 changed files with 366 additions and 91 deletions.
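
As a rough illustration of the `TCP_CONNECTIONS` map described in the commit message, the BPF-side declaration could look like the following. This is a hedged sketch using aya-bpf's map macro; the actual declaration lives in the eBPF crate outside the hunks shown below, and the capacity of 128 entries is a placeholder assumption:

use aya_bpf::{macros::map, maps::HashMap};
use common::{ClientKey, TCPBackend};

// Key: the client's <ip:port>; value: the chosen backend plus the tracked
// connection state.
#[map(name = "TCP_CONNECTIONS")]
static mut TCP_CONNECTIONS: HashMap<ClientKey, TCPBackend> =
    HashMap::with_max_entries(128, 0);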
5 changes: 3 additions & 2 deletions dataplane/api-server/src/lib.rs
@@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData};
use tonic::transport::Server;

use backends::backends_server::BackendsServer;
use common::{BackendKey, BackendList};
use common::{BackendKey, BackendList, ClientKey, TCPBackend};

pub async fn start(
addr: Ipv4Addr,
port: u16,
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
) -> Result<(), Error> {
let server = server::BackendService::new(backends_map, gateway_indexes_map);
let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map);
// TODO: mTLS https://github.com/Kong/blixt/issues/50
Server::builder()
.add_service(BackendsServer::new(server))
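
Callers of `start` now have to supply the connection-tracking map as well. A minimal sketch of how the loader side might obtain it, assuming aya's `Bpf::take_map` API and the BPF-side map name `TCP_CONNECTIONS` (the real loader code is outside this diff):

use aya::maps::{HashMap, MapData};
use common::{ClientKey, TCPBackend};

// Hypothetical: `bpf` is the already-loaded aya::Bpf object, and the map name
// matches the assumed BPF-side declaration.
let map = bpf.take_map("TCP_CONNECTIONS").expect("TCP_CONNECTIONS map not found");
let tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend> = HashMap::try_from(map)?;

api_server::start(addr, port, backends_map, gateway_indexes_map, tcp_conns_map).await?;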
35 changes: 32 additions & 3 deletions dataplane/api-server/src/server.rs
@@ -15,21 +15,24 @@ use tonic::{Request, Response, Status};
use crate::backends::backends_server::Backends;
use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip};
use crate::netutils::{if_name_for_routing_ip, if_nametoindex};
use common::{Backend, BackendKey, BackendList, BACKENDS_ARRAY_CAPACITY};
use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY};

pub struct BackendService {
backends_map: Arc<Mutex<HashMap<MapData, BackendKey, BackendList>>>,
gateway_indexes_map: Arc<Mutex<HashMap<MapData, BackendKey, u16>>>,
tcp_conns_map: Arc<Mutex<HashMap<MapData, ClientKey, TCPBackend>>>,
}

impl BackendService {
pub fn new(
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
) -> BackendService {
BackendService {
backends_map: Arc::new(Mutex::new(backends_map)),
gateway_indexes_map: Arc::new(Mutex::new(gateway_indexes_map)),
tcp_conns_map: Arc::new(Mutex::new(tcp_conns_map)),
}
}

@@ -51,6 +54,31 @@ impl BackendService {
backends_map.remove(&key)?;
let mut gateway_indexes_map = self.gateway_indexes_map.lock().await;
gateway_indexes_map.remove(&key)?;

// Delete all entries in our TCP connection-tracking map that relate to this
// backend key. This is needed because the TCPRoute might have been deleted
// while there were still active TCP connections, and without the logic below
// those entries would hang around forever.
// A full scan here is better than maintaining a reverse index, because the
// index would need updating on every new connection, while delete is a much
// less frequent operation.
let mut tcp_conns_map = self.tcp_conns_map.lock().await;
let mut expired_client_keys: Vec<ClientKey> = vec![];
tcp_conns_map.iter().for_each(|val| {
if let Ok((client_key, tcp_backend)) = val {
let bk = BackendKey {
ip: u32::from_be(key.ip),
port: u16::from_be(key.port as u16) as u32,
};
if tcp_backend.backend_key == bk {
expired_client_keys.push(client_key);
}
}
});

for ck in expired_client_keys {
tcp_conns_map.remove(&ck)?;
}
Ok(())
}
}
@@ -144,9 +172,10 @@ impl Backends for BackendService {
match self.insert_and_reset_index(key, backend_list).await {
Ok(_) => Ok(Response::new(Confirmation {
confirmation: format!(
"success, vip {}:{} was updated",
"success, vip {}:{} was updated with {} backends",
Ipv4Addr::from(vip.ip),
vip.port
vip.port,
count,
),
})),
Err(err) => Err(Status::internal(format!("failure: {}", err))),
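
The `insert_and_reset_index` helper called here sits outside the expanded hunks. From its name and the maps involved, it plausibly writes the new backend list and rewinds the gateway's round-robin cursor; a hedged sketch, with the signature and error type guessed:

// Hypothetical sketch; the real helper's signature may differ.
async fn insert_and_reset_index(
    &self,
    key: BackendKey,
    backends: BackendList,
) -> Result<(), anyhow::Error> {
    let mut backends_map = self.backends_map.lock().await;
    backends_map.insert(key, backends, 0)?;
    // Restart round-robin from the first backend, since the list changed.
    let mut gateway_indexes_map = self.gateway_indexes_map.lock().await;
    gateway_indexes_map.insert(key, 0, 0)?;
    Ok(())
}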
38 changes: 37 additions & 1 deletion dataplane/common/src/lib.rs
@@ -20,7 +20,7 @@ pub struct Backend {
#[cfg(feature = "user")]
unsafe impl aya::Pod for Backend {}

#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct BackendKey {
pub ip: u32,
@@ -40,3 +40,39 @@ pub struct BackendList {

#[cfg(feature = "user")]
unsafe impl aya::Pod for BackendList {}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
pub struct ClientKey {
pub ip: u32,
pub port: u32,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for ClientKey {}

#[derive(Copy, Clone, Debug, Default)]
#[repr(C)]
pub enum TCPState {
#[default]
Established,
FinWait1,
FinWait2,
Closing,
TimeWait,
Closed,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for TCPState {}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
pub struct TCPBackend {
pub backend: Backend,
pub backend_key: BackendKey,
pub state: TCPState,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for TCPBackend {}
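
The ingress-side round-robin selection that consumes these types lives in dataplane/ebpf/src/ingress/tcp.rs, which this page truncates. Purely as a hedged illustration of the technique named in the commit title — the map names and the `BackendList` field names below are assumptions, not confirmed by this excerpt:

// Hypothetical sketch of round-robin backend selection on ingress. Assumes
// BPF maps BACKENDS (BackendKey -> BackendList) and GATEWAY_INDEXES
// (BackendKey -> u16), and that BackendList holds a fixed-size `backends`
// array plus a `backends_len` count.
let backend_list = unsafe { BACKENDS.get(&backend_key) }.ok_or(TC_ACT_OK)?;
let idx = unsafe { GATEWAY_INDEXES.get(&backend_key) }.map(|i| *i).unwrap_or(0);

let len = (backend_list.backends_len as usize).min(BACKENDS_ARRAY_CAPACITY);
if len == 0 {
    return Ok(TC_ACT_OK);
}
// Pick the backend under the cursor, then advance the cursor with wrap-around
// so the next new connection lands on the next backend.
let backend = backend_list.backends[idx as usize % len];
let next = ((idx as usize + 1) % len) as u16;
unsafe { GATEWAY_INDEXES.insert(&backend_key, &next, 0)? };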
62 changes: 49 additions & 13 deletions dataplane/ebpf/src/egress/tcp.rs
@@ -12,11 +12,12 @@ use aya_bpf::{
programs::TcContext,
};
use aya_log_ebpf::info;
use common::{ClientKey, TCPBackend, TCPState};
use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, ptr_at},
BLIXT_CONNTRACK,
utils::{csum_fold_helper, handle_tcp_conn_close, ptr_at},
TCP_CONNECTIONS,
};

pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
@@ -29,24 +30,26 @@

// capture some IP and port information
let client_addr = unsafe { (*ip_hdr).dst_addr };
let dest_port = unsafe { (*tcp_hdr).dest.to_be() };
let ip_port_tuple = unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?;

// verify traffic destination
if ip_port_tuple.1 as u16 != dest_port {
return Ok(TC_ACT_PIPE);
}
let dest_port = unsafe { (*tcp_hdr).dest };
// The client (connection source) identifier; on egress this is the packet's destination
let client_key = ClientKey {
ip: u32::from_be(client_addr),
port: u16::from_be(dest_port) as u32,
};
let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?;

info!(
&ctx,
"Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}",
"Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}",
u32::from_be(client_addr),
ip_port_tuple.1 as u16,
u32::from_be(ip_port_tuple.0),
u16::from_be(dest_port),
tcp_backend.backend_key.ip,
tcp_backend.backend_key.port,
);

// SNAT the ip address
unsafe {
(*ip_hdr).src_addr = ip_port_tuple.0;
(*ip_hdr).src_addr = tcp_backend.backend_key.ip;
};

if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() {
@@ -68,6 +71,39 @@
unsafe { (*tcp_hdr).check = 0 };

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
// SNAT the port
unsafe { (*tcp_hdr).source = tcp_backend.backend_key.port as u16 };

let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? };

// If the packet has the RST flag set, it means the connection is being terminated, so remove it
// from our map.
if tcp_hdr_ref.rst() == 1 {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
}
}

let mut tcp_state = tcp_backend.state;
let moved = handle_tcp_conn_close(tcp_hdr_ref, &mut tcp_state);
// If the connection has moved to the Closed state, stop tracking it.
if let TCPState::Closed = tcp_state {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
}
// If the connection has not reached the Closed state yet, but it did advance to a new state,
// then record the new state.
} else if moved {
let bk = *tcp_backend;
let new_tcp_backend = TCPBackend {
backend: bk.backend,
backend_key: bk.backend_key,
state: tcp_state,
};
unsafe {
TCP_CONNECTIONS.insert(&client_key, &new_tcp_backend, 0_u64)?;
}
}

Ok(TC_ACT_PIPE)
}
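
The `handle_tcp_conn_close` helper used above lives in dataplane/ebpf/src/utils.rs, outside this page's expanded hunks. Inferring from the call site, it advances the teardown state machine based on the packet's FIN/ACK flags and reports whether the state changed; a hedged sketch, with the exact transitions guessed:

use common::TCPState;
use network_types::tcp::TcpHdr;

// Hypothetical sketch: advance the TCP teardown state machine from the flags
// of the current packet. Returns true if the state changed. The real helper
// may track both traffic directions and handle more edge cases.
pub fn handle_tcp_conn_close(hdr: &TcpHdr, state: &mut TCPState) -> bool {
    let fin = hdr.fin() == 1;
    let ack = hdr.ack() == 1;
    let next = match *state {
        // The first FIN we observe starts the close.
        TCPState::Established if fin => Some(TCPState::FinWait1),
        // FIN+ACK together collapses two steps of the teardown.
        TCPState::FinWait1 if fin && ack => Some(TCPState::TimeWait),
        TCPState::FinWait1 if ack => Some(TCPState::FinWait2),
        // Simultaneous close: the peer FINs before ACKing ours.
        TCPState::FinWait1 if fin => Some(TCPState::Closing),
        TCPState::Closing if ack => Some(TCPState::TimeWait),
        TCPState::FinWait2 if fin => Some(TCPState::TimeWait),
        // The final ACK lets the entry be dropped.
        TCPState::TimeWait if ack => Some(TCPState::Closed),
        _ => None,
    };
    if let Some(s) = next {
        *state = s;
        return true;
    }
    false
}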