From 76dfffeca5191b33b6e815180494d4bb01d84417 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 26 Sep 2024 21:36:04 +0800 Subject: [PATCH 01/12] Make datafusion's native memory pool configurable --- .../scala/org/apache/comet/CometConf.scala | 9 + docs/source/user-guide/configs.md | 1 + native/core/src/execution/jni_api.rs | 209 ++++++++++++++++-- .../org/apache/comet/CometExecIterator.scala | 29 ++- .../main/scala/org/apache/comet/Native.scala | 3 + 5 files changed, 229 insertions(+), 22 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 8815ac4eb8..2fff0a04c3 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -467,6 +467,15 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool") + .doc( + "The type of memory pool to be used for Comet native execution. " + + "Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " + + "'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, " + + "this config is 'greedy_task_shared'.") + .stringConf + .createWithDefault("greedy_task_shared") + val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.scan.preFetch.enabled") .doc("Whether to enable pre-fetching feature of CometScan.") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7881f07632..ecea70254a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -48,6 +48,7 @@ Comet provides the following configuration settings. | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. 
| true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 | +| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. By default, this config is 'greedy_task_shared'. | greedy_task_shared | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | | spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. 
| zstd | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 09caf5e279..83f1652309 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -24,6 +24,9 @@ use datafusion::{ physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, prelude::{SessionConfig, SessionContext}, }; +use datafusion_execution::memory_pool::{ + FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, +}; use futures::poll; use jni::{ errors::Result as JNIResult, @@ -51,20 +54,26 @@ use datafusion_comet_proto::spark_operator::Operator; use datafusion_common::ScalarValue; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use futures::stream::StreamExt; +use jni::sys::JNI_FALSE; use jni::{ objects::GlobalRef, sys::{jboolean, jdouble, jintArray, jobjectArray, jstring}, }; +use std::num::NonZeroUsize; +use std::sync::Mutex; use tokio::runtime::Runtime; use crate::execution::operators::ScanExec; use crate::execution::spark_plan::SparkPlan; use log::info; +use once_cell::sync::{Lazy, OnceCell}; /// Comet native execution context. Kept alive across JNI calls. struct ExecutionContext { /// The id of the execution context. pub id: i64, + /// Task attempt id + pub task_attempt_id: i64, /// The deserialized Spark plan pub spark_plan: Operator, /// The DataFusion root operator converted from the `spark_plan` @@ -89,6 +98,51 @@ struct ExecutionContext { pub explain_native: bool, /// Map of metrics name -> jstring object to cache jni_NewStringUTF calls. 
pub metrics_jstrings: HashMap>, + /// Memory pool config + pub memory_pool_config: MemoryPoolConfig, +} + +#[derive(PartialEq, Eq)] +enum MemoryPoolType { + Unified, + Greedy, + FairSpill, + GreedyTaskShared, + FairSpillTaskShared, + GreedyGlobal, + FairSpillGlobal, +} + +struct MemoryPoolConfig { + pool_type: MemoryPoolType, + pool_size: usize, +} + +impl MemoryPoolConfig { + fn new(pool_type: MemoryPoolType, pool_size: usize) -> Self { + Self { + pool_type, + pool_size, + } + } +} + +/// The per-task memory pools keyed by task attempt id. +static TASK_SHARED_MEMORY_POOLS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +struct PerTaskMemoryPool { + memory_pool: Arc, + num_plans: usize, +} + +impl PerTaskMemoryPool { + fn new(memory_pool: Arc) -> Self { + Self { + memory_pool, + num_plans: 0, + } + } } /// Accept serialized query plan and return the address of the native query plan. @@ -105,8 +159,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( comet_task_memory_manager_obj: JObject, batch_size: jint, use_unified_memory_manager: jboolean, + memory_pool_type: jstring, memory_limit: jlong, + memory_limit_per_task: jlong, memory_fraction: jdouble, + task_attempt_id: jlong, debug_native: jboolean, explain_native: jboolean, worker_threads: jint, @@ -145,21 +202,27 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let task_memory_manager = Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?); + let memory_pool_type = env.get_string(&JString::from_raw(memory_pool_type))?.into(); + let memory_pool_config = parse_memory_pool_config( + use_unified_memory_manager != JNI_FALSE, + memory_pool_type, + memory_limit, + memory_limit_per_task, + memory_fraction, + )?; + let memory_pool = + create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id); + // We need to keep the session context alive. Some session state like temporary // dictionaries are stored in session context. 
If it is dropped, the temporary // dictionaries will be dropped as well. - let session = prepare_datafusion_session_context( - batch_size as usize, - use_unified_memory_manager == 1, - memory_limit as usize, - memory_fraction, - task_memory_manager, - )?; + let session = prepare_datafusion_session_context(batch_size as usize, memory_pool)?; let plan_creation_time = start.elapsed(); let exec_context = Box::new(ExecutionContext { id, + task_attempt_id, spark_plan, root_op: None, scans: vec![], @@ -172,6 +235,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( debug_native: debug_native == 1, explain_native: explain_native == 1, metrics_jstrings: HashMap::new(), + memory_pool_config, }); Ok(Box::into_raw(exec_context) as i64) @@ -181,22 +245,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( /// Configure DataFusion session context. fn prepare_datafusion_session_context( batch_size: usize, - use_unified_memory_manager: bool, - memory_limit: usize, - memory_fraction: f64, - comet_task_memory_manager: Arc, + memory_pool: Arc, ) -> CometResult { let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs); - - // Check if we are using unified memory manager integrated with Spark. 
- if use_unified_memory_manager { - // Set Comet memory pool for native - let memory_pool = CometMemoryPool::new(comet_task_memory_manager); - rt_config = rt_config.with_memory_pool(Arc::new(memory_pool)); - } else { - // Use the memory pool from DF - rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction) - } + rt_config = rt_config.with_memory_pool(memory_pool); // Get Datafusion configuration from Spark Execution context // can be configured in Comet Spark JVM using Spark --conf parameters @@ -224,6 +276,107 @@ fn prepare_datafusion_session_context( Ok(session_ctx) } +fn parse_memory_pool_config( + use_unified_memory_manager: bool, + memory_pool_type: String, + memory_limit: i64, + memory_limit_per_task: i64, + memory_fraction: f64, +) -> CometResult { + let memory_pool_config = if use_unified_memory_manager { + MemoryPoolConfig::new(MemoryPoolType::Unified, 0) + } else { + // Use the memory pool from DF + let pool_size = (memory_limit as f64 * memory_fraction) as usize; + let pool_size_per_task = (memory_limit_per_task as f64 * memory_fraction) as usize; + match memory_pool_type.as_str() { + "fair_spill_task_shared" => { + MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task) + } + "greedy_task_shared" => { + MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task) + } + "fair_spill_global" => { + MemoryPoolConfig::new(MemoryPoolType::FairSpillGlobal, pool_size) + } + "greedy_global" => MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size), + "fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill, pool_size_per_task), + "greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy, pool_size_per_task), + _ => { + return Err(CometError::Config(format!( + "Unsupported memory pool type: {}", + memory_pool_type + ))) + } + } + }; + Ok(memory_pool_config) +} + +fn create_memory_pool( + memory_pool_config: &MemoryPoolConfig, + comet_task_memory_manager: Arc, + task_attempt_id: i64, +) -> Arc { + 
const NUM_TRACKED_CONSUMERS: usize = 10; + match memory_pool_config.pool_type { + MemoryPoolType::Unified => { + // Set Comet memory pool for native + let memory_pool = CometMemoryPool::new(comet_task_memory_manager); + Arc::new(memory_pool) + } + MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )), + MemoryPoolType::FairSpill => Arc::new(TrackConsumersPool::new( + FairSpillPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )), + MemoryPoolType::GreedyGlobal => { + static GLOBAL_MEMORY_POOL_GREEDY: OnceCell> = OnceCell::new(); + let memory_pool = GLOBAL_MEMORY_POOL_GREEDY.get_or_init(|| { + Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )) + }); + Arc::clone(memory_pool) + } + MemoryPoolType::FairSpillGlobal => { + static GLOBAL_MEMORY_POOL_FAIR: OnceCell> = OnceCell::new(); + let memory_pool = GLOBAL_MEMORY_POOL_FAIR.get_or_init(|| { + Arc::new(TrackConsumersPool::new( + FairSpillPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )) + }); + Arc::clone(memory_pool) + } + MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => { + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = + if memory_pool_config.pool_type == MemoryPoolType::GreedyTaskShared { + Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )) + } else { + Arc::new(TrackConsumersPool::new( + FairSpillPool::new(memory_pool_config.pool_size), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )) + }; + PerTaskMemoryPool::new(pool) + }); + 
per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) + } + } +} + /// Prepares arrow arrays for output. fn prepare_output( env: &mut JNIEnv, @@ -407,6 +560,20 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan( ) { try_unwrap_or_throw(&e, |_| unsafe { let execution_context = get_execution_context(exec_context); + if execution_context.memory_pool_config.pool_type == MemoryPoolType::FairSpillTaskShared { + // Decrement the number of native plans using the per-task shared memory pool, and + // remove the memory pool if the released native plan is the last native plan using it. + let task_attempt_id = execution_context.task_attempt_id; + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + if let Some(per_task_memory_pool) = memory_pool_map.get_mut(&task_attempt_id) { + per_task_memory_pool.num_plans -= 1; + if per_task_memory_pool.num_plans == 0 { + // Drop the memory pool from the per-task memory pool map if there are no + // more native plans using it. 
+ memory_pool_map.remove(&task_attempt_id); + } + } + } let _: Box = Box::from_raw(execution_context); Ok(()) }) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 04d9306951..0b90a91c74 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -23,7 +23,7 @@ import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ -import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} import org.apache.comet.vector.NativeUtil /** @@ -72,8 +72,11 @@ class CometExecIterator( new CometTaskMemoryManager(id), batchSize = COMET_BATCH_SIZE.get(), use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false), + memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(), memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf), + memory_limit_per_task = getMemoryLimitPerTask(conf), memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(), + task_attempt_id = TaskContext.get().taskAttemptId, debug = COMET_DEBUG_ENABLED.get(), explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), workerThreads = COMET_WORKER_THREADS.get(), @@ -84,6 +87,30 @@ class CometExecIterator( private var currentBatch: ColumnarBatch = null private var closed: Boolean = false + private def getMemoryLimitPerTask(conf: SparkConf): Long = { + val numCores = numDriverOrExecutorCores(conf).toFloat + val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) + val coresPerTask = conf.get("spark.task.cpus", "1").toFloat + // example 
16GB maxMemory * 16 cores with 4 cores per task results + // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB + (maxMemory.toFloat * coresPerTask / numCores).toLong + } + + private def numDriverOrExecutorCores(conf: SparkConf): Int = { + def convertToInt(threads: String): Int = { + if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt + } + val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r + val master = conf.get("spark.master") + master match { + case "local" => 1 + case LOCAL_N_REGEX(threads) => convertToInt(threads) + case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) + case _ => conf.get("spark.executor.cores", "1").toInt + } + } + def getNextBatch(): Option[ColumnarBatch] = { assert(partitionIndex >= 0 && partitionIndex < numParts) diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 083c0f2b50..5fd84989ba 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -52,8 +52,11 @@ class Native extends NativeBase { taskMemoryManager: CometTaskMemoryManager, batchSize: Int, use_unified_memory_manager: Boolean, + memory_pool_type: String, memory_limit: Long, + memory_limit_per_task: Long, memory_fraction: Double, + task_attempt_id: Long, debug: Boolean, explain: Boolean, workerThreads: Int, From fa69e8933c4127f31d4e27f19ad14531b551910f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 2 Jan 2025 09:44:21 -0700 Subject: [PATCH 02/12] save --- .../shuffle/comet/CometShuffleMemoryAllocator.java | 2 +- .../apache/comet/CometSparkSessionExtensions.scala | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java index 
54e349c137..f6e6ca96a7 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java +++ b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java @@ -52,7 +52,7 @@ public static CometShuffleMemoryAllocatorTrait getInstance( (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get(); - if (isSparkTesting && !useUnifiedMemAllocator) { + if (!useUnifiedMemAllocator) { synchronized (CometShuffleMemoryAllocator.class) { if (INSTANCE == null) { // CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton. diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 8bff6b5fbd..f62698ca37 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -921,11 +921,11 @@ class CometSparkSessionExtensions override def apply(plan: SparkPlan): SparkPlan = { // Comet required off-heap memory to be enabled - if (!isOffHeapEnabled(conf) && !isTesting) { - logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false") - withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false") - return plan - } +// if (!isOffHeapEnabled(conf) && !isTesting) { +// logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false") +// withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false") +// return plan +// } // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is // enabled. @@ -1187,8 +1187,7 @@ object CometSparkSessionExtensions extends Logging { // 2. `spark.shuffle.manager` is set to `CometShuffleManager` // 3. 
Off-heap memory is enabled || Spark/Comet unit testing private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean = - COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) && - (isOffHeapEnabled(conf) || isTesting) + COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = { if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) { From 9acf9f5b22c5f8f565134dd254a63e2395fcfe13 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 2 Jan 2025 10:19:50 -0700 Subject: [PATCH 03/12] fix --- .../src/main/scala/org/apache/comet/CometExecIterator.scala | 5 ++++- .../scala/org/apache/comet/CometSparkSessionExtensions.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 0b90a91c74..978f55c923 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -20,6 +20,7 @@ package org.apache.comet import org.apache.spark._ +import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ @@ -89,7 +90,9 @@ class CometExecIterator( private def getMemoryLimitPerTask(conf: SparkConf): Long = { val numCores = numDriverOrExecutorCores(conf).toFloat - val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) + val maxMemory = ConfigHelpers + .byteFromString(conf.get("spark.executor.memory", "1024MB"), ByteUnit.BYTE) + val coresPerTask = conf.get("spark.task.cpus", "1").toFloat // example 16GB maxMemory * 16 cores with 4 cores per task results // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index f62698ca37..a8cbc07b3d 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.rules.RewriteJoin import org.apache.comet.serde.OperatorOuterClass.Operator From 499c8bd6411d416a086faf43c0aca12429a081fe Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jan 2025 08:40:55 -0700 Subject: [PATCH 04/12] Update memory calculation and add draft documentation --- docs/source/user-guide/tuning.md | 51 ++++++++++++++++--- native/core/src/execution/jni_api.rs | 7 +-- .../org/apache/comet/CometExecIterator.scala | 20 ++++---- .../comet/CometSparkSessionExtensions.scala | 8 --- .../main/scala/org/apache/comet/Native.scala | 1 - 5 files changed, 56 insertions(+), 31 deletions(-) diff --git a/docs/source/user-guide/tuning.md 
b/docs/source/user-guide/tuning.md index e04e750b47..a0ed26f2c1 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -23,11 +23,46 @@ Comet provides some tuning options to help you get the best performance from you ## Memory Tuning -Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`. -If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark. +### Unified Memory Management with Off-Heap Memory -Each executor will have a single memory pool which will be shared by all native plans being executed within that -process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`. +The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. This allows +Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. + +### Dedicated Comet Memory Pools + +If the `spark.memory.offHeap.enabled` setting is not enabled then Comet will use its own dedicated memory pools that +are not shared with Spark. This requires additional configuration settings to be specified to set the size and type of +memory pool to use. + +The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. If this setting is not specified then +the memory overhead will be calculated by multiplying the executor memory by `spark.comet.memory.overhead.factor` +(defaults to `0.2`). + +The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`. + +The valid pool types are: + +- `greedy` +- `greedy_global` +- `greedy_task_shared` +- `fair_spill` +- `fair_spill_global` +- `fair_spill_task_shared` + +The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. 
This +pool works well for queries that do not need to spill or have a single spillable operator. + +The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more +than an even fraction of the available memory sans any unspillable reservations +(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`). This pool works best when you know beforehand +the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even +when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in +a first-come, first-serve fashion. + +[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html +[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html + +TODO: explain global vs task_shared ### Determining How Much Memory to Allocate @@ -106,15 +141,19 @@ then any shuffle operations that cannot be supported in this mode will fall back ### Shuffle Compression By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression. -Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in +Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage. ## Explain Plan + ### Extended Explain + With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists reasons why Comet may not have been enabled for specific operations. 
To enable this, in the Spark configuration, set the following: + ```shell -c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo ``` -This will add a section to the detailed plan displayed in the Spark SQL UI page. \ No newline at end of file + +This will add a section to the detailed plan displayed in the Spark SQL UI page. diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index b1190d9059..7d8d577fe5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -162,7 +162,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_pool_type: jstring, memory_limit: jlong, memory_limit_per_task: jlong, - memory_fraction: jdouble, task_attempt_id: jlong, debug_native: jboolean, explain_native: jboolean, @@ -208,7 +207,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_pool_type, memory_limit, memory_limit_per_task, - memory_fraction, )?; let memory_pool = create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id); @@ -281,14 +279,13 @@ fn parse_memory_pool_config( memory_pool_type: String, memory_limit: i64, memory_limit_per_task: i64, - memory_fraction: f64, ) -> CometResult { let memory_pool_config = if use_unified_memory_manager { MemoryPoolConfig::new(MemoryPoolType::Unified, 0) } else { // Use the memory pool from DF - let pool_size = (memory_limit as f64 * memory_fraction) as usize; - let pool_size_per_task = (memory_limit_per_task as f64 * memory_fraction) as usize; + let pool_size = memory_limit as usize; + let pool_size_per_task = memory_limit_per_task as usize; match memory_pool_type.as_str() { "fair_spill_task_shared" => { MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 5954e815e5..7eae2e0663 100644 --- 
a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -20,11 +20,11 @@ package org.apache.comet import org.apache.spark._ -import org.apache.spark.network.util.ByteUnit +import org.apache.spark.internal.Logging import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ -import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} import org.apache.comet.vector.NativeUtil /** @@ -53,7 +53,8 @@ class CometExecIterator( nativeMetrics: CometMetricNode, numParts: Int, partitionIndex: Int) - extends Iterator[ColumnarBatch] { + extends Iterator[ColumnarBatch] + with Logging { private val nativeLib = new Native() private val nativeUtil = new NativeUtil() @@ -76,7 +77,6 @@ class CometExecIterator( memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(), memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf), memory_limit_per_task = getMemoryLimitPerTask(conf), - memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(), task_attempt_id = TaskContext.get().taskAttemptId, debug = COMET_DEBUG_ENABLED.get(), explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), @@ -90,16 +90,14 @@ class CometExecIterator( private def getMemoryLimitPerTask(conf: SparkConf): Long = { val numCores = numDriverOrExecutorCores(conf).toFloat - - // TODO need to think about best configs to use here for max memory to - // allocate to Comet memory pools - val maxMemory = ConfigHelpers - .byteFromString(conf.get("spark.executor.memory", "1024MB"), ByteUnit.BYTE) - + val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) val coresPerTask = 
conf.get("spark.task.cpus", "1").toFloat // example 16GB maxMemory * 16 cores with 4 cores per task results // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB - (maxMemory.toFloat * coresPerTask / numCores).toLong + val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong + logInfo( + s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)") + limit } private def numDriverOrExecutorCores(conf: SparkConf): Int = { diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index a8cbc07b3d..32292ec2dd 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -919,14 +919,6 @@ class CometSparkSessionExtensions } override def apply(plan: SparkPlan): SparkPlan = { - - // Comet required off-heap memory to be enabled -// if (!isOffHeapEnabled(conf) && !isTesting) { -// logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false") -// withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false") -// return plan -// } - // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is // enabled. 
if (isANSIEnabled(conf)) { diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 5fd84989ba..e5728009e4 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -55,7 +55,6 @@ class Native extends NativeBase { memory_pool_type: String, memory_limit: Long, memory_limit_per_task: Long, - memory_fraction: Double, task_attempt_id: Long, debug: Boolean, explain: Boolean, From 19b0491c3c0d565c0c79ff631f5396cf84e16906 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jan 2025 08:46:43 -0700 Subject: [PATCH 05/12] ready for review --- docs/source/user-guide/tuning.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index a0ed26f2c1..2ead3d85e7 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -49,6 +49,13 @@ The valid pool types are: - `fair_spill_global` - `fair_spill_task_shared` +Pool types ending with `_global` use a single global memory pool between all tasks. + +Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task. + +Other pool types create a dedicated pool per plan using a fraction of the available pool size based on number of cores +and cores per task. + The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. 
@@ -62,7 +69,6 @@ a first-come, first-serve fashion [GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html [FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html -TODO: explain global vs task_shared ### Determining How Much Memory to Allocate From 1a967d333ca40e8293b6211a2ef09b6f7cf4b058 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jan 2025 08:47:55 -0700 Subject: [PATCH 06/12] ready for review --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 2ead3d85e7..c47811baa4 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -53,7 +53,7 @@ Pool types ending with `_global` use a single global memory pool between all tas Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task. -Other pool types create a dedicated pool per plan using a fraction of the available pool size based on number of cores +Other pool types create a dedicated pool per task using a fraction of the available pool size based on number of cores and cores per task. The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. 
This From 63b55a5d2a6207321039a03c1235a9374518ccb4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 Jan 2025 16:29:26 -0700 Subject: [PATCH 07/12] address feedback --- .../org/apache/comet/CometSparkSessionExtensions.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 32292ec2dd..af42538c0f 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1166,14 +1166,6 @@ object CometSparkSessionExtensions extends Logging { } } - private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean = - conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean - - // Copied from org.apache.spark.util.Utils which is private to Spark. - private[comet] def isTesting: Boolean = { - System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null - } - // Check whether Comet shuffle is enabled: // 1. `COMET_EXEC_SHUFFLE_ENABLED` is true // 2. `spark.shuffle.manager` is set to `CometShuffleManager` From 44332fba2cba26c15cf3a8a8cf3c473d03eab11c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 6 Jan 2025 13:48:16 -0700 Subject: [PATCH 08/12] Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index c47811baa4..13a233df46 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -26,7 +26,7 @@ Comet provides some tuning options to help you get the best performance from you ### Unified Memory Management with Off-Heap Memory The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. 
This allows -Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. +Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to the Spark documentation: https://spark.apache.org/docs/latest/configuration.html. ### Dedicated Comet Memory Pools From 4643ad637e3c32e17a005a61090ad5e332dce921 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 6 Jan 2025 13:48:22 -0700 Subject: [PATCH 09/12] Update docs/source/user-guide/tuning.md Co-authored-by: Kristin Cowalcijk --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 13a233df46..beb48b2e7f 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -53,7 +53,7 @@ Pool types ending with `_global` use a single global memory pool between all tas Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task. -Other pool types create a dedicated pool per task using a fraction of the available pool size based on number of cores +Other pool types create a dedicated pool per native query plan using a fraction of the available pool size based on the number of cores and cores per task. The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. 
This From eb5d4d0a2a8a581a8cfa9a722ae515be240210de Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 6 Jan 2025 13:48:51 -0700 Subject: [PATCH 10/12] Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index beb48b2e7f..48e22922d9 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -30,7 +30,7 @@ Comet to share an off-heap memory pool with Spark. The size of the pool is speci ### Dedicated Comet Memory Pools -If the `spark.memory.offHeap.enabled` setting is not enabled then Comet will use its own dedicated memory pools that +Spark uses on-heap memory mode by default, i.e., the `spark.memory.offHeap.enabled` setting is not enabled. If Spark is running in on-heap memory mode, Comet will use its own dedicated memory pools that are not shared with Spark. This requires additional configuration settings to be specified to set the size and type of memory pool to use. From 9c1fa49cb4b0425dfa2b416dcbf72ec7e09757c7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 6 Jan 2025 13:57:49 -0700 Subject: [PATCH 11/12] Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh --- docs/source/user-guide/tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 48e22922d9..30b6d0f466 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -49,7 +49,7 @@ The valid pool types are: - `fair_spill_global` - `fair_spill_task_shared` -Pool types ending with `_global` use a single global memory pool between all tasks. +Pool types ending with `_global` use a single global memory pool shared by all tasks on the same executor. Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task. 
From d0a5bf317d50ef3a514f44a5080bfc30b2685190 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 6 Jan 2025 14:47:14 -0700 Subject: [PATCH 12/12] remove unused config --- common/src/main/scala/org/apache/comet/CometConf.scala | 9 --------- docs/source/user-guide/configs.md | 1 - 2 files changed, 10 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 2fff0a04c3..4d63de75a5 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -452,15 +452,6 @@ object CometConf extends ShimCometConf { .intConf .createWithDefault(8192) - val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction") - .doc( - "The fraction of memory from Comet memory overhead that the native memory " + - "manager can use for execution. The purpose of this config is to set aside memory for " + - "untracked data structures, as well as imprecise size estimation during memory " + - "acquisition.") - .doubleConf - .createWithDefault(0.7) - val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf("spark.comet.parquet.enable.directBuffer") .doc("Whether to use Java direct byte buffer when reading Parquet.") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ecea70254a..20923b93ae 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -47,7 +47,6 @@ Comet provides the following configuration settings. | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true | | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | -| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. 
The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 | | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy_task_shared'. | greedy_task_shared | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |