Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for median(distinct) aggregation function #10226

Merged
merged 2 commits into from
Apr 26, 2024

Conversation

Jefffrey
Copy link
Contributor

Which issue does this PR close?

Closes #2411

Rationale for this change

Support for explicit distinct median aggregation function, allowing queries such as below to execute now:

select median(a), median(distinct a) from t

What changes are included in this PR?

Essentially duplicated the existing Median aggregation function but aggregate into HashSet instead of a Vec.

Chose HashSet over BTreeMap as figured the insert path is hotter than the fetch path (which is essentially once at evaluation), and (limited) benchmarking didn't show too significant a difference between the two.

Are these changes tested?

Yes, tests added

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Apr 25, 2024
/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes after taking
/// all unique values. This may use a lot of memory if the cardinality is high.
#[derive(Debug)]
pub struct DistinctMedian {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lot of this code is duplicated with above median, but figured that since it's only duplicated once it should be fine? Or can macro it I suppose. Also colocated within existing median.rs file for this reason, to make it clear how similar they are.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main difference seems to be the Accumulator implementation

What do you think about adding a field on Median like distinct

pub struct DistinctMedian {
...
  distinct: bool
}

And then instantiating the correct accumulator increate_accumulator ? That would add an additional check when creating an accumulator but that seems inconsequential compared to the work to actually allocate and compute the median

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, implemented 👍

Comment on lines 223 to 230
macro_rules! helper {
($t:ty, $dt:expr) => {
Ok(Box::new(DistinctMedianAccumulator::<$t> {
data_type: $dt.clone(),
distinct_values: Default::default(),
}))
};
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename this macro to a more specific name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it follows the name used in Median

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this bit of duplication by folding distinct into Median

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closes #2411

Wow -- another awesomely low ticket being fixed 👏

Thank you @Jefffrey and @Weijun-H -- I left some comments about how to potentially improve this PR but I think overall it looks really nice and could be merged a as is

I do think it would be nice to avoid duplicating DistinctMedian if possible, though I can't come up with a better way for the accumulator.

I think changing some of the tests so the input data wasn't sorted would also improve this PR.

/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes after taking
/// all unique values. This may use a lot of memory if the cardinality is high.
#[derive(Debug)]
pub struct DistinctMedian {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main difference seems to be the Accumulator implementation

What do you think about adding a field on Median like distinct

pub struct DistinctMedian {
...
  distinct: bool
}

And then instantiating the correct accumulator increate_accumulator ? That would add an additional check when creating an accumulator but that seems inconsequential compared to the work to actually allocate and compute the median

Comment on lines 223 to 230
macro_rules! helper {
($t:ty, $dt:expr) => {
Ok(Box::new(DistinctMedianAccumulator::<$t> {
data_type: $dt.clone(),
distinct_values: Default::default(),
}))
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it follows the name used in Median

/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
struct DistinctMedianAccumulator<T: ArrowNumericType> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started playing around with trying to make a generic trait that could handle both Vec and HashSet. I couldn't make the types work out and I convinced myself it would end up being at least as much code as having the replication across accumulators. Thus I think having a copy/paste/modify version of DistinctMedianAccumulator is fine

/// A trait for a container of numeric types that can be compared
/// A `Vec` is used for Median and `HashSet` for DistinctMedian
trait MedianValues: Send + Sync + std::fmt::Debug {
    type T: ArrowNativeType;

    fn reserve(&mut self, additional: usize);
    fn extend(&mut self, values: impl Iterator<Item = Self::T>);
    fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
    /// Convert the elements of this container into a ListArray
    fn into_list_array(self) -> ListArray;
}

impl <T:ArrowNativeType> MedianValues for Vec<T> {
    type T = T;

    fn reserve(&mut self, additional: usize) {
        todo!()
    }

    fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
        todo!()
    }

    fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
        todo!()
    }

    fn into_list_array(self) -> ListArray {
        todo!()
    }
}


/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
///
/// The intermediate state is represented as a List of scalar values updated by
/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>>  {
    data_type: DataType,
    all_values: V,
}

I couldn't quite make this work -- it errors like this

error[E0271]: type mismatch resolving `<Vec<i8> as MedianValues>::T == Int8Type`
   --> datafusion/physical-expr/src/aggregate/median.rs:76:33
    |
76  |                       all_values: vec![],
    |                                   ^^^^^^ type mismatch resolving `<Vec<i8> as MedianValues>::T == Int8Type`
...
81  | /         downcast_integer! {
82  | |             dt => (helper, dt),
83  | |             DataType::Float16 => helper!(Float16Type, dt),
84  | |             DataType::Float32 => helper!(Float32Type, dt),
...   |
92  | |             ))),

Here is the full diff if anyone wants to play around

Details

diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 1049187a5..0e9b0b87d 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -23,7 +23,7 @@ use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::{Array, ArrayRef};
 use arrow::datatypes::{DataType, Field};
 use arrow_array::cast::AsArray;
-use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType};
+use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType, ListArray};
 use arrow_buffer::ArrowNativeType;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
