Skip to content

Commit

Permalink
replace file system with file builder (#96)
Browse files Browse the repository at this point in the history
Replace file system (#91) with an Allocator-like type `FileBuilder`.

The file builder approach is better: it is strongly typed against our own file writer/reader, so that 1) we don't need to propagate the `Send` and `Sync` trait bounds outside of this library, and 2) user-supplied writers or readers are guaranteed to use our writer/reader as a backend.

There are still some reasonable requirements on user types, such as no buffering and no length altering, which are documented in the trait definition.

Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie authored Sep 9, 2021
1 parent c2abe9f commit dc20bab
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 280 deletions.
4 changes: 2 additions & 2 deletions examples/append_compact_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use raft_engine::{Config, LogBatch, MessageExt, RaftLogEngine, ReadableSize};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize};
use rand::thread_rng;
use rand_distr::{Distribution, Normal};

Expand All @@ -26,7 +26,7 @@ fn main() {
config.dir = "append-compact-purge-data".to_owned();
config.purge_threshold = ReadableSize::gb(2);
config.batch_compression_threshold = ReadableSize::kb(0);
let engine = RaftLogEngine::<MessageExtTyped>::open(config, None).expect("Open raft engine");
let engine = Engine::<MessageExtTyped>::open(config).expect("Open raft engine");

let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite.

Expand Down
94 changes: 50 additions & 44 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use protobuf::{parse_from_bytes, Message};

use crate::config::Config;
use crate::event_listener::EventListener;
use crate::file_builder::*;
use crate::file_pipe_log::FilePipeLog;
use crate::file_system::FileSystem;
use crate::log_batch::{Command, LogBatch, LogItemContent, LogItemDrain, MessageExt, OpType};
use crate::memtable::{EntryIndex, MemTable};
use crate::metrics::*;
Expand Down Expand Up @@ -129,39 +129,56 @@ impl MemTableAccessor {
}
}

#[derive(Clone)]
pub struct Engine<M, P>
pub struct Engine<M, B = DefaultFileBuilder, P = FilePipeLog<B>>
where
M: MessageExt + Clone,
B: FileBuilder,
P: PipeLog,
{
cfg: Arc<Config>,
memtables: MemTableAccessor,
pipe_log: P,
global_stats: Arc<GlobalStats>,
pipe_log: Arc<P>,
purge_manager: PurgeManager<M, P>,

listeners: Vec<Arc<dyn EventListener>>,

_phantom: PhantomData<M>,
_phantom: PhantomData<B>,
}

impl<M> Engine<M, FilePipeLog>
impl<M> Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>
where
M: MessageExt + Clone,
{
pub fn open(
cfg: Config,
file_system: Option<Arc<dyn FileSystem>>,
) -> Result<Engine<M, FilePipeLog>> {
Self::open_with_listeners(cfg, file_system, vec![])
) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
Self::open_with_listeners(cfg, vec![])
}

pub fn open_with_listeners(
cfg: Config,
file_system: Option<Arc<dyn FileSystem>>,
listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
Self::open_with(cfg, Arc::new(DefaultFileBuilder {}), listeners)
}
}

impl<M, B> Engine<M, B, FilePipeLog<B>>
where
M: MessageExt + Clone,
B: FileBuilder,
{
pub fn open_with_file_builder(
cfg: Config,
file_builder: Arc<B>,
) -> Result<Engine<M, B, FilePipeLog<B>>> {
Self::open_with(cfg, file_builder, vec![])
}

pub fn open_with(
cfg: Config,
file_builder: Arc<B>,
mut listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<M, FilePipeLog>> {
) -> Result<Engine<M, B, FilePipeLog<B>>> {
listeners.push(Arc::new(PurgeHook::new()) as Arc<dyn EventListener>);

let global_stats = Arc::new(GlobalStats::default());
Expand All @@ -176,9 +193,9 @@ where
}));

let start = Instant::now();
let pipe_log = FilePipeLog::open(
let pipe_log = Arc::new(FilePipeLog::open(
&cfg,
file_system,
file_builder,
listeners.clone(),
|queue, file_id, mut item_batch| match queue {
LogQueue::Rewrite => {
Expand All @@ -188,7 +205,7 @@ where
Self::apply_to_memtable(&memtables, item_batch.drain(), queue, file_id)
}
},
)?;
)?);
info!("Recovering raft logs takes {:?}", start.elapsed());

let ids = memtables.cleaned_region_ids();
Expand All @@ -206,28 +223,27 @@ where

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(
cfg.clone(),
cfg,
memtables.clone(),
pipe_log.clone(),
global_stats.clone(),
global_stats,
listeners.clone(),
);

Ok(Self {
cfg,
memtables,
pipe_log,
global_stats,
purge_manager,
listeners,
_phantom: PhantomData,
})
}
}

