Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): TopN window operator #16726

Merged
17 commits merged on Nov 13, 2024
361 changes: 350 additions & 11 deletions src/query/expression/src/aggregate/group_hash.rs

Large diffs are not rendered by default.

23 changes: 21 additions & 2 deletions src/query/expression/src/kernels/sort_compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct SortCompare {
current_column_index: usize,
validity: Option<Bitmap>,
equality_index: Vec<u8>,
force_equality: bool,
}

macro_rules! do_sorter {
Expand Down Expand Up @@ -112,12 +113,25 @@ impl SortCompare {
current_column_index: 0,
validity: None,
equality_index,
force_equality: matches!(limit, LimitType::LimitRank(_)),
}
}

/// Creates a comparator over `rows` rows with no limit and `force_equality` enabled.
///
/// The permutation starts as `0..rows` and the equality index starts with every
/// entry set to `1`. Because `force_equality` is `true`,
/// `need_update_equality_index` returns `true` for every ordering column.
pub fn with_force_equality(ordering_descs: Vec<SortColumnDescription>, rows: usize) -> Self {
    // Row order before any column has been visited.
    let permutation = (0..rows as u32).collect();
    // One flag per row, all initialised to 1.
    let equality_index = vec![1; rows];

    Self {
        rows,
        limit: LimitType::None,
        permutation,
        ordering_descs,
        current_column_index: 0,
        validity: None,
        equality_index,
        force_equality: true,
    }
}

/// Returns `true` when `force_equality` is set, or when the current column is not
/// the last entry of `ordering_descs`.
fn need_update_equality_index(&self) -> bool {
// NOTE(review): this is a rendered diff without +/- markers. The next two lines
// appear to be the pre-change condition (the hunk reports 2 deletions) and the
// final line its replacement — confirm against the merged file.
self.current_column_index != self.ordering_descs.len() - 1
|| matches!(self.limit, LimitType::LimitRank(_))
self.force_equality || self.current_column_index != self.ordering_descs.len() - 1
}

pub fn increment_column_index(&mut self) {
Expand Down Expand Up @@ -254,6 +268,11 @@ impl SortCompare {
}
}
}

/// Returns the equality index as a byte slice.
///
/// Debug builds assert that `force_equality` is set before handing the slice out.
pub fn equality_index(&self) -> &[u8] {
    debug_assert!(self.force_equality);
    self.equality_index.as_slice()
}
}

impl ValueVisitor for SortCompare {
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub use self::empty_map::EmptyMapType;
pub use self::generic::GenericType;
pub use self::geography::GeographyColumn;
pub use self::geography::GeographyType;
pub use self::geometry::GeometryType;
pub use self::map::MapType;
pub use self::null::NullType;
pub use self::nullable::NullableColumn;
Expand Down
6 changes: 6 additions & 0 deletions src/query/expression/src/types/geography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ impl<'a> GeographyRef<'a> {
}
}

