Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: coldWater <[email protected]>
  • Loading branch information
forsaken628 committed Nov 5, 2024
1 parent 606da80 commit cde5b15
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,41 @@ impl WindowPartitionTopNExchange {
impl Exchange for WindowPartitionTopNExchange {
const NAME: &'static str = "WindowTopN";
fn partition(&self, block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
let rows = block.num_rows();
let partition_permutation = self.partition_permutation(&block);

// Partition the data blocks to different processors.
let mut output_data_blocks = vec![vec![]; n];
let mut buf = None;
for (partition_id, indices) in partition_permutation.into_iter().enumerate() {
output_data_blocks[partition_id % n]
.push((partition_id, block.take(&indices, &mut buf)?));
}

// Union data blocks for each processor.
Ok(output_data_blocks
.into_iter()
.map(WindowPartitionMeta::create)
.map(DataBlock::empty_with_meta)
.collect())
}
}

impl WindowPartitionTopNExchange {
fn partition_permutation(&self, block: &DataBlock) -> Vec<Vec<u32>> {
let rows = block.num_rows();
let mut sort_compare = SortCompareEquality::new(self.sort_desc.to_vec(), rows);

for &offset in &self.partition_indices {
let array = block.get_by_offset(offset).value.clone();
sort_compare.visit_value(array)?;
sort_compare.visit_value(array).unwrap();
sort_compare.increment_column_index();
}

let partition_equality = sort_compare.equality_index().to_vec();

for desc in self.sort_desc.iter().skip(self.partition_indices.len()) {
let array = block.get_by_offset(desc.offset).value.clone();
sort_compare.visit_value(array)?;
sort_compare.visit_value(array).unwrap();
sort_compare.increment_column_index();
}

Expand All @@ -101,50 +121,17 @@ impl Exchange for WindowPartitionTopNExchange {
let mut hashes = vec![0u64; rows];
for (i, &offset) in self.partition_indices.iter().enumerate() {
let entry = block.get_by_offset(offset);
group_hash_value_spread(&hash_indices, entry.value.to_owned(), i == 0, &mut hashes)?;
}

let partition_permutation = self.partition_permutation(
rows,
&permutation,
&hashes,
&partition_equality,
&full_equality,
);

// Partition the data blocks to different processors.
let mut output_data_blocks = vec![vec![]; n];
let mut buf = None;
for (partition_id, indices) in partition_permutation.into_iter().enumerate() {
output_data_blocks[partition_id % n]
.push((partition_id, block.take(&indices, &mut buf)?));
group_hash_value_spread(&hash_indices, entry.value.to_owned(), i == 0, &mut hashes)
.unwrap();
}

// Union data blocks for each processor.
Ok(output_data_blocks
.into_iter()
.map(WindowPartitionMeta::create)
.map(DataBlock::empty_with_meta)
.collect())
}
}

impl WindowPartitionTopNExchange {
fn partition_permutation(
&self,
rows: usize,
permutation: &[u32],
hashes: &[u64],
partition_equality: &[u8],
full_equality: &[u8],
) -> Vec<Vec<u32>> {
let mut partition_permutation = vec![Vec::new(); self.num_partitions as usize];

let mut start = 0;
let mut cur = 0;
while cur < rows {
let partition =
&mut partition_permutation[(hashes[start] % self.num_partitions) as usize];
let partition = &mut partition_permutation
[(hashes[permutation[start] as usize] % self.num_partitions) as usize];
partition.push(permutation[start]);

let mut rank = 0; // this first value is rank 0
Expand Down Expand Up @@ -184,14 +171,22 @@ impl WindowPartitionTopNExchange {

#[cfg(test)]
mod tests {
use databend_common_expression::types::ArgType;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::StringType;
use databend_common_expression::BlockEntry;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::Value;

use super::*;

#[test]
fn test_row_number() {
fn test_row_number() -> Result<()> {
let p = WindowPartitionTopNExchange::create(
vec![0, 1],
vec![1, 2],
vec![SortColumnDescription {
offset: 2,
offset: 0,
asc: true,
nulls_first: false,
}],
Expand All @@ -200,37 +195,63 @@ mod tests {
8,
);

let permutation: Vec<u32> = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0];
let hashes: Vec<u64> = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10];
let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0];
let full_equality = vec![1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let got = p.partition_permutation(
permutation.len(),
&permutation,
&hashes,
&partition_equality,
&full_equality,
let data = DataBlock::new(
vec![
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![3, 1, 2, 2, 4, 3, 7, 0, 3])),
),
BlockEntry::new(
StringType::data_type(),
Value::Scalar(Scalar::String("a".to_string())),
),
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![3, 1, 3, 2, 2, 3, 4, 3, 3])),
),
BlockEntry::new(
StringType::data_type(),
Value::Column(StringType::from_data(vec![
"a", "b", "c", "d", "e", "f", "g", "h", "i",
])),
),
],
9,
);
data.check_valid()?;

let got = p.partition_permutation(&data);

let want = vec![
vec![],
vec![1],
vec![0],
vec![],
vec![3, 4],
vec![],
vec![6],
vec![],
vec![10, 9, 8],
vec![5],
vec![4, 3, 2],
vec![7, 2, 0],
];
assert_eq!(&want, &got)
if got != want {
let got = got
.iter()
.map(|indices| data.take(indices, &mut None).unwrap())
.collect::<Vec<_>>();
for x in got {
println!("{}", x.to_string())
}
}
assert_eq!(&want, &got);

Ok(())
}

#[test]
fn test_rank() {
fn test_rank() -> Result<()> {
let p = WindowPartitionTopNExchange::create(
vec![0, 1],
vec![1],
vec![SortColumnDescription {
offset: 2,
offset: 0,
asc: true,
nulls_first: false,
}],
Expand All @@ -239,37 +260,50 @@ mod tests {
8,
);

let permutation: Vec<u32> = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0];
let hashes: Vec<u64> = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10];
let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0];
let full_equality = vec![1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0];
let got = p.partition_permutation(
permutation.len(),
&permutation,
&hashes,
&partition_equality,
&full_equality,
let data = DataBlock::new(
vec![
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![7, 7, 7, 6, 5, 5, 4, 1, 3, 1, 1])),
),
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![7, 6, 5, 5, 5, 4, 3, 3, 2, 3, 3])),
),
],
11,
);
data.check_valid()?;

