diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 2baced0927..ac4f11da91 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -39,23 +39,18 @@ process, and by Spark itself. The size of the pool is specified by `spark.memory
 
 This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
 
-Each native plan has a dedicated memory pool.
+Each executor has a single memory pool that is shared by all native plans executing within that
+process. Unlike Unified Memory Management, this pool is not shared with Spark.
 
-By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
+By default, the size of this pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
 for `spark.comet.memory.overhead.factor` is `0.2`.
 
-It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
 It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
 it based on `spark.comet.memory.overhead.factor`. If both `spark.comet.memoryOverhead` and
 `spark.comet.memory.overhead.factor` are set, the former will be used.
 
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
+Comet will allocate at least `spark.comet.memory.overhead.min` memory per executor.
 
 ### Determining How Much Memory to Allocate
 
@@ -124,5 +119,4 @@ then any shuffle operations that cannot be supported in this mode will fall back
 
 Comet metrics are not directly comparable to Spark metrics in some cases. `CometScanExec` uses nanoseconds for
 total scan time. Spark also measures scan time in nanoseconds but converts to
-milliseconds _per batch_ which can result in a large loss of precision. In one case we saw total scan time
-of 41 seconds reported as 23 seconds for example.
+milliseconds _per batch_, which can result in a large loss of precision and misleading timings.
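The sizing rules in the documentation change above can be summarized with a small sketch. This is a minimal illustration of the documented behaviour, not Comet's actual implementation: the object name `NativePoolSizing`, its parameters, and the assumed 384 MiB default for `spark.comet.memory.overhead.min` are hypothetical.

```scala
// Illustrative sketch of the documented per-executor pool sizing rules.
object NativePoolSizing {

  /**
   * An explicit spark.comet.memoryOverhead takes precedence over the factor-based
   * calculation, and the result is never below the configured minimum.
   */
  def poolSizeBytes(
      executorMemoryBytes: Long,
      overheadFactor: Double = 0.2, // spark.comet.memory.overhead.factor default
      explicitOverheadBytes: Option[Long] = None, // spark.comet.memoryOverhead, if set
      minOverheadBytes: Long = 384L * 1024 * 1024 // assumed spark.comet.memory.overhead.min
  ): Long = {
    val computed =
      explicitOverheadBytes.getOrElse((executorMemoryBytes * overheadFactor).toLong)
    math.max(computed, minOverheadBytes)
  }
}
```

For example, with `spark.executor.memory=16g` and the default factor of `0.2`, `poolSizeBytes(16L * 1024 * 1024 * 1024)` returns roughly 3.2 GiB, which is the size of the single pool shared by all native plans in that executor.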
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index e45897a92b..2c1bc985b9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -17,6 +17,7 @@
 
 //! Define JNI APIs which can be called from Java/Scala.
 
+use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
@@ -39,8 +40,6 @@ use jni::{
 };
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
-use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
-
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
@@ -199,27 +198,9 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Check if we are using unified memory manager integrated with Spark. Default to false if not
-    // set.
-    let use_unified_memory_manager = parse_bool(conf, "use_unified_memory_manager")?;
-
-    if use_unified_memory_manager {
-        // Set Comet memory pool for native
-        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
-    } else {
-        // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
-        }
-    }
+    // Set Comet memory pool for native
+    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
index 96fa3b4323..a304efab87 100644
--- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
+++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
@@ -20,6 +20,7 @@
 package org.apache.spark;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.MemoryMode;
@@ -35,22 +36,53 @@ public class CometTaskMemoryManager {
 
   private final TaskMemoryManager internal;
   private final NativeMemoryConsumer nativeMemoryConsumer;
+  private final boolean unifiedMemory;
+  private static AtomicBoolean initialized = new AtomicBoolean(false);
+  private static long available = 0;
 
-  public CometTaskMemoryManager(long id) {
+  public CometTaskMemoryManager(long id, boolean unifiedMemory, long available) {
     this.id = id;
     this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
     this.nativeMemoryConsumer = new NativeMemoryConsumer();
+    this.unifiedMemory = unifiedMemory;
+
+    if (CometTaskMemoryManager.initialized.compareAndSet(false, true)) {
+      synchronized (CometTaskMemoryManager.class) {
+        // TODO use Spark logger
+        System.out.println("Initializing Comet memory pool to " + available + " bytes");
+        CometTaskMemoryManager.available = available;
+      }
+    }
   }
 
   // Called by Comet native through JNI.
   // Returns the actual amount of memory (in bytes) granted.
   public long acquireMemory(long size) {
-    return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
+    if (unifiedMemory) {
+      return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
+    } else {
+      synchronized (CometTaskMemoryManager.class) {
+        if (size <= CometTaskMemoryManager.available) {
+          available -= size;
+          return size;
+        } else {
+          long allocated = available;
+          available = 0;
+          return allocated;
+        }
+      }
+    }
   }
 
   // Called by Comet native through JNI
   public void releaseMemory(long size) {
-    internal.releaseExecutionMemory(size, nativeMemoryConsumer);
+    if (unifiedMemory) {
+      internal.releaseExecutionMemory(size, nativeMemoryConsumer);
+    } else {
+      synchronized (CometTaskMemoryManager.class) {
+        available += size;
+      }
+    }
   }
 
   /**
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 07dd80c39e..6c6ac5bce0 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -55,13 +55,16 @@
   }.toArray
   private val plan = {
     val configs = createNativeConf
+    val useUnifiedMemory =
+      SparkEnv.get.conf.get("spark.memory.offHeap.enabled", "false").toBoolean
+    val nativeMemPoolSize = CometSparkSessionExtensions.getCometMemoryOverhead(SparkEnv.get.conf)
     nativeLib.createPlan(
       id,
       configs,
       cometBatchIterators,
       protobufQueryPlan,
       nativeMetrics,
-      new CometTaskMemoryManager(id))
+      new CometTaskMemoryManager(id, useUnifiedMemory, nativeMemPoolSize))
   }
 
   private var nextBatch: Option[ColumnarBatch] = None
@@ -75,15 +78,6 @@
     val result = new java.util.HashMap[String, String]()
     val conf = SparkEnv.get.conf
 
-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
-    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
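As a usage note on the JVM-side wiring above: `CometExecIterator` now decides between the two memory modes from `spark.memory.offHeap.enabled` and passes the pool size computed by `CometSparkSessionExtensions.getCometMemoryOverhead` to `CometTaskMemoryManager`. The sketch below shows how an application might configure each mode; the sizes are arbitrary examples, not recommendations.

```scala
import org.apache.spark.SparkConf

// Unified memory management: acquireMemory/releaseMemory delegate to Spark's
// TaskMemoryManager, so native plans draw from Spark's off-heap pool.
val unifiedConf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "8g") // example size

// Off-heap disabled: all native plans in an executor share the static pool
// tracked in CometTaskMemoryManager, sized here via spark.comet.memoryOverhead.
val cometPoolConf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "false")
  .set("spark.comet.memoryOverhead", "4g") // example size
```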