From ea3588d4ce5b754ba0a8914d5a113f48248ec6f2 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sat, 11 Feb 2023 22:51:21 +0800 Subject: [PATCH 01/17] feat: compaction reader and writer --- Cargo.lock | 5 + Cargo.toml | 1 + src/common/time/src/lib.rs | 1 + src/common/time/src/timestamp.rs | 68 ++++++ src/datatypes/Cargo.toml | 1 + src/datatypes/src/arrow_array.rs | 5 + src/datatypes/src/vectors/primitive.rs | 2 +- src/datatypes/src/vectors/timestamp.rs | 20 ++ src/storage/Cargo.toml | 4 + src/storage/src/chunk.rs | 20 +- src/storage/src/compaction.rs | 1 + src/storage/src/compaction/writer.rs | 264 +++++++++++++++++++++++ src/storage/src/read.rs | 96 --------- src/storage/src/snapshot.rs | 2 +- src/storage/src/sst.rs | 5 +- src/storage/src/sst/parquet.rs | 281 ++++++++++++++++++++++++- src/storage/src/test_util/read_util.rs | 35 +-- 17 files changed, 663 insertions(+), 148 deletions(-) create mode 100644 src/storage/src/compaction/writer.rs diff --git a/Cargo.lock b/Cargo.lock index a98a7b047a02..263cdfbe4e78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2222,6 +2222,7 @@ name = "datatypes" version = "0.1.0" dependencies = [ "arrow", + "arrow-array", "arrow-schema", "common-base", "common-error", @@ -7056,6 +7057,8 @@ name = "storage" version = "0.1.0" dependencies = [ "arc-swap", + "arrow", + "arrow-array", "async-compat", "async-stream", "async-trait", @@ -7069,6 +7072,8 @@ dependencies = [ "common-telemetry", "common-time", "criterion 0.3.6", + "datafusion-common", + "datafusion-expr", "datatypes", "futures", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index e2f1c55c45f3..ad5d7d792b9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ license = "Apache-2.0" [workspace.dependencies] arrow = "29.0" +arrow-array = "29.0" arrow-flight = "29.0" arrow-schema = { version = "29.0", features = ["serde"] } async-stream = "0.3" diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index fdc9033bed98..b71f7a880e43 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(int_roundings)] // Copyright 2023 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index b326810a36f2..132af898d34f 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -85,6 +85,21 @@ impl Timestamp { } } + /// Convert a timestamp to given time unit. + /// Conversion from a timestamp with smaller unit to a larger unit will round the value + /// to ceil (positive infinity). + /// Return `None` if conversion causes overflow. + pub fn convert_to_ceil(&self, unit: TimeUnit) -> Option { + if self.unit().factor() >= unit.factor() { + let mul = self.unit().factor() / unit.factor(); + let value = self.value.checked_mul(mul)?; + Some(Timestamp::new(value, unit)) + } else { + let mul = unit.factor() / self.unit().factor(); + Some(Timestamp::new(self.value.div_ceil(mul), unit)) + } + } + /// Split a [Timestamp] into seconds part and nanoseconds part. /// Notice the seconds part of split result is always rounded down to floor. 
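The new `convert_to_ceil` complements the existing `convert_to`: when narrowing to a coarser unit, `convert_to` rounds down (toward negative infinity) while `convert_to_ceil` rounds up, which is what an upper bound needs so that widening a range never drops rows. A minimal usage sketch, mirroring the unit tests added later in this patch (import paths as used elsewhere in the workspace):

use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

let ts = Timestamp::new(1001, TimeUnit::Millisecond);
// Rounds down: 1001 ms becomes 1 s.
assert_eq!(
    Some(Timestamp::new(1, TimeUnit::Second)),
    ts.convert_to(TimeUnit::Second)
);
// Rounds up: 1001 ms becomes 2 s, so a right-open upper bound still covers the value.
assert_eq!(
    Some(Timestamp::new(2, TimeUnit::Second)),
    ts.convert_to_ceil(TimeUnit::Second)
);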
fn split(&self) -> (i64, i64) { @@ -718,4 +733,57 @@ mod tests { Timestamp::new(i64::MAX, TimeUnit::Second).split() ); } + + #[test] + fn test_convert_to_ceil() { + assert_eq!( + Timestamp::new(1, TimeUnit::Second), + Timestamp::new(1000, TimeUnit::Millisecond) + .convert_to_ceil(TimeUnit::Second) + .unwrap() + ); + + // These two cases shows how `Timestamp::convert_to_ceil` behaves differently + // from `Timestamp::convert_to` when converting larger unit to smaller unit. + assert_eq!( + Timestamp::new(1, TimeUnit::Second), + Timestamp::new(1001, TimeUnit::Millisecond) + .convert_to(TimeUnit::Second) + .unwrap() + ); + assert_eq!( + Timestamp::new(2, TimeUnit::Second), + Timestamp::new(1001, TimeUnit::Millisecond) + .convert_to_ceil(TimeUnit::Second) + .unwrap() + ); + + assert_eq!( + Timestamp::new(-1, TimeUnit::Second), + Timestamp::new(-1, TimeUnit::Millisecond) + .convert_to(TimeUnit::Second) + .unwrap() + ); + assert_eq!( + Timestamp::new(0, TimeUnit::Second), + Timestamp::new(-1, TimeUnit::Millisecond) + .convert_to_ceil(TimeUnit::Second) + .unwrap() + ); + + // When converting large unit to smaller unit, there will be no rounding error, + // so `Timestamp::convert_to_ceil` behaves just like `Timestamp::convert_to` + assert_eq!( + Timestamp::new(-1, TimeUnit::Second).convert_to(TimeUnit::Millisecond), + Timestamp::new(-1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond) + ); + assert_eq!( + Timestamp::new(1000, TimeUnit::Second).convert_to(TimeUnit::Millisecond), + Timestamp::new(1000, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond) + ); + assert_eq!( + Timestamp::new(1, TimeUnit::Second).convert_to(TimeUnit::Millisecond), + Timestamp::new(1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond) + ); + } } diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 0d0158a3b0ab..8eb03aa2dce9 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -9,6 +9,7 @@ default = [] test = [] [dependencies] +arrow-array.workspace = true arrow.workspace = true arrow-schema.workspace = true common-base = { path = "../common/base" } diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index d9b231bdb41e..220718a09c45 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -16,3 +16,8 @@ pub type BinaryArray = arrow::array::LargeBinaryArray; pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder; pub type StringArray = arrow::array::StringArray; pub type MutableStringArray = arrow::array::StringBuilder; + +pub use arrow_array::types::{ + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, +}; diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index d797cf2d2b17..1be6831b1256 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -96,7 +96,7 @@ impl PrimitiveVector { } } - pub(crate) fn as_arrow(&self) -> &PrimitiveArray { + pub fn as_arrow(&self) -> &PrimitiveArray { &self.array } diff --git a/src/datatypes/src/vectors/timestamp.rs b/src/datatypes/src/vectors/timestamp.rs index 8248811ea35b..d9424556422d 100644 --- a/src/datatypes/src/vectors/timestamp.rs +++ b/src/datatypes/src/vectors/timestamp.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
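Making `PrimitiveVector::as_arrow` public (see the change above) lets code outside `datatypes` reach the arrow array backing the timestamp vectors defined in this file; the `as_inner` helpers generated just below are thin aliases over it. A small sketch of the intended use, relying only on builder APIs that already appear in this patch:

use datatypes::arrow::array::Array;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::TimestampMillisecondVectorBuilder;

let mut builder = TimestampMillisecondVectorBuilder::with_capacity(2);
builder.push(Some(1000i64.into()));
builder.push(Some(2000i64.into()));
let vector = builder.finish();
// `as_arrow` exposes the backing &PrimitiveArray<TimestampMillisecondType>, which
// arrow compute kernels can consume directly.
assert_eq!(2, vector.as_arrow().len());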
+use arrow::array::PrimitiveArray; +use paste::paste; + +use crate::arrow_array; use crate::types::{ TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, @@ -29,3 +33,19 @@ pub type TimestampMicrosecondVectorBuilder = PrimitiveVectorBuilder; pub type TimestampNanosecondVectorBuilder = PrimitiveVectorBuilder; + +macro_rules! impl_as_inner_for_timestamps { + ($($unit: ident), *) => { + $( + paste! { + impl [] { + pub fn as_inner(&self) -> &PrimitiveArray]> { + self.as_arrow() + } + } + } + )* + }; +} + +impl_as_inner_for_timestamps!(Second, Millisecond, Microsecond, Nanosecond); diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index b19a2012c91c..0da31bafa3a5 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -9,6 +9,8 @@ arc-swap = "1.0" async-compat = "0.2" async-stream.workspace = true async-trait = "0.1" +arrow.workspace = true +arrow-array.workspace = true bytes = "1.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } @@ -18,6 +20,8 @@ common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } +datafusion-common.workspace = true +datafusion-expr.workspace = true futures.workspace = true futures-util.workspace = true lazy_static = "1.4" diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index e88c317ed439..fb6f7b55335f 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -121,14 +121,27 @@ impl ChunkReaderBuilder { self } - pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result { + /// Picks all SSTs in all levels + pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result { ssts.visit_levels(&mut self)?; - Ok(self) } + /// Picks given SSTs to read. + pub fn pick_ssts(mut self, ssts: &[FileHandle]) -> Self { + for file in ssts { + self.files_to_read.push(file.clone()); + } + self + } + pub async fn build(mut self) -> Result { let time_range_predicate = self.build_time_range_predicate(); + debug!( + "Time range predicate for chunk reader: {:?}", + time_range_predicate + ); + let schema = Arc::new( ProjectedSchema::new(self.schema, self.projection) .context(error::InvalidProjectionSnafu)?, @@ -148,6 +161,7 @@ impl ChunkReaderBuilder { batch_size: self.iter_ctx.batch_size, projected_schema: schema.clone(), predicate: Predicate::new(self.filters), + time_range: time_range_predicate, }; for file in &self.files_to_read { if !Self::file_in_range(file, time_range_predicate) { @@ -192,8 +206,6 @@ impl ChunkReaderBuilder { impl Visitor for ChunkReaderBuilder { fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> { - // TODO(yingwen): Filter files by time range. - // Now we read all files, so just reserve enough space to hold all files. 
self.files_to_read.reserve(files.len()); for file in files { diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index b4c2490b856f..4dfd973ae55b 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -18,3 +18,4 @@ mod rate_limit; mod scheduler; mod strategy; mod task; +mod writer; diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs new file mode 100644 index 000000000000..432b55325a26 --- /dev/null +++ b/src/storage/src/compaction/writer.rs @@ -0,0 +1,264 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::logical_plan::{DfExpr, Expr}; +use datafusion_common::ScalarValue; +use datafusion_expr::{BinaryExpr, Operator}; + +use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl}; +use crate::schema::RegionSchemaRef; +use crate::sst::{AccessLayerRef, FileHandle}; + +pub(crate) async fn build_sst_reader( + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + files: &[FileHandle], + lower_sec_inclusive: i64, + upper_sec_exclusive: i64, +) -> ChunkReaderImpl { + let ts_col_name = schema + .user_schema() + .timestamp_column() + .unwrap() + .name + .clone(); + let reader = ChunkReaderBuilder::new(schema, sst_layer) + .pick_ssts(files) + .filters(vec![build_time_range_filter( + lower_sec_inclusive, + upper_sec_exclusive, + &ts_col_name, + )]) + .build() + .await + .unwrap(); + + reader +} + +fn build_time_range_filter(low_sec: i64, high_sec: i64, ts_col_name: &str) -> Expr { + let ts_col = Box::new(DfExpr::Column(datafusion_common::Column::from_name( + ts_col_name, + ))); + let lower_bound_expr = Box::new(DfExpr::Literal(ScalarValue::TimestampSecond( + Some(low_sec), + None, + ))); + + let upper_bound_expr = Box::new(DfExpr::Literal(ScalarValue::TimestampSecond( + Some(high_sec), + None, + ))); + + let expr = DfExpr::BinaryExpr(BinaryExpr { + left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: ts_col.clone(), + op: Operator::GtEq, + right: lower_bound_expr, + })), + op: Operator::And, + right: Box::new(DfExpr::BinaryExpr(BinaryExpr { + left: ts_col, + op: Operator::Lt, + right: upper_bound_expr, + })), + }); + + Expr::from(expr) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datatypes::prelude::{LogicalTypeId, ScalarVector, ScalarVectorBuilder}; + use datatypes::timestamp::TimestampMillisecond; + use datatypes::vectors::{ + TimestampMillisecondVector, TimestampMillisecondVectorBuilder, UInt64VectorBuilder, + }; + use object_store::backend::fs::Builder; + use object_store::ObjectStore; + use store_api::storage::{ChunkReader, OpType, SequenceNumber}; + + use super::*; + use crate::memtable::{ + DefaultMemtableBuilder, IterContext, KeyValues, Memtable, MemtableBuilder, + }; + use crate::metadata::RegionMetadata; + use crate::sst; + use crate::sst::parquet::ParquetWriter; + use crate::sst::{FileMeta, FsAccessLayer, SstInfo}; + use crate::test_util::descriptor_util::RegionDescBuilder; + + fn schema_for_test() -> RegionSchemaRef { + // 
Just build a region desc and use its columns metadata. + let desc = RegionDescBuilder::new("test") + .enable_version_column(false) + .push_value_column(("v", LogicalTypeId::UInt64, true)) + .build(); + let metadata: RegionMetadata = desc.try_into().unwrap(); + metadata.schema().clone() + } + + pub fn write_kvs( + memtable: &dyn Memtable, + sequence: SequenceNumber, + op_type: OpType, + ts: &[i64], // timestamp + values: &[Option], + ) { + let keys: Vec = ts.iter().map(|ts| (*ts).into()).collect(); + let kvs = kvs_for_test(sequence, op_type, &keys, values); + memtable.write(&kvs).unwrap(); + } + + fn kvs_for_test( + sequence: SequenceNumber, + op_type: OpType, + ts: &[TimestampMillisecond], + values: &[Option], + ) -> KeyValues { + let start_index_in_batch = 0; + assert_eq!(ts.len(), values.len()); + let mut key_builders = TimestampMillisecondVectorBuilder::with_capacity(ts.len()); + for key in ts { + key_builders.push(Some(key.clone())); + } + let row_keys = vec![Arc::new(key_builders.finish()) as _]; + + let mut value_builders = UInt64VectorBuilder::with_capacity(values.len()); + + for value in values { + value_builders.push(*value); + } + let row_values = vec![Arc::new(value_builders.finish()) as _]; + + let kvs = KeyValues { + sequence, + op_type, + start_index_in_batch, + keys: row_keys, + values: row_values, + }; + + assert_eq!(ts.len(), kvs.len()); + assert_eq!(ts.is_empty(), kvs.is_empty()); + + kvs + } + + async fn write_sst( + sst_file_name: &str, + schema: RegionSchemaRef, + object_store: ObjectStore, + ts: &[i64], + ) -> FileHandle { + let memtable = DefaultMemtableBuilder::default().build(schema.clone()); + + let values = ts.iter().map(|i| (*i) as u64).map(Some).collect::>(); + + write_kvs( + &*memtable, + 10, // sequence + OpType::Put, + ts, // keys + &values, // values + ); + + let iter = memtable.iter(&IterContext::default()).unwrap(); + let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + + let SstInfo { + start_timestamp, + end_timestamp, + } = writer + .write_sst(&sst::WriteOptions::default()) + .await + .unwrap(); + FileHandle::new(FileMeta { + file_name: sst_file_name.to_string(), + level: 0, + start_timestamp, + end_timestamp, + }) + } + + async fn check_reads( + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + files: &[FileHandle], + lower_sec_inclusive: i64, + upper_sec_exclusive: i64, + expect: &[i64], + ) { + let mut reader = build_sst_reader( + schema, + sst_layer, + files, + lower_sec_inclusive, + upper_sec_exclusive, + ) + .await; + + let mut res = vec![]; + while let Some(f) = reader.next_chunk().await.unwrap() { + let ts_col = f.columns[0] + .as_any() + .downcast_ref::() + .unwrap(); + res.extend(ts_col.iter_data().map(|t| t.unwrap().0.value())); + } + assert_eq!(expect, &res); + } + + #[tokio::test] + async fn test_sst_reader() { + // let dir = TempDir::new("write_parquet").unwrap(); + // let path = dir.path().to_str().unwrap(); + let path = "/Users/lei/parquet"; + let backend = Builder::default().root(path).build().unwrap(); + let object_store = ObjectStore::new(backend); + + let schema = schema_for_test(); + let file1 = write_sst( + "a.parquet", + schema.clone(), + object_store.clone(), + &[1000, 2000, 3000, 4001, 5001], + ) + .await; + let file2 = write_sst( + "b.parquet", + schema.clone(), + object_store.clone(), + &[4002, 5002, 6000, 7000, 8000], + ) + .await; + let sst_layer = Arc::new(FsAccessLayer::new("./", object_store)); + + let files = vec![file1, file2]; + // read from two sst files with time range filter, + 
check_reads( + schema.clone(), + sst_layer.clone(), + &files, + 3, + 6, + &[3000, 4001, 4002, 5001, 5002], + ) + .await; + + check_reads(schema, sst_layer, &files, 1, 2, &[1000]).await; + } +} diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index fad5b613d6ff..53a701746c0a 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -258,99 +258,3 @@ pub trait BatchReader: Send { /// Pointer to [BatchReader]. pub type BoxedBatchReader = Box; - -/// Concat reader inputs. -pub struct ConcatReader { - readers: Vec, - curr_idx: usize, -} - -impl ConcatReader { - pub fn new(readers: Vec) -> ConcatReader { - ConcatReader { - readers, - curr_idx: 0, - } - } -} - -#[async_trait] -impl BatchReader for ConcatReader { - async fn next_batch(&mut self) -> Result> { - loop { - if self.curr_idx >= self.readers.len() { - return Ok(None); - } - - let reader = &mut self.readers[self.curr_idx]; - match reader.next_batch().await? { - Some(batch) => { - if !batch.is_empty() { - return Ok(Some(batch)); - } - } - None => self.curr_idx += 1, - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::read::BatchReader; - use crate::test_util::read_util; - - #[tokio::test] - async fn test_concat_reader_empty() { - let mut reader = ConcatReader::new(Vec::new()); - - assert!(reader.next_batch().await.unwrap().is_none()); - // Call next_batch() again is allowed. - assert!(reader.next_batch().await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_concat_multiple_readers() { - let readers = vec![ - read_util::build_boxed_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]), - read_util::build_boxed_reader(&[&[(4, None)]]), - read_util::build_boxed_reader(&[&[(5, Some(5)), (6, Some(6))]]), - ]; - - let mut reader = ConcatReader::new(readers); - - read_util::check_reader_with_kv_batch( - &mut reader, - &[ - &[(1, Some(1)), (2, Some(2))], - &[(3, None)], - &[(4, None)], - &[(5, Some(5)), (6, Some(6))], - ], - ) - .await; - } - - #[tokio::test] - async fn test_concat_reader_with_empty_reader() { - let readers = vec![ - read_util::build_boxed_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]), - // Empty reader. - read_util::build_boxed_reader(&[&[]]), - read_util::build_boxed_reader(&[&[(5, Some(5)), (6, Some(6))]]), - ]; - - let mut reader = ConcatReader::new(readers); - - read_util::check_reader_with_kv_batch( - &mut reader, - &[ - &[(1, Some(1)), (2, Some(2))], - &[(3, None)], - &[(5, Some(5)), (6, Some(6))], - ], - ) - .await; - } -} diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 2d8323531393..097bdfbf1ce3 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -66,7 +66,7 @@ impl Snapshot for SnapshotImpl { builder = builder.pick_memtables(memtable.clone()); } - let reader = builder.pick_ssts(self.version.ssts())?.build().await?; + let reader = builder.pick_all_ssts(self.version.ssts())?.build().await?; Ok(ScanResponse { reader }) } diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 7aca8f7c22b2..bbc69bc49717 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
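`build_time_range_filter` in the new compaction writer above assembles the predicate `ts >= lower AND ts < upper` from nested `BinaryExpr` nodes before wrapping it into `common_query`'s `Expr`. For reference, the same predicate can be written with the `datafusion_expr` helpers; this is only an equivalent sketch (the `col`/`lit` helper names are assumed from the workspace's DataFusion version), not what the patch itself does:

use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};

fn time_range_filter(ts_col_name: &str, low_sec: i64, high_sec: i64) -> datafusion_expr::Expr {
    let lower = lit(ScalarValue::TimestampSecond(Some(low_sec), None));
    let upper = lit(ScalarValue::TimestampSecond(Some(high_sec), None));
    // Right-open range, matching the reader's GtEq/Lt pair.
    col(ts_col_name).gt_eq(lower).and(col(ts_col_name).lt(upper))
}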
-mod parquet; +pub(crate) mod parquet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; +use common_time::range::TimestampRange; use common_time::Timestamp; use object_store::{util, ObjectStore}; use serde::{Deserialize, Serialize}; @@ -236,6 +237,7 @@ pub struct ReadOptions { pub projected_schema: ProjectedSchemaRef, pub predicate: Predicate, + pub time_range: TimestampRange, } #[derive(Debug)] @@ -303,6 +305,7 @@ impl AccessLayer for FsAccessLayer { self.object_store.clone(), opts.projected_schema.clone(), opts.predicate.clone(), + opts.time_range.clone(), ); let stream = reader.chunk_stream().await?; diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 0b6c810981c3..86f59b91f6ca 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -18,21 +18,32 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; +use arrow::datatypes::DataType; +use arrow_array::types::Int64Type; +use arrow_array::{ + Array, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; use async_compat::CompatExt; use async_stream::try_stream; use async_trait::async_trait; use common_telemetry::error; +use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; +use datatypes::arrow::array::BooleanArray; +use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; use futures_util::{Stream, StreamExt, TryStreamExt}; use object_store::ObjectStore; +use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{Compression, Encoding}; use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; use parquet::format::FileMetaData; +use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; use table::predicate::Predicate; use tokio::io::BufReader; @@ -206,6 +217,7 @@ pub struct ParquetReader<'a> { object_store: ObjectStore, projected_schema: ProjectedSchemaRef, predicate: Predicate, + time_range: TimestampRange, } impl<'a> ParquetReader<'a> { @@ -214,12 +226,14 @@ impl<'a> ParquetReader<'a> { object_store: ObjectStore, projected_schema: ProjectedSchemaRef, predicate: Predicate, + time_range: TimestampRange, ) -> ParquetReader { ParquetReader { file_path, object_store, projected_schema, predicate, + time_range, } } @@ -260,18 +274,21 @@ impl<'a> ParquetReader<'a> { .filter_map(|(idx, valid)| if valid { Some(idx) } else { None }) .collect::>(); - let projection = ProjectionMask::roots( - builder.metadata().file_metadata().schema_descr(), - adapter.fields_to_read(), - ); + let parquet_schema_desc = builder.metadata().file_metadata().schema_descr_ptr(); - let mut stream = builder + let projection = ProjectionMask::roots(&parquet_schema_desc, adapter.fields_to_read()); + let mut builder = builder .with_projection(projection) - .with_row_groups(pruned_row_groups) - .build() - .context(ReadParquetSnafu { - file: self.file_path, - })?; + .with_row_groups(pruned_row_groups); + + // if time range row filter is present, we can push down the filter to reduce rows to scan. 
+ if let Some(row_filter) = self.build_time_range_row_filter(&parquet_schema_desc) { + builder = builder.with_row_filter(row_filter); + } + + let mut stream = builder.build().context(ReadParquetSnafu { + file: self.file_path, + })?; let file_name = self.file_path.to_string(); let chunk_stream = try_stream!({ @@ -282,6 +299,121 @@ impl<'a> ParquetReader<'a> { ChunkStream::new(adapter, Box::pin(chunk_stream)) } + + /// Builds time range row filter. + fn build_time_range_row_filter(&self, schema_desc: &SchemaDescriptor) -> Option { + let ts_col_idx = self + .projected_schema + .schema_to_read() + .schema() + .timestamp_index()?; + let ts_col = self + .projected_schema + .schema_to_read() + .schema() + .timestamp_column()?; + + let ts_col_unit = match &ts_col.data_type { + ConcreteDataType::Int64(_) => TimeUnit::Millisecond, + ConcreteDataType::Timestamp(ts_type) => ts_type.unit(), + _ => unreachable!(), + }; + + // build lower and upper bound according to time range and timestamp column data type. + let lower = self + .time_range + .start() + .and_then(|s| s.convert_to(ts_col_unit)) + .map(|t| t.value()) + .unwrap_or(i64::MIN); + + let upper = self + .time_range + .end() + .and_then(|s| s.convert_to_ceil(ts_col_unit)) // convert to ceil to relax time range and prevent data loss caused by rounding error. + .map(|t| t.value()) + .unwrap_or(i64::MAX); + + let projection = ProjectionMask::roots(schema_desc, vec![ts_col_idx]); + let filter = RowFilter::new(vec![Box::new(TimestampRowFilter::new( + ts_col_idx, projection, lower, upper, + ))]); + Some(filter) + } +} + +/// `TimestampRowFilter` is used to filter rows within given timestamp range when reading +/// row groups from parquet files, while avoids fetching all columns from SSTs file. +struct TimestampRowFilter { + timestamp_index: usize, + lower_bound: i64, + upper_bound: i64, + projection: ProjectionMask, +} + +impl TimestampRowFilter { + fn new( + ts_col_idx: usize, + projection: ProjectionMask, + lower_bound: i64, + upper_bound: i64, + ) -> Self { + Self { + timestamp_index: ts_col_idx, + lower_bound, + upper_bound, + projection, + } + } +} + +impl ArrowPredicate for TimestampRowFilter { + fn projection(&self) -> &ProjectionMask { + &self.projection + } + + /// Selects the rows matching given time range. + fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result { + let row_cnt = batch.num_rows(); + let ts_col = batch.column(self.timestamp_index); + + macro_rules! downcast_and_compute { + ($typ: ty) => { + { + let ts_col = ts_col + .as_any() + .downcast_ref::<$typ>() + .unwrap(); // safety: we've checked the data type of timestamp column. 
+ let lower_bound = PrimitiveArray::from_value(self.lower_bound, row_cnt); + let upper_bound = PrimitiveArray::from_value(self.upper_bound, row_cnt); + let left = arrow::compute::gt_eq(ts_col, &lower_bound)?; + let right = arrow::compute::lt(ts_col, &upper_bound)?; + arrow::compute::and(&left, &right) + } + }; + } + + match ts_col.data_type() { + DataType::Timestamp(unit, _) => match unit { + arrow::datatypes::TimeUnit::Second => { + downcast_and_compute!(TimestampSecondArray) + } + arrow::datatypes::TimeUnit::Millisecond => { + downcast_and_compute!(TimestampMillisecondArray) + } + arrow::datatypes::TimeUnit::Microsecond => { + downcast_and_compute!(TimestampMicrosecondArray) + } + arrow::datatypes::TimeUnit::Nanosecond => { + downcast_and_compute!(TimestampNanosecondArray) + } + }, + DataType::Int64 => downcast_and_compute!(PrimitiveArray), + _ => { + unreachable!() + } + } + } } pub type SendableChunkStream = Pin> + Send>>; @@ -313,7 +445,8 @@ mod tests { use std::sync::Arc; use datatypes::arrow::array::{Array, ArrayRef, UInt64Array, UInt8Array}; - use datatypes::prelude::Vector; + use datatypes::prelude::{ScalarVector, Vector}; + use datatypes::types::{TimestampMillisecondType, TimestampType}; use datatypes::vectors::TimestampMillisecondVector; use object_store::backend::fs::Builder; use store_api::storage::OpType; @@ -483,6 +616,7 @@ mod tests { operator, projected_schema, Predicate::empty(), + TimestampRange::min_to_max(), ); let mut rows_fetched = 0; @@ -554,6 +688,7 @@ mod tests { operator, projected_schema, Predicate::empty(), + TimestampRange::min_to_max(), ); let mut stream = reader.chunk_stream().await.unwrap(); @@ -568,4 +703,128 @@ mod tests { .num_rows() ); } + + async fn check_range_read( + file_name: &str, + object_store: ObjectStore, + schema: ProjectedSchemaRef, + range: TimestampRange, + expect: Vec, + ) { + let reader = ParquetReader::new(file_name, object_store, schema, Predicate::empty(), range); + let mut stream = reader.chunk_stream().await.unwrap(); + let result = stream.next_batch().await; + + let Some(batch) = result.unwrap() else { + // if batch does not contain any row + assert!(expect.is_empty()); + return; + }; + + assert_eq!( + ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)), + batch.column(0).data_type() + ); + + let ts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .map(|t| t.unwrap().0.value()) + .collect::>(); + assert_eq!(expect, ts); + } + + #[tokio::test] + async fn test_parquet_reader_with_time_range_filter() { + common_telemetry::init_default_ut_logging(); + let schema = memtable_tests::schema_for_test(); + let memtable = DefaultMemtableBuilder::default().build(schema.clone()); + + memtable_tests::write_kvs( + &*memtable, + 10, // sequence + OpType::Put, + &[ + (1000, 1), + (1000, 2), + (2002, 1), + (2003, 1), + (2003, 5), + (1001, 1), + (3001, 1), + ], // keys + &[ + (Some(1), Some(1234)), + (Some(2), Some(1234)), + (Some(7), Some(1234)), + (Some(8), Some(1234)), + (Some(9), Some(1234)), + (Some(3), Some(1234)), + (Some(7), Some(1234)), + ], // values + ); + + let dir = TempDir::new("read-parquet-by-range").unwrap(); + let path = dir.path().to_str().unwrap(); + let backend = Builder::default().root(path).build().unwrap(); + let object_store = ObjectStore::new(backend); + let sst_file_name = "test-read.parquet"; + let iter = memtable.iter(&IterContext::default()).unwrap(); + let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + + let SstInfo { + 
start_timestamp, + end_timestamp, + } = writer + .write_sst(&sst::WriteOptions::default()) + .await + .unwrap(); + + assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp); + assert_eq!(Some(Timestamp::new_millisecond(3001)), end_timestamp); + + let projected_schema = + Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap()); + + // check_range_read( + // sst_file_name, + // object_store.clone(), + // projected_schema.clone(), + // TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(), + // vec![1000, 1000, 1001, 2002], + // ) + // .await; + // + // check_range_read( + // sst_file_name, + // object_store.clone(), + // projected_schema.clone(), + // TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(), + // vec![2002, 2003, 2003], + // ) + // .await; + // + // // read a range without any rows. + // check_range_read( + // sst_file_name, + // object_store.clone(), + // projected_schema.clone(), + // TimestampRange::with_unit(3002, 3003, TimeUnit::Millisecond).unwrap(), + // vec![], + // ) + // .await; + + // read full range + check_range_read( + sst_file_name, + object_store, + projected_schema, + TimestampRange::min_to_max(), + vec![1000, 1000, 1001, 2002, 2003, 2003, 3001], + ) + .await; + } } diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs index 595bc3c77fbe..2e8ab94fa097 100644 --- a/src/storage/src/test_util/read_util.rs +++ b/src/storage/src/test_util/read_util.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; -use datatypes::prelude::{ScalarVector, WrapperType}; +use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; use store_api::storage::OpType; @@ -71,27 +71,6 @@ pub fn new_full_kv_batch(all_values: &[(i64, i64, u64, OpType)]) -> Batch { Batch::new(vec![key, value, sequences, op_types]) } -fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option)]]) { - for (batch, key_values) in batches.iter().zip(expect.iter()) { - let key = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let value = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - for (i, (k, v)) in key_values.iter().enumerate() { - assert_eq!(key.get_data(i).unwrap().into_native(), *k); - assert_eq!(value.get_data(i), *v,); - } - } - assert_eq!(batches.len(), expect.len()); -} - pub async fn collect_kv_batch(reader: &mut dyn BatchReader) -> Vec<(i64, Option)> { let mut result = Vec::new(); while let Some(batch) = reader.next_batch().await.unwrap() { @@ -114,18 +93,6 @@ pub async fn collect_kv_batch(reader: &mut dyn BatchReader) -> Vec<(i64, Option< result } -pub async fn check_reader_with_kv_batch( - reader: &mut dyn BatchReader, - expect: &[&[(i64, Option)]], -) { - let mut result = Vec::new(); - while let Some(batch) = reader.next_batch().await.unwrap() { - result.push(batch); - } - - check_kv_batch(&result, expect); -} - /// A reader for test that pop batch from Vec. 
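The reader changes above push the time-range check down into the parquet decoder: a `RowFilter` projecting only the timestamp column decides which rows survive before the remaining columns are fetched. A condensed stand-alone sketch of the same wiring using `ArrowPredicateFn`, the closure-based counterpart of the hand-written `TimestampRowFilter`; millisecond timestamps and the scalar comparison kernels of this arrow version are assumed:

use arrow_array::TimestampMillisecondArray;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

/// Builds a row filter equivalent to `lower <= ts < upper`, decoding only the
/// timestamp column identified by `ts_idx`.
fn time_range_row_filter(
    schema: &SchemaDescriptor,
    ts_idx: usize,
    lower: i64,
    upper: i64,
) -> RowFilter {
    let mask = ProjectionMask::roots(schema, [ts_idx]);
    let predicate = ArrowPredicateFn::new(mask, move |batch| {
        // A predicate only sees the columns in its projection mask, so the
        // timestamp column is at index 0 here.
        let ts = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("timestamp column");
        let ge = arrow::compute::gt_eq_scalar(ts, lower)?;
        let lt = arrow::compute::lt_scalar(ts, upper)?;
        arrow::compute::and(&ge, &lt)
    });
    RowFilter::new(vec![Box::new(predicate)])
}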
pub struct VecBatchReader { schema: ProjectedSchemaRef, From 68418b6df44e44c78a0c28010ad84a71be613589 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 14:11:18 +0800 Subject: [PATCH 02/17] feat: make ParquetWrite accept both memtable iterator and chunk reader --- src/storage/src/chunk.rs | 5 ++ src/storage/src/compaction/writer.rs | 4 +- src/storage/src/sst.rs | 4 +- src/storage/src/sst/parquet.rs | 106 ++++++++++++++++----------- 4 files changed, 73 insertions(+), 46 deletions(-) diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index fb6f7b55335f..d3b72b8e4447 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -64,6 +64,11 @@ impl ChunkReaderImpl { batch_reader, } } + + #[inline] + pub fn projected_schema(&self) -> &ProjectedSchemaRef { + &self.schema + } } /// Builder to create a new [ChunkReaderImpl] from scan request. diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 432b55325a26..ad9795c13545 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -97,7 +97,7 @@ mod tests { }; use crate::metadata::RegionMetadata; use crate::sst; - use crate::sst::parquet::ParquetWriter; + use crate::sst::parquet::{ParquetWriter, Source}; use crate::sst::{FileMeta, FsAccessLayer, SstInfo}; use crate::test_util::descriptor_util::RegionDescBuilder; @@ -177,7 +177,7 @@ mod tests { ); let iter = memtable.iter(&IterContext::default()).unwrap(); - let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { start_timestamp, diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index bbc69bc49717..07fda1a14aae 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -28,7 +28,7 @@ use crate::error::Result; use crate::memtable::BoxedBatchIterator; use crate::read::BoxedBatchReader; use crate::schema::ProjectedSchemaRef; -use crate::sst::parquet::{ParquetReader, ParquetWriter}; +use crate::sst::parquet::{ParquetReader, ParquetWriter, Source}; /// Maximum level of SSTs. pub const MAX_LEVEL: u8 = 2; @@ -294,7 +294,7 @@ impl AccessLayer for FsAccessLayer { // Now we only supports parquet format. We may allow caller to specific SST format in // WriteOptions in the future. let file_path = self.sst_file_path(file_name); - let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone()); + let writer = ParquetWriter::new(&file_path, Source::Iter(iter), self.object_store.clone()); writer.write_sst(opts).await } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 86f59b91f6ca..adf40fee8b90 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -45,9 +45,11 @@ use parquet::file::properties::WriterProperties; use parquet::format::FileMetaData; use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; +use store_api::storage::ChunkReader; use table::predicate::Predicate; use tokio::io::BufReader; +use crate::chunk::ChunkReaderImpl; use crate::error::{ self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu, Result, WriteObjectSnafu, WriteParquetSnafu, @@ -62,20 +64,16 @@ use crate::sst::SstInfo; /// Parquet sst writer. 
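With the `Source` indirection this commit adds to the parquet writer, the same write path serves both flush and compaction: flush feeds it a memtable iterator, compaction feeds it a chunk reader that merges existing SSTs. Hypothetical call sites (the `memtable_iter`, `chunk_reader` and `object_store` bindings are assumed to exist already):

// Flush path: rows come straight from a memtable iterator.
let flush_writer =
    ParquetWriter::new("flush.parquet", Source::Iter(memtable_iter), object_store.clone());

// Compaction path: rows come from a ChunkReaderImpl reading a set of SSTs.
let compaction_writer =
    ParquetWriter::new("compacted.parquet", Source::Reader(chunk_reader), object_store);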
pub struct ParquetWriter<'a> { file_path: &'a str, - iter: BoxedBatchIterator, + source: Source, object_store: ObjectStore, max_row_group_size: usize, } impl<'a> ParquetWriter<'a> { - pub fn new( - file_path: &'a str, - iter: BoxedBatchIterator, - object_store: ObjectStore, - ) -> ParquetWriter { + pub fn new(file_path: &'a str, source: Source, object_store: ObjectStore) -> ParquetWriter { ParquetWriter { file_path, - iter, + source, object_store, max_row_group_size: 4096, // TODO(hl): make this configurable } @@ -88,8 +86,8 @@ impl<'a> ParquetWriter<'a> { /// Iterates memtable and writes rows to Parquet file. /// A chunk of records yielded from each iteration with a size given /// in config will be written to a single row group. - async fn write_rows(self, extra_meta: Option>) -> Result { - let projected_schema = self.iter.schema(); + async fn write_rows(mut self, extra_meta: Option>) -> Result { + let projected_schema = self.source.projected_schema(); let store_schema = projected_schema.schema_to_read(); let schema = store_schema.arrow_schema().clone(); let object = self.object_store.object(self.file_path); @@ -111,8 +109,8 @@ impl<'a> ParquetWriter<'a> { let mut buf = vec![]; let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; - for batch in self.iter { - let batch = batch?; + + while let Some(batch) = self.source.next_batch().await? { let arrow_batch = RecordBatch::try_new( schema.clone(), batch @@ -440,6 +438,30 @@ impl BatchReader for ChunkStream { } } +pub enum Source { + Iter(BoxedBatchIterator), + Reader(ChunkReaderImpl), +} + +impl Source { + async fn next_batch(&mut self) -> Result> { + match self { + Source::Iter(iter) => iter.next().transpose(), + Source::Reader(reader) => reader + .next_chunk() + .await + .map(|p| p.map(|chunk| Batch::new(chunk.columns))), + } + } + + fn projected_schema(&self) -> ProjectedSchemaRef { + match self { + Source::Iter(iter) => iter.schema(), + Source::Reader(reader) => reader.projected_schema().clone(), + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -491,7 +513,7 @@ mod tests { let object_store = ObjectStore::new(backend); let sst_file_name = "test-flush.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); - let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); writer .write_sst(&sst::WriteOptions::default()) @@ -589,7 +611,7 @@ mod tests { let object_store = ObjectStore::new(backend); let sst_file_name = "test-read-large.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); - let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) @@ -661,7 +683,7 @@ mod tests { let object_store = ObjectStore::new(backend); let sst_file_name = "test-read.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); - let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) @@ -773,7 +795,7 @@ mod tests { let object_store = ObjectStore::new(backend); let sst_file_name = "test-read.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); - let 
writer = ParquetWriter::new(sst_file_name, iter, object_store.clone()); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); let SstInfo { start_timestamp, @@ -789,33 +811,33 @@ mod tests { let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap()); - // check_range_read( - // sst_file_name, - // object_store.clone(), - // projected_schema.clone(), - // TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(), - // vec![1000, 1000, 1001, 2002], - // ) - // .await; - // - // check_range_read( - // sst_file_name, - // object_store.clone(), - // projected_schema.clone(), - // TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(), - // vec![2002, 2003, 2003], - // ) - // .await; - // - // // read a range without any rows. - // check_range_read( - // sst_file_name, - // object_store.clone(), - // projected_schema.clone(), - // TimestampRange::with_unit(3002, 3003, TimeUnit::Millisecond).unwrap(), - // vec![], - // ) - // .await; + check_range_read( + sst_file_name, + object_store.clone(), + projected_schema.clone(), + TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(), + vec![1000, 1000, 1001, 2002], + ) + .await; + + check_range_read( + sst_file_name, + object_store.clone(), + projected_schema.clone(), + TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(), + vec![2002, 2003, 2003], + ) + .await; + + // read a range without any rows. + check_range_read( + sst_file_name, + object_store.clone(), + projected_schema.clone(), + TimestampRange::with_unit(3002, 3003, TimeUnit::Millisecond).unwrap(), + vec![], + ) + .await; // read full range check_range_read( From a68982695dd09308d3f7cf659eed33b604c850a9 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 19:08:33 +0800 Subject: [PATCH 03/17] feat: adapt ParquetWriter to accomodate ChunkReaderImpl --- src/mito/src/table.rs | 3 +- src/mito/src/table/test_util/mock_engine.rs | 6 +- src/storage/src/chunk.rs | 14 +- src/storage/src/compaction/writer.rs | 242 ++++++++++++++++++-- src/storage/src/read.rs | 2 +- src/storage/src/region/tests.rs | 3 +- src/storage/src/region/tests/alter.rs | 3 +- src/storage/src/region/tests/projection.rs | 1 + src/storage/src/schema/projected.rs | 15 ++ src/storage/src/sst.rs | 2 +- src/storage/src/sst/parquet.rs | 3 + src/store-api/src/storage/chunk.rs | 6 +- 12 files changed, 268 insertions(+), 32 deletions(-) diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 8f1dd5b7d2c8..1b176bf274ba 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -171,7 +171,7 @@ impl Table for MitoTable { .context(table_error::TableOperationSnafu)? .reader; - let schema = reader.schema().clone(); + let schema = reader.user_schema().clone(); if let Some(first_schema) = &first_schema { // TODO(hl): we assume all regions' schemas are the same, but undergoing table altering // may make these schemas inconsistent. @@ -198,6 +198,7 @@ impl Table for MitoTable { let stream = Box::pin(async_stream::try_stream! { for mut reader in readers { while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? { + let chunk = reader.project_chunk(chunk); yield RecordBatch::new(stream_schema.clone(), chunk.columns)? 
} } diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index bbed1ccd0bdd..c84afd2d65d4 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -43,7 +43,7 @@ pub struct MockChunkReader { impl ChunkReader for MockChunkReader { type Error = MockError; - fn schema(&self) -> &SchemaRef { + fn user_schema(&self) -> &SchemaRef { &self.schema } @@ -69,6 +69,10 @@ impl ChunkReader for MockChunkReader { Ok(Some(Chunk::new(columns))) } + + fn project_chunk(&self, chunk: Chunk) -> Chunk { + chunk + } } pub struct MockSnapshot { diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index d3b72b8e4447..d11691854cfb 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -24,7 +24,7 @@ use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::error::{self, Error, Result}; use crate::memtable::{IterContext, MemtableRef}; -use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder}; +use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; @@ -41,7 +41,7 @@ pub struct ChunkReaderImpl { impl ChunkReader for ChunkReaderImpl { type Error = Error; - fn schema(&self) -> &SchemaRef { + fn user_schema(&self) -> &SchemaRef { self.schema.projected_user_schema() } @@ -50,10 +50,14 @@ impl ChunkReader for ChunkReaderImpl { Some(b) => b, None => return Ok(None), }; + Ok(Some(Chunk::new(batch.columns))) + } - let chunk = self.schema.batch_to_chunk(&batch); - - Ok(Some(chunk)) + fn project_chunk(&self, chunk: Chunk) -> Chunk { + let batch = Batch { + columns: chunk.columns, + }; + self.schema.batch_to_chunk(&batch) } } diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index ad9795c13545..11bb3db387e7 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -17,23 +17,26 @@ use datafusion_common::ScalarValue; use datafusion_expr::{BinaryExpr, Operator}; use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl}; +use crate::error; use crate::schema::RegionSchemaRef; use crate::sst::{AccessLayerRef, FileHandle}; +/// Builds an SST reader that only reads rows within given time range. 
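This commit splits projection out of `next_chunk`: the reader now hands back chunks in the internal layout (key, value and internal columns) and callers apply `project_chunk` to get the user-facing projection, as the mito scan and the region tests above now do. A minimal consumer loop under that contract, assuming `reader` is a `ChunkReaderImpl` with the `ChunkReader` trait in scope:

while let Some(chunk) = reader.next_chunk().await.unwrap() {
    // Map the internal chunk onto the projected user schema before use.
    let chunk = reader.project_chunk(chunk);
    // `chunk.columns` now follows the projected user schema and can be turned
    // into a RecordBatch, as the mito table scan above does.
    let _ = chunk.columns;
}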
pub(crate) async fn build_sst_reader( schema: RegionSchemaRef, sst_layer: AccessLayerRef, files: &[FileHandle], lower_sec_inclusive: i64, upper_sec_exclusive: i64, -) -> ChunkReaderImpl { +) -> error::Result { let ts_col_name = schema .user_schema() .timestamp_column() .unwrap() .name .clone(); - let reader = ChunkReaderBuilder::new(schema, sst_layer) + + ChunkReaderBuilder::new(schema, sst_layer) .pick_ssts(files) .filters(vec![build_time_range_filter( lower_sec_inclusive, @@ -42,9 +45,6 @@ pub(crate) async fn build_sst_reader( )]) .build() .await - .unwrap(); - - reader } fn build_time_range_filter(low_sec: i64, high_sec: i64, ts_col_name: &str) -> Expr { @@ -80,8 +80,10 @@ fn build_time_range_filter(low_sec: i64, high_sec: i64, ts_col_name: &str) -> Ex #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; + use common_time::Timestamp; use datatypes::prelude::{LogicalTypeId, ScalarVector, ScalarVectorBuilder}; use datatypes::timestamp::TimestampMillisecond; use datatypes::vectors::{ @@ -90,6 +92,7 @@ mod tests { use object_store::backend::fs::Builder; use object_store::ObjectStore; use store_api::storage::{ChunkReader, OpType, SequenceNumber}; + use tempdir::TempDir; use super::*; use crate::memtable::{ @@ -98,7 +101,7 @@ mod tests { use crate::metadata::RegionMetadata; use crate::sst; use crate::sst::parquet::{ParquetWriter, Source}; - use crate::sst::{FileMeta, FsAccessLayer, SstInfo}; + use crate::sst::{FileMeta, FsAccessLayer, SstInfo, WriteOptions}; use crate::test_util::descriptor_util::RegionDescBuilder; fn schema_for_test() -> RegionSchemaRef { @@ -161,20 +164,56 @@ mod tests { async fn write_sst( sst_file_name: &str, schema: RegionSchemaRef, + seq: &AtomicU64, object_store: ObjectStore, ts: &[i64], + ops: &[OpType], ) -> FileHandle { let memtable = DefaultMemtableBuilder::default().build(schema.clone()); + let mut breaks = ops + .iter() + .zip(ops.iter().skip(1)) + .enumerate() + .filter_map( + |(idx, (prev, next))| { + if prev != next { + Some(idx + 1) + } else { + None + } + }, + ) + .collect::>(); - let values = ts.iter().map(|i| (*i) as u64).map(Some).collect::>(); + breaks.insert(0, 0); + breaks.push(ts.len()); - write_kvs( - &*memtable, - 10, // sequence - OpType::Put, - ts, // keys - &values, // values - ); + for i in 0..breaks.len() - 1 { + let op = ops[i]; + let seg_len = breaks[i + 1] - breaks[i]; + let ts_seg = ts + .iter() + .skip(breaks[i]) + .take(seg_len) + .copied() + .collect::>(); + let value_seg = ts + .iter() + .skip(breaks[i]) + .take(seg_len) + .map(|i| (*i) as u64) + .map(Some) + .collect::>(); + + write_kvs( + &*memtable, + seq.load(Ordering::Relaxed), // sequence + op, + &ts_seg, // keys + &value_seg, // values + ); + seq.fetch_add(1, Ordering::Relaxed); + } let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); @@ -186,12 +225,14 @@ mod tests { .write_sst(&sst::WriteOptions::default()) .await .unwrap(); - FileHandle::new(FileMeta { + let handle = FileHandle::new(FileMeta { file_name: sst_file_name.to_string(), level: 0, start_timestamp, end_timestamp, - }) + }); + seq.fetch_add(1, Ordering::Relaxed); + handle } async fn check_reads( @@ -209,7 +250,8 @@ mod tests { lower_sec_inclusive, upper_sec_exclusive, ) - .await; + .await + .unwrap(); let mut res = vec![]; while let Some(f) = reader.next_chunk().await.unwrap() { @@ -224,25 +266,41 @@ mod tests { #[tokio::test] async fn test_sst_reader() { - // let dir = 
TempDir::new("write_parquet").unwrap(); - // let path = dir.path().to_str().unwrap(); - let path = "/Users/lei/parquet"; + let dir = TempDir::new("write_parquet").unwrap(); + let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); let object_store = ObjectStore::new(backend); + let seq = AtomicU64::new(0); let schema = schema_for_test(); let file1 = write_sst( "a.parquet", schema.clone(), + &seq, object_store.clone(), &[1000, 2000, 3000, 4001, 5001], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], ) .await; let file2 = write_sst( "b.parquet", schema.clone(), + &seq, object_store.clone(), &[4002, 5002, 6000, 7000, 8000], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], ) .await; let sst_layer = Arc::new(FsAccessLayer::new("./", object_store)); @@ -261,4 +319,148 @@ mod tests { check_reads(schema, sst_layer, &files, 1, 2, &[1000]).await; } + + async fn read_file( + files: &[FileHandle], + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + ) -> Vec { + let mut timestamps = vec![]; + let mut reader = build_sst_reader(schema, sst_layer, files, i64::MIN, i64::MAX) + .await + .unwrap(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + let ts = chunk.columns[0] + .as_any() + .downcast_ref::() + .unwrap(); + timestamps.extend(ts.iter_data().map(|t| t.unwrap().0.value())); + } + timestamps + } + + /// Writes rows into file i1/i2 and splits these rows into sst file o1/o2/o3, + /// and check the output contains the same data as input files. + #[tokio::test] + async fn test_sst_split() { + let dir = TempDir::new("write_parquet").unwrap(); + let path = dir.path().to_str().unwrap(); + let backend = Builder::default().root(path).build().unwrap(); + let object_store = ObjectStore::new(backend); + + let schema = schema_for_test(); + let seq = AtomicU64::new(0); + let file1 = write_sst( + "i1.parquet", + schema.clone(), + &seq, + object_store.clone(), + &[1000, 2000, 3000, 4001, 5001], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + ) + .await; + + // in file2 we delete the row with timestamp 1000. 
+ let file2 = write_sst( + "i2.parquet", + schema.clone(), + &seq, + object_store.clone(), + &[1000, 5002, 6000, 7000, 8000], + &[ + OpType::Delete, // a deletion + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + ) + .await; + let sst_layer = Arc::new(FsAccessLayer::new("./", object_store.clone())); + let input_files = vec![file2, file1]; + + let reader1 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 0, 3) + .await + .unwrap(); + let reader2 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 3, 6) + .await + .unwrap(); + let reader3 = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 6, 10) + .await + .unwrap(); + + let opts = WriteOptions {}; + let s1 = ParquetWriter::new( + "./o1.parquet", + Source::Reader(reader1), + object_store.clone(), + ) + .write_sst(&opts) + .await + .unwrap(); + assert_eq!( + SstInfo { + start_timestamp: Some(Timestamp::new_millisecond(2000)), + end_timestamp: Some(Timestamp::new_millisecond(2000)), + }, + s1 + ); + + let s2 = ParquetWriter::new( + "./o2.parquet", + Source::Reader(reader2), + object_store.clone(), + ) + .write_sst(&opts) + .await + .unwrap(); + assert_eq!( + SstInfo { + start_timestamp: Some(Timestamp::new_millisecond(3000)), + end_timestamp: Some(Timestamp::new_millisecond(5002)), + }, + s2 + ); + + let s3 = ParquetWriter::new( + "./o3.parquet", + Source::Reader(reader3), + object_store.clone(), + ) + .write_sst(&opts) + .await + .unwrap(); + + assert_eq!( + SstInfo { + start_timestamp: Some(Timestamp::new_millisecond(6000)), + end_timestamp: Some(Timestamp::new_millisecond(8000)), + }, + s3 + ); + + let output_files = ["o1.parquet", "o2.parquet", "o3.parquet"] + .into_iter() + .map(|f| { + FileHandle::new(FileMeta { + file_name: f.to_string(), + start_timestamp: None, + end_timestamp: None, + level: 1, + }) + }) + .collect::>(); + + let timestamps_in_inputs = read_file(&input_files, schema.clone(), sst_layer.clone()).await; + let timestamps_in_outputs = + read_file(&output_files, schema.clone(), sst_layer.clone()).await; + + assert_eq!(timestamps_in_outputs, timestamps_in_inputs); + } } diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index 53a701746c0a..ebc1f3fe6eeb 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -38,7 +38,7 @@ pub struct Batch { /// /// Columns follow the same order convention of region schema: /// key, value, internal columns. 
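The `test_sst_split` case above is essentially the compaction data path: open a reader over the input SSTs restricted to one time window, then stream it through the parquet writer into a new output file. Condensed, with `schema`, `sst_layer`, `input_files` and `object_store` assumed to be set up as in that test:

// Merge and deduplicate rows whose timestamps fall in [0s, 3s) across all inputs.
let reader = build_sst_reader(schema.clone(), sst_layer.clone(), &input_files, 0, 3)
    .await
    .unwrap();

// Re-encode them into an output SST; the returned SstInfo carries the real time
// range of the rows that were written.
let sst_info = ParquetWriter::new("./o1.parquet", Source::Reader(reader), object_store.clone())
    .write_sst(&WriteOptions::default())
    .await
    .unwrap();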
- columns: Vec, + pub columns: Vec, } impl Batch { diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index fd2f2f4548d8..61f393d70ed1 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -115,10 +115,11 @@ impl TesterBase { let mut reader = resp.reader; let metadata = self.region.in_memory_metadata(); - assert_eq!(metadata.schema(), reader.schema()); + assert_eq!(metadata.schema(), reader.user_schema()); let mut dst = Vec::new(); while let Some(chunk) = reader.next_chunk().await.unwrap() { + let chunk = reader.project_chunk(chunk); append_chunk_to(&chunk, &mut dst); } diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs index 26daeeda3c8d..2894ca4f19c8 100644 --- a/src/storage/src/region/tests/alter.rs +++ b/src/storage/src/region/tests/alter.rs @@ -179,10 +179,11 @@ impl AlterTester { let mut reader = resp.reader; let metadata = self.base().region.in_memory_metadata(); - assert_eq!(metadata.schema(), reader.schema()); + assert_eq!(metadata.schema(), reader.user_schema()); let mut dst = Vec::new(); while let Some(chunk) = reader.next_chunk().await.unwrap() { + let chunk = reader.project_chunk(chunk); append_chunk_to(&chunk, &mut dst); } diff --git a/src/storage/src/region/tests/projection.rs b/src/storage/src/region/tests/projection.rs index b1a9feb6c0be..d1f5e1976d8e 100644 --- a/src/storage/src/region/tests/projection.rs +++ b/src/storage/src/region/tests/projection.rs @@ -158,6 +158,7 @@ impl ProjectionTester { let mut dst = Vec::new(); while let Some(chunk) = reader.next_chunk().await.unwrap() { + let chunk = reader.project_chunk(chunk); append_chunk_to(&chunk, &mut dst); } diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 9878a90f30bf..e1c2de08820e 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -542,6 +542,21 @@ mod tests { assert!(!selected[2]); } + #[test] + fn test_find_unique_with_op() { + let schema = read_util::new_projected_schema(); + let mut selected = BitVec::repeat(false, 3); + let batch = read_util::new_full_kv_batch(&[ + (1001, 1, 3, OpType::Put), + (1000, 1, 2, OpType::Delete), + (1000, 1, 1, OpType::Put), + ]); + schema.find_unique(&batch, &mut selected, None); + assert!(selected[0]); + assert!(selected[1]); + assert!(!selected[2]); + } + #[test] fn test_filter_batch() { let schema = read_util::new_projected_schema(); diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 07fda1a14aae..008401c3ca53 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -240,7 +240,7 @@ pub struct ReadOptions { pub time_range: TimestampRange, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct SstInfo { pub time_range: Option<(Timestamp, Timestamp)>, } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index adf40fee8b90..c5f64f5982f1 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -438,8 +438,11 @@ impl BatchReader for ChunkStream { } } +/// Parquet writer data source. pub enum Source { + /// Writes rows from memtable to parquet Iter(BoxedBatchIterator), + /// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet. 
Reader(ChunkReaderImpl), } diff --git a/src/store-api/src/storage/chunk.rs b/src/store-api/src/storage/chunk.rs index db6b46ad94b7..24c391f703b3 100644 --- a/src/store-api/src/storage/chunk.rs +++ b/src/store-api/src/storage/chunk.rs @@ -37,8 +37,12 @@ pub trait ChunkReader: Send { type Error: ErrorExt + Send + Sync; /// Schema of the chunks returned by this reader. - fn schema(&self) -> &SchemaRef; + /// This schema does not contain internal columns. + fn user_schema(&self) -> &SchemaRef; /// Fetch next chunk from the reader. async fn next_chunk(&mut self) -> Result, Self::Error>; + + // project the chunk according to required projection. + fn project_chunk(&self, chunk: Chunk) -> Chunk; } From f5336e9b5b0abc64ecdae61b457b51ccb98f384d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 19:18:00 +0800 Subject: [PATCH 04/17] chore: rebase develop --- src/storage/src/compaction/writer.rs | 29 ++++++++++++++-------------- src/storage/src/sst/parquet.rs | 14 ++++++++------ 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 11bb3db387e7..79dc17670b98 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -218,18 +218,14 @@ mod tests { let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); - let SstInfo { - start_timestamp, - end_timestamp, - } = writer + let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) .await .unwrap(); let handle = FileHandle::new(FileMeta { file_name: sst_file_name.to_string(), + time_range, level: 0, - start_timestamp, - end_timestamp, }); seq.fetch_add(1, Ordering::Relaxed); handle @@ -406,8 +402,10 @@ mod tests { .unwrap(); assert_eq!( SstInfo { - start_timestamp: Some(Timestamp::new_millisecond(2000)), - end_timestamp: Some(Timestamp::new_millisecond(2000)), + time_range: Some(( + Timestamp::new_millisecond(2000), + Timestamp::new_millisecond(2000) + )), }, s1 ); @@ -422,8 +420,10 @@ mod tests { .unwrap(); assert_eq!( SstInfo { - start_timestamp: Some(Timestamp::new_millisecond(3000)), - end_timestamp: Some(Timestamp::new_millisecond(5002)), + time_range: Some(( + Timestamp::new_millisecond(3000), + Timestamp::new_millisecond(5002) + )), }, s2 ); @@ -439,8 +439,10 @@ mod tests { assert_eq!( SstInfo { - start_timestamp: Some(Timestamp::new_millisecond(6000)), - end_timestamp: Some(Timestamp::new_millisecond(8000)), + time_range: Some(( + Timestamp::new_millisecond(6000), + Timestamp::new_millisecond(8000) + )), }, s3 ); @@ -450,9 +452,8 @@ mod tests { .map(|f| { FileHandle::new(FileMeta { file_name: f.to_string(), - start_timestamp: None, - end_timestamp: None, level: 1, + time_range: None, }) }) .collect::>(); diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index c5f64f5982f1..9a428818a736 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -800,16 +800,18 @@ mod tests { let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); - let SstInfo { - start_timestamp, - end_timestamp, - } = writer + let SstInfo { time_range } = writer .write_sst(&sst::WriteOptions::default()) .await .unwrap(); - assert_eq!(Some(Timestamp::new_millisecond(1000)), start_timestamp); - assert_eq!(Some(Timestamp::new_millisecond(3001)), end_timestamp); + assert_eq!( + Some(( + 
Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(3001) + )), + time_range + ); let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap()); From d90fddcb01a903a03676171a5f0eeff3d126c9c7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 22:13:08 +0800 Subject: [PATCH 05/17] wip: compile --- src/storage/src/compaction/dedup_deque.rs | 9 +- src/storage/src/compaction/picker.rs | 19 +++- src/storage/src/compaction/scheduler.rs | 47 ++++++--- src/storage/src/compaction/task.rs | 110 ++++++++++++++++++++-- src/storage/src/sst.rs | 15 ++- 5 files changed, 172 insertions(+), 28 deletions(-) diff --git a/src/storage/src/compaction/dedup_deque.rs b/src/storage/src/compaction/dedup_deque.rs index f129d5556a39..391f645f413d 100644 --- a/src/storage/src/compaction/dedup_deque.rs +++ b/src/storage/src/compaction/dedup_deque.rs @@ -18,13 +18,18 @@ use std::fmt::{Debug, Formatter}; use std::hash::Hash; /// Deque with key deduplication. -#[derive(Default)] pub struct DedupDeque { deque: VecDeque, existing: HashMap, } impl DedupDeque { + pub fn new_empty() -> Self { + Self { + deque: VecDeque::new(), + existing: HashMap::new(), + } + } /// Pushes a key value to the back of deque. /// Returns true if the deque does not already contain value with the same key, otherwise /// returns false. @@ -83,7 +88,7 @@ mod tests { #[test] fn test_dedup_deque() { - let mut deque = DedupDeque::default(); + let mut deque = DedupDeque::new_empty(); assert!(deque.push_back(1, "hello".to_string())); assert_eq!(1, deque.len()); assert!(deque.push_back(2, "world".to_string())); diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 871de4884f1b..62598c08dc28 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::{Arc, Mutex}; + use common_telemetry::debug; +use store_api::logstore::LogStore; use crate::compaction::scheduler::CompactionRequestImpl; use crate::compaction::strategy::StrategyRef; @@ -26,7 +29,7 @@ pub trait Picker: Send + 'static { pub struct PickerContext {} -/// L0 -> L1 all-to-all compaction based on time windows. +/// L0 -> L1 compaction based on time windows. pub(crate) struct SimplePicker { strategy: StrategyRef, } @@ -55,8 +58,18 @@ impl Picker for SimplePicker { return Ok(None); } - debug!("Found SST files to compact {:?}", outputs); - // TODO(hl): build compaction task + debug!( + "Found SST files to compact {:?} on level: {}", + outputs, level_num + ); + return Ok(Some(CompactionTaskImpl { + schema: req.schema(), + sst_layer: req.sst_layer().clone(), + outputs, + writer: todo!(), + version: todo!(), + compacted_inputs: Arc::new(Mutex::new(vec![])), + })); } Ok(None) diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 458655fc744a..930145964b4c 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -30,19 +30,46 @@ use crate::compaction::rate_limit::{ }; use crate::compaction::task::CompactionTask; use crate::error::{Result, StopCompactionSchedulerSnafu}; -use crate::version::LevelMetasRef; +use crate::region::RegionWriterRef; +use crate::schema::RegionSchemaRef; +use crate::sst::AccessLayerRef; +use crate::version::{LevelMetasRef, VersionControlRef}; /// Table compaction request. 
-#[derive(Default)] pub struct CompactionRequestImpl { table_id: TableId, levels: LevelMetasRef, + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + writer: RegionWriterRef, + version: VersionControlRef, } impl CompactionRequestImpl { + #[inline] pub fn levels(&self) -> &LevelMetasRef { &self.levels } + + #[inline] + pub fn schema(&self) -> RegionSchemaRef { + self.schema.clone() + } + + #[inline] + pub fn sst_layer(&self) -> &AccessLayerRef { + &self.sst_layer + } + + #[inline] + pub fn writer(&self) -> &RegionWriterRef { + &self.writer + } + + #[inline] + pub fn version(&self) -> &VersionControlRef { + &self.version + } } impl CompactionRequest for CompactionRequestImpl { @@ -52,7 +79,7 @@ impl CompactionRequest for CompactionRequestImpl { } } -pub trait CompactionRequest: Send + Sync + Default + 'static { +pub trait CompactionRequest: Send + Sync + 'static { fn table_id(&self) -> TableId; } @@ -128,7 +155,7 @@ where P: Picker + Send + Sync, { let request_queue: Arc>> = - Arc::new(RwLock::new(DedupDeque::default())); + Arc::new(RwLock::new(DedupDeque::new_empty())); let cancel_token = CancellationToken::new(); let task_notifier = Arc::new(Notify::new()); @@ -296,7 +323,7 @@ mod tests { #[tokio::test] async fn test_schedule_handler() { common_telemetry::init_default_ut_logging(); - let queue = Arc::new(RwLock::new(DedupDeque::default())); + let queue = Arc::new(RwLock::new(DedupDeque::new_empty())); let latch = Arc::new(CountdownLatch::new(2)); let latch_cloned = latch.clone(); let picker = MockPicker::new(vec![Arc::new(move || { @@ -316,15 +343,9 @@ mod tests { let handler_cloned = handler.clone(); common_runtime::spawn_bg(async move { handler_cloned.run().await }); - queue - .write() - .unwrap() - .push_back(1, CompactionRequestImpl::default()); + queue.write().unwrap().push_back(1, MockRequest::default()); handler.task_notifier.notify_one(); - queue - .write() - .unwrap() - .push_back(2, CompactionRequestImpl::default()); + queue.write().unwrap().push_back(2, MockRequest::default()); handler.task_notifier.notify_one(); tokio::time::timeout(Duration::from_secs(1), latch.wait()) diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index e80d238f3046..b0d383d23e6c 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -12,23 +12,90 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::{Arc, Mutex}; + +use object_store::ObjectStore; +use store_api::logstore::LogStore; +use uuid::Uuid; + +use crate::compaction::writer::build_sst_reader; use crate::error::Result; -use crate::sst::{FileHandle, Level}; +use crate::manifest::action::RegionEdit; +use crate::manifest::region::RegionManifest; +use crate::region::RegionWriterRef; +use crate::schema::RegionSchemaRef; +use crate::sst::parquet::{ParquetWriter, Source}; +use crate::sst::{AccessLayerRef, FileHandle, FileMeta, Level, SstInfo, WriteOptions}; +use crate::version::VersionControlRef; +use crate::wal::Wal; #[async_trait::async_trait] pub trait CompactionTask: Send + Sync + 'static { - async fn run(&self) -> Result<()>; + async fn run(self) -> Result<()>; } #[allow(unused)] pub(crate) struct CompactionTaskImpl { - inputs: Vec, + pub schema: RegionSchemaRef, + pub sst_layer: AccessLayerRef, + pub outputs: Vec, + pub writer: RegionWriterRef, + pub version: VersionControlRef, + pub compacted_inputs: Arc>>, +} + +impl CompactionTaskImpl { + async fn merge_ssts(&mut self) -> Result> { + let mut futs = Vec::with_capacity(self.outputs.len()); + for output in self.outputs.drain(..) { + let schema = self.schema.clone(); + let sst_layer = self.sst_layer.clone(); + let object_store = self.sst_layer.object_store(); + + let compacted = self.compacted_inputs.clone(); + futs.push(async move { + match output.run(schema, sst_layer, object_store).await { + Ok(meta) => { + let mut compacted = compacted.lock().unwrap(); + compacted.extend(output.inputs.iter().map(|f| FileMeta { + file_name: f.file_name().to_string(), + time_range: f.time_range().clone(), + level: f.level(), + })); + Ok(meta) + } + Err(e) => Err(e), + } + }); + } + + futures::future::join_all(futs) + .await + .into_iter() + .collect::>>() + } + + async fn write_manifest_and_apply(&self, files: Vec) -> Result<()> { + let region_version = self.version.metadata().version(); + let flushed_sequence = self.version.current().flushed_sequence(); + + let edit = RegionEdit { + region_version, + flushed_sequence, + files_to_add: files, + files_to_remove: vec![], + }; + + todo!() + // self.writer.write_edit_and_apply() + } } #[async_trait::async_trait] impl CompactionTask for CompactionTaskImpl { - // TODO(hl): Actual SST compaction tasks - async fn run(&self) -> Result<()> { + async fn run(mut self) -> Result<()> { + let ssts = self.merge_ssts().await?; + Ok(()) } } @@ -55,6 +122,37 @@ pub struct CompactionOutput { pub(crate) inputs: Vec, } +impl CompactionOutput { + async fn run( + &self, + schema: RegionSchemaRef, + sst_layer: AccessLayerRef, + object_store: ObjectStore, + ) -> Result { + let reader = build_sst_reader( + schema, + sst_layer, + &self.inputs, + self.bucket_bound, + self.bucket_bound + self.bucket, + ) + .await + .unwrap(); + let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated()); + let opts = WriteOptions {}; + let SstInfo { time_range } = + ParquetWriter::new(&output_file_name, Source::Reader(reader), object_store) + .write_sst(&opts) + .await?; + + Ok(FileMeta { + file_name: output_file_name, + time_range, + level: self.output_level, + }) + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc; @@ -75,7 +173,7 @@ pub mod tests { #[async_trait::async_trait] impl CompactionTask for NoopCompactionTask { - async fn run(&self) -> Result<()> { + async fn run(self) -> Result<()> { for cb in &self.cbs { cb() } diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 008401c3ca53..fa4846f8dd27 100644 --- a/src/storage/src/sst.rs +++ 
b/src/storage/src/sst.rs @@ -81,9 +81,9 @@ impl LevelMetas { pub fn merge(&self, files_to_add: impl Iterator) -> LevelMetas { let mut merged = self.clone(); for file in files_to_add { - let level = file.level_index(); + let level = file.level(); - merged.levels[level].add_file(file); + merged.levels[level as usize].add_file(file); } // TODO(yingwen): Support file removal. @@ -176,8 +176,8 @@ impl FileHandle { /// Returns level as usize so it can be used as index. #[inline] - pub fn level_index(&self) -> usize { - self.inner.meta.level.into() + pub fn level(&self) -> Level { + self.inner.meta.level } #[inline] @@ -258,6 +258,9 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug { /// Read SST file with given `file_name` and schema. async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result; + + /// Returns backend object store. + fn object_store(&self) -> ObjectStore; } pub type AccessLayerRef = Arc; @@ -311,4 +314,8 @@ impl AccessLayer for FsAccessLayer { let stream = reader.chunk_stream().await?; Ok(Box::new(stream)) } + + fn object_store(&self) -> ObjectStore { + self.object_store.clone() + } } From c6a8aab68f9d7be4bbe49a371131485aea87803a Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 22:40:15 +0800 Subject: [PATCH 06/17] wip: task logic --- src/storage/src/compaction/picker.rs | 31 +++++----- src/storage/src/compaction/scheduler.rs | 50 +++++------------ src/storage/src/compaction/task.rs | 75 ++++++++++++++++--------- src/storage/src/flush.rs | 2 +- src/storage/src/region/writer.rs | 4 +- src/storage/src/sst.rs | 2 +- 6 files changed, 83 insertions(+), 81 deletions(-) diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 62598c08dc28..9b92d663a220 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, Mutex}; +use std::marker::PhantomData; use common_telemetry::debug; use store_api::logstore::LogStore; @@ -30,24 +30,28 @@ pub trait Picker: Send + 'static { pub struct PickerContext {} /// L0 -> L1 compaction based on time windows. 
-pub(crate) struct SimplePicker { +pub(crate) struct SimplePicker { strategy: StrategyRef, + _phantom_data: PhantomData, } #[allow(unused)] -impl SimplePicker { +impl SimplePicker { pub fn new(strategy: StrategyRef) -> Self { - Self { strategy } + Self { + strategy, + _phantom_data: Default::default(), + } } } -impl Picker for SimplePicker { +impl Picker, CompactionTaskImpl> for SimplePicker { fn pick( &self, ctx: &PickerContext, - req: &CompactionRequestImpl, - ) -> crate::error::Result> { - let levels = req.levels(); + req: &CompactionRequestImpl, + ) -> crate::error::Result>> { + let levels = &req.levels; for level_num in 0..levels.level_num() { let level = levels.level(level_num as u8); @@ -63,12 +67,13 @@ impl Picker for SimplePicker { outputs, level_num ); return Ok(Some(CompactionTaskImpl { - schema: req.schema(), - sst_layer: req.sst_layer().clone(), + schema: req.schema.clone(), + sst_layer: req.sst_layer.clone(), outputs, - writer: todo!(), - version: todo!(), - compacted_inputs: Arc::new(Mutex::new(vec![])), + writer: req.writer.clone(), + shared_data: req.shared.clone(), + wal: req.wal.clone(), + manifest: req.manifest.clone(), })); } diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 930145964b4c..c9862e367957 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; use common_telemetry::{debug, info}; use snafu::ResultExt; +use store_api::logstore::LogStore; use table::metadata::TableId; use tokio::sync::Notify; use tokio::task::JoinHandle; @@ -30,49 +31,26 @@ use crate::compaction::rate_limit::{ }; use crate::compaction::task::CompactionTask; use crate::error::{Result, StopCompactionSchedulerSnafu}; -use crate::region::RegionWriterRef; +use crate::manifest::region::RegionManifest; +use crate::region::{RegionWriterRef, SharedDataRef}; use crate::schema::RegionSchemaRef; use crate::sst::AccessLayerRef; -use crate::version::{LevelMetasRef, VersionControlRef}; +use crate::version::LevelMetasRef; +use crate::wal::Wal; /// Table compaction request. -pub struct CompactionRequestImpl { +pub struct CompactionRequestImpl { table_id: TableId, - levels: LevelMetasRef, - schema: RegionSchemaRef, - sst_layer: AccessLayerRef, - writer: RegionWriterRef, - version: VersionControlRef, + pub levels: LevelMetasRef, + pub schema: RegionSchemaRef, + pub sst_layer: AccessLayerRef, + pub writer: RegionWriterRef, + pub shared: SharedDataRef, + pub manifest: RegionManifest, + pub wal: Wal, } -impl CompactionRequestImpl { - #[inline] - pub fn levels(&self) -> &LevelMetasRef { - &self.levels - } - - #[inline] - pub fn schema(&self) -> RegionSchemaRef { - self.schema.clone() - } - - #[inline] - pub fn sst_layer(&self) -> &AccessLayerRef { - &self.sst_layer - } - - #[inline] - pub fn writer(&self) -> &RegionWriterRef { - &self.writer - } - - #[inline] - pub fn version(&self) -> &VersionControlRef { - &self.version - } -} - -impl CompactionRequest for CompactionRequestImpl { +impl CompactionRequest for CompactionRequestImpl { #[inline] fn table_id(&self) -> TableId { self.table_id diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index b0d383d23e6c..24d971f94793 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; use std::sync::{Arc, Mutex}; +use common_telemetry::info; use object_store::ObjectStore; use store_api::logstore::LogStore; use uuid::Uuid; @@ -22,11 +24,10 @@ use crate::compaction::writer::build_sst_reader; use crate::error::Result; use crate::manifest::action::RegionEdit; use crate::manifest::region::RegionManifest; -use crate::region::RegionWriterRef; +use crate::region::{RegionWriterRef, SharedDataRef}; use crate::schema::RegionSchemaRef; use crate::sst::parquet::{ParquetWriter, Source}; use crate::sst::{AccessLayerRef, FileHandle, FileMeta, Level, SstInfo, WriteOptions}; -use crate::version::VersionControlRef; use crate::wal::Wal; #[async_trait::async_trait] @@ -35,33 +36,38 @@ pub trait CompactionTask: Send + Sync + 'static { } #[allow(unused)] -pub(crate) struct CompactionTaskImpl { +pub(crate) struct CompactionTaskImpl { pub schema: RegionSchemaRef, pub sst_layer: AccessLayerRef, pub outputs: Vec, pub writer: RegionWriterRef, - pub version: VersionControlRef, - pub compacted_inputs: Arc>>, + pub shared_data: SharedDataRef, + pub wal: Wal, + pub manifest: RegionManifest, } -impl CompactionTaskImpl { - async fn merge_ssts(&mut self) -> Result> { +impl CompactionTaskImpl { + /// Compacts inputs SSTs, returns `(output file, compacted input file)`. + async fn merge_ssts(&mut self) -> Result<(Vec, Vec)> { let mut futs = Vec::with_capacity(self.outputs.len()); + let compacted_inputs = Arc::new(Mutex::new(HashSet::new())); + for output in self.outputs.drain(..) { let schema = self.schema.clone(); let sst_layer = self.sst_layer.clone(); let object_store = self.sst_layer.object_store(); - - let compacted = self.compacted_inputs.clone(); + let compacted = compacted_inputs.clone(); futs.push(async move { match output.run(schema, sst_layer, object_store).await { Ok(meta) => { - let mut compacted = compacted.lock().unwrap(); - compacted.extend(output.inputs.iter().map(|f| FileMeta { - file_name: f.file_name().to_string(), - time_range: f.time_range().clone(), - level: f.level(), - })); + compacted + .lock() + .unwrap() + .extend(output.inputs.iter().map(|f| FileMeta { + file_name: f.file_name().to_string(), + time_range: f.time_range().clone(), + level: f.level(), + })); Ok(meta) } Err(e) => Err(e), @@ -69,33 +75,46 @@ impl CompactionTaskImpl { }); } - futures::future::join_all(futs) + let outputs = futures::future::join_all(futs) .await .into_iter() - .collect::>>() + .collect::>()?; + let compacted = compacted_inputs.lock().unwrap().drain().collect(); + Ok((outputs, compacted)) } - async fn write_manifest_and_apply(&self, files: Vec) -> Result<()> { - let region_version = self.version.metadata().version(); - let flushed_sequence = self.version.current().flushed_sequence(); + /// Writes updated SST info into manifest. 
+ async fn write_manifest_and_apply( + &self, + output: Vec, + input: Vec, + ) -> Result<()> { + let version = &self.shared_data.version_control; + let region_version = version.metadata().version(); + let flushed_sequence = version.current().flushed_sequence(); let edit = RegionEdit { region_version, flushed_sequence, - files_to_add: files, - files_to_remove: vec![], + files_to_add: output, + files_to_remove: input, }; - - todo!() - // self.writer.write_edit_and_apply() + info!( + "Compacted region: {}, region edit: {:?}", + version.metadata().name(), + edit + ); + self.writer + .write_edit_and_apply(&self.wal, &self.shared_data, &self.manifest, edit, None) + .await } } #[async_trait::async_trait] -impl CompactionTask for CompactionTaskImpl { +impl CompactionTask for CompactionTaskImpl { async fn run(mut self) -> Result<()> { - let ssts = self.merge_ssts().await?; - + let (output, compacted) = self.merge_ssts().await?; + self.write_manifest_and_apply(output, compacted).await?; Ok(()) } } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index abb5f0cd5d17..33cc792a3b24 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -223,7 +223,7 @@ impl FlushJob { &self.shared, &self.manifest, edit, - self.max_memtable_id, + Some(self.max_memtable_id), ) .await?; self.wal.obsolete(self.flush_sequence).await diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 3b53ed421af9..f4266f3987e6 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -95,7 +95,7 @@ impl RegionWriter { shared: &SharedDataRef, manifest: &RegionManifest, edit: RegionEdit, - max_memtable_id: MemtableId, + max_memtable_id: Option, ) -> Result<()> { let _lock = self.version_mutex.lock().await; // HACK: We won't acquire the write lock here because write stall would hold @@ -123,7 +123,7 @@ impl RegionWriter { files_to_add, flushed_sequence: Some(flushed_sequence), manifest_version, - max_memtable_id: Some(max_memtable_id), + max_memtable_id, }; // We could tolerate failure during persisting manifest version to the WAL, since it won't diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index fa4846f8dd27..6f1b8b679edb 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -216,7 +216,7 @@ impl FileHandleInner { } /// Immutable metadata of a sst file. 
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct FileMeta { pub file_name: String, pub time_range: Option<(Timestamp, Timestamp)>, From 5f95712c064641953b055dd5a3a6c215bc403dff Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 23:36:27 +0800 Subject: [PATCH 07/17] feat: version and manifest update --- src/storage/src/chunk.rs | 8 +- src/storage/src/compaction/strategy.rs | 7 +- src/storage/src/compaction/task.rs | 4 +- src/storage/src/flush.rs | 2 +- src/storage/src/manifest/action.rs | 2 +- src/storage/src/manifest/test_utils.rs | 3 +- src/storage/src/region.rs | 3 +- src/storage/src/region/tests.rs | 2 +- src/storage/src/region/writer.rs | 4 +- src/storage/src/sst.rs | 129 +++++++++++++++++++++++-- src/storage/src/version.rs | 10 +- 11 files changed, 146 insertions(+), 28 deletions(-) diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index d11691854cfb..f528ac8c2614 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -214,9 +214,13 @@ impl ChunkReaderBuilder { } impl Visitor for ChunkReaderBuilder { - fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> { + fn visit<'a>( + &mut self, + _level: usize, + files: impl Iterator, + ) -> Result<()> { // Now we read all files, so just reserve enough space to hold all files. - self.files_to_read.reserve(files.len()); + // self.files_to_read.reserve(files.len); for file in files { // We can't invoke async functions here, so we collects all files first, and // create the batch reader later in `ChunkReaderBuilder`. diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs index 34e464ffe3ec..b1b11e422738 100644 --- a/src/storage/src/compaction/strategy.rs +++ b/src/storage/src/compaction/strategy.rs @@ -66,12 +66,7 @@ impl Strategy for SimpleTimeWindowStrategy { /// Currently they're files that is not currently under compaction. #[inline] fn find_compactable_files(level: &LevelMeta) -> Vec { - level - .files() - .iter() - .filter(|f| !f.compacting()) - .cloned() - .collect() + level.files().filter(|f| !f.compacting()).cloned().collect() } /// Calculates buckets for files. If file does not contain a time range in metadata, it will be diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 24d971f94793..935b87a86647 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -90,12 +90,12 @@ impl CompactionTaskImpl { input: Vec, ) -> Result<()> { let version = &self.shared_data.version_control; + // TODO(hl): do we have to update region version? 
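         // The edit built below records both sides of a compaction: the newly written output SSTs
         // go into `files_to_add`, while the merged input SSTs go into `files_to_remove`.
         // `flushed_sequence` stays `None` because compaction does not advance the flushed
         // sequence; only flush jobs set it.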
let region_version = version.metadata().version(); - let flushed_sequence = version.current().flushed_sequence(); let edit = RegionEdit { region_version, - flushed_sequence, + flushed_sequence: None, files_to_add: output, files_to_remove: input, }; diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 33cc792a3b24..e61bc244092e 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -212,7 +212,7 @@ impl FlushJob { async fn write_manifest_and_apply(&self, file_metas: &[FileMeta]) -> Result<()> { let edit = RegionEdit { region_version: self.shared.version_control.metadata().version(), - flushed_sequence: self.flush_sequence, + flushed_sequence: Some(self.flush_sequence), files_to_add: file_metas.to_vec(), files_to_remove: Vec::default(), }; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 1d56464e9c37..5eeef970cca4 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -73,7 +73,7 @@ pub struct RegionRemove { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionEdit { pub region_version: VersionNumber, - pub flushed_sequence: SequenceNumber, + pub flushed_sequence: Option, pub files_to_add: Vec, pub files_to_remove: Vec, } diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs index 20da0051b8b9..cf960e78a92e 100644 --- a/src/storage/src/manifest/test_utils.rs +++ b/src/storage/src/manifest/test_utils.rs @@ -30,7 +30,6 @@ pub fn build_region_meta() -> RegionMetadata { desc.try_into().unwrap() } -// TODO(hl): region edit should contain the time range of added files pub fn build_region_edit( sequence: SequenceNumber, files_to_add: &[&str], @@ -38,7 +37,7 @@ pub fn build_region_edit( ) -> RegionEdit { RegionEdit { region_version: 0, - flushed_sequence: sequence, + flushed_sequence: Some(sequence), files_to_add: files_to_add .iter() .map(|f| FileMeta { diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index f471db9d70e7..05e66352fa33 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -344,7 +344,8 @@ impl RegionImpl { if let RegionMetaAction::Edit(e) = action { let edit = VersionEdit { files_to_add: e.files_to_add, - flushed_sequence: Some(e.flushed_sequence), + files_to_remove: e.files_to_remove, + flushed_sequence: e.flushed_sequence, manifest_version, max_memtable_id: None, }; diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 61f393d70ed1..96772c4b1c71 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -332,7 +332,7 @@ async fn test_recover_region_manifets() { assert_eq!(version.flushed_sequence(), 2); assert_eq!(version.manifest_version(), 1); let ssts = version.ssts(); - let files = ssts.levels()[0].files(); + let files = ssts.levels()[0].files().collect::>(); assert_eq!(3, files.len()); for (i, file) in files.iter().enumerate() { assert_eq!(format!("f{}", i + 1), file.file_name()); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index f4266f3987e6..24607aee83e0 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -112,6 +112,7 @@ impl RegionWriter { ); let files_to_add = edit.files_to_add.clone(); + let files_to_remove = edit.files_to_remove.clone(); let flushed_sequence = edit.flushed_sequence; // Persist the meta action. 
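         // `max_memtable_id` is now an `Option` so both callers can share this path: the flush job
         // passes `Some(max_memtable_id)` to release flushed memtables, while the compaction task
         // passes `None` since no memtable is involved in a compaction.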
@@ -121,7 +122,8 @@ impl RegionWriter { let version_edit = VersionEdit { files_to_add, - flushed_sequence: Some(flushed_sequence), + files_to_remove, + flushed_sequence, manifest_version, max_memtable_id, }; diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 6f1b8b679edb..2df269faa1df 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -14,6 +14,7 @@ pub(crate) mod parquet; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -44,7 +45,11 @@ pub trait Visitor { /// Visit all `files` in `level`. /// /// Now the input `files` are unordered. - fn visit(&mut self, level: usize, files: &[FileHandle]) -> Result<()>; + fn visit<'a>( + &mut self, + level: usize, + files: impl Iterator, + ) -> Result<()>; } /// Metadata of all SSTs under a region. @@ -82,12 +87,21 @@ impl LevelMetas { let mut merged = self.clone(); for file in files_to_add { let level = file.level(); - merged.levels[level as usize].add_file(file); } + merged + } - // TODO(yingwen): Support file removal. - + /// Removes files with given file meta and builds a new [LevelMetas]. + /// + /// # Panics + /// Panics if level of [FileHandle] is greater than [MAX_LEVEL]. + pub fn remove(&self, files_to_remove: impl Iterator) -> LevelMetas { + let mut merged = self.clone(); + for file in files_to_remove { + let level = file.level; + merged.levels[level as usize].remove_file(file); + } merged } @@ -122,23 +136,27 @@ pub struct LevelMeta { /// Handles to the files in this level. // TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range // or use another structure to hold them. - files: Vec, + files: HashMap, } impl LevelMeta { pub fn new_empty(level: Level) -> Self { Self { level, - files: vec![], + files: HashMap::new(), } } fn add_file(&mut self, file: FileHandle) { - self.files.push(file); + self.files.insert(file.file_name().to_string(), file); + } + + fn remove_file(&mut self, file_to_remove: FileMeta) { + self.files.remove(&file_to_remove.file_name); } pub fn visit_level(&self, visitor: &mut V) -> Result<()> { - visitor.visit(self.level.into(), &self.files) + visitor.visit(self.level.into(), self.files.values()) } /// Returns the level of level meta. 
@@ -147,8 +165,8 @@ impl LevelMeta { self.level } - pub fn files(&self) -> &[FileHandle] { - &self.files + pub fn files(&self) -> impl Iterator { + self.files.values() } } @@ -319,3 +337,94 @@ impl AccessLayer for FsAccessLayer { self.object_store.clone() } } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + fn create_meta(name: &str, level: Level) -> FileMeta { + FileMeta { + file_name: name.to_string(), + time_range: None, + level, + } + } + + fn create_handle(name: &str, level: Level) -> FileHandle { + FileHandle::new(create_meta(name, level)) + } + + #[test] + fn test_level_metas_add_and_remove() { + let metas = LevelMetas::new(); + let merged = metas.merge(vec![create_handle("a", 0), create_handle("b", 0)].into_iter()); + + assert_eq!( + HashSet::from(["a".to_string(), "b".to_string()]), + merged + .level(0) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + let merged1 = merged.merge(vec![create_handle("c", 1), create_handle("d", 1)].into_iter()); + assert_eq!( + HashSet::from(["a".to_string(), "b".to_string()]), + merged1 + .level(0) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + assert_eq!( + HashSet::from(["c".to_string(), "d".to_string()]), + merged1 + .level(1) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + let removed1 = merged1.remove(vec![create_meta("a", 0), create_meta("c", 0)].into_iter()); + assert_eq!( + HashSet::from(["b".to_string()]), + removed1 + .level(0) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + assert_eq!( + HashSet::from(["c".to_string(), "d".to_string()]), + removed1 + .level(1) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + let removed2 = removed1.remove(vec![create_meta("c", 1), create_meta("d", 1)].into_iter()); + assert_eq!( + HashSet::from(["b".to_string()]), + removed2 + .level(0) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + + assert_eq!( + HashSet::new(), + removed2 + .level(1) + .files() + .map(|f| f.file_name().to_string()) + .collect() + ); + } +} diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 80dde3bd6daf..973acc91a06f 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -24,6 +24,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use common_telemetry::info; use store_api::manifest::ManifestVersion; use store_api::storage::{SchemaRef, SequenceNumber}; @@ -129,6 +130,7 @@ impl VersionControl { #[derive(Debug)] pub struct VersionEdit { pub files_to_add: Vec, + pub files_to_remove: Vec, pub flushed_sequence: Option, pub manifest_version: ManifestVersion, pub max_memtable_id: Option, @@ -236,8 +238,14 @@ impl Version { let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new); let merged_ssts = self.ssts.merge(handles_to_add); + let removed_ssts = merged_ssts.remove(edit.files_to_remove.into_iter()); - self.ssts = Arc::new(merged_ssts); + info!( + "After region compaction, region: {}, SST files: {:?}", + self.metadata.id(), + removed_ssts + ); + self.ssts = Arc::new(removed_ssts); } /// Updates metadata of the version. 
From 5374a376f6c62f5adf90ff55a106d73732e4b74c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 23:39:58 +0800 Subject: [PATCH 08/17] fix: remove useless as_inner from Timestamp vectors --- src/datatypes/src/vectors/timestamp.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/datatypes/src/vectors/timestamp.rs b/src/datatypes/src/vectors/timestamp.rs index d9424556422d..8248811ea35b 100644 --- a/src/datatypes/src/vectors/timestamp.rs +++ b/src/datatypes/src/vectors/timestamp.rs @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow::array::PrimitiveArray; -use paste::paste; - -use crate::arrow_array; use crate::types::{ TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, @@ -33,19 +29,3 @@ pub type TimestampMicrosecondVectorBuilder = PrimitiveVectorBuilder; pub type TimestampNanosecondVectorBuilder = PrimitiveVectorBuilder; - -macro_rules! impl_as_inner_for_timestamps { - ($($unit: ident), *) => { - $( - paste! { - impl [] { - pub fn as_inner(&self) -> &PrimitiveArray]> { - self.as_arrow() - } - } - } - )* - }; -} - -impl_as_inner_for_timestamps!(Second, Millisecond, Microsecond, Nanosecond); From e03482810978a4c307bcf07d9d30dc455b7a3472 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 12 Feb 2023 23:55:31 +0800 Subject: [PATCH 09/17] feat: mark file compacting --- src/storage/src/compaction/task.rs | 26 +++++++++++++++++++++----- src/storage/src/sst.rs | 6 ++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 935b87a86647..b54f6372b743 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -15,7 +15,7 @@ use std::collections::HashSet; use std::sync::{Arc, Mutex}; -use common_telemetry::info; +use common_telemetry::{error, info}; use object_store::ObjectStore; use store_api::logstore::LogStore; use uuid::Uuid; @@ -108,13 +108,30 @@ impl CompactionTaskImpl { .write_edit_and_apply(&self.wal, &self.shared_data, &self.manifest, edit, None) .await } + + /// Mark files are under compaction. + fn mark_files_compacting(&self, compacting: bool) { + for o in &self.outputs { + for input in &o.inputs { + input.set_compacting(compacting); + } + } + } } #[async_trait::async_trait] impl CompactionTask for CompactionTaskImpl { async fn run(mut self) -> Result<()> { - let (output, compacted) = self.merge_ssts().await?; - self.write_manifest_and_apply(output, compacted).await?; + self.mark_files_compacting(true); + match self.merge_ssts().await { + Ok((output, compacted)) => { + self.write_manifest_and_apply(output, compacted).await?; + } + Err(e) => { + self.mark_files_compacting(false); + error!(e; "Failed to compact region: {}", self.shared_data.name()); + } + } Ok(()) } } @@ -155,8 +172,7 @@ impl CompactionOutput { self.bucket_bound, self.bucket_bound + self.bucket, ) - .await - .unwrap(); + .await?; let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated()); let opts = WriteOptions {}; let SstInfo { time_range } = diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 2df269faa1df..f4048ba0d649 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -213,6 +213,12 @@ impl FileHandle { pub fn compacting(&self) -> bool { self.inner.compacting.load(Ordering::Relaxed) } + + /// Sets the compacting flag. 
+ #[inline] + pub fn set_compacting(&self, compacting: bool) { + self.inner.compacting.store(compacting, Ordering::Relaxed); + } } /// Actually data of [FileHandle]. From c57ce70c129ba1806199e11041409ae90490084d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 13 Feb 2023 00:01:15 +0800 Subject: [PATCH 10/17] fix: unit test --- src/storage/src/region/tests.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 96772c4b1c71..a7771e05f20d 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -19,7 +19,7 @@ mod basic; mod flush; mod projection; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use common_telemetry::logging; use datatypes::prelude::{ScalarVector, WrapperType}; @@ -332,11 +332,15 @@ async fn test_recover_region_manifets() { assert_eq!(version.flushed_sequence(), 2); assert_eq!(version.manifest_version(), 1); let ssts = version.ssts(); - let files = ssts.levels()[0].files().collect::>(); + let files = ssts.levels()[0] + .files() + .map(|f| f.file_name().to_string()) + .collect::>(); assert_eq!(3, files.len()); - for (i, file) in files.iter().enumerate() { - assert_eq!(format!("f{}", i + 1), file.file_name()); - } + assert_eq!( + HashSet::from(["f1".to_string(), "f2".to_string(), "f3".to_string()]), + files + ); // check manifest state assert_eq!(3, manifest.last_version()); From 337ff93ee48152dd71168ba2db7a2b07df49c094 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 13 Feb 2023 10:59:01 +0800 Subject: [PATCH 11/17] fix: clippy warnings --- src/storage/src/compaction/picker.rs | 2 +- src/storage/src/compaction/task.rs | 2 +- src/storage/src/compaction/writer.rs | 2 +- src/storage/src/sst.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 9b92d663a220..fc365c3a8692 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -59,7 +59,7 @@ impl Picker, CompactionTaskImpl> for Si if outputs.is_empty() { debug!("No SST file can be compacted at level {}", level_num); - return Ok(None); + continue; } debug!( diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index b54f6372b743..6ed8cc477fb0 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -65,7 +65,7 @@ impl CompactionTaskImpl { .unwrap() .extend(output.inputs.iter().map(|f| FileMeta { file_name: f.file_name().to_string(), - time_range: f.time_range().clone(), + time_range: *f.time_range(), level: f.level(), })); Ok(meta) diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 79dc17670b98..afb7ec804663 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -136,7 +136,7 @@ mod tests { assert_eq!(ts.len(), values.len()); let mut key_builders = TimestampMillisecondVectorBuilder::with_capacity(ts.len()); for key in ts { - key_builders.push(Some(key.clone())); + key_builders.push(Some(*key)); } let row_keys = vec![Arc::new(key_builders.finish()) as _]; diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index f4048ba0d649..80a337151749 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -332,7 +332,7 @@ impl AccessLayer for FsAccessLayer { self.object_store.clone(), opts.projected_schema.clone(), opts.predicate.clone(), - opts.time_range.clone(), + 
opts.time_range, ); let stream = reader.chunk_stream().await?; From f31d2938eb1ff413681518ae9bfa2f705acf3088 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 13 Feb 2023 20:57:54 +0800 Subject: [PATCH 12/17] fix: CR comment --- src/storage/src/chunk.rs | 2 +- src/storage/src/compaction/task.rs | 1 - src/storage/src/compaction/writer.rs | 2 ++ src/storage/src/sst/parquet.rs | 10 ++++++++++ 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index f528ac8c2614..7ecdfa10b421 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -220,7 +220,7 @@ impl Visitor for ChunkReaderBuilder { files: impl Iterator, ) -> Result<()> { // Now we read all files, so just reserve enough space to hold all files. - // self.files_to_read.reserve(files.len); + self.files_to_read.reserve(files.size_hint().0); for file in files { // We can't invoke async functions here, so we collects all files first, and // create the batch reader later in `ChunkReaderBuilder`. diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 6ed8cc477fb0..d8bd3dcd3cae 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -90,7 +90,6 @@ impl CompactionTaskImpl { input: Vec, ) -> Result<()> { let version = &self.shared_data.version_control; - // TODO(hl): do we have to update region version? let region_version = version.metadata().version(); let edit = RegionEdit { diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index afb7ec804663..e19877444ac4 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -29,6 +29,8 @@ pub(crate) async fn build_sst_reader( lower_sec_inclusive: i64, upper_sec_exclusive: i64, ) -> error::Result { + // TODO(hl): Schemas in different SSTs may differ, thus we should infer + // timestamp column name from Parquet metadata. 
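     // The returned reader restricts the scan to rows whose timestamps fall in the half-open
     // window `[lower_sec_inclusive, upper_sec_exclusive)`, expressed in seconds; compaction
     // derives this window from a bucket's lower bound and its width (see `CompactionOutput::build`).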
let ts_col_name = schema .user_schema() .timestamp_column() diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 9a428818a736..1458cf6bf1bf 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -844,6 +844,16 @@ mod tests { ) .await; + // + check_range_read( + sst_file_name, + object_store.clone(), + projected_schema.clone(), + TimestampRange::with_unit(1000, 3000, TimeUnit::Millisecond).unwrap(), + vec![1000, 1000, 1001, 2002, 2003, 2003], + ) + .await; + // read full range check_range_read( sst_file_name, From c9ac637cb72842b9242b7819d1a652b0912eeace Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 13 Feb 2023 22:32:54 +0800 Subject: [PATCH 13/17] chore: according to cr comments, remove visit_levels from LevelMetas --- src/datatypes/src/arrow_array.rs | 5 ----- src/datatypes/src/vectors/primitive.rs | 2 +- src/storage/src/chunk.rs | 29 ++++++++------------------ src/storage/src/compaction/picker.rs | 4 ++-- src/storage/src/sst.rs | 29 -------------------------- 5 files changed, 12 insertions(+), 57 deletions(-) diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 220718a09c45..d9b231bdb41e 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -16,8 +16,3 @@ pub type BinaryArray = arrow::array::LargeBinaryArray; pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder; pub type StringArray = arrow::array::StringArray; pub type MutableStringArray = arrow::array::StringBuilder; - -pub use arrow_array::types::{ - TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, -}; diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 1be6831b1256..d797cf2d2b17 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -96,7 +96,7 @@ impl PrimitiveVector { } } - pub fn as_arrow(&self) -> &PrimitiveArray { + pub(crate) fn as_arrow(&self) -> &PrimitiveArray { &self.array } diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index 7ecdfa10b421..fa4ebf5cc7e7 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -26,7 +26,7 @@ use crate::error::{self, Error, Result}; use crate::memtable::{IterContext, MemtableRef}; use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder}; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; -use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; +use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions}; /// Chunk reader implementation. // Now we use async-trait to implement the chunk reader, which is easier to implement than @@ -132,7 +132,14 @@ impl ChunkReaderBuilder { /// Picks all SSTs in all levels pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result { - ssts.visit_levels(&mut self)?; + let files = ssts.levels().iter().flat_map(|level| level.files()); + // Now we read all files, so just reserve enough space to hold all files. + self.files_to_read.reserve(files.size_hint().0); + for file in files { + // We can't invoke async functions here, so we collects all files first, and + // create the batch reader later in `ChunkReaderBuilder`. 
+ self.files_to_read.push(file.clone()); + } Ok(self) } @@ -212,21 +219,3 @@ impl ChunkReaderBuilder { file_ts_range.intersects(&predicate) } } - -impl Visitor for ChunkReaderBuilder { - fn visit<'a>( - &mut self, - _level: usize, - files: impl Iterator, - ) -> Result<()> { - // Now we read all files, so just reserve enough space to hold all files. - self.files_to_read.reserve(files.size_hint().0); - for file in files { - // We can't invoke async functions here, so we collects all files first, and - // create the batch reader later in `ChunkReaderBuilder`. - self.files_to_read.push(file.clone()); - } - - Ok(()) - } -} diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index fc365c3a8692..39b3223ed780 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -30,13 +30,13 @@ pub trait Picker: Send + 'static { pub struct PickerContext {} /// L0 -> L1 compaction based on time windows. -pub(crate) struct SimplePicker { +pub(crate) struct SimplePicker { strategy: StrategyRef, _phantom_data: PhantomData, } #[allow(unused)] -impl SimplePicker { +impl SimplePicker { pub fn new(strategy: StrategyRef) -> Self { Self { strategy, diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 80a337151749..9437eb95644c 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -40,18 +40,6 @@ pub type Level = u8; // detail of LevelMetaVec should not be exposed to the user of [LevelMetas]. type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize]; -/// Visitor to access file in each level. -pub trait Visitor { - /// Visit all `files` in `level`. - /// - /// Now the input `files` are unordered. - fn visit<'a>( - &mut self, - level: usize, - files: impl Iterator, - ) -> Result<()>; -} - /// Metadata of all SSTs under a region. /// /// Files are organized into multiple level, though there may be only one level. @@ -105,19 +93,6 @@ impl LevelMetas { merged } - /// Visit all SST files. - /// - /// Stop visiting remaining files if the visitor returns `Err`, and the `Err` - /// will be returned to caller. - pub fn visit_levels(&self, visitor: &mut V) -> Result<()> { - for level in &self.levels { - level.visit_level(visitor)?; - } - - Ok(()) - } - - #[cfg(test)] pub fn levels(&self) -> &[LevelMeta] { &self.levels } @@ -155,10 +130,6 @@ impl LevelMeta { self.files.remove(&file_to_remove.file_name); } - pub fn visit_level(&self, visitor: &mut V) -> Result<()> { - visitor.visit(self.level.into(), self.files.values()) - } - /// Returns the level of level meta. #[inline] pub fn level(&self) -> Level { From 5bf98b99acc07f59f0fd9c7971dbf20ab0336843 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Feb 2023 11:30:55 +0800 Subject: [PATCH 14/17] fix: some CR comments --- src/storage/src/compaction/task.rs | 58 +++++++++++++++--------------- src/storage/src/sst.rs | 50 ++++++++++++++------------ src/storage/src/sst/parquet.rs | 7 ++-- src/storage/src/version.rs | 10 +++--- 4 files changed, 65 insertions(+), 60 deletions(-) diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index d8bd3dcd3cae..4aaafe2bc962 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::HashSet; -use std::sync::{Arc, Mutex}; use common_telemetry::{error, info}; use object_store::ObjectStore; @@ -46,30 +45,32 @@ pub(crate) struct CompactionTaskImpl { pub manifest: RegionManifest, } +impl Drop for CompactionTaskImpl { + fn drop(&mut self) { + self.mark_files_compacting(false); + } +} + impl CompactionTaskImpl { /// Compacts inputs SSTs, returns `(output file, compacted input file)`. async fn merge_ssts(&mut self) -> Result<(Vec, Vec)> { let mut futs = Vec::with_capacity(self.outputs.len()); - let compacted_inputs = Arc::new(Mutex::new(HashSet::new())); + let mut compacted_inputs = HashSet::new(); for output in self.outputs.drain(..) { let schema = self.schema.clone(); let sst_layer = self.sst_layer.clone(); let object_store = self.sst_layer.object_store(); - let compacted = compacted_inputs.clone(); + compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta { + file_name: f.file_name().to_string(), + time_range: *f.time_range(), + level: f.level(), + })); + + // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. futs.push(async move { - match output.run(schema, sst_layer, object_store).await { - Ok(meta) => { - compacted - .lock() - .unwrap() - .extend(output.inputs.iter().map(|f| FileMeta { - file_name: f.file_name().to_string(), - time_range: *f.time_range(), - level: f.level(), - })); - Ok(meta) - } + match output.build(schema, sst_layer, object_store).await { + Ok(meta) => Ok(meta), Err(e) => Err(e), } }); @@ -79,8 +80,8 @@ impl CompactionTaskImpl { .await .into_iter() .collect::>()?; - let compacted = compacted_inputs.lock().unwrap().drain().collect(); - Ok((outputs, compacted)) + let inputs = compacted_inputs.into_iter().collect(); + Ok((outputs, inputs)) } /// Writes updated SST info into manifest. @@ -122,16 +123,17 @@ impl CompactionTaskImpl { impl CompactionTask for CompactionTaskImpl { async fn run(mut self) -> Result<()> { self.mark_files_compacting(true); - match self.merge_ssts().await { - Ok((output, compacted)) => { - self.write_manifest_and_apply(output, compacted).await?; - } - Err(e) => { - self.mark_files_compacting(false); - error!(e; "Failed to compact region: {}", self.shared_data.name()); - } - } - Ok(()) + + let (output, compacted) = self.merge_ssts().await.map_err(|e| { + error!(e; "Failed to compact region: {}", self.shared_data.name()); + e + })?; + self.write_manifest_and_apply(output, compacted) + .await + .map_err(|e| { + error!(e; "Failed to update region manifest: {}", self.shared_data.name()); + e + }) } } @@ -158,7 +160,7 @@ pub struct CompactionOutput { } impl CompactionOutput { - async fn run( + async fn build( &self, schema: RegionSchemaRef, sst_layer: AccessLayerRef, diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 9437eb95644c..1caa0a4de48b 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -71,23 +71,19 @@ impl LevelMetas { /// /// # Panics /// Panics if level of [FileHandle] is greater than [MAX_LEVEL]. - pub fn merge(&self, files_to_add: impl Iterator) -> LevelMetas { + pub fn merge( + &self, + files_to_add: impl Iterator, + files_to_remove: impl Iterator, + ) -> LevelMetas { let mut merged = self.clone(); for file in files_to_add { let level = file.level(); merged.levels[level as usize].add_file(file); } - merged - } - /// Removes files with given file meta and builds a new [LevelMetas]. - /// - /// # Panics - /// Panics if level of [FileHandle] is greater than [MAX_LEVEL]. 
- pub fn remove(&self, files_to_remove: impl Iterator) -> LevelMetas { - let mut merged = self.clone(); for file in files_to_remove { - let level = file.level; + let level = file.level(); merged.levels[level as usize].remove_file(file); } merged @@ -126,8 +122,8 @@ impl LevelMeta { self.files.insert(file.file_name().to_string(), file); } - fn remove_file(&mut self, file_to_remove: FileMeta) { - self.files.remove(&file_to_remove.file_name); + fn remove_file(&mut self, file_to_remove: FileHandle) { + self.files.remove(file_to_remove.file_name()); } /// Returns the level of level meta. @@ -321,22 +317,21 @@ mod tests { use super::*; - fn create_meta(name: &str, level: Level) -> FileMeta { - FileMeta { + fn create_handle(name: &str, level: Level) -> FileHandle { + FileHandle::new(FileMeta { file_name: name.to_string(), time_range: None, level, - } - } - - fn create_handle(name: &str, level: Level) -> FileHandle { - FileHandle::new(create_meta(name, level)) + }) } #[test] fn test_level_metas_add_and_remove() { let metas = LevelMetas::new(); - let merged = metas.merge(vec![create_handle("a", 0), create_handle("b", 0)].into_iter()); + let merged = metas.merge( + vec![create_handle("a", 0), create_handle("b", 0)].into_iter(), + vec![].into_iter(), + ); assert_eq!( HashSet::from(["a".to_string(), "b".to_string()]), @@ -347,7 +342,10 @@ mod tests { .collect() ); - let merged1 = merged.merge(vec![create_handle("c", 1), create_handle("d", 1)].into_iter()); + let merged1 = merged.merge( + vec![create_handle("c", 1), create_handle("d", 1)].into_iter(), + vec![].into_iter(), + ); assert_eq!( HashSet::from(["a".to_string(), "b".to_string()]), merged1 @@ -366,7 +364,10 @@ mod tests { .collect() ); - let removed1 = merged1.remove(vec![create_meta("a", 0), create_meta("c", 0)].into_iter()); + let removed1 = merged1.merge( + vec![].into_iter(), + vec![create_handle("a", 0), create_handle("c", 0)].into_iter(), + ); assert_eq!( HashSet::from(["b".to_string()]), removed1 @@ -385,7 +386,10 @@ mod tests { .collect() ); - let removed2 = removed1.remove(vec![create_meta("c", 1), create_meta("d", 1)].into_iter()); + let removed2 = removed1.merge( + vec![].into_iter(), + vec![create_handle("c", 1), create_handle("d", 1)].into_iter(), + ); assert_eq!( HashSet::from(["b".to_string()]), removed2 diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 1458cf6bf1bf..a544e59ea4e4 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -372,7 +372,6 @@ impl ArrowPredicate for TimestampRowFilter { /// Selects the rows matching given time range. fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result { - let row_cnt = batch.num_rows(); let ts_col = batch.column(self.timestamp_index); macro_rules! downcast_and_compute { @@ -382,10 +381,8 @@ impl ArrowPredicate for TimestampRowFilter { .as_any() .downcast_ref::<$typ>() .unwrap(); // safety: we've checked the data type of timestamp column. 
- let lower_bound = PrimitiveArray::from_value(self.lower_bound, row_cnt); - let upper_bound = PrimitiveArray::from_value(self.upper_bound, row_cnt); - let left = arrow::compute::gt_eq(ts_col, &lower_bound)?; - let right = arrow::compute::lt(ts_col, &upper_bound)?; + let left = arrow::compute::gt_eq_scalar(ts_col, self.lower_bound)?; + let right = arrow::compute::lt_scalar(ts_col, self.upper_bound)?; arrow::compute::and(&left, &right) } }; diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 973acc91a06f..4f333dc7c059 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -237,15 +237,17 @@ impl Version { } let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new); - let merged_ssts = self.ssts.merge(handles_to_add); - let removed_ssts = merged_ssts.remove(edit.files_to_remove.into_iter()); + let merged_ssts = self.ssts.merge( + handles_to_add, + edit.files_to_remove.into_iter().map(FileHandle::new), + ); info!( "After region compaction, region: {}, SST files: {:?}", self.metadata.id(), - removed_ssts + merged_ssts ); - self.ssts = Arc::new(removed_ssts); + self.ssts = Arc::new(merged_ssts); } /// Updates metadata of the version. From 2bb7fb637cdd0d888151c1286112cddf7bad2bdb Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Feb 2023 14:37:28 +0800 Subject: [PATCH 15/17] fix: add PlainTimestampRowFilter for correctness --- src/storage/src/sst/parquet.rs | 173 +++++++++++++++++++++++++++++---- 1 file changed, 152 insertions(+), 21 deletions(-) diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index a544e59ea4e4..66749d2956fb 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -60,7 +60,6 @@ use crate::schema::compat::ReadAdapter; use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef}; use crate::sst; use crate::sst::SstInfo; - /// Parquet sst writer. pub struct ParquetWriter<'a> { file_path: &'a str, @@ -317,39 +316,69 @@ impl<'a> ParquetReader<'a> { _ => unreachable!(), }; - // build lower and upper bound according to time range and timestamp column data type. - let lower = self - .time_range - .start() - .and_then(|s| s.convert_to(ts_col_unit)) - .map(|t| t.value()) - .unwrap_or(i64::MIN); + let projection = ProjectionMask::roots(schema_desc, vec![ts_col_idx]); - let upper = self - .time_range - .end() - .and_then(|s| s.convert_to_ceil(ts_col_unit)) // convert to ceil to relax time range and prevent data loss caused by rounding error. - .map(|t| t.value()) - .unwrap_or(i64::MAX); + // checks if converting time range unit into ts col unit will result into rounding error. + if time_unit_lossy(&self.time_range, ts_col_unit) { + let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new( + ts_col_idx, + self.time_range, + projection, + ))]); + return Some(filter); + } - let projection = ProjectionMask::roots(schema_desc, vec![ts_col_idx]); - let filter = RowFilter::new(vec![Box::new(TimestampRowFilter::new( - ts_col_idx, projection, lower, upper, - ))]); + // If any of the conversion overflows, we cannot use arrow's computation method, instead + // we resort to plain filter that compares timestamp with given range, less efficient, + // but simpler. + // TODO(hl): If the range is gt_eq/lt, we also use PlainTimestampRowFilter, but these cases + // can also use arrow's gt_eq_scalar/lt_scalar methods. 
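+        // Decision summary: when both range bounds convert into the timestamp column's unit
+        // without overflow, use `FastTimestampRowFilter`, which compares the whole column against
+        // two scalar bounds; otherwise fall back to `PlainTimestampRowFilter`, which checks each
+        // element against the `TimestampRange` individually.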
+ let row_filter = if let (Some(lower), Some(upper)) = ( + self.time_range + .start() + .and_then(|s| s.convert_to(ts_col_unit)) + .map(|t| t.value()), + self.time_range + .end() + .and_then(|s| s.convert_to(ts_col_unit)) // convert to ceil to relax time range and prevent data loss caused by rounding error. + .map(|t| t.value()), + ) { + Box::new(FastTimestampRowFilter::new( + ts_col_idx, projection, lower, upper, + )) as _ + } else { + Box::new(PlainTimestampRowFilter::new( + ts_col_idx, + self.time_range, + projection, + )) as _ + }; + let filter = RowFilter::new(vec![row_filter]); Some(filter) } } +fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool { + range + .start() + .map(|start| start.unit().factor() < ts_col_unit.factor()) + .unwrap_or(false) + || range + .end() + .map(|end| end.unit().factor() < ts_col_unit.factor()) + .unwrap_or(false) +} + /// `TimestampRowFilter` is used to filter rows within given timestamp range when reading /// row groups from parquet files, while avoids fetching all columns from SSTs file. -struct TimestampRowFilter { +struct FastTimestampRowFilter { timestamp_index: usize, lower_bound: i64, upper_bound: i64, projection: ProjectionMask, } -impl TimestampRowFilter { +impl FastTimestampRowFilter { fn new( ts_col_idx: usize, projection: ProjectionMask, @@ -365,7 +394,7 @@ impl TimestampRowFilter { } } -impl ArrowPredicate for TimestampRowFilter { +impl ArrowPredicate for FastTimestampRowFilter { fn projection(&self) -> &ProjectionMask { &self.projection } @@ -411,6 +440,74 @@ impl ArrowPredicate for TimestampRowFilter { } } +/// [PlainTimestampRowFilter] iterates each element in timestamp column, build a [Timestamp] struct +/// and checks if given time range contains the timestamp. +struct PlainTimestampRowFilter { + timestamp_index: usize, + time_range: TimestampRange, + projection: ProjectionMask, +} + +impl PlainTimestampRowFilter { + fn new(timestamp_index: usize, time_range: TimestampRange, projection: ProjectionMask) -> Self { + Self { + timestamp_index, + time_range, + projection, + } + } +} + +impl ArrowPredicate for PlainTimestampRowFilter { + fn projection(&self) -> &ProjectionMask { + &self.projection + } + + fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result { + let ts_col = batch.column(self.timestamp_index); + + macro_rules! downcast_and_compute { + ($array_ty: ty, $unit: ident) => {{ + let ts_col = ts_col + .as_any() + .downcast_ref::<$array_ty>() + .unwrap(); // safety: we've checked the data type of timestamp column. 
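// Illustrative sketch: BooleanArray implements FromIterator<Option<bool>>, so a
// null timestamp slot maps straight to a null entry in the selection mask built
// just below. `mask_from_optional` is a hypothetical stand-in for the macro body,
// using a plain [lower, upper) check in place of TimestampRange::contains.
fn mask_from_optional(vals: &[Option<i64>], lower: i64, upper: i64) -> arrow::array::BooleanArray {
    arrow::array::BooleanArray::from_iter(
        vals.iter()
            .copied()
            .map(|ts| ts.map(|v| lower <= v && v < upper)),
    )
}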
+ Ok(BooleanArray::from_iter(ts_col.iter().map(|ts| { + ts.map(|val| { + Timestamp::new(val, TimeUnit::$unit) + }).map(|ts| { + self.time_range.contains(&ts) + }) + }))) + + }}; + } + + match ts_col.data_type() { + DataType::Timestamp(unit, _) => match unit { + arrow::datatypes::TimeUnit::Second => { + downcast_and_compute!(TimestampSecondArray, Second) + } + arrow::datatypes::TimeUnit::Millisecond => { + downcast_and_compute!(TimestampMillisecondArray, Millisecond) + } + arrow::datatypes::TimeUnit::Microsecond => { + downcast_and_compute!(TimestampMicrosecondArray, Microsecond) + } + arrow::datatypes::TimeUnit::Nanosecond => { + downcast_and_compute!(TimestampNanosecondArray, Nanosecond) + } + }, + DataType::Int64 => { + downcast_and_compute!(PrimitiveArray, Millisecond) + } + _ => { + unreachable!() + } + } + } +} + pub type SendableChunkStream = Pin> + Send>>; pub struct ChunkStream { @@ -861,4 +958,38 @@ mod tests { ) .await; } + + fn check_unit_lossy(range_unit: TimeUnit, col_unit: TimeUnit, expect: bool) { + assert_eq!( + expect, + time_unit_lossy( + &TimestampRange::with_unit(0, 1, range_unit).unwrap(), + col_unit + ) + ) + } + + #[test] + fn test_time_unit_lossy() { + // converting a range with unit second to millisecond will not cause rounding error + check_unit_lossy(TimeUnit::Second, TimeUnit::Second, false); + check_unit_lossy(TimeUnit::Second, TimeUnit::Millisecond, false); + check_unit_lossy(TimeUnit::Second, TimeUnit::Microsecond, false); + check_unit_lossy(TimeUnit::Second, TimeUnit::Nanosecond, false); + + check_unit_lossy(TimeUnit::Millisecond, TimeUnit::Second, true); + check_unit_lossy(TimeUnit::Millisecond, TimeUnit::Millisecond, false); + check_unit_lossy(TimeUnit::Millisecond, TimeUnit::Microsecond, false); + check_unit_lossy(TimeUnit::Millisecond, TimeUnit::Nanosecond, false); + + check_unit_lossy(TimeUnit::Microsecond, TimeUnit::Second, true); + check_unit_lossy(TimeUnit::Microsecond, TimeUnit::Millisecond, true); + check_unit_lossy(TimeUnit::Microsecond, TimeUnit::Microsecond, false); + check_unit_lossy(TimeUnit::Microsecond, TimeUnit::Nanosecond, false); + + check_unit_lossy(TimeUnit::Nanosecond, TimeUnit::Second, true); + check_unit_lossy(TimeUnit::Nanosecond, TimeUnit::Millisecond, true); + check_unit_lossy(TimeUnit::Nanosecond, TimeUnit::Microsecond, true); + check_unit_lossy(TimeUnit::Nanosecond, TimeUnit::Nanosecond, false); + } } From 27e39bd43ebbfd24803e31a1ce5b712cfe2f448f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Feb 2023 15:02:50 +0800 Subject: [PATCH 16/17] fix: cr comments --- src/storage/src/compaction/dedup_deque.rs | 9 ++++++--- src/storage/src/compaction/scheduler.rs | 4 ++-- src/storage/src/sst/parquet.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/storage/src/compaction/dedup_deque.rs b/src/storage/src/compaction/dedup_deque.rs index 391f645f413d..83c68f0ae2bc 100644 --- a/src/storage/src/compaction/dedup_deque.rs +++ b/src/storage/src/compaction/dedup_deque.rs @@ -23,13 +23,16 @@ pub struct DedupDeque { existing: HashMap, } -impl DedupDeque { - pub fn new_empty() -> Self { +impl Default for DedupDeque { + fn default() -> Self { Self { deque: VecDeque::new(), existing: HashMap::new(), } } +} + +impl DedupDeque { /// Pushes a key value to the back of deque. /// Returns true if the deque does not already contain value with the same key, otherwise /// returns false. 
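// Illustrative usage sketch based on the push_back contract documented above;
// `dedup_deque_example` is hypothetical, DedupDeque is assumed to be in scope,
// and its key/value type parameters are inferred from the arguments.
fn dedup_deque_example() {
    let mut deque = DedupDeque::default();
    assert!(deque.push_back(1, "first".to_string())); // new key: accepted
    assert!(!deque.push_back(1, "again".to_string())); // duplicate key: rejected
    assert_eq!(1, deque.len());
}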
@@ -88,7 +91,7 @@ mod tests { #[test] fn test_dedup_deque() { - let mut deque = DedupDeque::new_empty(); + let mut deque = DedupDeque::default(); assert!(deque.push_back(1, "hello".to_string())); assert_eq!(1, deque.len()); assert!(deque.push_back(2, "world".to_string())); diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index c9862e367957..17bbad72710a 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -133,7 +133,7 @@ where P: Picker + Send + Sync, { let request_queue: Arc>> = - Arc::new(RwLock::new(DedupDeque::new_empty())); + Arc::new(RwLock::new(DedupDeque::default())); let cancel_token = CancellationToken::new(); let task_notifier = Arc::new(Notify::new()); @@ -301,7 +301,7 @@ mod tests { #[tokio::test] async fn test_schedule_handler() { common_telemetry::init_default_ut_logging(); - let queue = Arc::new(RwLock::new(DedupDeque::new_empty())); + let queue = Arc::new(RwLock::new(DedupDeque::default())); let latch = Arc::new(CountdownLatch::new(2)); let latch_cloned = latch.clone(); let picker = MockPicker::new(vec![Arc::new(move || { diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 66749d2956fb..9205bc5419cf 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -340,7 +340,7 @@ impl<'a> ParquetReader<'a> { .map(|t| t.value()), self.time_range .end() - .and_then(|s| s.convert_to(ts_col_unit)) // convert to ceil to relax time range and prevent data loss caused by rounding error. + .and_then(|s| s.convert_to(ts_col_unit)) .map(|t| t.value()), ) { Box::new(FastTimestampRowFilter::new( From d58b041ccc009c87e50da16ca3a96e069109f687 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 Feb 2023 17:12:35 +0800 Subject: [PATCH 17/17] fix: some typos --- Cargo.lock | 1 - src/datatypes/Cargo.toml | 1 - src/storage/src/sst/parquet.rs | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 263cdfbe4e78..03b4025bd75d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2222,7 +2222,6 @@ name = "datatypes" version = "0.1.0" dependencies = [ "arrow", - "arrow-array", "arrow-schema", "common-base", "common-error", diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 8eb03aa2dce9..0d0158a3b0ab 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -9,7 +9,6 @@ default = [] test = [] [dependencies] -arrow-array.workspace = true arrow.workspace = true arrow-schema.workspace = true common-base = { path = "../common/base" } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 9205bc5419cf..972894e40469 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -369,7 +369,7 @@ fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool { .unwrap_or(false) } -/// `TimestampRowFilter` is used to filter rows within given timestamp range when reading +/// `FastTimestampRowFilter` is used to filter rows within given timestamp range when reading /// row groups from parquet files, while avoids fetching all columns from SSTs file. struct FastTimestampRowFilter { timestamp_index: usize,
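Taken together, patches 15 through 17 settle on a single filter-selection rule: fall back to the plain per-element filter whenever the unit conversion is lossy or a bound overflows, otherwise take the scalar-kernel fast path. The sketch below restates that rule as a free function; `FilterKind` and `choose_filter` are hypothetical names, the import paths are assumed, and `time_unit_lossy` is the helper introduced in patch 15.

use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;

enum FilterKind {
    // arrow gt_eq_scalar / lt_scalar path (FastTimestampRowFilter)
    Fast { lower: i64, upper: i64 },
    // per-element TimestampRange::contains path (PlainTimestampRowFilter)
    Plain,
}

fn choose_filter(range: &TimestampRange, ts_col_unit: TimeUnit) -> FilterKind {
    // A range unit finer than the column unit rounds on conversion; an
    // out-of-range bound overflows to None. Both cases take the plain filter.
    if time_unit_lossy(range, ts_col_unit) {
        return FilterKind::Plain;
    }
    match (
        range.start().and_then(|t| t.convert_to(ts_col_unit)),
        range.end().and_then(|t| t.convert_to(ts_col_unit)),
    ) {
        (Some(lower), Some(upper)) => FilterKind::Fast {
            lower: lower.value(),
            upper: upper.value(),
        },
        _ => FilterKind::Plain,
    }
}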