Skip to content

Commit

Permalink
adding max_by and min_by
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Sep 5, 2024
1 parent 91b1d2b commit 4d80b0f
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 1 deletion.
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl FirstValueAccumulator {
}

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) {
pub fn update_with_new_row(&mut self, row: &[ScalarValue]) {
self.first = row[0].clone();
self.orderings = row[1..].to_vec();
self.is_set = true;
Expand Down
3 changes: 3 additions & 0 deletions datafusion/functions-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub mod bit_and_or_xor;
pub mod bool_and_or;
pub mod grouping;
pub mod kurtosis_pop;
pub mod max_min_by;
pub mod nth_value;
pub mod string_agg;

Expand Down Expand Up @@ -172,6 +173,8 @@ pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
grouping::grouping_udaf(),
nth_value::nth_value_udaf(),
kurtosis_pop::kurtosis_pop_udaf(),
max_min_by::max_by_udaf(),
max_min_by::min_by_udaf(),
]
}

Expand Down
205 changes: 205 additions & 0 deletions datafusion/functions-aggregate/src/max_min_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::first_last::last_value_udaf;
use arrow_schema::DataType;
use datafusion_common::{exec_err, Result};
use datafusion_expr::expr::{AggregateFunction, Sort};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Expr, Signature, Volatility};
use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;

make_udaf_expr_and_func!(
MaxByFunction,
max_by,
x y,
"Returns the value of the first column corresponding to the maximum value in the second column.",
max_by_udaf
);

pub struct MaxByFunction {
signature: Signature,
}

impl Debug for MaxByFunction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("MaxBy")
.field("name", &self.name())
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}
impl Default for MaxByFunction {
fn default() -> Self {
Self::new()
}
}

impl MaxByFunction {
pub fn new() -> Self {
Self {
signature: Signature::user_defined(Volatility::Immutable),
}
}
}

fn get_min_max_by_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
match &input_types[0] {
DataType::Dictionary(_, dict_value_type) => {
// TODO add checker, if the value type is complex data type
Ok(vec![dict_value_type.deref().clone()])
}
_ => Ok(input_types.to_vec()),
}
}

impl AggregateUDFImpl for MaxByFunction {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"max_by"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].to_owned())
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
exec_err!("should not reach here")
}
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
get_min_max_by_result_type(arg_types)
}

fn simplify(
&self,
) -> Option<datafusion_expr::function::AggregateFunctionSimplification> {
let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction,
_: &dyn SimplifyInfo| {
let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]);
let (second_arg, first_arg) =
(aggr_func.args.remove(1), aggr_func.args.remove(0));

order_by.push(Sort::new(second_arg, true, false));

Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
last_value_udaf(),
vec![first_arg],
aggr_func.distinct,
aggr_func.filter,
Some(order_by),
aggr_func.null_treatment,
)))
};
Some(Box::new(simplify))
}
}

make_udaf_expr_and_func!(
MinByFunction,
min_by,
x y,
"Returns the value of the first column corresponding to the minimum value in the second column.",
min_by_udaf
);

pub struct MinByFunction {
signature: Signature,
}

impl Debug for MinByFunction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("MinBy")
.field("name", &self.name())
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}

impl Default for MinByFunction {
fn default() -> Self {
Self::new()
}
}

impl MinByFunction {
pub fn new() -> Self {
Self {
signature: Signature::user_defined(Volatility::Immutable),
}
}
}

impl AggregateUDFImpl for MinByFunction {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"min_by"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].to_owned())
}

fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
exec_err!("should not reach here")
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
get_min_max_by_result_type(arg_types)
}

fn simplify(
&self,
) -> Option<datafusion_expr::function::AggregateFunctionSimplification> {
let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction,
_: &dyn SimplifyInfo| {
let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]);
let (second_arg, first_arg) =
(aggr_func.args.remove(1), aggr_func.args.remove(0));

order_by.push(Sort::new(second_arg, false, false)); // false for ascending sort

Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
last_value_udaf(),
vec![first_arg],
aggr_func.distinct,
aggr_func.filter,
Some(order_by),
aggr_func.null_treatment,
)))
};
Some(Box::new(simplify))
}
}
67 changes: 67 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5924,3 +5924,70 @@ SELECT kurtosis_pop(c1) FROM t1;

statement ok
DROP TABLE t1;

# test max_by function
query I
SELECT max_by(num_column, value) AS max_num
FROM VALUES
(10, 1),
(20, 2),
(30, 3) AS tab(num_column, value);
----
30

query R
SELECT max_by(float_column, value) AS max_float
FROM VALUES
(10.5, 5),
(20.75, 10),
(15.25, 15) AS tab(float_column, value);
----
15.25

query T
SELECT max_by(date_column, value) AS max_date
FROM VALUES
('2024-01-01', 1),
('2024-02-01', 2),
('2024-03-01', 3) AS tab(date_column, value);
----
2024-03-01


# Test max_by function with same maximum values (order sensitivity)
query T
SELECT max_by(value_column, metric) AS max_value
FROM VALUES
('A', 2),
('B', 2),
('C', 1) AS tab(value_column, metric);
----
A

query T
SELECT max_by(value_column, metric) AS max_value
FROM VALUES
('B', 2), -- Reversed order
('A', 2),
('C', 1) AS tab(value_column, metric);
----
B


query T
SELECT min_by(value_column, metric) AS min_value
FROM VALUES
('A', 1),
('B', 0), -- same min metric value as 'C'
('C', 0) AS tab(value_column, metric);
----
B

query T
SELECT min_by(value_column, metric) AS min_value
FROM VALUES
('C', 0), -- Reversed order
('B', 0),
('A', 1) AS tab(value_column, metric);
----
C
32 changes: 32 additions & 0 deletions docs/source/user-guide/sql/aggregate_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ last_value(expression [ORDER BY expression])
- [regr_syy](#regr_syy)
- [regr_sxy](#regr_sxy)
- [kurtosis_pop](#kurtosis_pop)
- [max_by](#max_by)
- [min_by](#min_by)

### `corr`

Expand Down Expand Up @@ -541,6 +543,36 @@ kurtois_pop(expression)
- **expression**: Expression to operate on.
Can be a constant, column, or function, and any combination of arithmetic operators.

### `max_by`

Returns the value of the first expression corresponding to the maximum value in the second expression. If there are multiple values in the first expression with the same maximum value in the second expression, the result will be the first occurrence of such a value based on the input order.

```
max_by(expression1, expression2)
```
#### Arguments

- **expression1**: First expression to return the value from.
Can be a constant, column, or function, and any combination of arithmetic operators.

- **expression2** Second expression used to determine the maximum value.
Can be a constant, column, or function, and any combination of arithmetic operators.

### `min_by`

Returns the value of the first expression corresponding to the minimum value in the second expression. If there are multiple values in the first expression with the same minimum value in the second expression, the result will be the first occurrence of such a value based on the input order.

```
min_by(expression1, expression2)
```
#### Arguments

- **expression1**: First expression to return the value from.
Can be a constant, column, or function, and any combination of arithmetic operators.

- **expression2** Second expression used to determine the maximum value.
Can be a constant, column, or function, and any combination of arithmetic operators.

## Approximate

- [approx_distinct](#approx_distinct)
Expand Down

0 comments on commit 4d80b0f

Please sign in to comment.