Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 160 additions & 28 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering::{Equal, Greater, Less};
use std::env;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use bytes::BufMut;
use criterion::{
criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode, Throughput,
};
use pprof::criterion::{Output, PProfProfiler};
use prometheus_client::registry::Registry;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tracing::info;

use ztunnel::metrics::IncrementRecorder;
use ztunnel::rbac::{Authorization, RbacMatch, StringMatch};
use ztunnel::state::workload::{Protocol, Workload};
use ztunnel::test_helpers::app::TestApp;
use ztunnel::test_helpers::tcp::Mode;
use ztunnel::test_helpers::TEST_WORKLOAD_HBONE;
use ztunnel::test_helpers::TEST_WORKLOAD_SOURCE;
use ztunnel::test_helpers::TEST_WORKLOAD_TCP;
use ztunnel::test_helpers::{helpers, tcp};
use ztunnel::xds::LocalWorkload;
use ztunnel::{app, identity, metrics, proxy, test_helpers};

const KB: usize = 1024;
const MB: usize = 1024 * KB;
// Must be less than or equal to 254
const MAX_HBONE_WORKLOADS: u8 = 64;

struct TestEnv {
ta: TestApp,
Expand Down Expand Up @@ -132,16 +139,16 @@ fn initialize_environment(
let _ = tokio::join!(app.wait_termination(), echo.run());
});
let mut hbone = ta
.socks5_connect(helpers::with_ip(
echo_addr,
TEST_WORKLOAD_HBONE.parse().unwrap(),
))
.socks5_connect(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is repeated often.

e.g.

 let mut s =
    e.ta.socks5_connect(helpers::with_ip(
        e.echo_addr,
        TEST_WORKLOAD_TCP.parse().unwrap(),
    ))
    .await;

What do you think about abstracting that logic into a helper function, maybe something called newTCPStream or similar? It could take the TEST_WORKLOAD_<SOURCE/HBONE/TCP> string value as an argument. That would help to maintain the DRY principle while also making it convenient for the next person to extend this test with more benches.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll think about this. The whole echo server setup is clever but also confusing. You can access the echo server through any ip address on lo, but if you want to use ztunnel/socks5, the address you pick to access the echo server needs to be a registered workload.

Point is, I feel like the way it's abstracted right now is already confusing, so I didn't want to abstract it any further. I do think there is probably a better way to do this. My first idea is to make TEST_WORKLOAD_* actual IP addresses instead of strings that can be parsed into IP addresses. Thoughts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abstraction I am referring to is meant to reduce the repetitive and confusing code in the tests. Using a helper function should actually make it more understandable.

helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
let mut tcp = ta
.socks5_connect(helpers::with_ip(
echo_addr,
TEST_WORKLOAD_TCP.parse().unwrap(),
))
.socks5_connect(
helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
let mut direct = TcpStream::connect(echo_addr).await.unwrap();
direct.set_nodelay(true).unwrap();
Expand Down Expand Up @@ -286,24 +293,23 @@ pub fn connections(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let e = env.lock().await;
let mut s =
e.ta.socks5_connect(helpers::with_ip(
e.echo_addr,
TEST_WORKLOAD_TCP.parse().unwrap(),
))
e.ta.socks5_connect(
helpers::with_ip(e.echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
tcp::run_client(&mut s, 1, Mode::ReadWrite).await
})
});
// TODO(https://github.com/istio/ztunnel/issues/15): when we have pooling, split this into "new hbone connection"
// and "new connection on existing HBONE connection"
// This tests connection time over an existing HBONE connection.
c.bench_function("hbone", |b| {
b.to_async(&rt).iter(|| async {
let e = env.lock().await;
let mut s =
e.ta.socks5_connect(helpers::with_ip(
e.echo_addr,
TEST_WORKLOAD_HBONE.parse().unwrap(),
))
e.ta.socks5_connect(
helpers::with_ip(e.echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
tcp::run_client(&mut s, 1, Mode::ReadWrite).await
})
Expand All @@ -325,10 +331,10 @@ pub fn rbac_connections(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let e = env.lock().await;
let mut s =
e.ta.socks5_connect(helpers::with_ip(
e.echo_addr,
TEST_WORKLOAD_TCP.parse().unwrap(),
))
e.ta.socks5_connect(
helpers::with_ip(e.echo_addr, TEST_WORKLOAD_TCP.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
tcp::run_client(&mut s, 1, Mode::ReadWrite).await
})
Expand All @@ -339,10 +345,10 @@ pub fn rbac_connections(c: &mut Criterion) {
b.to_async(&rt).iter(|| async {
let e = env.lock().await;
let mut s =
e.ta.socks5_connect(helpers::with_ip(
e.echo_addr,
TEST_WORKLOAD_HBONE.parse().unwrap(),
))
e.ta.socks5_connect(
helpers::with_ip(e.echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap()),
TEST_WORKLOAD_SOURCE.parse().unwrap(),
)
.await;
tcp::run_client(&mut s, 1, Mode::ReadWrite).await
})
Expand Down Expand Up @@ -374,12 +380,138 @@ pub fn metrics(c: &mut Criterion) {
});
}

/// Iterate through possible IP pairs restricted to 0 < ip_pair.0 < ip_pair.1 <= MAX_HBONE_WORKLOADS.
fn next_ip_pair(ip_pair: (u8, u8)) -> (u8, u8) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this is only n*(n-1)/2 combos. If we wanted more new HBONE connections from the same number of IPs this could be optimized.

if ip_pair.0 == 0 || ip_pair.1 == 0 {
panic!("Invalid host");
}
match (
Ord::cmp(&ip_pair.0, &(MAX_HBONE_WORKLOADS - 1)),
Ord::cmp(&ip_pair.1, &MAX_HBONE_WORKLOADS),
) {
(Greater, _) | (_, Greater) | (Equal, Equal) => panic!("Invalid host"),
(_, Less) => (ip_pair.0, ip_pair.1 + 1),
(Less, Equal) => (ip_pair.0 + 1, ip_pair.0 + 2),
}
}

/// Map a host number onto the loopback range set aside for the HBONE connection tests.
/// Every such host lives in 127.0.1.0/24, so host `x` is the address 127.0.1.x.
fn hbone_connection_ip(x: u8) -> IpAddr {
    let reserved = Ipv4Addr::new(127, 0, 1, x);
    reserved.into()
}

/// Build the static ztunnel configuration used by the HBONE connection benchmark.
///
/// One HBONE workload is registered for every host in `1..=MAX_HBONE_WORKLOADS`, each owning
/// the single address returned by `hbone_connection_ip`. The resulting local config (no
/// policies, no services) is serialized to YAML and returned as `ConfigSource::Static`.
///
/// # Panics
///
/// Panics if the local config cannot be serialized to YAML. Previously the error was discarded,
/// which would have handed ztunnel an empty or truncated config.
fn hbone_connection_config() -> ztunnel::config::ConfigSource {
    // We can't create one workload with many IPs because ztunnel could connect to any one causing
    // inconsistent behavior. Instead, we create one workload per IP.
    let workloads: Vec<LocalWorkload> = (1..=MAX_HBONE_WORKLOADS)
        .map(|i| LocalWorkload {
            workload: Workload {
                workload_ips: vec![hbone_connection_ip(i)],
                protocol: Protocol::HBONE,
                uid: format!("cluster1//v1/Pod/default/local-source{}", i),
                name: format!("workload-{}", i),
                namespace: format!("namespace-{}", i),
                service_account: format!("service-account-{}", i),
                ..test_helpers::test_default_workload()
            },
            services: Default::default(),
        })
        .collect();

    let lc = ztunnel::xds::LocalConfig {
        workloads,
        policies: vec![],
        services: vec![],
    };
    let mut b = bytes::BytesMut::new().writer();
    serde_yaml::to_writer(&mut b, &lc).expect("failed to serialize HBONE benchmark config");
    let b = b.into_inner().freeze();
    ztunnel::config::ConfigSource::Static(b)
}

/// Benchmark how long it takes to establish a new HBONE connection.
/// This is tricky because ztunnel will keep a connection pool.
/// Repeated connections from the same source to the same destination will use the pooled
/// connection. Instead, we register MAX_HBONE_WORKLOADS giving us O(MAX_HBONE_WORKLOADS^2)
/// source/destination IP combinations which is (hopefully) enough.
///
/// Each iteration takes the next host pair from `next_ip_pair`, opens a SOCKS5 connection
/// through ztunnel from the first host to the echo server addressed via the second host, then
/// writes one byte and reads one byte back.
///
/// # Panics
///
/// Panics if setup fails, if `next_ip_pair` runs out of pairs, or if a single iteration takes
/// longer than one second.
fn hbone_connections(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

// Global setup: spin up an echo server and ztunnel instance
let (echo_addr, ta) = rt.block_on(async move {
let cert_manager = identity::mock::new_secret_manager(Duration::from_secs(10));
let port = 80;
// Static config registering one HBONE workload per benchmark IP.
let config_source = Some(hbone_connection_config());
let config = test_helpers::test_config_with_port_xds_addr_and_root_cert(
port,
None,
None,
config_source,
);
let app = app::build_with_cert(config, cert_manager.clone())
.await
.unwrap();
let ta = TestApp::from((&app, cert_manager));
ta.ready().await;

let echo = tcp::TestServer::new(Mode::ReadWrite, 0).await;
let echo_addr = echo.address();
// The join handle is dropped on purpose: the spawned task is detached and keeps driving the
// app and the echo server in the background.
drop(tokio::spawn(async move {
let _ = tokio::join!(app.wait_termination(), echo.run());
}));
(echo_addr, ta)
});

let ta: Arc<Mutex<TestApp>> = Arc::new(Mutex::new(ta));
// Cursor over host pairs. It is advanced *before* each use, so the first connection uses
// (1, 3) and the initial (1, 2) pair itself is never benchmarked.
let addresses = Arc::new(Mutex::new((1u8, 2u8)));

let mut c = c.benchmark_group("hbone_connection");
// WARNING: increasing the measurement time could lead to running out of IP pairs or having too
// many open connections.
c.measurement_time(Duration::from_secs(5));
c.bench_function("connect_request_response", |b| {
b.to_async(&rt).iter(|| async {
let bench = async {
// Both guards are held until the end of this async block.
let mut addresses = addresses.lock().await;
let ta = ta.lock().await;

// Get next address pair
*addresses = next_ip_pair(*addresses);
let source_addr = hbone_connection_ip(addresses.0);
let dest_addr = hbone_connection_ip(addresses.1);

// Start HBONE connection
let mut hbone = ta
.socks5_connect(helpers::with_ip(echo_addr, dest_addr), source_addr)
.await;

// TCP ping
// NOTE(review): I/O errors are discarded by `.ok()`, so a failed write or read is still
// counted as a completed sample — confirm this is intended.
hbone.write_u8(42).await.ok();
hbone.read_u8().await.ok();
};

// If misconfigured, `socks5_connect` will silently fail causing subsequent commands
// to hang. Panic if too slow.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout: Test is hanging."),
_ = bench => ()
};
})
});
}

// Benchmark group: every target is profiled with pprof (protobuf output) and uses a 1 ms
// warm-up. `targets` must appear exactly once; it lists `hbone_connections` together with the
// pre-existing benchmarks.
criterion_group! {
    name = benches;
    config = Criterion::default()
        .with_profiler(PProfProfiler::new(100, Output::Protobuf))
        .warm_up_time(Duration::from_millis(1));
    targets = hbone_connections, latency, throughput, connections, rbac_latency, rbac_throughput, rbac_connections,
}

criterion_main!(benches);
2 changes: 1 addition & 1 deletion examples/localhost.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ services:
vips:
- remote/127.10.0.2
ports:
80: 8080
80: 8080
9 changes: 3 additions & 6 deletions src/test_helpers/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio::net::{TcpSocket, TcpStream};

use crate::app::Bound;
use crate::identity::SecretManager;
use crate::test_helpers::{localhost_error_message, TEST_WORKLOAD_SOURCE};
use crate::test_helpers::localhost_error_message;
use crate::*;

use super::helpers::*;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl TestApp {
panic!("failed to get ready (last: {last_err:?})");
}

pub async fn socks5_connect(&self, addr: SocketAddr) -> TcpStream {
pub async fn socks5_connect(&self, addr: SocketAddr, source: IpAddr) -> TcpStream {
// Always use IPv4 address. In theory, we can resolve `localhost` to pick to support any machine
// However, we need to make sure the WorkloadStore knows about both families then.
let socks_addr = with_ip(
Expand All @@ -160,10 +160,7 @@ impl TestApp {
// Set source IP to the caller-provided `source` address
let socket = TcpSocket::new_v4().unwrap();
socket
.bind(SocketAddr::from((
TEST_WORKLOAD_SOURCE.parse::<IpAddr>().unwrap(),
0,
)))
.bind(SocketAddr::from((source, 0)))
.map_err(|e| anyhow!("{:?}. {}", e, localhost_error_message()))
.unwrap();

Expand Down
20 changes: 15 additions & 5 deletions tests/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ async fn test_shutdown_drain() {
// we shouldn't be shutdown yet
assert!(shutdown_rx.try_recv().is_err());
let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap());
let mut stream = ta.socks5_connect(dst).await;
let mut stream = ta
.socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap())
.await;
read_write_stream(&mut stream).await;
// Since we are connected, the app shouldn't shutdown
shutdown.shutdown_now().await;
Expand Down Expand Up @@ -146,7 +148,9 @@ async fn test_shutdown_forced_drain() {
// we shouldn't be shutdown yet
assert!(shutdown_rx.try_recv().is_err());
let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_HBONE.parse().unwrap());
let mut stream = ta.socks5_connect(dst).await;
let mut stream = ta
.socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap())
.await;
const BODY: &[u8] = "hello world".as_bytes();
stream.write_all(BODY).await.unwrap();

Expand Down Expand Up @@ -197,7 +201,9 @@ async fn run_requests_test(
let dst = SocketAddr::from_str(target)
.unwrap_or_else(|_| helpers::with_ip(echo_addr, target.parse().unwrap()));
for _ in 0..num_queries {
let mut stream = app.socks5_connect(dst).await;
let mut stream = app
.socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap())
.await;
read_write_stream(&mut stream).await;
}
if let Some(assertions) = metrics_assertions {
Expand Down Expand Up @@ -349,7 +355,9 @@ async fn test_tcp_connections_metrics() {
tokio::spawn(echo.run());
testapp::with_app(test_config(), |app| async move {
let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap());
let mut stream = app.socks5_connect(dst).await;
let mut stream = app
.socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap())
.await;
read_write_stream(&mut stream).await;

// We should have 1 open connection but 0 closed connections
Expand Down Expand Up @@ -405,7 +413,9 @@ async fn test_tcp_bytes_metrics() {
let cfg = test_config();
testapp::with_app(cfg, |app| async move {
let dst = helpers::with_ip(echo_addr, TEST_WORKLOAD_TCP.parse().unwrap());
let mut stream = app.socks5_connect(dst).await;
let mut stream = app
.socks5_connect(dst, TEST_WORKLOAD_SOURCE.parse().unwrap())
.await;
let size = read_write_stream(&mut stream).await as u64;
drop(stream);

Expand Down