diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 2baced0927..65f8c08aff 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -39,23 +39,18 @@ process, and by Spark itself. The size of the pool is specified by `spark.memory
 
 This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
 
-Each native plan has a dedicated memory pool.
+By default, the total allocation per executor is `spark.comet.memory.overhead.factor * spark.executor.memory`. The
+default value for `spark.comet.memory.overhead.factor` is `0.2`.
 
-By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
+It is also possible to set `spark.comet.memoryOverhead` to the desired size for each executor, rather than calculating
 it based on `spark.comet.memory.overhead.factor`. If both `spark.comet.memoryOverhead` and
 `spark.comet.memory.overhead.factor` are set, the former will be used.
 
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
+Comet will allocate at least `spark.comet.memory.overhead.min` memory per executor.
+
+The total allocation will be split into smaller per-task allocations based on `spark.executor.cores` and
+`spark.task.cpus`. The algorithm used is `total_mem_allocation * spark.task.cpus / spark.executor.cores`.
 
 ### Determining How Much Memory to Allocate
 
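To make the per-task split described in the tuning guide above concrete, here is a minimal standalone sketch of `total_mem_allocation * spark.task.cpus / spark.executor.cores`. It is not part of the patch, and the function and variable names are illustrative only.

```rust
// Illustrative sketch, not Comet code: the per-task share of the total Comet allocation.
fn per_task_memory_limit(total_allocation_bytes: u64, executor_cores: u64, task_cpus: u64) -> u64 {
    // total_mem_allocation * spark.task.cpus / spark.executor.cores
    total_allocation_bytes * task_cpus / executor_cores
}

fn main() {
    // Same numbers as the worked example in the CometExecIterator change below:
    // a 16 GB total allocation on an executor with 16 cores and spark.task.cpus = 4
    // yields a 4 GB limit per task.
    let total: u64 = 16 * 1024 * 1024 * 1024;
    assert_eq!(per_task_memory_limit(total, 16, 4), 4 * 1024 * 1024 * 1024);
}
```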
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index e45897a92b..0f3eef8be8 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -51,6 +51,7 @@ use crate::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_common::ScalarValue;
+use datafusion_execution::memory_pool::FairSpillPool;
 use futures::stream::StreamExt;
 use jni::{
     objects::GlobalRef,
@@ -209,16 +210,11 @@ fn prepare_datafusion_session_context(
         rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
     } else {
         // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
-        }
+        let memory_limit = conf
+            .get("memory_limit_per_task")
+            .unwrap()
+            .parse::<usize>()?;
+        rt_config = rt_config.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit)));
     }
 
     // Get Datafusion configuration from Spark Execution context
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 07dd80c39e..1edad9bd8e 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -78,12 +78,16 @@ class CometExecIterator(
     val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
     // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
     // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
+    val offHeapEnabled = conf.get("spark.memory.offHeap.enabled", "false").toBoolean
+    result.put("use_unified_memory_manager", String.valueOf(offHeapEnabled))
+    if (!offHeapEnabled) {
+      val numCores = conf.get("spark.executor.cores", "1").toFloat
+      val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
+      // example: 16 GB maxMemory on an executor with 16 cores and 4 cores per task results
+      // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4 GB
+      val memPerTask = maxMemory.toFloat * coresPerTask / numCores
+      result.put("memory_limit_per_task", String.valueOf(memPerTask.toLong))
+    }
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
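To show how the per-task limit is consumed on the native side, here is a self-contained sketch of DataFusion's `FairSpillPool`, the pool type the `jni_api.rs` change constructs from `memory_limit_per_task`. It is not part of the patch; it assumes only that the `datafusion-execution` crate is available as a dependency, and the consumer name and sizes are illustrative.

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{FairSpillPool, MemoryConsumer, MemoryPool};

fn main() {
    // Per-task limit from the example above: 4 GB for this task's native plan.
    let per_task_limit: usize = 4 * 1024 * 1024 * 1024;
    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(per_task_limit));

    // Spillable operators (sort, aggregate, ...) register consumers against the pool;
    // FairSpillPool shares the remaining budget fairly among spilling consumers
    // instead of letting a single operator take everything.
    let mut reservation = MemoryConsumer::new("sort")
        .with_can_spill(true)
        .register(&pool);
    reservation
        .try_grow(64 * 1024 * 1024)
        .expect("64 MiB fits within the per-task limit");
    println!(
        "reservation: {} bytes, pool total reserved: {} bytes",
        reservation.size(),
        pool.reserved()
    );
}
```

A fair pool is a reasonable fit for a per-task limit because several operators in the same task may need to spill; a greedy pool would let the first spilling operator starve the rest.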