diff --git a/lib/journal/src/concrete/arc.rs b/lib/journal/src/concrete/arc.rs index 650f2d5fdb2..21d0fbb8e5e 100644 --- a/lib/journal/src/concrete/arc.rs +++ b/lib/journal/src/concrete/arc.rs @@ -2,7 +2,7 @@ use super::*; use std::ops::Deref; use std::sync::Arc; -impl ReadableJournal for Arc { +impl ReadableJournal for Arc { fn read(&self) -> anyhow::Result>> { self.deref().read() } @@ -12,7 +12,7 @@ impl ReadableJournal for Arc { } } -impl WritableJournal for Arc { +impl WritableJournal for Arc { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } @@ -20,6 +20,14 @@ impl WritableJournal for Arc { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } + + fn commit(&self) -> anyhow::Result { + self.deref().commit() + } + + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() + } } impl ReadableJournal for Arc { @@ -40,6 +48,14 @@ impl WritableJournal for Arc { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } + + fn commit(&self) -> anyhow::Result { + self.deref().commit() + } + + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() + } } impl Journal for Arc { diff --git a/lib/journal/src/concrete/auto_consistent.rs b/lib/journal/src/concrete/auto_consistent.rs new file mode 100644 index 00000000000..511106afb23 --- /dev/null +++ b/lib/journal/src/concrete/auto_consistent.rs @@ -0,0 +1,190 @@ +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; + +use super::*; + +/// Journal which leave itself in a consistent state once it commits +/// by closing all the file descriptors that were opened while +/// it was recording writes. +#[derive(Debug)] +pub struct AutoConsistentJournal { + tx: AutoConsistentJournalTx, + rx: AutoConsistentJournalRx, +} + +#[derive(Debug, Default, Clone)] +struct State { + open_files: HashSet, + open_sockets: HashSet, +} + +#[derive(Debug)] +pub struct AutoConsistentJournalTx { + state: Arc>, + inner: W, +} + +#[derive(Debug)] +pub struct AutoConsistentJournalRx { + inner: R, +} + +impl AutoConsistentJournal, Box> { + /// Creates a journal which will automatically correct inconsistencies when + /// it commits. E.g. it will close any open file descriptors that were left + /// open as it was processing events. + pub fn new(inner: J) -> Self + where + J: Journal, + { + let state = Arc::new(Mutex::new(State::default())); + let (tx, rx) = inner.split(); + Self { + tx: AutoConsistentJournalTx { + inner: tx, + state: state.clone(), + }, + rx: AutoConsistentJournalRx { inner: rx }, + } + } +} + +impl AutoConsistentJournal { + pub fn into_inner(self) -> RecombinedJournal { + RecombinedJournal::new(self.tx.inner, self.rx.inner) + } +} + +impl WritableJournal for AutoConsistentJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + match &entry { + JournalEntry::OpenFileDescriptorV1 { fd, .. } + | JournalEntry::CreateEventV1 { fd, .. } => { + let mut state = self.state.lock().unwrap(); + state.open_files.insert(*fd); + } + JournalEntry::SocketAcceptedV1 { fd, .. } => { + let mut state = self.state.lock().unwrap(); + state.open_sockets.insert(*fd); + } + JournalEntry::CreatePipeV1 { fd1, fd2 } => { + let mut state = self.state.lock().unwrap(); + state.open_files.insert(*fd1); + state.open_files.insert(*fd2); + } + JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => { + let mut state = self.state.lock().unwrap(); + if state.open_files.remove(old_fd) { + state.open_files.insert(*new_fd); + } + if state.open_sockets.remove(old_fd) { + state.open_sockets.insert(*new_fd); + } + } + JournalEntry::DuplicateFileDescriptorV1 { + original_fd, + copied_fd, + } => { + let mut state = self.state.lock().unwrap(); + if state.open_files.contains(original_fd) { + state.open_files.insert(*copied_fd); + } + if state.open_sockets.contains(original_fd) { + state.open_sockets.insert(*copied_fd); + } + } + JournalEntry::CloseFileDescriptorV1 { fd } => { + let mut state = self.state.lock().unwrap(); + state.open_files.remove(fd); + state.open_sockets.remove(fd); + } + JournalEntry::InitModuleV1 { .. } + | JournalEntry::ClearEtherealV1 { .. } + | JournalEntry::ProcessExitV1 { .. } => { + let mut state = self.state.lock().unwrap(); + state.open_files.clear(); + state.open_sockets.clear(); + } + _ => {} + } + self.inner.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + /// Commits the transaction + fn commit(&self) -> anyhow::Result { + let open_files = { + let mut state = self.state.lock().unwrap(); + let mut open_files = Default::default(); + std::mem::swap(&mut open_files, &mut state.open_files); + state.open_sockets.clear(); + open_files + }; + for fd in open_files { + let entry = JournalEntry::CloseFileDescriptorV1 { fd }; + self.inner.write(entry)?; + } + self.inner.commit() + } + + /// Rolls back the transaction and aborts its changes + fn rollback(&self) -> anyhow::Result { + { + let mut state = self.state.lock().unwrap(); + state.open_files.clear(); + state.open_sockets.clear(); + } + self.inner.rollback() + } +} + +impl ReadableJournal for AutoConsistentJournalRx { + fn read(&self) -> anyhow::Result>> { + self.inner.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + Ok(Box::new(AutoConsistentJournalRx { + inner: self.inner.as_restarted()?, + })) + } +} + +impl WritableJournal for AutoConsistentJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } +} + +impl ReadableJournal for AutoConsistentJournal { + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for AutoConsistentJournal, Box> { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/concrete/boxed.rs b/lib/journal/src/concrete/boxed.rs index ce54a13c019..3caf5e48702 100644 --- a/lib/journal/src/concrete/boxed.rs +++ b/lib/journal/src/concrete/boxed.rs @@ -1,8 +1,8 @@ -use std::ops::Deref; +use std::{ops::Deref, sync::Arc}; use super::*; -impl ReadableJournal for Box { +impl ReadableJournal for Box { fn read(&self) -> anyhow::Result>> { self.deref().read() } @@ -12,7 +12,7 @@ impl ReadableJournal for Box { } } -impl WritableJournal for Box { +impl WritableJournal for Box { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.deref().write(entry) } @@ -20,24 +20,19 @@ impl WritableJournal for Box { fn flush(&self) -> anyhow::Result<()> { self.deref().flush() } -} -impl ReadableJournal for Box { - fn read(&self) -> anyhow::Result>> { - self.deref().read() + fn commit(&self) -> anyhow::Result { + self.deref().commit() } - fn as_restarted(&self) -> anyhow::Result> { - self.deref().as_restarted() + fn rollback(&self) -> anyhow::Result { + self.deref().rollback() } } -impl WritableJournal for Box { - fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { - self.deref().write(entry) - } - - fn flush(&self) -> anyhow::Result<()> { - self.deref().flush() +impl Journal for Box { + fn split(self) -> (Box, Box) { + let this = Arc::new(self); + (Box::new(this.clone()), Box::new(this)) } } diff --git a/lib/journal/src/concrete/buffered.rs b/lib/journal/src/concrete/buffered.rs index a985d44da4a..2b21010fee4 100644 --- a/lib/journal/src/concrete/buffered.rs +++ b/lib/journal/src/concrete/buffered.rs @@ -89,6 +89,14 @@ impl WritableJournal for BufferedJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for BufferedJournal { diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 4c858841e43..3d56945669b 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -1,4 +1,3 @@ -use derivative::Derivative; use std::{ collections::{HashMap, HashSet}, ops::{DerefMut, Range}, @@ -33,8 +32,7 @@ impl From> for MemoryRange { #[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)] struct DescriptorLookup(u64); -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(Debug)] struct State { /// The descriptor seed is used generate descriptor lookups descriptor_seed: u64, @@ -89,17 +87,27 @@ struct State { // after a compact started delta_list: Option>, // The inner journal that we will write to - #[derivative(Debug = "ignore")] inner_tx: Box, // The inner journal that we read from - #[derivative(Debug = "ignore")] inner_rx: Box, } impl State { - fn create_filter(&self, inner: J) -> FilteredJournal + fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, + { + let (w, r) = inner.split(); + self.create_split_filter(w, r) + } + + fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, { let has_threads = !self.thread_map.is_empty(); @@ -182,7 +190,7 @@ impl State { } } } - filter.build(inner) + filter.build_split(writer, reader) } } @@ -195,10 +203,8 @@ pub struct CompactingJournalTx { compacting: Arc>, } -#[derive(Derivative)] -#[derivative(Debug)] +#[derive(Debug)] pub struct CompactingJournalRx { - #[derivative(Debug = "ignore")] inner: Box, } @@ -254,6 +260,28 @@ impl CompactingJournal { rx: CompactingJournalRx { inner: rx }, }) } + + /// Creates a filter jounral which will write all + /// its events to an inner journal + pub fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> + where + J: Journal, + { + self.tx.create_filter(inner) + } + + /// Creates a filter journal which will write all + /// its events to writer and readers supplied + pub fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + self.tx.create_split_filter(writer, reader) + } } /// Represents the results of a compaction operation @@ -264,7 +292,10 @@ pub struct CompactResult { } impl CompactingJournalTx { - pub fn create_filter(&self, inner: J) -> FilteredJournal + pub fn create_filter( + &self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { @@ -272,6 +303,15 @@ impl CompactingJournalTx { state.create_filter(inner) } + pub fn create_split_filter(&self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + let state = self.state.lock().unwrap(); + state.create_split_filter(writer, reader) + } + pub fn swap(&self, other: Self) -> Self { let mut state1 = self.state.lock().unwrap(); let mut state2 = other.state.lock().unwrap(); @@ -725,6 +765,14 @@ impl WritableJournal for CompactingJournalTx { fn flush(&self) -> anyhow::Result<()> { self.state.lock().unwrap().inner_tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.state.lock().unwrap().inner_tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.state.lock().unwrap().inner_tx.rollback() + } } impl CompactingJournal { @@ -768,6 +816,14 @@ impl WritableJournal for CompactingJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for CompactingJournal { diff --git a/lib/journal/src/concrete/compacting_log_file.rs b/lib/journal/src/concrete/compacting_log_file.rs index 9f26a73d97e..9efba2b6031 100644 --- a/lib/journal/src/concrete/compacting_log_file.rs +++ b/lib/journal/src/concrete/compacting_log_file.rs @@ -241,6 +241,14 @@ impl WritableJournal for CompactingLogFileJournalTx { fn flush(&self) -> anyhow::Result<()> { self.inner.flush() } + + fn commit(&self) -> anyhow::Result { + self.inner.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } } impl ReadableJournal for CompactingLogFileJournal { @@ -261,6 +269,14 @@ impl WritableJournal for CompactingLogFileJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl Journal for CompactingLogFileJournal { diff --git a/lib/journal/src/concrete/compacting_transaction.rs b/lib/journal/src/concrete/compacting_transaction.rs new file mode 100644 index 00000000000..4bd6418de2e --- /dev/null +++ b/lib/journal/src/concrete/compacting_transaction.rs @@ -0,0 +1,170 @@ +use super::*; + +#[derive(Debug)] +pub struct CompactingTransactionJournalTx { + inner: TransactionJournalTx, +} + +#[derive(Debug)] +pub struct CompactingTransactionJournalRx { + inner: TransactionJournalRx, +} + +/// Journal which will store the events locally in memory until it +/// is either committed or rolled back +#[derive(Debug)] +pub struct CompactingTransactionJournal { + tx: CompactingTransactionJournalTx, + rx: CompactingTransactionJournalRx, +} + +impl CompactingTransactionJournal, Box> { + /// Creates a compacting transactional journal which will hold events in + /// memory until the journal is either committed or rolled back. + /// + /// When the journal is commited it will perform a compaction of the events + /// before they are misseeed to the underlying journal + pub fn new(inner: J) -> Self + where + J: Journal, + { + let inner = TransactionJournal::new(inner); + Self { + rx: CompactingTransactionJournalRx { inner: inner.rx }, + tx: CompactingTransactionJournalTx { inner: inner.tx }, + } + } +} + +impl CompactingTransactionJournal { + pub fn into_inner(self) -> TransactionJournal { + TransactionJournal { + rx: self.rx.inner, + tx: self.tx.inner, + } + } +} + +impl WritableJournal for CompactingTransactionJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.inner.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + fn commit(&self) -> anyhow::Result { + // We read all the events that have been buffered + let (records, mut new_offset) = { + let mut state = self.inner.state.lock().unwrap(); + let mut records = Default::default(); + std::mem::swap(&mut records, &mut state.records); + (records, state.offset) + }; + if records.is_empty() { + return Ok(0); + } + + // We prepare a compacting journal which does nothing + // with the events other than learn from them + let compacting = CompactingJournal::new(NullJournal::default())?; + for record in records.iter() { + compacting.write(record.clone())?; + } + + // Next we create an inline journal that is used for streaming the + // events the journal this is under this super journal + #[derive(Debug)] + struct RelayJournal<'a, W: WritableJournal> { + inner: &'a CompactingTransactionJournalTx, + } + impl WritableJournal for RelayJournal<'_, W> { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.inner.write(entry) + } + fn flush(&self) -> anyhow::Result<()> { + Ok(()) + } + fn commit(&self) -> anyhow::Result { + self.inner.commit() + } + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } + } + let relay_journal = RelayJournal { inner: self }; + + // Now we create a filter journal which will filter out the events + // that are not needed and stream them down + let mut ret = 0; + let filter = + compacting.create_split_filter(relay_journal, NullJournal::default().split().1); + for entry in records { + let res = filter.write(entry)?; + if res.record_start == 0 && res.record_end == 0 { + continue; + } + ret += 1; + new_offset = new_offset.max(res.record_end); + } + { + let mut state = self.inner.state.lock().unwrap(); + state.offset = state.offset.max(new_offset); + } + ret += self.inner.commit()?; + Ok(ret) + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } +} + +impl ReadableJournal for CompactingTransactionJournalRx { + fn read(&self) -> anyhow::Result>> { + self.inner.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.inner.as_restarted() + } +} + +impl WritableJournal + for CompactingTransactionJournal +{ + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } +} + +impl ReadableJournal + for CompactingTransactionJournal +{ + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for CompactingTransactionJournal, Box> { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/concrete/filter.rs b/lib/journal/src/concrete/filter.rs index fa856cdffd6..c6412be1e79 100644 --- a/lib/journal/src/concrete/filter.rs +++ b/lib/journal/src/concrete/filter.rs @@ -3,17 +3,15 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -use derivative::Derivative; - use super::*; /// Filters out a specific set of journal events and drops the rest, this /// journal can be useful for restoring to a previous call point but /// retaining the memory changes (e.g. WCGI runner). #[derive(Debug)] -pub struct FilteredJournal { - tx: FilteredJournalTx, - rx: FilteredJournalRx, +pub struct FilteredJournal { + tx: FilteredJournalTx, + rx: FilteredJournalRx, } /// Represents what will be filtered by the filtering process @@ -62,19 +60,15 @@ impl Clone for FilteredJournalConfig { } } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct FilteredJournalTx { - #[derivative(Debug = "ignore")] - inner: Box, +#[derive(Debug)] +pub struct FilteredJournalTx { + inner: W, config: FilteredJournalConfig, } -#[derive(Derivative)] -#[derivative(Debug)] -pub struct FilteredJournalRx { - #[derivative(Debug = "ignore")] - inner: Box, +#[derive(Debug)] +pub struct FilteredJournalRx { + inner: R, } /// Constructs a filter with a set of parameters that will be filtered on @@ -88,13 +82,24 @@ impl FilteredJournalBuilder { Self::default() } - pub fn build(self, inner: J) -> FilteredJournal + pub fn build( + self, + inner: J, + ) -> FilteredJournal, Box> where J: Journal, { FilteredJournal::new(inner, self.config) } + pub fn build_split(self, writer: W, reader: R) -> FilteredJournal + where + W: WritableJournal, + R: ReadableJournal, + { + FilteredJournal::new_split(writer, reader, self.config) + } + pub fn with_ignore_memory(mut self, val: bool) -> Self { self.config.filter_memory = val; self @@ -177,7 +182,7 @@ impl FilteredJournalBuilder { } } -impl FilteredJournal { +impl FilteredJournal, Box> { fn new(inner: J, config: FilteredJournalConfig) -> Self where J: Journal, @@ -200,13 +205,25 @@ impl FilteredJournal { rx: FilteredJournalRx { inner: rx }, } } +} - pub fn into_inner(self) -> RecombinedJournal { +impl FilteredJournal { + pub fn into_inner(self) -> RecombinedJournal { RecombinedJournal::new(self.tx.inner, self.rx.inner) } + + fn new_split(writer: W, reader: R, config: FilteredJournalConfig) -> Self { + Self { + tx: FilteredJournalTx { + inner: writer, + config, + }, + rx: FilteredJournalRx { inner: reader }, + } + } } -impl WritableJournal for FilteredJournalTx { +impl WritableJournal for FilteredJournalTx { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { let event_index = self.config.event_index.fetch_add(1, Ordering::SeqCst); if let Some(events) = self.config.filter_events.as_ref() { @@ -348,9 +365,17 @@ impl WritableJournal for FilteredJournalTx { fn flush(&self) -> anyhow::Result<()> { self.inner.flush() } + + fn commit(&self) -> anyhow::Result { + self.inner.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.inner.rollback() + } } -impl ReadableJournal for FilteredJournalRx { +impl ReadableJournal for FilteredJournalRx { fn read(&self) -> anyhow::Result>> { self.inner.read() } @@ -362,7 +387,7 @@ impl ReadableJournal for FilteredJournalRx { } } -impl WritableJournal for FilteredJournal { +impl WritableJournal for FilteredJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -370,9 +395,17 @@ impl WritableJournal for FilteredJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for FilteredJournal { +impl ReadableJournal for FilteredJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -382,7 +415,7 @@ impl ReadableJournal for FilteredJournal { } } -impl Journal for FilteredJournal { +impl Journal for FilteredJournal, Box> { fn split(self) -> (Box, Box) { (Box::new(self.tx), Box::new(self.rx)) } diff --git a/lib/journal/src/concrete/log_file.rs b/lib/journal/src/concrete/log_file.rs index be3a876d71c..f309816ecc6 100644 --- a/lib/journal/src/concrete/log_file.rs +++ b/lib/journal/src/concrete/log_file.rs @@ -150,7 +150,9 @@ impl LogFileJournal { } /// Create a new journal from a buffer - pub fn from_buffer(buffer: OwnedBuffer) -> RecombinedJournal { + pub fn from_buffer( + buffer: OwnedBuffer, + ) -> RecombinedJournal { // Create the rx let rx = LogFileJournalRx { tx: None, @@ -163,7 +165,7 @@ impl LogFileJournal { let tx = UnsupportedJournal::default(); // Now recombine - RecombinedJournal::new(Box::new(tx), Box::new(rx)) + RecombinedJournal::new(tx, rx) } } @@ -303,6 +305,14 @@ impl WritableJournal for LogFileJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for LogFileJournal { diff --git a/lib/journal/src/concrete/mod.rs b/lib/journal/src/concrete/mod.rs index a7a8dfc38b5..f3dcb57033c 100644 --- a/lib/journal/src/concrete/mod.rs +++ b/lib/journal/src/concrete/mod.rs @@ -3,11 +3,13 @@ mod aligned_cow_vec; mod arc; mod archived; mod archived_from; +mod auto_consistent; mod boxed; mod buffered; mod compacting; #[cfg(feature = "log-file")] mod compacting_log_file; +mod compacting_transaction; mod counting; mod filter; #[cfg(feature = "log-file")] @@ -19,6 +21,7 @@ mod printing; mod recombined; #[cfg(test)] mod tests; +mod transaction; mod unsupported; pub(super) use super::*; @@ -26,10 +29,12 @@ pub(super) use super::*; pub use aligned_cow_str::*; pub use aligned_cow_vec::*; pub use archived::*; +pub use auto_consistent::*; pub use buffered::*; pub use compacting::*; #[cfg(feature = "log-file")] pub use compacting_log_file::*; +pub use compacting_transaction::*; pub use counting::*; pub use filter::*; #[cfg(feature = "log-file")] @@ -39,4 +44,5 @@ pub use null::*; pub use pipe::*; pub use printing::*; pub use recombined::*; +pub use transaction::*; pub use unsupported::*; diff --git a/lib/journal/src/concrete/pipe.rs b/lib/journal/src/concrete/pipe.rs index 5c3b7bdd45b..fa0dfaf3f8e 100644 --- a/lib/journal/src/concrete/pipe.rs +++ b/lib/journal/src/concrete/pipe.rs @@ -116,6 +116,14 @@ impl WritableJournal for PipeJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } impl ReadableJournal for PipeJournal { diff --git a/lib/journal/src/concrete/recombined.rs b/lib/journal/src/concrete/recombined.rs index 93a618c7e35..8b170d3b15f 100644 --- a/lib/journal/src/concrete/recombined.rs +++ b/lib/journal/src/concrete/recombined.rs @@ -1,17 +1,18 @@ use super::*; -pub struct RecombinedJournal { - tx: Box, - rx: Box, +#[derive(Debug)] +pub struct RecombinedJournal { + tx: W, + rx: R, } -impl RecombinedJournal { - pub fn new(tx: Box, rx: Box) -> Self { +impl RecombinedJournal { + pub fn new(tx: W, rx: R) -> Self { Self { tx, rx } } } -impl WritableJournal for RecombinedJournal { +impl WritableJournal for RecombinedJournal { fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { self.tx.write(entry) } @@ -19,9 +20,17 @@ impl WritableJournal for RecombinedJournal { fn flush(&self) -> anyhow::Result<()> { self.tx.flush() } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } } -impl ReadableJournal for RecombinedJournal { +impl ReadableJournal for RecombinedJournal { fn read(&self) -> anyhow::Result>> { self.rx.read() } @@ -31,8 +40,12 @@ impl ReadableJournal for RecombinedJournal { } } -impl Journal for RecombinedJournal { +impl Journal for RecombinedJournal +where + W: WritableJournal + Send + Sync + 'static, + R: ReadableJournal + Send + Sync + 'static, +{ fn split(self) -> (Box, Box) { - (self.tx, self.rx) + (Box::new(self.tx), Box::new(self.rx)) } } diff --git a/lib/journal/src/concrete/transaction.rs b/lib/journal/src/concrete/transaction.rs new file mode 100644 index 00000000000..a8249e86865 --- /dev/null +++ b/lib/journal/src/concrete/transaction.rs @@ -0,0 +1,160 @@ +use std::sync::{Arc, Mutex}; + +use super::*; + +/// Journal which will store the events locally in memory until it +/// is either committed or rolled back +#[derive(Debug)] +pub struct TransactionJournal { + pub(super) tx: TransactionJournalTx, + pub(super) rx: TransactionJournalRx, +} + +#[derive(Debug, Default, Clone)] +pub(super) struct State { + pub(super) records: Vec>, + pub(super) offset: u64, +} + +#[derive(Debug)] +pub struct TransactionJournalTx { + pub(super) state: Arc>, + inner: W, +} + +#[derive(Debug)] +pub struct TransactionJournalRx { + state: Arc>, + inner: R, +} + +impl TransactionJournal, Box> { + /// Creates a transactional journal which will hold events in memory + /// until the journal is either committed or rolled back + pub fn new(inner: J) -> Self + where + J: Journal, + { + let state = Arc::new(Mutex::new(State::default())); + let (tx, rx) = inner.split(); + Self { + tx: TransactionJournalTx { + inner: tx, + state: state.clone(), + }, + rx: TransactionJournalRx { + inner: rx, + state: state.clone(), + }, + } + } +} + +impl TransactionJournal { + pub fn into_inner(self) -> RecombinedJournal { + RecombinedJournal::new(self.tx.inner, self.rx.inner) + } +} + +impl WritableJournal for TransactionJournalTx { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + let entry = entry.into_owned(); + let mut state = self.state.lock().unwrap(); + let estimate_size = entry.estimate_size(); + state.records.push(entry); + Ok(LogWriteResult { + record_start: state.offset, + record_end: state.offset + estimate_size as u64, + }) + } + + fn flush(&self) -> anyhow::Result<()> { + self.inner.flush() + } + + /// Commits the transaction + fn commit(&self) -> anyhow::Result { + let (records, mut new_offset) = { + let mut state = self.state.lock().unwrap(); + let mut records = Default::default(); + std::mem::swap(&mut records, &mut state.records); + (records, state.offset) + }; + + let mut ret = records.len(); + for entry in records { + let ret = self.inner.write(entry)?; + new_offset = new_offset.max(ret.record_end); + } + { + let mut state = self.state.lock().unwrap(); + state.offset = state.offset.max(new_offset); + } + ret += self.inner.commit()?; + Ok(ret) + } + + /// Rolls back the transaction and aborts its changes + fn rollback(&self) -> anyhow::Result { + let mut ret = { + let mut state = self.state.lock().unwrap(); + let ret = state.records.len(); + state.records.clear(); + ret + }; + ret += self.inner.rollback()?; + Ok(ret) + } +} + +impl ReadableJournal for TransactionJournalRx { + fn read(&self) -> anyhow::Result>> { + let ret = self.inner.read()?; + if let Some(res) = ret.as_ref() { + let mut state = self.state.lock().unwrap(); + state.offset = state.offset.max(res.record_end); + } + Ok(ret) + } + + fn as_restarted(&self) -> anyhow::Result> { + Ok(Box::new(TransactionJournalRx { + inner: self.inner.as_restarted()?, + state: Arc::new(Mutex::new(State::default())), + })) + } +} + +impl WritableJournal for TransactionJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + self.tx.write(entry) + } + + fn flush(&self) -> anyhow::Result<()> { + self.tx.flush() + } + + fn commit(&self) -> anyhow::Result { + self.tx.commit() + } + + fn rollback(&self) -> anyhow::Result { + self.tx.rollback() + } +} + +impl ReadableJournal for TransactionJournal { + fn read(&self) -> anyhow::Result>> { + self.rx.read() + } + + fn as_restarted(&self) -> anyhow::Result> { + self.rx.as_restarted() + } +} + +impl Journal for TransactionJournal, Box> { + fn split(self) -> (Box, Box) { + (Box::new(self.tx), Box::new(self.rx)) + } +} diff --git a/lib/journal/src/lib.rs b/lib/journal/src/lib.rs index 06fccd615bb..478945eec20 100644 --- a/lib/journal/src/lib.rs +++ b/lib/journal/src/lib.rs @@ -31,13 +31,25 @@ impl LogWriteResult { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait WritableJournal { +pub trait WritableJournal: std::fmt::Debug { /// Takes in a stream of snapshot log entries and saves them so that they /// may be restored at a later moment fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result; /// Flushes the data to disk or network fn flush(&self) -> anyhow::Result<()>; + + /// Commits the transaction + /// Returns the number of events committed + fn commit(&self) -> anyhow::Result { + Ok(0) + } + + /// Rolls back the transaction and aborts its changes + /// Returns the number of events rolled back + fn rollback(&self) -> anyhow::Result { + Ok(0) + } } /// The results of an operation to read a log entry from the log @@ -69,7 +81,7 @@ impl<'a> Deref for LogReadResult<'a> { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait ReadableJournal { +pub trait ReadableJournal: std::fmt::Debug { /// Returns a stream of snapshot objects that the runtime will use /// to restore the state of a WASM process to a previous moment in time fn read(&self) -> anyhow::Result>>; @@ -83,7 +95,7 @@ pub trait ReadableJournal { /// a WASM process at a point in time and saves it so that it can be restored. /// It also allows for the restoration of that state at a later moment #[allow(unused_variables)] -pub trait Journal: WritableJournal + ReadableJournal { +pub trait Journal: WritableJournal + ReadableJournal + std::fmt::Debug { /// Splits the journal into a read and write side fn split(self) -> (Box, Box); } diff --git a/lib/wasix/src/runners/dproxy/factory.rs b/lib/wasix/src/runners/dproxy/factory.rs index 34465ef3885..1833a781ca1 100644 --- a/lib/wasix/src/runners/dproxy/factory.rs +++ b/lib/wasix/src/runners/dproxy/factory.rs @@ -64,9 +64,10 @@ impl DProxyInstanceFactory { .clone() .into_iter() .map(|journal| { - let tx = Box::new(journal.clone()); + let tx = journal.clone(); let rx = journal.as_restarted()?; - anyhow::Result::Ok(Arc::new(RecombinedJournal::new(tx, rx)) as Arc) + let combined = RecombinedJournal::new(tx, rx); + anyhow::Result::Ok(Arc::new(combined) as Arc) }) .collect::>>()?; let mut runtime = OverriddenRuntime::new(runtime).with_journals(journals); diff --git a/lib/wasix/src/state/env.rs b/lib/wasix/src/state/env.rs index f42b7e529d1..97f83e74f00 100644 --- a/lib/wasix/src/state/env.rs +++ b/lib/wasix/src/state/env.rs @@ -1000,11 +1000,17 @@ impl WasiEnv { self.enable_journal && !self.replaying_journal } + /// Returns true if the environment has an active journal + #[cfg(feature = "journal")] + pub fn has_active_journal(&self) -> bool { + self.runtime().active_journal().is_some() + } + /// Returns the active journal or fails with an error #[cfg(feature = "journal")] pub fn active_journal(&self) -> Result<&DynJournal, Errno> { self.runtime().active_journal().ok_or_else(|| { - tracing::warn!("failed to save thread exit as there is not active journal"); + tracing::debug!("failed to save thread exit as there is not active journal"); Errno::Fault }) } @@ -1252,7 +1258,7 @@ impl WasiEnv { // If snap-shooting is enabled then we should record an event that the thread has exited. #[cfg(feature = "journal")] - if self.should_journal() { + if self.should_journal() && self.has_active_journal() { if let Err(err) = JournalEffector::save_thread_exit(self, self.tid(), exit_code) { tracing::warn!("failed to save snapshot event for thread exit - {}", err); }