Skip to content

Commit

Permalink
feat: Support scanning the mutable part (GreptimeTeam#4)
Browse files Browse the repository at this point in the history
* feat: Implement iter for merge tree memtable

* feat: define iter

* feat: expose VectorOp

* feat: allow clone primitive and binary vector

* feat: prune plain vectors

* feat: sort and dedup

* feat: finish scan part

* feat: impl iter for the memtable

* style: fix clippy

* chore: todo

* chore: remove comment

* refactor: move test fn to memtable_util

* test: add tests and fix tests issues

* chore: fix compiler issues

* feat: remove dict block

* feat: implement is_empty

* feat: implement mark_immutable()

* refactor: rename schema_for_test to metadata_for_test

* fix: iter issues with the merge tree

* test: test projection

* fix: iter return empty if no primary key

* fix: dedup without pk

* test: test projection for no pk

* fix: do not filter deleted in memtable

* feat: metrics for scan

* fix: num rows metrics without prunning

* perf: avoid converting batch if nothing to prune

* fix: add primary key usage to key_bytes
  • Loading branch information
evenyag authored Jan 30, 2024
1 parent 1c30976 commit e2a6b43
Show file tree
Hide file tree
Showing 11 changed files with 1,058 additions and 338 deletions.
2 changes: 1 addition & 1 deletion src/datatypes/src/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::serialize::Serializable;
use crate::value::{Value, ValueRef};
use crate::vectors::operations::VectorOp;
pub use crate::vectors::operations::VectorOp;

mod binary;
mod boolean;
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/vectors/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::value::{Value, ValueRef};
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};

/// Vector of binary strings.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct BinaryVector {
array: BinaryArray,
}
Expand Down
8 changes: 8 additions & 0 deletions src/datatypes/src/vectors/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ pub struct PrimitiveVector<T: LogicalPrimitiveType> {
array: PrimitiveArray<T::ArrowPrimitive>,
}

impl<T: LogicalPrimitiveType> Clone for PrimitiveVector<T> {
fn clone(&self) -> Self {
Self {
array: self.array.clone(),
}
}
}

impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
pub fn new(array: PrimitiveArray<T::ArrowPrimitive>) -> Self {
Self { array }
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl RegionFlushTask {
}

let file_id = FileId::random();
let iter = mem.iter(None, None);
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto();
let mem_threshold_index_create = self
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BoxedBatchIterator;
) -> Result<BoxedBatchIterator>;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down
207 changes: 177 additions & 30 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -38,28 +37,8 @@ use crate::memtable::{
};

/// Config for the merge tree memtable.
#[derive(Debug, Clone)]
pub struct MergeTreeConfig {
/// Enable dictionary.
enable_dict: bool,
/// Number of keys in a dictionary.
dict_key_num: usize,
/// Maximum bytes of keys in a dictionary.
dict_key_bytes: ReadableSize,
/// Max number of dictionaries.
max_dict_num: usize,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
Self {
enable_dict: true,
dict_key_num: 50_000,
dict_key_bytes: ReadableSize::kb(32),
max_dict_num: 16,
}
}
}
#[derive(Debug, Default, Clone)]
pub struct MergeTreeConfig {}

/// Memtable based on a merge tree.
pub struct MergeTreeMemtable {
Expand All @@ -80,7 +59,7 @@ impl fmt::Debug for MergeTreeMemtable {

impl Memtable for MergeTreeMemtable {
fn id(&self) -> MemtableId {
todo!()
self.id
}

fn write(&self, kvs: &KeyValues) -> Result<()> {
Expand All @@ -96,18 +75,18 @@ impl Memtable for MergeTreeMemtable {

fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> BoxedBatchIterator {
todo!()
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
self.tree.scan(projection, predicate)
}

fn is_empty(&self) -> bool {
todo!()
self.tree.is_empty()
}

fn mark_immutable(&self) {
todo!()
self.alloc_tracker.done_allocating();
}

fn stats(&self) -> MemtableStats {
Expand Down Expand Up @@ -233,3 +212,171 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
))
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use common_time::Timestamp;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};

use super::*;
use crate::test_util::memtable_util;

#[test]
fn test_memtable_sorted_input() {
write_iter_sorted_input(true);
write_iter_sorted_input(false);
}

fn write_iter_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_for_test()
} else {
memtable_util::metadata_with_primary_key(vec![])
};
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, 100);
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
memtable.write(&kvs).unwrap();

let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
assert_eq!(expected_ts, read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(99)
)),
stats.time_range()
);
}

#[test]
fn test_memtable_unsorted_input() {
write_iter_unsorted_input(true);
write_iter_unsorted_input(false);
}

fn write_iter_unsorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_for_test()
} else {
memtable_util::metadata_with_primary_key(vec![])
};
let memtable =
MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());

let kvs = memtable_util::build_key_values_with_ts_seq(
&metadata,
"hello".to_string(),
0,
[1, 3, 7, 5, 6].into_iter(),
0, // sequence 0, 1, 2, 3, 4
);
memtable.write(&kvs).unwrap();

let kvs = memtable_util::build_key_values_with_ts_seq(
&metadata,
"hello".to_string(),
0,
[5, 2, 4, 0, 7].into_iter(),
5, // sequence 5, 6, 7, 8, 9
);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.sequences()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap())
.collect::<Vec<_>>();
assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((Timestamp::new_millisecond(0), Timestamp::new_millisecond(7))),
stats.time_range()
);
}

#[test]
fn test_memtable_projection() {
write_iter_projection(true);
write_iter_projection(false);
}

fn write_iter_projection(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_for_test()
} else {
memtable_util::metadata_with_primary_key(vec![])
};
let memtable = MergeTreeMemtableBuilder::new(None).build(&metadata);

let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, 100);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None).unwrap();

let mut v0_all = vec![];
for res in iter {
let batch = res.unwrap();
assert_eq!(1, batch.fields().len());
let v0 = batch
.fields()
.first()
.unwrap()
.data
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
}
assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
}
}
Loading

0 comments on commit e2a6b43

Please sign in to comment.