Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a transaction and auto_consistency journal #5003

Merged
merged 6 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions lib/journal/src/concrete/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;
use std::ops::Deref;
use std::sync::Arc;

impl ReadableJournal for Arc<DynReadableJournal> {
impl<R: ReadableJournal> ReadableJournal for Arc<R> {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.deref().read()
}
Expand All @@ -12,14 +12,22 @@ impl ReadableJournal for Arc<DynReadableJournal> {
}
}

impl WritableJournal for Arc<DynWritableJournal> {
impl<W: WritableJournal> WritableJournal for Arc<W> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}

fn commit(&self) -> anyhow::Result<usize> {
self.deref().commit()
}

fn rollback(&self) -> anyhow::Result<usize> {
self.deref().rollback()
}
}

impl ReadableJournal for Arc<DynJournal> {
Expand All @@ -40,6 +48,14 @@ impl WritableJournal for Arc<DynJournal> {
fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}

fn commit(&self) -> anyhow::Result<usize> {
self.deref().commit()
}

fn rollback(&self) -> anyhow::Result<usize> {
self.deref().rollback()
}
}

impl Journal for Arc<DynJournal> {
Expand Down
190 changes: 190 additions & 0 deletions lib/journal/src/concrete/auto_consistent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

use super::*;

/// Journal which leave itself in a consistent state once it commits
/// by closing all the file descriptors that were opened while
/// it was recording writes.
#[derive(Debug)]
pub struct AutoConsistentJournal<W: WritableJournal, R: ReadableJournal> {
tx: AutoConsistentJournalTx<W>,
rx: AutoConsistentJournalRx<R>,
}

#[derive(Debug, Default, Clone)]
struct State {
open_files: HashSet<u32>,
open_sockets: HashSet<u32>,
}

#[derive(Debug)]
pub struct AutoConsistentJournalTx<W: WritableJournal> {
state: Arc<Mutex<State>>,
inner: W,
}

#[derive(Debug)]
pub struct AutoConsistentJournalRx<R: ReadableJournal> {
inner: R,
}

impl AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
/// Creates a journal which will automatically correct inconsistencies when
/// it commits. E.g. it will close any open file descriptors that were left
/// open as it was processing events.
pub fn new<J>(inner: J) -> Self
where
J: Journal,
{
let state = Arc::new(Mutex::new(State::default()));
let (tx, rx) = inner.split();
Self {
tx: AutoConsistentJournalTx {
inner: tx,
state: state.clone(),
},
rx: AutoConsistentJournalRx { inner: rx },
}
}
}

impl<W: WritableJournal, R: ReadableJournal> AutoConsistentJournal<W, R> {
pub fn into_inner(self) -> RecombinedJournal<W, R> {
RecombinedJournal::new(self.tx.inner, self.rx.inner)
}
}

impl<W: WritableJournal> WritableJournal for AutoConsistentJournalTx<W> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
match &entry {
JournalEntry::OpenFileDescriptorV1 { fd, .. }
| JournalEntry::CreateEventV1 { fd, .. } => {
let mut state = self.state.lock().unwrap();
state.open_files.insert(*fd);
}
JournalEntry::SocketAcceptedV1 { fd, .. } => {
let mut state = self.state.lock().unwrap();
state.open_sockets.insert(*fd);
}
JournalEntry::CreatePipeV1 { fd1, fd2 } => {
let mut state = self.state.lock().unwrap();
state.open_files.insert(*fd1);
state.open_files.insert(*fd2);
}
JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
let mut state = self.state.lock().unwrap();
if state.open_files.remove(old_fd) {
state.open_files.insert(*new_fd);
}
if state.open_sockets.remove(old_fd) {
state.open_sockets.insert(*new_fd);
}
}
JournalEntry::DuplicateFileDescriptorV1 {
original_fd,
copied_fd,
} => {
let mut state = self.state.lock().unwrap();
if state.open_files.contains(original_fd) {
state.open_files.insert(*copied_fd);
}
if state.open_sockets.contains(original_fd) {
state.open_sockets.insert(*copied_fd);
}
}
JournalEntry::CloseFileDescriptorV1 { fd } => {
let mut state = self.state.lock().unwrap();
state.open_files.remove(fd);
state.open_sockets.remove(fd);
}
JournalEntry::InitModuleV1 { .. }
| JournalEntry::ClearEtherealV1 { .. }
| JournalEntry::ProcessExitV1 { .. } => {
let mut state = self.state.lock().unwrap();
state.open_files.clear();
state.open_sockets.clear();
}
_ => {}
}
self.inner.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.inner.flush()
}

