diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a9bd396a..75550ee9 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -78,14 +78,16 @@ jobs: BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" TAG: "integration-tests" - - name: run integration tests with bpfd - run: make test.integration - env: - BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane" - BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane" - BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" - BLIXT_USE_BPFD: true - TAG: "integration-tests" + # temporarily disabled due to upstream changes in bpfman (previously bpfd) + # ref: https://github.com/kubernetes-sigs/blixt/issues/152 + # - name: run integration tests with bpfd + # run: make test.integration + # env: + # BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane" + # BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane" + # BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" + # BLIXT_USE_BPFD: true + # TAG: "integration-tests" ## Upload diagnostics if integration test step failed. - name: upload diagnostics diff --git a/config/samples/tcproute/server.yaml b/config/samples/tcproute/server.yaml index cc0942f2..cebc923b 100644 --- a/config/samples/tcproute/server.yaml +++ b/config/samples/tcproute/server.yaml @@ -16,11 +16,9 @@ spec: spec: containers: - name: server - image: ghcr.io/shaneutt/malutki + image: istio/tcp-echo-server:1.1 imagePullPolicy: IfNotPresent - env: - - name: LISTEN_PORT - value: "8080" + args: [ "8080", "blixt-tcproute-sample:" ] ports: - containerPort: 8080 protocol: TCP diff --git a/config/tests/tcproute-rr/kustomization.yaml b/config/tests/tcproute-rr/kustomization.yaml new file mode 100644 index 00000000..7a75e9ec --- /dev/null +++ b/config/tests/tcproute-rr/kustomization.yaml @@ -0,0 +1,7 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: +- ../../samples/tcproute +- server.yaml +patches: +- path: patch.yaml diff --git a/config/tests/tcproute-rr/patch.yaml b/config/tests/tcproute-rr/patch.yaml new file mode 100644 index 00000000..5dce38a4 --- /dev/null +++ b/config/tests/tcproute-rr/patch.yaml @@ -0,0 +1,17 @@ +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: TCPRoute +metadata: + name: blixt-tcproute-sample +spec: + parentRefs: + - name: blixt-tcproute-sample + port: 8080 + rules: + - backendRefs: + - name: blixt-tcproute-sample + port: 8080 + - backendRefs: + - name: tcproute-rr-v1 + port: 8080 + - name: tcproute-rr-v2 + port: 8080 diff --git a/config/tests/tcproute-rr/server.yaml b/config/tests/tcproute-rr/server.yaml new file mode 100644 index 00000000..5c323dd5 --- /dev/null +++ b/config/tests/tcproute-rr/server.yaml @@ -0,0 +1,76 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v1 + labels: + app: tcproute-rr-v1 +spec: + selector: + matchLabels: + app: tcproute-rr-v1 + template: + metadata: + labels: + app: tcproute-rr-v1 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v1:" ] + ports: + - containerPort: 8080 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v2 + labels: + app: tcproute-rr-v2 +spec: + selector: + matchLabels: + app: tcproute-rr-v2 + template: + metadata: + labels: + app: tcproute-rr-v2 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v2:" ] + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v1 + name: tcproute-rr-v1 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v1 + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v2 + name: tcproute-rr-v2 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v2 + type: ClusterIP diff --git a/dataplane/api-server/src/backends.rs b/dataplane/api-server/src/backends.rs index 25b70150..5571964e 100644 --- a/dataplane/api-server/src/backends.rs +++ b/dataplane/api-server/src/backends.rs @@ -45,8 +45,8 @@ pub struct InterfaceIndexConfirmation { /// Generated client implementations. pub mod backends_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct BackendsClient { inner: tonic::client::Grpc, @@ -90,8 +90,9 @@ pub mod backends_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { BackendsClient::new(InterceptedService::new(inner, interceptor)) } @@ -129,16 +130,23 @@ pub mod backends_client { pub async fn get_interface_index( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/backends.backends/GetInterfaceIndex"); + let path = http::uri::PathAndQuery::from_static( + "/backends.backends/GetInterfaceIndex", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("backends.backends", "GetInterfaceIndex")); @@ -148,34 +156,38 @@ pub mod backends_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/backends.backends/Update"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("backends.backends", "Update")); + req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Update")); self.inner.unary(req, path, codec).await } pub async fn delete( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/backends.backends/Delete"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("backends.backends", "Delete")); + req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Delete")); self.inner.unary(req, path, codec).await } } @@ -190,7 +202,10 @@ pub mod backends_server { async fn get_interface_index( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn update( &self, request: tonic::Request, @@ -223,7 +238,10 @@ pub mod backends_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -279,12 +297,21 @@ pub mod backends_server { "/backends.backends/GetInterfaceIndex" => { #[allow(non_camel_case_types)] struct GetInterfaceIndexSvc(pub Arc); - impl tonic::server::UnaryService for GetInterfaceIndexSvc { + impl tonic::server::UnaryService + for GetInterfaceIndexSvc { type Response = super::InterfaceIndexConfirmation; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).get_interface_index(request).await }; + let fut = async move { + (*inner).get_interface_index(request).await + }; Box::pin(fut) } } @@ -314,9 +341,13 @@ pub mod backends_server { "/backends.backends/Update" => { #[allow(non_camel_case_types)] struct UpdateSvc(pub Arc); - impl tonic::server::UnaryService for UpdateSvc { + impl tonic::server::UnaryService + for UpdateSvc { type Response = super::Confirmation; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -352,10 +383,17 @@ pub mod backends_server { "/backends.backends/Delete" => { #[allow(non_camel_case_types)] struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService for DeleteSvc { + impl tonic::server::UnaryService + for DeleteSvc { type Response = super::Confirmation; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { (*inner).delete(request).await }; Box::pin(fut) @@ -384,14 +422,18 @@ pub mod backends_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/dataplane/api-server/src/lib.rs b/dataplane/api-server/src/lib.rs index 579913df..bcd51e86 100644 --- a/dataplane/api-server/src/lib.rs +++ b/dataplane/api-server/src/lib.rs @@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData}; use tonic::transport::Server; use backends::backends_server::BackendsServer; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; pub async fn start( addr: Ipv4Addr, port: u16, backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> Result<(), Error> { - let server = server::BackendService::new(backends_map, gateway_indexes_map); + let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map); // TODO: mTLS https://github.com/Kong/blixt/issues/50 Server::builder() .add_service(BackendsServer::new(server)) diff --git a/dataplane/api-server/src/server.rs b/dataplane/api-server/src/server.rs index 91d0bf53..3d898944 100644 --- a/dataplane/api-server/src/server.rs +++ b/dataplane/api-server/src/server.rs @@ -8,28 +8,31 @@ use std::net::Ipv4Addr; use std::sync::Arc; use anyhow::Error; -use aya::maps::{HashMap, MapData}; +use aya::maps::{HashMap, MapData, MapError}; use tokio::sync::Mutex; use tonic::{Request, Response, Status}; use crate::backends::backends_server::Backends; use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip}; use crate::netutils::{if_name_for_routing_ip, if_nametoindex}; -use common::{Backend, BackendKey, BackendList, BACKENDS_ARRAY_CAPACITY}; +use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY}; pub struct BackendService { backends_map: Arc>>, gateway_indexes_map: Arc>>, + tcp_conns_map: Arc>>, } impl BackendService { pub fn new( backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> BackendService { BackendService { backends_map: Arc::new(Mutex::new(backends_map)), gateway_indexes_map: Arc::new(Mutex::new(gateway_indexes_map)), + tcp_conns_map: Arc::new(Mutex::new(tcp_conns_map)), } } @@ -51,6 +54,35 @@ impl BackendService { backends_map.remove(&key)?; let mut gateway_indexes_map = self.gateway_indexes_map.lock().await; gateway_indexes_map.remove(&key)?; + + // Delete all entries in our tcp connection tracking map that this backend + // key was related to. This is needed because the TCPRoute might have been + // deleted with TCP connection(s) still open, so without the below logic + // they'll hang around forever. + // Its better to do this rather than maintain a reverse index because the index + // would need to be updated with each new connection. With remove being a less + // frequently used operation, the performance cost is less visible. + let mut tcp_conns_map = self.tcp_conns_map.lock().await; + for item in tcp_conns_map + .iter() + .collect::>>() + { + match item { + Ok(( + client_key, + TCPBackend { + backend: _, + backend_key, + state: _, + }, + )) => { + if backend_key == key { + tcp_conns_map.remove(&client_key)?; + }; + } + Err(err) => return Err(err.into()), + }; + } Ok(()) } } @@ -144,9 +176,10 @@ impl Backends for BackendService { match self.insert_and_reset_index(key, backend_list).await { Ok(_) => Ok(Response::new(Confirmation { confirmation: format!( - "success, vip {}:{} was updated", + "success, vip {}:{} was updated with {} backends", Ipv4Addr::from(vip.ip), - vip.port + vip.port, + count, ), })), Err(err) => Err(Status::internal(format!("failure: {}", err))), diff --git a/dataplane/common/src/lib.rs b/dataplane/common/src/lib.rs index 10fd7b7e..1da5155d 100644 --- a/dataplane/common/src/lib.rs +++ b/dataplane/common/src/lib.rs @@ -20,7 +20,7 @@ pub struct Backend { #[cfg(feature = "user")] unsafe impl aya::Pod for Backend {} -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[repr(C)] pub struct BackendKey { pub ip: u32, @@ -40,3 +40,41 @@ pub struct BackendList { #[cfg(feature = "user")] unsafe impl aya::Pod for BackendList {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct ClientKey { + pub ip: u32, + pub port: u32, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for ClientKey {} + +// TCPState contains variants that represent the current phase of the TCP connection at a point in +// time during the connection's termination. +#[derive(Copy, Clone, Debug, Default)] +#[repr(C)] +pub enum TCPState { + #[default] + Established, + FinWait1, + FinWait2, + Closing, + TimeWait, + Closed, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPState {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct TCPBackend { + pub backend: Backend, + pub backend_key: BackendKey, + pub state: TCPState, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPBackend {} diff --git a/dataplane/ebpf/src/egress/tcp.rs b/dataplane/ebpf/src/egress/tcp.rs index 89829329..48358ab8 100644 --- a/dataplane/ebpf/src/egress/tcp.rs +++ b/dataplane/ebpf/src/egress/tcp.rs @@ -12,11 +12,12 @@ use aya_bpf::{ programs::TcContext, }; use aya_log_ebpf::info; +use common::ClientKey; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BLIXT_CONNTRACK, + utils::{csum_fold_helper, ptr_at, update_tcp_conns}, + TCP_CONNECTIONS, }; pub fn handle_tcp_egress(ctx: TcContext) -> Result { @@ -29,25 +30,30 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { // capture some IP and port information let client_addr = unsafe { (*ip_hdr).dst_addr }; - let dest_port = unsafe { (*tcp_hdr).dest.to_be() }; - let ip_port_tuple = unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?; - - // verify traffic destination - if ip_port_tuple.1 as u16 != dest_port { - return Ok(TC_ACT_PIPE); - } + let dest_port = unsafe { (*tcp_hdr).dest }; + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(client_addr), + port: u16::from_be(dest_port) as u32, + }; + let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?; info!( &ctx, - "Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}", + "Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}", u32::from_be(client_addr), - ip_port_tuple.1 as u16, - u32::from_be(ip_port_tuple.0), + u16::from_be(dest_port), + tcp_backend.backend_key.ip, + tcp_backend.backend_key.port, ); + // TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85 + // SNAT the ip address unsafe { - (*ip_hdr).src_addr = ip_port_tuple.0; + (*ip_hdr).src_addr = tcp_backend.backend_key.ip.to_be(); }; + // SNAT the port + unsafe { (*tcp_hdr).source = u16::from_be(tcp_backend.backend_key.port as u16) }; if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() { info!(&ctx, "Iphdr is out of bounds"); @@ -67,7 +73,18 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) }; unsafe { (*tcp_hdr).check = 0 }; - // TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85 + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. + if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + + let mut tcp_bk = *tcp_backend; + update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_bk)?; Ok(TC_ACT_PIPE) } diff --git a/dataplane/ebpf/src/ingress/tcp.rs b/dataplane/ebpf/src/ingress/tcp.rs index 3fb56c5b..c6464d97 100644 --- a/dataplane/ebpf/src/ingress/tcp.rs +++ b/dataplane/ebpf/src/ingress/tcp.rs @@ -11,14 +11,14 @@ use aya_bpf::{ helpers::{bpf_csum_diff, bpf_redirect_neigh}, programs::TcContext, }; -use aya_log_ebpf::info; +use aya_log_ebpf::{debug, info}; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BACKENDS, BLIXT_CONNTRACK, + utils::{csum_fold_helper, ptr_at, update_tcp_conns}, + BACKENDS, GATEWAY_INDEXES, TCP_CONNECTIONS, }; -use common::BackendKey; +use common::{Backend, BackendKey, ClientKey, TCPBackend, TCPState, BACKENDS_ARRAY_CAPACITY}; pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let ip_hdr: *mut Ipv4Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? }; @@ -29,14 +29,64 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let original_daddr = unsafe { (*ip_hdr).dst_addr }; - let key = BackendKey { - ip: u32::from_be(original_daddr), - port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(unsafe { (*ip_hdr).src_addr }), + port: (u16::from_be(unsafe { (*tcp_hdr).source })) as u32, }; - let backend_list = unsafe { BACKENDS.get(&key) }.ok_or(TC_ACT_OK)?; - // Only a single backend is supported for TCP connections. - // TODO(aryan9600): Add support for multiple backends (https://github.com/kubernetes-sigs/blixt/issues/119) - let backend = backend_list.backends[0]; + // The backend that is responsible for handling this TCP connection. + let mut backend: Backend; + // The Gateway that the TCP connections is forwarded from. + let backend_key: BackendKey; + // Flag to check whether this is a new connection. + let mut new_conn = false; + // The state of this TCP connection. + let mut tcp_state = TCPState::default(); + + // Try to find the backend previously used for this connection. If not found, it means that + // this is a new connection, so assign it the next backend in line. + if let Some(val) = unsafe { TCP_CONNECTIONS.get(&client_key) } { + backend = val.backend; + tcp_state = val.state; + backend_key = val.backend_key; + } else { + new_conn = true; + + backend_key = BackendKey { + ip: u32::from_be(original_daddr), + port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + }; + let backend_list = unsafe { BACKENDS.get(&backend_key) }.ok_or(TC_ACT_OK)?; + let backend_index = unsafe { GATEWAY_INDEXES.get(&backend_key) }.ok_or(TC_ACT_OK)?; + + debug!(&ctx, "Destination backend index: {}", *backend_index); + debug!(&ctx, "Backends length: {}", backend_list.backends_len); + + // this check asserts that we don't use a "zero-value" Backend + if backend_list.backends_len <= *backend_index { + return Ok(TC_ACT_OK); + } + // the bpf verifier is aware of variables that are used as an index for + // an array and requires that we check the array boundaries against + // the index to ensure our access is in-bounds. + if *backend_index as usize >= BACKENDS_ARRAY_CAPACITY { + return Ok(TC_ACT_OK); + } + + backend = backend_list.backends[0]; + if let Some(val) = backend_list.backends.get(*backend_index as usize) { + backend = *val; + } + + // move the index to the next backend in our list + let mut next = *backend_index + 1; + if next >= backend_list.backends_len { + next = 0; + } + unsafe { + GATEWAY_INDEXES.insert(&backend_key, &next, 0_u64)?; + } + } info!( &ctx, @@ -45,9 +95,12 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { u16::from_be(unsafe { (*tcp_hdr).dest }) ); + // DNAT the ip address unsafe { (*ip_hdr).dst_addr = backend.daddr.to_be(); } + // DNAT the port + unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() { info!(&ctx, "Iphdr is out of bounds"); @@ -67,9 +120,6 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) } as u64; unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) }; - - // Update destination port - unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; // FIXME unsafe { (*tcp_hdr).check = 0 }; @@ -82,15 +132,35 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) }; - unsafe { - BLIXT_CONNTRACK.insert( - &(*ip_hdr).src_addr, - &(original_daddr, (*tcp_hdr).source.to_be() as u32), - 0 as u64, - )?; + let mut tcp_backend = TCPBackend { + backend, + backend_key, + state: tcp_state, }; - info!(&ctx, "redirect action: {}", action); + // If the connection is new, then record it in our map for future tracking. + if new_conn { + unsafe { + TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?; + } + // since this is a new connection, there is nothing else to do, so exit early + info!(&ctx, "redirect action: {}", action); + return Ok(action as i32); + } + + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. + if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + + update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_backend)?; + + info!(&ctx, "redirect action: {}", action); Ok(action as i32) } diff --git a/dataplane/ebpf/src/main.rs b/dataplane/ebpf/src/main.rs index 4e050d9b..cbcdfea2 100644 --- a/dataplane/ebpf/src/main.rs +++ b/dataplane/ebpf/src/main.rs @@ -22,7 +22,7 @@ use aya_bpf::{ programs::TcContext, }; -use common::{BackendKey, BackendList, BPF_MAPS_CAPACITY}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend, BPF_MAPS_CAPACITY}; use egress::{icmp::handle_icmp_egress, tcp::handle_tcp_egress}; use ingress::{tcp::handle_tcp_ingress, udp::handle_udp_ingress}; @@ -48,6 +48,10 @@ static mut GATEWAY_INDEXES: HashMap = static mut BLIXT_CONNTRACK: HashMap = HashMap::::with_max_entries(BPF_MAPS_CAPACITY, 0); +#[map(name = "TCP_CONNECTIONS")] +static mut TCP_CONNECTIONS: HashMap = + HashMap::::with_max_entries(128, 0); + // ----------------------------------------------------------------------------- // Ingress // ----------------------------------------------------------------------------- diff --git a/dataplane/ebpf/src/utils.rs b/dataplane/ebpf/src/utils.rs index 4cc8aaf0..47e9af82 100644 --- a/dataplane/ebpf/src/utils.rs +++ b/dataplane/ebpf/src/utils.rs @@ -4,8 +4,12 @@ Copyright 2023 The Kubernetes Authors. SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ -use core::mem; use aya_bpf::{bindings::TC_ACT_OK, programs::TcContext}; +use core::mem; +use network_types::tcp::TcpHdr; + +use crate::TCP_CONNECTIONS; +use common::{ClientKey, TCPBackend, TCPState}; // ----------------------------------------------------------------------------- // Helper Functions @@ -34,3 +38,87 @@ pub fn csum_fold_helper(mut csum: u64) -> u16 { } return !(csum as u16); } + +// Updates the TCP connection's state based on the current phase and the incoming packet's header. +// It returns true if the state transitioned to a different phase. +// Ref: https://en.wikipedia.org/wiki/File:Tcp_state_diagram.png and +// http://www.tcpipguide.com/free/t_TCPConnectionTermination-2.htm +#[inline(always)] +pub fn process_tcp_state_transition(hdr: &TcpHdr, state: &mut TCPState) -> bool { + let fin = hdr.fin() == 1; + let ack = hdr.ack() == 1; + match state { + TCPState::Established => { + // At the Established state, a FIN packet moves the state to FinWait1. + if fin { + *state = TCPState::FinWait1; + return true; + } + } + TCPState::FinWait1 => { + // At the FinWait1 state, a packet with both the FIN and ACK bits set + // moves the state to TimeWait. + if fin && ack { + *state = TCPState::TimeWait; + return true; + } + // At the FinWait1 state, a FIN packet moves the state to Closing. + if fin { + *state = TCPState::Closing; + return true; + } + // At the FinWait1 state, an ACK packet moves the state to FinWait2. + if ack { + *state = TCPState::FinWait2; + return true; + } + } + TCPState::FinWait2 => { + // At the FinWait2 state, an ACK packet moves the state to TimeWait. + if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::Closing => { + // At the Closing state, an ACK packet moves the state to TimeWait. + if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::TimeWait => { + if ack { + *state = TCPState::Closed; + return true; + } + } + TCPState::Closed => {} + } + return false; +} + +// Modifies the map tracking TCP connections based on the current state +// of the TCP connection and the incoming TCP packet's header. +#[inline(always)] +pub fn update_tcp_conns( + hdr: &TcpHdr, + client_key: &ClientKey, + tcp_backend: &mut TCPBackend, +) -> Result<(), i64> { + let transitioned = process_tcp_state_transition(hdr, &mut tcp_backend.state); + if let TCPState::Closed = tcp_backend.state { + unsafe { + return TCP_CONNECTIONS.remove(&client_key); + } + } + // If the connection has not reached the Closed state yet, but it did transition to a new state, + // then record the new state. + if transitioned { + unsafe { + return TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64); + } + } + + Ok(()) +} diff --git a/dataplane/loader/src/main.rs b/dataplane/loader/src/main.rs index 88db3e08..f62fc4be 100644 --- a/dataplane/loader/src/main.rs +++ b/dataplane/loader/src/main.rs @@ -13,7 +13,7 @@ use aya::programs::{tc, SchedClassifier, TcAttachType}; use aya::{include_bytes_aligned, Bpf}; use aya_log::BpfLogger; use clap::Parser; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; use log::{info, warn}; #[derive(Debug, Parser)] @@ -46,9 +46,21 @@ async fn main() -> Result<(), anyhow::Error> { .expect("no maps named GATEWAY_INDEXES"), ) .try_into()?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = Map::HashMap( + MapData::from_pin(bpfd_maps.join("TCP_CONNECTIONS")) + .expect("no maps named TCP_CONNECTIONS"), + ) + .try_into()?; info!("starting api server"); - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } else { info!("loading ebpf programs"); @@ -90,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> { bpf.take_map("GATEWAY_INDEXES") .expect("no maps named GATEWAY_INDEXES"), )?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = HashMap::try_from( + bpf.take_map("TCP_CONNECTIONS") + .expect("no maps named TCP_CONNECTIONS"), + )?; - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } info!("Exiting..."); diff --git a/internal/dataplane/client/utils.go b/internal/dataplane/client/utils.go index 8977beeb..8eef5c1a 100644 --- a/internal/dataplane/client/utils.go +++ b/internal/dataplane/client/utils.go @@ -31,7 +31,6 @@ import ( // CompileUDPRouteToDataPlaneBackend takes a UDPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udproute *gatewayv1alpha2.UDPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -98,19 +97,8 @@ func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udp // CompileTCPRouteToDataPlaneBackend takes a TCPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. -func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(tcproute.Spec.Rules) != 1 { - return nil, fmt.Errorf("currently can only support 1 TCPRoute rule, received %d", len(tcproute.Spec.Rules)) - } - rule := tcproute.Spec.Rules[0] - - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(rule.BackendRefs) != 1 { - return nil, fmt.Errorf("expect 1 backendRef received %d", len(rule.BackendRefs)) - } - backendRef := rule.BackendRefs[0] - +func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, + tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -120,42 +108,49 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp if err != nil { return nil, err } - - // TODO only using one endpoint for now until https://github.com/Kong/blixt/issues/10 - var target *Target - if tcproute.DeletionTimestamp == nil { - endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) - if err != nil { - return nil, err - } - - for _, subset := range endpoints.Subsets { - if len(subset.Addresses) < 1 { - return nil, fmt.Errorf("addresses not ready for endpoints") - } - if len(subset.Ports) < 1 { - return nil, fmt.Errorf("ports not ready for endpoints") + var backendTargets []*Target + for _, rule := range tcproute.Spec.Rules { + for _, backendRef := range rule.BackendRefs { + endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) + if err != nil { + return nil, err } - if subset.Addresses[0].IP == "" { - return nil, fmt.Errorf("empty IP for endpoint subset") + if len(endpoints.Subsets) < 1 { + return nil, fmt.Errorf("endpoint has no subsets") } + for _, subset := range endpoints.Subsets { + if len(subset.Addresses) < 1 { + return nil, fmt.Errorf("addresses not ready for endpoints") + } + if len(subset.Ports) < 1 { + return nil, fmt.Errorf("ports not ready for endpoints") + } - ip := net.ParseIP(subset.Addresses[0].IP) - podip := binary.BigEndian.Uint32(ip.To4()) - podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) - if err != nil { - return nil, err - } + for _, addr := range subset.Addresses { + if addr.IP == "" { + return nil, fmt.Errorf("empty IP for endpoint subset") + } - target = &Target{ - Daddr: podip, - Dport: uint32(podPort), + ip := net.ParseIP(addr.IP) + podip := binary.BigEndian.Uint32(ip.To4()) + podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) + if err != nil { + return nil, err + } + + target := &Target{ + Daddr: podip, + Dport: uint32(podPort), + } + backendTargets = append(backendTargets, target) + } } } - if target == nil { - return nil, fmt.Errorf("endpoints not ready") - } + } + + if len(backendTargets) == 0 { + return nil, fmt.Errorf("no healthy backends") } ipint := binary.BigEndian.Uint32(gatewayIP.To4()) @@ -165,8 +160,7 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp Ip: ipint, Port: gatewayPort, }, - // TODO(aryan9600): Add support for multiple targets (https://github.com/kubernetes-sigs/blixt/issues/119) - Targets: []*Target{target}, + Targets: backendTargets, } return targets, nil diff --git a/test/integration/tcproute_test.go b/test/integration/tcproute_test.go index 0ac88143..3e05fc91 100644 --- a/test/integration/tcproute_test.go +++ b/test/integration/tcproute_test.go @@ -20,9 +20,10 @@ limitations under the License. package integration import ( + "bufio" "context" "fmt" - "net/http" + "net" "strings" "testing" "time" @@ -38,9 +39,12 @@ import ( const ( tcprouteSampleKustomize = "../../config/tests/tcproute" + tcprouteRRKustomize = "../../config/tests/tcproute-rr" tcprouteSampleName = "blixt-tcproute-sample" ) +var tcpServerNames = []string{"blixt-tcproute-sample", "tcproute-rr-v1", "tcproute-rr-v2"} + func TestTCPRouteBasics(t *testing.T) { tcpRouteBasicsCleanupKey := "tcproutebasics" defer func() { @@ -69,38 +73,184 @@ func TestTCPRouteBasics(t *testing.T) { require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) - t.Log("waiting for HTTP server to be available") + t.Log("waiting for TCP server to be available") require.Eventually(t, func() bool { server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) require.NoError(t, err) return server.Status.AvailableReplicas > 0 }, time.Minute, time.Second) - t.Log("verifying HTTP connectivity to the server") - httpc := http.Client{Timeout: time.Second * 30} + t.Log("verifying TCP connectivity to the server") + var conn net.Conn require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + var err error + conn, err = net.Dial("tcp", gwaddr) if err != nil { - t.Logf("received error checking HTTP server: [%s], retrying...", err) + t.Logf("received error connecting to TCP server: [%s], retrying...", err) return false } - defer resp.Body.Close() - return resp.StatusCode == http.StatusTeapot + return true }, time.Minute*5, time.Second) - t.Log("deleting the TCPRoute and verifying that HTTP traffic stops") + response := writeAndReadTCP(t, conn) + require.Contains(t, response, tcpServerNames[0]) + + t.Log("deleting the TCPRoute and verifying that TCP connection is closed") require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{})) - httpc = http.Client{Timeout: time.Second * 3} require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + _, err := conn.Write([]byte("blahhh\n")) + require.NoError(t, err) + + err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + reader := bufio.NewReader(conn) + _, err = reader.ReadBytes(byte('\n')) if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { + if strings.Contains(err.Error(), "i/o timeout") { return true } t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err) return false } - defer resp.Body.Close() return false }, time.Minute, time.Second) } + +func TestTCPRouteRoundRobin(t *testing.T) { + tcpRouteRRCleanupKey := "tcprouterr" + defer func() { + testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster()) + if err := runCleanup(tcpRouteRRCleanupKey); err != nil { + t.Errorf("cleanup failed: %s", err) + } + }() + + t.Log("deploying config/samples/tcproute-rr kustomize") + require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteRRKustomize)) + addCleanup(tcpRouteRRCleanupKey, func(ctx context.Context) error { + cleanupLog("cleaning up config/samples/tcproute-rr kustomize") + return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteRRKustomize, "--ignore-not-found=true") + }) + + t.Log("waiting for Gateway to have an address") + var gw *gatewayv1beta1.Gateway + require.Eventually(t, func() bool { + var err error + gw, err = gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) + require.NoError(t, err) + return len(gw.Status.Addresses) > 0 + }, time.Minute, time.Second) + require.NotNil(t, gw.Status.Addresses[0].Type) + require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) + gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) + + t.Log("waiting for TCP servers to be available") + labelSelector := metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: tcpServerNames, + }, + }, + } + require.Eventually(t, func() bool { + servers, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&labelSelector), + }) + require.NoError(t, err) + for _, server := range servers.Items { + if server.Status.AvailableReplicas <= 0 { + return false + } + } + return true + }, time.Minute, time.Second) + + t.Log("verifying TCP connectivity to the servers") + // We create three TCP connections, one for each backend. + var conn1 net.Conn + require.Eventually(t, func() bool { + var err error + conn1, err = net.Dial("tcp", gwaddr) + if err != nil { + t.Logf("received error connecting to TCP server: [%s], retrying...", err) + return false + } + return true + }, time.Minute*5, time.Second) + conn2, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conn3, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conns := []net.Conn{conn1, conn2, conn3} + + // Run it twice to verify that we load balance in a round-robin fashion. + for c := 0; c < 2; c++ { + // We can't do names := tcpServerNames because we overwrite this in the loop later. + var names []string + names = append(names, tcpServerNames...) + + for _, conn := range conns { + response := writeAndReadTCP(t, conn) + split := strings.Split(response, ":") + require.Len(t, split, 2) + name := split[0] + var removed bool + names, removed = removeName(names, name) + // If no name was removed from the list, it means that the response + // does not contain the name of a known server. + if !removed { + t.Fatalf("received unexpected response from backend: %s", name) + } + } + require.Len(t, names, 0) + } + + t.Log("deleting the TCPRoute and verifying that all TCP connections are closed") + require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{})) + require.Eventually(t, func() bool { + for _, conn := range conns { + _, err := conn.Write([]byte("blahhh\n")) + require.NoError(t, err) + err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + + reader := bufio.NewReader(conn) + _, err = reader.ReadBytes(byte('\n')) + if err != nil { + if strings.Contains(err.Error(), "i/o timeout") { + continue + } + t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err) + } + return false + } + return true + }, time.Minute, time.Second) +} + +func removeName(names []string, name string) ([]string, bool) { + for i, v := range names { + if v == name { + names = append(names[:i], names[i+1:]...) + return names, true + } + } + return nil, false +} + +func writeAndReadTCP(t *testing.T, conn net.Conn) string { + t.Helper() + + t.Logf("writing data to TCP connection with backend %s", conn.RemoteAddr().String()) + request := "wazzzaaaa" + _, err := conn.Write([]byte(request + "\n")) + require.NoError(t, err) + + t.Logf("reading data from TCP connection with backend %s", conn.RemoteAddr().String()) + reader := bufio.NewReader(conn) + response, err := reader.ReadBytes(byte('\n')) + require.NoError(t, err) + return string(response) +}