From 21f843627d66c4a1f36509c529933f26a15570ae Mon Sep 17 00:00:00 2001 From: John Howard Date: Tue, 3 Jun 2025 07:52:54 -0700 Subject: [PATCH 1/2] Allow dynamic configuration of thread count --- Cargo.lock | 1 + Cargo.toml | 1 + fuzz/Cargo.lock | 1 + src/app.rs | 3 +- src/config.rs | 115 +++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 118 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78a1a0c51e..2410a9441b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4441,6 +4441,7 @@ dependencies = [ "matches", "netns-rs", "nix 0.29.0", + "num_cpus", "oid-registry", "once_cell", "openssl", diff --git a/Cargo.toml b/Cargo.toml index e950d7a705..5d4992ffd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ libc = "0.2" log = "0.4" nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount"] } once_cell = "1.21" +num_cpus = "1.16" ppp = "2.3" prometheus-client = { version = "0.23" } prometheus-parse = "0.2" diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index 6659f0c2af..301b06b1ef 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -3843,6 +3843,7 @@ dependencies = [ "log", "netns-rs", "nix 0.29.0", + "num_cpus", "once_cell", "pin-project-lite", "pingora-pool", diff --git a/src/app.rs b/src/app.rs index 320054f69c..482cedd03c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -266,7 +266,8 @@ fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("ztunnel-proxy-{id}") + // Thread name can only be 16 chars so keep it short + format!("ztunnel-{id}") }) .enable_all() .build() diff --git a/src/config.rs b/src/config.rs index c5b9492fe9..5bc508c417 100644 --- a/src/config.rs +++ b/src/config.rs @@ -58,6 +58,7 @@ const CA_ADDRESS: &str = "CA_ADDRESS"; const SECRET_TTL: &str = "SECRET_TTL"; const FAKE_CA: &str = "FAKE_CA"; const ZTUNNEL_WORKER_THREADS: &str = "ZTUNNEL_WORKER_THREADS"; +const ZTUNNEL_CPU_LIMIT: &str = "ZTUNNEL_CPU_LIMIT"; const POOL_MAX_STREAMS_PER_CONNECTION: &str = "POOL_MAX_STREAMS_PER_CONNECTION"; const POOL_UNUSED_RELEASE_TIMEOUT: &str = "POOL_UNUSED_RELEASE_TIMEOUT"; // CONNECTION_TERMINATION_DEADLINE configures an explicit deadline @@ -407,6 +408,60 @@ fn parse_headers(prefix: &str) -> Result { Ok(metadata) } +fn get_cpu_count() -> Result { + // Allow overriding the count with an env var. This can be used to pass the CPU limit on Kubernetes + // from the downward API. + // Note the downward API will return the total thread count ("logical cores") if no limit is set, + // so it is really the same as num_cpus. + // We allow num_cpus for cases its not set (not on Kubernetes, etc). + match parse::(ZTUNNEL_CPU_LIMIT)? { + Some(limit) => Ok(limit), + // This is *logical cores* + None => Ok(num_cpus::get()), + } +} + +/// Parse worker threads configuration, supporting both fixed numbers and percentages +fn parse_worker_threads(default: usize) -> Result { + match parse::(ZTUNNEL_WORKER_THREADS)? { + Some(value) => { + if let Some(percent_str) = value.strip_suffix('%') { + // Parse as percentage + let percent: f64 = percent_str.parse().map_err(|e| { + Error::EnvVar( + ZTUNNEL_WORKER_THREADS.to_string(), + value.clone(), + format!("invalid percentage: {}", e), + ) + })?; + + if percent <= 0.0 || percent > 100.0 { + return Err(Error::EnvVar( + ZTUNNEL_WORKER_THREADS.to_string(), + value, + "percentage must be between 0 and 100".to_string(), + )); + } + + let cpu_count = get_cpu_count()?; + // Round up, minimum of 1 + let threads = ((cpu_count as f64 * percent / 100.0).ceil() as usize).max(1); + Ok(threads) + } else { + // Parse as fixed number + value.parse::().map_err(|e| { + Error::EnvVar( + ZTUNNEL_WORKER_THREADS.to_string(), + value, + format!("invalid number: {}", e), + ) + }) + } + } + None => Ok(default), + } +} + pub fn parse_config() -> Result { let pc = parse_proxy_config()?; construct_config(pc) @@ -725,8 +780,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result { fake_ca, auth, - num_worker_threads: parse_default( - ZTUNNEL_WORKER_THREADS, + num_worker_threads: parse_worker_threads( pc.concurrency.unwrap_or(DEFAULT_WORKER_THREADS).into(), )?, @@ -1094,4 +1148,61 @@ pub mod tests { assert!(metadata.vec.contains(&(key, value))); } } + + #[test] + fn test_parse_worker_threads() { + unsafe { + // Test fixed number + env::set_var(ZTUNNEL_WORKER_THREADS, "4"); + assert_eq!(parse_worker_threads(2).unwrap(), 4); + + // Test percentage with CPU limit + env::set_var(ZTUNNEL_CPU_LIMIT, "8"); + env::set_var(ZTUNNEL_WORKER_THREADS, "50%"); + assert_eq!(parse_worker_threads(2).unwrap(), 4); // 50% of 8 CPUs = 4 threads + + // Test percentage with CPU limit + env::set_var(ZTUNNEL_CPU_LIMIT, "16"); + env::set_var(ZTUNNEL_WORKER_THREADS, "30%"); + assert_eq!(parse_worker_threads(2).unwrap(), 5); // Round up to 5 + + // Test low percentage that rounds up to 1 + env::set_var(ZTUNNEL_CPU_LIMIT, "4"); + env::set_var(ZTUNNEL_WORKER_THREADS, "10%"); + assert_eq!(parse_worker_threads(2).unwrap(), 1); // 10% of 4 CPUs = 0.4, rounds up to 1 + + // Test default when no env var is set + env::remove_var(ZTUNNEL_WORKER_THREADS); + assert_eq!(parse_worker_threads(2).unwrap(), 2); + + // Test invalid percentage + env::set_var(ZTUNNEL_WORKER_THREADS, "150%"); + assert!(parse_worker_threads(2).is_err()); + + // Test invalid number + env::set_var(ZTUNNEL_WORKER_THREADS, "invalid"); + assert!(parse_worker_threads(2).is_err()); + + // Clean up + env::remove_var(ZTUNNEL_WORKER_THREADS); + env::remove_var(ZTUNNEL_CPU_LIMIT); + } + } + + #[test] + fn test_get_cpu_count() { + unsafe { + // Test without CPU limit (should use system CPU count) + env::remove_var(ZTUNNEL_CPU_LIMIT); + let system_cpus = num_cpus::get(); + assert_eq!(get_cpu_count().unwrap(), system_cpus); + + // Test with CPU limit + env::set_var(ZTUNNEL_CPU_LIMIT, "12"); + assert_eq!(get_cpu_count().unwrap(), 12); + + // Clean up + env::remove_var(ZTUNNEL_CPU_LIMIT); + } + } } From 346dde66aa390146b20f364b9727441f7a51f92c Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 4 Jun 2025 11:04:51 -0700 Subject: [PATCH 2/2] fix flakes --- src/config.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5bc508c417..8530196cbc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1183,15 +1183,6 @@ pub mod tests { env::set_var(ZTUNNEL_WORKER_THREADS, "invalid"); assert!(parse_worker_threads(2).is_err()); - // Clean up - env::remove_var(ZTUNNEL_WORKER_THREADS); - env::remove_var(ZTUNNEL_CPU_LIMIT); - } - } - - #[test] - fn test_get_cpu_count() { - unsafe { // Test without CPU limit (should use system CPU count) env::remove_var(ZTUNNEL_CPU_LIMIT); let system_cpus = num_cpus::get(); @@ -1202,6 +1193,7 @@ pub mod tests { assert_eq!(get_cpu_count().unwrap(), 12); // Clean up + env::remove_var(ZTUNNEL_WORKER_THREADS); env::remove_var(ZTUNNEL_CPU_LIMIT); } }