commitlog: Support traversal without opening the log #1103

Merged
merged 1 commit into from
Apr 19, 2024
142 changes: 106 additions & 36 deletions crates/commitlog/src/commitlog.rs
@@ -224,47 +224,19 @@ impl<R: Repo, T: Encode> Generic<R, T> {
R: 'a,
T: 'a,
{
self.commits_from(offset)
.with_log_format_version()
.map(|x| x.map_err(Into::into))
.map_ok(move |(version, commit)| commit.into_transactions(version, deserializer))
.flatten_ok()
.flatten_ok()
.skip_while(move |x| x.as_ref().map(|tx| tx.offset < offset).unwrap_or(false))
transactions_from_internal(
self.commits_from(offset).with_log_format_version(),
offset,
deserializer,
)
}

pub fn fold_transactions_from<D>(&self, offset: u64, decoder: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
let mut iter = self.commits_from(offset).with_log_format_version();
while let Some(commit) = iter.next() {
let (version, commit) = match commit {
Ok(version_and_commit) => version_and_commit,
Err(e) => {
// Ignore it if the very last commit in the log is broken.
// The next `append` will fix the log, but the `decoder`
// has no way to tell whether we're at the end or not.
// This is unlike the consumer of an iterator, which can
// perform below check itself.
if iter.next().is_none() {
return Ok(());
}

return Err(e.into());
}
};
trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);

let records = &mut commit.records.as_slice();
for n in 0..commit.n {
let tx_offset = commit.min_tx_offset + n as u64;
decoder.decode_record(version, tx_offset, records)?;
}
}

Ok(())
fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder)
}
}

@@ -279,6 +251,104 @@ impl<R: Repo, T> Drop for Generic<R, T> {
}
}

pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
let mut offsets = repo.existing_offsets()?;
if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
offsets = offsets.split_off(pos);
}
let last_offset = offsets.first().cloned().unwrap_or(offset);
let segments = Segments {
offs: offsets.into_iter(),
repo,
max_log_format_version,
};
Ok(Commits {
inner: None,
segments,
last_offset,
last_error: None,
})
}
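
The segment selection at the top of `commits_from` can be looked at in isolation. The following is a minimal sketch, assuming `existing_offsets` returns the minimum transaction offset of each segment in ascending order. The name `segments_from` is hypothetical and not part of the crate; only `rposition` and `split_off` are taken from the diff.

```rust
/// Hypothetical helper mirroring the selection logic in `commits_from`.
///
/// `offsets` is assumed to hold the minimum transaction offset of every
/// segment, sorted ascending. Only the segments that can contain `offset`
/// or anything after it are kept.
pub fn segments_from(mut offsets: Vec<u64>, offset: u64) -> Vec<u64> {
    // Find the last segment that starts at or before `offset`. That segment
    // may contain `offset`; every earlier segment cannot.
    if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
        // `split_off` returns the tail `[pos..]`, dropping earlier segments.
        offsets = offsets.split_off(pos);
    }
    // If no segment starts at or before `offset`, all segments are kept.
    offsets
}
```

With segments starting at 0, 100 and 200, a request for offset 150 keeps the segments starting at 100 and 200. The traversal therefore begins at a segment boundary, which is why transactions below `offset` still have to be skipped later on.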

pub fn transactions_from<'a, R, D, T>(
Contributor

Is it worth having these transactions_from variants here? Maybe I'm not aware of existing code to persist the tables schema state, or how to skip entire tx ranges using these functions, but it seems that unless the caller can persist their decoder state, they always have to replay from 0. In the case of code under dev, where a full replay is needed on code changes, going through the full snapshot on every iteration is too slow, and --release isn't friendly inside the debugger.

For analytics, the custom decoder calls decode_record_fn and only replays records for the system tables to create the schemas needed to decode rows, and processes everything else on the fly from within the Visitor.

For information, I'm only using commits. This is because, given the large snapshot at the beginning of the log, it starts at offset 0 to load the schema information, then skips to offset 163 to load the relevant table's snapshot, and then skips all the way to the first post-snapshot commit. This saves most of the startup time without having to persist the state of the schemas, i.e. many millions of records don't have to be decoded at all before getting to the interesting data.

Contributor

Ah, never mind most of that, I just realized Decoder.decode_record is meant to call decode_record_fn, and the tx could be filtered there instead.
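
The idea of filtering inside the decoder can be sketched as follows. This is not the crate's API: `RecordDecoder` is a hypothetical, simplified stand-in for the `Decoder` trait (the reader is a plain byte slice here), the one-byte length prefix is an invented record layout, and `FilteringDecoder` and `fold_commit` are illustrative names. The part that carries over is that a skipped record must still be consumed, because the fold loop hands the same buffer to the decoder for the next record.

```rust
use std::cell::RefCell;

/// Hypothetical, simplified stand-in for the crate's `Decoder` trait.
pub trait RecordDecoder {
    type Record;
    type Error;
    fn decode_record(&self, version: u8, tx_offset: u64, reader: &mut &[u8])
        -> Result<Self::Record, Self::Error>;
}

/// Keeps only records at or after `start`; everything else is skipped.
pub struct FilteringDecoder {
    pub start: u64,
    /// Interior mutability, because `decode_record` takes `&self`.
    pub kept: RefCell<Vec<(u64, Vec<u8>)>>,
}

impl RecordDecoder for FilteringDecoder {
    type Record = ();
    type Error = String;

    fn decode_record(&self, _version: u8, tx_offset: u64, reader: &mut &[u8]) -> Result<(), String> {
        // Assumed layout: one length byte followed by that many payload bytes.
        let buf: &[u8] = *reader;
        let (&len, rest) = buf.split_first().ok_or("missing length prefix")?;
        let len = len as usize;
        if rest.len() < len {
            return Err("truncated record".to_string());
        }
        let (payload, rest) = rest.split_at(len);
        // Always advance the reader, even when the record is filtered out.
        *reader = rest;
        if tx_offset >= self.start {
            self.kept.borrow_mut().push((tx_offset, payload.to_vec()));
        }
        Ok(())
    }
}

/// Mimics the per-commit loop of the fold: one `decode_record` call per record.
pub fn fold_commit<D: RecordDecoder>(
    de: &D,
    version: u8,
    min_tx_offset: u64,
    n: u16,
    mut records: &[u8],
) -> Result<(), D::Error> {
    for i in 0..n {
        de.decode_record(version, min_tx_offset + i as u64, &mut records)?;
    }
    Ok(())
}
```

Filtering this way avoids materializing unwanted records, but the bytes of every record in a visited commit are still walked. Skipping whole commit ranges, as described in the comment above, still requires working at the commit level.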

Contributor Author

The _from variants are useful for forensics and for replication.

repo: R,
max_log_format_version: u8,
offset: u64,
de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
R: Repo + 'a,
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
commits_from(repo, max_log_format_version, offset)
.map(|commits| transactions_from_internal(commits.with_log_format_version(), offset, de))
}

pub fn fold_transactions_from<R, D>(repo: R, max_log_format_version: u8, offset: u64, de: D) -> Result<(), D::Error>
where
R: Repo,
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
let commits = commits_from(repo, max_log_format_version, offset)?;
fold_transactions_internal(commits.with_log_format_version(), de)
}

fn transactions_from_internal<'a, R, D, T>(
commits: CommitsWithVersion<R>,
offset: u64,
de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
where
R: Repo + 'a,
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
commits
.map(|x| x.map_err(Into::into))
.map_ok(move |(version, commit)| commit.into_transactions(version, de))
.flatten_ok()
.flatten_ok()
.skip_while(move |x| x.as_ref().map(|tx| tx.offset < offset).unwrap_or(false))
}
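
The final `skip_while` in `transactions_from_internal` exists because the traversal starts at a commit boundary, so the first commit may contain transactions below the requested offset. A minimal sketch of just that step, using a hypothetical `Tx` type and `skip_before` function in place of the crate's `Transaction<T>` and the real iterator chain:

```rust
/// Hypothetical stand-in for the crate's `Transaction<T>`.
#[derive(Debug, PartialEq)]
pub struct Tx {
    pub offset: u64,
}

/// Drop leading transactions whose offset is below `offset`.
///
/// An `Err` item maps to `false` via `unwrap_or(false)`, so skipping stops at
/// the first error and the error is yielded to the caller instead of being
/// swallowed.
pub fn skip_before<I, E>(txs: I, offset: u64) -> impl Iterator<Item = Result<Tx, E>>
where
    I: Iterator<Item = Result<Tx, E>>,
{
    txs.skip_while(move |x| x.as_ref().map(|tx| tx.offset < offset).unwrap_or(false))
}
```

Note that `skip_while` only inspects the leading run of items. Once an item at or after `offset`, or an error, has been seen, everything after it is passed through unchanged.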

fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D) -> Result<(), D::Error>
where
R: Repo,
D: Decoder,
D::Error: From<error::Traversal>,
{
while let Some(commit) = commits.next() {
let (version, commit) = match commit {
Ok(version_and_commit) => version_and_commit,
Err(e) => {
// Ignore it if the very last commit in the log is broken.
// The next `append` will fix the log, but the `decoder`
// has no way to tell whether we're at the end or not.
// This is unlike the consumer of an iterator, which can
// perform below check itself.
if commits.next().is_none() {
return Ok(());
}

return Err(e.into());
}
};
trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);

let records = &mut commit.records.as_slice();
for n in 0..commit.n {
let tx_offset = commit.min_tx_offset + n as u64;
de.decode_record(version, tx_offset, records)?;
}
}

Ok(())
}
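
The error handling in `fold_transactions_internal` follows a small pattern: an error is ignored only if nothing comes after it. The following generic sketch shows that pattern under simplified assumptions. `fold_tolerating_broken_tail` and the closure-based `visit` parameter are hypothetical names; the crate passes a `Decoder` instead.

```rust
/// Visit every `Ok` item. An `Err` is tolerated only if it is the very last
/// item of the iterator; otherwise it is returned to the caller.
pub fn fold_tolerating_broken_tail<I, T, E, F>(mut items: I, mut visit: F) -> Result<(), E>
where
    I: Iterator<Item = Result<T, E>>,
    F: FnMut(T) -> Result<(), E>,
{
    while let Some(item) = items.next() {
        match item {
            Ok(value) => visit(value)?,
            Err(e) => {
                // Peek one item ahead: if the iterator is exhausted, the
                // error was at the tail and is ignored.
                if items.next().is_none() {
                    return Ok(());
                }
                // Something follows the broken item, so the error is real.
                return Err(e);
            }
        }
    }
    Ok(())
}
```

The look-ahead consumes the item after the error, which is acceptable here because the function returns in either case.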

pub struct Segments<R> {
repo: R,
offs: vec::IntoIter<u64>,
@@ -309,7 +379,7 @@ impl<R: Repo> Commits<R> {

/// Turn `self` into an iterator which pairs the log format version of the
/// current segment with the [`Commit`].
pub fn with_log_format_version(self) -> impl Iterator<Item = Result<(u8, Commit), error::Traversal>> {
pub fn with_log_format_version(self) -> CommitsWithVersion<R> {
CommitsWithVersion { inner: self }
}
}
@@ -377,7 +447,7 @@ impl<R: Repo> Iterator for Commits<R> {
}
}

struct CommitsWithVersion<R: Repo> {
pub struct CommitsWithVersion<R: Repo> {
inner: Commits<R>,
}

89 changes: 89 additions & 0 deletions crates/commitlog/src/lib.rs
@@ -80,6 +80,13 @@ impl<T> Commitlog<T> {
/// Open the log at root directory `root` with [`Options`].
///
/// The root directory must already exist.
///
/// Note that opening a commitlog involves I/O: some consistency checks are
/// performed, and the next writing position is determined.
///
/// This is only necessary when opening the commitlog for writing. See the
/// free-standing functions in this module for how to traverse a read-only
/// commitlog.
pub fn open(root: impl Into<PathBuf>, opts: Options) -> io::Result<Self> {
let inner = commitlog::Generic::open(repo::Fs::new(root), opts)?;

@@ -390,3 +397,85 @@ impl<T: Send + Sync + 'static> Commitlog<T> {
rx
}
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`Commit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits`] for more information.
pub fn commits(root: impl Into<PathBuf>) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
commits_from(root, 0)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`Commit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits_from`] for more information.
pub fn commits_from(
root: impl Into<PathBuf>,
offset: u64,
) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
commitlog::commits_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset)
}
Contributor

Is the only difference between these and the commitlog:: variants the abstraction over the repo?

Would it be better to just have the ones in commitlog for this and let the caller call repo::Fs::new manually?

(In my case, a custom TarRepo is used to consume the logs without extracting them from the *.tar archive first)

Contributor Author

Most of what you’re using isn’t exported from the crate. We can change that, but I’ll probably be molested to add docstrings everywhere 😿


/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions`] for more information.
pub fn transactions<'a, D, T>(
root: impl Into<PathBuf>,
de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
transactions_from(root, 0, de)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions_from`] for more information.
pub fn transactions_from<'a, D, T>(
root: impl Into<PathBuf>,
offset: u64,
de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
T: 'a,
{
commitlog::transactions_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset, de)
}

/// Traverse the commitlog located at the `root` directory from the start and
/// "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions`] for more information.
pub fn fold_transactions<D>(root: impl Into<PathBuf>, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
fold_transactions_from(root, 0, de)
}

/// Traverse the commitlog located at the `root` directory starting from `offset`
/// and "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions_from`] for more information.
pub fn fold_transactions_from<D>(root: impl Into<PathBuf>, offset: u64, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
commitlog::fold_transactions_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset, de)
}