Skip to content

Commit

Permalink
remove old code
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed May 16, 2024
1 parent facbbc5 commit c32b2ef
Show file tree
Hide file tree
Showing 12 changed files with 731 additions and 1,847 deletions.
769 changes: 381 additions & 388 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs

Large diffs are not rendered by default.

190 changes: 94 additions & 96 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,10 @@ mod tests {
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_plan::expressions::lit;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, Partitioning, Statistics};
use crate::physical_plan::{displayable, Statistics};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::expressions::{col, Count, Sum};
use datafusion_physical_expr::expressions::{col, Sum};

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
Expand Down Expand Up @@ -302,98 +300,98 @@ mod tests {
)
}

fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(),
)
}

#[test]
fn aggregations_not_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let plan = final_aggregate_exec(
repartition_exec(partial_aggregate_exec(
parquet_exec(&schema),
PhysicalGroupBy::default(),
aggr_expr.clone(),
)),
PhysicalGroupBy::default(),
aggr_expr,
);
// should not combine the Partial/Final AggregateExecs
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);

let aggr_expr1 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr2 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(2)".to_string(),
DataType::Int64,
)) as _];

let plan = final_aggregate_exec(
partial_aggregate_exec(
parquet_exec(&schema),
PhysicalGroupBy::default(),
aggr_expr1,
),
PhysicalGroupBy::default(),
aggr_expr2,
);
// should not combine the Partial/Final AggregateExecs
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];

assert_optimized!(expected, plan);

Ok(())
}

#[test]
fn aggregations_combined() -> Result<()> {
let schema = schema();
let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];

let plan = final_aggregate_exec(
partial_aggregate_exec(
parquet_exec(&schema),
PhysicalGroupBy::default(),
aggr_expr.clone(),
),
PhysicalGroupBy::default(),
aggr_expr,
);
// should combine the Partial/Final AggregateExecs to tne Single AggregateExec
let expected = &[
"AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];

assert_optimized!(expected, plan);
Ok(())
}
// fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
// Arc::new(
// RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(),
// )
// }

// #[test]
// fn aggregations_not_combined() -> Result<()> {
// let schema = schema();

// let aggr_expr = vec![Arc::new(Count::new(
// lit(1i8),
// "COUNT(1)".to_string(),
// DataType::Int64,
// )) as _];
// let plan = final_aggregate_exec(
// repartition_exec(partial_aggregate_exec(
// parquet_exec(&schema),
// PhysicalGroupBy::default(),
// aggr_expr.clone(),
// )),
// PhysicalGroupBy::default(),
// aggr_expr,
// );
// // should not combine the Partial/Final AggregateExecs
// let expected = &[
// "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
// "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
// "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
// ];
// assert_optimized!(expected, plan);

// let aggr_expr1 = vec![Arc::new(Count::new(
// lit(1i8),
// "COUNT(1)".to_string(),
// DataType::Int64,
// )) as _];
// let aggr_expr2 = vec![Arc::new(Count::new(
// lit(1i8),
// "COUNT(2)".to_string(),
// DataType::Int64,
// )) as _];

// let plan = final_aggregate_exec(
// partial_aggregate_exec(
// parquet_exec(&schema),
// PhysicalGroupBy::default(),
// aggr_expr1,
// ),
// PhysicalGroupBy::default(),
// aggr_expr2,
// );
// // should not combine the Partial/Final AggregateExecs
// let expected = &[
// "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
// "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
// "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
// ];

// assert_optimized!(expected, plan);

// Ok(())
// }

// #[test]
// fn aggregations_combined() -> Result<()> {
// let schema = schema();
// let aggr_expr = vec![Arc::new(Count::new(
// lit(1i8),
// "COUNT(1)".to_string(),
// DataType::Int64,
// )) as _];

// let plan = final_aggregate_exec(
// partial_aggregate_exec(
// parquet_exec(&schema),
// PhysicalGroupBy::default(),
// aggr_expr.clone(),
// ),
// PhysicalGroupBy::default(),
// aggr_expr,
// );
// // should combine the Partial/Final AggregateExecs to tne Single AggregateExec
// let expected = &[
// "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
// "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
// ];

// assert_optimized!(expected, plan);
// Ok(())
// }

