Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ config_namespace! {
/// concurrency.
///
/// Defaults to the number of CPU cores on the system
pub target_partitions: usize, default = get_available_parallelism()
pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

/// The default time zone
///
Expand All @@ -316,7 +316,7 @@ config_namespace! {
/// This is mostly use to plan `UNION` children in parallel.
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = get_available_parallelism()
pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
Expand Down Expand Up @@ -739,6 +739,19 @@ config_namespace! {
}
}

impl ExecutionOptions {
    /// Normalizes a textual parallelism setting.
    ///
    /// A `value` that parses as the `usize` zero is replaced by the string
    /// form of `get_available_parallelism()`. Any other input, including
    /// text that does not parse as a `usize`, is handed back unchanged as an
    /// owned `String`.
    fn normalized_parallelism(value: &str) -> String {
        match value.parse::<usize>() {
            // Zero is the "use the default" marker.
            Ok(0) => get_available_parallelism().to_string(),
            // Non-zero numbers and unparsable text pass through untouched.
            _ => value.to_owned(),
        }
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
Expand Down
8 changes: 5 additions & 3 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ impl SessionConfig {
///
/// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
pub fn with_target_partitions(mut self, n: usize) -> Self {
    // A partition count of zero is not usable. Rather than panicking, treat
    // `0` as a request for the default `target_partitions` value taken from
    // `ExecutionOptions::default()`; any other `n` is stored as given.
    self.options.execution.target_partitions = if n == 0 {
        datafusion_common::config::ExecutionOptions::default().target_partitions
    } else {
        n
    };
    self
}

Expand Down
33 changes: 33 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,39 @@ drop table t
statement ok
drop table t2


############
## 0 to represent the default value (target_partitions and planning_concurrency)
###########

# Step 1: give both settings an explicit, non-zero value.
statement ok
SET datafusion.execution.target_partitions = 3;

statement ok
SET datafusion.execution.planning_concurrency = 3;

# An explicit non-zero value is stored as-is: after setting target_partitions and
# planning_concurrency to 3, both settings report 3.
query TB rowsort
SELECT name, value = 3 FROM information_schema.df_settings WHERE name IN ('datafusion.execution.target_partitions', 'datafusion.execution.planning_concurrency');
----
datafusion.execution.planning_concurrency true
datafusion.execution.target_partitions true

# Step 2: set both settings to 0, which is expected to select the default value.
statement ok
SET datafusion.execution.target_partitions = 0;

statement ok
SET datafusion.execution.planning_concurrency = 0;

# After setting target_partitions and planning_concurrency to 0, the stored values are the
# defaults rather than 0 (0 itself is not a valid value). The query only checks `value = 0`
# is false instead of comparing against a concrete number, presumably because the default
# depends on the machine running the test.
query TB rowsort
SELECT name, value = 0 FROM information_schema.df_settings WHERE name IN ('datafusion.execution.target_partitions', 'datafusion.execution.planning_concurrency');
----
datafusion.execution.planning_concurrency false
datafusion.execution.target_partitions false


############
## SHOW VARIABLES should work
###########
Expand Down