diff --git a/Cargo.lock b/Cargo.lock index 2f9fe967b90..b96d4154246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2223,7 +2223,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.10", + "rustls 0.21.11", "tokio 1.37.0", "tokio-rustls", ] @@ -3823,7 +3823,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-pemfile", "serde", "serde_json", @@ -4006,9 +4006,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log 0.4.21", "ring", @@ -4018,9 +4018,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log 0.4.21", "ring", @@ -5110,7 +5110,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.11", "tokio 1.37.0", ] @@ -5205,7 +5205,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log 0.4.21", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-native-certs", "tokio 1.37.0", "tokio-rustls", @@ -5532,7 +5532,7 @@ dependencies = [ "httparse", "log 0.4.21", "rand", - "rustls 0.21.10", + "rustls 0.21.11", "sha1", "thiserror", "url", @@ -5545,7 +5545,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] @@ -5670,7 +5670,7 @@ dependencies = [ "flate2", "log 0.4.21", "once_cell", - "rustls 0.22.3", + "rustls 0.22.4", "rustls-pki-types", "rustls-webpki 0.102.2", "url", diff --git a/lib/cli/Cargo.toml b/lib/cli/Cargo.toml index 76599e57537..ec5749ed161 100644 --- a/lib/cli/Cargo.toml +++ b/lib/cli/Cargo.toml @@ -83,6 +83,7 @@ wasmer-wasix = { version = "0.18.3", path = "../wasix", features = [ "webc_runner_rt_dproxy", "webc_runner_rt_emscripten", "host-fs", + "ctrlc" ] } wasmer-wast = { version = "=4.2.8", path = "../../tests/lib/wast", optional = true } wasmer-types = { version = "=4.2.8", path = "../types", features = [ diff --git a/lib/cli/src/commands/journal/mount/fs.rs b/lib/cli/src/commands/journal/mount/fs.rs index f5baaf894d3..5544eed0f56 100644 --- a/lib/cli/src/commands/journal/mount/fs.rs +++ b/lib/cli/src/commands/journal/mount/fs.rs @@ -338,6 +338,12 @@ impl WritableJournal for MutexState { } Ok(ret) } + + fn flush(&self) -> anyhow::Result<()> { + let mut state = self.inner.lock().unwrap(); + state.mem_fs.flush()?; + Ok(()) + } } impl JournalFileSystem { diff --git a/lib/journal/src/concrete/arc.rs b/lib/journal/src/concrete/arc.rs index 1790777215a..650f2d5fdb2 100644 --- a/lib/journal/src/concrete/arc.rs +++ b/lib/journal/src/concrete/arc.rs @@ -16,6 +16,10 @@ impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl ReadableJournal for Arc { @@ -32,6 +36,10 @@ impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl Journal for Arc { diff --git a/lib/journal/src/concrete/archived.rs b/lib/journal/src/concrete/archived.rs index 0536d8b3d12..9ba35130d91 100644 --- a/lib/journal/src/concrete/archived.rs +++ b/lib/journal/src/concrete/archived.rs @@ -1577,6 +1577,7 @@ pub enum JournalSnapshotTriggerV1 { Sigtstp, Sigstop, NonDeterministicCall, + Bootstrap, } #[repr(C)] diff --git a/lib/journal/src/concrete/archived_from.rs b/lib/journal/src/concrete/archived_from.rs index 1e76643a54f..867945dbd73 100644 --- a/lib/journal/src/concrete/archived_from.rs +++ b/lib/journal/src/concrete/archived_from.rs @@ -184,6 +184,7 @@ impl From for JournalSnapshotTriggerV1 { SnapshotTrigger::Sigtstp => JournalSnapshotTriggerV1::Sigtstp, SnapshotTrigger::Sigstop => JournalSnapshotTriggerV1::Sigstop, SnapshotTrigger::NonDeterministicCall => JournalSnapshotTriggerV1::NonDeterministicCall, + SnapshotTrigger::Bootstrap => JournalSnapshotTriggerV1::Bootstrap, } } } @@ -201,6 +202,7 @@ impl From for SnapshotTrigger { JournalSnapshotTriggerV1::Sigtstp => SnapshotTrigger::Sigtstp, JournalSnapshotTriggerV1::Sigstop => SnapshotTrigger::Sigstop, JournalSnapshotTriggerV1::NonDeterministicCall => SnapshotTrigger::NonDeterministicCall, + JournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap, } } } @@ -220,6 +222,7 @@ impl From<&'_ ArchivedJournalSnapshotTriggerV1> for SnapshotTrigger { ArchivedJournalSnapshotTriggerV1::NonDeterministicCall => { SnapshotTrigger::NonDeterministicCall } + ArchivedJournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap, } } } diff --git a/lib/journal/src/concrete/boxed.rs b/lib/journal/src/concrete/boxed.rs index 9db8cdba87d..ce54a13c019 100644 --- a/lib/journal/src/concrete/boxed.rs +++ b/lib/journal/src/concrete/boxed.rs @@ -16,6 +16,10 @@ impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl ReadableJournal for Box { @@ -32,4 +36,8 @@ impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } diff --git a/lib/journal/src/concrete/buffered.rs b/lib/journal/src/concrete/buffered.rs index c5011c417c0..a985d44da4a 100644 --- a/lib/journal/src/concrete/buffered.rs +++ b/lib/journal/src/concrete/buffered.rs @@ -50,6 +50,10 @@ impl WritableJournal for BufferedJournalTx { record_end: state.offset as u64 + estimate_size as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl ReadableJournal for BufferedJournalRx { @@ -81,6 +85,10 @@ impl WritableJournal for BufferedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for BufferedJournal { diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 1a002e0bc45..3626e0533e5 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -485,6 +485,10 @@ impl WritableJournal for CompactingJournalTx { } state.inner_tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.state.lock().unwrap().inner_tx.flush() + } } impl CompactingJournal { @@ -524,6 +528,10 @@ impl WritableJournal for CompactingJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for CompactingJournal { diff --git a/lib/journal/src/concrete/compacting_log_file.rs b/lib/journal/src/concrete/compacting_log_file.rs index 7bc33026ce2..9f26a73d97e 100644 --- a/lib/journal/src/concrete/compacting_log_file.rs +++ b/lib/journal/src/concrete/compacting_log_file.rs @@ -237,6 +237,10 @@ impl WritableJournal for CompactingLogFileJournalTx { Ok(res) } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } } impl ReadableJournal for CompactingLogFileJournal { @@ -253,6 +257,10 @@ impl WritableJournal for CompactingLogFileJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl Journal for CompactingLogFileJournal { diff --git a/lib/journal/src/concrete/counting.rs b/lib/journal/src/concrete/counting.rs index e7c32c038d1..d78fe7029fb 100644 --- a/lib/journal/src/concrete/counting.rs +++ b/lib/journal/src/concrete/counting.rs @@ -42,6 +42,10 @@ impl WritableJournal for CountingJournal { record_end: offset as u64 + size, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for CountingJournal { diff --git a/lib/journal/src/concrete/filter.rs b/lib/journal/src/concrete/filter.rs index f0bef520119..fa856cdffd6 100644 --- a/lib/journal/src/concrete/filter.rs +++ b/lib/journal/src/concrete/filter.rs @@ -344,6 +344,10 @@ impl WritableJournal for FilteredJournalTx { }; self.inner.write(evt) } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } } impl ReadableJournal for FilteredJournalRx { @@ -362,6 +366,10 @@ impl WritableJournal for FilteredJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for FilteredJournal { diff --git a/lib/journal/src/concrete/log_file.rs b/lib/journal/src/concrete/log_file.rs index 864bd0e313e..5c17d500a17 100644 --- a/lib/journal/src/concrete/log_file.rs +++ b/lib/journal/src/concrete/log_file.rs @@ -198,6 +198,12 @@ impl WritableJournal for LogFileJournalTx { record_end: offset_end, }) } + + fn flush(&self) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + state.file.flush()?; + Ok(()) + } } impl ReadableJournal for LogFileJournalRx { @@ -288,6 +294,10 @@ impl WritableJournal for LogFileJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for LogFileJournal { diff --git a/lib/journal/src/concrete/null.rs b/lib/journal/src/concrete/null.rs index f7b34e62abe..8033ade317d 100644 --- a/lib/journal/src/concrete/null.rs +++ b/lib/journal/src/concrete/null.rs @@ -28,6 +28,10 @@ impl WritableJournal for NullJournal { record_end: entry.estimate_size() as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for NullJournal { diff --git a/lib/journal/src/concrete/pipe.rs b/lib/journal/src/concrete/pipe.rs index 94a8c851c9a..5c3b7bdd45b 100644 --- a/lib/journal/src/concrete/pipe.rs +++ b/lib/journal/src/concrete/pipe.rs @@ -83,6 +83,10 @@ impl WritableJournal for PipeJournalTx { record_end: sender.offset + entry_size, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl ReadableJournal for PipeJournalRx { @@ -108,6 +112,10 @@ impl WritableJournal for PipeJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for PipeJournal { diff --git a/lib/journal/src/concrete/printing.rs b/lib/journal/src/concrete/printing.rs index baa29599149..66bd0863191 100644 --- a/lib/journal/src/concrete/printing.rs +++ b/lib/journal/src/concrete/printing.rs @@ -51,6 +51,10 @@ impl WritableJournal for PrintingJournal { record_end: entry.estimate_size() as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for PrintingJournal { diff --git a/lib/journal/src/concrete/recombined.rs b/lib/journal/src/concrete/recombined.rs index 6a97c9b44e1..93a618c7e35 100644 --- a/lib/journal/src/concrete/recombined.rs +++ b/lib/journal/src/concrete/recombined.rs @@ -15,6 +15,10 @@ impl WritableJournal for RecombinedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for RecombinedJournal { diff --git a/lib/journal/src/concrete/unsupported.rs b/lib/journal/src/concrete/unsupported.rs index 3ac2abd0b34..da011d64c85 100644 --- a/lib/journal/src/concrete/unsupported.rs +++ b/lib/journal/src/concrete/unsupported.rs @@ -22,6 +22,10 @@ impl WritableJournal for UnsupportedJournal { tracing::debug!("journal event: {:?}", entry); Err(anyhow::format_err!("unsupported")) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for UnsupportedJournal { diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index c7e04c06ea3..06fccd615bb 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -35,6 +35,9 @@ pub trait WritableJournal { /// Takes in a stream of snapshot log entries and saves them so that they /// may be restored at a later moment fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result; + + /// Flushes the data to disk or network + fn flush(&self) -> anyhow::Result<()>; } /// The results of an operation to read a log entry from the log diff --git a/lib/journal/src/snapshot.rs b/lib/journal/src/snapshot.rs index ce5b7029028..380b810557f 100644 --- a/lib/journal/src/snapshot.rs +++ b/lib/journal/src/snapshot.rs @@ -26,6 +26,8 @@ pub enum SnapshotTrigger { Sigstop, /// When a non-determinstic call is made NonDeterministicCall, + /// Bootstrapping process + Bootstrap, } impl SnapshotTrigger { @@ -61,6 +63,7 @@ impl FromStr for SnapshotTrigger { "sigtstp" | "ctrlz" | "ctrl-z" => Self::Sigtstp, "stop" | "sigstop" => Self::Sigstop, "non-deterministic-call" => Self::NonDeterministicCall, + "bootstrap" => Self::Bootstrap, a => return Err(anyhow::format_err!("invalid or unknown trigger ({a})")), }) } diff --git a/lib/wasix/src/journal/effector/memory_and_snapshot.rs b/lib/wasix/src/journal/effector/memory_and_snapshot.rs index 145f5ce4419..ca7aa11e109 100644 --- a/lib/wasix/src/journal/effector/memory_and_snapshot.rs +++ b/lib/wasix/src/journal/effector/memory_and_snapshot.rs @@ -167,6 +167,10 @@ impl JournalEffector { journal .write(JournalEntry::SnapshotV1 { when, trigger }) .map_err(map_snapshot_err)?; + + // When writing snapshots we also flush the journal so that + // its guaranteed to be on the disk or network pipe + journal.flush().map_err(map_snapshot_err)?; Ok(()) } diff --git a/lib/wasix/src/os/task/process.rs b/lib/wasix/src/os/task/process.rs index 3d56365748f..5d13e9558ab 100644 --- a/lib/wasix/src/os/task/process.rs +++ b/lib/wasix/src/os/task/process.rs @@ -169,6 +169,9 @@ pub struct WasiProcessInner { /// Represents a checkpoint which blocks all the threads /// and then executes some maintenance action pub checkpoint: WasiProcessCheckpoint, + /// If true then the journaling will be disabled after the + /// next snapshot is taken + pub disable_journaling_after_checkpoint: bool, /// List of situations that the process will checkpoint on #[cfg(feature = "journal")] pub snapshot_on: HashSet, @@ -290,6 +293,9 @@ impl WasiProcessInner { // Clear the checkpointing flag and notify everyone to wake up ctx.data().thread.set_checkpointing(false); trace!("checkpoint complete"); + if guard.disable_journaling_after_checkpoint { + ctx.data_mut().enable_journal = false; + } guard.checkpoint = WasiProcessCheckpoint::Execute; for waker in guard.wakers.drain(..) { waker.wake(); @@ -359,6 +365,9 @@ impl WasiProcessInner { // Clear the checkpointing flag and notify everyone to wake up ctx.data().thread.set_checkpointing(false); trace!("checkpoint complete"); + if guard.disable_journaling_after_checkpoint { + ctx.data_mut().enable_journal = false; + } guard.checkpoint = WasiProcessCheckpoint::Execute; for waker in guard.wakers.drain(..) { waker.wake(); @@ -418,6 +427,7 @@ impl WasiProcess { snapshot_on: Default::default(), #[cfg(feature = "journal")] snapshot_memory_hash: Default::default(), + disable_journaling_after_checkpoint: false, backoff: WasiProcessCpuBackoff::new(max_cpu_backoff_time, max_cpu_cool_off_time), }), Condvar::new(), @@ -571,6 +581,107 @@ impl WasiProcess { signal_process_internal(&self.inner, signal); } + /// Takes a snapshot of the process and disables journaling returning + /// a future that can be waited on for the snapshot to complete + /// + /// Note: If you ignore the returned future the checkpoint will still + /// occur but it will execute asynchronously + pub fn snapshot_and_disable_journaling( + &self, + trigger: SnapshotTrigger, + ) -> std::pin::Pin + Send + Sync>> { + let mut guard = self.inner.0.lock().unwrap(); + guard.disable_journaling_after_checkpoint = true; + guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger }; + self.wait_for_checkpoint_finish() + } + + /// Disables the journaling functionality + pub fn disable_journaling_after_checkpoint(&self) { + let mut guard = self.inner.0.lock().unwrap(); + guard.disable_journaling_after_checkpoint = true; + } + + /// Wait for the checkout process to finish + #[cfg(not(feature = "journal"))] + pub fn wait_for_checkpoint( + &self, + ) -> std::pin::Pin + Send + Sync>> { + Box::pin(std::future::pending()) + } + + /// Wait for the checkout process to finish + #[cfg(feature = "journal")] + pub fn wait_for_checkpoint( + &self, + ) -> std::pin::Pin + Send + Sync>> { + use futures::Future; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + struct Poller { + inner: LockableWasiProcessInner, + } + impl Future for Poller { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.inner.0.lock().unwrap(); + if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { + return Poll::Ready(()); + } + if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { + guard.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + } + Box::pin(Poller { + inner: self.inner.clone(), + }) + } + + /// Wait for the checkout process to finish + #[cfg(not(feature = "journal"))] + pub fn wait_for_checkpoint_finish( + &self, + ) -> std::pin::Pin + Send + Sync>> { + Box::pin(std::future::pending()) + } + + /// Wait for the checkout process to finish + #[cfg(feature = "journal")] + pub fn wait_for_checkpoint_finish( + &self, + ) -> std::pin::Pin + Send + Sync>> { + use futures::Future; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + struct Poller { + inner: LockableWasiProcessInner, + } + impl Future for Poller { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.inner.0.lock().unwrap(); + if matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { + return Poll::Ready(()); + } + if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { + guard.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + } + Box::pin(Poller { + inner: self.inner.clone(), + }) + } + /// Signals one of the threads every interval pub fn signal_interval(&self, signal: Signal, interval: Option, repeat: bool) { let mut inner = self.inner.0.lock().unwrap(); diff --git a/lib/wasix/src/runners/wasi.rs b/lib/wasix/src/runners/wasi.rs index aebb6dafdb5..72fabdcf268 100644 --- a/lib/wasix/src/runners/wasi.rs +++ b/lib/wasix/src/runners/wasi.rs @@ -262,12 +262,38 @@ impl WasiRunner { ) -> Result<(), Error> { let wasi = webc::metadata::annotations::Wasi::new(program_name); let mut store = runtime.new_store(); - let env = self.prepare_webc_env(program_name, &wasi, None, runtime, None)?; + + let mut builder = self.prepare_webc_env(program_name, &wasi, None, runtime, None)?; + + #[cfg(feature = "ctrlc")] + { + builder = builder.attach_ctrl_c(); + } + + #[cfg(feature = "journal")] + { + for trigger in self.wasi.snapshot_on.iter().cloned() { + builder.add_snapshot_trigger(trigger); + } + if self.wasi.snapshot_on.is_empty() && !self.wasi.journals.is_empty() { + for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS { + builder.add_snapshot_trigger(on); + } + } + if let Some(period) = self.wasi.snapshot_interval { + if self.wasi.journals.is_empty() { + return Err(anyhow::format_err!( + "If you specify a snapshot interval then you must also specify a journal file" + )); + } + builder.with_snapshot_interval(period); + } + } if asyncify { - env.run_with_store_async(module.clone(), module_hash, store)?; + builder.run_with_store_async(module.clone(), module_hash, store)?; } else { - env.run_with_store_ext(module.clone(), module_hash, &mut store)?; + builder.run_with_store_ext(module.clone(), module_hash, &mut store)?; } Ok(()) diff --git a/lib/wasix/src/state/builder.rs b/lib/wasix/src/state/builder.rs index f46e6b1b0ee..ad98bc164da 100644 --- a/lib/wasix/src/state/builder.rs +++ b/lib/wasix/src/state/builder.rs @@ -84,6 +84,9 @@ pub struct WasiEnvBuilder { #[cfg(feature = "journal")] pub(super) journals: Vec>, + + #[cfg(feature = "ctrlc")] + pub(super) attach_ctrl_c: bool, } impl std::fmt::Debug for WasiEnvBuilder { @@ -167,6 +170,14 @@ impl WasiEnvBuilder { self } + /// Attaches a ctrl-c handler which will send signals to the + /// process rather than immediately termiante it + #[cfg(feature = "ctrlc")] + pub fn attach_ctrl_c(mut self) -> Self { + self.attach_ctrl_c = true; + self + } + /// Add an environment variable pair. /// /// Both the key and value of an environment variable must not @@ -1032,7 +1043,24 @@ impl WasiEnvBuilder { module_hash: ModuleHash, mut store: Store, ) -> Result<(), WasiRuntimeError> { + #[cfg(feature = "ctrlc")] + let attach_ctrl_c = self.attach_ctrl_c; + let (_, env) = self.instantiate_ext(module, module_hash, &mut store)?; + + // Install the ctrl-c handler + #[cfg(feature = "ctrlc")] + if attach_ctrl_c { + tokio::spawn({ + let process = env.data(&store).process.clone(); + async move { + while tokio::signal::ctrl_c().await.is_ok() { + process.signal_process(wasmer_wasix_types::wasi::Signal::Sigint); + } + } + }); + } + env.run_async(store)?; Ok(()) } diff --git a/lib/wasix/src/state/run.rs b/lib/wasix/src/state/run.rs index f878f4422d3..265b4733131 100644 --- a/lib/wasix/src/state/run.rs +++ b/lib/wasix/src/state/run.rs @@ -49,6 +49,7 @@ impl WasiFunctionEnv { } } }; + run_with_deep_sleep(store, rewind_state, this, tx); }))?; diff --git a/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs b/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs index bead2155f7f..4f6e52da705 100644 --- a/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs +++ b/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs @@ -1,31 +1,5 @@ use super::*; -#[cfg(not(feature = "journal"))] -pub fn wait_for_snapshot(_env: &WasiEnv) -> Pin + Send + Sync>> { - Box::pin(std::future::pending()) -} - -#[cfg(feature = "journal")] pub fn wait_for_snapshot(env: &WasiEnv) -> Pin + Send + Sync>> { - use crate::os::task::process::{LockableWasiProcessInner, WasiProcessCheckpoint}; - - struct Poller { - inner: LockableWasiProcessInner, - } - impl Future for Poller { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut guard = self.inner.0.lock().unwrap(); - if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { - return Poll::Ready(()); - } - if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { - guard.wakers.push(cx.waker().clone()); - } - Poll::Pending - } - } - Box::pin(Poller { - inner: env.process.inner.clone(), - }) + env.process.wait_for_checkpoint() }