Skip to content

Commit

Permalink
cleanup how we deal with fixed size vs variable size data files
Browse files Browse the repository at this point in the history
introduce a SizeInfo enum (to replace two options)
  • Loading branch information
antiochp committed Apr 17, 2019
1 parent b403ccb commit a462ac5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 70 deletions.
25 changes: 13 additions & 12 deletions store/src/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::core::core::BlockHeader;
use crate::core::ser::{FixedLength, PMMRable};
use crate::leaf_set::LeafSet;
use crate::prune_list::PruneList;
use crate::types::DataFile;
use crate::types::{AppendOnlyFile, DataFile, SizeEntry, SizeInfo};
use croaring::Bitmap;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -206,21 +206,22 @@ impl<T: PMMRable> PMMRBackend<T> {
) -> io::Result<PMMRBackend<T>> {
let data_dir = data_dir.as_ref();

// We either have a fixed size *or* a path to a file for tracking sizes.
let (elmt_size, size_path) = if fixed_size {
(Some(T::E::LEN as u16), None)
// Are we dealing with "fixed size" data elements or "variable size" data elements
// maintained in an associated size file?
let size_info = if fixed_size {
SizeInfo::FixedSize(T::E::LEN as u16)
} else {
(None, Some(data_dir.join(PMMR_SIZE_FILE)))
SizeInfo::VariableSize(Box::new(AppendOnlyFile::open(
data_dir.join(PMMR_SIZE_FILE),
SizeInfo::FixedSize(SizeEntry::LEN as u16),
)?))
};

// Hash file is always "fixed size" and we use 32 bytes per hash.
let hash_file =
DataFile::open(&data_dir.join(PMMR_HASH_FILE), None, Some(Hash::LEN as u16))?;
let data_file = DataFile::open(
&data_dir.join(PMMR_DATA_FILE),
size_path.as_ref(),
elmt_size,
)?;
let hash_size_info = SizeInfo::FixedSize(Hash::LEN as u16);

let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info)?;
let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info)?;

let leaf_set_path = data_dir.join(PMMR_LEAF_FILE);

Expand Down
98 changes: 40 additions & 58 deletions store/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ impl Writeable for SizeEntry {
}
}

/// Data file (MMR) wrapper around an append only file.
/// Are we dealing with "fixed size" data or "variable size" data in a data file?
pub enum SizeInfo {
/// Fixed size data.
FixedSize(u16),
/// Variable size data.
VariableSize(Box<AppendOnlyFile<SizeEntry>>),
}

