diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 2fff0a04c3..4d63de75a5 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -452,15 +452,6 @@ object CometConf extends ShimCometConf { .intConf .createWithDefault(8192) - val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction") - .doc( - "The fraction of memory from Comet memory overhead that the native memory " + - "manager can use for execution. The purpose of this config is to set aside memory for " + - "untracked data structures, as well as imprecise size estimation during memory " + - "acquisition.") - .doubleConf - .createWithDefault(0.7) - val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf("spark.comet.parquet.enable.directBuffer") .doc("Whether to use Java direct byte buffer when reading Parquet.") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ecea70254a..20923b93ae 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -47,7 +47,6 @@ Comet provides the following configuration settings. | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true | | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | -| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 | | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy_task_shared'. | greedy_task_shared | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index e04e750b47..30b6d0f466 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -23,11 +23,52 @@ Comet provides some tuning options to help you get the best performance from you ## Memory Tuning -Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`. -If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark. +### Unified Memory Management with Off-Heap Memory + +The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. This allows +Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to Spark documentation: https://spark.apache.org/docs/latest/configuration.html. + +### Dedicated Comet Memory Pools + +Spark uses on-heap memory mode by default, i.e., the `spark.memory.offHeap.enabled` setting is not enabled. If Spark is under on-heap memory mode, Comet will use its own dedicated memory pools that +are not shared with Spark. This requires additional configuration settings to be specified to set the size and type of +memory pool to use. + +The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. If this setting is not specified then +the memory overhead will be calculated by multiplying the executor memory by `spark.comet.memory.overhead.factor` +(defaults to `0.2`). + +The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`. + +The valid pool types are: + +- `greedy` +- `greedy_global` +- `greedy_task_shared` +- `fair_spill` +- `fair_spill_global` +- `fair_spill_task_shared` + +Pool types ending with `_global` use a single global memory pool between all tasks on same executor. + +Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task. + +Other pool types create a dedicated pool per native query plan using a fraction of the available pool size based on number of cores +and cores per task. + +The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This +pool works well for queries that do not need to spill or have a single spillable operator. + +The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more +than an even fraction of the available memory sans any unspillable reservations +(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations)`). This pool works best when you know beforehand +the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even +when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in +a first-come, first-serve fashion + +[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html +[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html -Each executor will have a single memory pool which will be shared by all native plans being executed within that -process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`. ### Determining How Much Memory to Allocate @@ -106,15 +147,19 @@ then any shuffle operations that cannot be supported in this mode will fall back ### Shuffle Compression By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression. -Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in +Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage. ## Explain Plan + ### Extended Explain + With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists reasons why Comet may not have been enabled for specific operations. To enable this, in the Spark configuration, set the following: + ```shell -c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo ``` -This will add a section to the detailed plan displayed in the Spark SQL UI page. \ No newline at end of file + +This will add a section to the detailed plan displayed in the Spark SQL UI page. diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index b1190d9059..7d8d577fe5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -162,7 +162,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_pool_type: jstring, memory_limit: jlong, memory_limit_per_task: jlong, - memory_fraction: jdouble, task_attempt_id: jlong, debug_native: jboolean, explain_native: jboolean, @@ -208,7 +207,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( memory_pool_type, memory_limit, memory_limit_per_task, - memory_fraction, )?; let memory_pool = create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id); @@ -281,14 +279,13 @@ fn parse_memory_pool_config( memory_pool_type: String, memory_limit: i64, memory_limit_per_task: i64, - memory_fraction: f64, ) -> CometResult { let memory_pool_config = if use_unified_memory_manager { MemoryPoolConfig::new(MemoryPoolType::Unified, 0) } else { // Use the memory pool from DF - let pool_size = (memory_limit as f64 * memory_fraction) as usize; - let pool_size_per_task = (memory_limit_per_task as f64 * memory_fraction) as usize; + let pool_size = memory_limit as usize; + let pool_size_per_task = memory_limit_per_task as usize; match memory_pool_type.as_str() { "fair_spill_task_shared" => { MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task) diff --git a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java index 54e349c137..f6e6ca96a7 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java +++ b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java @@ -52,7 +52,7 @@ public static CometShuffleMemoryAllocatorTrait getInstance( (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get(); - if (isSparkTesting && !useUnifiedMemAllocator) { + if (!useUnifiedMemAllocator) { synchronized (CometShuffleMemoryAllocator.class) { if (INSTANCE == null) { // CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton. diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 0b90a91c74..7eae2e0663 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -20,10 +20,11 @@ package org.apache.comet import org.apache.spark._ +import org.apache.spark.internal.Logging import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ -import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} import org.apache.comet.vector.NativeUtil /** @@ -52,7 +53,8 @@ class CometExecIterator( nativeMetrics: CometMetricNode, numParts: Int, partitionIndex: Int) - extends Iterator[ColumnarBatch] { + extends Iterator[ColumnarBatch] + with Logging { private val nativeLib = new Native() private val nativeUtil = new NativeUtil() @@ -75,7 +77,6 @@ class CometExecIterator( memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(), memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf), memory_limit_per_task = getMemoryLimitPerTask(conf), - memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(), task_attempt_id = TaskContext.get().taskAttemptId, debug = COMET_DEBUG_ENABLED.get(), explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), @@ -93,7 +94,10 @@ class CometExecIterator( val coresPerTask = conf.get("spark.task.cpus", "1").toFloat // example 16GB maxMemory * 16 cores with 4 cores per task results // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB - (maxMemory.toFloat * coresPerTask / numCores).toLong + val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong + logInfo( + s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)") + limit } private def numDriverOrExecutorCores(conf: SparkConf): Int = { diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 8bff6b5fbd..af42538c0f 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.rules.RewriteJoin import org.apache.comet.serde.OperatorOuterClass.Operator @@ -919,14 +919,6 @@ class CometSparkSessionExtensions } override def apply(plan: SparkPlan): SparkPlan = { - - // Comet required off-heap memory to be enabled - if (!isOffHeapEnabled(conf) && !isTesting) { - logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false") - withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false") - return plan - } - // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is // enabled. if (isANSIEnabled(conf)) { @@ -1174,21 +1166,12 @@ object CometSparkSessionExtensions extends Logging { } } - private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean = - conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean - - // Copied from org.apache.spark.util.Utils which is private to Spark. - private[comet] def isTesting: Boolean = { - System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null - } - // Check whether Comet shuffle is enabled: // 1. `COMET_EXEC_SHUFFLE_ENABLED` is true // 2. `spark.shuffle.manager` is set to `CometShuffleManager` // 3. Off-heap memory is enabled || Spark/Comet unit testing private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean = - COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) && - (isOffHeapEnabled(conf) || isTesting) + COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = { if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) { diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 5fd84989ba..e5728009e4 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -55,7 +55,6 @@ class Native extends NativeBase { memory_pool_type: String, memory_limit: Long, memory_limit_per_task: Long, - memory_fraction: Double, task_attempt_id: Long, debug: Boolean, explain: Boolean,