Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace file system with file builder #96

Merged
merged 3 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/append_compact_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use raft_engine::{Config, LogBatch, MessageExt, RaftLogEngine, ReadableSize};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize};
use rand::thread_rng;
use rand_distr::{Distribution, Normal};

Expand All @@ -26,7 +26,7 @@ fn main() {
config.dir = "append-compact-purge-data".to_owned();
config.purge_threshold = ReadableSize::gb(2);
config.batch_compression_threshold = ReadableSize::kb(0);
let engine = RaftLogEngine::<MessageExtTyped>::open(config, None).expect("Open raft engine");
let engine = Engine::<MessageExtTyped>::open(config).expect("Open raft engine");

let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite.

Expand Down
94 changes: 50 additions & 44 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use protobuf::{parse_from_bytes, Message};

use crate::config::Config;
use crate::event_listener::EventListener;
use crate::file_builder::*;
use crate::file_pipe_log::FilePipeLog;
use crate::file_system::FileSystem;
use crate::log_batch::{Command, LogBatch, LogItemContent, LogItemDrain, MessageExt, OpType};
use crate::memtable::{EntryIndex, MemTable};
use crate::metrics::*;
Expand Down Expand Up @@ -129,39 +129,56 @@ impl MemTableAccessor {
}
}

#[derive(Clone)]
pub struct Engine<M, P>
pub struct Engine<M, B = DefaultFileBuilder, P = FilePipeLog<B>>
where
M: MessageExt + Clone,
B: FileBuilder,
P: PipeLog,
{
cfg: Arc<Config>,
memtables: MemTableAccessor,
pipe_log: P,
global_stats: Arc<GlobalStats>,
pipe_log: Arc<P>,
purge_manager: PurgeManager<M, P>,

listeners: Vec<Arc<dyn EventListener>>,

_phantom: PhantomData<M>,
_phantom: PhantomData<B>,
}

impl<M> Engine<M, FilePipeLog>
impl<M> Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>
where
M: MessageExt + Clone,
{
pub fn open(
cfg: Config,
file_system: Option<Arc<dyn FileSystem>>,
) -> Result<Engine<M, FilePipeLog>> {
Self::open_with_listeners(cfg, file_system, vec![])
) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
Self::open_with_listeners(cfg, vec![])
}

pub fn open_with_listeners(
cfg: Config,
file_system: Option<Arc<dyn FileSystem>>,
listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
Self::open_with(cfg, Arc::new(DefaultFileBuilder {}), listeners)
}
}

impl<M, B> Engine<M, B, FilePipeLog<B>>
where
M: MessageExt + Clone,
B: FileBuilder,
{
pub fn open_with_file_builder(
cfg: Config,
file_builder: Arc<B>,
) -> Result<Engine<M, B, FilePipeLog<B>>> {
Self::open_with(cfg, file_builder, vec![])
}

pub fn open_with(
cfg: Config,
file_builder: Arc<B>,
mut listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<M, FilePipeLog>> {
) -> Result<Engine<M, B, FilePipeLog<B>>> {
listeners.push(Arc::new(PurgeHook::new()) as Arc<dyn EventListener>);

let global_stats = Arc::new(GlobalStats::default());
Expand All @@ -176,9 +193,9 @@ where
}));

let start = Instant::now();
let pipe_log = FilePipeLog::open(
let pipe_log = Arc::new(FilePipeLog::open(
&cfg,
file_system,
file_builder,
listeners.clone(),
|queue, file_id, mut item_batch| match queue {
LogQueue::Rewrite => {
Expand All @@ -188,7 +205,7 @@ where
Self::apply_to_memtable(&memtables, item_batch.drain(), queue, file_id)
}
},
)?;
)?);
info!("Recovering raft logs takes {:?}", start.elapsed());

let ids = memtables.cleaned_region_ids();
Expand All @@ -206,28 +223,27 @@ where

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(
cfg.clone(),
cfg,
memtables.clone(),
pipe_log.clone(),
global_stats.clone(),
global_stats,
listeners.clone(),
);

Ok(Self {
cfg,
memtables,
pipe_log,
global_stats,
purge_manager,
listeners,
_phantom: PhantomData,
})
}
}

