Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: coldWater <[email protected]>
  • Loading branch information
forsaken628 committed Nov 1, 2024
1 parent 960b682 commit cc57d0e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
11 changes: 5 additions & 6 deletions src/query/expression/src/aggregate/group_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,12 @@ where I: Index
})
.collect::<Vec<_>>();
if IS_FIRST {
let mut v = IndexHashVisitor::<true, _>::new(&indices, &mut self.target);
v.visit_column(column.column)?;
let mut v = IndexHashVisitor::<true, _>::new(&indices, self.target);
v.visit_column(column.column)
} else {
let mut v = IndexHashVisitor::<false, _>::new(&indices, &mut self.target);
v.visit_column(column.column)?;
let mut v = IndexHashVisitor::<false, _>::new(&indices, self.target);
v.visit_column(column.column)
}
Ok(())
}

fn visit_typed_column<T: ValueType>(&mut self, column: T::Column) -> Result<()> {
Expand Down Expand Up @@ -500,7 +499,7 @@ mod tests {
use crate::Value;

fn merge_hash_slice(ls: &[u64]) -> u64 {
ls.iter().cloned().reduce(|a, b| merge_hash(a, b)).unwrap()
ls.iter().cloned().reduce(merge_hash).unwrap()
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/geography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<'a> BinaryLike<'a> for GeographyRef<'a> {

impl<'a> AsRef<[u8]> for GeographyRef<'a> {
fn as_ref(&self) -> &[u8] {
&self.0
self.0
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/query/service/src/pipelines/builders/builder_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::SortColumnDescription;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::Window;
use databend_common_sql::executor::physical_plans::WindowPartition;
use databend_storages_common_cache::TempDirManager;
Expand All @@ -32,9 +31,9 @@ use opendal::Operator;

use crate::pipelines::processors::transforms::FrameBound;
use crate::pipelines::processors::transforms::TransformWindow;
use crate::pipelines::processors::transforms::TransformWindowPartialTopN;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
use crate::pipelines::processors::transforms::WindowFunctionInfo;
use crate::pipelines::processors::transforms::WindowPartitionExchange;
use crate::pipelines::processors::transforms::WindowPartitionTopNExchange;
use crate::pipelines::processors::transforms::WindowSortDesc;
use crate::pipelines::processors::transforms::WindowSpillSettings;
Expand Down Expand Up @@ -179,7 +178,7 @@ impl PipelineBuilder {
sort_desc.clone(),
top_n.top,
top_n.func,
num_partitions,
num_partitions as u64,
),
)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,44 @@ impl Exchange for WindowPartitionTopNExchange {
group_hash_value_spread(&hash_indices, entry.value.to_owned(), i == 0, &mut hashes)?;
}

let partition_permutation = self.partition_permutation(
rows,
&permutation,
&hashes,
&partition_equality,
&full_equality,
);

// Partition the data blocks to different processors.
let mut output_data_blocks = vec![vec![]; n];
let mut buf = None;
for (partition_id, indices) in partition_permutation.into_iter().enumerate() {
output_data_blocks[partition_id % n]
.push((partition_id, block.take(&indices, &mut buf)?));
}

// Union data blocks for each processor.
Ok(output_data_blocks
.into_iter()
.map(WindowPartitionMeta::create)
.map(DataBlock::empty_with_meta)
.collect())
}
}

impl WindowPartitionTopNExchange {
fn partition_permutation(
&self,
rows: usize,
permutation: &[u32],
hashes: &[u64],
partition_equality: &[u8],
full_equality: &[u8],
) -> Vec<Vec<u32>> {
let mut partition_permutation = vec![Vec::new(); self.num_partitions as usize];

let mut start = 0;
let mut cur = 0;

while cur < rows {
let partition =
&mut partition_permutation[(hashes[start] % self.num_partitions) as usize];
Expand Down Expand Up @@ -142,24 +175,9 @@ impl Exchange for WindowPartitionTopNExchange {
}
}
}

cur += 1;
}
}

// Partition the data blocks to different processors.
let mut output_data_blocks = vec![vec![]; n];
let mut buf = None;
for (partition_id, indices) in partition_permutation.into_iter().enumerate() {
output_data_blocks[partition_id % n]
.push((partition_id, block.take(&indices, &mut buf)?));
}

// Union data blocks for each processor.
Ok(output_data_blocks
.into_iter()
.map(WindowPartitionMeta::create)
.map(DataBlock::empty_with_meta)
.collect())
partition_permutation
}
}

0 comments on commit cc57d0e

Please sign in to comment.