Skip to content

feat(query): TopN window operator #16726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Nov 13, 2024
379 changes: 368 additions & 11 deletions src/query/expression/src/aggregate/group_hash.rs

Large diffs are not rendered by default.

64 changes: 62 additions & 2 deletions src/query/expression/src/kernels/sort_compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct SortCompare {
current_column_index: usize,
validity: Option<Bitmap>,
equality_index: Vec<u8>,
force_equality: bool,
}

macro_rules! do_sorter {
Expand Down Expand Up @@ -111,12 +112,12 @@ impl SortCompare {
current_column_index: 0,
validity: None,
equality_index,
force_equality: matches!(limit, LimitType::LimitRank(_)),
}
}

fn need_update_equality_index(&self) -> bool {
self.current_column_index != self.ordering_descs.len() - 1
|| matches!(self.limit, LimitType::LimitRank(_))
self.force_equality || self.current_column_index != self.ordering_descs.len() - 1
}

pub fn increment_column_index(&mut self) {
Expand Down Expand Up @@ -294,6 +295,65 @@ impl ValueVisitor for SortCompare {
}
}

pub struct SortCompareEquality(SortCompare);

impl SortCompareEquality {
pub fn new(ordering_descs: Vec<SortColumnDescription>, rows: usize) -> Self {
Self(SortCompare {
rows,
limit: LimitType::None,
permutation: (0..rows as u32).collect(),
ordering_descs,
current_column_index: 0,
validity: None,
equality_index: vec![1; rows as _],
force_equality: true,
})
}

pub fn increment_column_index(&mut self) {
self.0.increment_column_index()
}

pub fn take_permutation(self) -> Vec<u32> {
self.0.take_permutation()
}

pub fn equality_index(&self) -> &[u8] {
&self.0.equality_index
}

pub fn first_change(index: &[u8]) -> Option<usize> {
memchr(0, index)
}
}

impl ValueVisitor for SortCompareEquality {
fn visit_scalar(&mut self, _scalar: crate::Scalar) -> Result<()> {
Ok(())
}

fn visit_number<T: Number>(&mut self, column: Buffer<T>) -> Result<()> {
self.0.visit_number(column)
}

fn visit_timestamp(&mut self, buffer: Buffer<i64>) -> Result<()> {
self.0.visit_timestamp(buffer)
}

fn visit_date(&mut self, buffer: Buffer<i32>) -> Result<()> {
self.0.visit_date(buffer)
}

fn visit_typed_column<T: ValueType>(&mut self, col: T::Column) -> Result<()> {
self.0.visit_typed_column::<T>(col)
}

fn visit_nullable(&mut self, column: Box<NullableColumn<AnyType>>) -> Result<()> {
self.0.visit_nullable(column)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub use self::empty_map::EmptyMapType;
pub use self::generic::GenericType;
pub use self::geography::GeographyColumn;
pub use self::geography::GeographyType;
pub use self::geometry::GeometryType;
pub use self::map::MapType;
pub use self::null::NullType;
pub use self::nullable::NullableColumn;
Expand Down
6 changes: 6 additions & 0 deletions src/query/expression/src/types/geography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl<'a> BinaryLike<'a> for GeographyRef<'a> {
}
}

impl<'a> AsRef<[u8]> for GeographyRef<'a> {
fn as_ref(&self) -> &[u8] {
self.0
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GeographyType;

Expand Down
5 changes: 3 additions & 2 deletions src/query/pipeline/core/src/processors/shuffle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum MultiwayStrategy {
}

pub trait Exchange: Send + Sync + 'static {
const NAME: &'static str;
const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;

fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
Expand Down Expand Up @@ -185,7 +186,7 @@ impl<T: Exchange> PartitionProcessor<T> {

impl<T: Exchange> Processor for PartitionProcessor<T> {
fn name(&self) -> String {
String::from("ShufflePartition")
format!("ShufflePartition({})", T::NAME)
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down Expand Up @@ -287,7 +288,7 @@ impl<T: Exchange> MergePartitionProcessor<T> {

impl<T: Exchange> Processor for MergePartitionProcessor<T> {
fn name(&self) -> String {
String::from("ShuffleMergePartition")
format!("ShuffleMergePartition({})", T::NAME)
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down
22 changes: 18 additions & 4 deletions src/query/service/src/pipelines/builders/builder_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::TransformWindow;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
use crate::pipelines::processors::transforms::WindowFunctionInfo;
use crate::pipelines::processors::transforms::WindowPartitionExchange;
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
use crate::pipelines::processors::transforms::WindowSortDesc;
use crate::pipelines::processors::transforms::WindowSpillSettings;
use crate::pipelines::PipelineBuilder;
Expand Down Expand Up @@ -169,10 +170,23 @@ impl PipelineBuilder {
})
.collect::<Result<Vec<_>>>()?;

self.main_pipeline.exchange(
num_processors,
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
);
if let Some(top_n) = &window_partition.top_n {
self.main_pipeline.exchange(
num_processors,
WindowPartitionTopNExchange::create(
partition_by.clone(),
sort_desc.clone(),
top_n.top,
top_n.func,
num_partitions as u64,
),
)
} else {
self.main_pipeline.exchange(
num_processors,
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
);
}

let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
let temp_dir_manager = TempDirManager::instance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ mod transform_window_partition_collect;
mod window_partition_buffer;
mod window_partition_exchange;
mod window_partition_meta;
mod window_partition_partial_top_n_exchange;

pub use transform_window_partition_collect::*;
pub use window_partition_buffer::*;
pub use window_partition_exchange::*;
pub use window_partition_meta::*;
pub use window_partition_partial_top_n_exchange::*;
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::group_hash_columns_slice;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::group_hash_columns;
use databend_common_expression::DataBlock;
use databend_common_expression::Value;
use databend_common_expression::InputColumns;
use databend_common_pipeline_core::processors::Exchange;

use super::WindowPartitionMeta;
Expand All @@ -38,27 +37,17 @@ impl WindowPartitionExchange {
}

impl Exchange for WindowPartitionExchange {
const NAME: &'static str = "Window";
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
let num_rows = data_block.num_rows();

// Extract the columns used for hash computation.
let hash_cols = self
.hash_keys
.iter()
.map(|&offset| {
let entry = data_block.get_by_offset(offset);
match &entry.value {
Value::Scalar(s) => {
ColumnBuilder::repeat(&s.as_ref(), num_rows, &entry.data_type).build()
}
Value::Column(c) => c.clone(),
}
})
.collect::<Vec<_>>();
let data_block = data_block.consume_convert_to_full();
let hash_cols = InputColumns::new_block_proxy(&self.hash_keys, &data_block);

// Compute the hash value for each row.
let mut hashes = vec![0u64; num_rows];
group_hash_columns_slice(&hash_cols, &mut hashes);
group_hash_columns(hash_cols, &mut hashes);

// Scatter the data block to different partitions.
let indices = hashes
Expand Down
Loading
Loading