Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 216 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ simd = ["arrow/simd"]
# Workspace dependencies

# Github dependencies

arrow = { git="https://github.com/datafuse-extras/arrow-rs", rev = "86d127b" }
arrow-flight = {git="https://github.com/datafuse-extras/arrow-rs", rev = "86d127b" }
parquet = { git="https://github.com/datafuse-extras/arrow-rs", rev = "86d127b", features = ["arrow"] }

arrow = { package = "arrow2", git="https://github.com/datafuse-extras/arrow2", rev = "ea2f82d" }
arrow-flight = { git="https://github.com/datafuse-extras/arrow2", rev = "ea2f82d" }
parquet = {package = "parquet2", git = "https://github.com/datafuse-extras/parquet2", branch = "main"}
# Crates.io dependencies

[dev-dependencies]
9 changes: 5 additions & 4 deletions common/datablocks/src/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::fmt;
use std::sync::Arc;

use common_arrow::arrow;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::record_batch::RecordBatch;
use common_datavalues::columns::DataColumn;
use common_datavalues::series::IntoSeries;
Expand Down Expand Up @@ -47,9 +48,9 @@ impl DataBlock {
/// Creates a `DataBlock` for `schema` whose columns are built with
/// `new_empty_array`, one column per schema field.
// NOTE(review): this span comes from a rendered diff, so it appears to hold
// both the pre-change `columns.push(...)` statement and its post-change
// replacement (`let array` ... `columns.push`) — confirm against the real file.
pub fn empty_with_schema(schema: DataSchemaRef) -> Self {
let mut columns = vec![];
for f in schema.fields().iter() {
columns.push(DataColumn::Array(
arrow::array::new_empty_array(&f.data_type().to_arrow()).into_series(),
))
let array = arrow::array::new_empty_array(f.data_type().to_arrow());
// `Arc::from` turns the freshly built array into the shared `ArrayRef`
// that `into_series` is then called on.
let array: ArrayRef = Arc::from(array);
columns.push(DataColumn::Array(array.into_series()))
}
DataBlock { schema, columns }
}
Expand Down Expand Up @@ -152,7 +153,7 @@ impl TryFrom<arrow::record_batch::RecordBatch> for DataBlock {
type Error = ErrorCode;

fn try_from(v: arrow::record_batch::RecordBatch) -> Result<DataBlock> {
let schema = Arc::new(v.schema().into());
let schema = Arc::new(v.schema().as_ref().into());
let series = v
.columns()
.iter()
Expand Down
2 changes: 1 addition & 1 deletion common/datablocks/src/kernels/data_block_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0.

use common_arrow::arrow::util::bit_util::ceil;
use common_datavalues::prelude::ceil;
use common_exception::Result;

use crate::DataBlock;
Expand Down
169 changes: 134 additions & 35 deletions common/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
//
// SPDX-License-Identifier: Apache-2.0.

use common_arrow::arrow::compute;
use std::iter::once;

use common_arrow::arrow::array::growable::make_growable;
use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::compute::merge_sort::*;
use common_arrow::arrow::compute::sort as arrow_sort;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -23,18 +29,24 @@ impl DataBlock {
) -> Result<DataBlock> {
let order_columns = sort_columns_descriptions
.iter()
.map(|f| {
Ok(compute::SortColumn {
values: block.try_array_by_name(&f.column_name)?.get_array_ref(),
options: Some(compute::SortOptions {
.map(|f| Ok(block.try_array_by_name(&f.column_name)?.get_array_ref()))
.collect::<Result<Vec<_>>>()?;

let order_arrays = sort_columns_descriptions
.iter()
.zip(order_columns.iter())
.map(|(f, array)| {
Ok(arrow_sort::SortColumn {
values: array.as_ref(),
options: Some(arrow_sort::SortOptions {
descending: !f.asc,
nulls_first: f.nulls_first,
}),
})
})
.collect::<Result<Vec<_>>>()?;

let indices = compute::lexsort_to_indices(&order_columns, limit)?;
let indices = arrow_sort::lexsort_to_indices(&order_arrays, limit)?;
DataBlock::block_take_by_indices(block, &[], indices.values())
}

Expand All @@ -52,45 +64,132 @@ impl DataBlock {
return Ok(lhs.clone());
}

let mut sort_columns = vec![];
for block in [lhs, rhs].iter() {
let columns = sort_columns_descriptions
.iter()
.map(|f| Ok(block.try_column_by_name(&f.column_name)?.clone()))
.collect::<Result<Vec<_>>>()?;
sort_columns.push(columns);
}

let sort_options = sort_columns_descriptions
let sort_arrays = sort_columns_descriptions
.iter()
.map(|f| {
Ok(compute::SortOptions {
descending: !f.asc,
nulls_first: f.nulls_first,
})
let left = lhs.try_column_by_name(&f.column_name)?.clone();
let left = left.to_array()?;

let right = rhs.try_column_by_name(&f.column_name)?.clone();
let right = right.to_array()?;

Ok(vec![left.get_array_ref(), right.get_array_ref()])
})
.collect::<Result<Vec<_>>>()?;

let indices = DataColumnCommon::merge_indices(
&sort_columns[0],
&sort_columns[1],
&sort_options,
limit,
)?;
let sort_dyn_arrays = sort_arrays
.iter()
.map(|f| vec![f[0].as_ref(), f[1].as_ref()])
.collect::<Vec<_>>();

let sort_options = sort_columns_descriptions
.iter()
.map(|f| arrow_sort::SortOptions {
descending: !f.asc,
nulls_first: f.nulls_first,
})
.collect::<Vec<_>>();

let indices = match limit {
Some(limit) => &indices[0..limit.min(indices.len())],
_ => &indices,
};
let sort_options_with_array = sort_dyn_arrays
.iter()
.zip(sort_options.iter())
.map(|(s, opt)| {
let paris: (&[&dyn Array], &SortOptions) = (s, opt);
paris
})
.collect::<Vec<_>>();

let comparator = build_comparator(&sort_options_with_array)?;
let lhs_indices = (0, 0, lhs.num_rows());
let rhs_indices = (1, 0, rhs.num_rows());
let slices = merge_sort_slices(once(&lhs_indices), once(&rhs_indices), &comparator);
let slices = Self::materialize_merge_indices(slices, limit);

let arrays = lhs
.columns()
let fields = lhs.schema().fields();
let columns = fields
.iter()
.zip(rhs.columns().iter())
.map(|(a, b)| DataColumnCommon::merge_columns(a, b, indices))
.map(|f| {
let left = lhs.try_column_by_name(f.name())?;
let right = rhs.try_column_by_name(f.name())?;

let left = left.to_array()?;
let right = right.to_array()?;

let taked = Self::take_arrays_by_slices(
&[
left.get_array_ref().as_ref(),
right.get_array_ref().as_ref(),
],
&slices,
limit,
);
let taked: ArrayRef = Arc::from(taked);

Ok(DataColumn::Array(taked.into_series()))
})
.collect::<Result<Vec<_>>>()?;

Ok(DataBlock::create(lhs.schema().clone(), arrays))
Ok(DataBlock::create(lhs.schema().clone(), columns))
}

/// Drains the merge-sort slice iterator into a vector.
///
/// With `limit == Some(n)` collection stops right after the slice that makes
/// the accumulated row count reach `n`. That last slice is pushed whole, even
/// if it overshoots `n`; no trimming happens here (`take_arrays_by_slices`
/// trims while copying). With `limit == None` every slice is collected.
fn materialize_merge_indices<
    'a,
    L: Iterator<Item = &'a MergeSlice>,
    R: Iterator<Item = &'a MergeSlice>,
>(
    slices: MergeSortSlices<'a, L, R>,
    limit: Option<usize>,
) -> Vec<MergeSlice> {
    match limit {
        Some(limit) => {
            // `limit` counts rows, not slices, so it is not a usable capacity
            // hint: a very large LIMIT would reserve an enormous buffer (or
            // panic on capacity overflow). Let the vector grow on demand.
            let mut v = Vec::new();
            let mut current_len = 0;
            for (index, start, len) in slices {
                v.push((index, start, len));

                if len + current_len >= limit {
                    break;
                }
                current_len += len;
            }

            v
        }
        None => slices.collect(),
    }
}

/// Builds one array by copying the ranges listed in `slices` out of `arrays`.
///
/// Every slice is destructured as `(index, start, len)` and forwarded in that
/// order to the growable's `extend`. The output is capped at `limit` rows; the
/// cap itself is clamped to the summed length of `arrays`. When the cap is
/// reached in the middle of a slice, only the remaining rows of that slice are
/// copied and the rest of `slices` is ignored.
pub fn take_arrays_by_slices(
    arrays: &[&dyn Array],
    slices: &[MergeSlice],
    limit: Option<usize>,
) -> Box<dyn Array> {
    let total_rows: usize = arrays.iter().map(|array| array.len()).sum();
    let wanted = limit.map_or(total_rows, |l| l.min(total_rows));
    let mut growable = make_growable(arrays, false, wanted);

    // No effective limit: copy every slice as-is.
    if wanted == total_rows {
        for &(index, start, len) in slices {
            growable.extend(index, start, len);
        }
        return growable.as_box();
    }

    // Limited: copy whole slices until the next one would reach the cap,
    // then copy only what is still missing and stop.
    let mut taken = 0;
    for &(index, start, len) in slices {
        if len + taken >= wanted {
            growable.extend(index, start, wanted - taken);
            break;
        }
        growable.extend(index, start, len);
        taken += len;
    }

    growable.as_box()
}

pub fn merge_sort_blocks(
Expand Down
45 changes: 45 additions & 0 deletions common/datablocks/src/kernels/data_block_sort_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,48 @@ fn test_data_block_sort() -> Result<()> {
}
Ok(())
}

/// Merges two blocks that are each already ordered on column `a` and checks
/// that the result keeps the schema and interleaves the rows in ascending
/// order of `a`.
#[test]
fn test_data_block_merge_sort() -> Result<()> {
    let schema = DataSchemaRefExt::create(vec![
        DataField::new("a", DataType::Int64, false),
        DataField::new("b", DataType::Utf8, false),
    ]);

    let raw1 = DataBlock::create_by_array(schema.clone(), vec![
        Series::new(vec![3, 5, 7]),
        Series::new(vec!["b1", "b2", "b3"]),
    ]);

    let raw2 = DataBlock::create_by_array(schema.clone(), vec![
        Series::new(vec![2, 4, 6]),
        Series::new(vec!["b4", "b5", "b6"]),
    ]);

    {
        let options = vec![SortColumnDescription {
            column_name: "a".to_owned(),
            asc: true,
            nulls_first: false,
        }];
        let results = DataBlock::merge_sort_block(&raw1, &raw2, &options, None)?;

        assert_eq!(raw1.schema(), results.schema());

        // The header row is padded to the same column widths as the border
        // and data rows; column `b` is two characters wide.
        let expected = vec![
            "+---+----+",
            "| a | b  |",
            "+---+----+",
            "| 2 | b4 |",
            "| 3 | b1 |",
            "| 4 | b5 |",
            "| 5 | b2 |",
            "| 6 | b6 |",
            "| 7 | b3 |",
            "+---+----+",
        ];
        crate::assert_blocks_eq(expected, &[results]);
    }

    Ok(())
}
Loading