Skip to content

chore: push down sort pipeline to cluster nodes. #13881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits on Dec 5, 2023
16 changes: 3 additions & 13 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,19 +327,9 @@ impl DataBlock {
}

#[inline]
pub fn pop_columns(self, num: usize) -> Result<Self> {
let mut columns = self.columns.clone();
let len = columns.len();

for _ in 0..num.min(len) {
columns.pop().unwrap();
}

Ok(Self {
columns,
num_rows: self.num_rows,
meta: self.meta,
})
pub fn pop_columns(&mut self, num: usize) {
debug_assert!(num <= self.columns.len());
self.columns.truncate(self.columns.len() - num);
}

/// Resort the columns according to the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use transform_block_compact_for_copy::*;
pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_sort::*;
pub use transform_sort_merge::sort_merge;
pub use transform_sort_partial::*;
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Rows for StringColumn {

impl RowConverter<StringColumn> for CommonRowConverter {
fn create(
sort_columns_descriptions: Vec<SortColumnDescription>,
sort_columns_descriptions: &[SortColumnDescription],
output_schema: DataSchemaRef,
) -> Result<Self> {
let sort_fields = sort_columns_descriptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub trait RowConverter<T: Rows>
where Self: Sized
{
fn create(
sort_columns_descriptions: Vec<SortColumnDescription>,
sort_columns_descriptions: &[SortColumnDescription],
output_schema: DataSchemaRef,
) -> Result<Self>;
fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
T::Scalar: Ord,
{
fn create(
sort_columns_descriptions: Vec<SortColumnDescription>,
sort_columns_descriptions: &[SortColumnDescription],
_: DataSchemaRef,
) -> Result<Self> {
assert!(sort_columns_descriptions.len() == 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipe;
use common_pipeline_core::PipeItem;
use common_pipeline_core::Pipeline;
use common_profile::SharedProcessorProfiles;

use super::sort::Cursor;
use super::sort::Rows;
use super::sort::SimpleRows;
use crate::processors::ProcessorProfileWrapper;

pub fn try_add_multi_sort_merge(
pipeline: &mut Pipeline,
input_schema: DataSchemaRef,
block_size: usize,
limit: Option<usize>,
sort_columns_descriptions: Vec<SortColumnDescription>,
prof_info: Option<(u32, SharedProcessorProfiles)>,
remove_order_col: bool,
) -> Result<()> {
if pipeline.is_empty() {
return Err(ErrorCode::Internal("Cannot resize empty pipe."));
Expand All @@ -71,8 +75,19 @@ pub fn try_add_multi_sort_merge(
block_size,
limit,
sort_columns_descriptions,
remove_order_col,
)?;

let processor = if let Some((plan_id, prof)) = &prof_info {
ProcessorPtr::create(ProcessorProfileWrapper::create(
processor,
*plan_id,
prof.clone(),
))
} else {
ProcessorPtr::create(processor)
};

pipeline.add_pipe(Pipe::create(inputs_port.len(), 1, vec![PipeItem::create(
processor,
inputs_port,
Expand All @@ -91,67 +106,71 @@ fn create_processor(
block_size: usize,
limit: Option<usize>,
sort_columns_descriptions: Vec<SortColumnDescription>,
) -> Result<ProcessorPtr> {
remove_order_col: bool,
) -> Result<Box<dyn Processor>> {
Ok(if sort_columns_descriptions.len() == 1 {
let sort_type = input_schema
.field(sort_columns_descriptions[0].offset)
.data_type();
match sort_type {
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
NumberDataType::NUM_TYPE =>
ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<
SimpleRows<NumberType<NUM_TYPE>>,
>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?)),
NumberDataType::NUM_TYPE => Box::new(MultiSortMergeProcessor::<
SimpleRows<NumberType<NUM_TYPE>>,
>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
remove_order_col,
)?),
}),
DataType::Date => ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<
SimpleRows<DateType>,
>::create(
DataType::Date => Box::new(MultiSortMergeProcessor::<SimpleRows<DateType>>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?)),
DataType::Timestamp => ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<
SimpleRows<TimestampType>,
>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?)),
DataType::String => ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<
SimpleRows<StringType>,
>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?)),
_ => ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<StringColumn>::create(
remove_order_col,
)?),
DataType::Timestamp => Box::new(
MultiSortMergeProcessor::<SimpleRows<TimestampType>>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
remove_order_col,
)?,
),
DataType::String => {
Box::new(MultiSortMergeProcessor::<SimpleRows<StringType>>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
remove_order_col,
)?)
}
_ => Box::new(MultiSortMergeProcessor::<StringColumn>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?)),
remove_order_col,
)?),
}
} else {
ProcessorPtr::create(Box::new(MultiSortMergeProcessor::<StringColumn>::create(
Box::new(MultiSortMergeProcessor::<StringColumn>::create(
inputs,
output,
block_size,
limit,
sort_columns_descriptions,
)?))
remove_order_col,
)?)
})
}