@@ -71,7 +71,7 @@ impl AggregateExpr for Median {
         use arrow_array::types::*;
         macro_rules! helper {
             ($t:ty, $dt:expr) => {
-                Ok(Box::new(MedianAccumulator::<$t> {
+                Ok(Box::new(MedianAccumulator::<$t, Vec<<$t as ArrowPrimitiveType>::Native>> {
                     data_type: $dt.clone(),
                     all_values: vec![],
                 }))
@@ -127,6 +127,39 @@ impl PartialEq<dyn Any> for Median {
     }
 }

+/// A trait for a container of numeric types that can be compared
+/// A `Vec` is used for Median and `HashSet` for DistinctMedian
+trait MedianValues: Send + Sync + std::fmt::Debug {
+    type T: ArrowNativeType;
+
+    fn reserve(&mut self, additional: usize);
+    fn extend(&mut self, values: impl Iterator<Item = Self::T>);
+    fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
+    /// Convert the elements of this container into a ListArray
+    fn into_list_array(self) -> ListArray;
+}
+
+impl <T:ArrowNativeType> MedianValues for Vec<T> {
+    type T = T;
+
+    fn reserve(&mut self, additional: usize) {
+        todo!()
+    }
+
+    fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
+        todo!()
+    }
+
+    fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
+        todo!()
+    }
+
+    fn into_list_array(self) -> ListArray {
+        todo!()
+    }
+}
+
+
 /// The median accumulator accumulates the raw input values
 /// as `ScalarValue`s
 ///
@@ -134,18 +167,18 @@ impl PartialEq<dyn Any> for Median {
 /// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
 /// in the final evaluation step so that we avoid expensive conversions and
 /// allocations during `update_batch`.
-struct MedianAccumulator<T: ArrowNumericType> {
+struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>>  {
     data_type: DataType,
-    all_values: Vec<T::Native>,
+    all_values: V,
 }

-impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {
+impl<T: ArrowNumericType, V: MedianValues<T = T>> std::fmt::Debug for MedianAccumulator<T, V> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(f, "MedianAccumulator({})", self.data_type)
     }
 }

-impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
+impl<T: ArrowNumericType, V: MedianValues<T = T>> Accumulator for MedianAccumulator<T, V> {
     fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let all_values = self
             .all_values

#[test]
fn distinct_median_decimal_with_nulls() -> Result<()> {
let array: ArrayRef = Arc::new(
vec![Some(1), Some(2), None, Some(3), Some(3), Some(3), Some(3)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding values in non sorted order in these tests to make sure there is nothing related to sorting going on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shuffled some of the tests

}

let array = values[0].as_primitive::<T>();
match array.nulls().filter(|x| x.null_count() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to check this I think that might be clearer is array.null_count() https://docs.rs/arrow/latest/arrow/array/trait.Array.html#method.null_count

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually just ripped this from sum_distinct:

let array = values[0].as_primitive::<T>();
match array.nulls().filter(|x| x.null_count() > 0) {
Some(n) => {
for idx in n.valid_indices() {
self.values.insert(Hashable(array.value(idx)));
}
}
None => array.values().iter().for_each(|x| {
self.values.insert(Hashable(*x));
}),
}

I guess its a way to avoid iterating over options since can use the inner null buffer to get valid indices, but not sure how much a performance difference it would make 🤷

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a common optimization (in the arrow-rs kernels and datafusion) to special case the 'no nulls' case -- if you know there are no nulls in the input you can avoid a branch (to check for null) in the inner loop, which gives the compiler a better chance for auto-vectorization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point in #10226 (comment) was that the way this is checking for no-nulls seems overly obscure to me

I think the code could look like

        if array.null_count() > 0 {
          for val in array.iter() {
             if let Some(value) = val {
                    self.distinct_values.insert(Hashable(value))
          }
        } else {
           array.values().iter().for_each(|x| {
                self.distinct_values.insert(Hashable(*x));
            }),
       }

But I also don't think it is a big deal

@alamb
Copy link
Contributor

alamb commented Apr 26, 2024

Thanks a lot @Jefffrey -- looks great ❤️

@alamb alamb merged commit c22c299 into apache:main Apr 26, 2024
23 checks passed
@Jefffrey Jefffrey deleted the distinct-median branch April 26, 2024 11:59
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

median(distinct) support
3 participants