impl<M, P> Engine<M, P>
impl<M, B, P> Engine<M, B, P>
where
M: MessageExt + Clone,
B: FileBuilder,
P: PipeLog,
{
fn apply_to_memtable(
Expand Down Expand Up @@ -354,7 +370,7 @@ where
let mut entry = None;
if let Some(memtable) = self.memtables.get(region_id) {
if let Some(idx) = memtable.read().get_entry(log_idx) {
entry = Some(read_entry_from_file::<M, _>(&self.pipe_log, &idx)?);
entry = Some(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &idx)?);
}
}
ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
Expand Down Expand Up @@ -384,7 +400,7 @@ where
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx {
vec.push(read_entry_from_file::<M, _>(&self.pipe_log, &i)?);
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), &i)?);
}
ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
return Ok(vec.len() - old_len);
Expand Down Expand Up @@ -454,17 +470,7 @@ mod tests {
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;

impl<M, P> Engine<M, P>
where
M: MessageExt + Clone,
P: PipeLog,
{
pub fn purge_manager(&self) -> &PurgeManager<M, P> {
&self.purge_manager
}
}

type RaftLogEngine = Engine<Entry, FilePipeLog>;
type RaftLogEngine = Engine<Entry>;
impl RaftLogEngine {
fn append(&self, raft_group_id: u64, entries: &Vec<Entry>) -> Result<usize> {
let mut batch = LogBatch::default();
Expand Down Expand Up @@ -510,7 +516,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(80);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

append_log(&engine, 1, &Entry::new());
assert!(engine.memtables.get(1).is_some());
Expand All @@ -534,7 +540,7 @@ mod tests {
let mut cfg = Config::default();
cfg.dir = dir.path().to_str().unwrap().to_owned();

let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
let mut entry = Entry::new();
entry.set_data(vec![b'x'; entry_size].into());
for i in 10..20 {
Expand All @@ -555,7 +561,7 @@ mod tests {
drop(engine);

// Recover the engine.
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
for i in 10..20 {
entry.set_index(i + 1);
assert_eq!(engine.get_entry(i, i + 1).unwrap(), Some(entry.clone()));
Expand All @@ -579,7 +585,7 @@ mod tests {
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(150);

let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024].into());
for i in 0..100 {
Expand Down Expand Up @@ -645,7 +651,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(5);
cfg.purge_threshold = ReadableSize::kb(80);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

// Put 100 entries into 10 regions.
let mut entry = Entry::new();
Expand Down Expand Up @@ -682,7 +688,7 @@ mod tests {

// Recover with rewrite queue and append queue.
drop(engine);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
for i in 1..=10 {
for j in 1..=10 {
let e = engine.get_entry(j, i).unwrap().unwrap();
Expand Down Expand Up @@ -727,7 +733,7 @@ mod tests {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(128);
cfg.purge_threshold = ReadableSize::kb(512);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();

let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024].into());
Expand Down Expand Up @@ -801,7 +807,7 @@ mod tests {

// After the engine recovers, the removed raft group shouldn't appear again.
drop(engine);
let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
let engine = RaftLogEngine::open(cfg.clone()).unwrap();
assert!(engine.memtables.get(1).is_none());
}

Expand Down Expand Up @@ -893,7 +899,7 @@ mod tests {

let hook = Arc::new(Hook::default());
let engine =
RaftLogEngine::open_with_listeners(cfg.clone(), None, vec![hook.clone()]).unwrap();
Arc::new(RaftLogEngine::open_with_listeners(cfg.clone(), vec![hook.clone()]).unwrap());
assert_eq!(hook.0[&LogQueue::Append].files(), 1);
assert_eq!(hook.0[&LogQueue::Rewrite].files(), 1);

Expand Down Expand Up @@ -982,6 +988,6 @@ mod tests {

// Drop and then recover.
drop(engine);
RaftLogEngine::open(cfg, None).unwrap();
RaftLogEngine::open(cfg).unwrap();
}
}
62 changes: 62 additions & 0 deletions src/file_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::io::{Read, Seek, Write};
use std::path::Path;

/// A `FileBuilder` generates file readers or writers that are composed upon
/// existing ones.
pub trait FileBuilder: Send + Sync {
/// Types of outcome file reader/writer. They must not alter the length of
/// bytes stream, nor buffer bytes between operations.
type Reader<R: Seek + Read>: Seek + Read;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't `type Reader: Seek + Read` be enough? Then everything across the file builder could simply use `Reader`.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is enough, but then we couldn't guarantee, from a typing perspective, that the implementor actually uses the supplied R/W. Though I'm not sure this is the idiomatic way of expressing such a requirement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think it would be hard for an implementor to wrap a generic R/W into a non-generic `Reader` type.

type Writer<W: Seek + Write>: Seek + Write;

fn build_reader<R>(
&self,
path: &Path,
reader: R,
) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
where

This comment was marked as resolved.

R: Seek + Read;

fn build_writer<W>(
&self,
path: &Path,
writer: W,
create: bool,
) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
where
W: Seek + Write;
}

/// `DefaultFileBuilder` is the identity `FileBuilder`: `build_reader` and
/// `build_writer` hand back the supplied `Reader`/`Writer` unchanged, so log
/// files are read and written as-is with no extra wrapping layer.
pub struct DefaultFileBuilder {}

/// Pass-through implementation: every reader/writer is returned exactly as
/// received, fulfilling the `FileBuilder` contract of not altering the byte
/// stream (trivially, by adding no layer at all).
impl FileBuilder for DefaultFileBuilder {
    type Reader<R: Seek + Read> = R;
    type Writer<W: Seek + Write> = W;

    fn build_reader<R: Seek + Read>(
        &self,
        _path: &Path,
        reader: R,
    ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>> {
        // Identity: the raw reader is already the finished reader.
        Ok(reader)
    }

    fn build_writer<W: Seek + Write>(
        &self,
        _path: &Path,
        writer: W,
        _create: bool,
    ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>> {
        // Identity: the raw writer is already the finished writer;
        // the `create` flag needs no special handling here.
        Ok(writer)
    }
}
Loading