Commit bbad78b

Deal with the case if max_threads = 1.

RinChanNOWWW committed Dec 3, 2023
1 parent 933fee8 commit bbad78b

Showing 14 changed files with 223 additions and 111 deletions.
16 changes: 3 additions & 13 deletions src/query/expression/src/block.rs
@@ -327,19 +327,9 @@ impl DataBlock {
     }
 
     #[inline]
-    pub fn pop_columns(self, num: usize) -> Result<Self> {
-        let mut columns = self.columns.clone();
-        let len = columns.len();
-
-        for _ in 0..num.min(len) {
-            columns.pop().unwrap();
-        }
-
-        Ok(Self {
-            columns,
-            num_rows: self.num_rows,
-            meta: self.meta,
-        })
+    pub fn pop_columns(&mut self, num: usize) {
+        debug_assert!(num <= self.columns.len());
+        self.columns.truncate(self.columns.len() - num);
     }
 
     /// Resort the columns according to the schema.
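
The new `pop_columns` mutates the block in place with `truncate` instead of cloning every column and rebuilding `Self`, and it no longer returns a `Result`. A minimal standalone sketch of the new semantics (the `Block` struct below is an illustrative stand-in, not the real `DataBlock`):

// Minimal sketch of the in-place semantics; `Block` is illustrative only.
struct Block {
    columns: Vec<String>,
}

impl Block {
    /// Drops the last `num` columns in place: no clone, no Result.
    fn pop_columns(&mut self, num: usize) {
        debug_assert!(num <= self.columns.len());
        self.columns.truncate(self.columns.len() - num);
    }
}

fn main() {
    let mut block = Block { columns: vec!["a".into(), "b".into(), "order".into()] };
    block.pop_columns(1); // strip a trailing order column
    assert_eq!(block.columns, ["a", "b"]);
}
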
@@ -53,7 +53,7 @@ impl Rows for StringColumn {
 
 impl RowConverter<StringColumn> for CommonRowConverter {
     fn create(
-        sort_columns_descriptions: Vec<SortColumnDescription>,
+        sort_columns_descriptions: &[SortColumnDescription],
         output_schema: DataSchemaRef,
     ) -> Result<Self> {
         let sort_fields = sort_columns_descriptions
@@ -29,7 +29,7 @@ pub trait RowConverter<T: Rows>
 where Self: Sized
 {
     fn create(
-        sort_columns_descriptions: Vec<SortColumnDescription>,
+        sort_columns_descriptions: &[SortColumnDescription],
         output_schema: DataSchemaRef,
     ) -> Result<Self>;
     fn convert(&mut self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;
@@ -129,7 +129,7 @@ where
     T::Scalar: Ord,
 {
     fn create(
-        sort_columns_descriptions: Vec<SortColumnDescription>,
+        sort_columns_descriptions: &[SortColumnDescription],
         _: DataSchemaRef,
     ) -> Result<Self> {
         assert!(sort_columns_descriptions.len() == 1);
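
All three `create` implementations now borrow the descriptions instead of consuming a `Vec`, so the caller keeps ownership and can reuse them afterwards (the compactor below now stores them as `sort_desc`). A small sketch of the pattern, with `SortDesc` as a stand-in for `SortColumnDescription`:

// Sketch of the Vec-to-slice change; `SortDesc` stands in for
// `SortColumnDescription`.
struct SortDesc {
    offset: usize,
}

struct Converter {
    offsets: Vec<usize>,
}

impl Converter {
    // Borrowing `&[SortDesc]` copies only what the converter needs ...
    fn create(sort_desc: &[SortDesc]) -> Self {
        Converter { offsets: sort_desc.iter().map(|d| d.offset).collect() }
    }
}

fn main() {
    let sort_desc = vec![SortDesc { offset: 0 }, SortDesc { offset: 2 }];
    let converter = Converter::create(&sort_desc);
    // ... so the caller still owns `sort_desc` and can store it elsewhere.
    assert_eq!(converter.offsets, [0, 2]);
    assert_eq!(sort_desc.len(), 2);
}
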
@@ -524,7 +524,7 @@ where R: Rows + Send + 'static
         })?;
         // Remove the order column
         if self.remove_order_col {
-            block = block.pop_columns(1)?;
+            block.pop_columns(1);
         }
         let cursor = Cursor::new(input_index, rows);
         self.heap.push(Reverse(cursor));
@@ -58,6 +58,7 @@ pub fn build_full_sort_pipeline(
         partial_block_size,
         final_block_size,
         prof_info,
+        false,
         remove_order_col_at_last,
     )
 }
@@ -71,10 +72,17 @@ pub fn build_merge_sort_pipeline(
     partial_block_size: usize,
     final_block_size: usize,
     prof_info: Option<(u32, SharedProcessorProfiles)>,
+    order_col_generated: bool,
     remove_order_col_at_last: bool,
 ) -> Result<()> {
     // Merge sort
     let need_multi_merge = pipeline.output_len() > 1;
+    debug_assert!(if order_col_generated {
+        // If `order_col_generated`, it means this transform is the last processor in the distributed sort pipeline.
+        !need_multi_merge && remove_order_col_at_last
+    } else {
+        true
+    });
     pipeline.add_transform(|input, output| {
         let transform = match limit {
             Some(limit) => try_create_transform_sort_merge_limit(
@@ -84,6 +92,7 @@
                 sort_desc.clone(),
                 partial_block_size,
                 limit,
+                order_col_generated,
                 need_multi_merge || !remove_order_col_at_last,
             )?,
             _ => try_create_transform_sort_merge(
@@ -92,6 +101,7 @@
                 input_schema.clone(),
                 partial_block_size,
                 sort_desc.clone(),
+                order_col_generated,
                 need_multi_merge || !remove_order_col_at_last,
             )?,
         };
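
The new `debug_assert!` encodes an implication: if the order column was already generated upstream, this must be a single-output-stream merge that also strips the column at the end. The same check written as a plain boolean function (a sketch, not pipeline code):

// The pipeline invariant as a pure function: `a implies b` is `!a || b`.
fn invariant(order_col_generated: bool, need_multi_merge: bool, remove_order_col_at_last: bool) -> bool {
    !order_col_generated || (!need_multi_merge && remove_order_col_at_last)
}

fn main() {
    // Anything is fine when the order column was not generated upstream.
    assert!(invariant(false, true, false));
    // The max_threads = 1 case this commit fixes: one output stream, strip at the end.
    assert!(invariant(true, false, true));
    // Rejected: an already-generated order column that would survive, or fan out again.
    assert!(!invariant(true, false, false));
    assert!(!invariant(true, true, true));
}
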
@@ -54,13 +54,18 @@ use super::TransformCompact;
 pub struct SortMergeCompactor<R, Converter> {
     block_size: usize,
     row_converter: Converter,
-    order_by_cols: Vec<usize>,
+    sort_desc: Vec<SortColumnDescription>,
 
     aborting: Arc<AtomicBool>,
 
     /// If the next transform of current transform is [`super::transform_multi_sort_merge::MultiSortMergeProcessor`],
-    /// we can generate the order column to avoid the extra converting in the next transform.
-    gen_order_col: bool,
+    /// we can generate and output the order column to avoid the extra converting in the next transform.
+    output_order_col: bool,
+    /// If this transform is after an Exchange transform,
+    /// it means it will compact the data from cluster nodes.
+    /// And the order column is already generated in each cluster node,
+    /// so we don't need to generate the order column again.
+    order_col_generated: bool,
 
     _c: PhantomData<Converter>,
     _r: PhantomData<R>,
@@ -75,16 +80,26 @@ where
         schema: DataSchemaRef,
         block_size: usize,
         sort_desc: Vec<SortColumnDescription>,
-        gen_order_col: bool,
+        order_col_generated: bool,
+        output_order_col: bool,
     ) -> Result<Self> {
-        let order_by_cols = sort_desc.iter().map(|i| i.offset).collect::<Vec<_>>();
-        let row_converter = Converter::create(sort_desc, schema)?;
+        debug_assert!(if order_col_generated {
+            // If the order column is already generated,
+            // it means this transform is after an exchange source and it's the last transform for sorting.
+            // We should remove the order column.
+            !output_order_col
+        } else {
+            true
+        });
+
+        let row_converter = Converter::create(&sort_desc, schema)?;
         Ok(SortMergeCompactor {
-            order_by_cols,
             row_converter,
             block_size,
+            sort_desc,
             aborting: Arc::new(AtomicBool::new(false)),
-            gen_order_col,
+            order_col_generated,
+            output_order_col,
             _c: PhantomData,
             _r: PhantomData,
         })
@@ -120,19 +135,26 @@ where
             .collect::<Vec<_>>();
 
         if blocks.len() == 1 {
-            if self.gen_order_col {
-                let block = blocks.get_mut(0).ok_or(ErrorCode::Internal("It's a bug"))?;
+            let block = blocks.get_mut(0).ok_or(ErrorCode::Internal("It's a bug"))?;
+            if self.order_col_generated {
+                // Need to remove order column.
+                block.pop_columns(1);
+                return Ok(blocks);
+            }
+            if self.output_order_col {
                 let columns = self
-                    .order_by_cols
+                    .sort_desc
                     .iter()
-                    .map(|i| block.get_by_offset(*i).clone())
+                    .map(|d| block.get_by_offset(d.offset).clone())
                     .collect::<Vec<_>>();
                 let rows = self.row_converter.convert(&columns, block.num_rows())?;
                 let order_col = rows.to_column();
-                block.add_column(BlockEntry {
-                    data_type: order_col.data_type(),
-                    value: Value::Column(order_col),
-                });
+                if self.output_order_col {
+                    block.add_column(BlockEntry {
+                        data_type: order_col.data_type(),
+                        value: Value::Column(order_col),
+                    });
+                }
             }
             return Ok(blocks);
         }
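
With one input block (the `max_threads = 1` case), the compactor now has three distinct outcomes driven by the two flags. A compact restatement of the branch above as a standalone function (illustrative, not databend code):

// Illustrative summary of the single-block fast path.
#[derive(Debug, PartialEq)]
enum Action {
    StripOrderCol,  // order_col_generated: drop the trailing order column
    AppendOrderCol, // output_order_col: convert the sort keys and append them
    PassThrough,    // neither flag set: forward the block untouched
}

fn single_block_action(order_col_generated: bool, output_order_col: bool) -> Action {
    if order_col_generated {
        Action::StripOrderCol
    } else if output_order_col {
        Action::AppendOrderCol
    } else {
        Action::PassThrough
    }
}

fn main() {
    // Final merge after a distributed exchange with max_threads = 1:
    assert_eq!(single_block_action(true, false), Action::StripOrderCol);
    // Feeding a downstream MultiSortMergeProcessor:
    assert_eq!(single_block_action(false, true), Action::AppendOrderCol);
    assert_eq!(single_block_action(false, false), Action::PassThrough);
}
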
@@ -144,20 +166,36 @@
 
         // 1. Put all blocks into a min-heap.
         for (i, block) in blocks.iter_mut().enumerate() {
-            let columns = self
-                .order_by_cols
-                .iter()
-                .map(|i| block.get_by_offset(*i).clone())
-                .collect::<Vec<_>>();
-            let rows = self.row_converter.convert(&columns, block.num_rows())?;
-
-            if self.gen_order_col {
-                let order_col = rows.to_column();
-                block.add_column(BlockEntry {
-                    data_type: order_col.data_type(),
-                    value: Value::Column(order_col),
-                });
-            }
+            let rows = if self.order_col_generated {
+                let order_col = block
+                    .columns()
+                    .last()
+                    .unwrap()
+                    .value
+                    .as_column()
+                    .unwrap()
+                    .clone();
+                let rows = R::from_column(order_col, &self.sort_desc)
+                    .ok_or_else(|| ErrorCode::BadDataValueType("Order column type mismatched."))?;
+                // Need to remove order column.
+                block.pop_columns(1);
+                rows
+            } else {
+                let columns = self
+                    .sort_desc
+                    .iter()
+                    .map(|d| block.get_by_offset(d.offset).clone())
+                    .collect::<Vec<_>>();
+                let rows = self.row_converter.convert(&columns, block.num_rows())?;
+                if self.output_order_col {
+                    let order_col = rows.to_column();
+                    block.add_column(BlockEntry {
+                        data_type: order_col.data_type(),
+                        value: Value::Column(order_col),
+                    });
+                }
+                rows
+            };
             let cursor = Cursor::new(i, rows);
             heap.push(Reverse(cursor));
         }
@@ -246,7 +284,8 @@ pub fn try_create_transform_sort_merge(
     output_schema: DataSchemaRef,
     block_size: usize,
     sort_desc: Vec<SortColumnDescription>,
-    gen_order_col: bool,
+    order_col_generated: bool,
+    output_order_col: bool,
 ) -> Result<Box<dyn Processor>> {
     if sort_desc.len() == 1 {
         let sort_type = output_schema.field(sort_desc[0].offset).data_type();
@@ -264,7 +303,11 @@
                         SimpleRows<NumberType<NUM_TYPE>>,
                         SimpleRowConverter<NumberType<NUM_TYPE>>,
                     >::try_create(
-                        output_schema, block_size, sort_desc, gen_order_col
+                        output_schema,
+                        block_size,
+                        sort_desc,
+                        order_col_generated,
+                        output_order_col
                     )?
                 ),
             }),
@@ -275,7 +318,8 @@
                     output_schema,
                     block_size,
                     sort_desc,
-                    gen_order_col,
+                    order_col_generated,
+                    output_order_col,
                 )?,
             ),
             DataType::Timestamp => SimpleTimestampSort::try_create(
@@ -285,7 +329,8 @@
                     output_schema,
                     block_size,
                     sort_desc,
-                    gen_order_col,
+                    order_col_generated,
+                    output_order_col,
                 )?,
             ),
             DataType::String => SimpleStringSort::try_create(
@@ -295,20 +340,33 @@
                     output_schema,
                     block_size,
                     sort_desc,
-                    gen_order_col,
+                    order_col_generated,
+                    output_order_col,
                 )?,
             ),
             _ => CommonSort::try_create(
                 input,
                 output,
-                CommonCompactor::try_create(output_schema, block_size, sort_desc, gen_order_col)?,
+                CommonCompactor::try_create(
+                    output_schema,
+                    block_size,
+                    sort_desc,
+                    order_col_generated,
+                    output_order_col,
+                )?,
             ),
         }
     } else {
         CommonSort::try_create(
             input,
             output,
-            CommonCompactor::try_create(output_schema, block_size, sort_desc, gen_order_col)?,
+            CommonCompactor::try_create(
+                output_schema,
+                block_size,
+                sort_desc,
+                order_col_generated,
+                output_order_col,
+            )?,
         )
     }
 }
@@ -319,6 +377,7 @@ pub fn sort_merge(
     sort_desc: Vec<SortColumnDescription>,
     data_blocks: Vec<DataBlock>,
 ) -> Result<Vec<DataBlock>> {
-    let mut compactor = CommonCompactor::try_create(data_schema, block_size, sort_desc, false)?;
+    let mut compactor =
+        CommonCompactor::try_create(data_schema, block_size, sort_desc, false, false)?;
     compactor.compact_final(data_blocks)
 }