From 28340035fc7a8b8f2048c52d35eb369ddc9346cd Mon Sep 17 00:00:00 2001
From: tabokie
Date: Tue, 7 Sep 2021 12:18:30 +0800
Subject: [PATCH 1/2] replace file system with file builder

Signed-off-by: tabokie
---
 examples/append_compact_purge.rs |   4 +-
 src/engine.rs                    |  94 +++++------
 src/file_builder.rs              |  62 ++++++++
 src/file_pipe_log.rs             | 263 ++++++++++++++++++-------------
 src/file_system.rs               |  31 ----
 src/lib.rs                       |  20 +--
 src/log_batch.rs                 |   7 +-
 src/log_file.rs                  |  77 ++-------
 src/pipe_log.rs                  |   2 +-
 src/purge.rs                     |   6 +-
 src/reader.rs                    |  20 +--
 11 files changed, 307 insertions(+), 279 deletions(-)
 create mode 100644 src/file_builder.rs
 delete mode 100644 src/file_system.rs

diff --git a/examples/append_compact_purge.rs b/examples/append_compact_purge.rs
index 2aaba37f..fb0340e1 100644
--- a/examples/append_compact_purge.rs
+++ b/examples/append_compact_purge.rs
@@ -2,7 +2,7 @@
 
 use kvproto::raft_serverpb::RaftLocalState;
 use raft::eraftpb::Entry;
-use raft_engine::{Config, LogBatch, MessageExt, RaftLogEngine, ReadableSize};
+use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize};
 use rand::thread_rng;
 use rand_distr::{Distribution, Normal};
 
@@ -26,7 +26,7 @@ fn main() {
     config.dir = "append-compact-purge-data".to_owned();
     config.purge_threshold = ReadableSize::gb(2);
     config.batch_compression_threshold = ReadableSize::kb(0);
-    let engine = RaftLogEngine::<MessageExtTyped>::open(config, None).expect("Open raft engine");
+    let engine = Engine::<MessageExtTyped>::open(config).expect("Open raft engine");
 
     let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite.
 
diff --git a/src/engine.rs b/src/engine.rs
index fae0bc28..325cb5f2 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -13,8 +13,8 @@ use protobuf::{parse_from_bytes, Message};
 
 use crate::config::Config;
 use crate::event_listener::EventListener;
+use crate::file_builder::*;
 use crate::file_pipe_log::FilePipeLog;
-use crate::file_system::FileSystem;
 use crate::log_batch::{Command, LogBatch, LogItemContent, LogItemDrain, MessageExt, OpType};
 use crate::memtable::{EntryIndex, MemTable};
 use crate::metrics::*;
@@ -129,39 +129,56 @@ impl MemTableAccessor {
     }
 }
 
-#[derive(Clone)]
-pub struct Engine<M, P>
+pub struct Engine<M, B = DefaultFileBuilder, P = FilePipeLog<B>>
 where
     M: MessageExt + Clone,
+    B: FileBuilder,
     P: PipeLog,
 {
-    cfg: Arc<Config>,
     memtables: MemTableAccessor,
-    pipe_log: P,
-    global_stats: Arc<GlobalStats>,
+    pipe_log: Arc<P>,
     purge_manager: PurgeManager<M, P>,
 
     listeners: Vec<Arc<dyn EventListener>>,
 
-    _phantom: PhantomData<M>,
+    _phantom: PhantomData<(M, B)>,
 }
 
-impl<M> Engine<M, FilePipeLog>
+impl<M> Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>
 where
     M: MessageExt + Clone,
 {
     pub fn open(
         cfg: Config,
-        file_system: Option<Arc<dyn FileSystem>>,
-    ) -> Result<Engine<M, FilePipeLog>> {
-        Self::open_with_listeners(cfg, file_system, vec![])
+    ) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
+        Self::open_with_listeners(cfg, vec![])
     }
 
     pub fn open_with_listeners(
         cfg: Config,
-        file_system: Option<Arc<dyn FileSystem>>,
+        listeners: Vec<Arc<dyn EventListener>>,
+    ) -> Result<Engine<M, DefaultFileBuilder, FilePipeLog<DefaultFileBuilder>>> {
+        Self::open_with(cfg, Arc::new(DefaultFileBuilder {}), listeners)
+    }
+}
+
+impl<M, B> Engine<M, B, FilePipeLog<B>>
+where
+    M: MessageExt + Clone,
+    B: FileBuilder,
+{
+    pub fn open_with_file_builder(
+        cfg: Config,
+        file_builder: Arc<B>,
+    ) -> Result<Engine<M, B, FilePipeLog<B>>> {
+        Self::open_with(cfg, file_builder, vec![])
+    }
+
+    pub fn open_with(
+        cfg: Config,
+        file_builder: Arc<B>,
         mut listeners: Vec<Arc<dyn EventListener>>,
-    ) -> Result<Engine<M, FilePipeLog>> {
+    ) -> Result<Engine<M, B, FilePipeLog<B>>> {
         listeners.push(Arc::new(PurgeHook::new()) as Arc<dyn EventListener>);
 
         let global_stats = Arc::new(GlobalStats::default());
@@ -176,9 +193,9 @@ where
         }));
 
         let start = Instant::now();
-        let pipe_log = FilePipeLog::open(
+        let pipe_log = Arc::new(FilePipeLog::open(
             &cfg,
-            file_system,
+            file_builder,
             listeners.clone(),
             |queue, file_id, mut item_batch| match queue {
                 LogQueue::Rewrite => {
@@ -188,7 +205,7 @@ where
                     Self::apply_to_memtable(&memtables, item_batch.drain(), queue, file_id)
                 }
             },
-        )?;
+        )?);
         info!("Recovering raft logs takes {:?}", start.elapsed());
 
         let ids = memtables.cleaned_region_ids();
@@ -206,18 +223,16 @@ where
 
         let cfg = Arc::new(cfg);
         let purge_manager = PurgeManager::new(
-            cfg.clone(),
+            cfg,
             memtables.clone(),
             pipe_log.clone(),
-            global_stats.clone(),
+            global_stats,
             listeners.clone(),
         );
 
         Ok(Self {
-            cfg,
             memtables,
             pipe_log,
-            global_stats,
             purge_manager,
             listeners,
             _phantom: PhantomData,
@@ -225,9 +240,10 @@ where
     }
 }
 
-impl<M, P> Engine<M, P>
+impl<M, B, P> Engine<M, B, P>
 where
     M: MessageExt + Clone,
+    B: FileBuilder,
     P: PipeLog,
 {
     fn apply_to_memtable(
@@ -355,7 +371,7 @@ where
         let mut entry = None;
         if let Some(memtable) = self.memtables.get(region_id) {
             if let Some(idx) = memtable.read().get_entry(log_idx) {
-                entry = Some(read_entry_from_file::<M, P>(&self.pipe_log, &idx)?);
+                entry = Some(read_entry_from_file::<M, P>(self.pipe_log.as_ref(), &idx)?);
             }
         }
         ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
@@ -385,7 +401,7 @@
             .read()
             .fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
         for i in ents_idx {
-            vec.push(read_entry_from_file::<M, P>(&self.pipe_log, &i)?);
+            vec.push(read_entry_from_file::<M, P>(self.pipe_log.as_ref(), &i)?);
         }
         ENGINE_READ_ENTRY_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
         return Ok(vec.len() - old_len);
@@ -455,17 +471,7 @@ mod tests {
     use kvproto::raft_serverpb::RaftLocalState;
     use raft::eraftpb::Entry;
 
-    impl<M, P> Engine<M, P>
-    where
-        M: MessageExt + Clone,
-        P: PipeLog,
-    {
-        pub fn purge_manager(&self) -> &PurgeManager<M, P> {
-            &self.purge_manager
-        }
-    }
-
-    type RaftLogEngine = Engine<Entry, FilePipeLog>;
+    type RaftLogEngine = Engine<Entry>;
     impl RaftLogEngine {
         fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
             let mut batch = LogBatch::default();
@@ -511,7 +517,7 @@ mod tests {
         cfg.dir = dir.path().to_str().unwrap().to_owned();
         cfg.target_file_size = ReadableSize::kb(5);
         cfg.purge_threshold = ReadableSize::kb(80);
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
 
         append_log(&engine, 1, &Entry::new());
         assert!(engine.memtables.get(1).is_some());
@@ -535,7 +541,7 @@ mod tests {
 
         let mut cfg = Config::default();
         cfg.dir = dir.path().to_str().unwrap().to_owned();
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
         let mut entry = Entry::new();
         entry.set_data(vec![b'x'; entry_size].into());
         for i in 10..20 {
@@ -556,7 +562,7 @@ mod tests {
         drop(engine);
 
         // Recover the engine.
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
         for i in 10..20 {
             entry.set_index(i + 1);
             assert_eq!(engine.get_entry(i, i + 1).unwrap(), Some(entry.clone()));
@@ -580,7 +586,7 @@ mod tests {
         cfg.target_file_size = ReadableSize::kb(5);
         cfg.purge_threshold = ReadableSize::kb(150);
 
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
         let mut entry = Entry::new();
         entry.set_data(vec![b'x'; 1024].into());
         for i in 0..100 {
@@ -646,7 +652,7 @@ mod tests {
         cfg.dir = dir.path().to_str().unwrap().to_owned();
         cfg.target_file_size = ReadableSize::kb(5);
         cfg.purge_threshold = ReadableSize::kb(80);
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
 
         // Put 100 entries into 10 regions.
         let mut entry = Entry::new();
@@ -683,7 +689,7 @@ mod tests {
 
         // Recover with rewrite queue and append queue.
         drop(engine);
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
         for i in 1..=10 {
             for j in 1..=10 {
                 let e = engine.get_entry(j, i).unwrap().unwrap();
@@ -728,7 +734,7 @@ mod tests {
         cfg.dir = dir.path().to_str().unwrap().to_owned();
         cfg.target_file_size = ReadableSize::kb(128);
         cfg.purge_threshold = ReadableSize::kb(512);
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
 
         let mut entry = Entry::new();
         entry.set_data(vec![b'x'; 1024].into());
@@ -802,7 +808,7 @@ mod tests {
 
         // After the engine recovers, the removed raft group shouldn't appear again.
         drop(engine);
-        let engine = RaftLogEngine::open(cfg.clone(), None).unwrap();
+        let engine = RaftLogEngine::open(cfg.clone()).unwrap();
         assert!(engine.memtables.get(1).is_none());
     }
@@ -894,7 +900,7 @@ mod tests {
         let hook = Arc::new(Hook::default());
 
         let engine =
-            RaftLogEngine::open_with_listeners(cfg.clone(), None, vec![hook.clone()]).unwrap();
+            Arc::new(RaftLogEngine::open_with_listeners(cfg.clone(), vec![hook.clone()]).unwrap());
 
         assert_eq!(hook.0[&LogQueue::Append].files(), 1);
         assert_eq!(hook.0[&LogQueue::Rewrite].files(), 1);
@@ -983,6 +989,6 @@ mod tests {
 
         // Drop and then recover.
         drop(engine);
-        RaftLogEngine::open(cfg, None).unwrap();
+        RaftLogEngine::open(cfg).unwrap();
     }
 }
diff --git a/src/file_builder.rs b/src/file_builder.rs
new file mode 100644
index 00000000..99dcb3f7
--- /dev/null
+++ b/src/file_builder.rs
@@ -0,0 +1,62 @@
+// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
+
+use std::io::{Read, Seek, Write};
+use std::path::Path;
+
+/// A `FileBuilder` generates file readers or writers that are composed upon
+/// existing ones.
+pub trait FileBuilder: Send + Sync {
+    /// Types of outcome file reader/writer. They must not alter the length of
+    /// bytes stream, nor buffer bytes between operations.
+    type Reader<R: Seek + Read>: Seek + Read;
+    type Writer<W: Seek + Write>: Seek + Write;
+
+    fn build_reader<R>(
+        &self,
+        path: &Path,
+        reader: R,
+    ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
+    where
+        R: Seek + Read;
+
+    fn build_writer<W>(
+        &self,
+        path: &Path,
+        writer: W,
+        create: bool,
+    ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
+    where
+        W: Seek + Write;
+}
+
+/// `DefaultFileBuilder` is a `FileBuilder` that builds out the original
+/// `Reader`/`Writer` as it is.
+pub struct DefaultFileBuilder {}
+
+impl FileBuilder for DefaultFileBuilder {
+    type Reader<R: Seek + Read> = R;
+    type Writer<W: Seek + Write> = W;
+
+    fn build_reader<R>(
+        &self,
+        _path: &Path,
+        reader: R,
+    ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
+    where
+        R: Seek + Read,
+    {
+        Ok(reader)
+    }
+
+    fn build_writer<W>(
+        &self,
+        _path: &Path,
+        writer: W,
+        _create: bool,
+    ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
+    where
+        W: Seek + Write,
+    {
+        Ok(writer)
+    }
+}
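
(Aside, not part of the patch: the trait above is easiest to read next to a non-trivial implementation. The following is a minimal sketch under this patch's GAT signatures, buildable only on a nightly toolchain with the `generic_associated_types` feature that the patch enables in lib.rs; `XorFileBuilder` and `XorStream` are hypothetical names, not anything shipped by raft-engine. A byte-wise XOR mask preserves stream length and buffers nothing, so it satisfies the contract stated in the doc comment.)

    use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
    use std::path::Path;

    use raft_engine::FileBuilder;

    /// Masks every byte on disk with a fixed XOR key.
    pub struct XorFileBuilder {
        pub key: u8,
    }

    /// Wraps an inner reader/writer; stateless and length-preserving.
    pub struct XorStream<S> {
        inner: S,
        key: u8,
    }

    impl<S: Seek> Seek for XorStream<S> {
        fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
            // XOR is position-independent, so seeking needs no extra state.
            self.inner.seek(pos)
        }
    }

    impl<S: Read> Read for XorStream<S> {
        fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
            // Unmask in place after reading from the underlying file.
            let n = self.inner.read(buf)?;
            for b in &mut buf[..n] {
                *b ^= self.key;
            }
            Ok(n)
        }
    }

    impl<S: Write> Write for XorStream<S> {
        fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
            // Mask a copy before writing. The byte-for-byte mapping makes the
            // returned (possibly partial) count valid for the caller too.
            let masked: Vec<u8> = buf.iter().map(|b| b ^ self.key).collect();
            self.inner.write(&masked)
        }

        fn flush(&mut self) -> IoResult<()> {
            self.inner.flush()
        }
    }

    impl FileBuilder for XorFileBuilder {
        type Reader<R: Seek + Read> = XorStream<R>;
        type Writer<W: Seek + Write> = XorStream<W>;

        fn build_reader<R>(
            &self,
            _path: &Path,
            reader: R,
        ) -> Result<Self::Reader<R>, Box<dyn std::error::Error>>
        where
            R: Seek + Read,
        {
            Ok(XorStream { inner: reader, key: self.key })
        }

        fn build_writer<W>(
            &self,
            _path: &Path,
            writer: W,
            _create: bool,
        ) -> Result<Self::Writer<W>, Box<dyn std::error::Error>>
        where
            W: Seek + Write,
        {
            Ok(XorStream { inner: writer, key: self.key })
        }
    }

(The engine keeps seeing plain bytes while everything on disk is masked; the same shape would fit any length-preserving transformation, such as a stream cipher.)
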
diff --git a/src/file_pipe_log.rs b/src/file_pipe_log.rs
index c47b548d..f459c5b6 100644
--- a/src/file_pipe_log.rs
+++ b/src/file_pipe_log.rs
@@ -2,32 +2,32 @@
 
 use std::collections::VecDeque;
 use std::fs;
-use std::io::{Error as IoError, ErrorKind as IoErrorKind, Write};
+use std::io::BufRead;
+use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Seek, Write};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::Instant;
 
 use log::{info, warn};
+use num_derive::{FromPrimitive, ToPrimitive};
+use num_traits::{FromPrimitive, ToPrimitive};
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 
+use crate::codec::{self, NumberEncoder};
 use crate::config::{Config, RecoveryMode};
 use crate::event_listener::EventListener;
-use crate::file_system::{FileSystem, Readable, Writable};
+use crate::file_builder::FileBuilder;
 use crate::log_batch::{LogBatch, LogItemBatch};
-use crate::log_file::{LogFd, LogFile, LogFileHeader, LOG_FILE_MIN_HEADER_LEN};
+use crate::log_file::{LogFd, LogFile};
 use crate::metrics::*;
 use crate::pipe_log::{FileId, LogQueue, PipeLog};
 use crate::reader::LogItemBatchFileReader;
 use crate::util::InstantExt;
 use crate::{Error, Result};
 
-const LOG_SUFFIX: &str = ".raftlog";
 const LOG_NUM_LEN: usize = 16;
-const LOG_NAME_LEN: usize = LOG_NUM_LEN + LOG_SUFFIX.len();
-
-const REWRITE_SUFFIX: &str = ".rewrite";
-const REWRITE_NUM_LEN: usize = 8;
-const REWRITE_NAME_LEN: usize = REWRITE_NUM_LEN + REWRITE_SUFFIX.len();
+const LOG_APPEND_SUFFIX: &str = ".raftlog";
+const LOG_REWRITE_SUFFIX: &str = ".rewrite";
 
 const INIT_FILE_ID: u64 = 1;
 
@@ -36,125 +36,181 @@ const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024;
 
 fn build_file_name(queue: LogQueue, file_id: FileId) -> String {
     match queue {
-        LogQueue::Append => format!("{:0width$}{}", file_id, LOG_SUFFIX, width = LOG_NUM_LEN),
+        LogQueue::Append => format!(
+            "{:0width$}{}",
+            file_id,
+            LOG_APPEND_SUFFIX,
+            width = LOG_NUM_LEN
+        ),
         LogQueue::Rewrite => format!(
             "{:0width$}{}",
             file_id,
-            REWRITE_SUFFIX,
-            width = REWRITE_NUM_LEN
+            LOG_REWRITE_SUFFIX,
+            width = LOG_NUM_LEN
         ),
     }
 }
 
+fn parse_file_name(file_name: &str) -> Result<(LogQueue, FileId)> {
+    if file_name.len() > LOG_NUM_LEN {
+        if let Ok(num) = file_name[..LOG_NUM_LEN].parse::<u64>() {
+            if file_name.ends_with(LOG_APPEND_SUFFIX) {
+                return Ok((LogQueue::Append, num.into()));
+            } else if file_name.ends_with(LOG_REWRITE_SUFFIX) {
+                return Ok((LogQueue::Rewrite, num.into()));
+            }
+        }
+    }
+    Err(Error::ParseFileName(file_name.to_owned()))
+}
+
 fn build_file_path<P: AsRef<Path>>(dir: P, queue: LogQueue, file_id: FileId) -> PathBuf {
     let mut path = PathBuf::from(dir.as_ref());
     path.push(build_file_name(queue, file_id));
     path
 }
 
-fn parse_file_name(file_name: &str) -> Result<(LogQueue, FileId)> {
-    if file_name.ends_with(LOG_SUFFIX) && file_name.len() == LOG_NAME_LEN {
-        if let Ok(num) = file_name[..LOG_NUM_LEN].parse::<u64>() {
-            return Ok((LogQueue::Append, num.into()));
+const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";
+pub const LOG_FILE_HEADER_LEN: usize = LOG_FILE_MAGIC_HEADER.len() + Version::len();
+
+#[derive(Clone, Copy, FromPrimitive, ToPrimitive)]
+enum Version {
+    V1 = 1,
+}
+
+impl Version {
+    const fn current() -> Self {
+        Self::V1
+    }
+
+    const fn len() -> usize {
+        8
+    }
+}
+
+pub struct LogFileHeader {}
+
+impl LogFileHeader {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    pub fn decode(buf: &mut &[u8]) -> Result<Self> {
+        if buf.len() < LOG_FILE_HEADER_LEN {
+            return Err(Error::Corruption("log file header too short".to_owned()));
         }
-    } else if file_name.ends_with(REWRITE_SUFFIX) && file_name.len() == REWRITE_NAME_LEN {
-        if let Ok(num) = file_name[..REWRITE_NUM_LEN].parse::<u64>() {
-            return Ok((LogQueue::Rewrite, num.into()));
+        if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
+            return Err(Error::Corruption(
+                "log file magic header mismatch".to_owned(),
+            ));
         }
+        buf.consume(LOG_FILE_MAGIC_HEADER.len());
+        let v = codec::decode_u64(buf)?;
+        if Version::from_u64(v).is_none() {
+            return Err(Error::Corruption(format!(
+                "unrecognized log file version: {}",
+                v
+            )));
+        }
+        Ok(Self {})
+    }
+
+    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
+        buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
+        buf.encode_u64(Version::current().to_u64().unwrap())?;
+        Ok(())
     }
-    Err(Error::ParseFileName(file_name.to_owned()))
 }
 
-struct ActiveFile {
+struct ActiveFile<W: Seek + Write> {
     fd: Arc<LogFd>,
-    writer: Box<dyn Writable>,
-    size: usize,
+    writer: W,
+    written: usize,
     capacity: usize,
     last_sync: usize,
 }
 
-impl ActiveFile {
-    fn open(fd: Arc<LogFd>, writer: Box<dyn Writable>, size: usize) -> Result<Self> {
-        let file_size = fd.file_size()?;
+impl<W: Seek + Write> ActiveFile<W> {
+    fn open(fd: Arc<LogFd>, writer: W, written: usize) -> Result<Self> {
+        let capacity = fd.file_size()?;
         let mut f = Self {
             fd,
             writer,
-            size,
-            capacity: file_size,
-            last_sync: size,
+            written,
+            capacity,
+            last_sync: written,
         };
-        if size < LOG_FILE_MIN_HEADER_LEN {
+        if written < LOG_FILE_HEADER_LEN {
             f.write_header()?;
         } else {
-            f.writer.seek(std::io::SeekFrom::Start(size as u64))?;
+            f.writer.seek(std::io::SeekFrom::Start(written as u64))?;
         }
         Ok(f)
     }
 
-    fn reset(&mut self, fd: Arc<LogFd>, writer: Box<dyn Writable>) -> Result<()> {
-        self.size = 0;
-        self.last_sync = 0;
+    fn rotate(&mut self, fd: Arc<LogFd>, writer: W) -> Result<()> {
+        self.writer = writer;
+        self.written = 0;
         self.capacity = fd.file_size()?;
         self.fd = fd;
-        self.writer = writer;
+        self.last_sync = 0;
         self.write_header()
     }
 
     fn truncate(&mut self) -> Result<()> {
-        self.fd.truncate(self.size)?;
+        self.fd.truncate(self.written)?;
         self.fd.sync()?;
-        self.capacity = self.size;
+        self.capacity = self.written;
         Ok(())
     }
 
     fn write_header(&mut self) -> Result<()> {
         self.writer.seek(std::io::SeekFrom::Start(0))?;
-        self.size = 0;
-        let mut buf = Vec::with_capacity(LOG_FILE_MIN_HEADER_LEN);
+        self.written = 0;
+        let mut buf = Vec::with_capacity(LOG_FILE_HEADER_LEN);
         LogFileHeader::new().encode(&mut buf)?;
-        self.write(&buf, true)?;
-        Ok(())
+        self.write(&buf, true)
     }
 
     fn write(&mut self, buf: &[u8], sync: bool) -> Result<()> {
-        if self.size + buf.len() > self.capacity {
+        if self.written + buf.len() > self.capacity {
             // Use fallocate to pre-allocate disk space for active file.
-            let alloc = std::cmp::max(self.size + buf.len() - self.capacity, FILE_ALLOCATE_SIZE);
+            let alloc = std::cmp::max(self.written + buf.len() - self.capacity, FILE_ALLOCATE_SIZE);
             self.fd.allocate(self.capacity, alloc)?;
             self.capacity += alloc;
         }
         self.writer.write_all(buf)?;
-        self.size += buf.len();
+        self.written += buf.len();
         if sync {
-            self.last_sync = self.size;
+            self.last_sync = self.written;
         }
         Ok(())
     }
 
     fn since_last_sync(&self) -> usize {
-        self.size - self.last_sync
+        self.written - self.last_sync
     }
 }
 
-struct LogManager {
+struct LogManager<B: FileBuilder> {
     queue: LogQueue,
     dir: String,
     rotate_size: usize,
     bytes_per_sync: usize,
-    file_system: Option<Arc<dyn FileSystem>>,
+    file_builder: Arc<B>,
     listeners: Vec<Arc<dyn EventListener>>,
 
     pub first_file_id: FileId,
     pub active_file_id: FileId,
 
     all_files: VecDeque<Arc<LogFd>>,
-    active_file: ActiveFile,
+    active_file: ActiveFile<B::Writer<LogFile>>,
 }
 
-impl LogManager {
+impl<B: FileBuilder> LogManager<B> {
     fn open<F>(
         cfg: &Config,
-        file_system: Option<Arc<dyn FileSystem>>,
+        file_builder: Arc<B>,
         listeners: Vec<Arc<dyn EventListener>>,
         queue: LogQueue,
         mut min_file_id: FileId,
@@ -164,7 +220,9 @@
     where
         F: Fn(LogQueue, FileId, LogItemBatch),
     {
-        let mut reader = LogItemBatchFileReader::new(cfg.recovery_read_block_size.0 as usize);
+        let mut reader = LogItemBatchFileReader::<B::Reader<LogFile>>::new(
+            cfg.recovery_read_block_size.0 as usize,
+        );
         let mut all_files = VecDeque::with_capacity(DEFAULT_FILES_COUNT);
         if max_file_id.valid() {
             assert!(min_file_id <= max_file_id);
@@ -180,12 +238,7 @@
             let file_size = fd.file_size()?;
             let tolerate_failure = file_id == max_file_id
                 && cfg.recovery_mode == RecoveryMode::TolerateCorruptedTailRecords;
-            let raw_file_reader = Box::new(LogFile::new(fd));
-            let file_reader = if let Some(ref fs) = file_system {
-                fs.open_file_reader(&path, raw_file_reader as Box<dyn Readable>)?
-            } else {
-                raw_file_reader
-            };
+            let file_reader = file_builder.build_reader(&path, LogFile::new(fd))?;
             if let Err(e) = reader.open(file_reader, file_size) {
                 if !tolerate_failure {
                     return Err(Error::Corruption(format!("Unable to open log file: {}", e)));
@@ -224,17 +277,13 @@
         }
         let active_file_size = reader.valid_offset();
         let active_fd = all_files.back().unwrap().clone();
-        let raw_writer = Box::new(LogFile::new(active_fd.clone()));
         let active_file = ActiveFile::open(
-            active_fd,
-            if let Some(ref fs) = file_system {
-                fs.open_file_writer(
-                    &build_file_path(&cfg.dir, queue, max_file_id),
-                    raw_writer as Box<dyn Writable>,
-                )?
-            } else {
-                raw_writer
-            },
+            active_fd.clone(),
+            file_builder.build_writer(
+                &build_file_path(&cfg.dir, queue, max_file_id),
+                LogFile::new(active_fd),
+                false, /*create*/
+            )?,
             active_file_size,
         )?;
 
@@ -243,7 +292,7 @@
         Ok(Self {
             queue,
             dir: cfg.dir.clone(),
             rotate_size: cfg.target_file_size.0 as usize,
             bytes_per_sync: cfg.bytes_per_sync.0 as usize,
-            file_system,
+            file_builder,
             listeners,
 
             first_file_id: min_file_id,
@@ -258,7 +307,7 @@
 
     fn new_log_file(&mut self) -> Result<()> {
         if self.active_file_id.valid() {
-            self.truncate_active_log()?;
+            // self.truncate_active_log()?;
         }
         self.active_file_id = if self.active_file_id.valid() {
             self.active_file_id.forward(1)
@@ -269,16 +318,10 @@
         let path = build_file_path(&self.dir, self.queue, self.active_file_id);
         let fd = Arc::new(LogFd::create(&path)?);
         self.all_files.push_back(fd.clone());
-
-        let raw_writer = Box::new(LogFile::new(fd.clone()));
-        self.active_file.reset(
-            fd,
-            if let Some(ref fs) = self.file_system {
-                fs.open_file_writer(&path, raw_writer as Box<dyn Writable>)
-                    .unwrap()
-            } else {
-                raw_writer
-            },
+        self.active_file.rotate(
+            fd.clone(),
+            self.file_builder
+                .build_writer(&path, LogFile::new(fd), true /*create*/)?,
         )?;
 
         self.sync_dir()?;
@@ -327,13 +370,13 @@
     }
 
     fn append(&mut self, content: &[u8], sync: &mut bool) -> Result<(FileId, u64, Arc<LogFd>)> {
-        if self.active_file.size >= self.rotate_size {
+        if self.active_file.written >= self.rotate_size {
            self.new_log_file()?;
        }
        if self.active_file.since_last_sync() >= self.bytes_per_sync {
            *sync = true;
        }
-        let offset = self.active_file.size as u64;
+        let offset = self.active_file.written as u64;
        self.active_file.write(content, *sync)?;
        Ok((self.active_file_id, offset, self.active_file.fd.clone()))
    }
@@ -347,29 +390,29 @@
     fn size(&self) -> usize {
         self.active_file_id.step_after(&self.first_file_id).unwrap() * self.rotate_size
-            + self.active_file.size
+            + self.active_file.written
     }
 }
 
 #[derive(Clone)]
-pub struct FilePipeLog {
+pub struct FilePipeLog<B: FileBuilder> {
     dir: String,
     rotate_size: usize,
     compression_threshold: usize,
 
-    appender: Arc<RwLock<LogManager>>,
-    rewriter: Arc<RwLock<LogManager>>,
-    file_system: Option<Arc<dyn FileSystem>>,
+    appender: Arc<RwLock<LogManager<B>>>,
+    rewriter: Arc<RwLock<LogManager<B>>>,
+    file_builder: Arc<B>,
 
     listeners: Vec<Arc<dyn EventListener>>,
 }
 
-impl FilePipeLog {
+impl<B: FileBuilder> FilePipeLog<B> {
     pub fn open<F>(
         cfg: &Config,
-        file_system: Option<Arc<dyn FileSystem>>,
+        file_builder: Arc<B>,
         listeners: Vec<Arc<dyn EventListener>>,
         replay: F,
-    ) -> Result<FilePipeLog>
+    ) -> Result<FilePipeLog<B>>
     where
         F: Fn(LogQueue, FileId, LogItemBatch),
     {
@@ -402,7 +445,7 @@
 
         let appender = Arc::new(RwLock::new(LogManager::open(
             cfg,
-            file_system.clone(),
+            file_builder.clone(),
             listeners.clone(),
             LogQueue::Append,
             min_append_id,
@@ -411,7 +454,7 @@
         )?));
         let rewriter = Arc::new(RwLock::new(LogManager::open(
             cfg,
-            file_system.clone(),
+            file_builder.clone(),
             listeners.clone(),
             LogQueue::Rewrite,
             min_rewrite_id,
@@ -425,7 +468,7 @@
             compression_threshold: cfg.batch_compression_threshold.0 as usize,
             appender,
             rewriter,
-            file_system,
+            file_builder,
 
             listeners,
         })
@@ -441,20 +484,22 @@
             listener.on_append_log_file(queue, file_id, content.len());
         }
         if *sync {
+            let start = Instant::now();
             fd.sync()?;
+            LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
         }
 
         Ok((file_id, offset))
     }
 
-    fn get_queue(&self, queue: LogQueue) -> RwLockReadGuard<LogManager> {
+    fn get_queue(&self, queue: LogQueue) -> RwLockReadGuard<LogManager<B>> {
         match queue {
             LogQueue::Append => self.appender.read(),
             LogQueue::Rewrite => self.rewriter.read(),
         }
     }
 
-    fn mut_queue(&self, queue: LogQueue) -> RwLockWriteGuard<LogManager> {
+    fn mut_queue(&self, queue: LogQueue) -> RwLockWriteGuard<LogManager<B>> {
         match queue {
             LogQueue::Append => self.appender.write(),
             LogQueue::Rewrite => self.rewriter.write(),
@@ -462,7 +507,7 @@
     }
 }
 
-impl PipeLog for FilePipeLog {
+impl<B: FileBuilder> PipeLog for FilePipeLog<B> {
     fn close(&self) -> Result<()> {
         self.mut_queue(LogQueue::Rewrite).truncate_active_log()?;
         self.mut_queue(LogQueue::Append).truncate_active_log()
@@ -486,12 +531,10 @@
         len: u64,
     ) -> Result<Vec<u8>> {
         let fd = self.get_queue(queue).get_fd(file_id)?;
-        let raw_reader = Box::new(LogFile::new(fd));
-        let mut reader = if let Some(ref fs) = self.file_system {
-            fs.open_file_reader(&build_file_path(&self.dir, queue, file_id), raw_reader)?
-        } else {
-            raw_reader
-        };
+        let mut reader = self.file_builder.build_reader(
+            &build_file_path(&self.dir, queue, file_id),
+            LogFile::new(fd),
+        )?;
         reader.seek(std::io::SeekFrom::Start(offset))?;
         let mut buf = vec![0; len as usize];
         let size = reader.read(&mut buf)?;
@@ -564,8 +607,7 @@
 
         let mut cur_file_id = file_id.backward(purge_count);
         for i in 0..purge_count {
-            let mut path = PathBuf::from(&self.dir);
-            path.push(build_file_name(queue, cur_file_id));
+            let path = build_file_path(&self.dir, queue, cur_file_id);
             if let Err(e) = fs::remove_file(&path) {
                 warn!("Remove purged log file {:?} fail: {}", path, e);
                 return Ok(i);
@@ -581,15 +623,20 @@ mod tests {
     use tempfile::Builder;
 
     use super::*;
+    use crate::file_builder::DefaultFileBuilder;
     use crate::util::ReadableSize;
 
-    fn new_test_pipe_log(path: &str, bytes_per_sync: usize, rotate_size: usize) -> FilePipeLog {
+    fn new_test_pipe_log(
+        path: &str,
+        bytes_per_sync: usize,
+        rotate_size: usize,
+    ) -> FilePipeLog<DefaultFileBuilder> {
         let mut cfg = Config::default();
         cfg.dir = path.to_owned();
         cfg.bytes_per_sync = ReadableSize(bytes_per_sync as u64);
         cfg.target_file_size = ReadableSize(rotate_size as u64);
 
-        FilePipeLog::open(&cfg, None, vec![], |_, _, _| {}).unwrap()
+        FilePipeLog::open(&cfg, Arc::new(DefaultFileBuilder {}), vec![], |_, _, _| {}).unwrap()
     }
 
     #[test]
@@ -601,7 +648,7 @@
         );
         assert_eq!(build_file_name(LogQueue::Append, 123.into()), file_name);
 
-        let file_name: &str = "00000123.rewrite";
+        let file_name: &str = "0000000000000123.rewrite";
         assert_eq!(
             parse_file_name(file_name).unwrap(),
             (LogQueue::Rewrite, 123.into())
@@ -623,7 +670,7 @@
         assert_eq!(pipe_log.first_file_id(queue), INIT_FILE_ID.into());
         assert_eq!(pipe_log.active_file_id(queue), INIT_FILE_ID.into());
 
-        let header_size = LOG_FILE_MIN_HEADER_LEN as u64;
+        let header_size = LOG_FILE_HEADER_LEN as u64;
 
         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
diff --git a/src/file_system.rs b/src/file_system.rs
deleted file mode 100644
index 46abbcd9..00000000
--- a/src/file_system.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
-
-use std::io::{Read, Seek, Write};
-use std::path::Path;
-
-pub trait Readable: Seek + Read + Send + Sync {}
-impl<T: Seek + Read + Send + Sync> Readable for T {}
-
-pub trait Writable: Seek + Write + Send + Sync {}
-impl<T: Seek + Write + Send + Sync> Writable for T {}
-
-/// An overlay abstraction for accessing files.
-pub trait FileSystem: Send + Sync {
-    fn open_file_reader(
-        &self,
-        path: &Path,
-        reader: Box<dyn Readable>,
-    ) -> Result<Box<dyn Readable>, Box<dyn std::error::Error>>;
-
-    fn open_file_writer(
-        &self,
-        path: &Path,
-        writer: Box<dyn Writable>,
-    ) -> Result<Box<dyn Writable>, Box<dyn std::error::Error>>;
-
-    fn create_file_writer(
-        &self,
-        path: &Path,
-        writer: Box<dyn Writable>,
-    ) -> Result<Box<dyn Writable>, Box<dyn std::error::Error>>;
-}
diff --git a/src/lib.rs b/src/lib.rs
index c61e84df..d0070514 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@
 
 #![feature(shrink_to)]
 #![feature(cell_update)]
+#![feature(generic_associated_types)]
 
 #[macro_use]
 extern crate lazy_static;
@@ -36,8 +37,8 @@ mod config;
 mod engine;
 mod errors;
 mod event_listener;
+mod file_builder;
 mod file_pipe_log;
-mod file_system;
 mod log_batch;
 mod log_file;
 mod memtable;
@@ -49,15 +50,14 @@ mod reader;
 mod test_util;
 mod util;
 
-use crate::file_pipe_log::FilePipeLog;
-
-pub use self::config::{Config, RecoveryMode};
-pub use self::errors::{Error, Result};
-pub use self::event_listener::EventListener;
-pub use self::log_batch::{Command, LogBatch, MessageExt};
-pub use self::pipe_log::{FileId, LogQueue};
-pub use self::util::ReadableSize;
-pub type RaftLogEngine<M> = self::engine::Engine<M, FilePipeLog>;
+pub use config::{Config, RecoveryMode};
+pub use errors::{Error, Result};
+pub use event_listener::EventListener;
+pub use file_builder::FileBuilder;
+pub use log_batch::{Command, LogBatch, MessageExt};
+pub use pipe_log::{FileId, LogQueue};
+pub use util::ReadableSize;
+pub type Engine<M, B = file_builder::DefaultFileBuilder> = engine::Engine<M, B>;
 
 #[derive(Default)]
 pub struct GlobalStats {
diff --git a/src/log_batch.rs b/src/log_batch.rs
index c335bf96..5a6563a8 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -431,7 +431,7 @@ impl LogItemBatch {
 enum BufState {
     Uninitialized,
     EntriesFilled,
-    Sealed(usize),
+    Sealed(usize), // Contains offset of item batch
     Incomplete,
 }
 
@@ -499,7 +499,10 @@
             self.buf.encode_u64(0)?;
             self.buf.encode_u64(0)?;
         } else if self.buf_state != BufState::EntriesFilled {
-            // return error
+            return Err(Error::Corruption(format!(
+                "Unexpected log batch buffer state: {:?}",
+                self.buf_state
+            )));
         } else {
             self.buf_state = BufState::Incomplete;
         }
diff --git a/src/log_file.rs b/src/log_file.rs
index 1aa1461b..79de5b7f 100644
--- a/src/log_file.rs
+++ b/src/log_file.rs
@@ -1,79 +1,22 @@
 // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
 
-use std::io::{BufRead, Read, Result as IoResult, Seek, SeekFrom, Write};
+use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
 use std::os::unix::io::RawFd;
 use std::sync::Arc;
-use std::time::Instant;
 
 use fail::fail_point;
-use log::warn;
+use log::error;
 use nix::errno::Errno;
 use nix::fcntl::{self, OFlag};
 use nix::sys::stat::Mode;
 use nix::sys::uio::{pread, pwrite};
 use nix::unistd::{close, fsync, ftruncate, lseek, Whence};
 use nix::NixPath;
-use num_derive::{FromPrimitive, ToPrimitive};
-use num_traits::{FromPrimitive, ToPrimitive};
-
-use crate::codec::{self, NumberEncoder};
-use crate::metrics::*;
-use crate::util::InstantExt;
-use crate::{Error, Result};
-
-const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";
-pub const LOG_FILE_MIN_HEADER_LEN: usize = LOG_FILE_MAGIC_HEADER.len() + Version::len();
-pub const LOG_FILE_MAX_HEADER_LEN: usize = LOG_FILE_MIN_HEADER_LEN;
-
-#[derive(Clone, Copy, FromPrimitive, ToPrimitive)]
-enum Version {
-    V1 = 1,
-}
-
-impl Version {
-    const fn current() -> Self {
-        Self::V1
-    }
-
-    const fn len() -> usize {
-        8
-    }
-}
-
-pub struct LogFileHeader {}
-
-impl LogFileHeader {
-    pub fn new() -> Self {
-        Self {}
-    }
-
-    pub fn decode(buf: &mut &[u8]) -> Result<Self> {
-        if buf.len() < LOG_FILE_MIN_HEADER_LEN {
-            return Err(Error::Corruption("log file header too short".to_owned()));
-        }
-        if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
-            return Err(Error::Corruption(
-                "log file magic header mismatch".to_owned(),
-            ));
-        }
-        buf.consume(LOG_FILE_MAGIC_HEADER.len());
-        let v = codec::decode_u64(buf)?;
-        if Version::from_u64(v).is_none() {
-            return Err(Error::Corruption(format!(
-                "unrecognized log file version: {}",
-                v
-            )));
-        }
-        Ok(Self {})
-    }
-
-    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
-        buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
-        buf.encode_u64(Version::current().to_u64().unwrap())?;
-        Ok(())
-    }
-}
 
+/// A `LogFd` is a RAII file that provides basic I/O functionality.
+///
+/// This implementation is a thin wrapper around `RawFd`, and primarily targets
+/// UNIX-based systems.
 pub struct LogFd(RawFd);
 
 fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
@@ -116,10 +59,7 @@ impl LogFd {
     }
 
     pub fn sync(&self) -> IoResult<()> {
-        let start = Instant::now();
-        let res = fsync(self.0).map_err(|e| from_nix_error(e, "fsync"));
-        LOG_SYNC_TIME_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
-        res
+        fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
     }
 
     pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
@@ -188,11 +128,12 @@
 impl Drop for LogFd {
     fn drop(&mut self) {
         if let Err(e) = self.close() {
-            warn!("close: {}", e);
+            error!("error while closing file: {}", e);
         }
     }
 }
 
+/// A `LogFile` is a `LogFd` wrapper that implements `Seek`, `Write` and `Read`.
 pub struct LogFile {
     inner: Arc<LogFd>,
     offset: usize,
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index f81931d0..588133c6 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -83,7 +83,7 @@ impl FileId {
     }
 }
 
-pub trait PipeLog: Sized + Clone + Send {
+pub trait PipeLog: Sized {
     /// Close the pipe log.
     fn close(&self) -> Result<()>;
 
diff --git a/src/purge.rs b/src/purge.rs
index 579d7184..0fb66370 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -35,7 +35,7 @@ where
 {
     cfg: Arc<Config>,
     memtables: MemTableAccessor,
-    pipe_log: P,
+    pipe_log: Arc<P>,
     global_stats: Arc<GlobalStats>,
     listeners: Vec<Arc<dyn EventListener>>,
 
@@ -53,7 +53,7 @@
     pub fn new(
         cfg: Arc<Config>,
         memtables: MemTableAccessor,
-        pipe_log: P,
+        pipe_log: Arc<P>,
         global_stats: Arc<GlobalStats>,
         listeners: Vec<Arc<dyn EventListener>>,
     ) -> PurgeManager<M, P> {
@@ -237,7 +237,7 @@
         };
 
         for i in entry_indexes {
-            let entry = read_entry_from_file::<M, P>(&self.pipe_log, &i)?;
+            let entry = read_entry_from_file::<M, P>(self.pipe_log.as_ref(), &i)?;
             total_size += entry.compute_size();
             entries.push(entry);
         }
diff --git a/src/reader.rs b/src/reader.rs
index c4c27b45..288a1216 100644
--- a/src/reader.rs
+++ b/src/reader.rs
@@ -1,12 +1,13 @@
 // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
 
-use crate::file_system::Readable;
+use std::io::{Read, Seek};
+
+use crate::file_pipe_log::{LogFileHeader, LOG_FILE_HEADER_LEN};
 use crate::log_batch::{LogBatch, LogItemBatch, LOG_BATCH_HEADER_LEN};
-use crate::log_file::{LogFileHeader, LOG_FILE_MAX_HEADER_LEN};
 use crate::{Error, Result};
 
-pub struct LogItemBatchFileReader {
-    file: Option<Box<dyn Readable>>,
+pub struct LogItemBatchFileReader<R: Read + Seek> {
+    file: Option<R>,
     size: usize,
 
     buffer: Vec<u8>,
@@ -16,7 +17,7 @@
     read_block_size: usize,
 }
 
-impl LogItemBatchFileReader {
+impl<R: Read + Seek> LogItemBatchFileReader<R> {
     pub fn new(read_block_size: usize) -> Self {
         Self {
             file: None,
@@ -30,16 +31,15 @@
         }
     }
 
-    pub fn open(&mut self, file: Box<dyn Readable>, size: usize) -> Result<()> {
+    pub fn open(&mut self, file: R, size: usize) -> Result<()> {
         self.file = Some(file);
         self.size = size;
         self.buffer.clear();
         self.buffer_offset = 0;
         self.valid_offset = 0;
-        let peek_size = std::cmp::min(LOG_FILE_MAX_HEADER_LEN, size);
-        let mut header = self.peek(0, peek_size, LOG_BATCH_HEADER_LEN)?;
+        let mut header = self.peek(0, LOG_FILE_HEADER_LEN, LOG_BATCH_HEADER_LEN)?;
         LogFileHeader::decode(&mut header)?;
-        self.valid_offset = peek_size - header.len();
+        self.valid_offset = LOG_FILE_HEADER_LEN;
         Ok(())
     }
 
@@ -94,7 +94,7 @@
                     self.buffer_offset + read
                 )));
             }
-            self.buffer.resize(read, 0);
+            self.buffer.truncate(read);
             Ok(&self.buffer[..size])
         } else {
             let should_read = (offset + size + prefetch).saturating_sub(end);

From 2258c4b828f00ab86b9e5714d93a53e3cbe54cdc Mon Sep 17 00:00:00 2001
From: tabokie
Date: Tue, 7 Sep 2021 12:46:39 +0800
Subject: [PATCH 2/2] clippy stress crate

Signed-off-by: tabokie
---
 stress/src/main.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/stress/src/main.rs b/stress/src/main.rs
index c7fc2324..9409c8ab 100644
--- a/stress/src/main.rs
+++ b/stress/src/main.rs
@@ -14,12 +14,12 @@ use hdrhistogram::Histogram;
 use parking_lot_core::SpinWait;
 use raft::eraftpb::Entry;
 use raft_engine::{
-    Command, Config, EventListener, FileId, LogBatch, LogQueue, MessageExt, RaftLogEngine,
+    Command, Config, Engine as RawEngine, EventListener, FileId, LogBatch, LogQueue, MessageExt,
     ReadableSize,
 };
 use rand::{thread_rng, Rng};
 
-type Engine = RaftLogEngine<MessageExtTyped>;
+type Engine = RawEngine<MessageExtTyped>;
 type WriteBatch = LogBatch<MessageExtTyped>;
 
 #[derive(Clone)]
@@ -617,7 +617,7 @@ fn main() {
 
     let wb = Arc::new(WrittenBytesHook::new());
 
-    let engine = Arc::new(Engine::open_with_listeners(config, None, vec![wb.clone()]).unwrap());
+    let engine = Arc::new(Engine::open_with_listeners(config, vec![wb.clone()]).unwrap());
     let mut write_threads = Vec::new();
     let mut read_threads = Vec::new();
     let mut misc_threads = Vec::new();
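
(Aside, not part of the patches: a minimal sketch of how the new entry points compose after this series. It assumes the hypothetical `XorFileBuilder` from the earlier aside and a `MessageExtTyped` message type like the one defined in the bundled example; neither is provided by the crate itself.)

    use std::sync::Arc;

    use raft_engine::{Config, Engine};

    // Open an engine whose log files all pass through a custom builder.
    fn open_masked(config: Config) -> Result<(), Box<dyn std::error::Error>> {
        let engine = Engine::<MessageExtTyped, XorFileBuilder>::open_with_file_builder(
            config,
            Arc::new(XorFileBuilder { key: 0x5a }),
        )?;
        // ... append/read as usual; bytes on disk are XOR-masked transparently,
        // while `Engine::open(config)` would keep the plain DefaultFileBuilder path.
        drop(engine);
        Ok(())
    }
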