Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework prune rewrite with iterators #3568

Merged
merged 2 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ extern crate log;
use failure;
#[macro_use]
extern crate failure_derive;
#[macro_use]
extern crate grin_core as core;
extern crate grin_util as util;

Expand Down
30 changes: 15 additions & 15 deletions store/src/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,29 +354,29 @@ impl<T: PMMRable> PMMRBackend<T> {

// 1. Save compact copy of the hash file, skipping removed data.
{
let pos_to_rm = map_vec!(pos_to_rm, |pos| {
let shift = self.prune_list.get_shift(pos.into());
pos as u64 - shift
let pos_to_rm = pos_to_rm.iter().map(|x| x as u64).map(|pos| {
let shift = self.prune_list.get_shift(pos);
pos - shift
});

self.hash_file.save_prune(&pos_to_rm)?;
self.hash_file.write_tmp_pruned(pos_to_rm)?;
self.hash_file.replace_with_tmp()?;
}

// 2. Save compact copy of the data file, skipping removed leaves.
{
let leaf_pos_to_rm = pos_to_rm
let pos_to_rm = pos_to_rm
.iter()
.filter(|&x| pmmr::is_leaf(x.into()))
.map(|x| x as u64)
.collect::<Vec<_>>();

let pos_to_rm = map_vec!(leaf_pos_to_rm, |&pos| {
let flat_pos = pmmr::n_leaves(pos);
let shift = self.prune_list.get_leaf_shift(pos);
flat_pos - shift
});

self.data_file.save_prune(&pos_to_rm)?;
.filter(|&x| pmmr::is_leaf(x))
.map(|pos| {
let flat_pos = pmmr::n_leaves(pos);
let shift = self.prune_list.get_leaf_shift(pos);
flat_pos - shift
});

self.data_file.write_tmp_pruned(pos_to_rm)?;
self.data_file.replace_with_tmp()?;
}

// 3. Update the prune list and write to disk.
Expand Down
54 changes: 37 additions & 17 deletions store/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ where
}

/// Write the file out to disk, pruning removed elements.
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {
pub fn write_tmp_pruned<I>(&self, prune_pos: I) -> io::Result<()>
where
I: IntoIterator<Item = u64>,
{
// Need to convert from 1-index to 0-index (don't ask).
let prune_idx: Vec<_> = prune_pos.iter().map(|x| x - 1).collect();
self.file.save_prune(prune_idx.as_slice())
let prune_idx = prune_pos.into_iter().map(|x| x - 1);
self.file.write_tmp_pruned(prune_idx)
}

/// Replace underlying file with the file at our tmp path.
pub fn replace_with_tmp(&mut self) -> io::Result<()> {
self.file.replace_with_tmp()
}
}

Expand Down Expand Up @@ -485,39 +493,52 @@ where
Ok(file)
}

fn tmp_file_path(&self) -> PathBuf {
self.path.with_extension("tmp")
}

/// Saves a copy of the current file content, skipping data at the provided
/// prune positions. prune_pos must be ordered.
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {
let tmp_path = self.path.with_extension("tmp");
fn write_tmp_pruned<I>(&self, prune_pos: I) -> io::Result<()>
where
I: IntoIterator<Item = u64>,
{
let mut prune_pos = prune_pos.into_iter().peekable();

// Scope the reader and writer to within the block so we can safely replace files later on.
{
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);

let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let tmp_path = self.tmp_file_path();
let mut buf_writer = BufWriter::new(File::create(tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);

let mut current_pos = 0;
let mut prune_pos = prune_pos;
while let Ok(elmt) = T::read(&mut streaming_reader) {
if prune_pos.contains(&current_pos) {
// Pruned pos, moving on.
prune_pos = &prune_pos[1..];
} else {
// Not pruned, write to file.
elmt.write(&mut bin_writer)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
if let Some(pos) = prune_pos.peek() {
if *pos == current_pos {
// Pruned pos, moving on.
prune_pos.next();
current_pos += 1;
continue;
}
}
// Not pruned, write to file.
elmt.write(&mut bin_writer)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
current_pos += 1;
}
buf_writer.flush()?;
}

Ok(())
}

fn replace_with_tmp(&mut self) -> io::Result<()> {
// Replace the underlying file -
// pmmr_data.tmp -> pmmr_data.bin
self.replace(&tmp_path)?;
self.replace(self.tmp_file_path())?;

// Now rebuild our size file to reflect the pruned data file.
// This will replace the underlying file internally.
Expand Down Expand Up @@ -575,7 +596,6 @@ where
}

/// Replace the underlying file with another file, deleting the original.
/// Takes an optional size_file path in addition to path.
fn replace<P>(&mut self, with: P) -> io::Result<()>
where
P: AsRef<Path> + Debug,
Expand Down