Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for snapshots in certain use cases #4514

Merged
merged 11 commits into from
Apr 23, 2024
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ wasmer-wasix = { version = "0.18.3", path = "../wasix", features = [
"webc_runner_rt_dproxy",
"webc_runner_rt_emscripten",
"host-fs",
"ctrlc"
] }
wasmer-wast = { version = "=4.2.8", path = "../../tests/lib/wast", optional = true }
wasmer-types = { version = "=4.2.8", path = "../types", features = [
Expand Down
6 changes: 6 additions & 0 deletions lib/cli/src/commands/journal/mount/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ impl WritableJournal for MutexState {
}
Ok(ret)
}

fn flush(&self) -> anyhow::Result<()> {
let mut state = self.inner.lock().unwrap();
state.mem_fs.flush()?;
Ok(())
}
}

impl JournalFileSystem {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ impl WritableJournal for Arc<DynWritableJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl ReadableJournal for Arc<DynJournal> {
Expand All @@ -32,6 +36,10 @@ impl WritableJournal for Arc<DynJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl Journal for Arc<DynJournal> {
Expand Down
1 change: 1 addition & 0 deletions lib/journal/src/concrete/archived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,7 @@ pub enum JournalSnapshotTriggerV1 {
Sigtstp,
Sigstop,
NonDeterministicCall,
Bootstrap,
}

#[repr(C)]
Expand Down
3 changes: 3 additions & 0 deletions lib/journal/src/concrete/archived_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl From<SnapshotTrigger> for JournalSnapshotTriggerV1 {
SnapshotTrigger::Sigtstp => JournalSnapshotTriggerV1::Sigtstp,
SnapshotTrigger::Sigstop => JournalSnapshotTriggerV1::Sigstop,
SnapshotTrigger::NonDeterministicCall => JournalSnapshotTriggerV1::NonDeterministicCall,
SnapshotTrigger::Bootstrap => JournalSnapshotTriggerV1::Bootstrap,
}
}
}
Expand All @@ -201,6 +202,7 @@ impl From<JournalSnapshotTriggerV1> for SnapshotTrigger {
JournalSnapshotTriggerV1::Sigtstp => SnapshotTrigger::Sigtstp,
JournalSnapshotTriggerV1::Sigstop => SnapshotTrigger::Sigstop,
JournalSnapshotTriggerV1::NonDeterministicCall => SnapshotTrigger::NonDeterministicCall,
JournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap,
}
}
}
Expand All @@ -220,6 +222,7 @@ impl From<&'_ ArchivedJournalSnapshotTriggerV1> for SnapshotTrigger {
ArchivedJournalSnapshotTriggerV1::NonDeterministicCall => {
SnapshotTrigger::NonDeterministicCall
}
ArchivedJournalSnapshotTriggerV1::Bootstrap => SnapshotTrigger::Bootstrap,
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ impl WritableJournal for Box<DynWritableJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl ReadableJournal for Box<DynJournal> {
Expand All @@ -32,4 +36,8 @@ impl WritableJournal for Box<DynJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl WritableJournal for BufferedJournalTx {
record_end: state.offset as u64 + estimate_size as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl ReadableJournal for BufferedJournalRx {
Expand Down Expand Up @@ -81,6 +85,10 @@ impl WritableJournal for BufferedJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for BufferedJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/compacting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ impl WritableJournal for CompactingJournalTx {
}
state.inner_tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.state.lock().unwrap().inner_tx.flush()
}
}

impl CompactingJournal {
Expand Down Expand Up @@ -524,6 +528,10 @@ impl WritableJournal for CompactingJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for CompactingJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/compacting_log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl WritableJournal for CompactingLogFileJournalTx {

Ok(res)
}

fn flush(&self) -> anyhow::Result<()> {
self.inner.flush()
}
}

impl ReadableJournal for CompactingLogFileJournal {
Expand All @@ -253,6 +257,10 @@ impl WritableJournal for CompactingLogFileJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl Journal for CompactingLogFileJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl WritableJournal for CountingJournal {
record_end: offset as u64 + size,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for CountingJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ impl WritableJournal for FilteredJournalTx {
};
self.inner.write(evt)
}

fn flush(&self) -> anyhow::Result<()> {
self.inner.flush()
}
}

impl ReadableJournal for FilteredJournalRx {
Expand All @@ -362,6 +366,10 @@ impl WritableJournal for FilteredJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for FilteredJournal {
Expand Down
10 changes: 10 additions & 0 deletions lib/journal/src/concrete/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ impl WritableJournal for LogFileJournalTx {
record_end: offset_end,
})
}

fn flush(&self) -> anyhow::Result<()> {
let mut state = self.state.lock().unwrap();
state.file.flush()?;
Ok(())
}
}

impl ReadableJournal for LogFileJournalRx {
Expand Down Expand Up @@ -288,6 +294,10 @@ impl WritableJournal for LogFileJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for LogFileJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ impl WritableJournal for NullJournal {
record_end: entry.estimate_size() as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for NullJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl WritableJournal for PipeJournalTx {
record_end: sender.offset + entry_size,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl ReadableJournal for PipeJournalRx {
Expand All @@ -108,6 +112,10 @@ impl WritableJournal for PipeJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for PipeJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/printing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl WritableJournal for PrintingJournal {
record_end: entry.estimate_size() as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for PrintingJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/recombined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ impl WritableJournal for RecombinedJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for RecombinedJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/unsupported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl WritableJournal for UnsupportedJournal {
tracing::debug!("journal event: {:?}", entry);
Err(anyhow::format_err!("unsupported"))
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for UnsupportedJournal {
Expand Down
3 changes: 3 additions & 0 deletions lib/journal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub trait WritableJournal {
/// Takes in a stream of snapshot log entries and saves them so that they
/// may be restored at a later moment
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult>;

/// Flushes the data to disk or network
fn flush(&self) -> anyhow::Result<()>;
}

/// The results of an operation to read a log entry from the log
Expand Down
3 changes: 3 additions & 0 deletions lib/journal/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub enum SnapshotTrigger {
Sigstop,
/// When a non-determinstic call is made
NonDeterministicCall,
/// Bootstrapping process
Bootstrap,
}

impl SnapshotTrigger {
Expand Down Expand Up @@ -61,6 +63,7 @@ impl FromStr for SnapshotTrigger {
"sigtstp" | "ctrlz" | "ctrl-z" => Self::Sigtstp,
"stop" | "sigstop" => Self::Sigstop,
"non-deterministic-call" => Self::NonDeterministicCall,
"bootstrap" => Self::Bootstrap,
a => return Err(anyhow::format_err!("invalid or unknown trigger ({a})")),
})
}
Expand Down
Loading
Loading