/// Commits the transaction
fn commit(&self) -> anyhow::Result<usize> {
let open_files = {
let mut state = self.state.lock().unwrap();
let mut open_files = Default::default();
std::mem::swap(&mut open_files, &mut state.open_files);
state.open_sockets.clear();
open_files
};
for fd in open_files {
let entry = JournalEntry::CloseFileDescriptorV1 { fd };
self.inner.write(entry)?;
}
self.inner.commit()
}

/// Rolls back the transaction and aborts its changes
fn rollback(&self) -> anyhow::Result<usize> {
{
let mut state = self.state.lock().unwrap();
state.open_files.clear();
state.open_sockets.clear();
}
self.inner.rollback()
}
}

impl<R: ReadableJournal> ReadableJournal for AutoConsistentJournalRx<R> {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.inner.read()
}

fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
Ok(Box::new(AutoConsistentJournalRx {
inner: self.inner.as_restarted()?,
}))
}
}

impl<W: WritableJournal, R: ReadableJournal> WritableJournal for AutoConsistentJournal<W, R> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.tx.write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}

fn commit(&self) -> anyhow::Result<usize> {
self.tx.commit()
}

fn rollback(&self) -> anyhow::Result<usize> {
self.tx.rollback()
}
}

impl<W: WritableJournal, R: ReadableJournal> ReadableJournal for AutoConsistentJournal<W, R> {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.rx.read()
}

fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
self.rx.as_restarted()
}
}

impl Journal for AutoConsistentJournal<Box<DynWritableJournal>, Box<DynReadableJournal>> {
fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
(Box::new(self.tx), Box::new(self.rx))
}
}
27 changes: 11 additions & 16 deletions lib/journal/src/concrete/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::ops::Deref;
use std::{ops::Deref, sync::Arc};

use super::*;

impl ReadableJournal for Box<DynReadableJournal> {
impl<R: ReadableJournal + ?Sized> ReadableJournal for Box<R> {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.deref().read()
}
Expand All @@ -12,32 +12,27 @@ impl ReadableJournal for Box<DynReadableJournal> {
}
}

impl WritableJournal for Box<DynWritableJournal> {
impl<W: WritableJournal + ?Sized> WritableJournal for Box<W> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
}
}

impl ReadableJournal for Box<DynJournal> {
fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
self.deref().read()
fn commit(&self) -> anyhow::Result<usize> {
self.deref().commit()
}

fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
self.deref().as_restarted()
fn rollback(&self) -> anyhow::Result<usize> {
self.deref().rollback()
}
}

impl WritableJournal for Box<DynJournal> {
fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
self.deref().write(entry)
}

fn flush(&self) -> anyhow::Result<()> {
self.deref().flush()
impl Journal for Box<DynJournal> {
fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
let this = Arc::new(self);
(Box::new(this.clone()), Box::new(this))
}
}
8 changes: 8 additions & 0 deletions lib/journal/src/concrete/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ impl WritableJournal for BufferedJournal {
fn flush(&self) -> anyhow::Result<()> {
self.tx.flush()
}

fn commit(&self) -> anyhow::Result<usize> {
self.tx.commit()
}

fn rollback(&self) -> anyhow::Result<usize> {
self.tx.rollback()
}
}

impl ReadableJournal for BufferedJournal {
Expand Down
Loading
Loading