diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_partial_top_n_exchange.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_partial_top_n_exchange.rs index dba42dc41eee0..9753354dd8076 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_partial_top_n_exchange.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_partial_top_n_exchange.rs @@ -68,13 +68,33 @@ impl WindowPartitionTopNExchange { impl Exchange for WindowPartitionTopNExchange { const NAME: &'static str = "WindowTopN"; fn partition(&self, block: DataBlock, n: usize) -> Result> { - let rows = block.num_rows(); + let partition_permutation = self.partition_permutation(&block); + + // Partition the data blocks to different processors. + let mut output_data_blocks = vec![vec![]; n]; + let mut buf = None; + for (partition_id, indices) in partition_permutation.into_iter().enumerate() { + output_data_blocks[partition_id % n] + .push((partition_id, block.take(&indices, &mut buf)?)); + } + // Union data blocks for each processor. + Ok(output_data_blocks + .into_iter() + .map(WindowPartitionMeta::create) + .map(DataBlock::empty_with_meta) + .collect()) + } +} + +impl WindowPartitionTopNExchange { + fn partition_permutation(&self, block: &DataBlock) -> Vec> { + let rows = block.num_rows(); let mut sort_compare = SortCompareEquality::new(self.sort_desc.to_vec(), rows); for &offset in &self.partition_indices { let array = block.get_by_offset(offset).value.clone(); - sort_compare.visit_value(array)?; + sort_compare.visit_value(array).unwrap(); sort_compare.increment_column_index(); } @@ -82,7 +102,7 @@ impl Exchange for WindowPartitionTopNExchange { for desc in self.sort_desc.iter().skip(self.partition_indices.len()) { let array = block.get_by_offset(desc.offset).value.clone(); - sort_compare.visit_value(array)?; + sort_compare.visit_value(array).unwrap(); sort_compare.increment_column_index(); } @@ -101,50 +121,17 @@ impl Exchange for WindowPartitionTopNExchange { let mut hashes = vec![0u64; rows]; for (i, &offset) in self.partition_indices.iter().enumerate() { let entry = block.get_by_offset(offset); - group_hash_value_spread(&hash_indices, entry.value.to_owned(), i == 0, &mut hashes)?; - } - - let partition_permutation = self.partition_permutation( - rows, - &permutation, - &hashes, - &partition_equality, - &full_equality, - ); - - // Partition the data blocks to different processors. - let mut output_data_blocks = vec![vec![]; n]; - let mut buf = None; - for (partition_id, indices) in partition_permutation.into_iter().enumerate() { - output_data_blocks[partition_id % n] - .push((partition_id, block.take(&indices, &mut buf)?)); + group_hash_value_spread(&hash_indices, entry.value.to_owned(), i == 0, &mut hashes) + .unwrap(); } - // Union data blocks for each processor. - Ok(output_data_blocks - .into_iter() - .map(WindowPartitionMeta::create) - .map(DataBlock::empty_with_meta) - .collect()) - } -} - -impl WindowPartitionTopNExchange { - fn partition_permutation( - &self, - rows: usize, - permutation: &[u32], - hashes: &[u64], - partition_equality: &[u8], - full_equality: &[u8], - ) -> Vec> { let mut partition_permutation = vec![Vec::new(); self.num_partitions as usize]; let mut start = 0; let mut cur = 0; while cur < rows { - let partition = - &mut partition_permutation[(hashes[start] % self.num_partitions) as usize]; + let partition = &mut partition_permutation + [(hashes[permutation[start] as usize] % self.num_partitions) as usize]; partition.push(permutation[start]); let mut rank = 0; // this first value is rank 0 @@ -184,14 +171,22 @@ impl WindowPartitionTopNExchange { #[cfg(test)] mod tests { + use databend_common_expression::types::ArgType; + use databend_common_expression::types::Int32Type; + use databend_common_expression::types::StringType; + use databend_common_expression::BlockEntry; + use databend_common_expression::FromData; + use databend_common_expression::Scalar; + use databend_common_expression::Value; + use super::*; #[test] - fn test_row_number() { + fn test_row_number() -> Result<()> { let p = WindowPartitionTopNExchange::create( - vec![0, 1], + vec![1, 2], vec![SortColumnDescription { - offset: 2, + offset: 0, asc: true, nulls_first: false, }], @@ -200,37 +195,63 @@ mod tests { 8, ); - let permutation: Vec = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; - let hashes: Vec = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10]; - let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0]; - let full_equality = vec![1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; - let got = p.partition_permutation( - permutation.len(), - &permutation, - &hashes, - &partition_equality, - &full_equality, + let data = DataBlock::new( + vec![ + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![3, 1, 2, 2, 4, 3, 7, 0, 3])), + ), + BlockEntry::new( + StringType::data_type(), + Value::Scalar(Scalar::String("a".to_string())), + ), + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![3, 1, 3, 2, 2, 3, 4, 3, 3])), + ), + BlockEntry::new( + StringType::data_type(), + Value::Column(StringType::from_data(vec![ + "a", "b", "c", "d", "e", "f", "g", "h", "i", + ])), + ), + ], + 9, ); + data.check_valid()?; + + let got = p.partition_permutation(&data); let want = vec![ vec![], vec![1], - vec![0], vec![], + vec![3, 4], + vec![], + vec![6], vec![], - vec![10, 9, 8], - vec![5], - vec![4, 3, 2], + vec![7, 2, 0], ]; - assert_eq!(&want, &got) + if got != want { + let got = got + .iter() + .map(|indices| data.take(indices, &mut None).unwrap()) + .collect::>(); + for x in got { + println!("{}", x.to_string()) + } + } + assert_eq!(&want, &got); + + Ok(()) } #[test] - fn test_rank() { + fn test_rank() -> Result<()> { let p = WindowPartitionTopNExchange::create( - vec![0, 1], + vec![1], vec![SortColumnDescription { - offset: 2, + offset: 0, asc: true, nulls_first: false, }], @@ -239,37 +260,50 @@ mod tests { 8, ); - let permutation: Vec = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; - let hashes: Vec = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10]; - let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0]; - let full_equality = vec![1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]; - let got = p.partition_permutation( - permutation.len(), - &permutation, - &hashes, - &partition_equality, - &full_equality, + let data = DataBlock::new( + vec![ + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![7, 7, 7, 6, 5, 5, 4, 1, 3, 1, 1])), + ), + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![7, 6, 5, 5, 5, 4, 3, 3, 2, 3, 3])), + ), + ], + 11, ); + data.check_valid()?; + + let got = p.partition_permutation(&data); let want = vec![ vec![], vec![1], - vec![0], + vec![8, 0], vec![], + vec![5, 4, 3, 2], + vec![], + vec![7, 9, 10], vec![], - vec![10, 9, 8, 7], - vec![5], - vec![4, 3, 2], ]; - assert_eq!(&want, &got) + // let got2 = got + // .iter() + // .map(|indices| data.take(indices, &mut None).unwrap()) + // .collect::>(); + // for x in got2 { + // println!("{}", x.to_string()) + // } + assert_eq!(&want, &got); + Ok(()) } #[test] - fn test_dense_rank() { + fn test_dense_rank() -> Result<()> { let p = WindowPartitionTopNExchange::create( - vec![0, 1], + vec![1], vec![SortColumnDescription { - offset: 2, + offset: 0, asc: true, nulls_first: false, }], @@ -278,28 +312,41 @@ mod tests { 8, ); - let permutation: Vec = vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]; - let hashes: Vec = vec![5, 5, 5, 5, 5, 6, 7, 7, 7, 9, 10]; - let partition_equality = vec![1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0]; - let full_equality = vec![1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0]; - let got = p.partition_permutation( - permutation.len(), - &permutation, - &hashes, - &partition_equality, - &full_equality, + let data = DataBlock::new( + vec![ + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![5, 2, 3, 3, 2, 2, 1, 1, 1, 1, 1])), + ), + BlockEntry::new( + Int32Type::data_type(), + Value::Column(Int32Type::from_data(vec![2, 2, 4, 3, 2, 2, 5, 4, 3, 3, 3])), + ), + ], + 11, ); + data.check_valid()?; + + let got = p.partition_permutation(&data); let want = vec![ vec![], - vec![1], - vec![0], vec![], + vec![1, 4, 5, 0], + vec![], + vec![7, 2, 6], + vec![], + vec![8, 9, 10, 3], vec![], - vec![10, 9, 8, 7, 6], - vec![5], - vec![4, 3, 2], ]; - assert_eq!(&want, &got) + // let got2 = got + // .iter() + // .map(|indices| data.take(indices, &mut None).unwrap()) + // .collect::>(); + // for x in got2 { + // println!("{}", x.to_string()) + // } + assert_eq!(&want, &got); + Ok(()) } } diff --git a/tests/sqllogictests/suites/tpcds/spill.test b/tests/sqllogictests/suites/tpcds/spill.test index 366c85121b6d5..de6c46c8f8cfa 100644 --- a/tests/sqllogictests/suites/tpcds/spill.test +++ b/tests/sqllogictests/suites/tpcds/spill.test @@ -47,25 +47,25 @@ statement ok drop table if exists t; statement ok -set max_block_size = 65536; +unset max_block_size; statement ok -set join_spilling_memory_ratio = 60; +unset join_spilling_memory_ratio; statement ok -set join_spilling_bytes_threshold_per_proc = 0; +unset join_spilling_bytes_threshold_per_proc; statement ok -set join_spilling_buffer_threshold_per_proc_mb = 512; +unset join_spilling_buffer_threshold_per_proc_mb; statement ok -set sort_spilling_memory_ratio = 60; +unset sort_spilling_memory_ratio; statement ok -set sort_spilling_bytes_threshold_per_proc = 0; +unset sort_spilling_bytes_threshold_per_proc; statement ok -set window_partition_spilling_memory_ratio = 60; +unset window_partition_spilling_memory_ratio; statement ok -set window_partition_spilling_bytes_threshold_per_proc = 0; \ No newline at end of file +unset window_partition_spilling_bytes_threshold_per_proc; \ No newline at end of file