From 003f937f0a28881991f71119c744f828acd68ba7 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 17:01:13 +1000 Subject: [PATCH 1/6] Added a transaction and auto_consistency journal --- lib/journal/src/concrete/auto_consistent.rs | 171 ++++++++++++++++++++ lib/journal/src/concrete/mod.rs | 4 + lib/journal/src/concrete/transaction.rs | 151 +++++++++++++++++ lib/journal/src/lib.rs | 10 ++ 4 files changed, 336 insertions(+) create mode 100644 lib/journal/src/concrete/auto_consistent.rs create mode 100644 lib/journal/src/concrete/transaction.rs diff --git a/lib/journal/src/concrete/auto_consistent.rs b/lib/journal/src/concrete/auto_consistent.rs new file mode 100644 index 00000000000..c2e0e05a218 --- /dev/null +++ b/lib/journal/src/concrete/auto_consistent.rs @@ -0,0 +1,171 @@ +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; + +use derivative::Derivative; + +use super::*; + +/// Journal which leave itself in a consistent state once it commits +/// by closing all the file descriptors that were opened while +/// it was recording writes. +#[derive(Debug)] +pub struct AutoConsistentJournal { + tx: AutoConsistentJournalTx, + rx: AutoConsistentJournalRx, +} + +#[derive(Debug, Default, Clone)] +struct State { + open_files: HashSet, +} + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct AutoConsistentJournalTx { + state: Arc>, + #[derivative(Debug = "ignore")] + inner: Box, +} + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct AutoConsistentJournalRx { + state: Arc>, + #[derivative(Debug = "ignore")] + inner: Box, +} + +impl AutoConsistentJournal { + /// Creates a journal which will automatically correct inconsistencies when + /// it commits. E.g. it will close any open file descriptors that were left + /// open as it was processing events. + pub fn new(inner: J) -> Self + where + J: Journal, + { + let state = Arc::new(Mutex::new(State::default())); + let (tx, rx) = inner.split(); + Self { + tx: AutoConsistentJournalTx { + inner: tx, + state: state.clone(), + }, + rx: AutoConsistentJournalRx { + inner: rx, + state: state.clone(), + }, + } + } + + pub fn into_inner(self) -> RecombinedJournal { + RecombinedJournal::new(self.tx.inner, self.rx.inner) + } +} + +impl WritableJournal for AutoConsistentJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + match &entry { + JournalEntry::OpenFileDescriptorV1 { fd, .. } + | JournalEntry::SocketAcceptedV1 { fd, .. } => { + let mut state = self.state.lock().unwrap(); + state.open_files.insert(*fd); + } + JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => { + let mut state = self.state.lock().unwrap(); + if state.open_files.remove(old_fd) { + state.open_files.insert(*new_fd); + } + } + JournalEntry::DuplicateFileDescriptorV1 { + original_fd, + copied_fd, + } => { + let mut state = self.state.lock().unwrap(); + if state.open_files.contains(original_fd) { + state.open_files.insert(*copied_fd); + } + } + JournalEntry::CloseFileDescriptorV1 { fd } => { + let mut state = self.state.lock().unwrap(); + state.open_files.remove(fd); + } + JournalEntry::InitModuleV1 { .. } + | JournalEntry::ClearEtherealV1 { .. } + | JournalEntry::ProcessExitV1 { .. } => { + let mut state = self.state.lock().unwrap(); + state.open_files.clear(); + } + _ => {} + } + self.inner.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + /// Commits the transaction + fn commit(&self) -> anyhow::Result<()> { + let open_files = { + let mut state = self.state.lock().unwrap(); + let mut open_files = Default::default(); + std::mem::swap(&mut open_files, &mut state.open_files); + open_files + }; + for fd in open_files { + let entry = JournalEntry::CloseFileDescriptorV1 { fd }; + self.inner.write(entry)?; + } + self.inner.commit() + } + + /// Rolls back the transaction and aborts its changes + fn rollback(&self) -> anyhow::Result<()> { + { + let mut state = self.state.lock().unwrap(); + state.open_files.clear(); + } + self.inner.rollback() + } +} + +impl ReadableJournal for AutoConsistentJournalRx { + fn read(&self) -> anyhow::Result>> { + self.inner.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + Ok(Box::new(AutoConsistentJournalRx { + inner: self.inner.as_restarted()?, + state: Arc::new(Mutex::new(State::default())), + })) + } +} + +impl WritableJournal for AutoConsistentJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } +} + +impl ReadableJournal for AutoConsistentJournal { + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for AutoConsistentJournal { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/concrete/mod.rs b/lib/journal/src/concrete/mod.rs index a7a8dfc38b5..fd49c34ec06 100644 --- a/lib/journal/src/concrete/mod.rs +++ b/lib/journal/src/concrete/mod.rs @@ -3,6 +3,7 @@ mod aligned_cow_vec; mod arc; mod archived; mod archived_from; +mod auto_consistent; mod boxed; mod buffered; mod compacting; @@ -19,6 +20,7 @@ mod printing; mod recombined; #[cfg(test)] mod tests; +mod transaction; mod unsupported; pub(super) use super::*; @@ -26,6 +28,7 @@ pub(super) use super::*; pub use aligned_cow_str::*; pub use aligned_cow_vec::*; pub use archived::*; +pub use auto_consistent::*; pub use buffered::*; pub use compacting::*; #[cfg(feature = "log-file")] @@ -39,4 +42,5 @@ pub use null::*; pub use pipe::*; pub use printing::*; pub use recombined::*; +pub use transaction::*; pub use unsupported::*; diff --git a/lib/journal/src/concrete/transaction.rs b/lib/journal/src/concrete/transaction.rs new file mode 100644 index 00000000000..eb65b3bc999 --- /dev/null +++ b/lib/journal/src/concrete/transaction.rs @@ -0,0 +1,151 @@ +use std::sync::{Arc, Mutex}; + +use derivative::Derivative; + +use super::*; + +/// Journal which will store the events locally in memory until it +/// is either committed or rolled back +#[derive(Debug)] +pub struct TransactionJournal { + tx: TransactionJournalTx, + rx: TransactionJournalRx, +} + +#[derive(Debug, Default, Clone)] +struct State { + records: Vec>, + offset: u64, +} + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct TransactionJournalTx { + state: Arc>, + #[derivative(Debug = "ignore")] + inner: Box, +} + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct TransactionJournalRx { + state: Arc>, + #[derivative(Debug = "ignore")] + inner: Box, +} + +impl TransactionJournal { + /// Creates a transactional journal which will hold events in memory + /// until the journal is either committed or rolled back + pub fn new(inner: J) -> Self + where + J: Journal, + { + let state = Arc::new(Mutex::new(State::default())); + let (tx, rx) = inner.split(); + Self { + tx: TransactionJournalTx { + inner: tx, + state: state.clone(), + }, + rx: TransactionJournalRx { + inner: rx, + state: state.clone(), + }, + } + } + + pub fn into_inner(self) -> RecombinedJournal { + RecombinedJournal::new(self.tx.inner, self.rx.inner) + } +} + +impl WritableJournal for TransactionJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + let entry = entry.into_owned(); + let mut state = self.state.lock().unwrap(); + let estimate_size = entry.estimate_size(); + state.records.push(entry); + Ok(LogWriteResult { + record_start: state.offset as u64, + record_end: state.offset as u64 + estimate_size as u64, + }) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + /// Commits the transaction + fn commit(&self) -> anyhow::Result<()> { + let (records, mut new_offset) = { + let mut state = self.state.lock().unwrap(); + let mut records = Default::default(); + std::mem::swap(&mut records, &mut state.records); + (records, state.offset) + }; + + for entry in records { + let ret = self.inner.write(entry)?; + new_offset = new_offset.max(ret.record_end); + } + { + let mut state = self.state.lock().unwrap(); + state.offset = state.offset.max(new_offset); + } + self.inner.commit() + } + + /// Rolls back the transaction and aborts its changes + fn rollback(&self) -> anyhow::Result<()> { + { + let mut state = self.state.lock().unwrap(); + state.records.clear(); + } + self.inner.rollback() + } +} + +impl ReadableJournal for TransactionJournalRx { + fn read(&self) -> anyhow::Result>> { + let ret = self.inner.read()?; + if let Some(res) = ret.as_ref() { + let mut state = self.state.lock().unwrap(); + state.offset = state.offset.max(res.record_end); + } + Ok(ret) + } + + fn as_restarted(&self) -> anyhow::Result> { + Ok(Box::new(TransactionJournalRx { + inner: self.inner.as_restarted()?, + state: Arc::new(Mutex::new(State::default())), + })) + } +} + +impl WritableJournal for TransactionJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } +} + +impl ReadableJournal for TransactionJournal { + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for TransactionJournal { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index 06fccd615bb..400db5ffebc 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -38,6 +38,16 @@ pub trait WritableJournal { /// Flushes the data to disk or network fn flush(&self) -> anyhow::Result<()>; + + /// Commits the transaction + fn commit(&self) -> anyhow::Result<()> { + self.flush() + } + + /// Rolls back the transaction and aborts its changes + fn rollback(&self) -> anyhow::Result<()> { + Ok(()) + } } /// The results of an operation to read a log entry from the log From 36606657702bcc6784c703ae3e736cbee7e6c41e Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 19:16:38 +1000 Subject: [PATCH 2/6] Added a compacting transaction journal --- lib/journal/src/concrete/auto_consistent.rs | 12 +- lib/journal/src/concrete/compacting.rs | 9 + .../src/concrete/compacting_transaction.rs | 161 ++++++++++++++++++ lib/journal/src/concrete/mod.rs | 2 + lib/journal/src/concrete/transaction.rs | 29 ++-- lib/journal/src/lib.rs | 10 +- 6 files changed, 204 insertions(+), 19 deletions(-) create mode 100644 lib/journal/src/concrete/compacting_transaction.rs diff --git a/lib/journal/src/concrete/auto_consistent.rs b/lib/journal/src/concrete/auto_consistent.rs index c2e0e05a218..2f45e7d3a6c 100644 --- a/lib/journal/src/concrete/auto_consistent.rs +++ b/lib/journal/src/concrete/auto_consistent.rs @@ -68,10 +68,16 @@ impl WritableJournal for AutoConsistentJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { match &entry { JournalEntry::OpenFileDescriptorV1 { fd, .. } - | JournalEntry::SocketAcceptedV1 { fd, .. } => { + | JournalEntry::SocketAcceptedV1 { fd, .. } + | JournalEntry::CreateEventV1 { fd, .. } => { let mut state = self.state.lock().unwrap(); state.open_files.insert(*fd); } + JournalEntry::CreatePipeV1 { fd1, fd2 } => { + let mut state = self.state.lock().unwrap(); + state.open_files.insert(*fd1); + state.open_files.insert(*fd2); + } JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => { let mut state = self.state.lock().unwrap(); if state.open_files.remove(old_fd) { @@ -107,7 +113,7 @@ impl WritableJournal for AutoConsistentJournalTx { } /// Commits the transaction - fn commit(&self) -> anyhow::Result<()> { + fn commit(&self) -> anyhow::Result { let open_files = { let mut state = self.state.lock().unwrap(); let mut open_files = Default::default(); @@ -122,7 +128,7 @@ impl WritableJournal for AutoConsistentJournalTx { } /// Rolls back the transaction and aborts its changes - fn rollback(&self) -> anyhow::Result<()> { + fn rollback(&self) -> anyhow::Result { { let mut state = self.state.lock().unwrap(); state.open_files.clear(); diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 4c858841e43..6ab380387be 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -254,6 +254,15 @@ impl CompactingJournal { rx: CompactingJournalRx { inner: rx }, }) } + + /// Creates a filter jounral which will write all + /// its events to an inner journal + pub fn create_filter(&self, inner: J) -> FilteredJournal + where + J: Journal, + { + self.tx.create_filter(inner) + } } /// Represents the results of a compaction operation diff --git a/lib/journal/src/concrete/compacting_transaction.rs b/lib/journal/src/concrete/compacting_transaction.rs new file mode 100644 index 00000000000..ebfde2f97cf --- /dev/null +++ b/lib/journal/src/concrete/compacting_transaction.rs @@ -0,0 +1,161 @@ +use super::*; + +#[derive(Debug)] +pub struct CompactingTransactionJournalTx { + inner: TransactionJournalTx, +} + +#[derive(Debug)] +pub struct CompactingTransactionJournalRx { + inner: TransactionJournalRx, +} + +/// Journal which will store the events locally in memory until it +/// is either committed or rolled back +#[derive(Debug)] +pub struct CompactingTransactionJournal { + tx: CompactingTransactionJournalTx, + rx: CompactingTransactionJournalRx, +} + +impl CompactingTransactionJournal { + /// Creates a compacting transactional journal which will hold events in + /// memory until the journal is either committed or rolled back. + /// + /// When the journal is commited it will perform a compaction of the events + /// before they are misseeed to the underlying journal + pub fn new(inner: J) -> Self + where + J: Journal, + { + let inner = TransactionJournal::new(inner); + Self { + rx: CompactingTransactionJournalRx { inner: inner.rx }, + tx: CompactingTransactionJournalTx { inner: inner.tx }, + } + } + + pub fn into_inner(self) -> TransactionJournal { + TransactionJournal { + rx: self.rx.inner, + tx: self.tx.inner, + } + } +} + +impl WritableJournal for CompactingTransactionJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.inner.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + fn commit(&self) -> anyhow::Result { + // We read all the events that have been buffered + let (records, mut new_offset) = { + let mut state = self.inner.state.lock().unwrap(); + let mut records = Default::default(); + std::mem::swap(&mut records, &mut state.records); + (records, state.offset) + }; + if records.is_empty() { + return Ok(0); + } + + // We prepare a compacting journal which does nothing + // with the events other than learn from them + let compacting = CompactingJournal::new(NullJournal::default())?; + for record in records.iter() { + compacting.write(record.clone())?; + } + + // Next we create an inline journal that is used for streaming the + // events the journal this is under this super journal + struct RelayJournal<'a> { + inner: &'a CompactingTransactionJournalTx, + } + impl WritableJournal for RelayJournal<'_> { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.inner.write(entry) + } + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } + } + impl ReadableJournal for RelayJournal<'_> { + fn read(&self) -> anyhow::Result>> { + Ok(None) + } + fn as_restarted(&self) -> anyhow::Result> { + NullJournal::default().split().1.as_restarted() + } + } + impl Journal for RelayJournal<'_> { + fn split(self) -> (Box, Box) { + NullJournal::default().split() + } + } + let relay_journal = RelayJournal { inner: self }; + + // Now we create a filter journal which will filter out the events + // that are not needed and stream them down + let mut ret = 0; + let filter = compacting.create_filter(relay_journal); + for entry in records { + let res = filter.write(entry)?; + if res.record_start == 0 && res.record_end == 0 { + continue; + } + ret += 1; + new_offset = new_offset.max(res.record_end); + } + { + let mut state = self.inner.state.lock().unwrap(); + state.offset = state.offset.max(new_offset); + } + ret += self.inner.commit()?; + Ok(ret) + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } +} + +impl ReadableJournal for CompactingTransactionJournalRx { + fn read(&self) -> anyhow::Result>> { + self.inner.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.inner.as_restarted() + } +} + +impl WritableJournal for CompactingTransactionJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } +} + +impl ReadableJournal for CompactingTransactionJournal { + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for CompactingTransactionJournal { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/concrete/mod.rs b/lib/journal/src/concrete/mod.rs index fd49c34ec06..f3dcb57033c 100644 --- a/lib/journal/src/concrete/mod.rs +++ b/lib/journal/src/concrete/mod.rs @@ -9,6 +9,7 @@ mod buffered; mod compacting; #[cfg(feature = "log-file")] mod compacting_log_file; +mod compacting_transaction; mod counting; mod filter; #[cfg(feature = "log-file")] @@ -33,6 +34,7 @@ pub use buffered::*; pub use compacting::*; #[cfg(feature = "log-file")] pub use compacting_log_file::*; +pub use compacting_transaction::*; pub use counting::*; pub use filter::*; #[cfg(feature = "log-file")] diff --git a/lib/journal/src/concrete/transaction.rs b/lib/journal/src/concrete/transaction.rs index eb65b3bc999..ff1405d02d4 100644 --- a/lib/journal/src/concrete/transaction.rs +++ b/lib/journal/src/concrete/transaction.rs @@ -8,20 +8,20 @@ use super::*; /// is either committed or rolled back #[derive(Debug)] pub struct TransactionJournal { - tx: TransactionJournalTx, - rx: TransactionJournalRx, + pub(super) tx: TransactionJournalTx, + pub(super) rx: TransactionJournalRx, } #[derive(Debug, Default, Clone)] -struct State { - records: Vec>, - offset: u64, +pub(super) struct State { + pub(super) records: Vec>, + pub(super) offset: u64, } #[derive(Derivative)] #[derivative(Debug)] pub struct TransactionJournalTx { - state: Arc>, + pub(super) state: Arc>, #[derivative(Debug = "ignore")] inner: Box, } @@ -77,7 +77,7 @@ impl WritableJournal for TransactionJournalTx { } /// Commits the transaction - fn commit(&self) -> anyhow::Result<()> { + fn commit(&self) -> anyhow::Result { let (records, mut new_offset) = { let mut state = self.state.lock().unwrap(); let mut records = Default::default(); @@ -85,6 +85,7 @@ impl WritableJournal for TransactionJournalTx { (records, state.offset) }; + let mut ret = records.len(); for entry in records { let ret = self.inner.write(entry)?; new_offset = new_offset.max(ret.record_end); @@ -93,16 +94,20 @@ impl WritableJournal for TransactionJournalTx { let mut state = self.state.lock().unwrap(); state.offset = state.offset.max(new_offset); } - self.inner.commit() + ret += self.inner.commit()?; + Ok(ret) } /// Rolls back the transaction and aborts its changes - fn rollback(&self) -> anyhow::Result<()> { - { + fn rollback(&self) -> anyhow::Result { + let mut ret = { let mut state = self.state.lock().unwrap(); + let ret = state.records.len(); state.records.clear(); - } - self.inner.rollback() + ret + }; + ret += self.inner.rollback()?; + Ok(ret) } } diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index 400db5ffebc..7748f66a454 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -40,13 +40,15 @@ pub trait WritableJournal { fn flush(&self) -> anyhow::Result<()>; /// Commits the transaction - fn commit(&self) -> anyhow::Result<()> { - self.flush() + /// Returns the number of events committed + fn commit(&self) -> anyhow::Result { + Ok(0) } /// Rolls back the transaction and aborts its changes - fn rollback(&self) -> anyhow::Result<()> { - Ok(()) + /// Returns the number of events rolled back + fn rollback(&self) -> anyhow::Result { + Ok(0) } } From 09782f10a70bc76d558c295bbdca25030cf60aff Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 22:04:25 +1000 Subject: [PATCH 3/6] Fixed the compacting transaction journals --- lib/journal/src/concrete/arc.rs | 20 ++++- lib/journal/src/concrete/auto_consistent.rs | 71 ++++++++++------- lib/journal/src/concrete/boxed.rs | 27 +++---- lib/journal/src/concrete/buffered.rs | 8 ++ lib/journal/src/concrete/compacting.rs | 73 ++++++++++++++--- .../src/concrete/compacting_log_file.rs | 16 ++++ .../src/concrete/compacting_transaction.rs | 67 +++++++++------- lib/journal/src/concrete/filter.rs | 79 +++++++++++++------ lib/journal/src/concrete/log_file.rs | 14 +++- lib/journal/src/concrete/pipe.rs | 8 ++ lib/journal/src/concrete/recombined.rs | 31 +++++--- lib/journal/src/concrete/transaction.rs | 48 +++++------ lib/journal/src/lib.rs | 6 +- lib/wasix/src/runners/dproxy/factory.rs | 5 +- lib/wasix/src/state/env.rs | 9 ++- 15 files changed, 330 insertions(+), 152 deletions(-) diff --git a/lib/journal/src/concrete/arc.rs b/lib/journal/src/concrete/arc.rs index 650f2d5fdb2..21d0fbb8e5e 100644 --- a/lib/journal/src/concrete/arc.rs +++ b/lib/journal/src/concrete/arc.rs @@ -2,7 +2,7 @@ use super::*; use std::ops::Deref; use std::sync::Arc; -impl ReadableJournal for Arc { +impl ReadableJournal for Arc { fn read(&self) -> anyhow::Result>> { self.deref().read() } @@ -12,7 +12,7 @@ impl ReadableJournal for Arc { } } -impl WritableJournal for Arc { +impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } @@ -20,6 +20,14 @@ impl WritableJournal for Arc { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } + + fn commit(&self) -> anyhow::Result { + self.deref().commit() + } + + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() + } } impl ReadableJournal for Arc { @@ -40,6 +48,14 @@ impl WritableJournal for Arc { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } + + fn commit(&self) -> anyhow::Result { + self.deref().commit() + } + + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() + } } impl Journal for Arc { diff --git a/lib/journal/src/concrete/auto_consistent.rs b/lib/journal/src/concrete/auto_consistent.rs index 2f45e7d3a6c..511106afb23 100644 --- a/lib/journal/src/concrete/auto_consistent.rs +++ b/lib/journal/src/concrete/auto_consistent.rs @@ -3,41 +3,35 @@ use std::{ sync::{Arc, Mutex}, }; -use derivative::Derivative; - use super::*; /// Journal which leave itself in a consistent state once it commits /// by closing all the file descriptors that were opened while /// it was recording writes. #[derive(Debug)] -pub struct AutoConsistentJournal { - tx: AutoConsistentJournalTx, - rx: AutoConsistentJournalRx, +pub struct AutoConsistentJournal { + tx: AutoConsistentJournalTx, + rx: AutoConsistentJournalRx, } #[derive(Debug, Default, Clone)] struct State { open_files: HashSet, + open_sockets: HashSet, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct AutoConsistentJournalTx { +#[derive(Debug)] +pub struct AutoConsistentJournalTx { state: Arc>, - #[derivative(Debug = "ignore")] - inner: Box, + inner: W, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct AutoConsistentJournalRx { - state: Arc>, - #[derivative(Debug = "ignore")] - inner: Box, +#[derive(Debug)] +pub struct AutoConsistentJournalRx { + inner: R, } -impl AutoConsistentJournal { +impl AutoConsistentJournal, Box> { /// Creates a journal which will automatically correct inconsistencies when /// it commits. E.g. it will close any open file descriptors that were left /// open as it was processing events. @@ -52,27 +46,29 @@ impl AutoConsistentJournal { inner: tx, state: state.clone(), }, - rx: AutoConsistentJournalRx { - inner: rx, - state: state.clone(), - }, + rx: AutoConsistentJournalRx { inner: rx }, } } +} - pub fn into_inner(self) -> RecombinedJournal { +impl AutoConsistentJournal { + pub fn into_inner(self) -> RecombinedJournal { RecombinedJournal::new(self.tx.inner, self.rx.inner) } } -impl WritableJournal for AutoConsistentJournalTx { +impl WritableJournal for AutoConsistentJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { match &entry { JournalEntry::OpenFileDescriptorV1 { fd, .. } - | JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::CreateEventV1 { fd, .. } => { let mut state = self.state.lock().unwrap(); state.open_files.insert(*fd); } + JournalEntry::SocketAcceptedV1 { fd, .. } => { + let mut state = self.state.lock().unwrap(); + state.open_sockets.insert(*fd); + } JournalEntry::CreatePipeV1 { fd1, fd2 } => { let mut state = self.state.lock().unwrap(); state.open_files.insert(*fd1); @@ -83,6 +79,9 @@ impl WritableJournal for AutoConsistentJournalTx { if state.open_files.remove(old_fd) { state.open_files.insert(*new_fd); } + if state.open_sockets.remove(old_fd) { + state.open_sockets.insert(*new_fd); + } } JournalEntry::DuplicateFileDescriptorV1 { original_fd, @@ -92,16 +91,21 @@ impl WritableJournal for AutoConsistentJournalTx { if state.open_files.contains(original_fd) { state.open_files.insert(*copied_fd); } + if state.open_sockets.contains(original_fd) { + state.open_sockets.insert(*copied_fd); + } } JournalEntry::CloseFileDescriptorV1 { fd } => { let mut state = self.state.lock().unwrap(); state.open_files.remove(fd); + state.open_sockets.remove(fd); } JournalEntry::InitModuleV1 { .. } | JournalEntry::ClearEtherealV1 { .. } | JournalEntry::ProcessExitV1 { .. } => { let mut state = self.state.lock().unwrap(); state.open_files.clear(); + state.open_sockets.clear(); } _ => {} } @@ -118,6 +122,7 @@ impl WritableJournal for AutoConsistentJournalTx { let mut state = self.state.lock().unwrap(); let mut open_files = Default::default(); std::mem::swap(&mut open_files, &mut state.open_files); + state.open_sockets.clear(); open_files }; for fd in open_files { @@ -132,12 +137,13 @@ impl WritableJournal for AutoConsistentJournalTx { { let mut state = self.state.lock().unwrap(); state.open_files.clear(); + state.open_sockets.clear(); } self.inner.rollback() } } -impl ReadableJournal for AutoConsistentJournalRx { +impl ReadableJournal for AutoConsistentJournalRx { fn read(&self) -> anyhow::Result>> { self.inner.read() } @@ -145,12 +151,11 @@ impl ReadableJournal for AutoConsistentJournalRx { fn as_restarted(&self) -> anyhow::Result> { Ok(Box::new(AutoConsistentJournalRx { inner: self.inner.as_restarted()?, - state: Arc::new(Mutex::new(State::default())), })) } } -impl WritableJournal for AutoConsistentJournal { +impl WritableJournal for AutoConsistentJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -158,9 +163,17 @@ impl WritableJournal for AutoConsistentJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for AutoConsistentJournal { +impl ReadableJournal for AutoConsistentJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -170,7 +183,7 @@ impl ReadableJournal for AutoConsistentJournal { } } -impl Journal for AutoConsistentJournal { +impl Journal for AutoConsistentJournal, Box> { fn split(self) -> (Box, Box) { (Box::new(self.tx), Box::new(self.rx)) } diff --git a/lib/journal/src/concrete/boxed.rs b/lib/journal/src/concrete/boxed.rs index ce54a13c019..3caf5e48702 100644 --- a/lib/journal/src/concrete/boxed.rs +++ b/lib/journal/src/concrete/boxed.rs @@ -1,8 +1,8 @@ -use std::ops::Deref; +use std::{ops::Deref, sync::Arc}; use super::*; -impl ReadableJournal for Box { +impl ReadableJournal for Box { fn read(&self) -> anyhow::Result>> { self.deref().read() } @@ -12,7 +12,7 @@ impl ReadableJournal for Box { } } -impl WritableJournal for Box { +impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } @@ -20,24 +20,19 @@ impl WritableJournal for Box { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } -} -impl ReadableJournal for Box { - fn read(&self) -> anyhow::Result>> { - self.deref().read() + fn commit(&self) -> anyhow::Result { + self.deref().commit() } - fn as_restarted(&self) -> anyhow::Result> { - self.deref().as_restarted() + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() } } -impl WritableJournal for Box { - fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { - self.deref().write(entry) - } - - fn flush(&self) -> anyhow::Result<()> { - self.deref().flush() +impl Journal for Box { + fn split(self) -> (Box, Box) { + let this = Arc::new(self); + (Box::new(this.clone()), Box::new(this)) } } diff --git a/lib/journal/src/concrete/buffered.rs b/lib/journal/src/concrete/buffered.rs index a985d44da4a..2b21010fee4 100644 --- a/lib/journal/src/concrete/buffered.rs +++ b/lib/journal/src/concrete/buffered.rs @@ -89,6 +89,14 @@ impl WritableJournal for BufferedJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for BufferedJournal { diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 6ab380387be..d43fd716363 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -1,4 +1,3 @@ -use derivative::Derivative; use std::{ collections::{HashMap, HashSet}, ops::{DerefMut, Range}, @@ -33,8 +32,7 @@ impl From> for MemoryRange { #[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] struct DescriptorLookup(u64); -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(Debug)] struct State { /// The descriptor seed is used generate descriptor lookups descriptor_seed: u64, @@ -89,20 +87,30 @@ struct State { // after a compact started delta_list: Option>, // The inner journal that we will write to - #[derivative(Debug = "ignore")] inner_tx: Box, // The inner journal that we read from - #[derivative(Debug = "ignore")] inner_rx: Box, } impl State { - fn create_filter(&self, inner: J) -> FilteredJournal + fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { - let has_threads = !self.thread_map.is_empty(); + let (w, r) = inner.split(); + self.create_split_filter(w, r) + } + fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + let has_threads = !self.thread_map.is_empty(); + let mut filter = FilteredJournalBuilder::new() .with_filter_events(self.whitelist.clone().into_iter().collect()); if let Some(tty) = self.tty.as_ref() { @@ -182,7 +190,7 @@ impl State { } } } - filter.build(inner) + filter.build_split(writer, reader) } } @@ -195,10 +203,8 @@ pub struct CompactingJournalTx { compacting: Arc>, } -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(Debug)] pub struct CompactingJournalRx { - #[derivative(Debug = "ignore")] inner: Box, } @@ -257,12 +263,25 @@ impl CompactingJournal { /// Creates a filter jounral which will write all /// its events to an inner journal - pub fn create_filter(&self, inner: J) -> FilteredJournal + pub fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { self.tx.create_filter(inner) } + + /// Creates a filter journal which will write all + /// its events to writer and readers supplied + pub fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + self.tx.create_split_filter(writer, reader) + } } /// Represents the results of a compaction operation @@ -273,7 +292,10 @@ pub struct CompactResult { } impl CompactingJournalTx { - pub fn create_filter(&self, inner: J) -> FilteredJournal + pub fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { @@ -281,6 +303,15 @@ impl CompactingJournalTx { state.create_filter(inner) } + pub fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + let state = self.state.lock().unwrap(); + state.create_split_filter(writer, reader) + } + pub fn swap(&self, other: Self) -> Self { let mut state1 = self.state.lock().unwrap(); let mut state2 = other.state.lock().unwrap(); @@ -734,6 +765,14 @@ impl WritableJournal for CompactingJournalTx { fn flush(&self) -> anyhow::Result<()> { self.state.lock().unwrap().inner_tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.state.lock().unwrap().inner_tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.state.lock().unwrap().inner_tx.rollback() + } } impl CompactingJournal { @@ -777,6 +816,14 @@ impl WritableJournal for CompactingJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for CompactingJournal { diff --git a/lib/journal/src/concrete/compacting_log_file.rs b/lib/journal/src/concrete/compacting_log_file.rs index 9f26a73d97e..9efba2b6031 100644 --- a/lib/journal/src/concrete/compacting_log_file.rs +++ b/lib/journal/src/concrete/compacting_log_file.rs @@ -241,6 +241,14 @@ impl WritableJournal for CompactingLogFileJournalTx { fn flush(&self) -> anyhow::Result<()> { self.inner.flush() } + + fn commit(&self) -> anyhow::Result { + self.inner.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } } impl ReadableJournal for CompactingLogFileJournal { @@ -261,6 +269,14 @@ impl WritableJournal for CompactingLogFileJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl Journal for CompactingLogFileJournal { diff --git a/lib/journal/src/concrete/compacting_transaction.rs b/lib/journal/src/concrete/compacting_transaction.rs index ebfde2f97cf..4bd6418de2e 100644 --- a/lib/journal/src/concrete/compacting_transaction.rs +++ b/lib/journal/src/concrete/compacting_transaction.rs @@ -1,24 +1,24 @@ use super::*; #[derive(Debug)] -pub struct CompactingTransactionJournalTx { - inner: TransactionJournalTx, +pub struct CompactingTransactionJournalTx { + inner: TransactionJournalTx, } #[derive(Debug)] -pub struct CompactingTransactionJournalRx { - inner: TransactionJournalRx, +pub struct CompactingTransactionJournalRx { + inner: TransactionJournalRx, } /// Journal which will store the events locally in memory until it /// is either committed or rolled back #[derive(Debug)] -pub struct CompactingTransactionJournal { - tx: CompactingTransactionJournalTx, - rx: CompactingTransactionJournalRx, +pub struct CompactingTransactionJournal { + tx: CompactingTransactionJournalTx, + rx: CompactingTransactionJournalRx, } -impl CompactingTransactionJournal { +impl CompactingTransactionJournal, Box> { /// Creates a compacting transactional journal which will hold events in /// memory until the journal is either committed or rolled back. /// @@ -34,8 +34,10 @@ impl CompactingTransactionJournal { tx: CompactingTransactionJournalTx { inner: inner.tx }, } } +} - pub fn into_inner(self) -> TransactionJournal { +impl CompactingTransactionJournal { + pub fn into_inner(self) -> TransactionJournal { TransactionJournal { rx: self.rx.inner, tx: self.tx.inner, @@ -43,7 +45,7 @@ impl CompactingTransactionJournal { } } -impl WritableJournal for CompactingTransactionJournalTx { +impl WritableJournal for CompactingTransactionJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.inner.write(entry) } @@ -73,28 +75,22 @@ impl WritableJournal for CompactingTransactionJournalTx { // Next we create an inline journal that is used for streaming the // events the journal this is under this super journal - struct RelayJournal<'a> { - inner: &'a CompactingTransactionJournalTx, + #[derive(Debug)] + struct RelayJournal<'a, W: WritableJournal> { + inner: &'a CompactingTransactionJournalTx, } - impl WritableJournal for RelayJournal<'_> { + impl WritableJournal for RelayJournal<'_, W> { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.inner.write(entry) } fn flush(&self) -> anyhow::Result<()> { Ok(()) } - } - impl ReadableJournal for RelayJournal<'_> { - fn read(&self) -> anyhow::Result>> { - Ok(None) + fn commit(&self) -> anyhow::Result { + self.inner.commit() } - fn as_restarted(&self) -> anyhow::Result> { - NullJournal::default().split().1.as_restarted() - } - } - impl Journal for RelayJournal<'_> { - fn split(self) -> (Box, Box) { - NullJournal::default().split() + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() } } let relay_journal = RelayJournal { inner: self }; @@ -102,7 +98,8 @@ impl WritableJournal for CompactingTransactionJournalTx { // Now we create a filter journal which will filter out the events // that are not needed and stream them down let mut ret = 0; - let filter = compacting.create_filter(relay_journal); + let filter = + compacting.create_split_filter(relay_journal, NullJournal::default().split().1); for entry in records { let res = filter.write(entry)?; if res.record_start == 0 && res.record_end == 0 { @@ -124,7 +121,7 @@ impl WritableJournal for CompactingTransactionJournalTx { } } -impl ReadableJournal for CompactingTransactionJournalRx { +impl ReadableJournal for CompactingTransactionJournalRx { fn read(&self) -> anyhow::Result>> { self.inner.read() } @@ -134,7 +131,9 @@ impl ReadableJournal for CompactingTransactionJournalRx { } } -impl WritableJournal for CompactingTransactionJournal { +impl WritableJournal + for CompactingTransactionJournal +{ fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -142,9 +141,19 @@ impl WritableJournal for CompactingTransactionJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for CompactingTransactionJournal { +impl ReadableJournal + for CompactingTransactionJournal +{ fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -154,7 +163,7 @@ impl ReadableJournal for CompactingTransactionJournal { } } -impl Journal for CompactingTransactionJournal { +impl Journal for CompactingTransactionJournal, Box> { fn split(self) -> (Box, Box) { (Box::new(self.tx), Box::new(self.rx)) } diff --git a/lib/journal/src/concrete/filter.rs b/lib/journal/src/concrete/filter.rs index fa856cdffd6..c6412be1e79 100644 --- a/lib/journal/src/concrete/filter.rs +++ b/lib/journal/src/concrete/filter.rs @@ -3,17 +3,15 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -use derivative::Derivative; - use super::*; /// Filters out a specific set of journal events and drops the rest, this /// journal can be useful for restoring to a previous call point but /// retaining the memory changes (e.g. WCGI runner). #[derive(Debug)] -pub struct FilteredJournal { - tx: FilteredJournalTx, - rx: FilteredJournalRx, +pub struct FilteredJournal { + tx: FilteredJournalTx, + rx: FilteredJournalRx, } /// Represents what will be filtered by the filtering process @@ -62,19 +60,15 @@ impl Clone for FilteredJournalConfig { } } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct FilteredJournalTx { - #[derivative(Debug = "ignore")] - inner: Box, +#[derive(Debug)] +pub struct FilteredJournalTx { + inner: W, config: FilteredJournalConfig, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct FilteredJournalRx { - #[derivative(Debug = "ignore")] - inner: Box, +#[derive(Debug)] +pub struct FilteredJournalRx { + inner: R, } /// Constructs a filter with a set of parameters that will be filtered on @@ -88,13 +82,24 @@ impl FilteredJournalBuilder { Self::default() } - pub fn build(self, inner: J) -> FilteredJournal + pub fn build( + self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { FilteredJournal::new(inner, self.config) } + pub fn build_split(self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + FilteredJournal::new_split(writer, reader, self.config) + } + pub fn with_ignore_memory(mut self, val: bool) -> Self { self.config.filter_memory = val; self @@ -177,7 +182,7 @@ impl FilteredJournalBuilder { } } -impl FilteredJournal { +impl FilteredJournal, Box> { fn new(inner: J, config: FilteredJournalConfig) -> Self where J: Journal, @@ -200,13 +205,25 @@ impl FilteredJournal { rx: FilteredJournalRx { inner: rx }, } } +} - pub fn into_inner(self) -> RecombinedJournal { +impl FilteredJournal { + pub fn into_inner(self) -> RecombinedJournal { RecombinedJournal::new(self.tx.inner, self.rx.inner) } + + fn new_split(writer: W, reader: R, config: FilteredJournalConfig) -> Self { + Self { + tx: FilteredJournalTx { + inner: writer, + config, + }, + rx: FilteredJournalRx { inner: reader }, + } + } } -impl WritableJournal for FilteredJournalTx { +impl WritableJournal for FilteredJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { let event_index = self.config.event_index.fetch_add(1, Ordering::SeqCst); if let Some(events) = self.config.filter_events.as_ref() { @@ -348,9 +365,17 @@ impl WritableJournal for FilteredJournalTx { fn flush(&self) -> anyhow::Result<()> { self.inner.flush() } + + fn commit(&self) -> anyhow::Result { + self.inner.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } } -impl ReadableJournal for FilteredJournalRx { +impl ReadableJournal for FilteredJournalRx { fn read(&self) -> anyhow::Result>> { self.inner.read() } @@ -362,7 +387,7 @@ impl ReadableJournal for FilteredJournalRx { } } -impl WritableJournal for FilteredJournal { +impl WritableJournal for FilteredJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -370,9 +395,17 @@ impl WritableJournal for FilteredJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for FilteredJournal { +impl ReadableJournal for FilteredJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -382,7 +415,7 @@ impl ReadableJournal for FilteredJournal { } } -impl Journal for FilteredJournal { +impl Journal for FilteredJournal, Box> { fn split(self) -> (Box, Box) { (Box::new(self.tx), Box::new(self.rx)) } diff --git a/lib/journal/src/concrete/log_file.rs b/lib/journal/src/concrete/log_file.rs index be3a876d71c..f309816ecc6 100644 --- a/lib/journal/src/concrete/log_file.rs +++ b/lib/journal/src/concrete/log_file.rs @@ -150,7 +150,9 @@ impl LogFileJournal { } /// Create a new journal from a buffer - pub fn from_buffer(buffer: OwnedBuffer) -> RecombinedJournal { + pub fn from_buffer( + buffer: OwnedBuffer, + ) -> RecombinedJournal { // Create the rx let rx = LogFileJournalRx { tx: None, @@ -163,7 +165,7 @@ impl LogFileJournal { let tx = UnsupportedJournal::default(); // Now recombine - RecombinedJournal::new(Box::new(tx), Box::new(rx)) + RecombinedJournal::new(tx, rx) } } @@ -303,6 +305,14 @@ impl WritableJournal for LogFileJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for LogFileJournal { diff --git a/lib/journal/src/concrete/pipe.rs b/lib/journal/src/concrete/pipe.rs index 5c3b7bdd45b..fa0dfaf3f8e 100644 --- a/lib/journal/src/concrete/pipe.rs +++ b/lib/journal/src/concrete/pipe.rs @@ -116,6 +116,14 @@ impl WritableJournal for PipeJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for PipeJournal { diff --git a/lib/journal/src/concrete/recombined.rs b/lib/journal/src/concrete/recombined.rs index 93a618c7e35..8b170d3b15f 100644 --- a/lib/journal/src/concrete/recombined.rs +++ b/lib/journal/src/concrete/recombined.rs @@ -1,17 +1,18 @@ use super::*; -pub struct RecombinedJournal { - tx: Box, - rx: Box, +#[derive(Debug)] +pub struct RecombinedJournal { + tx: W, + rx: R, } -impl RecombinedJournal { - pub fn new(tx: Box, rx: Box) -> Self { +impl RecombinedJournal { + pub fn new(tx: W, rx: R) -> Self { Self { tx, rx } } } -impl WritableJournal for RecombinedJournal { +impl WritableJournal for RecombinedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -19,9 +20,17 @@ impl WritableJournal for RecombinedJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for RecombinedJournal { +impl ReadableJournal for RecombinedJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -31,8 +40,12 @@ impl ReadableJournal for RecombinedJournal { } } -impl Journal for RecombinedJournal { +impl Journal for RecombinedJournal +where + W: WritableJournal + Send + Sync + 'static, + R: ReadableJournal + Send + Sync + 'static, +{ fn split(self) -> (Box, Box) { - (self.tx, self.rx) + (Box::new(self.tx), Box::new(self.rx)) } } diff --git a/lib/journal/src/concrete/transaction.rs b/lib/journal/src/concrete/transaction.rs index ff1405d02d4..551958534b9 100644 --- a/lib/journal/src/concrete/transaction.rs +++ b/lib/journal/src/concrete/transaction.rs @@ -1,15 +1,13 @@ use std::sync::{Arc, Mutex}; -use derivative::Derivative; - use super::*; /// Journal which will store the events locally in memory until it /// is either committed or rolled back #[derive(Debug)] -pub struct TransactionJournal { - pub(super) tx: TransactionJournalTx, - pub(super) rx: TransactionJournalRx, +pub struct TransactionJournal { + pub(super) tx: TransactionJournalTx, + pub(super) rx: TransactionJournalRx, } #[derive(Debug, Default, Clone)] @@ -18,23 +16,19 @@ pub(super) struct State { pub(super) offset: u64, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct TransactionJournalTx { +#[derive(Debug)] +pub struct TransactionJournalTx { pub(super) state: Arc>, - #[derivative(Debug = "ignore")] - inner: Box, + inner: W, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct TransactionJournalRx { +#[derive(Debug)] +pub struct TransactionJournalRx { state: Arc>, - #[derivative(Debug = "ignore")] - inner: Box, + inner: R, } -impl TransactionJournal { +impl TransactionJournal, Box> { /// Creates a transactional journal which will hold events in memory /// until the journal is either committed or rolled back pub fn new(inner: J) -> Self @@ -54,13 +48,15 @@ impl TransactionJournal { }, } } +} - pub fn into_inner(self) -> RecombinedJournal { +impl TransactionJournal { + pub fn into_inner(self) -> RecombinedJournal { RecombinedJournal::new(self.tx.inner, self.rx.inner) } } -impl WritableJournal for TransactionJournalTx { +impl WritableJournal for TransactionJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { let entry = entry.into_owned(); let mut state = self.state.lock().unwrap(); @@ -111,7 +107,7 @@ impl WritableJournal for TransactionJournalTx { } } -impl ReadableJournal for TransactionJournalRx { +impl ReadableJournal for TransactionJournalRx { fn read(&self) -> anyhow::Result>> { let ret = self.inner.read()?; if let Some(res) = ret.as_ref() { @@ -129,7 +125,7 @@ impl ReadableJournal for TransactionJournalRx { } } -impl WritableJournal for TransactionJournal { +impl WritableJournal for TransactionJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -137,9 +133,17 @@ impl WritableJournal for TransactionJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for TransactionJournal { +impl ReadableJournal for TransactionJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -149,7 +153,7 @@ impl ReadableJournal for TransactionJournal { } } -impl Journal for TransactionJournal { +impl Journal for TransactionJournal, Box> { fn split(self) -> (Box, Box) { (Box::new(self.tx), Box::new(self.rx)) } diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index 7748f66a454..478945eec20 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -31,7 +31,7 @@ impl LogWriteResult { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait WritableJournal { +pub trait WritableJournal: std::fmt::Debug { /// Takes in a stream of snapshot log entries and saves them so that they /// may be restored at a later moment fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result; @@ -81,7 +81,7 @@ impl<'a> Deref for LogReadResult<'a> { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait ReadableJournal { +pub trait ReadableJournal: std::fmt::Debug { /// Returns a stream of snapshot objects that the runtime will use /// to restore the state of a WASM process to a previous moment in time fn read(&self) -> anyhow::Result>>; @@ -95,7 +95,7 @@ pub trait ReadableJournal { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait Journal: WritableJournal + ReadableJournal { +pub trait Journal: WritableJournal + ReadableJournal + std::fmt::Debug { /// Splits the journal into a read and write side fn split(self) -> (Box, Box); } diff --git a/lib/wasix/src/runners/dproxy/factory.rs b/lib/wasix/src/runners/dproxy/factory.rs index 34465ef3885..1833a781ca1 100644 --- a/lib/wasix/src/runners/dproxy/factory.rs +++ b/lib/wasix/src/runners/dproxy/factory.rs @@ -64,9 +64,10 @@ impl DProxyInstanceFactory { .clone() .into_iter() .map(|journal| { - let tx = Box::new(journal.clone()); + let tx = journal.clone(); let rx = journal.as_restarted()?; - anyhow::Result::Ok(Arc::new(RecombinedJournal::new(tx, rx)) as Arc) + let combined = RecombinedJournal::new(tx, rx); + anyhow::Result::Ok(Arc::new(combined) as Arc) }) .collect::>>()?; let mut runtime = OverriddenRuntime::new(runtime).with_journals(journals); diff --git a/lib/wasix/src/state/env.rs b/lib/wasix/src/state/env.rs index f42b7e529d1..ff32cbc74c6 100644 --- a/lib/wasix/src/state/env.rs +++ b/lib/wasix/src/state/env.rs @@ -1000,11 +1000,16 @@ impl WasiEnv { self.enable_journal && !self.replaying_journal } + /// Returns true if the environment has an active journal + pub fn has_active_journal(&self) -> bool { + self.runtime().active_journal().is_some() + } + /// Returns the active journal or fails with an error #[cfg(feature = "journal")] pub fn active_journal(&self) -> Result<&DynJournal, Errno> { self.runtime().active_journal().ok_or_else(|| { - tracing::warn!("failed to save thread exit as there is not active journal"); + tracing::debug!("failed to save thread exit as there is not active journal"); Errno::Fault }) } @@ -1252,7 +1257,7 @@ impl WasiEnv { // If snap-shooting is enabled then we should record an event that the thread has exited. #[cfg(feature = "journal")] - if self.should_journal() { + if self.should_journal() && self.has_active_journal() { if let Err(err) = JournalEffector::save_thread_exit(self, self.tid(), exit_code) { tracing::warn!("failed to save snapshot event for thread exit - {}", err); } From 3014a44591b6ddc00d5c7415c779e67e221533ba Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 22:20:12 +1000 Subject: [PATCH 4/6] Linting --- lib/journal/src/concrete/compacting.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index d43fd716363..3d56945669b 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -110,7 +110,7 @@ impl State { R: ReadableJournal, { let has_threads = !self.thread_map.is_empty(); - + let mut filter = FilteredJournalBuilder::new() .with_filter_events(self.whitelist.clone().into_iter().collect()); if let Some(tty) = self.tty.as_ref() { From ac6d429e1796dec63e0e25b866ee1ff7df699b1c Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 22:29:35 +1000 Subject: [PATCH 5/6] More linting fixes --- lib/journal/src/concrete/transaction.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/journal/src/concrete/transaction.rs b/lib/journal/src/concrete/transaction.rs index 551958534b9..a8249e86865 100644 --- a/lib/journal/src/concrete/transaction.rs +++ b/lib/journal/src/concrete/transaction.rs @@ -63,8 +63,8 @@ impl WritableJournal for TransactionJournalTx { let estimate_size = entry.estimate_size(); state.records.push(entry); Ok(LogWriteResult { - record_start: state.offset as u64, - record_end: state.offset as u64 + estimate_size as u64, + record_start: state.offset, + record_end: state.offset + estimate_size as u64, }) } From c79e7ec43cc4fdc487c0d3dc251be5b758aa55dc Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Sun, 11 Aug 2024 22:44:16 +1000 Subject: [PATCH 6/6] Fix for nodejs --- lib/wasix/src/state/env.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/wasix/src/state/env.rs b/lib/wasix/src/state/env.rs index ff32cbc74c6..97f83e74f00 100644 --- a/lib/wasix/src/state/env.rs +++ b/lib/wasix/src/state/env.rs @@ -1001,6 +1001,7 @@ impl WasiEnv { } /// Returns true if the environment has an active journal + #[cfg(feature = "journal")] pub fn has_active_journal(&self) -> bool { self.runtime().active_journal().is_some() }