Skip to content

Commit

Permalink
add downcast_owned_builder
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Nov 22, 2023
1 parent 24960b6 commit df3a287
Show file tree
Hide file tree
Showing 19 changed files with 105 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static {
builder: &'a mut ColumnBuilder,
) -> Option<&'a mut Self::ColumnBuilder>;

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder>;
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder>;

fn try_upcast_column_builder(builder: Self::ColumnBuilder) -> Option<ColumnBuilder>;

Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl ValueType for AnyType {
Some(builder)
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
Some(builder)
}

Expand Down
13 changes: 12 additions & 1 deletion src/query/expression/src/types/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,21 @@ impl<T: ValueType> ValueType for ArrayType<T> {
None
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
#[allow(clippy::manual_map)]
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Array(inner) => {
let builder = T::try_downcast_owned_builder(inner.builder);
// ```
// builder.map(|builder| ArrayColumnBuilder {
// builder,
// offsets: inner.offsets,
// })
// ```
// If we using the clippy recommend way like above, the compiler will complain:
// use of partially moved value: `inner`.
// That's rust borrow checker error, if we using the new borrow checker named polonius,
// everything goes fine, but polonius is very slow, so we allow manual map here.
if let Some(builder) = builder {
Some(ArrayColumnBuilder {
builder,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ValueType for BitmapType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Bitmap(builder) => Some(builder),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ValueType for BooleanType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Boolean(builder) => Some(builder),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl ValueType for DateType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Date(builder) => Some(builder),
_ => None,
Expand Down
38 changes: 34 additions & 4 deletions src/query/expression/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ impl<Num: Decimal> ValueType for DecimalType<Num> {
Num::try_downcast_builder(builder)
}

fn try_downcast_owned_builder<'a>(_builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
todo!()
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
Num::try_downcast_owned_builder(builder)
}

fn try_upcast_column_builder(_builder: Self::ColumnBuilder) -> Option<ColumnBuilder> {
todo!()
fn try_upcast_column_builder(builder: Self::ColumnBuilder) -> Option<ColumnBuilder> {
Num::try_upcast_column_builder(builder, Num::default_decimal_size())
}

fn upcast_scalar(scalar: Self::Scalar) -> Scalar {
Expand Down Expand Up @@ -299,6 +299,10 @@ pub trait Decimal:
fn try_downcast_column(column: &Column) -> Option<(Buffer<Self>, DecimalSize)>;
fn try_downcast_builder<'a>(builder: &'a mut ColumnBuilder) -> Option<&'a mut Vec<Self>>;

fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Vec<Self>>;

fn try_upcast_column_builder(builder: Vec<Self>, size: DecimalSize) -> Option<ColumnBuilder>;

fn try_downcast_scalar(scalar: &DecimalScalar) -> Option<Self>;
fn try_downcast_domain(domain: &DecimalDomain) -> Option<SimpleDomain<Self>>;

Expand Down Expand Up @@ -485,6 +489,19 @@ impl Decimal for i128 {
}
}

fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Vec<Self>> {
match builder {
ColumnBuilder::Decimal(DecimalColumnBuilder::Decimal128(s, _)) => Some(s),
_ => None,
}
}

fn try_upcast_column_builder(builder: Vec<Self>, size: DecimalSize) -> Option<ColumnBuilder> {
Some(ColumnBuilder::Decimal(DecimalColumnBuilder::Decimal128(
builder, size,
)))
}

fn try_downcast_scalar<'a>(scalar: &DecimalScalar) -> Option<Self> {
match scalar {
DecimalScalar::Decimal128(val, _) => Some(*val),
Expand Down Expand Up @@ -638,6 +655,19 @@ impl Decimal for i256 {
}
}

fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Vec<Self>> {
match builder {
ColumnBuilder::Decimal(DecimalColumnBuilder::Decimal256(s, _)) => Some(s),
_ => None,
}
}

fn try_upcast_column_builder(builder: Vec<Self>, size: DecimalSize) -> Option<ColumnBuilder> {
Some(ColumnBuilder::Decimal(DecimalColumnBuilder::Decimal256(
builder, size,
)))
}

fn try_downcast_scalar<'a>(scalar: &DecimalScalar) -> Option<Self> {
match scalar {
DecimalScalar::Decimal256(val, _) => Some(*val),
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/empty_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ValueType for EmptyArrayType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::EmptyArray { len } => Some(len),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/empty_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ValueType for EmptyMapType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::EmptyMap { len } => Some(len),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<const INDEX: usize> ValueType for GenericType<INDEX> {
Some(builder)
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
Some(builder)
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/types/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ impl<K: ValueType, V: ValueType> ValueType for MapType<K, V> {
<MapInternal<K, V> as ValueType>::try_downcast_owned_builder(builder)
}

fn try_upcast_column_builder(_builder: Self::ColumnBuilder) -> Option<ColumnBuilder> {
None
fn try_upcast_column_builder(builder: Self::ColumnBuilder) -> Option<ColumnBuilder> {
<MapInternal<K, V> as ValueType>::try_upcast_column_builder(builder)
}

fn upcast_scalar(scalar: Self::Scalar) -> Scalar {
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ValueType for NullType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Null { len } => Some(len),
_ => None,
Expand Down
13 changes: 12 additions & 1 deletion src/query/expression/src/types/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,21 @@ impl<T: ValueType> ValueType for NullableType<T> {
None
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
#[allow(clippy::manual_map)]
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Nullable(inner) => {
let builder = T::try_downcast_owned_builder(inner.builder);
// ```
// builder.map(|builder| NullableColumnBuilder {
// builder,
// validity: inner.validity,
// })
// ```
// If we using the clippy recommend way like above, the compiler will complain:
// use of partially moved value: `inner`.
// That's rust borrow checker error, if we using the new borrow checker named polonius,
// everything goes fine, but polonius is very slow, so we allow manual map here.
if let Some(builder) = builder {
Some(NullableColumnBuilder {
builder,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<Num: Number> ValueType for NumberType<Num> {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Number(num) => Num::try_downcast_owned_builder(num),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl ValueType for StringType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::String(builder) => Some(builder),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl ValueType for TimestampType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Timestamp(builder) => Some(builder),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ValueType for VariantType {
}
}

fn try_downcast_owned_builder<'a>(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
fn try_downcast_owned_builder(builder: ColumnBuilder) -> Option<Self::ColumnBuilder> {
match builder {
ColumnBuilder::Variant(builder) => Some(builder),
_ => None,
Expand Down
12 changes: 10 additions & 2 deletions src/query/functions/src/aggregates/aggregate_unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use common_expression::ColumnBuilder;
use common_expression::Scalar;
use common_expression::StateAddr;

use crate::aggregates::take_mut;

pub trait UnaryState<T, R>: Send + Sync + Default
where
T: ValueType,
Expand Down Expand Up @@ -226,8 +228,14 @@ where

fn merge_result(&self, place: StateAddr, builder: &mut ColumnBuilder) -> Result<()> {
let state: &mut S = place.get::<S>();
let builder = R::try_downcast_builder(builder).unwrap();
state.merge_result(builder, self.function_data.as_deref())?;
// some `ValueType` like `NullableType` need ownership to downcast builder,
// so here we using an unsafe way to take the ownership of builder.
take_mut(builder, |builder| {
let mut builder = R::try_downcast_owned_builder(builder).unwrap();
state
.merge_result(&mut builder, self.function_data.as_deref())
.map(|_| R::try_upcast_column_builder(builder).unwrap())
})?;
Ok(())
}

Expand Down
22 changes: 22 additions & 0 deletions src/query/functions/src/aggregates/aggregator_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::fmt::Display;
use std::panic;

use bumpalo::Bump;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -167,3 +168,24 @@ pub fn deserialize_state<T: serde::de::DeserializeOwned>(slice: &mut &[u8]) -> R
*slice = &slice[bytes_read..];
Ok(value)
}

// copy from https://docs.rs/take_mut/0.2.2/take_mut/fn.take.html with some modifications.
// if a panic occurs, the entire process will be aborted, as there's no valid `T` to put back into the `&mut T`.
pub fn take_mut<T, F>(mut_ref: &mut T, closure: F) -> Result<()>
where F: FnOnce(T) -> Result<T> {
use std::ptr;

unsafe {
let old_t = ptr::read(mut_ref);
let closure_result = panic::catch_unwind(panic::AssertUnwindSafe(|| closure(old_t)));

match closure_result {
Ok(Ok(new_t)) => {
ptr::write(mut_ref, new_t);
Ok(())
}
Ok(Err(e)) => Err(e),
Err(_) => ::std::process::abort(),
}
}
}

0 comments on commit df3a287

Please sign in to comment.