refactor: CacheReader globally unify RangeCache to avoid duplicate creation
KKould committed Oct 28, 2024
1 parent cd33a8f commit 4e0e04a
Showing 18 changed files with 385 additions and 152 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -86,7 +86,7 @@ tokio = { version = "1", features = ["io-util"], default-features = false }
 tokio-util = { version = "0.7" }
 tonbo_macros = { version = "0.1.0", path = "tonbo_macros" }
 tracing = "0.1"
-ulid = "1"
+ulid = { version = "1", features = ["serde"] }

 # Only used for benchmarks
 log = "0.4.22"
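The `serde` feature added to `ulid` here presumably supports the cache rework below: `FileId` is a `Ulid`, and it now becomes part of the key of foyer's hybrid cache, whose disk-backed tier needs serializable keys. A minimal sketch of what the feature enables (`serde_json` is used only for illustration; it is not something this commit adds):

```rust
// Sketch only: with ulid's "serde" feature, Ulid implements
// Serialize/Deserialize, so it can be part of a persisted cache key.
use ulid::Ulid;

fn main() {
    let gen = Ulid::new();
    // Serializes as its 26-character string form.
    let encoded = serde_json::to_string(&gen).unwrap();
    let decoded: Ulid = serde_json::from_str(&encoded).unwrap();
    assert_eq!(gen, decoded);
}
```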
4 changes: 3 additions & 1 deletion examples/declare.rs
@@ -22,7 +22,9 @@ async fn main() {
     // make sure the path exists
     let _ = fs::create_dir_all("./db_path/users").await;

-    let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
+    let options = DbOption::from_path(Path::from_filesystem_path("./db_path/users").unwrap())
+        .await
+        .unwrap();
     // pluggable async runtime and I/O
     let db = DB::new(options, TokioExecutor::default()).await.unwrap();

22 changes: 16 additions & 6 deletions src/compaction/mod.rs
@@ -218,7 +218,7 @@
                 .await?;

             streams.push(ScanStream::SsTable {
-                inner: SsTable::open(option, file, path, is_local)
+                inner: SsTable::open(option, file, scope.gen, !is_local)
                     .await?
                     .scan(
                         (Bound::Unbounded, Bound::Unbounded),
@@ -583,7 +583,9 @@ pub(crate) mod tests {
         let temp_dir = tempfile::tempdir().unwrap();
         let temp_dir_l0 = tempfile::tempdir().unwrap();

-        let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap())
+        let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap()
             .level_path(
                 0,
                 Path::from_filesystem_path(temp_dir_l0.path()).unwrap(),
@@ -697,7 +699,9 @@ pub(crate) mod tests {
             Path::from_filesystem_path(temp_dir.path()).unwrap(),
             "id".to_string(),
             0,
-        );
+        )
+        .await
+        .unwrap();
         manager
             .base_fs()
             .create_dir_all(&option.wal_dir_path())
@@ -765,7 +769,9 @@ pub(crate) mod tests {
         let temp_dir_l0 = TempDir::new().unwrap();
         let temp_dir_l1 = TempDir::new().unwrap();

-        let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap())
+        let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap()
             .level_path(
                 0,
                 Path::from_filesystem_path(temp_dir_l0.path()).unwrap(),
@@ -1104,7 +1110,9 @@ pub(crate) mod tests {
     pub(crate) async fn major_panic() {
         let temp_dir = TempDir::new().unwrap();

-        let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
+        let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap();
         option.major_threshold_with_sst_size = 1;
         option.level_sst_magnification = 1;
         let manager =
@@ -1211,7 +1219,9 @@ pub(crate) mod tests {
     async fn test_flush_major_level_sort() {
         let temp_dir = TempDir::new().unwrap();

-        let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
+        let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap();
         option.immutable_chunk_num = 1;
         option.immutable_chunk_max_num = 0;
         option.major_threshold_with_sst_size = 2;
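The call sites now hand `SsTable::open` the table's `FileId` (`scope.gen`) instead of a `Path`, and the final flag flips from `is_local` to `!is_local` (presumably "use the cache reader" rather than "is a local file"). Keying by `FileId` is what lets one global range cache serve every table without collisions; a small sketch of the idea, where `cache_key` is a hypothetical helper, not code from this commit:

```rust
use std::ops::Range;

use ulid::Ulid;

// In Tonbo, FileId is a Ulid that uniquely identifies an SSTable.
type FileId = Ulid;

// Hypothetical helper mirroring the (FileId, Range<usize>) key of the
// unified RangeCache: identical byte ranges in different files never collide.
fn cache_key(gen: FileId, range: Range<usize>) -> (FileId, Range<usize>) {
    (gen, range)
}

fn main() {
    let (a, b) = (Ulid::new(), Ulid::new());
    assert_ne!(cache_key(a, 0..128), cache_key(b, 0..128));
    assert_eq!(cache_key(a, 0..128), cache_key(a, 0..128));
}
```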
213 changes: 184 additions & 29 deletions src/fs/cache_reader.rs
@@ -1,70 +1,62 @@
 use std::{ops::Range, sync::Arc};

 use bytes::Bytes;
-use foyer::{Cache, DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
-use fusio::path::{path_to_local, Path};
+use foyer::{Cache, HybridCache};
 use futures_core::future::BoxFuture;
 use futures_util::FutureExt;
 use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};

-use crate::fs::{CacheError, CacheOption};
+use crate::fs::FileId;

-pub(crate) type MetaCache = Arc<Cache<Path, Arc<ParquetMetaData>>>;
+pub(crate) type MetaCache = Arc<Cache<FileId, Arc<ParquetMetaData>>>;
+pub(crate) type RangeCache = HybridCache<(FileId, Range<usize>), Bytes>;

 pub(crate) struct CacheReader<R> {
     gen: FileId,
     inner: R,
-    meta_path: Path,
-    cache: HybridCache<Range<usize>, Bytes>,
+    range_cache: RangeCache,
     meta_cache: MetaCache,
 }

 impl<R> CacheReader<R> {
-    pub(crate) async fn new(
-        option: &CacheOption,
+    pub(crate) fn new(
+        meta_cache: MetaCache,
+        range_cache: RangeCache,
         gen: FileId,
         inner: R,
-        meta_path: Path,
-    ) -> Result<CacheReader<R>, CacheError> {
-        // SAFETY: `meta_path` must be the path of a parquet file
-        let path = path_to_local(&option.path.child(meta_path.filename().unwrap()))?;
-        let cache: HybridCache<Range<usize>, Bytes> = HybridCacheBuilder::new()
-            .memory(option.memory)
-            .storage(Engine::Large) // use large object disk cache engine only
-            .with_device_options(DirectFsDeviceOptions::new(path).with_capacity(option.local))
-            .build()
-            .await?;
-
-        Ok(Self {
+    ) -> CacheReader<R> {
+        Self {
             gen,
             inner,
-            meta_path,
-            cache,
+            range_cache,
             meta_cache,
-        })
+        }
     }
 }

 impl<R: AsyncFileReader> AsyncFileReader for CacheReader<R> {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         async move {
+            let key = (self.gen, range);
             if let Some(entry) = self
-                .cache
-                .get(&range)
+                .range_cache
+                .get(&key)
                 .await
                 .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?
             {
                 return Ok(entry.value().clone());
             }

-            let bytes = self.inner.get_bytes(range.clone()).await?;
-            let entry = self.cache.insert(range, bytes);
+            let bytes = self.inner.get_bytes(key.1.clone()).await?;
+            let entry = self.range_cache.insert(key, bytes);
             Ok(entry.value().clone())
         }
         .boxed()
     }

     fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         async move {
-            if let Some(entry) = self.meta_cache.get(&self.meta_path) {
+            if let Some(entry) = self.meta_cache.get(&self.gen) {
                 return Ok(entry.value().clone());
             }

@@ -73,10 +65,173 @@ impl<R: AsyncFileReader> AsyncFileReader for CacheReader<R> {
                 .get_metadata()
                 .await
                 .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?;
-            let entry = self.meta_cache.insert(self.meta_path.clone(), meta);
+            let entry = self.meta_cache.insert(self.gen, meta);

             Ok(entry.value().clone())
         }
         .boxed()
     }
 }
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use std::{
+        collections::Bound,
+        sync::{
+            atomic::{AtomicUsize, Ordering::SeqCst},
+            Arc,
+        },
+    };
+
+    use fusio::{dynamic::DynFile, path::Path, Error, IoBuf, IoBufMut, Read, Write};
+    use futures_util::StreamExt;
+    use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
+    use tempfile::TempDir;
+    use ulid::Ulid;
+
+    use crate::{
+        compaction::tests::build_parquet_table,
+        fs::FileType,
+        ondisk::sstable::SsTable,
+        record::{Record, RecordInstance},
+        tests::Test,
+        wal::log::LogType,
+        DbOption,
+    };
+
+    struct CountFile {
+        inner: Box<dyn DynFile>,
+        read_count: Arc<AtomicUsize>,
+    }
+
+    impl Read for CountFile {
+        async fn read_exact_at<B: IoBufMut>(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) {
+            self.read_count.fetch_add(1, SeqCst);
+            self.inner.read_exact_at(buf, pos).await
+        }
+
+        async fn read_to_end_at(&mut self, buf: Vec<u8>, pos: u64) -> (Result<(), Error>, Vec<u8>) {
+            self.read_count.fetch_add(1, SeqCst);
+            self.inner.read_to_end_at(buf, pos).await
+        }
+
+        async fn size(&self) -> Result<u64, Error> {
+            self.inner.size().await
+        }
+    }
+
+    impl Write for CountFile {
+        async fn write_all<B: IoBuf>(&mut self, buf: B) -> (Result<(), Error>, B) {
+            self.inner.write_all(buf).await
+        }
+
+        async fn flush(&mut self) -> Result<(), Error> {
+            self.inner.flush().await
+        }
+
+        async fn close(&mut self) -> Result<(), Error> {
+            self.inner.close().await
+        }
+    }
+
+    #[tokio::test]
+    async fn test_cache_read() {
+        let temp_dir = TempDir::new().unwrap();
+        let option =
+            DbOption::<Test>::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+                .await
+                .unwrap();
+        let fs = option.base_fs.clone().parse().unwrap();
+        fs.create_dir_all(&option.version_log_dir_path())
+            .await
+            .unwrap();
+        fs.create_dir_all(&option.wal_dir_path()).await.unwrap();
+
+        let table_gen = Ulid::new();
+        build_parquet_table::<Test>(
+            &option,
+            table_gen,
+            vec![
+                (
+                    LogType::Full,
+                    Test {
+                        vstring: 1.to_string(),
+                        vu32: 0,
+                        vbool: Some(true),
+                    },
+                    0.into(),
+                ),
+                (
+                    LogType::Full,
+                    Test {
+                        vstring: 2.to_string(),
+                        vu32: 0,
+                        vbool: Some(true),
+                    },
+                    0.into(),
+                ),
+                (
+                    LogType::Full,
+                    Test {
+                        vstring: 3.to_string(),
+                        vu32: 0,
+                        vbool: Some(true),
+                    },
+                    0.into(),
+                ),
+            ],
+            &RecordInstance::Normal,
+            0,
+            &fs,
+        )
+        .await
+        .unwrap();
+
+        let read_count = Arc::new(AtomicUsize::new(0));
+        let table_path = option.table_path(&table_gen, 0);
+        for _ in 0..1000 {
+            let mut scan = SsTable::<Test>::open(
+                &option,
+                Box::new(CountFile {
+                    inner: fs
+                        .open_options(&table_path, FileType::Parquet.open_options(true))
+                        .await
+                        .unwrap(),
+                    read_count: read_count.clone(),
+                }),
+                table_gen,
+                true,
+            )
+            .await
+            .unwrap()
+            .scan(
+                (Bound::Unbounded, Bound::Unbounded),
+                0_u32.into(),
+                None,
+                ProjectionMask::roots(
+                    &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(),
+                    [0, 1, 2, 3, 4],
+                ),
+            )
+            .await
+            .unwrap();
+
+            let entry_0 = scan.next().await.unwrap().unwrap();
+            assert_eq!(entry_0.get().unwrap().vstring, "1");
+            assert_eq!(entry_0.get().unwrap().vu32, Some(0));
+            assert_eq!(entry_0.get().unwrap().vbool, Some(true));
+
+            let entry_1 = scan.next().await.unwrap().unwrap();
+            assert_eq!(entry_1.get().unwrap().vstring, "2");
+            assert_eq!(entry_1.get().unwrap().vu32, Some(0));
+            assert_eq!(entry_1.get().unwrap().vbool, Some(true));
+
+            let entry_2 = scan.next().await.unwrap().unwrap();
+            assert_eq!(entry_2.get().unwrap().vstring, "3");
+            assert_eq!(entry_2.get().unwrap().vu32, Some(0));
+            assert_eq!(entry_2.get().unwrap().vbool, Some(true));
+        }
+
+        assert_eq!(read_count.load(SeqCst), 9);
+    }
+}
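Since `CacheReader::new` no longer builds a `HybridCache` per reader, the caches are presumably constructed once up front (which would also explain why `DbOption::from_path` became async and fallible) and shared by every reader; the test above shows the payoff, with 1,000 scans of the same table costing only 9 reads of the underlying file. A sketch of that one-time construction under those assumptions, reusing the foyer builder chain deleted above (the function name, `anyhow` error type, and capacity values are illustrative):

```rust
use std::ops::Range;

use bytes::Bytes;
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use ulid::Ulid;

type FileId = Ulid;
type RangeCache = HybridCache<(FileId, Range<usize>), Bytes>;

// Hypothetical one-time construction of the shared range cache; the builder
// chain mirrors the one removed from CacheReader::new. Keys must be
// serializable for the disk tier, hence ulid's new "serde" feature.
async fn build_range_cache(
    cache_dir: std::path::PathBuf,
    memory: usize, // in-memory capacity
    local: usize,  // on-disk capacity
) -> anyhow::Result<RangeCache> {
    Ok(HybridCacheBuilder::new()
        .memory(memory)
        .storage(Engine::Large) // use large object disk cache engine only
        .with_device_options(DirectFsDeviceOptions::new(cache_dir).with_capacity(local))
        .build()
        .await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dir = std::env::temp_dir().join("tonbo-range-cache");
    let cache = build_range_cache(dir, 64 * 1024 * 1024, 256 * 1024 * 1024).await?;
    // Every CacheReader would then be handed a clone of this one cache.
    let _ = cache;
    Ok(())
}
```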
7 changes: 0 additions & 7 deletions src/fs/mod.rs
@@ -55,13 +55,6 @@ pub(crate) fn parse_file_id(path: &Path, suffix: FileType) -> Result<Option<File
         .transpose()
 }

-#[derive(Debug, Clone)]
-pub struct CacheOption {
-    pub(crate) path: Path,
-    pub(crate) memory: usize,
-    pub(crate) local: usize,
-}
-
 #[derive(Debug, Error)]
 pub enum CacheError {
     #[error("cache io error: {0}")]
12 changes: 9 additions & 3 deletions src/inmem/mutable.rs
@@ -217,7 +217,9 @@ mod tests {

         let temp_dir = tempfile::tempdir().unwrap();
         let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
-        let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
+        let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap();
         fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

         let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
@@ -265,7 +267,9 @@ mod tests {
     async fn range() {
         let temp_dir = tempfile::tempdir().unwrap();
         let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
-        let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
+        let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap())
+            .await
+            .unwrap();
         fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

         let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
@@ -352,7 +356,9 @@ mod tests {
             Path::from_filesystem_path(temp_dir.path()).unwrap(),
             "age".to_string(),
             0,
-        );
+        )
+        .await
+        .unwrap();
         let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
         fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