#[test]
fn aggregations_with_group_combined() -> Result<()> {
Expand Down
138 changes: 68 additions & 70 deletions datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
mod tests {

use super::*;
use crate::physical_optimizer::aggregate_statistics::tests::TestAggregate;
use crate::physical_optimizer::enforce_distribution::tests::{
parquet_exec_with_sort, schema, trim_plan_display,
};
Expand All @@ -209,8 +208,7 @@ mod tests {
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{cast, col};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::aggregates::AggregateMode;
use datafusion_physical_plan::displayable;
Expand Down Expand Up @@ -507,73 +505,73 @@ mod tests {
Ok(())
}

#[test]
fn test_has_aggregate_expression() -> Result<()> {
let source = mock_data()?;
let schema = source.schema();
let agg = TestAggregate::new_count_star();

// `SELECT <aggregate with no expressions> FROM MemoryExec LIMIT 10;`, Single AggregateExec
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
10, // fetch
);
// expected not to push the limit to the AggregateExec
let expected = [
"LocalLimitExec: fetch=10",
"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
"MemoryExec: partitions=1, partition_sizes=[1]",
];
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
assert_plan_matches_expected(&plan, &expected)?;
Ok(())
}

#[test]
fn test_has_filter() -> Result<()> {
let source = mock_data()?;
let schema = source.schema();

// `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
// the `a > 1` filter is applied in the AggregateExec
let filter_expr = Some(expressions::binary(
expressions::col("a", &schema)?,
Operator::Gt,
cast(expressions::lit(1u32), &schema, DataType::Int32)?,
&schema,
)?);
let agg = TestAggregate::new_count_star();
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
10, // fetch
);
// expected not to push the limit to the AggregateExec
// TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out
let expected = [
"LocalLimitExec: fetch=10",
"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
"MemoryExec: partitions=1, partition_sizes=[1]",
];
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
assert_plan_matches_expected(&plan, &expected)?;
Ok(())
}
// #[test]
// fn test_has_aggregate_expression() -> Result<()> {
// let source = mock_data()?;
// let schema = source.schema();
// let agg = TestAggregate::new_count_star();

// // `SELECT <aggregate with no expressions> FROM MemoryExec LIMIT 10;`, Single AggregateExec
// let single_agg = AggregateExec::try_new(
// AggregateMode::Single,
// build_group_by(&schema.clone(), vec!["a".to_string()]),
// vec![agg.count_expr()], /* aggr_expr */
// vec![None], /* filter_expr */
// source, /* input */
// schema.clone(), /* input_schema */
// )?;
// let limit_exec = LocalLimitExec::new(
// Arc::new(single_agg),
// 10, // fetch
// );
// // expected not to push the limit to the AggregateExec
// let expected = [
// "LocalLimitExec: fetch=10",
// "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
// "MemoryExec: partitions=1, partition_sizes=[1]",
// ];
// let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
// assert_plan_matches_expected(&plan, &expected)?;
// Ok(())
// }

// #[test]
// fn test_has_filter() -> Result<()> {
// let source = mock_data()?;
// let schema = source.schema();

// // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
// // the `a > 1` filter is applied in the AggregateExec
// let filter_expr = Some(expressions::binary(
// expressions::col("a", &schema)?,
// Operator::Gt,
// cast(expressions::lit(1u32), &schema, DataType::Int32)?,
// &schema,
// )?);
// let agg = TestAggregate::new_count_star();
// let single_agg = AggregateExec::try_new(
// AggregateMode::Single,
// build_group_by(&schema.clone(), vec!["a".to_string()]),
// vec![agg.count_expr()], /* aggr_expr */
// vec![filter_expr], /* filter_expr */
// source, /* input */
// schema.clone(), /* input_schema */
// )?;
// let limit_exec = LocalLimitExec::new(
// Arc::new(single_agg),
// 10, // fetch
// );
// // expected not to push the limit to the AggregateExec
// // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out
// let expected = [
// "LocalLimitExec: fetch=10",
// "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
// "MemoryExec: partitions=1, partition_sizes=[1]",
// ];
// let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
// assert_plan_matches_expected(&plan, &expected)?;
// Ok(())
// }

#[test]
fn test_has_order_by() -> Result<()> {
Expand Down
Loading

0 comments on commit c32b2ef

Please sign in to comment.