Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup how we deal with fixed size vs variable size data files #2757

Merged
merged 1 commit into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions store/src/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::core::core::BlockHeader;
use crate::core::ser::{FixedLength, PMMRable};
use crate::leaf_set::LeafSet;
use crate::prune_list::PruneList;
use crate::types::DataFile;
use crate::types::{AppendOnlyFile, DataFile, SizeEntry, SizeInfo};
use croaring::Bitmap;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -206,21 +206,22 @@ impl<T: PMMRable> PMMRBackend<T> {
) -> io::Result<PMMRBackend<T>> {
let data_dir = data_dir.as_ref();

// We either have a fixed size *or* a path to a file for tracking sizes.
let (elmt_size, size_path) = if fixed_size {
(Some(T::E::LEN as u16), None)
// Are we dealing with "fixed size" data elements or "variable size" data elements
// maintained in an associated size file?
let size_info = if fixed_size {
SizeInfo::FixedSize(T::E::LEN as u16)
} else {
(None, Some(data_dir.join(PMMR_SIZE_FILE)))
SizeInfo::VariableSize(Box::new(AppendOnlyFile::open(
data_dir.join(PMMR_SIZE_FILE),
SizeInfo::FixedSize(SizeEntry::LEN as u16),
)?))
};

// Hash file is always "fixed size" and we use 32 bytes per hash.
let hash_file =
DataFile::open(&data_dir.join(PMMR_HASH_FILE), None, Some(Hash::LEN as u16))?;
let data_file = DataFile::open(
&data_dir.join(PMMR_DATA_FILE),
size_path.as_ref(),
elmt_size,
)?;
let hash_size_info = SizeInfo::FixedSize(Hash::LEN as u16);

let hash_file = DataFile::open(&data_dir.join(PMMR_HASH_FILE), hash_size_info)?;
let data_file = DataFile::open(&data_dir.join(PMMR_DATA_FILE), size_info)?;

let leaf_set_path = data_dir.join(PMMR_LEAF_FILE);

Expand Down
98 changes: 40 additions & 58 deletions store/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ impl Writeable for SizeEntry {
}
}

/// Data file (MMR) wrapper around an append only file.
/// Are we dealing with "fixed size" data or "variable size" data in a data file?
pub enum SizeInfo {
/// Fixed size data.
FixedSize(u16),
/// Variable size data.
VariableSize(Box<AppendOnlyFile<SizeEntry>>),
}

/// Data file (MMR) wrapper around an append-only file.
pub struct DataFile<T> {
file: AppendOnlyFile<T>,
}
Expand All @@ -69,21 +77,13 @@ where
T: Readable + Writeable + Debug,
{
/// Open (or create) a file at the provided path on disk.
pub fn open<P>(path: P, size_path: Option<P>, elmt_size: Option<u16>) -> io::Result<DataFile<T>>
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<DataFile<T>>
where
P: AsRef<Path> + Debug,
{
let size_file = if let Some(size_path) = size_path {
Some(AppendOnlyFile::open(
size_path,
None,
Some(SizeEntry::LEN as u16),
)?)
} else {
None
};
let file = AppendOnlyFile::open(path, size_file, elmt_size)?;
Ok(DataFile { file })
Ok(DataFile {
file: AppendOnlyFile::open(path, size_info)?,
})
}

/// Append an element to the file.
Expand Down Expand Up @@ -168,11 +168,7 @@ where
pub struct AppendOnlyFile<T> {
path: PathBuf,
file: Option<File>,

// We either have a fixed_size or an associated "size" file.
elmt_size: Option<u16>,
size_file: Option<Box<AppendOnlyFile<SizeEntry>>>,

size_info: SizeInfo,
mmap: Option<memmap::Mmap>,

// Buffer of unsync'd bytes. These bytes will be appended to the file when flushed.
Expand All @@ -187,20 +183,15 @@ where
T: Debug + Readable + Writeable,
{
/// Open a file (existing or not) as append-only, backed by a mmap.
pub fn open<P>(
path: P,
size_file: Option<AppendOnlyFile<SizeEntry>>,
elmt_size: Option<u16>,
) -> io::Result<AppendOnlyFile<T>>
pub fn open<P>(path: P, size_info: SizeInfo) -> io::Result<AppendOnlyFile<T>>
where
P: AsRef<Path> + Debug,
{
let mut aof = AppendOnlyFile {
file: None,
path: path.as_ref().to_path_buf(),
elmt_size,
size_info,
mmap: None,
size_file: size_file.map(|x| Box::new(x)),
buffer: vec![],
buffer_start_pos: 0,
buffer_start_pos_bak: 0,
Expand All @@ -212,7 +203,7 @@ where
// This will occur during "fast sync" as we do not sync the size_file
// and must build it locally.
// And we can *only* do this after init() the data file (so we know sizes).
if let Some(ref mut size_file) = &mut aof.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut aof.size_info {
if size_file.size()? == 0 {
aof.rebuild_size_file()?;

Expand All @@ -228,7 +219,7 @@ where
/// (Re)init an underlying file and its associated memmap.
/// Taking care to initialize the mmap_offset_cache for each element.
pub fn init(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = self.size_info {
size_file.init()?;
}

Expand All @@ -252,22 +243,18 @@ where
}

fn size_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size {
Ok(self.size()? / elmt_size as u64)
} else if let Some(ref size_file) = &self.size_file {
size_file.size_in_elmts()
} else {
Ok(0)
match self.size_info {
SizeInfo::FixedSize(elmt_size) => Ok(self.size()? / elmt_size as u64),
SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
}
}

fn size_unsync_in_elmts(&self) -> io::Result<u64> {
if let Some(elmt_size) = self.elmt_size {
Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
} else if let Some(ref size_file) = &self.size_file {
size_file.size_unsync_in_elmts()
} else {
Err(io::Error::new(io::ErrorKind::Other, "size file missing"))
match self.size_info {
SizeInfo::FixedSize(elmt_size) => {
Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
}
SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
}
}

Expand All @@ -281,7 +268,7 @@ where
/// Append data to the file. Until the append-only file is synced, data is
/// only written to memory.
pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
let next_pos = size_file.size_unsync_in_elmts()?;
let offset = if next_pos == 0 {
0
Expand All @@ -303,26 +290,21 @@ where
// If pos is in the buffer then caller needs to remember to account for this
// when reading from the buffer.
fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
if let Some(size) = self.elmt_size {
// Calculating offset and size is simple if we have fixed size elements.
Ok((pos * size as u64, size))
} else if let Some(ref size_file) = &self.size_file {
// Otherwise we need to calculate offset and size from entries in the size_file.
let entry = size_file.read_as_elmt(pos)?;
Ok((entry.offset, entry.size))
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"variable size, missing size file",
))
match self.size_info {
SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
SizeInfo::VariableSize(ref size_file) => {
// Otherwise we need to calculate offset and size from entries in the size_file.
let entry = size_file.read_as_elmt(pos)?;
Ok((entry.offset, entry.size))
}
}
}

/// Rewinds the data file back to a previous position.
/// We simply "rewind" the buffer_start_pos to the specified position.
/// Note: We do not currently support rewinding within the buffer itself.
pub fn rewind(&mut self, pos: u64) {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.rewind(pos);
}

Expand All @@ -335,7 +317,7 @@ where
/// Syncs all writes (fsync), reallocating the memory map to make the newly
/// written data accessible.
pub fn flush(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Flush the associated size_file if we have one.
size_file.flush()?
}
Expand Down Expand Up @@ -398,7 +380,7 @@ where
}

// Discarding the data file will discard the associated size file if we have one.
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.discard();
}

Expand Down Expand Up @@ -492,7 +474,7 @@ where

// Now rebuild our size file to reflect the pruned data file.
// This will replace the underlying file internally.
if let Some(_) = &self.size_file {
if let SizeInfo::VariableSize(_) = &self.size_info {
self.rebuild_size_file()?;
}

Expand All @@ -503,7 +485,7 @@ where
}

fn rebuild_size_file(&mut self) -> io::Result<()> {
if let Some(ref mut size_file) = &mut self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
// Note: Reading from data file and writing sizes to the associated (tmp) size_file.
let tmp_path = size_file.path.with_extension("tmp");

Expand Down Expand Up @@ -563,7 +545,7 @@ where
self.file = None;

// Remember to release the size_file as well if we have one.
if let Some(ref mut size_file) = self.size_file {
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.release();
}
}
Expand Down