diff --git a/Cargo.lock b/Cargo.lock index def5ddae561..e4b98c6b2d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4570,6 +4570,7 @@ name = "spacetimedb-durability" version = "1.0.0-rc1" dependencies = [ "anyhow", + "itertools 0.12.1", "log", "spacetimedb-commitlog", "spacetimedb-sats", diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 698f3558195..de002c08ec3 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -510,7 +510,7 @@ impl Commits { /// Helper to handle a successfully extracted commit in [`Self::next`]. /// /// Checks that the offset sequence is contiguous. - fn next_commit(&mut self, commit: StoredCommit) -> Option> { + fn next_commit(&mut self, commit: StoredCommit) -> Option> { // Pop the last error. Either we'll return it below, or it's no longer // interesting. let prev_error = self.last_error.take(); @@ -548,7 +548,7 @@ impl Commits { checksum: commit.checksum, }; - Some(Ok(Commit::from(commit))) + Some(Ok(commit)) } } @@ -572,7 +572,7 @@ impl Commits { } impl Iterator for Commits { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { if let Some(commits) = self.inner.as_mut() { @@ -642,7 +642,7 @@ impl Iterator for CommitsWithVersion { .current_segment_header() .map(|hdr| hdr.log_format_version) .expect("segment header none even though segment yielded a commit"); - Some(Ok((version, commit))) + Some(Ok((version, commit.into()))) } Err(e) => Some(Err(e)), } @@ -792,7 +792,10 @@ mod tests { assert_eq!( [commit1, commit2].as_slice(), - &log.commits_from(0).collect::, _>>().unwrap() + &log.commits_from(0) + .map_ok(Commit::from) + .collect::, _>>() + .unwrap() ); } diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index caa84bbf694..3f83af6916d 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -16,7 +16,7 @@ mod varchar; mod varint; pub use crate::{ - commit::Commit, + commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, @@ -89,7 +89,7 @@ impl Default for Options { /// The canonical commitlog, backed by on-disk log files. /// /// Records in the log are of type `T`, which canonically is instantiated to -/// [`Txdata`]. +/// [`payload::Txdata`]. pub struct Commitlog { inner: RwLock>, } @@ -181,7 +181,7 @@ impl Commitlog { } /// Obtain an iterator which traverses the log from the start, yielding - /// [`Commit`]s. + /// [`StoredCommit`]s. /// /// The returned iterator is not aware of segment rotation. That is, if a /// new segment is created after this method returns, the iterator will not @@ -192,27 +192,27 @@ impl Commitlog { /// however, a new iterator should be created using [`Self::commits_from`] /// with the last transaction offset yielded. /// - /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g. - /// due to a partial write to disk), but a subsequent `append` will bring - /// the log into a consistent state. + /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt + /// (e.g. due to a partial write to disk), but a subsequent `append` will + /// bring the log into a consistent state. /// /// This means that, when this iterator yields an `Err` value, the consumer /// may want to check if the iterator is exhausted (by calling `next()`) /// before treating the `Err` value as an application error. - pub fn commits(&self) -> impl Iterator> { + pub fn commits(&self) -> impl Iterator> { self.commits_from(0) } /// Obtain an iterator starting from transaction offset `offset`, yielding - /// [`Commit`]s. + /// [`StoredCommit`]s. /// /// Similar to [`Self::commits`] but will skip until the offset is contained - /// in the next [`Commit`] to yield. + /// in the next [`StoredCommit`] to yield. /// - /// Note that the first [`Commit`] yielded is the first commit containing - /// the given transaction offset, i.e. its `min_tx_offset` may be smaller - /// than `offset`. - pub fn commits_from(&self, offset: u64) -> impl Iterator> { + /// Note that the first [`StoredCommit`] yielded is the first commit + /// containing the given transaction offset, i.e. its `min_tx_offset` may be + /// smaller than `offset`. + pub fn commits_from(&self, offset: u64) -> impl Iterator> { self.inner.read().unwrap().commits_from(offset) } @@ -372,7 +372,7 @@ impl Commitlog { /// data (e.g. `Decoder`), as it will not allocate the commit /// payload into a struct. /// - /// Note that, unlike [`Self::transaction`], this method will ignore a + /// Note that, unlike [`Self::transactions`], this method will ignore a /// corrupt commit at the very end of the traversed log. pub fn fold_transactions(&self, de: D) -> Result<(), D::Error> where @@ -398,23 +398,23 @@ impl Commitlog { } /// Obtain an iterator which traverses the commitlog located at the `root` -/// directory from the start, yielding [`Commit`]s. +/// directory from the start, yielding [`StoredCommit`]s. /// /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`]. /// See [`Commitlog::commits`] for more information. -pub fn commits(root: impl Into) -> io::Result>> { +pub fn commits(root: impl Into) -> io::Result>> { commits_from(root, 0) } /// Obtain an iterator which traverses the commitlog located at the `root` -/// directory starting from `offset` and yielding [`Commit`]s. +/// directory starting from `offset` and yielding [`StoredCommit`]s. /// /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`]. /// See [`Commitlog::commits_from`] for more information. pub fn commits_from( root: impl Into, offset: u64, -) -> io::Result>> { +) -> io::Result>> { commitlog::commits_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset) } diff --git a/crates/commitlog/src/payload.rs b/crates/commitlog/src/payload.rs index e5c1cdb4680..82610ad76da 100644 --- a/crates/commitlog/src/payload.rs +++ b/crates/commitlog/src/payload.rs @@ -41,7 +41,7 @@ impl Encode for () { /// /// Unlike [`Encode`], this is not a datatype: the canonical commitlog format /// requires to look up row types during log traversal in order to be able to -/// decode (see also [`RowDecoder`]). +/// decode. pub trait Decoder { /// The type of records this decoder can decode. /// This is also the type which can be appended to a commitlog, and so must @@ -53,7 +53,7 @@ pub trait Decoder { /// Decode one [`Self::Record`] from the given buffer. /// /// The `version` argument corresponds to the log format version of the - /// current segment (see [`segment::Header::log_format_version`]). + /// current segment (see [`crate::segment::Header::log_format_version`]). /// /// The `tx_argument` is the transaction offset of the current record /// relative to the start of the log. diff --git a/crates/commitlog/src/payload/txdata.rs b/crates/commitlog/src/payload/txdata.rs index fac203fe901..a3b1b30cb26 100644 --- a/crates/commitlog/src/payload/txdata.rs +++ b/crates/commitlog/src/payload/txdata.rs @@ -13,7 +13,7 @@ use crate::{ // Re-export so we get a hyperlink in rustdocs by default pub use spacetimedb_primitives::TableId; -/// A visitor useful to implement stateful [`Decoder`]s of [`Txdata`] payloads. +/// A visitor useful to implement stateful [`super::Decoder`]s of [`Txdata`] payloads. pub trait Visitor { type Error: From; /// The type corresponding to one element in [`Ops::rowdata`]. diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index e91eb3cb8fc..95866798966 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -46,7 +46,7 @@ pub trait Repo: Clone { /// `offset` does not exist. /// /// The method does not guarantee that the segment is non-empty -- this case - /// will be caught by [`open_segment_writer`] and [`open_segment_reader`] + /// will be caught by [`resume_segment_writer`] and [`open_segment_reader`] /// respectively. fn open_segment(&self, offset: u64) -> io::Result; @@ -178,9 +178,9 @@ pub fn resume_segment_writer( /// Open the existing segment at `offset` for reading. /// -/// Unlike [`open_segment_writer`], this does not traverse the segment. It does, -/// however, attempt to read the segment header and checks that the log format -/// version and checksum algorithm are compatible. +/// Unlike [`resume_segment_writer`], this does not traverse the segment. It +/// does, however, attempt to read the segment header and checks that the log +/// format version and checksum algorithm are compatible. pub fn open_segment_reader( repo: &R, max_log_format_version: u8, diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 5e2c1c69c7f..e805acd04e9 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -9,6 +9,7 @@ description = "Traits and single-node implementation of durability for Spacetime [dependencies] anyhow.workspace = true +itertools.workspace = true log.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-sats.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index b4cd6110a59..67b8b986c8a 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -14,6 +14,7 @@ use std::{ }; use anyhow::Context as _; +use itertools::Itertools as _; use log::{info, trace, warn}; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; use tokio::{ @@ -140,7 +141,7 @@ impl Local { /// Obtain an iterator over the [`Commit`]s in the underlying log. pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> { - self.clog.commits_from(offset) + self.clog.commits_from(offset).map_ok(Commit::from) } /// Apply all outstanding transactions to the [`Commitlog`] and flush it