Single mode for multi column group by -- Almost 2x for ClickBench Q32 #11792

Closed
wants to merge 16 commits into from
2 changes: 1 addition & 1 deletion benchmarks/queries/clickbench/queries.sql
@@ -40,4 +40,4 @@ SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventD
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
@@ -180,6 +180,10 @@ name = "sort_limit_query_sql"
harness = false
name = "math_query_sql"

[[bench]]
harness = false
name = "multi_groupby"

[[bench]]
harness = false
name = "filter_query_sql"
131 changes: 131 additions & 0 deletions datafusion/core/benches/multi_groupby.rs
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::{
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use arrow_array::{Int64Array, StringArray};
use arrow_schema::SchemaRef;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::SessionContext;
use datafusion::{datasource::MemTable, error::Result};
use futures::executor::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;

async fn query(ctx: &mut SessionContext, sql: &str) {
    let rt = Runtime::new().unwrap();

    // execute the query
    let df = rt.block_on(ctx.sql(sql)).unwrap();
    criterion::black_box(rt.block_on(df.collect()).unwrap());
}

fn high_cardinality_batches(
    array_len: usize,
    batch_size: usize,
    schema: SchemaRef,
) -> Vec<RecordBatch> {
    (0..array_len / batch_size)
        .map(|i| {
            let data1 = (0..batch_size)
                .map(|j| (batch_size * i + j) as i64)
                .collect::<Vec<_>>();
            let data2 = (0..batch_size)
                .map(|j| format!("a{}", (batch_size * i + j)))
                .collect::<Vec<_>>();

            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(data1)),
                    Arc::new(StringArray::from(data2)),
                ],
            )
            .unwrap()
        })
        .collect::<Vec<_>>()
}

fn low_cardinality_batches(
    array_len: usize,
    batch_size: usize,
    schema: SchemaRef,
) -> Vec<RecordBatch> {
    (0..array_len / batch_size)
        .map(|i| {
            let data1 = (0..batch_size)
                .map(|j| ((batch_size * i + j) % 4 > 1) as i64)
                .collect::<Vec<_>>();
            let data2 = (0..batch_size)
                .map(|j| format!("a{}", ((batch_size * i + j) % 2)))
                .collect::<Vec<_>>();

            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(data1)),
                    Arc::new(StringArray::from(data2)),
                ],
            )
            .unwrap()
        })
        .collect::<Vec<_>>()
}

fn create_context(
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> Result<SessionContext> {
    let ctx = SessionContext::new();

    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
    let provider = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table("t", Arc::new(provider))?;

    Ok(ctx)
}

fn criterion_benchmark(c: &mut Criterion) {
    let array_len = 2000000; // 2M rows
    let batch_size = 8192;
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, false),
    ]));

    c.bench_function("benchmark high cardinality", |b| {
        let batches = high_cardinality_batches(array_len, batch_size, Arc::clone(&schema));
        let mut ctx = create_context(batches, Arc::clone(&schema)).unwrap();
        b.iter(|| block_on(query(&mut ctx, "select a, b, count(*) from t group by a, b order by count(*) desc limit 10")))
    });

    c.bench_function("benchmark low cardinality", |b| {
        let batches = low_cardinality_batches(array_len, batch_size, Arc::clone(&schema));
        let mut ctx = create_context(batches, Arc::clone(&schema)).unwrap();
        b.iter(|| block_on(query(&mut ctx, "select a, b, count(*) from t group by a, b order by count(*) desc limit 10")))
    });
}
Contributor Author:

Gnuplot not found, using plotters backend
benchmark high cardinality
                        time:   [273.00 ms 350.24 ms 451.67 ms]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild

benchmark low cardinality
                        time:   [15.764 ms 16.531 ms 17.145 ms]
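
These figures come from the multi_groupby bench added in this PR (Criterion with sample_size = 10, as configured below). Assuming a standard checkout of the repository, a run along the lines of

    cargo bench -p datafusion --bench multi_groupby

should reproduce them; the exact cargo invocation is a guess, not something stated in the PR.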

@jayzhan211 (Contributor Author) commented on Aug 3, 2024:

main branch

Gnuplot not found, using plotters backend
Benchmarking benchmark high cardinality: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 11.0s.
benchmark high cardinality
                        time:   [565.74 ms 964.40 ms 1.3864 s]
                        change: [+69.461% +175.35% +332.28%] (p = 0.01 < 0.05)
                        Performance has regressed.

benchmark low cardinality
                        time:   [10.736 ms 11.140 ms 11.577 ms]
                        change: [-35.106% -31.565% -28.118%] (p = 0.00 < 0.05)
                        Performance has improved.

This PR shows a slight regression for low cardinality but a huge gain for high cardinality.
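
Working from the Criterion medians above, main vs. this PR is 964.40 ms / 350.24 ms ≈ 2.75x faster on the high-cardinality case and 16.531 ms / 11.140 ms ≈ 1.48x slower on the low-cardinality case. Note that the change percentages in the second run are reported relative to the immediately preceding run (this PR's branch), which is why Criterion prints "Performance has regressed" for high cardinality even though the PR is the faster one there.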


criterion_group! {
    name = benches;
    // This can be any expression that returns a `Criterion` object.
    config = Criterion::default().sample_size(10);
    targets = criterion_benchmark
}
criterion_main!(benches);
118 changes: 72 additions & 46 deletions datafusion/core/src/physical_planner.rs
@@ -956,55 +956,81 @@ impl DefaultPhysicalPlanner {
let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
    multiunzip(agg_filter);

let initial_aggr = Arc::new(AggregateExec::try_new(
    AggregateMode::Partial,
    groups.clone(),
    aggregates.clone(),
    filters.clone(),
    input_exec,
    physical_input_schema.clone(),
)?);

// update group column indices based on partial aggregate plan evaluation
let final_group: Vec<Arc<dyn PhysicalExpr>> =
    initial_aggr.output_group_expr();

let can_repartition = !groups.is_empty()
    && session_state.config().target_partitions() > 1
    && session_state.config().repartition_aggregations();

// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
// into a LAST_VALUE with the reverse ordering requirement.
// To reflect such changes to subsequent stages, use the updated
// `AggregateExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr().to_vec();

let next_partition_mode = if can_repartition {
    // construct a second aggregation with 'AggregateMode::FinalPartitioned'
    AggregateMode::FinalPartitioned
if group_expr.len() > 1 {
    let can_repartition = !groups.is_empty()
        && session_state.config().target_partitions() > 1
        && session_state.config().repartition_aggregations();

    let mode = if can_repartition {
        // hash-repartitioned input: run a single-stage aggregation per partition
        AggregateMode::SinglePartitioned
    } else {
        // single input partition: run one single-stage aggregation over all input
        AggregateMode::Single
    };

    let aggr = AggregateExec::try_new(
        mode,
        groups.clone(),
        aggregates.clone(),
        filters.clone(),
        input_exec.clone(),
        physical_input_schema.clone(),
    )?;
    Arc::new(aggr)
} else {
    // construct a second aggregation, keeping the final column name equal to the
    // first aggregation and the expressions corresponding to the respective aggregate
    AggregateMode::Final
};
    let initial_aggr: Arc<AggregateExec> =
        Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            groups.clone(),
            aggregates.clone(),
            filters.clone(),
            input_exec,
            physical_input_schema.clone(),
        )?);

let final_grouping_set = PhysicalGroupBy::new_single(
    final_group
        .iter()
        .enumerate()
        .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone()))
        .collect(),
);
    // update group column indices based on partial aggregate plan evaluation
    let final_group: Vec<Arc<dyn PhysicalExpr>> =
        initial_aggr.output_group_expr();

Arc::new(AggregateExec::try_new(
    next_partition_mode,
    final_grouping_set,
    updated_aggregates,
    filters,
    initial_aggr,
    physical_input_schema.clone(),
)?)
    let can_repartition = !groups.is_empty()
        && session_state.config().target_partitions() > 1
        && session_state.config().repartition_aggregations();

    // Some aggregators may be modified during initialization for
    // optimization purposes. For example, a FIRST_VALUE may turn
    // into a LAST_VALUE with the reverse ordering requirement.
    // To reflect such changes to subsequent stages, use the updated
    // `AggregateExpr`/`PhysicalSortExpr` objects.
    let updated_aggregates = initial_aggr.aggr_expr().to_vec();

    let next_partition_mode = if can_repartition {
        // construct a second aggregation with 'AggregateMode::FinalPartitioned'
        AggregateMode::FinalPartitioned
    } else {
        // construct a second aggregation, keeping the final column name equal to the
        // first aggregation and the expressions corresponding to the respective aggregate
        AggregateMode::Final
    };

    let final_grouping_set = PhysicalGroupBy::new_single(
        final_group
            .iter()
            .enumerate()
            .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone()))
            .collect(),
    );

    Arc::new(AggregateExec::try_new(
        next_partition_mode,
        final_grouping_set,
        updated_aggregates,
        filters,
        initial_aggr,
        physical_input_schema.clone(),
    )?)
}
}
LogicalPlan::Projection(Projection { input, expr, .. }) => self
.create_project_physical_exec(
4 changes: 3 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -592,7 +592,9 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
// Set specific Parquet format options
let mut key_value_metadata = HashMap::new();
key_value_metadata.insert("test".to_string(), Some("test".to_string()));
parquet_format.key_value_metadata = key_value_metadata.clone();
parquet_format
    .key_value_metadata
    .clone_from(&key_value_metadata);

parquet_format.global.allow_single_file_parallelism = false;
parquet_format.global.created_by = "test".to_string();
39 changes: 13 additions & 26 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -4568,11 +4568,8 @@ logical_plan
03)----TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
01)GlobalLimitExec: skip=4, fetch=5
02)--AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
02)--AggregateExec: mode=Single, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true

query II
SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5 offset 4;
@@ -4601,13 +4598,12 @@ physical_plan
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[4]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------ProjectionExec: expr=[c3@1 as c3]
07)------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
07)------------AggregateExec: mode=Single, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
13)------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true

query I
SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c2, c3 limit 4;
@@ -4629,11 +4625,8 @@
physical_plan
01)ProjectionExec: expr=[MAX(aggregate_test_100.c1)@2 as MAX(aggregate_test_100.c1), c2@0 as c2, c3@1 as c3]
02)--GlobalLimitExec: skip=0, fetch=5
03)----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[MAX(aggregate_test_100.c1)]
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], aggr=[MAX(aggregate_test_100.c1)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
03)----AggregateExec: mode=Single, gby=[c2@1 as c2, c3@2 as c3], aggr=[MAX(aggregate_test_100.c1)]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true

# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns
# in the group-by column lists, so the limit could be pushed to the lowest AggregateExec in this case
@@ -4648,16 +4641,10 @@
05)--------TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
01)GlobalLimitExec: skip=10, fetch=3
02)--AggregateExec: mode=Final, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------ProjectionExec: expr=[c3@1 as c3, c2@0 as c2]
07)------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
02)--AggregateExec: mode=Single, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
03)----ProjectionExec: expr=[c3@1 as c3, c2@0 as c2]
04)------AggregateExec: mode=Single, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true

query II
SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c2, c3 limit 3 offset 10;