Skip to content

Commit

Permalink
Merge pull request #4739 from daichifukui/allow-blocking-for-realtime
Browse files Browse the repository at this point in the history
Allow blocking for realtime
  • Loading branch information
syrusakbary authored May 23, 2024
2 parents d5df452 + 2c6a3a4 commit e921a04
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
6 changes: 6 additions & 0 deletions lib/wasix/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct CapabilityThreadingV1 {
/// time that it will pause the CPU)
/// (default = off)
pub enable_exponential_cpu_backoff: Option<Duration>,

/// Switches to a blocking sleep implementation instead
/// of the asynchronous runtime based implementation
pub enable_blocking_sleep: bool,
}

impl CapabilityThreadingV1 {
Expand All @@ -64,11 +68,13 @@ impl CapabilityThreadingV1 {
max_threads,
enable_asynchronous_threading,
enable_exponential_cpu_backoff,
enable_blocking_sleep,
} = other;
self.enable_asynchronous_threading |= enable_asynchronous_threading;
if let Some(val) = enable_exponential_cpu_backoff {
self.enable_exponential_cpu_backoff = Some(val);
}
self.max_threads = max_threads.or(self.max_threads);
self.enable_blocking_sleep |= enable_blocking_sleep;
}
}
70 changes: 46 additions & 24 deletions lib/wasix/src/syscalls/wasi/poll_oneoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ where

let pid = ctx.data().pid();
let tid = ctx.data().tid();
let subs_len = subs.len();

// Determine if we are in silent polling mode
let mut env = ctx.data();
Expand Down Expand Up @@ -379,6 +380,50 @@ where
Some(time)
}
};

// Function to process a timeout
let process_timeout = {
let clock_subs = clock_subs.clone();
|ctx: &FunctionEnvMut<'a, WasiEnv>| {
// The timeout has triggered so lets add that event
if clock_subs.is_empty() {
tracing::warn!("triggered_timeout (without any clock subscriptions)",);
}
let mut evts = Vec::new();
for (clock_info, userdata) in clock_subs {
let evt = Event {
userdata,
error: Errno::Success,
type_: Eventtype::Clock,
u: EventUnion { clock: 0 },
};
Span::current().record(
"seen",
&format!(
"clock(id={},userdata={})",
clock_info.clock_id as u32, evt.userdata
),
);
evts.push(evt);
}
evts
}
};

#[cfg(feature = "sys")]
if env.capabilities.threading.enable_blocking_sleep && subs_len == 1 {
// Here, `poll_oneoff` is merely in a sleeping state
// due to a single relative timer event. This particular scenario was
// added following experimental findings indicating that std::thread::sleep
// yields more consistent sleep durations, allowing wasmer to meet
// real-time demands with greater precision.
if let Some(timeout) = timeout {
std::thread::sleep(timeout);
process_events(&ctx, process_timeout(&ctx));
return Ok(Errno::Success);
}
}

let tasks = env.tasks().clone();
let timeout = async move {
if let Some(timeout) = timeout {
Expand Down Expand Up @@ -414,30 +459,7 @@ where
// Process the events
process_events(ctx, evts)
}
Err(Errno::Timedout) => {
// The timeout has triggered so lets add that event
if clock_subs.is_empty() {
tracing::warn!("triggered_timeout (without any clock subscriptions)",);
}
let mut evts = Vec::new();
for (clock_info, userdata) in clock_subs {
let evt = Event {
userdata,
error: Errno::Success,
type_: Eventtype::Clock,
u: EventUnion { clock: 0 },
};
Span::current().record(
"seen",
&format!(
"clock(id={},userdata={})",
clock_info.clock_id as u32, evt.userdata
),
);
evts.push(evt);
}
process_events(ctx, evts)
}
Err(Errno::Timedout) => process_events(ctx, process_timeout(ctx)),
// If nonblocking the Errno::Again needs to be turned into an empty list
Err(Errno::Again) => process_events(ctx, Default::default()),
// Otherwise process the error
Expand Down

0 comments on commit e921a04

Please sign in to comment.