From 905e4050142ec5ec5855e6be86a276b69aae444a Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Fri, 22 Mar 2024 11:29:38 +1100 Subject: [PATCH 1/6] Now properly attaching the ctrl-c handler for WASM processes --- lib/cli/Cargo.toml | 1 + lib/wasix/src/runners/wasi.rs | 4 +++- lib/wasix/src/state/builder.rs | 26 ++++++++++++++++++++++++++ lib/wasix/src/state/run.rs | 1 + 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/lib/cli/Cargo.toml b/lib/cli/Cargo.toml index f95e046fa02..079cfc9bd91 100644 --- a/lib/cli/Cargo.toml +++ b/lib/cli/Cargo.toml @@ -83,6 +83,7 @@ wasmer-wasix = { version = "0.18.2", path = "../wasix", features = [ "webc_runner_rt_dproxy", "webc_runner_rt_emscripten", "host-fs", + "ctrlc" ] } wasmer-wast = { version = "=4.2.7", path = "../../tests/lib/wast", optional = true } wasmer-types = { version = "=4.2.7", path = "../types", features = [ diff --git a/lib/wasix/src/runners/wasi.rs b/lib/wasix/src/runners/wasi.rs index c36b95cbb65..9e039a76cde 100644 --- a/lib/wasix/src/runners/wasi.rs +++ b/lib/wasix/src/runners/wasi.rs @@ -262,7 +262,9 @@ impl WasiRunner { ) -> Result<(), Error> { let wasi = webc::metadata::annotations::Wasi::new(program_name); let mut store = runtime.new_store(); - let env = self.prepare_webc_env(program_name, &wasi, None, runtime, None)?; + let env = self + .prepare_webc_env(program_name, &wasi, None, runtime, None)? + .attach_ctrl_c(); if asyncify { env.run_with_store_async(module.clone(), module_hash, store)?; diff --git a/lib/wasix/src/state/builder.rs b/lib/wasix/src/state/builder.rs index f46e6b1b0ee..03c8b0dcb55 100644 --- a/lib/wasix/src/state/builder.rs +++ b/lib/wasix/src/state/builder.rs @@ -84,6 +84,9 @@ pub struct WasiEnvBuilder { #[cfg(feature = "journal")] pub(super) journals: Vec>, + + #[cfg(feature = "ctrlc")] + pub(super) attach_ctrl_c: bool, } impl std::fmt::Debug for WasiEnvBuilder { @@ -167,6 +170,13 @@ impl WasiEnvBuilder { self } + /// Attaches a ctrl-c handler which will send signals to the + /// process rather than immediately termiante it + pub fn attach_ctrl_c(mut self) -> Self { + self.attach_ctrl_c = true; + self + } + /// Add an environment variable pair. /// /// Both the key and value of an environment variable must not @@ -1032,7 +1042,23 @@ impl WasiEnvBuilder { module_hash: ModuleHash, mut store: Store, ) -> Result<(), WasiRuntimeError> { + let attach_ctrl_c = self.attach_ctrl_c; + let (_, env) = self.instantiate_ext(module, module_hash, &mut store)?; + + // Install the ctrl-c handler + #[cfg(feature = "ctrlc")] + if attach_ctrl_c { + tokio::spawn({ + let process = env.data(&store).process.clone(); + async move { + while tokio::signal::ctrl_c().await.is_ok() { + process.signal_process(wasmer_wasix_types::wasi::Signal::Sigint); + } + } + }); + } + env.run_async(store)?; Ok(()) } diff --git a/lib/wasix/src/state/run.rs b/lib/wasix/src/state/run.rs index f878f4422d3..265b4733131 100644 --- a/lib/wasix/src/state/run.rs +++ b/lib/wasix/src/state/run.rs @@ -49,6 +49,7 @@ impl WasiFunctionEnv { } } }; + run_with_deep_sleep(store, rewind_state, this, tx); }))?; From a84462a78459d810c5f15406a3d906fe9c480b68 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Fri, 22 Mar 2024 11:46:44 +1100 Subject: [PATCH 2/6] Fixed an issue with the snapshot triggers not being attached in certain usecases --- lib/wasix/src/runners/wasi.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/lib/wasix/src/runners/wasi.rs b/lib/wasix/src/runners/wasi.rs index 9e039a76cde..f582a3041aa 100644 --- a/lib/wasix/src/runners/wasi.rs +++ b/lib/wasix/src/runners/wasi.rs @@ -262,14 +262,35 @@ impl WasiRunner { ) -> Result<(), Error> { let wasi = webc::metadata::annotations::Wasi::new(program_name); let mut store = runtime.new_store(); - let env = self + + let mut builder = self .prepare_webc_env(program_name, &wasi, None, runtime, None)? .attach_ctrl_c(); + #[cfg(feature = "journal")] + { + for trigger in self.wasi.snapshot_on.iter().cloned() { + builder.add_snapshot_trigger(trigger); + } + if self.wasi.snapshot_on.is_empty() && !self.wasi.journals.is_empty() { + for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS { + builder.add_snapshot_trigger(on); + } + } + if let Some(period) = self.wasi.snapshot_interval { + if self.wasi.journals.is_empty() { + return Err(anyhow::format_err!( + "If you specify a snapshot interval then you must also specify a journal file" + )); + } + builder.with_snapshot_interval(period); + } + } + if asyncify { - env.run_with_store_async(module.clone(), module_hash, store)?; + builder.run_with_store_async(module.clone(), module_hash, store)?; } else { - env.run_with_store_ext(module.clone(), module_hash, &mut store)?; + builder.run_with_store_ext(module.clone(), module_hash, &mut store)?; } Ok(()) From a021a9bddb36b998c5df2df0ab92bdc3fbcb6b35 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 3 Apr 2024 22:39:33 +1100 Subject: [PATCH 3/6] Fixed a compile issue with teh ctrlc feature --- lib/wasix/src/runners/wasi.rs | 9 ++++++--- lib/wasix/src/state/builder.rs | 2 ++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/wasix/src/runners/wasi.rs b/lib/wasix/src/runners/wasi.rs index f582a3041aa..c72a8e50c37 100644 --- a/lib/wasix/src/runners/wasi.rs +++ b/lib/wasix/src/runners/wasi.rs @@ -263,9 +263,12 @@ impl WasiRunner { let wasi = webc::metadata::annotations::Wasi::new(program_name); let mut store = runtime.new_store(); - let mut builder = self - .prepare_webc_env(program_name, &wasi, None, runtime, None)? - .attach_ctrl_c(); + let mut builder = self.prepare_webc_env(program_name, &wasi, None, runtime, None)?; + + #[cfg(feature = "ctrlc")] + { + builder = builder.attach_ctrl_c(); + } #[cfg(feature = "journal")] { diff --git a/lib/wasix/src/state/builder.rs b/lib/wasix/src/state/builder.rs index 03c8b0dcb55..ad98bc164da 100644 --- a/lib/wasix/src/state/builder.rs +++ b/lib/wasix/src/state/builder.rs @@ -172,6 +172,7 @@ impl WasiEnvBuilder { /// Attaches a ctrl-c handler which will send signals to the /// process rather than immediately termiante it + #[cfg(feature = "ctrlc")] pub fn attach_ctrl_c(mut self) -> Self { self.attach_ctrl_c = true; self @@ -1042,6 +1043,7 @@ impl WasiEnvBuilder { module_hash: ModuleHash, mut store: Store, ) -> Result<(), WasiRuntimeError> { + #[cfg(feature = "ctrlc")] let attach_ctrl_c = self.attach_ctrl_c; let (_, env) = self.instantiate_ext(module, module_hash, &mut store)?; From bdbdbb6b64c720dfa8571a38970314d9f6df9bd3 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 3 Apr 2024 23:27:24 +1100 Subject: [PATCH 4/6] Added additional bootstrap methods and enums --- lib/journal/src/concrete/archived.rs | 1 + lib/journal/src/concrete/archived_from.rs | 3 + lib/journal/src/snapshot.rs | 3 + lib/wasix/src/os/task/process.rs | 111 ++++++++++++++++++ .../src/syscalls/journal/wait_for_snapshot.rs | 28 +---- 5 files changed, 119 insertions(+), 27 deletions(-) diff --git a/lib/journal/src/concrete/archived.rs b/lib/journal/src/concrete/archived.rs index 6a800480389..068a728d186 100644 --- a/lib/journal/src/concrete/archived.rs +++ b/lib/journal/src/concrete/archived.rs @@ -1577,6 +1577,7 @@ pub enum JournalSnapshotTriggerV1 { Sigtstp, Sigstop, NonDeterministicCall, + Bootstrap, } #[repr(C)] diff --git a/lib/journal/src/concrete/archived_from.rs b/lib/journal/src/concrete/archived_from.rs index 1e76643a54f..867945dbd73 100644 --- a/lib/journal/src/concrete/archived_from.rs +++ b/lib/journal/src/concrete/archived_from.rs @@ -184,6 +184,7 @@ impl From for JournalSnapshotTriggerV1 { SnapshotTrigger::Sigtstp => JournalSnapshotTriggerV1::Sigtstp, SnapshotTrigger::Sigstop => JournalSnapshotTriggerV1::Sigstop, SnapshotTrigger::NonDeterministicCall => JournalSnapshotTriggerV1::NonDeterministicCall, + SnapshotTrigger::Bootstrap => JournalSnapshotTriggerV1::Bootstrap, } } } @@ -201,6 +202,7 @@ impl From for SnapshotTrigger { JournalSnapshotTriggerV1::Sigtstp => SnapshotTrigger::Sigtstp, JournalSnapshotTriggerV1::Sigstop => SnapshotTrigger::Sigstop, JournalSnapshotTriggerV1::NonDeterministicCall => SnapshotTrigger::NonDeterministicCall, + JournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap, } } } @@ -220,6 +222,7 @@ impl From<&'_ ArchivedJournalSnapshotTriggerV1> for SnapshotTrigger { ArchivedJournalSnapshotTriggerV1::NonDeterministicCall => { SnapshotTrigger::NonDeterministicCall } + ArchivedJournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap, } } } diff --git a/lib/journal/src/snapshot.rs b/lib/journal/src/snapshot.rs index ce5b7029028..380b810557f 100644 --- a/lib/journal/src/snapshot.rs +++ b/lib/journal/src/snapshot.rs @@ -26,6 +26,8 @@ pub enum SnapshotTrigger { Sigstop, /// When a non-determinstic call is made NonDeterministicCall, + /// Bootstrapping process + Bootstrap, } impl SnapshotTrigger { @@ -61,6 +63,7 @@ impl FromStr for SnapshotTrigger { "sigtstp" | "ctrlz" | "ctrl-z" => Self::Sigtstp, "stop" | "sigstop" => Self::Sigstop, "non-deterministic-call" => Self::NonDeterministicCall, + "bootstrap" => Self::Bootstrap, a => return Err(anyhow::format_err!("invalid or unknown trigger ({a})")), }) } diff --git a/lib/wasix/src/os/task/process.rs b/lib/wasix/src/os/task/process.rs index 3d56365748f..5d13e9558ab 100644 --- a/lib/wasix/src/os/task/process.rs +++ b/lib/wasix/src/os/task/process.rs @@ -169,6 +169,9 @@ pub struct WasiProcessInner { /// Represents a checkpoint which blocks all the threads /// and then executes some maintenance action pub checkpoint: WasiProcessCheckpoint, + /// If true then the journaling will be disabled after the + /// next snapshot is taken + pub disable_journaling_after_checkpoint: bool, /// List of situations that the process will checkpoint on #[cfg(feature = "journal")] pub snapshot_on: HashSet, @@ -290,6 +293,9 @@ impl WasiProcessInner { // Clear the checkpointing flag and notify everyone to wake up ctx.data().thread.set_checkpointing(false); trace!("checkpoint complete"); + if guard.disable_journaling_after_checkpoint { + ctx.data_mut().enable_journal = false; + } guard.checkpoint = WasiProcessCheckpoint::Execute; for waker in guard.wakers.drain(..) { waker.wake(); @@ -359,6 +365,9 @@ impl WasiProcessInner { // Clear the checkpointing flag and notify everyone to wake up ctx.data().thread.set_checkpointing(false); trace!("checkpoint complete"); + if guard.disable_journaling_after_checkpoint { + ctx.data_mut().enable_journal = false; + } guard.checkpoint = WasiProcessCheckpoint::Execute; for waker in guard.wakers.drain(..) { waker.wake(); @@ -418,6 +427,7 @@ impl WasiProcess { snapshot_on: Default::default(), #[cfg(feature = "journal")] snapshot_memory_hash: Default::default(), + disable_journaling_after_checkpoint: false, backoff: WasiProcessCpuBackoff::new(max_cpu_backoff_time, max_cpu_cool_off_time), }), Condvar::new(), @@ -571,6 +581,107 @@ impl WasiProcess { signal_process_internal(&self.inner, signal); } + /// Takes a snapshot of the process and disables journaling returning + /// a future that can be waited on for the snapshot to complete + /// + /// Note: If you ignore the returned future the checkpoint will still + /// occur but it will execute asynchronously + pub fn snapshot_and_disable_journaling( + &self, + trigger: SnapshotTrigger, + ) -> std::pin::Pin + Send + Sync>> { + let mut guard = self.inner.0.lock().unwrap(); + guard.disable_journaling_after_checkpoint = true; + guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger }; + self.wait_for_checkpoint_finish() + } + + /// Disables the journaling functionality + pub fn disable_journaling_after_checkpoint(&self) { + let mut guard = self.inner.0.lock().unwrap(); + guard.disable_journaling_after_checkpoint = true; + } + + /// Wait for the checkout process to finish + #[cfg(not(feature = "journal"))] + pub fn wait_for_checkpoint( + &self, + ) -> std::pin::Pin + Send + Sync>> { + Box::pin(std::future::pending()) + } + + /// Wait for the checkout process to finish + #[cfg(feature = "journal")] + pub fn wait_for_checkpoint( + &self, + ) -> std::pin::Pin + Send + Sync>> { + use futures::Future; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + struct Poller { + inner: LockableWasiProcessInner, + } + impl Future for Poller { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.inner.0.lock().unwrap(); + if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { + return Poll::Ready(()); + } + if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { + guard.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + } + Box::pin(Poller { + inner: self.inner.clone(), + }) + } + + /// Wait for the checkout process to finish + #[cfg(not(feature = "journal"))] + pub fn wait_for_checkpoint_finish( + &self, + ) -> std::pin::Pin + Send + Sync>> { + Box::pin(std::future::pending()) + } + + /// Wait for the checkout process to finish + #[cfg(feature = "journal")] + pub fn wait_for_checkpoint_finish( + &self, + ) -> std::pin::Pin + Send + Sync>> { + use futures::Future; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + struct Poller { + inner: LockableWasiProcessInner, + } + impl Future for Poller { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.inner.0.lock().unwrap(); + if matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { + return Poll::Ready(()); + } + if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { + guard.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + } + Box::pin(Poller { + inner: self.inner.clone(), + }) + } + /// Signals one of the threads every interval pub fn signal_interval(&self, signal: Signal, interval: Option, repeat: bool) { let mut inner = self.inner.0.lock().unwrap(); diff --git a/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs b/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs index bead2155f7f..4f6e52da705 100644 --- a/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs +++ b/lib/wasix/src/syscalls/journal/wait_for_snapshot.rs @@ -1,31 +1,5 @@ use super::*; -#[cfg(not(feature = "journal"))] -pub fn wait_for_snapshot(_env: &WasiEnv) -> Pin + Send + Sync>> { - Box::pin(std::future::pending()) -} - -#[cfg(feature = "journal")] pub fn wait_for_snapshot(env: &WasiEnv) -> Pin + Send + Sync>> { - use crate::os::task::process::{LockableWasiProcessInner, WasiProcessCheckpoint}; - - struct Poller { - inner: LockableWasiProcessInner, - } - impl Future for Poller { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut guard = self.inner.0.lock().unwrap(); - if !matches!(guard.checkpoint, WasiProcessCheckpoint::Execute) { - return Poll::Ready(()); - } - if !guard.wakers.iter().any(|w| w.will_wake(cx.waker())) { - guard.wakers.push(cx.waker().clone()); - } - Poll::Pending - } - } - Box::pin(Poller { - inner: env.process.inner.clone(), - }) + env.process.wait_for_checkpoint() } From fad2645855943c962c3f04297b7f4d33e589d8ed Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Thu, 4 Apr 2024 03:03:59 +1100 Subject: [PATCH 5/6] Added automatic flushing of journals upon snapshots --- lib/cli/src/commands/journal/mount/fs.rs | 6 ++++++ lib/journal/src/concrete/arc.rs | 8 ++++++++ lib/journal/src/concrete/boxed.rs | 8 ++++++++ lib/journal/src/concrete/buffered.rs | 8 ++++++++ lib/journal/src/concrete/compacting.rs | 8 ++++++++ lib/journal/src/concrete/compacting_log_file.rs | 8 ++++++++ lib/journal/src/concrete/counting.rs | 4 ++++ lib/journal/src/concrete/filter.rs | 8 ++++++++ lib/journal/src/concrete/log_file.rs | 10 ++++++++++ lib/journal/src/concrete/null.rs | 4 ++++ lib/journal/src/concrete/pipe.rs | 8 ++++++++ lib/journal/src/concrete/printing.rs | 4 ++++ lib/journal/src/concrete/recombined.rs | 4 ++++ lib/journal/src/concrete/unsupported.rs | 4 ++++ lib/journal/src/lib.rs | 3 +++ lib/wasix/src/journal/effector/memory_and_snapshot.rs | 4 ++++ 16 files changed, 99 insertions(+) diff --git a/lib/cli/src/commands/journal/mount/fs.rs b/lib/cli/src/commands/journal/mount/fs.rs index f5baaf894d3..5544eed0f56 100644 --- a/lib/cli/src/commands/journal/mount/fs.rs +++ b/lib/cli/src/commands/journal/mount/fs.rs @@ -338,6 +338,12 @@ impl WritableJournal for MutexState { } Ok(ret) } + + fn flush(&self) -> anyhow::Result<()> { + let mut state = self.inner.lock().unwrap(); + state.mem_fs.flush()?; + Ok(()) + } } impl JournalFileSystem { diff --git a/lib/journal/src/concrete/arc.rs b/lib/journal/src/concrete/arc.rs index 1790777215a..650f2d5fdb2 100644 --- a/lib/journal/src/concrete/arc.rs +++ b/lib/journal/src/concrete/arc.rs @@ -16,6 +16,10 @@ impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl ReadableJournal for Arc { @@ -32,6 +36,10 @@ impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl Journal for Arc { diff --git a/lib/journal/src/concrete/boxed.rs b/lib/journal/src/concrete/boxed.rs index 9db8cdba87d..ce54a13c019 100644 --- a/lib/journal/src/concrete/boxed.rs +++ b/lib/journal/src/concrete/boxed.rs @@ -16,6 +16,10 @@ impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } impl ReadableJournal for Box { @@ -32,4 +36,8 @@ impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.deref().flush() + } } diff --git a/lib/journal/src/concrete/buffered.rs b/lib/journal/src/concrete/buffered.rs index c5011c417c0..a985d44da4a 100644 --- a/lib/journal/src/concrete/buffered.rs +++ b/lib/journal/src/concrete/buffered.rs @@ -50,6 +50,10 @@ impl WritableJournal for BufferedJournalTx { record_end: state.offset as u64 + estimate_size as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl ReadableJournal for BufferedJournalRx { @@ -81,6 +85,10 @@ impl WritableJournal for BufferedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for BufferedJournal { diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 1a002e0bc45..3626e0533e5 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -485,6 +485,10 @@ impl WritableJournal for CompactingJournalTx { } state.inner_tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.state.lock().unwrap().inner_tx.flush() + } } impl CompactingJournal { @@ -524,6 +528,10 @@ impl WritableJournal for CompactingJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for CompactingJournal { diff --git a/lib/journal/src/concrete/compacting_log_file.rs b/lib/journal/src/concrete/compacting_log_file.rs index 7bc33026ce2..9f26a73d97e 100644 --- a/lib/journal/src/concrete/compacting_log_file.rs +++ b/lib/journal/src/concrete/compacting_log_file.rs @@ -237,6 +237,10 @@ impl WritableJournal for CompactingLogFileJournalTx { Ok(res) } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } } impl ReadableJournal for CompactingLogFileJournal { @@ -253,6 +257,10 @@ impl WritableJournal for CompactingLogFileJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl Journal for CompactingLogFileJournal { diff --git a/lib/journal/src/concrete/counting.rs b/lib/journal/src/concrete/counting.rs index e7c32c038d1..d78fe7029fb 100644 --- a/lib/journal/src/concrete/counting.rs +++ b/lib/journal/src/concrete/counting.rs @@ -42,6 +42,10 @@ impl WritableJournal for CountingJournal { record_end: offset as u64 + size, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for CountingJournal { diff --git a/lib/journal/src/concrete/filter.rs b/lib/journal/src/concrete/filter.rs index f0bef520119..fa856cdffd6 100644 --- a/lib/journal/src/concrete/filter.rs +++ b/lib/journal/src/concrete/filter.rs @@ -344,6 +344,10 @@ impl WritableJournal for FilteredJournalTx { }; self.inner.write(evt) } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } } impl ReadableJournal for FilteredJournalRx { @@ -362,6 +366,10 @@ impl WritableJournal for FilteredJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for FilteredJournal { diff --git a/lib/journal/src/concrete/log_file.rs b/lib/journal/src/concrete/log_file.rs index 864bd0e313e..5c17d500a17 100644 --- a/lib/journal/src/concrete/log_file.rs +++ b/lib/journal/src/concrete/log_file.rs @@ -198,6 +198,12 @@ impl WritableJournal for LogFileJournalTx { record_end: offset_end, }) } + + fn flush(&self) -> anyhow::Result<()> { + let mut state = self.state.lock().unwrap(); + state.file.flush()?; + Ok(()) + } } impl ReadableJournal for LogFileJournalRx { @@ -288,6 +294,10 @@ impl WritableJournal for LogFileJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for LogFileJournal { diff --git a/lib/journal/src/concrete/null.rs b/lib/journal/src/concrete/null.rs index f7b34e62abe..8033ade317d 100644 --- a/lib/journal/src/concrete/null.rs +++ b/lib/journal/src/concrete/null.rs @@ -28,6 +28,10 @@ impl WritableJournal for NullJournal { record_end: entry.estimate_size() as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for NullJournal { diff --git a/lib/journal/src/concrete/pipe.rs b/lib/journal/src/concrete/pipe.rs index 94a8c851c9a..5c3b7bdd45b 100644 --- a/lib/journal/src/concrete/pipe.rs +++ b/lib/journal/src/concrete/pipe.rs @@ -83,6 +83,10 @@ impl WritableJournal for PipeJournalTx { record_end: sender.offset + entry_size, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl ReadableJournal for PipeJournalRx { @@ -108,6 +112,10 @@ impl WritableJournal for PipeJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for PipeJournal { diff --git a/lib/journal/src/concrete/printing.rs b/lib/journal/src/concrete/printing.rs index baa29599149..66bd0863191 100644 --- a/lib/journal/src/concrete/printing.rs +++ b/lib/journal/src/concrete/printing.rs @@ -51,6 +51,10 @@ impl WritableJournal for PrintingJournal { record_end: entry.estimate_size() as u64, }) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for PrintingJournal { diff --git a/lib/journal/src/concrete/recombined.rs b/lib/journal/src/concrete/recombined.rs index 6a97c9b44e1..93a618c7e35 100644 --- a/lib/journal/src/concrete/recombined.rs +++ b/lib/journal/src/concrete/recombined.rs @@ -15,6 +15,10 @@ impl WritableJournal for RecombinedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } } impl ReadableJournal for RecombinedJournal { diff --git a/lib/journal/src/concrete/unsupported.rs b/lib/journal/src/concrete/unsupported.rs index 3ac2abd0b34..da011d64c85 100644 --- a/lib/journal/src/concrete/unsupported.rs +++ b/lib/journal/src/concrete/unsupported.rs @@ -22,6 +22,10 @@ impl WritableJournal for UnsupportedJournal { tracing::debug!("journal event: {:?}", entry); Err(anyhow::format_err!("unsupported")) } + + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } } impl Journal for UnsupportedJournal { diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index c7e04c06ea3..06fccd615bb 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -35,6 +35,9 @@ pub trait WritableJournal { /// Takes in a stream of snapshot log entries and saves them so that they /// may be restored at a later moment fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result; + + /// Flushes the data to disk or network + fn flush(&self) -> anyhow::Result<()>; } /// The results of an operation to read a log entry from the log diff --git a/lib/wasix/src/journal/effector/memory_and_snapshot.rs b/lib/wasix/src/journal/effector/memory_and_snapshot.rs index 145f5ce4419..ca7aa11e109 100644 --- a/lib/wasix/src/journal/effector/memory_and_snapshot.rs +++ b/lib/wasix/src/journal/effector/memory_and_snapshot.rs @@ -167,6 +167,10 @@ impl JournalEffector { journal .write(JournalEntry::SnapshotV1 { when, trigger }) .map_err(map_snapshot_err)?; + + // When writing snapshots we also flush the journal so that + // its guaranteed to be on the disk or network pipe + journal.flush().map_err(map_snapshot_err)?; Ok(()) } From c4b4d4a379f437690f0868422342f28dd99c96db Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Tue, 23 Apr 2024 08:43:41 +0800 Subject: [PATCH 6/6] Fix for rustls security patches --- Cargo.lock | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f9fe967b90..b96d4154246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2223,7 +2223,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.10", + "rustls 0.21.11", "tokio 1.37.0", "tokio-rustls", ] @@ -3823,7 +3823,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-pemfile", "serde", "serde_json", @@ -4006,9 +4006,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log 0.4.21", "ring", @@ -4018,9 +4018,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.3" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log 0.4.21", "ring", @@ -5110,7 +5110,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.11", "tokio 1.37.0", ] @@ -5205,7 +5205,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log 0.4.21", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-native-certs", "tokio 1.37.0", "tokio-rustls", @@ -5532,7 +5532,7 @@ dependencies = [ "httparse", "log 0.4.21", "rand", - "rustls 0.21.10", + "rustls 0.21.11", "sha1", "thiserror", "url", @@ -5545,7 +5545,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] @@ -5670,7 +5670,7 @@ dependencies = [ "flate2", "log 0.4.21", "once_cell", - "rustls 0.22.3", + "rustls 0.22.4", "rustls-pki-types", "rustls-webpki 0.102.2", "url",