8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -454,6 +454,14 @@ object CometConf extends ShimCometConf
      .booleanConf
      .createWithDefault(false)

  val COMET_EXPLAIN_TRANSFORMATIONS: ConfigEntry[Boolean] =
    conf("spark.comet.explain.rules")
      .doc("When this setting is enabled, Comet will log all plan transformations performed " +
        "in physical optimizer rules.")
      .internal()
      .booleanConf
      .createWithDefault(false)

  val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
    conf("spark.comet.explainFallback.enabled")
      .doc(
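A minimal usage sketch (not part of the diff), assuming a running `SparkSession` named `spark` with the Comet plugin loaded; the table and query are illustrative placeholders:

```scala
// Enable transformation logging for this session. The flag is internal,
// but it is a regular Comet conf and can be set like any other.
spark.conf.set("spark.comet.explain.rules", "true")

// Queries planned from now on cause each instrumented Comet rule to log
// its input and output plans at INFO level.
spark.sql("SELECT id, COUNT(*) FROM t GROUP BY id").collect()
```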
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -44,6 +44,9 @@ import org.apache.comet.serde.QueryPlanSerde
 * Spark physical optimizer rule for replacing Spark operators with Comet operators.
 */
case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

  private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
    plan.transformUp {
      case s: ShuffleExchangeExec
@@ -619,6 +622,14 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
  }

  override def apply(plan: SparkPlan): SparkPlan = {
    val newPlan = _apply(plan)
    if (showTransformations) {
      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
    }
    newPlan
  }

  private def _apply(plan: SparkPlan): SparkPlan = {
    // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
    // enabled.
    if (isANSIEnabled(conf)) {
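For orientation, the message each wrapper emits has the shape below; the format string comes straight from the `logInfo` call above, while the bracketed plan descriptions are placeholders rather than real output:

```
INPUT: <physical plan before the rule ran>
OUTPUT: <physical plan after the rule ran>
```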
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,18 @@ import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 * Spark physical optimizer rule for replacing Spark scans with Comet scans.
 */
case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

  override def apply(plan: SparkPlan): SparkPlan = {
    val newPlan = _apply(plan)
    if (showTransformations) {
      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
    }
    newPlan
  }

  private def _apply(plan: SparkPlan): SparkPlan = {
    if (!isCometLoaded(conf) || !isCometScanEnabled(conf)) {
      if (!isCometLoaded(conf)) {
        withInfo(plan, "Comet is not enabled")
@@ -26,6 +26,8 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec

import org.apache.comet.CometConf

// This rule is responsible for eliminating redundant transitions between row-based and
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
// 1. `ColumnarToRowExec` on top of an ending `CometCollectLimitExec` operator, which is
@@ -48,7 +50,18 @@ import org.apache.spark.sql.execution.adaptive.QueryStageExec
// another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could
// be removed.
case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

  override def apply(plan: SparkPlan): SparkPlan = {
    val newPlan = _apply(plan)
    if (showTransformations) {
      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
    }
    newPlan
  }

  private def _apply(plan: SparkPlan): SparkPlan = {
    val eliminatedPlan = plan transformUp {
      case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
        if (sparkToColumnar.child.supportsColumnar) {
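Since the same read-conf-and-log wrapper now appears verbatim in `CometExecRule`, `CometScanRule`, and `EliminateRedundantTransitions`, one possible follow-up is to factor it into a shared mixin. A sketch under that assumption; the trait and method names are invented for illustration and are not part of this PR:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlan

import org.apache.comet.CometConf

// Hypothetical mixin: each rule would call withTransformationLogging(plan)(_apply)
// from its apply method instead of repeating the wrapper inline.
trait LogsTransformations extends Logging {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

  // Run the rule body and, when the flag is set, log the before/after plans.
  protected def withTransformationLogging(plan: SparkPlan)(
      rule: SparkPlan => SparkPlan): SparkPlan = {
    val newPlan = rule(plan)
    if (showTransformations) {
      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
    }
    newPlan
  }
}
```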