impl<M, P> Engine<M, P>
impl<M, B, P> Engine<M, B, P>
where
M: MessageExt + Clone,
B: FileBuilder,
P: PipeLog,
{
fn apply_to_memtable(
Expand Down Expand Up @@ -354,7 +370,7 @@ where
let mut entry = None;
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(idx) = memtable.read().get_entry(log_idx) {
entry = Some(read_entry_from_file::<M, _>(&self.pipe_log, &idx)?);
entry = Some(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &idx)?);
}
}
ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
Expand Down Expand Up @@ -384,7 +400,7 @@ where
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx {
vec.push(read_entry_from_file::<M, _>(&self.pipe_log, &i)?);
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &i)?);
}
ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
return Ok(vec.len() - old_len);
Expand Down Expand Up @@ -454,17 +470,7 @@ mod tests {
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;

impl<M, P> Engine<M, P>
where
M: MessageExt + Clone,
P: PipeLog,
{
pub fn purge_manager(&self) -> &PurgeManager<M, P> {
&self.purge_manager
}
}

type RaftLogEngine = Engine<Entry, FilePipeLog>;
type RaftLogEngine = Engine<Entry>;
impl RaftLogEngine {
fn append(&self, raft_group_id: u64, entries: &Vec<Entry>) -> Result<usize> {
let mut batch = LogBatch::default();
Expand Down Expand Up @@ -510,7 +516,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(80);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

append_log(&engine, 1, &Entry::new());
assert!(engine.memtables.get(1).is_some());
Expand All @@ -534,7 +540,7 @@ mod tests {
let mut cfg = Config::default();
cfg.dir = dir.path().to_str().unwrap().to_owned();

let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
let mut entry = Entry::new();
entry.set_data(vec![b'x'; entry_size].into());
for i in 10..20 {
Expand All @@ -555,7 +561,7 @@ mod tests {
drop(engine);

// Recover the engine.
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
for i in 10..20 {
entry.set_index(i + 1);
assert_eq!(engine.get_entry(i, i + 1).unwrap(), Some(entry.clone()));
Expand All @@ -579,7 +585,7 @@ mod tests {
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(150);

let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024].into());
for i in 0..100 {
Expand Down Expand Up @@ -645,7 +651,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(80);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

// Put 100 entries into 10 regions.
let mut entry = Entry::new();
Expand Down Expand Up @@ -682,7 +688,7 @@ mod tests {

// Recover with rewrite queue and append queue.
drop(engine);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
for i in 1..=10 {
for j in 1..=10 {
let e = engine.get_entry(j, i).unwrap().unwrap();
Expand Down Expand Up @@ -727,7 +733,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(128);
cfg.purge_threshold = ReadableSize::kb(512);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024].into());
Expand Down Expand Up @@ -801,7 +807,7 @@ mod tests {

// After the engine recovers, the removed raft group shouldn't appear again.
drop(engine);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
assert!(engine.memtables.get(1).is_none());
}

Expand Down Expand Up @@ -893,7 +899,7 @@ mod tests {

let hook = Arc::new(Hook::default());
let engine =
RaftLogEngine::open_with_listeners(cfg.clone(), None, vec![hook.clone()]).unwrap();
Arc::new(RaftLogEngine::open_with_listeners(cfg.clone(), vec![hook.clone()]).unwrap());
assert_eq!(hook.0[&LogQueue::Append].files(), 1);
assert_eq!(hook.0[&LogQueue::Rewrite].files(), 1);

Expand Down Expand Up @@ -982,6 +988,6 @@ mod tests {

// Drop and then recover.
drop(engine);
RaftLogEngine::open(cfg, None).unwrap();
RaftLogEngine::open(cfg).unwrap();
}
}
62 changes: 62 additions & 0 deletions src/file_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::io::{Read, Seek, Write};
use std::path::Path;

/// A `FileBuilder` generates file readers or writers that are composed upon
/// existing ones (e.g. to add a transformation layer such as encryption or
/// checksumming on top of a plain file handle).
pub trait FileBuilder: Send + Sync {
    /// Types of outcome file reader/writer. They must not alter the length of
    /// the byte stream, nor buffer bytes between operations (each read/write
    /// must map one-to-one onto the underlying reader/writer).
    type Reader<R: Seek + Read>: Seek + Read;
    type Writer<W: Seek + Write>: Seek + Write;

    /// Wraps `reader` (backing the file at `path`) into this builder's
    /// reader type. Returns an error if the wrapper cannot be constructed.
    fn build_reader<R>(
        &self,
        path: &Path,
        reader: R,
    ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
    where
        R: Seek + Read;

    /// Wraps `writer` (backing the file at `path`) into this builder's
    /// writer type. `create` indicates whether the file is being newly
    /// created (as opposed to reopened for append).
    /// Returns an error if the wrapper cannot be constructed.
    fn build_writer<W>(
        &self,
        path: &Path,
        writer: W,
        create: bool,
    ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
    where
        W: Seek + Write;
}

/// `DefaultFileBuilder` is a `FileBuilder` that builds out the original
/// `Reader`/`Writer` as it is, i.e. a no-op pass-through layer.
//
// Standard derives: a public zero-sized type should be `Debug`; `Default`,
// `Clone` and `Copy` are free and give callers an idiomatic constructor.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultFileBuilder {}

impl FileBuilder for DefaultFileBuilder {
    // The identity builder: the outcome reader/writer is exactly the input
    // type, so no extra wrapping cost is paid.
    type Reader<R: Seek + Read> = R;
    type Writer<W: Seek + Write> = W;

    /// Returns `reader` unchanged; the path is not consulted.
    fn build_reader<R>(
        &self,
        _path: &Path,
        reader: R,
    ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
    where
        R: Seek + Read,
    {
        Ok(reader)
    }

    /// Returns `writer` unchanged; the path and `create` flag are not
    /// consulted.
    fn build_writer<W>(
        &self,
        _path: &Path,
        writer: W,
        _create: bool,
    ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
    where
        W: Seek + Write,
    {
        Ok(writer)
    }
}
Loading

0 comments on commit dc20bab

Please sign in to comment.