diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index bf09d6417d..0185a15e5b 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -25,11 +25,12 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} -import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder} import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.util.MetadataColumnHelper import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager} import org.apache.spark.sql.comet.util.Utils @@ -101,7 +102,19 @@ class CometSparkSessionExtensions def isDynamicPruningFilter(e: Expression): Boolean = e.exists(_.isInstanceOf[PlanExpression[_]]) + def hasMetadataCol(plan: SparkPlan): Boolean = { + plan.expressions.exists(_.exists { + case a: Attribute => + a.isMetadataCol + case _ => false + }) + } + plan.transform { + case scan if hasMetadataCol(scan) => + withInfo(scan, "Metadata column is not supported") + scan + case scanExec: FileSourceScanExec if COMET_DPP_FALLBACK_ENABLED.get() && scanExec.partitionFilters.exists(isDynamicPruningFilter) =>