let got = p.partition_permutation(&data);

let want = vec![
vec![],
vec![1],
vec![0],
vec![8, 0],
vec![],
vec![5, 4, 3, 2],
vec![],
vec![7, 9, 10],
vec![],
vec![10, 9, 8, 7],
vec![5],
vec![4, 3, 2],
];
assert_eq!(&want, &got)
// let got2 = got
// .iter()
// .map(|indices| data.take(indices, &mut None).unwrap())
// .collect::<Vec<_>>();
// for x in got2 {
// println!("{}", x.to_string())
// }
assert_eq!(&want, &got);
Ok(())
}

#[test]
fn test_dense_rank() {
fn test_dense_rank() -> Result<()> {
let p = WindowPartitionTopNExchange::create(
vec![0, 1],
vec![1],
vec![SortColumnDescription {
offset: 2,
offset: 0,
asc: true,
nulls_first: false,
}],
Expand All @@ -278,28 +312,41 @@ mod tests {
8,
);

let permutation: Vec<u32> = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0];
let hashes: Vec<u64> = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10];
let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0];
let full_equality = vec![1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0];
let got = p.partition_permutation(
permutation.len(),
&permutation,
&hashes,
&partition_equality,
&full_equality,
let data = DataBlock::new(
vec![
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![5, 2, 3, 3, 2, 2, 1, 1, 1, 1, 1])),
),
BlockEntry::new(
Int32Type::data_type(),
Value::Column(Int32Type::from_data(vec![2, 2, 4, 3, 2, 2, 5, 4, 3, 3, 3])),
),
],
11,
);
data.check_valid()?;

let got = p.partition_permutation(&data);

let want = vec![
vec![],
vec![1],
vec![0],
vec![],
vec![1, 4, 5, 0],
vec![],
vec![7, 2, 6],
vec![],
vec![8, 9, 10, 3],
vec![],
vec![10, 9, 8, 7, 6],
vec![5],
vec![4, 3, 2],
];
assert_eq!(&want, &got)
// let got2 = got
// .iter()
// .map(|indices| data.take(indices, &mut None).unwrap())
// .collect::<Vec<_>>();
// for x in got2 {
// println!("{}", x.to_string())
// }
assert_eq!(&want, &got);
Ok(())
}
}
16 changes: 8 additions & 8 deletions tests/sqllogictests/suites/tpcds/spill.test
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,25 @@ statement ok
drop table if exists t;

statement ok
set max_block_size = 65536;
unset max_block_size;

statement ok
set join_spilling_memory_ratio = 60;
unset join_spilling_memory_ratio;

statement ok
set join_spilling_bytes_threshold_per_proc = 0;
unset join_spilling_bytes_threshold_per_proc;

statement ok
set join_spilling_buffer_threshold_per_proc_mb = 512;
unset join_spilling_buffer_threshold_per_proc_mb;

statement ok
set sort_spilling_memory_ratio = 60;
unset sort_spilling_memory_ratio;

statement ok
set sort_spilling_bytes_threshold_per_proc = 0;
unset sort_spilling_bytes_threshold_per_proc;

statement ok
set window_partition_spilling_memory_ratio = 60;
unset window_partition_spilling_memory_ratio;

statement ok
set window_partition_spilling_bytes_threshold_per_proc = 0;
unset window_partition_spilling_bytes_threshold_per_proc;

0 comments on commit cde5b15

Please sign in to comment.