diff --git a/lib/wasix/src/capabilities.rs b/lib/wasix/src/capabilities.rs index fae47feac7d..fb47c122087 100644 --- a/lib/wasix/src/capabilities.rs +++ b/lib/wasix/src/capabilities.rs @@ -56,6 +56,10 @@ pub struct CapabilityThreadingV1 { /// time that it will pause the CPU) /// (default = off) pub enable_exponential_cpu_backoff: Option, + + /// Switches to a blocking sleep implementation instead + /// of the asynchronous runtime based implementation + pub enable_blocking_sleep: bool, } impl CapabilityThreadingV1 { @@ -64,11 +68,13 @@ impl CapabilityThreadingV1 { max_threads, enable_asynchronous_threading, enable_exponential_cpu_backoff, + enable_blocking_sleep, } = other; self.enable_asynchronous_threading |= enable_asynchronous_threading; if let Some(val) = enable_exponential_cpu_backoff { self.enable_exponential_cpu_backoff = Some(val); } self.max_threads = max_threads.or(self.max_threads); + self.enable_blocking_sleep |= enable_blocking_sleep; } } diff --git a/lib/wasix/src/syscalls/wasi/poll_oneoff.rs b/lib/wasix/src/syscalls/wasi/poll_oneoff.rs index 2f9885c2963..98f3a501e81 100644 --- a/lib/wasix/src/syscalls/wasi/poll_oneoff.rs +++ b/lib/wasix/src/syscalls/wasi/poll_oneoff.rs @@ -220,6 +220,7 @@ where let pid = ctx.data().pid(); let tid = ctx.data().tid(); + let subs_len = subs.len(); // Determine if we are in silent polling mode let mut env = ctx.data(); @@ -379,6 +380,50 @@ where Some(time) } }; + + // Function to process a timeout + let process_timeout = { + let clock_subs = clock_subs.clone(); + |ctx: &FunctionEnvMut<'a, WasiEnv>| { + // The timeout has triggered so lets add that event + if clock_subs.is_empty() { + tracing::warn!("triggered_timeout (without any clock subscriptions)",); + } + let mut evts = Vec::new(); + for (clock_info, userdata) in clock_subs { + let evt = Event { + userdata, + error: Errno::Success, + type_: Eventtype::Clock, + u: EventUnion { clock: 0 }, + }; + Span::current().record( + "seen", + &format!( + "clock(id={},userdata={})", + clock_info.clock_id as u32, evt.userdata + ), + ); + evts.push(evt); + } + evts + } + }; + + #[cfg(feature = "sys")] + if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 { + // Here, `poll_oneoff` is merely in a sleeping state + // due to a single relative timer event. This particular scenario was + // added following experimental findings indicating that std::thread::sleep + // yields more consistent sleep durations, allowing wasmer to meet + // real-time demands with greater precision. + if let Some(timeout) = timeout { + std::thread::sleep(timeout); + process_events(&ctx, process_timeout(&ctx)); + return Ok(Errno::Success); + } + } + let tasks = env.tasks().clone(); let timeout = async move { if let Some(timeout) = timeout { @@ -414,30 +459,7 @@ where // Process the events process_events(ctx, evts) } - Err(Errno::Timedout) => { - // The timeout has triggered so lets add that event - if clock_subs.is_empty() { - tracing::warn!("triggered_timeout (without any clock subscriptions)",); - } - let mut evts = Vec::new(); - for (clock_info, userdata) in clock_subs { - let evt = Event { - userdata, - error: Errno::Success, - type_: Eventtype::Clock, - u: EventUnion { clock: 0 }, - }; - Span::current().record( - "seen", - &format!( - "clock(id={},userdata={})", - clock_info.clock_id as u32, evt.userdata - ), - ); - evts.push(evt); - } - process_events(ctx, evts) - } + Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)), // If nonblocking the Errno::Again needs to be turned into an empty list Err(Errno::Again) => process_events(ctx, Default::default()), // Otherwise process the error