diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index 87387b18dbab4..b1952e225491f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -326,4 +326,11 @@ object QueryCompilationErrors { "of rows, therefore they are currently not supported.", t.origin.line, t.origin.startPosition) } + def complexColumnExpressionsNotSupportOutsideOfProject( + method: String, + t: TreeNode[_]): Throwable = { + new AnalysisException(s"Complex column expressions `${method}`" + + s" not supported outside of Project's projection list.", + t.origin.line, t.origin.startPosition) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0bef6998b177d..be09df5acb522 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -259,6 +259,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveGenerate :: ResolveFunctions :: ResolveAliases :: + ResolveColumnsExpression :: ResolveSubquery :: ResolveSubqueryColumnAliases :: ResolveWindowOrder :: @@ -971,6 +972,40 @@ class Analyzer(override val catalogManager: CatalogManager) } } + /** + * Resolve complex column generator expression. 
+ */ + object ResolveColumnsExpression extends Rule[LogicalPlan] { + + def containsAllColumnsExcept(expr: Expression): Boolean = expr.collect { + case a: AllColumnsExcept => a + }.nonEmpty + + def expandAllColumnsExcept(p: Project, expr: AllColumnsExcept): Seq[NamedExpression] = { + p.child.output.filter(!expr.children.contains(_)) + } + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + case p: Project => + val newProjectList = p.projectList.flatMap { + case UnresolvedAlias(child: AllColumnsExcept, _) => + expandAllColumnsExcept(p, child) + case Alias(child: AllColumnsExcept, _) => + expandAllColumnsExcept(p, child) + case MultiAlias(child: AllColumnsExcept, _) => + expandAllColumnsExcept(p, child) + case e if containsAllColumnsExcept(e) => + throw QueryCompilationErrors + .complexColumnExpressionsNotSupportOutsideOfProject("all_columns_except", p) + case e => Seq(e) + } + p.copy(projectList = newProjectList) + case p if p.expressions.exists(containsAllColumnsExcept) => + throw QueryCompilationErrors + .complexColumnExpressionsNotSupportOutsideOfProject("all_columns_except", p) + } + } + /** * Resolve table relations with concrete relations from v2 catalog. 
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 3b46de539ce3d..adc063db5a6b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -575,7 +575,10 @@ object FunctionRegistry { // csv expression[CsvToStructs]("from_csv"), expression[SchemaOfCsv]("schema_of_csv"), - expression[StructsToCsv]("to_csv") + expression[StructsToCsv]("to_csv"), + + // column + expression[AllColumnsExcept]("all_columns_except") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/columnExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/columnExpression.scala new file mode 100644 index 0000000000000..24c37bf9f6164 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/columnExpression.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.DataType + +/** + * A function that returns the columns except columns given in function parameters. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(expr*) - Returns the columns except columns given in function parameters. + """, + examples = """ + Examples: + > SELECT * FROM TBL; + A B C + 1 2 3 + > SELECT _FUNC_(a, b) FROM TBL; + C + 3 + """, + since = "3.2.0") +// scalastyle:on line.size.limit +case class AllColumnsExcept(exclude: Expression*) extends Expression with CodegenFallback { + + override def children: Seq[Expression] = exclude + + // this should be replaced first + override lazy val resolved: Boolean = false + + override def dataType: DataType = throw new UnsupportedOperationException + override def foldable: Boolean = false + override def nullable: Boolean = true + override def eval(input: InternalRow): Any = throw new UnsupportedOperationException +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 112b1a7210cb4..281975c7d51d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3745,6 +3745,26 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-33809: Support `all_columns_except` complex column expression") { + withTempView("src") { + val df = Seq((1, 2)).toDF("key", "value") + df.createOrReplaceTempView("src") + checkAnswer(sql("SELECT all_columns_except(key) FROM src"), Row(2)) + checkAnswer(sql("SELECT key, all_columns_except(key) FROM src"), Row(1, 2)) + checkAnswer(sql("SELECT key, all_columns_except(value) FROM src"), Row(1, 1)) + val e1 = 
intercept[AnalysisException] { + sql("SELECT key, all_columns_except(value, invalid_column) FROM src") + }.getMessage + assert(e1.contains("cannot resolve '`invalid_column`' given input columns:" + + " [src.key, src.value]")) + val e2 = intercept[AnalysisException] { + sql("SELECT all_columns_except(value), count(value) FROM src GROUP BY key") + }.getMessage + assert(e2.contains("Complex column expressions `all_columns_except` not supported" + + " outside of Project's projection list.")) + } + } } case class Foo(bar: Option[String])