From a1d04f52f1c551e0366e421f31c5b06068b694c4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 09:34:58 -0600 Subject: [PATCH 01/18] experiment --- .../scala/org/apache/comet/CometConf.scala | 6 +++ .../comet/CometSparkSessionExtensions.scala | 37 ++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 8223c2cc18..36642c46d0 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -263,6 +263,12 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMorgeJoin") + .doc("Whether to replace SortMergeJoin with ShuffleHashJoin for improved performance.") + .booleanConf + .createWithDefault(true) + val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf( s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec") .doc( diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 0185a15e5b..2037b87e87 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -27,7 +27,9 @@ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder} import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} -import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, NormalizeNaNAndZero} +import org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter.{canBuildShuffledHashJoinLeft, canBuildShuffledHashJoinRight} +import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.MetadataColumnHelper @@ -938,7 +940,38 @@ class CometSparkSessionExtensions plan } } else { - var newPlan = transform(normalizePlan(plan)) + + def getBuildSide(joinType: JoinType): Option[BuildSide] = { + val leftBuildable = canBuildShuffledHashJoinLeft(joinType) + val rightBuildable = canBuildShuffledHashJoinRight(joinType) + if (rightBuildable) { + Some(BuildRight) + } else if (leftBuildable) { + Some(BuildLeft) + } else { + None + } + } + + val x = plan.transformUp { + case smj: SortMergeJoinExec if CometConf.COMET_REPLACE_SMJ.get() => + getBuildSide(smj.joinType) match { + case Some(buildSide) => + ShuffledHashJoinExec( + smj.leftKeys, + smj.rightKeys, + smj.joinType, + buildSide, + smj.condition, + smj.left, + smj.right, + smj.isSkewJoin) + case _ => plan + } + case _ => plan + } + + var newPlan = transform(normalizePlan(x)) // if the plan cannot be run fully natively then explain why (when appropriate // config is enabled) From 598735e18499bcfe1d013a98bfc5850faa85498d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 10:31:58 -0600 Subject: [PATCH 02/18] fix and add credit --- NOTICE.txt | 7 +- .../scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/configs.md | 1 + .../comet/CometSparkSessionExtensions.scala | 40 ++---------- .../org/apache/comet/rules/RewriteJoin.scala | 65 +++++++++++++++++++ 5 files changed, 79 insertions(+), 36 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala diff --git a/NOTICE.txt b/NOTICE.txt index 655222a268..b3b1abce2f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5,4 +5,9 @@ This product includes software developed at The Apache Software Foundation (http://www.apache.org/). This product includes software from the twox-hash project (MIT License) -https://github.com/shepmaster/twox-hash \ No newline at end of file +https://github.com/shepmaster/twox-hash + +This product includes software developed at +Apache Gluten (https://github.com/apache/incubator-gluten/) +Specifically: +- Optimizer rule to replace SortMergeJoin with ShuffleHashJoin diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 36642c46d0..a4a24a2425 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -264,7 +264,7 @@ object CometConf extends ShimCometConf { .createWithDefault(false) val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = - conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMorgeJoin") + conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin") .doc("Whether to replace SortMergeJoin with ShuffleHashJoin for improved performance.") .booleanConf .createWithDefault(true) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f7ef1d55f8..b28a5b6fc5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,6 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | +| spark.comet.exec.replaceSortMergeJoin | Whether to replace SortMergeJoin with ShuffleHashJoin for improved performance. | true | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 2037b87e87..5e63aa49d5 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -27,9 +27,7 @@ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder} import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, NormalizeNaNAndZero} -import org.apache.spark.sql.catalyst.optimizer.InjectRuntimeFilter.{canBuildShuffledHashJoinLeft, canBuildShuffledHashJoinRight} -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.MetadataColumnHelper @@ -57,6 +55,7 @@ import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} +import org.apache.comet.rules.RewriteJoin import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde import org.apache.comet.shims.ShimCometSparkSessionExtensions @@ -940,38 +939,11 @@ class CometSparkSessionExtensions plan } } else { + val normalizedPlan = normalizePlan(plan) - def getBuildSide(joinType: JoinType): Option[BuildSide] = { - val leftBuildable = canBuildShuffledHashJoinLeft(joinType) - val rightBuildable = canBuildShuffledHashJoinRight(joinType) - if (rightBuildable) { - Some(BuildRight) - } else if (leftBuildable) { - Some(BuildLeft) - } else { - None - } - } - - val x = plan.transformUp { - case smj: SortMergeJoinExec if CometConf.COMET_REPLACE_SMJ.get() => - getBuildSide(smj.joinType) match { - case Some(buildSide) => - ShuffledHashJoinExec( - smj.leftKeys, - smj.rightKeys, - smj.joinType, - buildSide, - smj.condition, - smj.left, - smj.right, - smj.isSkewJoin) - case _ => plan - } - case _ => plan - } - - var newPlan = transform(normalizePlan(x)) + var newPlan = transform(normalizedPlan.transformUp { case p => + RewriteJoin.rewrite(p) + }) // if the plan cannot be run fully natively then explain why (when appropriate // config is enabled) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala new file mode 100644 index 0000000000..b60761ce67 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.rules + +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} + +import org.apache.comet.CometConf + +/** + * Based on equivalent rule in Apache Gluten. + * + * This rule replaces [[SortMergeJoinExec]] with [[ShuffledHashJoinExec]]. + */ +object RewriteJoin extends JoinSelectionHelper { + + private def getBuildSide(joinType: JoinType): Option[BuildSide] = { + val leftBuildable = canBuildShuffledHashJoinLeft(joinType) + val rightBuildable = canBuildShuffledHashJoinRight(joinType) + if (rightBuildable) { + Some(BuildRight) + } else if (leftBuildable) { + Some(BuildLeft) + } else { + None + } + } + + def rewrite(plan: SparkPlan): SparkPlan = plan match { + case smj: SortMergeJoinExec if CometConf.COMET_REPLACE_SMJ.get() => + getBuildSide(smj.joinType) match { + case Some(buildSide) => + ShuffledHashJoinExec( + smj.leftKeys, + smj.rightKeys, + smj.joinType, + buildSide, + smj.condition, + smj.left, + smj.right, + smj.isSkewJoin) + case _ => plan + } + case _ => plan + } +} From d55f2ead76993320b1f48712e98f834706cdaa01 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 11:03:57 -0600 Subject: [PATCH 03/18] disable by default and make internal --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++++-- docs/source/user-guide/configs.md | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index a4a24a2425..b49e809ab0 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -265,9 +265,11 @@ object CometConf extends ShimCometConf { val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin") - .doc("Whether to replace SortMergeJoin with ShuffleHashJoin for improved performance.") + .doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved " + + "performance (experimental).") + .internal() .booleanConf - .createWithDefault(true) + .createWithDefault(false) val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf( s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b28a5b6fc5..f7ef1d55f8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,7 +50,6 @@ Comet provides the following configuration settings. | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | -| spark.comet.exec.replaceSortMergeJoin | Whether to replace SortMergeJoin with ShuffleHashJoin for improved performance. | true | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | From e3313bdc8831f12fe407f43ab2529f0098501fdc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 11:18:20 -0600 Subject: [PATCH 04/18] remove sort --- .../scala/org/apache/comet/rules/RewriteJoin.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index b60761ce67..02e292f8dd 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -21,13 +21,12 @@ package org.apache.comet.rules import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper} import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} - import org.apache.comet.CometConf /** - * Based on equivalent rule in Apache Gluten. + * Adapted from equivalent rule in Apache Gluten. * * This rule replaces [[SortMergeJoinExec]] with [[ShuffledHashJoinExec]]. */ @@ -45,6 +44,11 @@ object RewriteJoin extends JoinSelectionHelper { } } + private def removeSort(plan: SparkPlan) = plan match { + case _: SortExec => plan.children.head + case _ => plan + } + def rewrite(plan: SparkPlan): SparkPlan = plan match { case smj: SortMergeJoinExec if CometConf.COMET_REPLACE_SMJ.get() => getBuildSide(smj.joinType) match { @@ -55,8 +59,8 @@ object RewriteJoin extends JoinSelectionHelper { smj.joinType, buildSide, smj.condition, - smj.left, - smj.right, + removeSort(smj.left), + removeSort(smj.right), smj.isSkewJoin) case _ => plan } From a0d13813e8dd5bfc3d64436c80a065208c1fb24b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 11:21:59 -0600 Subject: [PATCH 05/18] minor optimization --- .../main/scala/org/apache/comet/rules/RewriteJoin.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index 02e292f8dd..0167333007 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} + import org.apache.comet.CometConf /** @@ -33,11 +34,9 @@ import org.apache.comet.CometConf object RewriteJoin extends JoinSelectionHelper { private def getBuildSide(joinType: JoinType): Option[BuildSide] = { - val leftBuildable = canBuildShuffledHashJoinLeft(joinType) - val rightBuildable = canBuildShuffledHashJoinRight(joinType) - if (rightBuildable) { + if (canBuildShuffledHashJoinRight(joinType)) { Some(BuildRight) - } else if (leftBuildable) { + } else if (canBuildShuffledHashJoinLeft(joinType)) { Some(BuildLeft) } else { None From 948f2c008d444439ff779e549a687e92ac5bb8b0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 11:23:14 -0600 Subject: [PATCH 06/18] minor optimization --- .../apache/comet/CometSparkSessionExtensions.scala | 12 ++++++++---- .../scala/org/apache/comet/rules/RewriteJoin.scala | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 5e63aa49d5..c1d63299e3 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -939,11 +939,15 @@ class CometSparkSessionExtensions plan } } else { - val normalizedPlan = normalizePlan(plan) + val normalizedPlan = if (CometConf.COMET_REPLACE_SMJ.get()) { + normalizePlan(plan).transformUp { case p => + RewriteJoin.rewrite(p) + } + } else { + normalizePlan(plan) + } - var newPlan = transform(normalizedPlan.transformUp { case p => - RewriteJoin.rewrite(p) - }) + var newPlan = transform(normalizedPlan) // if the plan cannot be run fully natively then explain why (when appropriate // config is enabled) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index 0167333007..4e73e437e1 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -49,7 +49,7 @@ object RewriteJoin extends JoinSelectionHelper { } def rewrite(plan: SparkPlan): SparkPlan = plan match { - case smj: SortMergeJoinExec if CometConf.COMET_REPLACE_SMJ.get() => + case smj: SortMergeJoinExec => getBuildSide(smj.joinType) match { case Some(buildSide) => ShuffledHashJoinExec( From fd87412e96a8f889a6ee2b85aa39857dec3efc71 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 11:23:53 -0600 Subject: [PATCH 07/18] remove unused import --- spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index 4e73e437e1..c619391167 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -24,8 +24,6 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} -import org.apache.comet.CometConf - /** * Adapted from equivalent rule in Apache Gluten. * From 99eca10b774d4d768c8754023484b254057677bc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 16:18:37 -0600 Subject: [PATCH 08/18] disable feature by default --- common/src/main/scala/org/apache/comet/CometConf.scala | 3 +-- docs/source/user-guide/configs.md | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b49e809ab0..a4d4cfe7de 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -266,8 +266,7 @@ object CometConf extends ShimCometConf { val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin") .doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved " + - "performance (experimental).") - .internal() + "performance.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f7ef1d55f8..ef816a9402 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,6 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | +| spark.comet.exec.replaceSortMergeJoin | Whether to replace SortMergeJoin with ShuffledHashJoin for improved performance. | false | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | From 1d5b58d465c67199d4f5743aae237d97b6ed5b45 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Oct 2024 17:44:44 -0600 Subject: [PATCH 09/18] fix dockerfile --- kube/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube/Dockerfile b/kube/Dockerfile index e8ba72dc55..19f12137bf 100644 --- a/kube/Dockerfile +++ b/kube/Dockerfile @@ -65,4 +65,4 @@ ENV SCALA_VERSION=2.12 USER root # note the use of a wildcard in the file name so that this works with both snapshot and final release versions -COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-0.2.0*.jar $SPARK_HOME/jars +COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-*.jar $SPARK_HOME/jars From 1a5de4e21ed150673cd02f0beea99507ed662964 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 10 Oct 2024 10:17:23 -0600 Subject: [PATCH 10/18] Add section to tuning guide --- docs/source/user-guide/tuning.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 2baced0927..eb5f4ba4d7 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -78,6 +78,18 @@ Note that there is currently a known issue where this will be inaccurate when us does not take executor concurrency into account. The tracking issue for this is https://github.com/apache/datafusion-comet/issues/949. +## Optimizing Joins + +Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a +`ShuffledHashJoin` is very large then it could lead to OOM in Spark. + +Vectorized query engines tend to perform better with `ShuffledHashJoin`, so for best performance it is often preferable +to configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`. Comet does provide spill-to-disk for +`ShuffledHashJoin` so this should not result in OOM. However, `SortMergeJoin` may be faster in some cases. It is best +to test with both for your specific workloads. + +To configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`, set `spark.comet.exec.replaceSortMergeJoin=true`. + ## Shuffle Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries. From 7cce6a5c2c74b92d1bc74400f93a6a772f5360c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 10 Oct 2024 10:18:03 -0600 Subject: [PATCH 11/18] update benchmarking guide --- docs/source/contributor-guide/benchmarking.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index 456cacef15..8c8d53e677 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -64,6 +64,7 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.executor.extraClassPath=$COMET_JAR \ --conf spark.plugins=org.apache.spark.CometPlugin \ --conf spark.comet.cast.allowIncompatible=true \ + --conf spark.comet.exec.replaceSortMergeJoin=true \ --conf spark.comet.exec.shuffle.enabled=true \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ tpcbench.py \ From 7ce87266cd3bd414ea99d821206fc5ca0e7c271b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 15 Oct 2024 09:04:39 -0600 Subject: [PATCH 12/18] Revert "chore: Reserve memory for native shuffle writer per partition (#988)" This reverts commit e146cfa1676fedcfb3003c2e3f1da1578d6b664b. --- .../execution/datafusion/shuffle_writer.rs | 251 +++++------------- 1 file changed, 61 insertions(+), 190 deletions(-) diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 6c31746678..9668359fc0 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -66,14 +66,6 @@ use crate::{ }; use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes; -/// The status of appending rows to a partition buffer. -enum AppendRowStatus { - /// The difference in memory usage after appending rows - MemDiff(Result), - /// The index of the next row to append - StartIndex(usize), -} - /// The shuffle writer operator maps each input partition to M output partitions based on a /// partitioning scheme. No guarantees are made about the order of the resulting partitions. #[derive(Debug)] @@ -214,21 +206,10 @@ struct PartitionBuffer { /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, /// the active array builders will be frozen and appended to frozen buffer `frozen`. batch_size: usize, - /// Memory reservation for this partition buffer. - reservation: MemoryReservation, } impl PartitionBuffer { - fn new( - schema: SchemaRef, - batch_size: usize, - partition_id: usize, - runtime: &Arc, - ) -> Self { - let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id)) - .with_can_spill(true) - .register(&runtime.memory_pool); - + fn new(schema: SchemaRef, batch_size: usize) -> Self { Self { schema, frozen: vec![], @@ -236,52 +217,47 @@ impl PartitionBuffer { active_slots_mem_size: 0, num_active_rows: 0, batch_size, - reservation, } } /// Initializes active builders if necessary. - /// Returns error if memory reservation fails. fn init_active_if_necessary(&mut self) -> Result { let mut mem_diff = 0; if self.active.is_empty() { - // Estimate the memory size of active builders + self.active = new_array_builders(&self.schema, self.batch_size); if self.active_slots_mem_size == 0 { self.active_slots_mem_size = self - .schema - .fields() + .active .iter() - .map(|field| slot_size(self.batch_size, field.data_type())) + .zip(self.schema.fields()) + .map(|(_ab, field)| slot_size(self.batch_size, field.data_type())) .sum::(); } - - self.reservation.try_grow(self.active_slots_mem_size)?; - - self.active = new_array_builders(&self.schema, self.batch_size); - mem_diff += self.active_slots_mem_size as isize; } Ok(mem_diff) } + /// Appends all rows of given batch into active array builders. + fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result { + let columns = batch.columns(); + let indices = (0..batch.num_rows()).collect::>(); + self.append_rows(columns, &indices, time_metric) + } + /// Appends rows of specified indices from columns into active array builders. fn append_rows( &mut self, columns: &[ArrayRef], indices: &[usize], - start_index: usize, time_metric: &Time, - ) -> AppendRowStatus { + ) -> Result { let mut mem_diff = 0; - let mut start = start_index; + let mut start = 0; // lazy init because some partition may be empty - let init = self.init_active_if_necessary(); - if init.is_err() { - return AppendRowStatus::StartIndex(start); - } - mem_diff += init.unwrap(); + mem_diff += self.init_active_if_necessary()?; while start < indices.len() { let end = (start + self.batch_size).min(indices.len()); @@ -294,22 +270,14 @@ impl PartitionBuffer { self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { let mut timer = time_metric.timer(); - let flush = self.flush(); - if let Err(e) = flush { - return AppendRowStatus::MemDiff(Err(e)); - } - mem_diff += flush.unwrap(); + mem_diff += self.flush()?; timer.stop(); - let init = self.init_active_if_necessary(); - if init.is_err() { - return AppendRowStatus::StartIndex(end); - } - mem_diff += init.unwrap(); + mem_diff += self.init_active_if_necessary()?; } start = end; } - AppendRowStatus::MemDiff(Ok(mem_diff)) + Ok(mem_diff) } /// flush active data into frozen bytes @@ -323,7 +291,7 @@ impl PartitionBuffer { let active = std::mem::take(&mut self.active); let num_rows = self.num_active_rows; self.num_active_rows = 0; - self.reservation.try_shrink(self.active_slots_mem_size)?; + mem_diff -= self.active_slots_mem_size as isize; let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; @@ -607,7 +575,7 @@ struct ShuffleRepartitioner { output_data_file: String, output_index_file: String, schema: SchemaRef, - buffered_partitions: Vec, + buffered_partitions: Mutex>, spills: Mutex>, /// Sort expressions /// Partitioning scheme to use @@ -680,11 +648,11 @@ impl ShuffleRepartitioner { output_data_file, output_index_file, schema: Arc::clone(&schema), - buffered_partitions: (0..num_output_partitions) - .map(|partition_id| { - PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime) - }) - .collect::>(), + buffered_partitions: Mutex::new( + (0..num_output_partitions) + .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size)) + .collect::>(), + ), spills: Mutex::new(vec![]), partitioning, num_output_partitions, @@ -731,6 +699,8 @@ impl ShuffleRepartitioner { // Update data size metric self.metrics.data_size.add(input.get_array_memory_size()); + let time_metric = self.metrics.baseline.elapsed_compute(); + // NOTE: in shuffle writer exec, the output_rows metrics represents the // number of rows those are written to output data file. self.metrics.baseline.record_output(input.num_rows()); @@ -795,36 +765,34 @@ impl ShuffleRepartitioner { .enumerate() .filter(|(_, (start, end))| start < end) { - mem_diff += self - .append_rows_to_partition( - input.columns(), - &shuffled_partition_ids[start..end], - partition_id, - ) - .await?; - - if mem_diff > 0 { - let mem_increase = mem_diff as usize; - if self.reservation.try_grow(mem_increase).is_err() { - self.spill().await?; - self.reservation.free(); - self.reservation.try_grow(mem_increase)?; - - mem_diff = 0; - } - } - - if mem_diff < 0 { - let mem_used = self.reservation.size(); - let mem_decrease = mem_used.min(-mem_diff as usize); - self.reservation.shrink(mem_decrease); + let mut buffered_partitions = self.buffered_partitions.lock().await; + let output = &mut buffered_partitions[partition_id]; + + // If the range of indices is not big enough, just appending the rows into + // active array builders instead of directly adding them as a record batch. + mem_diff += output.append_rows( + input.columns(), + &shuffled_partition_ids[start..end], + time_metric, + )?; + } - mem_diff += mem_decrease as isize; + if mem_diff > 0 { + let mem_increase = mem_diff as usize; + if self.reservation.try_grow(mem_increase).is_err() { + self.spill().await?; + self.reservation.free(); + self.reservation.try_grow(mem_increase)?; } } + if mem_diff < 0 { + let mem_used = self.reservation.size(); + let mem_decrease = mem_used.min(-mem_diff as usize); + self.reservation.shrink(mem_decrease); + } } Partitioning::UnknownPartitioning(n) if *n == 1 => { - let buffered_partitions = &mut self.buffered_partitions; + let mut buffered_partitions = self.buffered_partitions.lock().await; assert!( buffered_partitions.len() == 1, @@ -832,10 +800,8 @@ impl ShuffleRepartitioner { buffered_partitions.len() ); - let indices = (0..input.num_rows()).collect::>(); - - self.append_rows_to_partition(input.columns(), &indices, 0) - .await?; + let output = &mut buffered_partitions[0]; + output.append_batch(&input, time_metric)?; } other => { // this should be unreachable as long as the validation logic @@ -852,7 +818,7 @@ impl ShuffleRepartitioner { /// Writes buffered shuffled record batches into Arrow IPC bytes. async fn shuffle_write(&mut self) -> Result { let num_output_partitions = self.num_output_partitions; - let buffered_partitions = &mut self.buffered_partitions; + let mut buffered_partitions = self.buffered_partitions.lock().await; let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { @@ -950,15 +916,16 @@ impl ShuffleRepartitioner { self.metrics.data_size.value() } - async fn spill(&mut self) -> Result { + async fn spill(&self) -> Result { log::debug!( "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", self.used(), self.spill_count() ); + let mut buffered_partitions = self.buffered_partitions.lock().await; // we could always get a chance to free some memory as long as we are holding some - if self.buffered_partitions.is_empty() { + if buffered_partitions.len() == 0 { return Ok(0); } @@ -969,7 +936,7 @@ impl ShuffleRepartitioner { .disk_manager .create_tmp_file("shuffle writer spill")?; let offsets = spill_into( - &mut self.buffered_partitions, + &mut buffered_partitions, spillfile.path(), self.num_output_partitions, ) @@ -987,60 +954,6 @@ impl ShuffleRepartitioner { }); Ok(used) } - - /// Appends rows of specified indices from columns into active array builders in the specified partition. - async fn append_rows_to_partition( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - partition_id: usize, - ) -> Result { - let mut mem_diff = 0; - - let output = &mut self.buffered_partitions[partition_id]; - - let time_metric = self.metrics.baseline.elapsed_compute(); - - // If the range of indices is not big enough, just appending the rows into - // active array builders instead of directly adding them as a record batch. - let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, time_metric); - - loop { - match output_ret { - AppendRowStatus::MemDiff(l) => { - mem_diff += l?; - break; - } - AppendRowStatus::StartIndex(new_start) => { - // Cannot allocate enough memory for the array builders in the partition, - // spill partitions and retry. - self.spill().await?; - - let output = &mut self.buffered_partitions[partition_id]; - output.reservation.free(); - - let time_metric = self.metrics.baseline.elapsed_compute(); - - start_index = new_start; - output_ret = output.append_rows(columns, indices, start_index, time_metric); - - if let AppendRowStatus::StartIndex(new_start) = output_ret { - if new_start == start_index { - // If the start index is not updated, it means that the partition - // is still not able to allocate enough memory for the array builders. - return Err(DataFusionError::Internal( - "Partition is still not able to allocate enough memory for the array builders after spilling." - .to_string(), - )); - } - } - } - } - } - - Ok(mem_diff) - } } /// consume the `buffered_partitions` and do spill into a single temp shuffle output file @@ -1557,8 +1470,6 @@ mod test { use datafusion::physical_plan::common::collect; use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; - use datafusion_execution::config::SessionConfig; - use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_expr::expressions::Column; use tokio::runtime::Runtime; @@ -1593,65 +1504,25 @@ mod test { #[test] #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` fn test_insert_larger_batch() { - shuffle_write_test(10000, 1, 16, None); - } - - #[test] - #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` - fn test_insert_smaller_batch() { - shuffle_write_test(1000, 1, 16, None); - shuffle_write_test(1000, 10, 16, None); - } - - #[test] - #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` - #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files". - fn test_large_number_of_partitions() { - shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024)); - shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024)); - } - - #[test] - #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` - #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files". - fn test_large_number_of_partitions_spilling() { - shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)); - } - - fn shuffle_write_test( - batch_size: usize, - num_batches: usize, - num_partitions: usize, - memory_limit: Option, - ) { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); let mut b = StringBuilder::new(); - for i in 0..batch_size { + for i in 0..10000 { b.append_value(format!("{i}")); } let array = b.finish(); let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap(); - let batches = (0..num_batches).map(|_| batch.clone()).collect::>(); + let batches = vec![batch.clone()]; let partitions = &[batches]; let exec = ShuffleWriterExec::try_new( Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()), - Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), + Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), ) .unwrap(); - - // 10MB memory should be enough for running this test - let config = SessionConfig::new(); - let mut runtime_env_builder = RuntimeEnvBuilder::new(); - runtime_env_builder = match memory_limit { - Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0), - None => runtime_env_builder, - }; - let runtime_env = Arc::new(runtime_env_builder.build().unwrap()); - let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let ctx = SessionContext::new(); let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx).unwrap(); let rt = Runtime::new().unwrap(); From 26f9a4fdda3b9804f199e4a5d63ad675916f66c3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 18 Oct 2024 14:54:38 -0600 Subject: [PATCH 13/18] mark feature as experimental and explain risks --- common/src/main/scala/org/apache/comet/CometConf.scala | 5 +++-- docs/source/user-guide/configs.md | 2 +- docs/source/user-guide/tuning.md | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index a4d4cfe7de..113b057e9a 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -265,8 +265,9 @@ object CometConf extends ShimCometConf { val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin") - .doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved " + - "performance.") + .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " + + "for improved performance. See tuning guide for more information regarding stability of " + + "this feature.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ef816a9402..eb690684b4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,7 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | -| spark.comet.exec.replaceSortMergeJoin | Whether to replace SortMergeJoin with ShuffledHashJoin for improved performance. | false | +| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. See tuning guide for more information regarding stability of this feature. | false | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index eb5f4ba4d7..30ada4c9b1 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -84,8 +84,8 @@ Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reason `ShuffledHashJoin` is very large then it could lead to OOM in Spark. Vectorized query engines tend to perform better with `ShuffledHashJoin`, so for best performance it is often preferable -to configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`. Comet does provide spill-to-disk for -`ShuffledHashJoin` so this should not result in OOM. However, `SortMergeJoin` may be faster in some cases. It is best +to configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`. Comet does not yet provide spill-to-disk for +`ShuffledHashJoin` so this could result in OOM. Also, `SortMergeJoin` may still be faster in some cases. It is best to test with both for your specific workloads. To configure Comet to convert `SortMergeJoin` to `ShuffledHashJoin`, set `spark.comet.exec.replaceSortMergeJoin=true`. From 63ce71c2b22b7d2466315fb65f684721f81bf18d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 19 Oct 2024 14:30:43 -0600 Subject: [PATCH 14/18] workaround for TPC-DS q14 hanging on a RightSemi join --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3805d418b8..d6276522bd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2984,6 +2984,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } + if (join.buildSide == BuildRight && join.joinType == LeftSemi) { + withInfo(join, "BuildRight with LeftSemi is not supported") + return None + } + if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None From 60d1028a3c879b483fccc7bbe5b53c8c69e168e9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 20 Oct 2024 10:02:00 -0600 Subject: [PATCH 15/18] revert a change --- native/core/src/execution/datafusion/planner.rs | 4 ++++ .../src/main/scala/org/apache/comet/rules/RewriteJoin.scala | 5 ++++- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 ----- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 5b53cb3930..c83b5ac56a 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1183,6 +1183,8 @@ impl PhysicalPlanner { .zip(right_join_exprs) .collect::>(); + println!("join_type[1] = {}", join_type); + let join_type = match join_type.try_into() { Ok(JoinType::Inner) => DFJoinType::Inner, Ok(JoinType::LeftOuter) => DFJoinType::Left, @@ -1200,6 +1202,8 @@ impl PhysicalPlanner { } }; + println!("join_type[2] = {}", join_type); + // Handle join filter as DataFusion `JoinFilter` struct let join_filter = if let Some(expr) = condition { let left_schema = left.schema(); diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index c619391167..a58973f127 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -20,7 +20,7 @@ package org.apache.comet.rules import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper} -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi} import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} @@ -49,6 +49,9 @@ object RewriteJoin extends JoinSelectionHelper { def rewrite(plan: SparkPlan): SparkPlan = plan match { case smj: SortMergeJoinExec => getBuildSide(smj.joinType) match { + case Some(BuildRight) if smj.joinType == LeftSemi => + // TODO this was added as a workaround for TPC-DS q14 hanging and needs further investigation + plan case Some(buildSide) => ShuffledHashJoinExec( smj.leftKeys, diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index d6276522bd..3805d418b8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2984,11 +2984,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } - if (join.buildSide == BuildRight && join.joinType == LeftSemi) { - withInfo(join, "BuildRight with LeftSemi is not supported") - return None - } - if (join.buildSide == BuildRight && join.joinType == LeftAnti) { withInfo(join, "BuildRight with LeftAnti is not supported") return None From 662d0de346d5977b0ae1ea4440b6d018acf2e741 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 20 Oct 2024 10:04:06 -0600 Subject: [PATCH 16/18] remove debug logging: --- native/core/src/execution/datafusion/planner.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index c83b5ac56a..5b53cb3930 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1183,8 +1183,6 @@ impl PhysicalPlanner { .zip(right_join_exprs) .collect::>(); - println!("join_type[1] = {}", join_type); - let join_type = match join_type.try_into() { Ok(JoinType::Inner) => DFJoinType::Inner, Ok(JoinType::LeftOuter) => DFJoinType::Left, @@ -1202,8 +1200,6 @@ impl PhysicalPlanner { } }; - println!("join_type[2] = {}", join_type); - // Handle join filter as DataFusion `JoinFilter` struct let join_filter = if let Some(expr) = condition { let left_schema = left.schema(); From 6ed01c15dc6fbb39560828513ddd3d6190bfc01b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 20 Oct 2024 10:47:11 -0600 Subject: [PATCH 17/18] format --- spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala index a58973f127..2f1f5ab744 100644 --- a/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala +++ b/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala @@ -50,7 +50,8 @@ object RewriteJoin extends JoinSelectionHelper { case smj: SortMergeJoinExec => getBuildSide(smj.joinType) match { case Some(BuildRight) if smj.joinType == LeftSemi => - // TODO this was added as a workaround for TPC-DS q14 hanging and needs further investigation + // TODO this was added as a workaround for TPC-DS q14 hanging and needs + // further investigation plan case Some(buildSide) => ShuffledHashJoinExec( From 107351713da16b19ec2a330a0c031b70653edf6d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 21 Oct 2024 07:57:37 -0600 Subject: [PATCH 18/18] add link to tuning guide --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++++-- docs/source/user-guide/configs.md | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 113b057e9a..6833aefffe 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { + private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " + + "Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)" + /** List of all configs that is used for generating documentation */ val allConfs = new ListBuffer[ConfigEntry[_]] @@ -266,8 +269,7 @@ object CometConf extends ShimCometConf { val COMET_REPLACE_SMJ: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin") .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " + - "for improved performance. See tuning guide for more information regarding stability of " + - "this feature.") + s"for improved performance. This feature is not stable yet. $TUNING_GUIDE.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index eb690684b4..1618a03452 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,7 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | -| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. See tuning guide for more information regarding stability of this feature. | false | +| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |