diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1c746a4e98405..1e0f63d6d81ca 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -300,7 +300,7 @@ config_namespace! { /// concurrency. /// /// Defaults to the number of CPU cores on the system - pub target_partitions: usize, default = get_available_parallelism() + pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism() /// The default time zone /// @@ -316,7 +316,7 @@ config_namespace! { /// This is mostly use to plan `UNION` children in parallel. /// /// Defaults to the number of CPU cores on the system - pub planning_concurrency: usize, default = get_available_parallelism() + pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism() /// When set to true, skips verifying that the schema produced by /// planning the input of `LogicalPlan::Aggregate` exactly matches the @@ -739,6 +739,19 @@ config_namespace! { } } +impl ExecutionOptions { + /// Returns the correct parallelism based on the provided `value`. + /// If `value` is `"0"`, returns the default available parallelism, computed with + /// `get_available_parallelism`. Otherwise, returns `value`. + fn normalized_parallelism(value: &str) -> String { + if value.parse::() == Ok(0) { + get_available_parallelism().to_string() + } else { + value.to_owned() + } + } +} + /// A key value pair, with a corresponding description #[derive(Debug)] pub struct ConfigEntry { diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 53646dc5b468e..1e00a1ce4725e 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -193,9 +193,11 @@ impl SessionConfig { /// /// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions pub fn with_target_partitions(mut self, n: usize) -> Self { - // partition count must be greater than zero - assert!(n > 0); - self.options.execution.target_partitions = n; + self.options.execution.target_partitions = if n == 0 { + datafusion_common::config::ExecutionOptions::default().target_partitions + } else { + n + }; self } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 4964bcbc735cd..87abaadb516f3 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -149,6 +149,39 @@ drop table t statement ok drop table t2 + +############ +## 0 to represent the default value (target_partitions and planning_concurrency) +########### + +statement ok +SET datafusion.execution.target_partitions = 3; + +statement ok +SET datafusion.execution.planning_concurrency = 3; + +# when setting target_partitions and planning_concurrency to 3, their values will be 3 +query TB rowsort +SELECT name, value = 3 FROM information_schema.df_settings WHERE name IN ('datafusion.execution.target_partitions', 'datafusion.execution.planning_concurrency'); +---- +datafusion.execution.planning_concurrency true +datafusion.execution.target_partitions true + +statement ok +SET datafusion.execution.target_partitions = 0; + +statement ok +SET datafusion.execution.planning_concurrency = 0; + +# when setting target_partitions and planning_concurrency to 0, their values will be equal to the +# default values, which are different from 0 (which is invalid) +query TB rowsort +SELECT name, value = 0 FROM information_schema.df_settings WHERE name IN ('datafusion.execution.target_partitions', 'datafusion.execution.planning_concurrency'); +---- +datafusion.execution.planning_concurrency false +datafusion.execution.target_partitions false + + ############ ## SHOW VARIABLES should work ###########