commitlog: Yield StoredCommit in iterators #1791

Merged
merged 2 commits into from
Oct 8, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 8 additions & 5 deletions crates/commitlog/src/commitlog.rs
@@ -510,7 +510,7 @@ impl<R: Repo> Commits<R> {
/// Helper to handle a successfully extracted commit in [`Self::next`].
///
/// Checks that the offset sequence is contiguous.
- fn next_commit(&mut self, commit: StoredCommit) -> Option<Result<Commit, error::Traversal>> {
+ fn next_commit(&mut self, commit: StoredCommit) -> Option<Result<StoredCommit, error::Traversal>> {
// Pop the last error. Either we'll return it below, or it's no longer
// interesting.
let prev_error = self.last_error.take();
@@ -548,7 +548,7 @@ impl<R: Repo> Commits<R> {
checksum: commit.checksum,
};

- Some(Ok(Commit::from(commit)))
+ Some(Ok(commit))
}
}

@@ -572,7 +572,7 @@ impl<R: Repo> Commits<R> {
}

impl<R: Repo> Iterator for Commits<R> {
- type Item = Result<Commit, error::Traversal>;
+ type Item = Result<StoredCommit, error::Traversal>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(commits) = self.inner.as_mut() {
@@ -642,7 +642,7 @@ impl<R: Repo> Iterator for CommitsWithVersion<R> {
.current_segment_header()
.map(|hdr| hdr.log_format_version)
.expect("segment header none even though segment yielded a commit");
- Some(Ok((version, commit)))
+ Some(Ok((version, commit.into())))
}
Err(e) => Some(Err(e)),
}
@@ -792,7 +792,10 @@ mod tests {

assert_eq!(
[commit1, commit2].as_slice(),
- &log.commits_from(0).collect::<Result<Vec<_>, _>>().unwrap()
+ &log.commits_from(0)
+ .map_ok(Commit::from)
+ .collect::<Result<Vec<_>, _>>()
+ .unwrap()
);
}
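
As a rough illustration of the change above: the iterators now yield `StoredCommit`, and callers that want a plain `Commit` convert each `Ok` item. The sketch below uses hypothetical stand-in types (the field sets and the `to_commits` helper are assumptions, not the crate's definitions) and uses the standard library's `map` plus `Result::map` to do what `map_ok(Commit::from)` does in the diff.

```rust
// Hypothetical stand-ins for the crate's types; the fields are assumptions
// made for illustration only.
#[derive(Debug, PartialEq)]
struct Commit {
    min_tx_offset: u64,
    records: Vec<u8>,
}

#[derive(Debug, PartialEq)]
struct StoredCommit {
    min_tx_offset: u64,
    records: Vec<u8>,
    // Extra on-disk information that a plain `Commit` does not carry.
    checksum: u32,
}

impl From<StoredCommit> for Commit {
    fn from(stored: StoredCommit) -> Self {
        // Drop the checksum, keep everything else.
        Commit {
            min_tx_offset: stored.min_tx_offset,
            records: stored.records,
        }
    }
}

// Stand-in for `error::Traversal`.
#[derive(Debug, PartialEq)]
struct Traversal(String);

// Convert every `Ok(StoredCommit)` into `Ok(Commit)`; errors pass through
// untouched.
fn to_commits<I>(stored: I) -> impl Iterator<Item = Result<Commit, Traversal>>
where
    I: Iterator<Item = Result<StoredCommit, Traversal>>,
{
    stored.map(|res| res.map(Commit::from))
}
```

Consumers that need the checksum can use the `StoredCommit` items directly and skip the conversion.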

36 changes: 18 additions & 18 deletions crates/commitlog/src/lib.rs
@@ -16,7 +16,7 @@ mod varchar;
mod varint;

pub use crate::{
- commit::Commit,
+ commit::{Commit, StoredCommit},
payload::{Decoder, Encode},
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
varchar::Varchar,
@@ -89,7 +89,7 @@ impl Default for Options {
/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
- /// [`Txdata`].
+ /// [`payload::Txdata`].
pub struct Commitlog<T> {
inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
@@ -181,7 +181,7 @@ impl<T> Commitlog<T> {
}

/// Obtain an iterator which traverses the log from the start, yielding
- /// [`Commit`]s.
+ /// [`StoredCommit`]s.
///
/// The returned iterator is not aware of segment rotation. That is, if a
/// new segment is created after this method returns, the iterator will not
@@ -192,27 +192,27 @@ impl<T> Commitlog<T> {
/// however, a new iterator should be created using [`Self::commits_from`]
/// with the last transaction offset yielded.
///
- /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
- /// due to a partial write to disk), but a subsequent `append` will bring
- /// the log into a consistent state.
+ /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
+ /// (e.g. due to a partial write to disk), but a subsequent `append` will
+ /// bring the log into a consistent state.
///
/// This means that, when this iterator yields an `Err` value, the consumer
/// may want to check if the iterator is exhausted (by calling `next()`)
/// before treating the `Err` value as an application error.
- pub fn commits(&self) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
+ pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
self.commits_from(0)
}

/// Obtain an iterator starting from transaction offset `offset`, yielding
- /// [`Commit`]s.
+ /// [`StoredCommit`]s.
///
/// Similar to [`Self::commits`] but will skip until the offset is contained
- /// in the next [`Commit`] to yield.
+ /// in the next [`StoredCommit`] to yield.
///
- /// Note that the first [`Commit`] yielded is the first commit containing
- /// the given transaction offset, i.e. its `min_tx_offset` may be smaller
- /// than `offset`.
- pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
+ /// Note that the first [`StoredCommit`] yielded is the first commit
+ /// containing the given transaction offset, i.e. its `min_tx_offset` may be
+ /// smaller than `offset`.
+ pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
self.inner.read().unwrap().commits_from(offset)
}

@@ -372,7 +372,7 @@ impl<T: Encode> Commitlog<T> {
/// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
/// payload into a struct.
///
- /// Note that, unlike [`Self::transaction`], this method will ignore a
+ /// Note that, unlike [`Self::transactions`], this method will ignore a
/// corrupt commit at the very end of the traversed log.
pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
where
@@ -398,23 +398,23 @@ impl<T: Encode> Commitlog<T> {
}

/// Obtain an iterator which traverses the commitlog located at the `root`
- /// directory from the start, yielding [`Commit`]s.
+ /// directory from the start, yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits`] for more information.
- pub fn commits(root: impl Into<PathBuf>) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
+ pub fn commits(root: impl Into<PathBuf>) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
commits_from(root, 0)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
- /// directory starting from `offset` and yielding [`Commit`]s.
+ /// directory starting from `offset` and yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits_from`] for more information.
pub fn commits_from(
root: impl Into<PathBuf>,
offset: u64,
- ) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
+ ) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
commitlog::commits_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset)
}
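
The doc comment on `Commitlog::commits` in this file suggests that a consumer seeing an `Err` may want to call `next()` to check whether the iterator is exhausted before treating the error as fatal, because the very last commit may be corrupt after a partial write. A minimal sketch of that pattern follows; the helper name `collect_tolerating_torn_tail` is hypothetical, and it is generic over the item and error types so that it does not depend on the crate.

```rust
/// Collect all `Ok` items, ignoring an `Err` only when it is the final item
/// the iterator yields. An `Err` followed by further items is returned to the
/// caller as a real error.
fn collect_tolerating_torn_tail<T, E, I>(mut commits: I) -> Result<Vec<T>, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    let mut out = Vec::new();
    while let Some(item) = commits.next() {
        match item {
            Ok(commit) => out.push(commit),
            Err(e) => {
                // Probe once more: if nothing follows, the error sits at the
                // tail of the log and is treated as a torn write.
                if commits.next().is_none() {
                    break;
                }
                return Err(e);
            }
        }
    }
    Ok(out)
}
```

Whether silently dropping a trailing error is acceptable depends on the application; a stricter consumer could log it or surface it separately.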

4 changes: 2 additions & 2 deletions crates/commitlog/src/payload.rs
@@ -41,7 +41,7 @@ impl Encode for () {
///
/// Unlike [`Encode`], this is not a datatype: the canonical commitlog format
/// requires to look up row types during log traversal in order to be able to
- /// decode (see also [`RowDecoder`]).
+ /// decode.
pub trait Decoder {
/// The type of records this decoder can decode.
/// This is also the type which can be appended to a commitlog, and so must
@@ -53,7 +53,7 @@ pub trait Decoder {
/// Decode one [`Self::Record`] from the given buffer.
///
/// The `version` argument corresponds to the log format version of the
- /// current segment (see [`segment::Header::log_format_version`]).
+ /// current segment (see [`crate::segment::Header::log_format_version`]).
///
/// The `tx_argument` is the transaction offset of the current record
/// relative to the start of the log.
2 changes: 1 addition & 1 deletion crates/commitlog/src/payload/txdata.rs
@@ -13,7 +13,7 @@ use crate::{
// Re-export so we get a hyperlink in rustdocs by default
pub use spacetimedb_primitives::TableId;

- /// A visitor useful to implement stateful [`Decoder`]s of [`Txdata`] payloads.
+ /// A visitor useful to implement stateful [`super::Decoder`]s of [`Txdata`] payloads.
pub trait Visitor {
type Error: From<DecodeError>;
/// The type corresponding to one element in [`Ops::rowdata`].
8 changes: 4 additions & 4 deletions crates/commitlog/src/repo/mod.rs
@@ -46,7 +46,7 @@ pub trait Repo: Clone {
/// `offset` does not exist.
///
/// The method does not guarantee that the segment is non-empty -- this case
- /// will be caught by [`open_segment_writer`] and [`open_segment_reader`]
+ /// will be caught by [`resume_segment_writer`] and [`open_segment_reader`]
/// respectively.
fn open_segment(&self, offset: u64) -> io::Result<Self::Segment>;

@@ -178,9 +178,9 @@ pub fn resume_segment_writer<R: Repo>(

/// Open the existing segment at `offset` for reading.
///
- /// Unlike [`open_segment_writer`], this does not traverse the segment. It does,
- /// however, attempt to read the segment header and checks that the log format
- /// version and checksum algorithm are compatible.
+ /// Unlike [`resume_segment_writer`], this does not traverse the segment. It
+ /// does, however, attempt to read the segment header and checks that the log
+ /// format version and checksum algorithm are compatible.
pub fn open_segment_reader<R: Repo>(
repo: &R,
max_log_format_version: u8,
1 change: 1 addition & 0 deletions crates/durability/Cargo.toml
@@ -9,6 +9,7 @@ description = "Traits and single-node implementation of durability for Spacetime

[dependencies]
anyhow.workspace = true
+ itertools.workspace = true
log.workspace = true
spacetimedb-commitlog.workspace = true
spacetimedb-sats.workspace = true
3 changes: 2 additions & 1 deletion crates/durability/src/imp/local.rs
@@ -14,6 +14,7 @@ use std::{
};

use anyhow::Context as _;
+ use itertools::Itertools as _;
use log::{info, trace, warn};
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
use tokio::{
@@ -140,7 +141,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {

/// Obtain an iterator over the [`Commit`]s in the underlying log.
pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
- self.clog.commits_from(offset)
+ self.clog.commits_from(offset).map_ok(Commit::from)
}

/// Apply all outstanding transactions to the [`Commitlog`] and flush it
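
A note for callers of `commits_from(offset)`: the commitlog docs in this PR state that the first commit yielded is the one containing `offset`, so its `min_tx_offset` may be smaller than `offset`. The sketch below shows one way a consumer might skip the leading transactions of that first commit. The `Commit` stand-in, its `n` field, and the assumption that a commit's transactions have consecutive offsets starting at `min_tx_offset` are all hypothetical and made for illustration.

```rust
use std::ops::Range;

/// Hypothetical stand-in: a commit holding `n` transactions whose offsets are
/// assumed to be consecutive, starting at `min_tx_offset`.
struct Commit {
    min_tx_offset: u64,
    n: u16,
}

/// Offsets of the transactions in `commit` that are at or after `offset`.
/// The range is empty if the whole commit lies before `offset`.
fn tx_offsets_from(commit: &Commit, offset: u64) -> Range<u64> {
    let end = commit.min_tx_offset + u64::from(commit.n);
    // Clamp the start into `[min_tx_offset, end]`.
    let start = commit.min_tx_offset.max(offset).min(end);
    start..end
}
```

Only the first commit of a traversal should need this clamping; for later commits the range covers every transaction.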