Skip to content

Commit

Permalink
chore(query): add more logs on aggregation (#16552)
Browse files Browse the repository at this point in the history
* add consume_convert_blocks:

* update

* add tests

* add more logs

* add more logs
  • Loading branch information
sundy-li authored Oct 14, 2024
1 parent 7548f99 commit d249097
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 76 deletions.
29 changes: 0 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,26 @@ useless_format = "allow"
mutable_key_type = "allow"
result_large_err = "allow"

## DONT'T DELETE THIS: If we want best performance, we should use this profile but it will take longer time to compile.
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## select max(number) from numbers_mt(10000000000); ~ 3x performance
# [profile.release]
# debug = 1
# lto = "thin"
# overflow-checks = false
# incremental = false
# codegen-units = 1

[profile.release]
debug = 1
lto = "thin"
overflow-checks = false
opt-level = "s" ## defaults to be 3
incremental = false
opt-level = "s"

# codegen-units = 1 # Reduce number of codegen units to increase optimizations.

# [profile.release.package]
# arrow2 = { codegen-units = 4 }
# common-functions = { codegen-units = 16 }
# databend-common-arrow = { codegen-units = 16 }
# databend-query = { codegen-units = 4 }
# databend-binaries = { codegen-units = 4 }

Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct AggregateHashTable {
// use for append rows directly during deserialize
pub direct_append: bool,
pub config: HashTableConfig,

current_radix_bits: u64,
entries: Vec<Entry>,
count: usize,
Expand Down Expand Up @@ -585,6 +586,7 @@ impl AggregateHashTable {
.iter()
.map(|arena| arena.allocated_bytes())
.sum::<usize>()
+ self.entries.len() * std::mem::size_of::<Entry>()
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ impl DataBlock {
self.columns().iter().map(|entry| entry.memory_size()).sum()
}

pub fn consume_convert_to_full(self) -> Self {
if self
.columns()
.iter()
.all(|entry| entry.value.as_column().is_some())
{
return self;
}

self.convert_to_full()
}

pub fn convert_to_full(&self) -> Self {
let columns = self
.columns()
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/converts/arrow/to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl DataBlock {
let arrow_schema = table_schema_to_arrow_schema(table_schema);
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self
.convert_to_full()
.consume_convert_to_full()
.columns()
.iter()
.zip(arrow_schema.fields())
Expand Down
50 changes: 50 additions & 0 deletions src/query/expression/tests/it/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_expression::FromData;
use databend_common_expression::SortColumnDescription;

use crate::common::new_block;
use crate::rand_block_for_all_types;

#[test]
fn test_block_sort() -> Result<()> {
Expand Down Expand Up @@ -201,3 +202,52 @@ fn test_block_sort() -> Result<()> {

Ok(())
}

#[test]
fn sort_concat() {
// Sort(Sort A || Sort B) = Sort (A || B)
use databend_common_expression::DataBlock;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::Rng;

let mut rng = rand::thread_rng();
let num_blocks = 100;

for _i in 0..num_blocks {
let block_a = rand_block_for_all_types(rng.gen_range(0..100));
let block_b = rand_block_for_all_types(rng.gen_range(0..100));

let mut sort_index: Vec<usize> = (0..block_a.num_columns()).collect();
sort_index.shuffle(&mut rng);

let sort_desc = sort_index
.iter()
.map(|i| SortColumnDescription {
offset: *i,
asc: rng.gen_bool(0.5),
nulls_first: rng.gen_bool(0.5),
is_nullable: rng.gen_bool(0.5),
})
.collect_vec();

let concat_ab_0 = DataBlock::concat(&[block_a.clone(), block_b.clone()]).unwrap();

let sort_a = DataBlock::sort(&block_a, &sort_desc, None).unwrap();
let sort_b = DataBlock::sort(&block_b, &sort_desc, None).unwrap();
let concat_ab_1 = DataBlock::concat(&[sort_a, sort_b]).unwrap();

let block_1 = DataBlock::sort(&concat_ab_0, &sort_desc, None).unwrap();
let block_2 = DataBlock::sort(&concat_ab_1, &sort_desc, None).unwrap();

assert_eq!(block_1.num_columns(), block_2.num_columns());
assert_eq!(block_1.num_rows(), block_2.num_rows());

let columns_1 = block_1.columns();
let columns_2 = block_2.columns();
for idx in 0..columns_1.len() {
assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
assert_eq!(columns_1[idx].value, columns_2[idx].value);
}
}
}
1 change: 0 additions & 1 deletion src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ lexical-core = "0.8.5"
libm = "0.2.6"
match-template = { workspace = true }
md-5 = "0.10.5"
multiversion = "0.7.4"
naive-cityhash = "0.2.0"
num-traits = "0.2.15"
once_cell = { workspace = true }
Expand Down
31 changes: 31 additions & 0 deletions src/query/functions/src/aggregates/aggregate_min_max_any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
Expand Down Expand Up @@ -92,6 +93,36 @@ where
Ok(())
}

fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
function_data: Option<&dyn FunctionData>,
) -> Result<()> {
let column_len = T::column_len(&other);
if column_len == 0 {
return Ok(());
}

let column_iter = T::iter_column(&other);
if let Some(validity) = validity {
if validity.unset_bits() == column_len {
return Ok(());
}
for (data, valid) in column_iter.zip(validity.iter()) {
if valid {
let _ = self.add(data, function_data);
}
}
} else {
let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
if let Some(v) = v {
let _ = self.add(v, function_data);
}
}
Ok(())
}

fn merge(&mut self, rhs: &Self) -> Result<()> {
if let Some(v) = &rhs.value {
self.add(T::to_scalar_ref(v), None)?;
Expand Down
42 changes: 29 additions & 13 deletions src/query/functions/src/aggregates/aggregate_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
Expand Down Expand Up @@ -80,21 +81,33 @@ where
}
}

#[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
fn sum_batch<T, N>(other: T::Column) -> N::Scalar
// #[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
#[inline]
pub fn sum_batch<T, TSum>(inner: Buffer<T>, validity: Option<&Bitmap>) -> TSum
where
T: ValueType + Sync + Send,
N: ValueType,
T::Scalar: Number + AsPrimitive<N::Scalar>,
N::Scalar: Number + AsPrimitive<f64> + std::ops::AddAssign,
for<'a> T::ScalarRef<'a>: Number + AsPrimitive<N::Scalar>,
T: Number + AsPrimitive<TSum>,
TSum: Number + std::ops::AddAssign,
{
// use temp variable to hint the compiler to unroll the loop
let mut sum = N::Scalar::default();
for value in T::iter_column(&other) {
sum += value.as_();
match validity {
Some(v) if v.unset_bits() > 0 => {
let mut sum = TSum::default();
inner.iter().zip(v.iter()).for_each(|(t, b)| {
if b {
sum += t.as_();
}
});

sum
}
_ => {
let mut sum = TSum::default();
inner.iter().for_each(|t| {
sum += t.as_();
});

sum
}
}
sum
}

impl<T, N> UnaryState<T, N> for NumberSumState<N>
Expand All @@ -117,9 +130,12 @@ where
fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
self.value += sum_batch::<T, N>(other);
let col = T::upcast_column(other);
let buffer = NumberType::<T::Scalar>::try_downcast_column(&col).unwrap();
self.value += sum_batch::<T::Scalar, N::Scalar>(buffer, validity);
Ok(())
}

Expand Down
30 changes: 16 additions & 14 deletions src/query/functions/src/aggregates/aggregate_unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,22 @@ where
fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
function_data: Option<&dyn FunctionData>,
) -> Result<()> {
for value in T::iter_column(&other) {
self.add(value, function_data)?;
match validity {
Some(validity) => {
for (data, valid) in T::iter_column(&other).zip(validity.iter()) {
if valid {
self.add(data, function_data)?;
}
}
}
None => {
for value in T::iter_column(&other) {
self.add(value, function_data)?;
}
}
}
Ok(())
}
Expand Down Expand Up @@ -206,18 +218,8 @@ where
) -> Result<()> {
let column = T::try_downcast_column(&columns[0]).unwrap();
let state: &mut S = place.get::<S>();
match validity {
Some(bitmap) if bitmap.unset_bits() > 0 => {
let column_iter = T::iter_column(&column);
for (value, is_valid) in column_iter.zip(bitmap.iter()) {
if is_valid {
state.add(value, self.function_data.as_deref())?;
}
}
Ok(())
}
_ => state.add_batch(column, self.function_data.as_deref()),
}

state.add_batch(column, validity, self.function_data.as_deref())
}

fn accumulate_row(&self, place: StateAddr, columns: InputColumns, row: usize) -> Result<()> {
Expand Down
Loading

0 comments on commit d249097

Please sign in to comment.