8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -454,6 +454,14 @@ object CometConf extends ShimCometConf
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_TRANSFORMATIONS: ConfigEntry[Boolean] =
conf("spark.comet.explain.rules")
.doc("When this setting is enabled, Comet will log all plan transformations performed " +
"in physical optimizer rules. Default: false")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
.doc(
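For anyone who wants to try the new flag out: a minimal usage sketch, assuming a local Spark session with Comet on the classpath. The config key is taken from the entry above; the session setup, table name, and query are illustrative.

// Minimal usage sketch (illustrative, not part of this diff).
import org.apache.spark.sql.SparkSession

object ExplainRulesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("comet-explain-rules-demo")
      .getOrCreate()

    // Internal flag added in this diff; defaults to false.
    spark.conf.set("spark.comet.explain.rules", "true")

    spark.range(10).createOrReplaceTempView("t1")
    // With the flag on, each Comet rule logs its INPUT and OUTPUT plans at INFO level.
    spark.sql("SELECT id * 2 AS doubled FROM t1").collect()

    spark.stop()
  }
}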
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -44,6 +44,9 @@ import org.apache.comet.serde.QueryPlanSerde
* Spark physical optimizer rule for replacing Spark operators with Comet operators.
*/
case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case s: ShuffleExchangeExec
@@ -619,6 +622,14 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}

override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
if (showTransformations) {
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
}
newPlan
}

private def _apply(plan: SparkPlan): SparkPlan = {
// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
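The same wrap-and-log pattern repeats verbatim in CometScanRule and EliminateRedundantTransitions below. Distilled into a self-contained sketch, with Plan and LoggingRule as stand-ins for SparkPlan and a rule mixing in Spark's Logging trait:

// Self-contained sketch of the pattern: the public apply delegates to the
// private body and logs the before/after plans when the flag is set.
// `Plan`, `Op`, and `LoggingRule` are stand-ins, not Spark types.
sealed trait Plan
case class Op(name: String) extends Plan

class LoggingRule(showTransformations: Boolean) {
  def apply(plan: Plan): Plan = {
    val newPlan = doApply(plan)
    if (showTransformations) {
      // Mirrors the logInfo call in the diff; println stands in for Spark logging.
      println(s"\nINPUT: $plan\nOUTPUT: $newPlan")
    }
    newPlan
  }

  // The real transformation goes here; identity for the sketch.
  private def doApply(plan: Plan): Plan = plan
}

// Usage: new LoggingRule(showTransformations = true).apply(Op("scan"))

One design point worth noting: showTransformations is a lazy val on each rule instance, so the flag is read once when the rule first fires rather than on every invocation.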
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,18 @@ import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
* Spark physical optimizer rule for replacing Spark scans with Comet scans.
*/
case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {

private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
if (showTransformations) {
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
}
newPlan
}

private def _apply(plan: SparkPlan): SparkPlan = {
if (!isCometLoaded(conf) || !isCometScanEnabled(conf)) {
if (!isCometLoaded(conf)) {
withInfo(plan, "Comet is not enabled")
spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, Comet
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec

import org.apache.comet.CometConf

// This rule is responsible for eliminating redundant transitions between row-based and
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
// 1. `ColumnarToRowExec` on top of an ending `CometCollectLimitExec` operator, which is
@@ -48,7 +50,18 @@ import org.apache.spark.sql.execution.adaptive.QueryStageExec
// another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could
// be removed.
case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {

private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
if (showTransformations) {
logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
}
newPlan
}

private def _apply(plan: SparkPlan): SparkPlan = {
val eliminatedPlan = plan transformUp {
case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
if (sparkToColumnar.child.supportsColumnar) {
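For intuition, a toy version of the case shown above, using a hand-rolled bottom-up rewrite that mirrors Spark's transformUp. All node types here are illustrative stand-ins, not Spark's operators.

object EliminateDemo extends App {
  // Toy model of the elimination: a ColumnarToRow sitting directly on a
  // SparkToColumnar whose child is already columnar is a redundant pair
  // and can be dropped.
  sealed trait Node
  case class ColumnarToRow(child: Node) extends Node
  case class SparkToColumnar(child: Node, childSupportsColumnar: Boolean) extends Node
  case class Scan(name: String) extends Node

  // Rewrite children first, then apply the rule to the rewritten node.
  def transformUp(node: Node)(rule: PartialFunction[Node, Node]): Node = {
    val rewritten = node match {
      case ColumnarToRow(c)      => ColumnarToRow(transformUp(c)(rule))
      case SparkToColumnar(c, s) => SparkToColumnar(transformUp(c)(rule), s)
      case leaf                  => leaf
    }
    rule.applyOrElse(rewritten, identity[Node])
  }

  val plan: Node = ColumnarToRow(SparkToColumnar(Scan("t1"), childSupportsColumnar = true))
  val eliminated = transformUp(plan) {
    case ColumnarToRow(SparkToColumnar(child, true)) => child
  }
  println(eliminated) // prints: Scan(t1)
}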