Sketch for aggregation intermediate results blocked management #11943

Open: wants to merge 112 commits into base: main

Changes from all commits (112 commits)
eb74278
re-design the sketch.
Rachelint Aug 14, 2024
728a62e
disable blocked optimization when hash agg is switched to streaming ag…
Rachelint Aug 15, 2024
a1f5e2d
fix style.
Rachelint Aug 15, 2024
d7d22f6
impl blocked GroupValuesRows.
Rachelint Aug 15, 2024
e62172a
impl simple blocked mode for Count.
Rachelint Aug 15, 2024
99cd66a
impl simple blocked mode for prim_op and avg.
Rachelint Aug 15, 2024
520b2eb
fix init.
Rachelint Aug 15, 2024
aae2a3b
fix tests.
Rachelint Aug 15, 2024
ed7c1b7
support blocked mode in NullState.
Rachelint Aug 16, 2024
8d9d0c0
define the `blocked_accumulate`, so that we won't change the logic for…
Rachelint Aug 17, 2024
cb53724
impl blocked version prim_op and avg accumulators.
Rachelint Aug 17, 2024
cce049d
fix streaming tests.
Rachelint Aug 17, 2024
89da481
impl blocked version count accumulator.
Rachelint Aug 17, 2024
b2fc6d3
fix the special case where the total group count is zero.
Rachelint Aug 17, 2024
fc5d05b
move `BlockedNullState` to accumulate.rs.
Rachelint Aug 17, 2024
6092943
fix tests.
Rachelint Aug 18, 2024
6fcc831
define the `Blocks` to replace `VecDeque`.
Rachelint Aug 18, 2024
00cfa06
introduce Block to accumulators.
Rachelint Aug 18, 2024
261b0f1
make `Blocks` more general.
Rachelint Aug 18, 2024
53950b9
use `Blocks` in `BlockedNullState`.
Rachelint Aug 18, 2024
6de7fc7
use `Blocks` in `GroupValuesRows`.
Rachelint Aug 18, 2024
1567057
use debug_assert instead of assert.
Rachelint Aug 18, 2024
a780401
refactor `Blocks`.
Rachelint Aug 18, 2024
5fc7bf5
rename `CurrentBlock` to `NextBlock`, and add more comments.
Rachelint Aug 18, 2024
e143c59
minor optimization.
Rachelint Aug 18, 2024
5fb1748
fix comments.
Rachelint Aug 18, 2024
564e6d3
add todos.
Rachelint Aug 18, 2024
7e607eb
reduce repeated code.
Rachelint Aug 18, 2024
ec9bf21
disable blocked optimization in spilling case, and add comments.
Rachelint Aug 19, 2024
cb8da87
add more comments and remove stale code.
Rachelint Aug 19, 2024
e054c8b
do not try to support spilling in blocked mode currently.
Rachelint Aug 19, 2024
478627d
improve error messages.
Rachelint Aug 19, 2024
ead0076
add comment for `ProducingBlocks`.
Rachelint Aug 19, 2024
db1adbe
add comments for GroupStatesMode.
Rachelint Aug 19, 2024
03189a7
remove unused import.
Rachelint Aug 19, 2024
49c5e5e
fix clippy.
Rachelint Aug 19, 2024
50e8958
fix clippy.
Rachelint Aug 19, 2024
13e74b0
add comments.
Rachelint Aug 19, 2024
f5684a0
improve comments.
Rachelint Aug 20, 2024
3b63eaa
eliminate the unnecessary `mode check + current_mut`.
Rachelint Aug 20, 2024
5602ded
use index to replace get + unwrap.
Rachelint Aug 21, 2024
8da7806
add comments for some internal functions.
Rachelint Aug 21, 2024
e8ce09b
remove more unnecessary mode checks.
Rachelint Aug 21, 2024
48c7e4f
fix some comments.
Rachelint Aug 21, 2024
0bbad3a
improve comments about `Block`.
Rachelint Aug 21, 2024
0ab53dc
move `Blocks`, `BlockedIndex`, some functions of `Emit` to `datafusio…
Rachelint Aug 21, 2024
78c8e82
add test for `BlockedNullState`.
Rachelint Aug 21, 2024
f54878f
fix `BlockedNullState`'s unit test.
Rachelint Aug 21, 2024
56b0bcf
add unit tests for blocks.
Rachelint Aug 22, 2024
94af694
add unit test for `ensure_enough_room_for_blocked_nulls`.
Rachelint Aug 22, 2024
1064a72
test take needed.
Rachelint Aug 22, 2024
2b0796a
fix clippy.
Rachelint Aug 22, 2024
46b10b4
merge two modes to one.
Rachelint Aug 25, 2024
aef6c49
experiment.
Rachelint Aug 25, 2024
127a6e7
use function pointer to replace trait.
Rachelint Aug 25, 2024
9bffb4a
simplify function pointer.
Rachelint Aug 25, 2024
c8c0fee
init the function pointer during new.
Rachelint Aug 25, 2024
3e409ba
tmp.
Rachelint Aug 25, 2024
25269f4
tmp3
Rachelint Aug 25, 2024
4e2b9bc
tmp4
Rachelint Aug 25, 2024
07deb39
tmp5.
Rachelint Aug 25, 2024
5101165
just keep the if else.
Rachelint Aug 25, 2024
e10a4bb
remove some unnecessary code.
Rachelint Aug 25, 2024
cc117d4
adapt the new great comments.
Rachelint Aug 25, 2024
c65b808
add option and disable the blocked optimization by default.
Rachelint Aug 25, 2024
0a26de8
remove code about outdated `switch_to_mode`.
Rachelint Aug 25, 2024
96f8be8
fix test.
Rachelint Aug 25, 2024
6535b93
fix clippy.
Rachelint Aug 25, 2024
781d00c
try to eliminate more dynamic dispatch.
Rachelint Aug 25, 2024
3316f8f
continue to eliminate more dynamic dispatch.
Rachelint Aug 25, 2024
8a8e799
test.
Rachelint Aug 25, 2024
921cad7
fix clippy.
Rachelint Aug 25, 2024
8c5afa9
fix sql logic tests.
Rachelint Aug 25, 2024
e4dd31a
add options to enable blocked approach in benchmarks.
Rachelint Aug 25, 2024
db431cb
fix fmt and clippy.
Rachelint Aug 25, 2024
f2e316a
fix tpch opts.
Rachelint Aug 25, 2024
8b8da5e
fix comments of `BlockedGroupIndex`.
Rachelint Aug 25, 2024
21e5fdf
update config.
Rachelint Aug 25, 2024
8d5cb7f
fix docs.
Rachelint Aug 25, 2024
b33c6f9
use the right way to check if spilling enabled, and support `emit_ear…
Rachelint Aug 25, 2024
fd54e1c
unify ensure_enough_room_for_xxx and add tests.
Rachelint Aug 27, 2024
addcc13
fix ensure_enough_room_for_xxx.
Rachelint Aug 27, 2024
5a2292d
add physical level test for blocked approach.
Rachelint Aug 27, 2024
207a777
extract `run_aggregate_test_internal` for reusing later.
Rachelint Aug 27, 2024
6a4cf5b
add simple fuzz test for blocked approach.
Rachelint Aug 27, 2024
f1855ee
Merge branch 'main' into sketch-blocked-aggr-state-management
Rachelint Aug 27, 2024
551690d
don't support the `blocked approach` in bench until it is compatible wit…
Rachelint Aug 27, 2024
4a97d35
fix clippy.
Rachelint Aug 28, 2024
6f877e3
Merge branch 'main' into sketch-blocked-aggr-state-management
Rachelint Aug 28, 2024
189b4c3
merge main and fix compile.
Rachelint Aug 28, 2024
11870cb
fix clippy.
Rachelint Aug 28, 2024
6ecd81b
fix clippy.
Rachelint Aug 28, 2024
36e0791
add comments to architecture about blocked approach.
Rachelint Aug 29, 2024
4426307
fix comments.
Rachelint Aug 29, 2024
886bb20
fix typo.
Rachelint Aug 29, 2024
c2cb573
fix docs.
Rachelint Aug 30, 2024
ef91012
a unified and low cost way to compute the different type `BlockedGrou…
Rachelint Sep 1, 2024
31356d8
add test to `BlockedGroupIndexBuilder`.
Rachelint Sep 1, 2024
5d2ac01
add more inlines.
Rachelint Sep 1, 2024
0a7b52b
fix clippy.
Rachelint Sep 1, 2024
318c650
improve config comments.
Rachelint Sep 1, 2024
3d82094
remove deprecated function.
Rachelint Sep 1, 2024
b7a443a
improve docs.
Rachelint Sep 1, 2024
0cff3be
rename the on/off option to enable_aggregation_intermediate_states_bl…
Rachelint Sep 1, 2024
a2d81a5
fix doc.
Rachelint Sep 1, 2024
1db8633
fix fmt and tests.
Rachelint Sep 1, 2024
5907b8b
Merge branch 'main' into sketch-blocked-aggr-state-management
Rachelint Sep 2, 2024
6613288
update docs.
Rachelint Sep 3, 2024
cbafbc5
fix fmt.
Rachelint Sep 3, 2024
d258ea9
fix compile.
Rachelint Sep 3, 2024
4a48d3a
Merge branch 'main' into sketch-blocked-aggr-state-management
Rachelint Sep 9, 2024
7b61328
Merge branch 'main' into sketch-blocked-aggr-state-management
Rachelint Sep 17, 2024
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -62,6 +62,7 @@ dashmap = { workspace = true }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-proto = { workspace = true }
1 change: 1 addition & 0 deletions datafusion-examples/examples/advanced_udaf.rs
@@ -17,6 +17,7 @@

use arrow_schema::{Field, Schema};
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::EmitToExt;
use datafusion_physical_expr::NullState;
use std::{any::Any, sync::Arc};

13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
@@ -338,6 +338,19 @@ config_namespace! {
/// if the source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

/// Should DataFusion use the blocked approach to manage the group
/// values and their related states in accumulators. By default, the single
/// approach is used: values are managed within a single large block
/// (think of it as a `Vec`). As this block grows, it often triggers
/// numerous copies, resulting in poor performance.
/// If this flag is set to `true`, the blocked approach is used instead.
/// The blocked approach first allocates capacity for a block based on a
/// predefined block size. When the block reaches its limit, a new block
/// (again with capacity based on the predefined block size) is allocated
/// instead of expanding the current one and copying the data.
/// We plan to make this the default in the future once it is well tested.
pub enable_aggregation_intermediate_states_blocked_approach: bool, default = false
}
}

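The allocation strategy described in the config comment above can be illustrated with a small standalone sketch. The `Blocks` type here is hypothetical: it only mirrors the idea behind the internal `Blocks` container introduced in this PR (fixed-size blocks appended instead of one growing `Vec`), not its actual API.

```rust
/// A hypothetical blocked container: values live in fixed-size blocks,
/// and a full block is never grown or copied; a new block is allocated.
struct Blocks<T> {
    block_size: usize,
    blocks: Vec<Vec<T>>,
}

impl<T> Blocks<T> {
    fn new(block_size: usize) -> Self {
        Self {
            block_size,
            blocks: Vec::new(),
        }
    }

    /// Append a value. When the current block is full, allocate a fresh
    /// block with the predefined capacity instead of reallocating and
    /// copying the existing data (the copy cost the config comment mentions).
    fn push(&mut self, value: T) {
        match self.blocks.last_mut() {
            Some(block) if block.len() < self.block_size => block.push(value),
            _ => {
                let mut block = Vec::with_capacity(self.block_size);
                block.push(value);
                self.blocks.push(block);
            }
        }
    }

    /// Map a flat group index to a (block_id, offset-in-block) pair,
    /// analogous to how a blocked group index splits a flat index.
    fn locate(&self, flat_index: usize) -> (usize, usize) {
        (flat_index / self.block_size, flat_index % self.block_size)
    }

    fn get(&self, flat_index: usize) -> Option<&T> {
        let (block_id, offset) = self.locate(flat_index);
        self.blocks.get(block_id)?.get(offset)
    }
}

fn main() {
    let mut blocks = Blocks::new(4);
    for i in 0..10 {
        blocks.push(i);
    }
    // 10 values with block size 4 -> three blocks: 4 + 4 + 2.
    assert_eq!(blocks.blocks.len(), 3);
    assert_eq!(blocks.get(5), Some(&5));
    println!("{} values stored in {} blocks", 10, blocks.blocks.len());
}
```

Emitting results block by block (rather than slicing one large buffer) is also what lets the operator hand back `batch_size`-aligned chunks cheaply, which is why the fuzz test below varies the batch size.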
223 changes: 199 additions & 24 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -33,15 +33,20 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::ScalarValue;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::TaskContext;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::InputOrderMode;
use rand::seq::SliceRandom;
use test_utils::{add_empty_batches, StringBatchGenerator};

use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand::{thread_rng, Rng, SeedableRng};
use tokio::task::JoinSet;

/// Tests that streaming aggregate and batch (non streaming) aggregate produce
@@ -65,7 +70,7 @@ async fn streaming_aggregate_test() {
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
join_set.spawn(run_aggregate_test(
join_set.spawn(run_streaming_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
));
@@ -77,13 +82,59 @@
}
}

/// Tests that the blocked approach aggregate and the usual batch aggregate
/// produce the same results
#[tokio::test(flavor = "multi_thread")]
async fn blocked_approach_aggregate_test() {
let test_cases = [
vec!["a"],
vec!["b", "a"],
vec!["c", "a"],
vec!["c", "b", "a"],
vec!["d", "a"],
vec!["d", "b", "a"],
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];

let n_batch_size = 10;
let mut rng = thread_rng();
let mut all_batch_sizes = (1..=50_usize).collect::<Vec<_>>();
all_batch_sizes.shuffle(&mut rng);
let batch_sizes = &all_batch_sizes[0..n_batch_size];

let n = 300;
let distincts = vec![10, 20];
for distinct in distincts {
let mut join_set = JoinSet::new();
for batch_size in batch_sizes {
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
join_set.spawn(run_blocked_approach_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
*batch_size,
));
}
}
while let Some(join_handle) = join_set.join_next().await {
// propagate errors
join_handle.unwrap();
}
}
}

/// Perform batch and streaming aggregation with the same input,
/// and verify that the outputs of `AggregateExec` with the pipeline-breaking stream `GroupedHashAggregateStream`
/// and the non-pipeline-breaking stream `BoundedAggregateStream` produce the same result.
async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
let schema = input1[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
let ctx = SessionContext::new_with_config(session_config);
async fn run_streaming_aggregate_test(
test_data: Vec<RecordBatch>,
group_by_columns: Vec<&str>,
) {
let schema = test_data[0].schema();

// Define test data source exec
let mut sort_keys = vec![];
for ordering_col in ["a", "b", "c"] {
sort_keys.push(PhysicalSortExpr {
@@ -92,17 +143,138 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
})
}

let concat_input_record = concat_batches(&schema, &input1).unwrap();
let concat_input_record = concat_batches(&schema, &test_data).unwrap();
let usual_source = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
);

let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
MemoryExec::try_new(&[test_data.clone()], schema.clone(), None)
.unwrap()
.with_sort_information(vec![sort_keys]),
);

// Define test task ctx
let session_config = SessionConfig::new().with_batch_size(50);
let ctx = SessionContext::new_with_config(session_config);

// Run and check
let usual_aggr_ctx = AggrTestContext {
data_source_exec: usual_source,
task_ctx: ctx.task_ctx(),
};

let running_aggr_ctx = AggrTestContext {
data_source_exec: running_source,
task_ctx: ctx.task_ctx(),
};

run_aggregate_test_internal(
test_data,
usual_aggr_ctx,
running_aggr_ctx,
|collected_usual, collected_running| {
assert!(collected_running.len() > 2);
// Running mode should produce more chunks than the usual AggregateExec;
// otherwise it means we cannot generate results in running mode.
assert!(collected_running.len() > collected_usual.len());
},
group_by_columns,
)
.await;
}

/// Perform batch and blocked approach aggregations, and then verify their outputs.
async fn run_blocked_approach_aggregate_test(
test_data: Vec<RecordBatch>,
group_by_columns: Vec<&str>,
batch_size: usize,
) {
let schema = test_data[0].schema();

// Define test data source exec
let concat_input_record = concat_batches(&schema, &test_data).unwrap();
let usual_source = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
);

let running_source = Arc::new(
MemoryExec::try_new(&[test_data.clone()], schema.clone(), None).unwrap(),
);

// Define test task ctx
// Usual task ctx
let mut session_config = SessionConfig::default();
session_config = session_config.set(
"datafusion.execution.batch_size",
&ScalarValue::UInt64(Some(batch_size as u64)),
);
let usual_ctx = Arc::new(TaskContext::default().with_session_config(session_config));

// Running task ctx
let mut session_config = SessionConfig::default();
session_config = session_config.set(
"datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
&ScalarValue::Boolean(Some(true)),
);
session_config = session_config.set(
"datafusion.execution.batch_size",
&ScalarValue::UInt64(Some(batch_size as u64)),
);

let runtime = Arc::new(
RuntimeEnv::new(
RuntimeConfig::default().with_disk_manager(DiskManagerConfig::Disabled),
)
.unwrap(),
);
let running_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime),
);

// Run and check
let usual_aggr_ctx = AggrTestContext {
data_source_exec: usual_source,
task_ctx: usual_ctx,
};

let running_aggr_ctx = AggrTestContext {
data_source_exec: running_source,
task_ctx: running_ctx,
};

run_aggregate_test_internal(
test_data,
usual_aggr_ctx,
running_aggr_ctx,
|_, _| {},
group_by_columns,
)
.await;
}

/// Data source and task context for one side of a fuzz aggregation test
struct AggrTestContext {
data_source_exec: Arc<dyn ExecutionPlan>,
task_ctx: Arc<TaskContext>,
}

/// The internal test function for performing a normal aggregation
/// (without any optimizations) and an optimized aggregation
/// (e.g. streaming, blocked approach), and verifying that their outputs match.
async fn run_aggregate_test_internal<C>(
test_data: Vec<RecordBatch>,
left_aggr_ctx: AggrTestContext,
right_aggr_ctx: AggrTestContext,
extra_checks: C,
group_by_columns: Vec<&str>,
) where
C: Fn(&[RecordBatch], &[RecordBatch]),
{
let schema = test_data[0].schema();

let aggregate_expr =
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
@@ -117,42 +289,44 @@
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(expr);

let aggregate_exec_running = Arc::new(
let aggregate_exec_usual = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by.clone(),
aggregate_expr.clone(),
vec![None],
running_source,
left_aggr_ctx.data_source_exec.clone(),
schema.clone(),
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

let aggregate_exec_usual = Arc::new(
let aggregate_exec_running = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by.clone(),
aggregate_expr.clone(),
vec![None],
usual_source,
right_aggr_ctx.data_source_exec.clone(),
schema.clone(),
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

let task_ctx = ctx.task_ctx();
let collected_usual = collect(aggregate_exec_usual.clone(), task_ctx.clone())
.await
.unwrap();
let collected_usual =
collect(aggregate_exec_usual.clone(), left_aggr_ctx.task_ctx.clone())
.await
.unwrap();

let collected_running = collect(
aggregate_exec_running.clone(),
right_aggr_ctx.task_ctx.clone(),
)
.await
.unwrap();

extra_checks(&collected_usual, &collected_running);

let collected_running = collect(aggregate_exec_running.clone(), task_ctx.clone())
.await
.unwrap();
assert!(collected_running.len() > 2);
// Running should produce more chunk than the usual AggregateExec.
// Otherwise it means that we cannot generate result in running mode.
assert!(collected_running.len() > collected_usual.len());
// compare
let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
let running_formatted = pretty_format_batches(&collected_running)
@@ -187,7 +361,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
displayable(aggregate_exec_running.as_ref()).indent(false),
usual_formatted,
running_formatted,
pretty_format_batches(&input1).unwrap(),
pretty_format_batches(&test_data).unwrap(),
);
}
}
@@ -311,6 +485,7 @@ async fn group_by_string_test(
let actual = extract_result_counts(results);
assert_eq!(expected, actual);
}

async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {
struct Visitor {
expected_sort: bool,
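For completeness, here is a sketch of how a user would opt in to the feature this PR adds. It is a configuration example, not part of the PR: the option key is taken from the `config.rs` change in this diff, and `SessionConfig::set_bool` / `with_batch_size` are existing DataFusion builder methods; whether a given plan actually uses the blocked path still depends on the PR's internal checks (e.g. it is disabled for spilling and streaming aggregation).

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Enable the blocked approach (off by default in this PR).
    // The option key matches the one added to `config.rs` in this diff.
    let config = SessionConfig::new()
        .with_batch_size(50)
        .set_bool(
            "datafusion.execution.enable_aggregation_intermediate_states_blocked_approach",
            true,
        );
    let ctx = SessionContext::new_with_config(config);

    // Grouped hash aggregations may now manage their intermediate
    // states in fixed-size blocks instead of one growing buffer.
    let df = ctx
        .sql(
            "SELECT column1 % 3 AS g, count(*) AS cnt \
             FROM (VALUES (1), (2), (3), (4), (5)) GROUP BY g",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```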