Skip to content

Commit

Permalink
rename the on/off option to enable_aggregation_intermediate_states_blocked_approach.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Sep 1, 2024
1 parent b7a443a commit 0cff3be
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 99 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ config_namespace! {
/// we allocate a new block (also with the same predefined block size based capacity)
/// instead of expanding the current one and copying the data.
/// We plan to make this the default in the future when tests are enough.
pub enable_aggregation_group_states_blocked_approach: bool, default = false
pub enable_aggregation_intermediate_states_blocked_approach: bool, default = false
}
}

Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub(crate) struct GroupedHashAggregateStream {
skip_aggregation_probe: Option<SkipAggregationProbe>,

/// Have we enabled the blocked optimization for group values and accumulators.
enable_blocked_group_states: bool,
is_blocked_approach_on: bool,

// ========================================================================
// EXECUTION RESOURCES:
Expand Down Expand Up @@ -613,7 +613,7 @@ impl GroupedHashAggregateStream {
spill_state,
group_values_soft_limit: agg.limit,
skip_aggregation_probe,
enable_blocked_group_states,
is_blocked_approach_on: enable_blocked_group_states,
})
}
}
Expand Down Expand Up @@ -641,7 +641,7 @@ fn maybe_enable_blocked_group_states(
.session_config()
.options()
.execution
.enable_aggregation_group_states_blocked_approach
.enable_aggregation_intermediate_states_blocked_approach
|| !matches!(group_ordering, GroupOrdering::None)
|| accumulators.is_empty()
|| enable_spilling(context.runtime_env().disk_manager.as_ref())
Expand Down Expand Up @@ -1082,7 +1082,7 @@ impl GroupedHashAggregateStream {
&& matches!(self.mode, AggregateMode::Partial)
&& self.update_memory_reservation().is_err()
{
if !self.enable_blocked_group_states {
if !self.is_blocked_approach_on {
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
Expand Down Expand Up @@ -1130,12 +1130,12 @@ impl GroupedHashAggregateStream {
// We should disable the blocked optimization for `GroupValues` and `GroupAccumulator`s here,
// because the blocked mode can't support `Emit::First(exact n)` which is needed in
// streaming aggregation.
if self.enable_blocked_group_states {
if self.is_blocked_approach_on {
self.group_values.alter_block_size(None)?;
self.accumulators
.iter_mut()
.try_for_each(|acc| acc.alter_block_size(None))?;
self.enable_blocked_group_states = false;
self.is_blocked_approach_on = false;
}

self.input_done = false;
Expand All @@ -1159,7 +1159,7 @@ impl GroupedHashAggregateStream {
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();
self.exec_state = if self.spill_state.spills.is_empty() {
if !self.enable_blocked_group_states {
if !self.is_blocked_approach_on {
let batch = self.emit(EmitTo::All, false)?;
ExecutionState::ProducingOutput(batch)
} else {
Expand Down Expand Up @@ -1196,7 +1196,7 @@ impl GroupedHashAggregateStream {
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if probe.should_skip() {
if !self.enable_blocked_group_states {
if !self.is_blocked_approach_on {
let batch = self.emit(EmitTo::All, false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
} else {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.enable_aggregation_group_states_blocked_approach false
datafusion.execution.enable_aggregation_intermediate_states_blocked_approach false
datafusion.execution.enable_recursive_ctes true
datafusion.execution.keep_partition_by_columns false
datafusion.execution.listing_table_ignore_subdirectory true
Expand Down Expand Up @@ -264,7 +264,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.enable_aggregation_group_states_blocked_approach false Should DataFusion use the the blocked approach to manage the groups values and their related states in accumulators. By default, the single approach will be used, values are managed within a single large block (can think of it as a Vec). As this block grows, it often triggers numerous copies, resulting in poor performance. If setting this flag to `true`, the blocked approach will be used. And the blocked approach allocates capacity for the block based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) We plan to make this the default in the future when tests are enough.
datafusion.execution.enable_aggregation_intermediate_states_blocked_approach false Should DataFusion use the the blocked approach to manage the groups values and their related states in accumulators. By default, the single approach will be used, values are managed within a single large block (can think of it as a Vec). As this block grows, it often triggers numerous copies, resulting in poor performance. If setting this flag to `true`, the blocked approach will be used. And the blocked approach allocates capacity for the block based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) We plan to make this the default in the future when tests are enough.
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
Expand Down
Loading

0 comments on commit 0cff3be

Please sign in to comment.