From 3aa4e4f379b679d6abfc2c291621acb83443e5e5 Mon Sep 17 00:00:00 2001 From: Steven Jin Xuan Date: Wed, 2 Aug 2023 00:50:01 +0000 Subject: [PATCH 1/3] HBONE connections benches Signed-off-by: Steven Jin Xuan --- benches/throughput.rs | 189 ++++++++++++++++++++++++++++++++++------ examples/localhost.yaml | 2 +- src/test_helpers/app.rs | 6 +- tests/direct.rs | 21 +++-- 4 files changed, 181 insertions(+), 37 deletions(-) diff --git a/benches/throughput.rs b/benches/throughput.rs index 51095f2153..acb2fd8807 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering::{Equal, Greater, Less}; use std::env; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::Duration; +use bytes::BufMut; use criterion::{ criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput, }; use pprof::criterion::{Output, PProfProfiler}; use prometheus_client::registry::Registry; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::runtime::Runtime; use tokio::sync::Mutex; @@ -29,16 +32,20 @@ use tracing::info; use ztunnel::metrics::IncrementRecorder; use ztunnel::rbac::{Authorization, RbacMatch, StringMatch}; +use ztunnel::state::workload::{Protocol, Workload}; use ztunnel::test_helpers::app::TestApp; use ztunnel::test_helpers::tcp::Mode; use ztunnel::test_helpers::TEST_WORKLOAD_HBONE; use ztunnel::test_helpers::TEST_WORKLOAD_SOURCE; use ztunnel::test_helpers::TEST_WORKLOAD_TCP; use ztunnel::test_helpers::{helpers, tcp}; +use ztunnel::xds::LocalWorkload; use ztunnel::{app, identity, metrics, proxy, test_helpers}; const KB: usize = 1024; const MB: usize = 1024 * KB; +// Must be less than or equal to 254 +const MAX_HBONE_WORKLOADS: u8 = 64; struct TestEnv { ta: TestApp, @@ -132,16 +139,16 @@ fn initialize_environment( let _ = tokio::join!(app.wait_termination(), echo.run()); }); let mut hbone = ta - .socks5_connect(helpers::with_ip( - echo_addr, - TEST_WORKLOAD_HBONE.parse().unwrap(), - )) + .socks5_connect( + helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; let mut tcp = ta - .socks5_connect(helpers::with_ip( - echo_addr, - TEST_WORKLOAD_TCP.parse().unwrap(), - )) + .socks5_connect( + helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; let mut direct = TcpStream::connect(echo_addr).await.unwrap(); direct.set_nodelay(true).unwrap(); @@ -286,24 +293,23 @@ pub fn connections(c: &mut Criterion) { b.to_async(&rt).iter(|| async { let e = env.lock().await; let mut s = - e.ta.socks5_connect(helpers::with_ip( - e.echo_addr, - TEST_WORKLOAD_TCP.parse().unwrap(), - )) + e.ta.socks5_connect( + helpers::with_ip(e.echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; tcp::run_client(&mut s, 1, Mode::ReadWrite).await }) }); - // TODO(https://github.com/istio/ztunnel/issues/15): when we have pooling, split this into "new hbone connection" - // and "new connection on existing HBONE connection" + // This tests connection time over an existing HBONE connection. c.bench_function("hbone", |b| { b.to_async(&rt).iter(|| async { let e = env.lock().await; let mut s = - e.ta.socks5_connect(helpers::with_ip( - e.echo_addr, - TEST_WORKLOAD_HBONE.parse().unwrap(), - )) + e.ta.socks5_connect( + helpers::with_ip(e.echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; tcp::run_client(&mut s, 1, Mode::ReadWrite).await }) @@ -325,10 +331,10 @@ pub fn rbac_connections(c: &mut Criterion) { b.to_async(&rt).iter(|| async { let e = env.lock().await; let mut s = - e.ta.socks5_connect(helpers::with_ip( - e.echo_addr, - TEST_WORKLOAD_TCP.parse().unwrap(), - )) + e.ta.socks5_connect( + helpers::with_ip(e.echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; tcp::run_client(&mut s, 1, Mode::ReadWrite).await }) @@ -339,10 +345,10 @@ pub fn rbac_connections(c: &mut Criterion) { b.to_async(&rt).iter(|| async { let e = env.lock().await; let mut s = - e.ta.socks5_connect(helpers::with_ip( - e.echo_addr, - TEST_WORKLOAD_HBONE.parse().unwrap(), - )) + e.ta.socks5_connect( + helpers::with_ip(e.echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()), + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ) .await; tcp::run_client(&mut s, 1, Mode::ReadWrite).await }) @@ -374,12 +380,139 @@ pub fn metrics(c: &mut Criterion) { }); } +/// Iterate through possible IP pairs restricted to 0 < ip_pair.0 < ip_pair.1 <= MAX_HBONE_WORKLOADS. +fn next_ip_pair(ip_pair: (u8, u8)) -> (u8, u8) { + if ip_pair.0 == 0 || ip_pair.1 == 0 { + panic!("Invalid host"); + } + match ( + Ord::cmp(&ip_pair.0, &(MAX_HBONE_WORKLOADS - 1)), + Ord::cmp(&ip_pair.1, &MAX_HBONE_WORKLOADS), + ) { + (Greater, _) | (_, Greater) | (Equal, Equal) => panic!("Invalid host"), + (_, Less) => (ip_pair.0, ip_pair.1 + 1), + (Less, Equal) => (ip_pair.0 + 1, ip_pair.0 + 2), + } +} + +/// Reserve IPs in 127.0.1.0/24 for these HBONE connection tests. +/// Thus, we identify hosts by a u8 in the form 127.0.1.x +fn hbone_connection_ip(x: u8) -> IpAddr { + IpAddr::V4(Ipv4Addr::new(127, 0, 1, x)) +} + +fn hbone_connection_config() -> ztunnel::config::ConfigSource { + let mut workloads: Vec = Vec::with_capacity(MAX_HBONE_WORKLOADS as usize); + // We can't create one work load with many IPs because ztunnel could connect to any one causing + // inconsistent behavior. + for i in 1..MAX_HBONE_WORKLOADS + 1 { + let lwl = LocalWorkload { + workload: Workload { + workload_ips: vec![hbone_connection_ip(i)], + protocol: Protocol::HBONE, + uid: format!("cluster1//v1/Pod/default/local-source{}", i), + name: format!("workload-{}", i), + namespace: format!("namespace-{}", i), + service_account: format!("service-account-{}", i), + ..test_helpers::test_default_workload() + }, + services: Default::default(), + }; + workloads.push(lwl); + } + + let lc = ztunnel::xds::LocalConfig { + workloads, + policies: vec![], + services: vec![], + }; + let mut b = bytes::BytesMut::new().writer(); + serde_yaml::to_writer(&mut b, &lc).ok(); + let b = b.into_inner().freeze(); + ztunnel::config::ConfigSource::Static(b) +} + +/// Benchmark how long it takes to establish a new HBONE connection. +/// This is tricky because ztunnel will keep a connection pool. +/// Repeated connections from the same source to the same destination will use the pooled +/// connection. Instead, we register MAX_HBONE_WORKLOADS giving us O(MAX_HBONE_WORKLOADS^2) +/// source/destination IP combinations which is (hopefully) enough. +fn hbone_connections(c: &mut Criterion) { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + // Global setup: spin up an echo server and ztunnel instance + let (echo_addr, ta) = rt.block_on(async move { + let cert_manager = identity::mock::new_secret_manager(Duration::from_secs(10)); + let port = 80; + let config_source = Some(hbone_connection_config()); + let config = test_helpers::test_config_with_port_xds_addr_and_root_cert( + port, + None, + None, + config_source, + ); + let app = app::build_with_cert(config, cert_manager.clone()) + .await + .unwrap(); + let ta = TestApp::from((&app, cert_manager)); + ta.ready().await; + + let echo = tcp::TestServer::new(Mode::ReadWrite, 0).await; + let echo_addr = echo.address(); + let _ = tokio::spawn(async move { + let _ = tokio::join!(app.wait_termination(), echo.run()); + }); + (echo_addr, ta) + }); + + let ta: Arc> = Arc::new(Mutex::new(ta)); + // Host addresses can't end with 0. + let addresses = Arc::new(Mutex::new((1u8, 2u8))); + + let mut c = c.benchmark_group("hbone_connection"); + // WARNING increasing the measurement time could lead to running out of IP pairs or having to + // many open connections. + c.measurement_time(Duration::from_secs(5)); + c.bench_function("connect_request_response", |b| { + b.to_async(&rt).iter(|| async { + let bench = async { + let mut addresses = addresses.lock().await; + let ta = ta.lock().await; + + // Get next address pair + *addresses = next_ip_pair(*addresses); + let source_addr = hbone_connection_ip(addresses.0); + let dest_addr = hbone_connection_ip(addresses.1); + + // Start HBONE connection + let mut hbone = ta + .socks5_connect(helpers::with_ip(echo_addr, dest_addr), source_addr) + .await; + + // TCP ping + hbone.write_u8(42).await.ok(); + hbone.read_u8().await.ok(); + }; + + // If misconfigured, `socks5_connect` will silently fail causing subsequent commands + // to hang. Panic if too slow. + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout: Test is hanging."), + _ = bench => () + }; + }) + }); +} + criterion_group! { name = benches; config = Criterion::default() .with_profiler(PProfProfiler::new(100, Output::Protobuf)) .warm_up_time(Duration::from_millis(1)); - targets = latency, throughput, connections, rbac_latency, rbac_throughput, rbac_connections, + targets = hbone_connections, latency, throughput, connections, rbac_latency, rbac_throughput, rbac_connections, } criterion_main!(benches); diff --git a/examples/localhost.yaml b/examples/localhost.yaml index 25e07e5893..1e486ae898 100644 --- a/examples/localhost.yaml +++ b/examples/localhost.yaml @@ -52,4 +52,4 @@ services: vips: - remote/127.10.0.2 ports: - 80: 8080 \ No newline at end of file + 80: 8080 diff --git a/src/test_helpers/app.rs b/src/test_helpers/app.rs index 59ebf75eab..b941bb676d 100644 --- a/src/test_helpers/app.rs +++ b/src/test_helpers/app.rs @@ -33,7 +33,7 @@ use tokio::net::{TcpSocket, TcpStream}; use crate::app::Bound; use crate::identity::SecretManager; -use crate::test_helpers::{localhost_error_message, TEST_WORKLOAD_SOURCE}; +use crate::test_helpers::localhost_error_message; use crate::*; use super::helpers::*; @@ -150,7 +150,7 @@ impl TestApp { panic!("failed to get ready (last: {last_err:?})"); } - pub async fn socks5_connect(&self, addr: SocketAddr) -> TcpStream { + pub async fn socks5_connect(&self, addr: SocketAddr, source: IpAddr) -> TcpStream { // Always use IPv4 address. In theory, we can resolve `localhost` to pick to support any machine // However, we need to make sure the WorkloadStore knows about both families then. let socks_addr = with_ip( @@ -161,7 +161,7 @@ impl TestApp { let socket = TcpSocket::new_v4().unwrap(); socket .bind(SocketAddr::from(( - TEST_WORKLOAD_SOURCE.parse::().unwrap(), + source, 0, ))) .map_err(|e| anyhow!("{:?}. {}", e, localhost_error_message())) diff --git a/tests/direct.rs b/tests/direct.rs index 36a3ab2f10..f019abbb1c 100644 --- a/tests/direct.rs +++ b/tests/direct.rs @@ -105,7 +105,9 @@ async fn test_shutdown_drain() { // we shouldn't be shutdown yet assert!(shutdown_rx.try_recv().is_err()); let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()); - let mut stream = ta.socks5_connect(dst).await; + let mut stream = ta + .socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap()) + .await; read_write_stream(&mut stream).await; // Since we are connected, the app shouldn't shutdown shutdown.shutdown_now().await; @@ -146,7 +148,9 @@ async fn test_shutdown_forced_drain() { // we shouldn't be shutdown yet assert!(shutdown_rx.try_recv().is_err()); let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()); - let mut stream = ta.socks5_connect(dst).await; + let mut stream = ta + .socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap()) + .await; const BODY: &[u8] = "hello world".as_bytes(); stream.write_all(BODY).await.unwrap(); @@ -197,7 +201,10 @@ async fn run_requests_test( let dst = SocketAddr::from_str(target) .unwrap_or_else(|_| helpers::with_ip(echo_addr, target.parse().unwrap())); for _ in 0..num_queries { - let mut stream = app.socks5_connect(dst).await; + let mut stream = app.socks5_connect(dst, + + TEST_WORKLOAD_SOURCE.parse().unwrap(), + ).await; read_write_stream(&mut stream).await; } if let Some(assertions) = metrics_assertions { @@ -349,7 +356,9 @@ async fn test_tcp_connections_metrics() { tokio::spawn(echo.run()); testapp::with_app(test_config(), |app| async move { let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()); - let mut stream = app.socks5_connect(dst).await; + let mut stream = app + .socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap()) + .await; read_write_stream(&mut stream).await; // We should have 1 open connection but 0 closed connections @@ -405,7 +414,9 @@ async fn test_tcp_bytes_metrics() { let cfg = test_config(); testapp::with_app(cfg, |app| async move { let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()); - let mut stream = app.socks5_connect(dst).await; + let mut stream = app + .socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap()) + .await; let size = read_write_stream(&mut stream).await as u64; drop(stream); From 330ab7aec3a735faf349a66d3af680ba046ef48d Mon Sep 17 00:00:00 2001 From: Steven Jin Xuan Date: Thu, 3 Aug 2023 16:02:25 +0000 Subject: [PATCH 2/3] Code review --- benches/throughput.rs | 7 +++---- tests/direct.rs | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/benches/throughput.rs b/benches/throughput.rs index acb2fd8807..abad7ac62b 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -396,7 +396,7 @@ fn next_ip_pair(ip_pair: (u8, u8)) -> (u8, u8) { } /// Reserve IPs in 127.0.1.0/24 for these HBONE connection tests. -/// Thus, we identify hosts by a u8 in the form 127.0.1.x +/// Thus, we identify hosts by a u8 which represents an IP in the form 127.0.1.x. fn hbone_connection_ip(x: u8) -> IpAddr { IpAddr::V4(Ipv4Addr::new(127, 0, 1, x)) } @@ -404,7 +404,7 @@ fn hbone_connection_ip(x: u8) -> IpAddr { fn hbone_connection_config() -> ztunnel::config::ConfigSource { let mut workloads: Vec = Vec::with_capacity(MAX_HBONE_WORKLOADS as usize); // We can't create one work load with many IPs because ztunnel could connect to any one causing - // inconsistent behavior. + // inconsistent behavior. Instead, we create one workload per IP. for i in 1..MAX_HBONE_WORKLOADS + 1 { let lwl = LocalWorkload { workload: Workload { @@ -469,11 +469,10 @@ fn hbone_connections(c: &mut Criterion) { }); let ta: Arc> = Arc::new(Mutex::new(ta)); - // Host addresses can't end with 0. let addresses = Arc::new(Mutex::new((1u8, 2u8))); let mut c = c.benchmark_group("hbone_connection"); - // WARNING increasing the measurement time could lead to running out of IP pairs or having to + // WARNING: increasing the measurement time could lead to running out of IP pairs or having too // many open connections. c.measurement_time(Duration::from_secs(5)); c.bench_function("connect_request_response", |b| { diff --git a/tests/direct.rs b/tests/direct.rs index f019abbb1c..03ef015abd 100644 --- a/tests/direct.rs +++ b/tests/direct.rs @@ -201,10 +201,9 @@ async fn run_requests_test( let dst = SocketAddr::from_str(target) .unwrap_or_else(|_| helpers::with_ip(echo_addr, target.parse().unwrap())); for _ in 0..num_queries { - let mut stream = app.socks5_connect(dst, - - TEST_WORKLOAD_SOURCE.parse().unwrap(), - ).await; + let mut stream = app + .socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap()) + .await; read_write_stream(&mut stream).await; } if let Some(assertions) = metrics_assertions { From 0d61a916c1223e51f0fe242c32cb47ac7f32081b Mon Sep 17 00:00:00 2001 From: Steven Jin Xuan Date: Fri, 25 Aug 2023 17:01:33 +0000 Subject: [PATCH 3/3] Drop app and echo tasks --- benches/throughput.rs | 4 ++-- src/test_helpers/app.rs | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/benches/throughput.rs b/benches/throughput.rs index abad7ac62b..516bc89384 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -462,9 +462,9 @@ fn hbone_connections(c: &mut Criterion) { let echo = tcp::TestServer::new(Mode::ReadWrite, 0).await; let echo_addr = echo.address(); - let _ = tokio::spawn(async move { + drop(tokio::spawn(async move { let _ = tokio::join!(app.wait_termination(), echo.run()); - }); + })); (echo_addr, ta) }); diff --git a/src/test_helpers/app.rs b/src/test_helpers/app.rs index b941bb676d..1f479bda6d 100644 --- a/src/test_helpers/app.rs +++ b/src/test_helpers/app.rs @@ -160,10 +160,7 @@ impl TestApp { // Set source IP to TEST_WORKLOAD_SOURCE let socket = TcpSocket::new_v4().unwrap(); socket - .bind(SocketAddr::from(( - source, - 0, - ))) + .bind(SocketAddr::from((source, 0))) .map_err(|e| anyhow!("{:?}. {}", e, localhost_error_message())) .unwrap();