diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7ed3725bed..7a69e63096 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3148,17 +3148,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       orderSpec: Seq[SortOrder],
       op: SparkPlan): Boolean = {
     if (partitionSpec.length != orderSpec.length) {
-      withInfo(op, "Partitioning and sorting specifications do not match")
       return false
     }
 
-    val partitionColumnNames = partitionSpec.collect { case a: AttributeReference =>
-      a.name
+    val partitionColumnNames = partitionSpec.collect {
+      case a: AttributeReference => a.name
+      case other =>
+        withInfo(op, s"Unsupported partition expression: ${other.getClass.getSimpleName}")
+        return false
     }
 
     val orderColumnNames = orderSpec.collect { case s: SortOrder =>
       s.child match {
         case a: AttributeReference => a.name
+        case other =>
+          withInfo(op, s"Unsupported sort expression: ${other.getClass.getSimpleName}")
+          return false
       }
     }
 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 9a642f12f7..8170230bc6 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
 import org.apache.spark.sql.comet.CometHashAggregateExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
@@ -89,6 +90,37 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  // based on Spark's SQLWindowFunctionSuite test of the same name
+  test("window function: partition and order expressions") {
+    for (shuffleMode <- Seq("auto", "native", "jvm")) {
+      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+        val df =
+          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
+            "month",
+            "area",
+            "product")
+        df.createOrReplaceTempView("windowData")
+        val df2 = sql("""
+            |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+            |from windowData
+          """.stripMargin)
+        checkSparkAnswer(df2)
+        val cometShuffles = collect(df2.queryExecution.executedPlan) {
+          case _: CometShuffleExchangeExec => true
+        }
+        if (shuffleMode == "jvm") {
+          assert(cometShuffles.length == 1)
+        } else {
+          // we fall back to Spark for shuffle because we do not support
+          // native shuffle with a LocalTableScan input, and we do not fall
+          // back to Comet columnar shuffle due to
+          // https://github.com/apache/datafusion-comet/issues/1248
+          assert(cometShuffles.isEmpty)
+        }
+      }
+    }
+  }
+
   test("multiple column distinct count") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",
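
Reviewer note, not part of the patch: the new fallback logic relies on Scala's non-local `return` from inside the `collect` closure, so the enclosing validation method exits (after tagging the plan via `withInfo`) as soon as the first unsupported expression is seen. Below is a minimal self-contained sketch of that validate-or-bail pattern; `Expr`, `AttributeReference`, and `Literal` here are hypothetical stand-ins for the Catalyst classes, and `println` stands in for `withInfo`.

    // Standalone sketch of the validate-or-bail pattern used above.
    object ValidateSketch extends App {
      sealed trait Expr
      case class AttributeReference(name: String) extends Expr // simplified stand-in
      case class Literal(value: Any) extends Expr              // simplified stand-in

      def validatePartitionSpec(partitionSpec: Seq[Expr]): Boolean = {
        // `collect` gathers names for the supported case; the catch-all case
        // performs a non-local return that exits this whole method, mirroring
        // the `withInfo(op, ...); return false` in the real code.
        val names = partitionSpec.collect {
          case AttributeReference(name) => name
          case other =>
            println(s"Unsupported partition expression: ${other.getClass.getSimpleName}")
            return false
        }
        println(s"Partition columns: $names")
        true
      }

      assert(validatePartitionSpec(Seq(AttributeReference("area")))) // supported
      assert(!validatePartitionSpec(Seq(Literal(1))))                // bails out
    }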