feat: compaction reader and writer #972

Merged · 17 commits · Feb 14, 2023
4 changes: 4 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -48,6 +48,7 @@ license = "Apache-2.0"

[workspace.dependencies]
arrow = "29.0"
arrow-array = "29.0"
arrow-flight = "29.0"
arrow-schema = { version = "29.0", features = ["serde"] }
async-stream = "0.3"
1 change: 1 addition & 0 deletions src/common/time/src/lib.rs
@@ -1,3 +1,4 @@
#![feature(int_roundings)]
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
68 changes: 68 additions & 0 deletions src/common/time/src/timestamp.rs
@@ -85,6 +85,21 @@ impl Timestamp {
}
}

/// Converts a timestamp to the given time unit.
/// Converting from a smaller unit to a larger unit rounds the value up
/// (toward positive infinity).
/// Returns `None` if the conversion overflows.
pub fn convert_to_ceil(&self, unit: TimeUnit) -> Option<Timestamp> {
if self.unit().factor() >= unit.factor() {
let mul = self.unit().factor() / unit.factor();
let value = self.value.checked_mul(mul)?;
Some(Timestamp::new(value, unit))
} else {
let mul = unit.factor() / self.unit().factor();
Some(Timestamp::new(self.value.div_ceil(mul), unit))
}
}

/// Splits a [Timestamp] into a seconds part and a nanoseconds part.
/// Note that the seconds part of the split result is always rounded down (floor).
fn split(&self) -> (i64, i64) {
@@ -718,4 +733,57 @@ mod tests {
Timestamp::new(i64::MAX, TimeUnit::Second).split()
);
}

#[test]
fn test_convert_to_ceil() {
assert_eq!(
Timestamp::new(1, TimeUnit::Second),
Timestamp::new(1000, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

// These two cases show how `Timestamp::convert_to_ceil` behaves differently
// from `Timestamp::convert_to` when converting from a smaller unit to a larger unit.
assert_eq!(
Timestamp::new(1, TimeUnit::Second),
Timestamp::new(1001, TimeUnit::Millisecond)
.convert_to(TimeUnit::Second)
.unwrap()
);
assert_eq!(
Timestamp::new(2, TimeUnit::Second),
Timestamp::new(1001, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

assert_eq!(
Timestamp::new(-1, TimeUnit::Second),
Timestamp::new(-1, TimeUnit::Millisecond)
.convert_to(TimeUnit::Second)
.unwrap()
);
assert_eq!(
Timestamp::new(0, TimeUnit::Second),
Timestamp::new(-1, TimeUnit::Millisecond)
.convert_to_ceil(TimeUnit::Second)
.unwrap()
);

// When converting from a larger unit to a smaller unit there is no rounding,
// so `Timestamp::convert_to_ceil` behaves just like `Timestamp::convert_to`.
assert_eq!(
Timestamp::new(-1, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(-1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
assert_eq!(
Timestamp::new(1000, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(1000, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
assert_eq!(
Timestamp::new(1, TimeUnit::Second).convert_to(TimeUnit::Millisecond),
Timestamp::new(1, TimeUnit::Second).convert_to_ceil(TimeUnit::Millisecond)
);
}
}
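For context, here is a stable-Rust sketch of the floor/ceil integer division that `convert_to` and `convert_to_ceil` rely on (the diff enables the nightly `int_roundings` feature for `i64::div_ceil`). The helper names below are illustrative, not part of the crate:

// Stable stand-ins for the nightly `int_roundings` methods.
fn div_floor(a: i64, b: i64) -> i64 {
    let (q, r) = (a / b, a % b);
    // `/` truncates toward zero; adjust downward when the signs differ.
    if r != 0 && (r < 0) != (b < 0) { q - 1 } else { q }
}

fn div_ceil(a: i64, b: i64) -> i64 {
    let (q, r) = (a / b, a % b);
    // Adjust upward when the remainder has the divisor's sign.
    if r != 0 && (r < 0) == (b < 0) { q + 1 } else { q }
}

fn main() {
    // 1001 ms -> s: flooring (`convert_to`) yields 1, ceiling (`convert_to_ceil`) yields 2.
    assert_eq!(div_floor(1001, 1000), 1);
    assert_eq!(div_ceil(1001, 1000), 2);
    // -1 ms -> s: flooring yields -1, ceiling yields 0, matching the tests above.
    assert_eq!(div_floor(-1, 1000), -1);
    assert_eq!(div_ceil(-1, 1000), 0);
}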
3 changes: 2 additions & 1 deletion src/mito/src/table.rs
@@ -171,7 +171,7 @@ impl<R: Region> Table for MitoTable<R> {
.context(table_error::TableOperationSnafu)?
.reader;

let schema = reader.schema().clone();
let schema = reader.user_schema().clone();
if let Some(first_schema) = &first_schema {
// TODO(hl): we assume all regions' schemas are the same, but an ongoing table
// alteration may make these schemas inconsistent.
@@ -198,6 +198,7 @@
let stream = Box::pin(async_stream::try_stream! {
for mut reader in readers {
while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
let chunk = reader.project_chunk(chunk);
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
}
6 changes: 5 additions & 1 deletion src/mito/src/table/test_util/mock_engine.rs
@@ -43,7 +43,7 @@ pub struct MockChunkReader {
impl ChunkReader for MockChunkReader {
type Error = MockError;

fn schema(&self) -> &SchemaRef {
fn user_schema(&self) -> &SchemaRef {
&self.schema
}

@@ -69,6 +69,10 @@ impl ChunkReader for MockChunkReader {

Ok(Some(Chunk::new(columns)))
}

fn project_chunk(&self, chunk: Chunk) -> Chunk {
chunk
}
}

pub struct MockSnapshot {
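Taken together, this mock and the real implementation in src/storage/src/chunk.rs below suggest the updated shape of the `ChunkReader` trait. The following is a sketch under the assumption that `schema()` was renamed to `user_schema()` and `project_chunk()` was added so callers can project lazily; the placeholder types stand in for the real `SchemaRef` and `Chunk`, which this diff does not show:

use std::sync::Arc;

// Placeholders for the real schema and chunk types.
pub struct Schema;
pub type SchemaRef = Arc<Schema>;
pub struct Chunk { pub columns: Vec<u8> }

#[async_trait::async_trait]
pub trait ChunkReader: Send {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Schema of the returned chunks after projection (what the user sees).
    fn user_schema(&self) -> &SchemaRef;

    /// Next batch of data, or `None` when the reader is exhausted. The chunk
    /// may be unprojected; callers apply `project_chunk` themselves.
    async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>;

    /// Projects a chunk to the user schema; trivial readers such as the mock
    /// above return the chunk unchanged.
    fn project_chunk(&self, chunk: Chunk) -> Chunk;
}

The behavioral shift shows up in `ChunkReaderImpl::next_chunk` below: projection no longer happens eagerly inside `next_chunk`, which lets the compaction writer consume raw batches while scan paths (such as `MitoTable::scan` above) call `project_chunk` before yielding record batches.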
4 changes: 4 additions & 0 deletions src/storage/Cargo.toml
@@ -9,6 +9,8 @@ arc-swap = "1.0"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
arrow.workspace = true
arrow-array.workspace = true
bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
@@ -18,6 +20,8 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
datafusion-common.workspace = true
datafusion-expr.workspace = true
futures.workspace = true
futures-util.workspace = true
lazy_static = "1.4"
64 changes: 39 additions & 25 deletions src/storage/src/chunk.rs
@@ -24,9 +24,9 @@ use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef};
use crate::read::{BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::read::{Batch, BoxedBatchReader, DedupReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions};

/// Chunk reader implementation.
// Now we use async-trait to implement the chunk reader, which is easier to implement than
@@ -41,7 +41,7 @@ pub struct ChunkReaderImpl {
impl ChunkReader for ChunkReaderImpl {
type Error = Error;

fn schema(&self) -> &SchemaRef {
fn user_schema(&self) -> &SchemaRef {
self.schema.projected_user_schema()
}

@@ -50,10 +50,14 @@ impl ChunkReader for ChunkReaderImpl {
Some(b) => b,
None => return Ok(None),
};
Ok(Some(Chunk::new(batch.columns)))
}

let chunk = self.schema.batch_to_chunk(&batch);

Ok(Some(chunk))
fn project_chunk(&self, chunk: Chunk) -> Chunk {
let batch = Batch {
columns: chunk.columns,
};
self.schema.batch_to_chunk(&batch)
}
}

@@ -64,6 +68,11 @@ impl ChunkReaderImpl {
batch_reader,
}
}

#[inline]
pub fn projected_schema(&self) -> &ProjectedSchemaRef {
&self.schema
}
}

/// Builder to create a new [ChunkReaderImpl] from scan request.
@@ -121,14 +130,34 @@ impl ChunkReaderBuilder {
self
}

pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
ssts.visit_levels(&mut self)?;

/// Picks all SSTs in all levels.
pub fn pick_all_ssts(mut self, ssts: &LevelMetas) -> Result<Self> {
let files = ssts.levels().iter().flat_map(|level| level.files());
// Now we read all files, so just reserve enough space to hold all files.
self.files_to_read.reserve(files.size_hint().0);
for file in files {
// We can't invoke async functions here, so we collect all files first and
// create the batch reader later in `ChunkReaderBuilder::build()`.
self.files_to_read.push(file.clone());
}
Ok(self)
}

/// Picks given SSTs to read.
pub fn pick_ssts(mut self, ssts: &[FileHandle]) -> Self {
for file in ssts {
self.files_to_read.push(file.clone());
}
self
}

pub async fn build(mut self) -> Result<ChunkReaderImpl> {
let time_range_predicate = self.build_time_range_predicate();
debug!(
"Time range predicate for chunk reader: {:?}",
time_range_predicate
);

let schema = Arc::new(
ProjectedSchema::new(self.schema, self.projection)
.context(error::InvalidProjectionSnafu)?,
@@ -148,6 +177,7 @@
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
predicate: Predicate::new(self.filters),
time_range: time_range_predicate,
};
for file in &self.files_to_read {
if !Self::file_in_range(file, time_range_predicate) {
@@ -189,19 +219,3 @@ impl ChunkReaderBuilder {
file_ts_range.intersects(&predicate)
}
}

impl Visitor for ChunkReaderBuilder {
fn visit(&mut self, _level: usize, files: &[FileHandle]) -> Result<()> {
// TODO(yingwen): Filter files by time range.

// Now we read all files, so just reserve enough space to hold all files.
self.files_to_read.reserve(files.len());
for file in files {
// We can't invoke async functions here, so we collects all files first, and
// create the batch reader later in `ChunkReaderBuilder`.
self.files_to_read.push(file.clone());
}

Ok(())
}
}
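A subtlety in `pick_all_ssts` above: `reserve(files.size_hint().0)` uses the iterator's lower size bound, and for a `flat_map` that bound is 0 before iteration begins (the adapter cannot see into inner iterators it has not visited yet), so the reservation is best-effort rather than exact. A self-contained sketch of the idiom, with a hypothetical `Level` type standing in for `LevelMeta` and strings standing in for `FileHandle`s:

// Hypothetical stand-in for a level holding SST file handles.
struct Level {
    files: Vec<String>,
}

fn pick_all(levels: &[Level]) -> Vec<String> {
    let files = levels.iter().flat_map(|level| level.files.iter());
    let mut picked = Vec::new();
    // Lower bound only; 0 here, so this may under-reserve (harmless).
    picked.reserve(files.size_hint().0);
    for file in files {
        picked.push(file.clone());
    }
    picked
}

fn main() {
    let levels = vec![
        Level { files: vec!["l0-1.sst".into(), "l0-2.sst".into()] },
        Level { files: vec!["l1-1.sst".into()] },
    ];
    assert_eq!(pick_all(&levels), ["l0-1.sst", "l0-2.sst", "l1-1.sst"]);
}

Summing the per-level lengths would reserve exactly, but compaction now passes explicit file lists to `pick_ssts`, so this scan-path allocation is unlikely to matter.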
1 change: 1 addition & 0 deletions src/storage/src/compaction.rs
@@ -18,3 +18,4 @@ mod rate_limit;
mod scheduler;
mod strategy;
mod task;
mod writer;
10 changes: 9 additions & 1 deletion src/storage/src/compaction/dedup_deque.rs
@@ -18,12 +18,20 @@ use std::fmt::{Debug, Formatter};
use std::hash::Hash;

/// Deque with key deduplication.
#[derive(Default)]
pub struct DedupDeque<K, V> {
deque: VecDeque<K>,
existing: HashMap<K, V>,
}

impl<K, V> Default for DedupDeque<K, V> {
fn default() -> Self {
Self {
deque: VecDeque::new(),
existing: HashMap::new(),
}
}
}

impl<K: Eq + Hash + Clone, V> DedupDeque<K, V> {
/// Pushes a key-value pair to the back of the deque.
/// Returns true if the deque does not already contain a value with the same key, otherwise
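The switch from `#[derive(Default)]` to a manual impl here is deliberate: the derive would add `K: Default` and `V: Default` bounds to the generated impl, even though empty containers need neither. A minimal self-contained sketch of the distinction:

use std::collections::{HashMap, VecDeque};

struct DedupDeque<K, V> {
    deque: VecDeque<K>,
    existing: HashMap<K, V>,
}

// Manual impl: no `K: Default` or `V: Default` bounds required, unlike
// what `#[derive(Default)]` would generate.
impl<K, V> Default for DedupDeque<K, V> {
    fn default() -> Self {
        Self { deque: VecDeque::new(), existing: HashMap::new() }
    }
}

// A key type that does not implement `Default` still works.
#[derive(Eq, PartialEq, Hash, Clone)]
struct RegionId(u64);

fn main() {
    let _queue: DedupDeque<RegionId, String> = DedupDeque::default();
}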
40 changes: 29 additions & 11 deletions src/storage/src/compaction/picker.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use common_telemetry::debug;
use store_api::logstore::LogStore;

use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::strategy::StrategyRef;
@@ -26,37 +29,52 @@ pub trait Picker<R, T: CompactionTask>: Send + 'static {

pub struct PickerContext {}

/// L0 -> L1 all-to-all compaction based on time windows.
pub(crate) struct SimplePicker {
/// L0 -> L1 compaction based on time windows.
pub(crate) struct SimplePicker<S> {
strategy: StrategyRef,
_phantom_data: PhantomData<S>,
}

#[allow(unused)]
impl SimplePicker {
impl<S> SimplePicker<S> {
pub fn new(strategy: StrategyRef) -> Self {
Self { strategy }
Self {
strategy,
_phantom_data: Default::default(),
}
}
}

impl Picker<CompactionRequestImpl, CompactionTaskImpl> for SimplePicker {
impl<S: LogStore> Picker<CompactionRequestImpl<S>, CompactionTaskImpl<S>> for SimplePicker<S> {
fn pick(
&self,
ctx: &PickerContext,
req: &CompactionRequestImpl,
) -> crate::error::Result<Option<CompactionTaskImpl>> {
let levels = req.levels();
req: &CompactionRequestImpl<S>,
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
let levels = &req.levels;

for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let outputs = self.strategy.pick(ctx, level);

if outputs.is_empty() {
debug!("No SST file can be compacted at level {}", level_num);
return Ok(None);
continue;
}

debug!("Found SST files to compact {:?}", outputs);
// TODO(hl): build compaction task
debug!(
"Found SST files to compact {:?} on level: {}",
outputs, level_num
);
return Ok(Some(CompactionTaskImpl {
schema: req.schema.clone(),
sst_layer: req.sst_layer.clone(),
outputs,
writer: req.writer.clone(),
shared_data: req.shared.clone(),
wal: req.wal.clone(),
manifest: req.manifest.clone(),
}));
}

Ok(None)
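Finally, a note on the `PhantomData` in `SimplePicker<S>`: the picker never stores an `S` (the `LogStore`) by value, but the parameter must appear somewhere in the struct so that `Picker` can be implemented for the matching `CompactionRequestImpl<S>` and `CompactionTaskImpl<S>` pair. A minimal sketch of the pattern, with stand-in types:

use std::marker::PhantomData;

// Stand-ins for the real `LogStore`-parameterized request/task types.
trait LogStore: Send + Sync + 'static {}

struct CompactionRequest<S: LogStore>(PhantomData<S>);
struct CompactionTask<S: LogStore>(PhantomData<S>);

// No `S` value is stored, only the marker, so `SimplePicker<S>` stays
// zero-sized while remaining tied to one log-store type.
struct SimplePicker<S> {
    _phantom_data: PhantomData<S>,
}

impl<S: LogStore> SimplePicker<S> {
    fn pick(&self, _req: &CompactionRequest<S>) -> Option<CompactionTask<S>> {
        Some(CompactionTask(PhantomData))
    }
}

fn main() {
    struct NoopLogStore;
    impl LogStore for NoopLogStore {}

    let picker: SimplePicker<NoopLogStore> = SimplePicker { _phantom_data: PhantomData };
    let _task = picker.pick(&CompactionRequest(PhantomData));
}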