Expand All @@ -168,6 +187,11 @@ where R: Rows
// Parameters
block_size: usize,
limit: Option<usize>,
/// Indicates whether this processor should remove the order column from output blocks.
/// In cluster sorting, [`MultiSortMergeProcessor`] serves both as the final processor on each
/// cluster node and as the first processor on the coordinator node.
/// On a cluster node the order column must be kept (the coordinator still needs it), so it is not removed there.
remove_order_col: bool,

/// For each input port, maintain a dequeue of data blocks.
blocks: Vec<VecDeque<DataBlock>>,
Expand Down Expand Up @@ -195,6 +219,7 @@ where R: Rows
block_size: usize,
limit: Option<usize>,
sort_desc: Vec<SortColumnDescription>,
remove_order_col: bool,
) -> Result<Self> {
let input_size = inputs.len();
Ok(Self {
Expand All @@ -203,6 +228,7 @@ where R: Rows
sort_desc,
block_size,
limit,
remove_order_col,
blocks: vec![VecDeque::with_capacity(2); input_size],
heap: BinaryHeap::with_capacity(input_size),
in_progress_rows: vec![],
Expand Down Expand Up @@ -484,7 +510,7 @@ where R: Rows + Send + 'static
if block.is_empty() {
continue;
}
let block = block.convert_to_full();
let mut block = block.convert_to_full();
let order_col = block
.columns()
.last()
Expand All @@ -497,7 +523,9 @@ where R: Rows + Send + 'static
ErrorCode::BadDataValueType("Order column type mismatched.")
})?;
// Remove the order column
let block = block.pop_columns(1)?;
if self.remove_order_col {
block.pop_columns(1);
}
let cursor = Cursor::new(input_index, rows);
self.heap.push(Reverse(cursor));
self.cursor_finished[input_index] = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,21 @@ pub fn build_full_sort_pipeline(
partial_block_size: usize,
final_block_size: usize,
prof_info: Option<(u32, SharedProcessorProfiles)>,
after_exchange: bool,
remove_order_col_at_last: bool,
) -> Result<()> {
// Partial sort
if limit.is_none() || !after_exchange {
// If the sort plan is after an exchange plan, the blocks are already partially sorted on other nodes.
pipeline.add_transform(|input, output| {
let transform =
TransformSortPartial::try_create(input, output, limit, sort_desc.clone())?;
if let Some((plan_id, prof)) = &prof_info {
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create(
transform,
*plan_id,
prof.clone(),
)))
} else {
Ok(ProcessorPtr::create(transform))
}
})?;
}
pipeline.add_transform(|input, output| {
let transform = TransformSortPartial::try_create(input, output, limit, sort_desc.clone())?;
if let Some((plan_id, prof)) = &prof_info {
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create(
transform,
*plan_id,
prof.clone(),
)))
} else {
Ok(ProcessorPtr::create(transform))
}
})?;

build_merge_sort_pipeline(
pipeline,
Expand All @@ -62,9 +58,12 @@ pub fn build_full_sort_pipeline(
partial_block_size,
final_block_size,
prof_info,
false,
remove_order_col_at_last,
)
}

#[allow(clippy::too_many_arguments)]
pub fn build_merge_sort_pipeline(
pipeline: &mut Pipeline,
input_schema: DataSchemaRef,
Expand All @@ -73,9 +72,17 @@ pub fn build_merge_sort_pipeline(
partial_block_size: usize,
final_block_size: usize,
prof_info: Option<(u32, SharedProcessorProfiles)>,
order_col_generated: bool,
remove_order_col_at_last: bool,
) -> Result<()> {
// Merge sort
let need_multi_merge = pipeline.output_len() > 1;
debug_assert!(if order_col_generated {
// If `order_col_generated`, it means this transform is the last processor in the distributed sort pipeline.
!need_multi_merge && remove_order_col_at_last
} else {
true
});
pipeline.add_transform(|input, output| {
let transform = match limit {
Some(limit) => try_create_transform_sort_merge_limit(
Expand All @@ -85,15 +92,17 @@ pub fn build_merge_sort_pipeline(
sort_desc.clone(),
partial_block_size,
limit,
need_multi_merge,
order_col_generated,
need_multi_merge || !remove_order_col_at_last,
)?,
_ => try_create_transform_sort_merge(
input,
output,
input_schema.clone(),
partial_block_size,
sort_desc.clone(),
need_multi_merge,
order_col_generated,
need_multi_merge || !remove_order_col_at_last,
)?,
};

Expand All @@ -110,7 +119,15 @@ pub fn build_merge_sort_pipeline(

if need_multi_merge {
// Multi-pipelines merge sort
try_add_multi_sort_merge(pipeline, input_schema, final_block_size, limit, sort_desc)?;
try_add_multi_sort_merge(
pipeline,
input_schema,
final_block_size,
limit,
sort_desc,
prof_info.clone(),
remove_order_col_at_last,
)?;
}

Ok(())
Expand Down
Loading