Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ libc = "0.2"
log = "0.4"
nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount"] }
once_cell = "1.21"
num_cpus = "1.16"
ppp = "2.3"
prometheus-client = { version = "0.23" }
prometheus-parse = "0.2"
Expand Down
1 change: 1 addition & 0 deletions fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask>
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("ztunnel-proxy-{id}")
// Thread name can only be 16 chars so keep it short
format!("ztunnel-{id}")
Comment thread
howardjohn marked this conversation as resolved.
})
.enable_all()
.build()
Expand Down
107 changes: 105 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const CA_ADDRESS: &str = "CA_ADDRESS";
const SECRET_TTL: &str = "SECRET_TTL";
const FAKE_CA: &str = "FAKE_CA";
const ZTUNNEL_WORKER_THREADS: &str = "ZTUNNEL_WORKER_THREADS";
const ZTUNNEL_CPU_LIMIT: &str = "ZTUNNEL_CPU_LIMIT";
const POOL_MAX_STREAMS_PER_CONNECTION: &str = "POOL_MAX_STREAMS_PER_CONNECTION";
const POOL_UNUSED_RELEASE_TIMEOUT: &str = "POOL_UNUSED_RELEASE_TIMEOUT";
// CONNECTION_TERMINATION_DEADLINE configures an explicit deadline
Expand Down Expand Up @@ -407,6 +408,60 @@ fn parse_headers(prefix: &str) -> Result<MetadataVector, Error> {
Ok(metadata)
}

fn get_cpu_count() -> Result<usize, Error> {
// Allow overriding the count with an env var. This can be used to pass the CPU limit on Kubernetes
// from the downward API.
// Note the downward API will return the total thread count ("logical cores") if no limit is set,
// so it is really the same as num_cpus.
// We allow num_cpus for cases its not set (not on Kubernetes, etc).
match parse::<usize>(ZTUNNEL_CPU_LIMIT)? {
Some(limit) => Ok(limit),
// This is *logical cores*
None => Ok(num_cpus::get()),
}
}

/// Parse worker threads configuration, supporting both fixed numbers and percentages
fn parse_worker_threads(default: usize) -> Result<usize, Error> {
match parse::<String>(ZTUNNEL_WORKER_THREADS)? {
Some(value) => {
if let Some(percent_str) = value.strip_suffix('%') {
// Parse as percentage
let percent: f64 = percent_str.parse().map_err(|e| {
Error::EnvVar(
ZTUNNEL_WORKER_THREADS.to_string(),
value.clone(),
format!("invalid percentage: {}", e),
)
})?;

if percent <= 0.0 || percent > 100.0 {
return Err(Error::EnvVar(
ZTUNNEL_WORKER_THREADS.to_string(),
value,
"percentage must be between 0 and 100".to_string(),
));
}

let cpu_count = get_cpu_count()?;
// Round up, minimum of 1
let threads = ((cpu_count as f64 * percent / 100.0).ceil() as usize).max(1);
Ok(threads)
} else {
// Parse as fixed number
value.parse::<usize>().map_err(|e| {
Error::EnvVar(
ZTUNNEL_WORKER_THREADS.to_string(),
value,
format!("invalid number: {}", e),
)
})
}
}
None => Ok(default),
}
}

pub fn parse_config() -> Result<Config, Error> {
let pc = parse_proxy_config()?;
construct_config(pc)
Expand Down Expand Up @@ -725,8 +780,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
fake_ca,
auth,

num_worker_threads: parse_default(
ZTUNNEL_WORKER_THREADS,
num_worker_threads: parse_worker_threads(
pc.concurrency.unwrap_or(DEFAULT_WORKER_THREADS).into(),
)?,

Expand Down Expand Up @@ -1094,4 +1148,53 @@ pub mod tests {
assert!(metadata.vec.contains(&(key, value)));
}
}

#[test]
fn test_parse_worker_threads() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we flake if test_parse_worker_threads and test_get_cpu_count run in parallel since they are both setting/removing the same env vars and making assertions based on the contents?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it does. will need to fix.

We should probably have some form of 'env var provider' abstraction

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short term we could probably combine them into a single test to work around it. Little wonky but probably ok

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea done

unsafe {
// Test fixed number
env::set_var(ZTUNNEL_WORKER_THREADS, "4");
assert_eq!(parse_worker_threads(2).unwrap(), 4);

// Test percentage with CPU limit
env::set_var(ZTUNNEL_CPU_LIMIT, "8");
env::set_var(ZTUNNEL_WORKER_THREADS, "50%");
assert_eq!(parse_worker_threads(2).unwrap(), 4); // 50% of 8 CPUs = 4 threads

// Test percentage with CPU limit
env::set_var(ZTUNNEL_CPU_LIMIT, "16");
env::set_var(ZTUNNEL_WORKER_THREADS, "30%");
assert_eq!(parse_worker_threads(2).unwrap(), 5); // Round up to 5

// Test low percentage that rounds up to 1
env::set_var(ZTUNNEL_CPU_LIMIT, "4");
env::set_var(ZTUNNEL_WORKER_THREADS, "10%");
assert_eq!(parse_worker_threads(2).unwrap(), 1); // 10% of 4 CPUs = 0.4, rounds up to 1

// Test default when no env var is set
env::remove_var(ZTUNNEL_WORKER_THREADS);
assert_eq!(parse_worker_threads(2).unwrap(), 2);

// Test invalid percentage
env::set_var(ZTUNNEL_WORKER_THREADS, "150%");
assert!(parse_worker_threads(2).is_err());

// Test invalid number
env::set_var(ZTUNNEL_WORKER_THREADS, "invalid");
assert!(parse_worker_threads(2).is_err());

// Test without CPU limit (should use system CPU count)
env::remove_var(ZTUNNEL_CPU_LIMIT);
let system_cpus = num_cpus::get();
assert_eq!(get_cpu_count().unwrap(), system_cpus);

// Test with CPU limit
env::set_var(ZTUNNEL_CPU_LIMIT, "12");
assert_eq!(get_cpu_count().unwrap(), 12);

// Clean up
env::remove_var(ZTUNNEL_WORKER_THREADS);
env::remove_var(ZTUNNEL_CPU_LIMIT);
}
}
}