Skip to content

Commit

Permalink
Added automatic flushing of journals upon snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
john-sharratt authored and theduke committed Apr 23, 2024
1 parent 2a2b0f7 commit 81754c2
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/cli/src/commands/journal/mount/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ impl WritableJournal for MutexState {
}
Ok(ret)
}

fn flush(&self) -> anyhow::Result<()> {
let mut state = self.inner.lock().unwrap();
state.mem_fs.flush()?;
Ok(())
}
}

impl JournalFileSystem {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ impl WritableJournal for Arc<DynWritableJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl ReadableJournal for Arc<DynJournal> {
Expand All @@ -32,6 +36,10 @@ impl WritableJournal for Arc<DynJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl Journal for Arc<DynJournal> {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ impl WritableJournal for Box<DynWritableJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl ReadableJournal for Box<DynJournal> {
Expand All @@ -32,4 +36,8 @@ impl WritableJournal for Box<DynJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl WritableJournal for BufferedJournalTx {
record_end: state.offset as u64 + estimate_size as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl ReadableJournal for BufferedJournalRx {
Expand Down Expand Up @@ -81,6 +85,10 @@ impl WritableJournal for BufferedJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for BufferedJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/compacting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ impl WritableJournal for CompactingJournalTx {
}
state.inner_tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.state.lock().unwrap().inner_tx.flush()
}
}

impl CompactingJournal {
Expand Down Expand Up @@ -524,6 +528,10 @@ impl WritableJournal for CompactingJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for CompactingJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/compacting_log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl WritableJournal for CompactingLogFileJournalTx {

Ok(res)
}

fn flush(&self) -> anyhow::Result<()> {
self.inner.flush()
}
}

impl ReadableJournal for CompactingLogFileJournal {
Expand All @@ -253,6 +257,10 @@ impl WritableJournal for CompactingLogFileJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl Journal for CompactingLogFileJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl WritableJournal for CountingJournal {
record_end: offset as u64 + size,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for CountingJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ impl WritableJournal for FilteredJournalTx {
};
self.inner.write(evt)
}

fn flush(&self) -> anyhow::Result<()> {
self.inner.flush()
}
}

impl ReadableJournal for FilteredJournalRx {
Expand All @@ -362,6 +366,10 @@ impl WritableJournal for FilteredJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for FilteredJournal {
Expand Down
10 changes: 10 additions & 0 deletions lib/journal/src/concrete/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ impl WritableJournal for LogFileJournalTx {
record_end: offset_end,
})
}

fn flush(&self) -> anyhow::Result<()> {
let mut state = self.state.lock().unwrap();
state.file.flush()?;
Ok(())
}
}

impl ReadableJournal for LogFileJournalRx {
Expand Down Expand Up @@ -288,6 +294,10 @@ impl WritableJournal for LogFileJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for LogFileJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ impl WritableJournal for NullJournal {
record_end: entry.estimate_size() as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for NullJournal {
Expand Down
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl WritableJournal for PipeJournalTx {
record_end: sender.offset + entry_size,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl ReadableJournal for PipeJournalRx {
Expand All @@ -108,6 +112,10 @@ impl WritableJournal for PipeJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for PipeJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/printing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl WritableJournal for PrintingJournal {
record_end: entry.estimate_size() as u64,
})
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for PrintingJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/recombined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ impl WritableJournal for RecombinedJournal {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}
}

impl ReadableJournal for RecombinedJournal {
Expand Down
4 changes: 4 additions & 0 deletions lib/journal/src/concrete/unsupported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl WritableJournal for UnsupportedJournal {
tracing::debug!("journal event: {:?}", entry);
Err(anyhow::format_err!("unsupported"))
}

fn flush(&self) -> anyhow::Result<()> {
Ok(())
}
}

impl Journal for UnsupportedJournal {
Expand Down
3 changes: 3 additions & 0 deletions lib/journal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub trait WritableJournal {
/// Takes in a stream of snapshot log entries and saves them so that they
/// may be restored at a later moment
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult>;

/// Flushes the data to disk or network
fn flush(&self) -> anyhow::Result<()>;
}

/// The results of an operation to read a log entry from the log
Expand Down
4 changes: 4 additions & 0 deletions lib/wasix/src/journal/effector/memory_and_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl JournalEffector {
journal
.write(JournalEntry::SnapshotV1 { when, trigger })
.map_err(map_snapshot_err)?;

// When writing snapshots we also flush the journal so that
// its guaranteed to be on the disk or network pipe
journal.flush().map_err(map_snapshot_err)?;
Ok(())
}

Expand Down

0 comments on commit 81754c2

Please sign in to comment.