Skip to content

Commit

Permalink
Remove Built-in sum and Rename to lowercase sum (apache#10831)
Browse files Browse the repository at this point in the history
* rm sum

Signed-off-by: jayzhan211 <[email protected]>

* mv stub to df:expr

Signed-off-by: jayzhan211 <[email protected]>

* fix sql example

Signed-off-by: jayzhan211 <[email protected]>

* lowercase in slt

Signed-off-by: jayzhan211 <[email protected]>

* rename to lowercase

Signed-off-by: jayzhan211 <[email protected]>

* rename slt in tpch

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored and findepi committed Jul 16, 2024
1 parent 3b81fce commit 50a1c6c
Show file tree
Hide file tree
Showing 71 changed files with 814 additions and 1,313 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ mod tests {

assert_batches_sorted_eq!(
["+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | SUM(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
"| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | sum(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
Expand Down Expand Up @@ -2395,7 +2395,7 @@ mod tests {
assert_batches_sorted_eq!(
[
"+----+-----------------------------+",
"| c1 | SUM(aggregate_test_100.c12) |",
"| c1 | sum(aggregate_test_100.c12) |",
"+----+-----------------------------+",
"| a | 10.238448667882977 |",
"| b | 7.797734760124923 |",
Expand All @@ -2411,7 +2411,7 @@ mod tests {
assert_batches_sorted_eq!(
[
"+----+---------------------+",
"| c1 | SUM(test_table.c12) |",
"| c1 | sum(test_table.c12) |",
"+----+---------------------+",
"| a | 10.238448667882977 |",
"| b | 7.797734760124923 |",
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ mod tests {

#[rustfmt::skip]
let expected = ["+--------------+",
"| SUM(aggr.c2) |",
"| sum(aggr.c2) |",
"+--------------+",
"| 285 |",
"+--------------+"];
Expand Down Expand Up @@ -956,7 +956,7 @@ mod tests {

#[rustfmt::skip]
let expected = ["+--------------+",
"| SUM(aggr.c3) |",
"| sum(aggr.c3) |",
"+--------------+",
"| 781 |",
"+--------------+"];
Expand Down Expand Up @@ -1122,7 +1122,7 @@ mod tests {

#[rustfmt::skip]
let expected = ["+---------------------+",
"| SUM(empty.column_1) |",
"| sum(empty.column_1) |",
"+---------------------+",
"| 10 |",
"+---------------------+"];
Expand Down Expand Up @@ -1161,7 +1161,7 @@ mod tests {

#[rustfmt::skip]
let expected = ["+-----------------------+",
"| SUM(one_col.column_1) |",
"| sum(one_col.column_1) |",
"+-----------------------+",
"| 50 |",
"+-----------------------+"];
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,15 @@ mod tests {
ctx.register_json("json_parallel", table_path, options)
.await?;

let query = "SELECT SUM(a) FROM json_parallel;";
let query = "SELECT sum(a) FROM json_parallel;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
"+----------------------+",
"| SUM(json_parallel.a) |",
"| sum(json_parallel.a) |",
"+----------------------+",
"| -7 |",
"+----------------------+"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ mod tests {
)
.await?;
let results =
plan_and_collect(&ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?;
plan_and_collect(&ctx, "SELECT sum(c1), sum(c2), COUNT(*) FROM test").await?;

assert_eq!(results.len(), 1);
let expected = [
"+--------------+--------------+----------+",
"| SUM(test.c1) | SUM(test.c2) | COUNT(*) |",
"| sum(test.c1) | sum(test.c2) | COUNT(*) |",
"+--------------+--------------+----------+",
"| 10 | 110 | 20 |",
"+--------------+--------------+----------+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ mod tests {
use crate::physical_plan::{displayable, Partitioning};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::expressions::{col, Count, Sum};
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::{col, Count};
use datafusion_physical_plan::udaf::create_aggregate_expr;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
Expand Down Expand Up @@ -391,12 +393,17 @@ mod tests {
#[test]
fn aggregations_with_group_combined() -> Result<()> {
let schema = schema();
let aggr_expr = vec![Arc::new(Sum::new(
col("b", &schema)?,
"Sum(b)".to_string(),
DataType::Int64,
)) as _];

let aggr_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("b", &schema)?],
&[],
&[],
&schema,
"Sum(b)",
false,
false,
)?];
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("c", &schema)?, "c".to_string())];

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2586,7 +2586,7 @@ mod tests {
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
"SUM(aggregate_test_100.c2)",
"sum(aggregate_test_100.c2)",
final_hash_agg.schema().field(1).name()
);
// we need access to the input to the partial aggregate so that other projects can
Expand Down Expand Up @@ -2614,7 +2614,7 @@ mod tests {
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
"SUM(aggregate_test_100.c3)",
"sum(aggregate_test_100.c3)",
final_hash_agg.schema().field(2).name()
);
// we need access to the input to the partial aggregate so that other projects can
Expand Down
20 changes: 14 additions & 6 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_physical_expr::expressions::{col, Sum};
use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::udaf::create_aggregate_expr;
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

Expand Down Expand Up @@ -101,11 +103,17 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.with_sort_information(vec![sort_keys]),
);

let aggregate_expr = vec![Arc::new(Sum::new(
col("d", &schema).unwrap(),
let aggregate_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("d", &schema).unwrap()],
&[],
&[],
&schema,
"sum1",
DataType::Int64,
)) as Arc<dyn AggregateExpr>];
false,
false,
)
.unwrap()];
let expr = group_by_columns
.iter()
.map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async fn test_udaf_shadows_builtin_fn() {
// compute with builtin `sum` aggregator
let expected = [
"+---------------------------------------+",
"| SUM(arrow_cast(t.time,Utf8(\"Int64\"))) |",
"| sum(arrow_cast(t.time,Utf8(\"Int64\"))) |",
"+---------------------------------------+",
"| 19000 |",
"+---------------------------------------+",
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ use strum_macros::EnumIter;
pub enum AggregateFunction {
/// Count
Count,
/// Sum
Sum,
/// Minimum
Min,
/// Maximum
Expand Down Expand Up @@ -102,7 +100,6 @@ impl AggregateFunction {
use AggregateFunction::*;
match self {
Count => "COUNT",
Sum => "SUM",
Min => "MIN",
Max => "MAX",
Avg => "AVG",
Expand Down Expand Up @@ -157,7 +154,6 @@ impl FromStr for AggregateFunction {
"max" => AggregateFunction::Max,
"mean" => AggregateFunction::Avg,
"min" => AggregateFunction::Min,
"sum" => AggregateFunction::Sum,
"array_agg" => AggregateFunction::ArrayAgg,
"nth_value" => AggregateFunction::NthValue,
"string_agg" => AggregateFunction::StringAgg,
Expand Down Expand Up @@ -223,7 +219,6 @@ impl AggregateFunction {
// The coerced_data_types is same with input_types.
Ok(coerced_data_types[0].clone())
}
AggregateFunction::Sum => sum_return_type(&coerced_data_types[0]),
AggregateFunction::BitAnd
| AggregateFunction::BitOr
| AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()),
Expand Down Expand Up @@ -308,7 +303,6 @@ impl AggregateFunction {
Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable)
}
AggregateFunction::Avg
| AggregateFunction::Sum
| AggregateFunction::VariancePop
| AggregateFunction::Stddev
| AggregateFunction::StddevPop
Expand Down
1 change: 0 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,6 @@ mod test {
"max",
"count",
"avg",
"sum",
];
for name in names {
let fun = find_df_window_func(name).unwrap();
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod logical_plan;
pub mod registry;
pub mod simplify;
pub mod sort_properties;
pub mod test;
pub mod tree_node;
pub mod type_coercion;
pub mod utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,40 @@
// specific language governing permissions and limitations
// under the License.

//! Aggregate function stubs to test SQL optimizers.
//! Aggregate function stubs for tests in expr / optimizer.
//!
//! These are used to avoid a dependency on `datafusion-functions-aggregate`, which lives in a different crate.

use std::any::Any;

use arrow::datatypes::{
DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
};
use datafusion_common::{exec_err, Result};
use datafusion_expr::{
use crate::{
expr::AggregateFunction,
function::{AccumulatorArgs, StateFieldsArgs},
utils::AggregateOrderSensitivity,
Accumulator, AggregateUDFImpl, Expr, GroupsAccumulator, ReversedUDAF, Signature,
Volatility,
};
use arrow::datatypes::{
DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
};
use datafusion_common::{exec_err, Result};

macro_rules! create_func {
($UDAF:ty, $AGGREGATE_UDF_FN:ident) => {
paste::paste! {
/// Singleton instance of [$UDAF], ensures the UDAF is only created once
/// named STATIC_$(UDAF). For example `STATIC_FirstValue`
#[allow(non_upper_case_globals)]
static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<crate::AggregateUDF>> =
std::sync::OnceLock::new();

/// AggregateFunction that returns a [AggregateUDF] for [$UDAF]
///
/// [AggregateUDF]: datafusion_expr::AggregateUDF
pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<datafusion_expr::AggregateUDF> {
/// [AggregateUDF]: crate::AggregateUDF
pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<crate::AggregateUDF> {
[< STATIC_ $UDAF >]
.get_or_init(|| {
std::sync::Arc::new(datafusion_expr::AggregateUDF::from(<$UDAF>::default()))
std::sync::Arc::new(crate::AggregateUDF::from(<$UDAF>::default()))
})
.clone()
}
Expand All @@ -58,7 +58,7 @@ macro_rules! create_func {

create_func!(Sum, sum_udaf);

pub(crate) fn sum(expr: Expr) -> Expr {
pub fn sum(expr: Expr) -> Expr {
Expr::AggregateFunction(AggregateFunction::new_udf(
sum_udaf(),
vec![expr],
Expand All @@ -73,14 +73,12 @@ pub(crate) fn sum(expr: Expr) -> Expr {
#[derive(Debug)]
pub struct Sum {
signature: Signature,
aliases: Vec<String>,
}

impl Sum {
pub fn new() -> Self {
Self {
signature: Signature::user_defined(Volatility::Immutable),
aliases: vec!["sum".to_string()],
}
}
}
Expand All @@ -97,7 +95,7 @@ impl AggregateUDFImpl for Sum {
}

fn name(&self) -> &str {
"SUM"
"sum"
}

fn signature(&self) -> &Signature {
Expand Down Expand Up @@ -162,7 +160,7 @@ impl AggregateUDFImpl for Sum {
}

fn aliases(&self) -> &[String] {
&self.aliases
&[]
}

fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/expr/src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod function_stub;
Loading

0 comments on commit 50a1c6c

Please sign in to comment.