Skip to content

Commit 042104b

Browse files
authored
feat: Add config option for showing all Comet plan transformations (#1780)
1 parent 9da11c5 commit 042104b

File tree

4 files changed

+43
-0
lines changed

4 files changed

+43
-0
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -454,6 +454,14 @@ object CometConf extends ShimCometConf {
454454
.booleanConf
455455
.createWithDefault(false)
456456

457+
val COMET_EXPLAIN_TRANSFORMATIONS: ConfigEntry[Boolean] =
458+
conf("spark.comet.explain.rules")
459+
.doc("When this setting is enabled, Comet will log all plan transformations performed " +
460+
"in physical optimizer rules. Default: false")
461+
.internal()
462+
.booleanConf
463+
.createWithDefault(false)
464+
457465
val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
458466
conf("spark.comet.explainFallback.enabled")
459467
.doc(

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,9 @@ import org.apache.comet.serde.QueryPlanSerde
4444
* Spark physical optimizer rule for replacing Spark operators with Comet operators.
4545
*/
4646
case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
47+
48+
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
49+
4750
private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
4851
plan.transformUp {
4952
case s: ShuffleExchangeExec
@@ -619,6 +622,14 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
619622
}
620623

621624
override def apply(plan: SparkPlan): SparkPlan = {
625+
val newPlan = _apply(plan)
626+
if (showTransformations) {
627+
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
628+
}
629+
newPlan
630+
}
631+
632+
private def _apply(plan: SparkPlan): SparkPlan = {
622633
// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
623634
// enabled.
624635
if (isANSIEnabled(conf)) {

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,18 @@ import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
4242
* Spark physical optimizer rule for replacing Spark scans with Comet scans.
4343
*/
4444
case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
45+
46+
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
47+
4548
override def apply(plan: SparkPlan): SparkPlan = {
49+
val newPlan = _apply(plan)
50+
if (showTransformations) {
51+
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
52+
}
53+
newPlan
54+
}
55+
56+
private def _apply(plan: SparkPlan): SparkPlan = {
4657
if (!isCometLoaded(conf) || !isCometScanEnabled(conf)) {
4758
if (!isCometLoaded(conf)) {
4859
withInfo(plan, "Comet is not enabled")

spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,8 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet
2626
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
2727
import org.apache.spark.sql.execution.adaptive.QueryStageExec
2828

29+
import org.apache.comet.CometConf
30+
2931
// This rule is responsible for eliminating redundant transitions between row-based and
3032
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
3133
// 1. `ColumnarToRowExec` on top of an ending `CometCollectLimitExec` operator, which is
@@ -48,7 +50,18 @@ import org.apache.spark.sql.execution.adaptive.QueryStageExec
4850
// another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could
4951
// be removed.
5052
case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
53+
54+
private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
55+
5156
override def apply(plan: SparkPlan): SparkPlan = {
57+
val newPlan = _apply(plan)
58+
if (showTransformations) {
59+
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
60+
}
61+
newPlan
62+
}
63+
64+
private def _apply(plan: SparkPlan): SparkPlan = {
5265
val eliminatedPlan = plan transformUp {
5366
case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
5467
if (sparkToColumnar.child.supportsColumnar) {

0 commit comments

Comments (0)