/// Data file (MMR) wrapper around an append-only file.
pub struct DataFile<T> {
file: AppendOnlyFile<T>,
}
Expand All @@ -69,21 +77,13 @@ where
T: Readable + Writeable + Debug,
{
/// Open (or create) a file at the provided path on disk.
pub fn open<P>(path: P, size_path: Option<P>, elmt_size: Option<u16>) -> io::Result<DataFile<T>>
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<DataFile<T>>
where
P: AsRef<Path> + Debug,
{
let size_file = if let Some(size_path) = size_path {
Some(AppendOnlyFile::open(
size_path,
None,
Some(SizeEntry::LEN as u16),
)?)
} else {
None
};
let file = AppendOnlyFile::open(path, size_file, elmt_size)?;
Ok(DataFile { file })
Ok(DataFile {
file: AppendOnlyFile::open(path, size_info)?,
})
}

/// Append an element to the file.
Expand Down Expand Up @@ -168,11 +168,7 @@ where
pub struct AppendOnlyFile<T> {
path: PathBuf,
file: Option<File>,

// We either have a fixed_size or an associated "size" file.
elmt_size: Option<u16>,
size_file: Option<Box<AppendOnlyFile<SizeEntry>>>,

size_info: SizeInfo,
mmap: Option<memmap::Mmap>,

// Buffer of unsync'd bytes. These bytes will be appended to the file when flushed.
Expand All @@ -187,20 +183,15 @@ where
T: Debug + Readable + Writeable,
{
/// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open<P>(
path: P,
size_file: Option<AppendOnlyFile<SizeEntry>>,
elmt_size: Option<u16>,
) -> io::Result<AppendOnlyFile<T>>
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<AppendOnlyFile<T>>
where
P: AsRef<Path> + Debug,
{
let mut aof = AppendOnlyFile {
file: None,
path: path.as_ref().to_path_buf(),
elmt_size,
size_info,
mmap: None,
size_file: size_file.map(|x| Box::new(x)),
buffer: vec![],
buffer_start_pos: 0,
buffer_start_pos_bak: 0,
Expand All @@ -212,7 +203,7 @@ where
// This will occur during "fast sync" as we do not sync the size_file
// and must build it locally.
// And we can *only* do this after init() the data file (so we know sizes).
if let Some(ref mut size_file) = &mut aof.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut aof.size_info {
if size_file.size()? == 0 {
aof.rebuild_size_file()?;

Expand All @@ -228,7 +219,7 @@ where
/// (Re)init an underlying file and its associated memmap.
/// Taking care to initialize the mmap_offset_cache for each element.
pub fn init(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = self.size_info {
size_file.init()?;
}

Expand All @@ -252,22 +243,18 @@ where
}

fn size_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size {
Ok(self.size()? / elmt_size as u64)
} else if let Some(ref size_file) = &self.size_file {
size_file.size_in_elmts()
} else {
Ok(0)
match self.size_info {
SizeInfo::FixedSize(elmt_size) => Ok(self.size()? / elmt_size as u64),
SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
}
}

fn size_unsync_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size {
Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
} else if let Some(ref size_file) = &self.size_file {
size_file.size_unsync_in_elmts()
} else {
Err(io::Error::new(io::ErrorKind::Other, "size file missing"))
match self.size_info {
SizeInfo::FixedSize(elmt_size) => {
Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
}
SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
}
}

Expand All @@ -281,7 +268,7 @@ where
/// Append data to the file. Until the append-only file is synced, data is
/// only written to memory.
pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
let next_pos = size_file.size_unsync_in_elmts()?;
let offset = if next_pos == 0 {
0
Expand All @@ -303,26 +290,21 @@ where
// If pos is in the buffer then caller needs to remember to account for this
// when reading from the buffer.
fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
if let Some(size) = self.elmt_size {
// Calculating offset and size is simple if we have fixed size elements.
Ok((pos * size as u64, size))
} else if let Some(ref size_file) = &self.size_file {
// Otherwise we need to calculate offset and size from entries in the size_file.
let entry = size_file.read_as_elmt(pos)?;
Ok((entry.offset, entry.size))
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"variable size, missing size file",
))
match self.size_info {
SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
SizeInfo::VariableSize(ref size_file) => {
// Otherwise we need to calculate offset and size from entries in the size_file.
let entry = size_file.read_as_elmt(pos)?;
Ok((entry.offset, entry.size))
}
}
}

/// Rewinds the data file back to a previous position.
/// We simply "rewind" the buffer_start_pos to the specified position.
/// Note: We do not currently support rewinding within the buffer itself.
pub fn rewind(&mut self, pos: u64) {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.rewind(pos);
}

Expand All @@ -335,7 +317,7 @@ where
/// Syncs all writes (fsync), reallocating the memory map to make the newly
/// written data accessible.
pub fn flush(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Flush the associated size_file if we have one.
size_file.flush()?
}
Expand Down Expand Up @@ -398,7 +380,7 @@ where
}

// Discarding the data file will discard the associated size file if we have one.
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.discard();
}

Expand Down Expand Up @@ -492,7 +474,7 @@ where

// Now rebuild our size file to reflect the pruned data file.
// This will replace the underlying file internally.
if let Some(_) = &self.size_file {
if let SizeInfo::VariableSize(_) = &self.size_info {
self.rebuild_size_file()?;
}

Expand All @@ -503,7 +485,7 @@ where
}

fn rebuild_size_file(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Note: Reading from data file and writing sizes to the associated (tmp) size_file.
let tmp_path = size_file.path.with_extension("tmp");

Expand Down Expand Up @@ -563,7 +545,7 @@ where
self.file = None;

// Remember to release the size_file as well if we have one.
if let Some(ref mut size_file) = self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.release();
}
}
Expand Down

0 comments on commit a462ac5

Please sign in to comment.