diff --git a/Cargo.lock b/Cargo.lock index 8d5e6db945a..2d9412d3110 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -910,6 +910,16 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -3793,6 +3803,19 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "rusty_pool" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed36cdb20de66d89a17ea04b8883fc7a386f2cf877aaedca5005583ce4876ff" +dependencies = [ + "crossbeam-channel", + "futures", + "futures-channel", + "futures-executor", + "num_cpus", +] + [[package]] name = "ryu" version = "1.0.15" @@ -6240,8 +6263,8 @@ dependencies = [ "pin-project", "pretty_assertions", "rand", - "rayon", "reqwest", + "rusty_pool", "semver 1.0.20", "serde", "serde_cbor", diff --git a/lib/wasix/Cargo.toml b/lib/wasix/Cargo.toml index 189b55f9c11..e2156ac668c 100644 --- a/lib/wasix/Cargo.toml +++ b/lib/wasix/Cargo.toml @@ -13,6 +13,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] +rusty_pool = { version = "0.7.0", optional = true } cfg-if = "1.0" thiserror = "1" tracing = { version = "0.1.37" } @@ -66,7 +67,6 @@ tower-http = { version = "0.4.0", features = ["trace", "util", "catch-panic", "c tower = { version = "0.4.13", features = ["make", "util"], optional = true } url = "2.3.1" petgraph = "0.6.3" -rayon = { version = "1.7.0", optional = true } wasm-bindgen = { version = "0.2.87", optional = true } js-sys = { version = "0.3.64", optional = true } wasm-bindgen-futures = { version = "0.4.37", optional = true } @@ -118,7 +118,7 @@ webc_runner_rt_emscripten = ["wasmer-emscripten"] sys = ["webc/mmap", "time", "virtual-mio/sys"] sys-default = ["sys", "logging", "host-fs", "sys-poll", "sys-thread", "host-vnet", "host-threads", "host-reqwest"] sys-poll = [] -sys-thread = ["tokio/rt", "tokio/time", "tokio/rt-multi-thread", "rayon"] +sys-thread = ["tokio/rt", "tokio/time", "tokio/rt-multi-thread", "rusty_pool"] # Deprecated. Kept it for compatibility compiler = [] diff --git a/lib/wasix/src/runtime/task_manager/tokio.rs b/lib/wasix/src/runtime/task_manager/tokio.rs index f7b0dc1d5d5..b9f2a4c098c 100644 --- a/lib/wasix/src/runtime/task_manager/tokio.rs +++ b/lib/wasix/src/runtime/task_manager/tokio.rs @@ -43,11 +43,34 @@ impl RuntimeOrHandle { } } +#[derive(Clone)] +pub struct ThreadPool { + inner: rusty_pool::ThreadPool, +} + +impl std::ops::Deref for ThreadPool { + type Target = rusty_pool::ThreadPool; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::fmt::Debug for ThreadPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ThreadPool") + .field("name", &self.get_name()) + .field("current_worker_count", &self.get_current_worker_count()) + .field("idle_worker_count", &self.get_idle_worker_count()) + .finish() + } +} + /// A task manager that uses tokio to spawn tasks. #[derive(Clone, Debug)] pub struct TokioTaskManager { rt: RuntimeOrHandle, - pool: Arc, + pool: Arc, } impl TokioTaskManager { @@ -62,12 +85,13 @@ impl TokioTaskManager { Self { rt: rt.into(), - pool: Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(max_threads) - .build() - .unwrap(), - ), + pool: Arc::new(ThreadPool { + inner: rusty_pool::Builder::new() + .name("TokioTaskManager Thread Pool".to_string()) + .core_size(max_threads) + .max_size(max_threads) + .build(), + }), } } @@ -141,7 +165,7 @@ impl VirtualTaskManager for TokioTaskManager { self.rt.handle().spawn(async move { let result = trigger.await; // Build the task that will go on the callback - pool.spawn(move || { + pool.execute(move || { // Invoke the callback run(TaskWasmRunProperties { ctx, @@ -154,7 +178,7 @@ impl VirtualTaskManager for TokioTaskManager { tracing::trace!("spawning task_wasm in blocking thread"); // Run the callback on a dedicated thread - self.pool.spawn(move || { + self.pool.execute(move || { tracing::trace!("task_wasm started in blocking thread"); // Invoke the callback @@ -173,7 +197,7 @@ impl VirtualTaskManager for TokioTaskManager { &self, task: Box, ) -> Result<(), WasiThreadError> { - self.pool.spawn(move || { + self.pool.execute(move || { task(); }); Ok(())