/// Lets a `GeographyRef` be passed wherever a borrowed byte slice is accepted.
impl AsRef<[u8]> for GeographyRef<'_> {
    /// Returns the byte slice held in the wrapper's first field.
    fn as_ref(&self) -> &[u8] {
        self.0
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GeographyType;

Expand Down
24 changes: 14 additions & 10 deletions src/query/expression/src/utils/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,25 @@ pub trait ValueVisitor {
self.visit_typed_column::<EmptyMapType>(len)
}

/// Visits a `NumberColumn` of any numeric variant.
///
/// Matches on the concrete variant through `with_number_type!` and forwards the
/// variant's payload to `visit_number`.
fn visit_any_number(&mut self, column: NumberColumn) -> Result<()> {
with_number_type!(|NUM_TYPE| match column {
NumberColumn::NUM_TYPE(b) => self.visit_number(b),
})
}

/// Visits a number column whose element type `T` is already known.
///
/// This default implementation forwards the column to `visit_typed_column`
/// instantiated with `NumberType<T>`.
fn visit_number<T: Number>(
&mut self,
column: <NumberType<T> as ValueType>::Column,
) -> Result<()> {
self.visit_typed_column::<NumberType<T>>(column)
}

/// Visits a `DecimalColumn` of any decimal variant.
///
/// Matches on the concrete variant through `with_decimal_type!` and forwards the
/// variant's buffer and size to `visit_decimal`.
fn visit_any_decimal(&mut self, column: DecimalColumn) -> Result<()> {
with_decimal_type!(|DECIMAL_TYPE| match column {
DecimalColumn::DECIMAL_TYPE(b, size) => self.visit_decimal(b, size),
})
}

/// Visits a decimal buffer whose element type `T` is already known.
///
/// This default implementation ignores `_size` and forwards the buffer to
/// `visit_typed_column` instantiated with `DecimalType<T>`.
fn visit_decimal<T: Decimal>(&mut self, column: Buffer<T>, _size: DecimalSize) -> Result<()> {
self.visit_typed_column::<DecimalType<T>>(column)
}
Expand Down Expand Up @@ -113,16 +125,8 @@ pub trait ValueVisitor {
Column::Null { len } => self.visit_null(len),
Column::EmptyArray { len } => self.visit_empty_array(len),
Column::EmptyMap { len } => self.visit_empty_map(len),
Column::Number(column) => {
with_number_type!(|NUM_TYPE| match column {
NumberColumn::NUM_TYPE(b) => self.visit_number(b),
})
}
Column::Decimal(column) => {
with_decimal_type!(|DECIMAL_TYPE| match column {
DecimalColumn::DECIMAL_TYPE(b, size) => self.visit_decimal(b, size),
})
}
Column::Number(column) => self.visit_any_number(column),
Column::Decimal(column) => self.visit_any_decimal(column),
Column::Boolean(bitmap) => self.visit_boolean(bitmap),
Column::Binary(column) => self.visit_binary(column),
Column::String(column) => self.visit_string(column),
Expand Down
5 changes: 3 additions & 2 deletions src/query/pipeline/core/src/processors/shuffle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum MultiwayStrategy {
}

pub trait Exchange: Send + Sync + 'static {
const NAME: &'static str;
const STRATEGY: MultiwayStrategy = MultiwayStrategy::Random;

fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>>;
Expand Down Expand Up @@ -185,7 +186,7 @@ impl<T: Exchange> PartitionProcessor<T> {

impl<T: Exchange> Processor for PartitionProcessor<T> {
fn name(&self) -> String {
String::from("ShufflePartition")
format!("ShufflePartition({})", T::NAME)
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down Expand Up @@ -287,7 +288,7 @@ impl<T: Exchange> MergePartitionProcessor<T> {

impl<T: Exchange> Processor for MergePartitionProcessor<T> {
fn name(&self) -> String {
String::from("ShuffleMergePartition")
format!("ShuffleMergePartition({})", T::NAME)
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down
22 changes: 18 additions & 4 deletions src/query/service/src/pipelines/builders/builder_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::TransformWindow;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
use crate::pipelines::processors::transforms::WindowFunctionInfo;
use crate::pipelines::processors::transforms::WindowPartitionExchange;
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
use crate::pipelines::processors::transforms::WindowSortDesc;
use crate::pipelines::processors::transforms::WindowSpillSettings;
use crate::pipelines::PipelineBuilder;
Expand Down Expand Up @@ -169,10 +170,23 @@ impl PipelineBuilder {
})
.collect::<Result<Vec<_>>>()?;

self.main_pipeline.exchange(
num_processors,
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
);
if let Some(top_n) = &window_partition.top_n {
self.main_pipeline.exchange(
num_processors,
WindowPartitionTopNExchange::create(
partition_by.clone(),
sort_desc.clone(),
top_n.top,
top_n.func,
num_partitions as u64,
),
)
} else {
self.main_pipeline.exchange(
num_processors,
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
);
}

let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
let temp_dir_manager = TempDirManager::instance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ mod transform_window_partition_collect;
mod window_partition_buffer;
mod window_partition_exchange;
mod window_partition_meta;
mod window_partition_partial_top_n_exchange;

pub use transform_window_partition_collect::*;
pub use window_partition_buffer::*;
pub use window_partition_exchange::*;
pub use window_partition_meta::*;
pub use window_partition_partial_top_n_exchange::*;
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::group_hash_columns_slice;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::group_hash_columns;
use databend_common_expression::DataBlock;
use databend_common_expression::Value;
use databend_common_expression::InputColumns;
use databend_common_pipeline_core::processors::Exchange;

use super::WindowPartitionMeta;
Expand All @@ -38,27 +37,17 @@ impl WindowPartitionExchange {
}

impl Exchange for WindowPartitionExchange {
const NAME: &'static str = "Window";
fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
let num_rows = data_block.num_rows();

// Extract the columns used for hash computation.
let hash_cols = self
.hash_keys
.iter()
.map(|&offset| {
let entry = data_block.get_by_offset(offset);
match &entry.value {
Value::Scalar(s) => {
ColumnBuilder::repeat(&s.as_ref(), num_rows, &entry.data_type).build()
}
Value::Column(c) => c.clone(),
}
})
.collect::<Vec<_>>();
let data_block = data_block.consume_convert_to_full();
let hash_cols = InputColumns::new_block_proxy(&self.hash_keys, &data_block);

// Compute the hash value for each row.
let mut hashes = vec![0u64; num_rows];
group_hash_columns_slice(&hash_cols, &mut hashes);
group_hash_columns(hash_cols, &mut hashes);

// Scatter the data block to different partitions.
let indices = hashes
Expand Down
Loading
Loading