diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 8c099d167e..a65da43a56 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -504,8 +504,8 @@ object CometConf extends ShimCometConf { .doc( "The type of memory pool to be used for Comet native execution. " + "Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " + - "'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap " + - "types are 'unified' and `fair_unified`.") + "'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. " + + "For off-heap types are 'unified' and `fair_unified`.") .stringConf .createWithDefault("greedy_task_shared") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 60e2e5dfc4..6874b8d5f0 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -48,7 +48,7 @@ Comet provides the following configuration settings. | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | | spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | -| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared | +| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | | spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index fb0b62a727..1b11d9b25e 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -26,7 +26,7 @@ use datafusion::{ prelude::{SessionConfig, SessionContext}, }; use datafusion_execution::memory_pool::{ - FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, + FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, }; use futures::poll; use jni::{ @@ -118,6 +118,7 @@ enum MemoryPoolType { FairSpillTaskShared, GreedyGlobal, FairSpillGlobal, + Unbounded, } struct MemoryPoolConfig { @@ -319,6 +320,7 @@ fn parse_memory_pool_config( "greedy_global" => MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size), "fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill, pool_size_per_task), "greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy, pool_size_per_task), + "unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0), _ => { return Err(CometError::Config(format!( "Unsupported memory pool type: {}", @@ -397,6 +399,7 @@ fn create_memory_pool( per_task_memory_pool.num_plans += 1; Arc::clone(&per_task_memory_pool.memory_pool) } + MemoryPoolType::Unbounded => Arc::new(UnboundedMemoryPool::default()), } }