-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-16947][SQL] Support type coercion and foldable expression for inline tables #14676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
d7acae5
[SPARK-16947][SQL] Support type coercion and foldable expression for …
petermaxlee 6a0450b
Add one more negative test case for aggregate expression.
petermaxlee 2327b79
update doc about visibility for testing.
petermaxlee fcc3caf
Fix PlanParserSuite
petermaxlee f597bae
Merge branch 'master' into SPARK-16947
petermaxlee 092605b
Fix tests
petermaxlee 4723902
Add two cases with null.
petermaxlee c1071af
Code review
petermaxlee 1b39a97
Code review
petermaxlee 2e68438
small code review
petermaxlee fb9de34
Add comment explaining attribute reference.
petermaxlee aed7c5e
nullability and remove rand
petermaxlee 08f4e39
Merge remote-tracking branch 'apache/master' into SPARK-16947
petermaxlee 285b941
updated test case comment
petermaxlee 88e7272
Fix merge bug
petermaxlee File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
112 changes: 112 additions & 0 deletions
112
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.Cast | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.types.{StructField, StructType} | ||
|
|
||
| /** | ||
| * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. | ||
| * | ||
| * The rule only fires once every expression of the inline table is resolved. It then validates | ||
| * the shape of the data, validates that every value is foldable, and finally evaluates the | ||
| * values into the rows of a [[LocalRelation]]. | ||
| */ | ||
| object ResolveInlineTables extends Rule[LogicalPlan] { | ||
| override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||
| case table: UnresolvedInlineTable if table.expressionsResolved => | ||
| validateInputDimension(table) | ||
| validateInputEvaluable(table) | ||
| convert(table) | ||
| } | ||
|
|
||
| /** | ||
| * Validates the input data dimension: | ||
| * 1. All rows have the same cardinality. | ||
| * 2. The number of column aliases defined is consistent with the number of columns in data. | ||
| * | ||
| * Both conditions are enforced by comparing the size of every row against the number of | ||
| * column names. A mismatching row fails analysis and is reported with its zero-based index. | ||
| * A table without any rows is not checked. | ||
| * | ||
| * This is package visible for unit testing. | ||
| */ | ||
| private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { | ||
| if (table.rows.nonEmpty) { | ||
| val numCols = table.names.size | ||
| table.rows.zipWithIndex.foreach { case (row, ri) => | ||
| if (row.size != numCols) { | ||
| table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validates that all inline table data are valid expressions that can be evaluated | ||
| * (in this case they must be both resolved and foldable). An expression that is not | ||
| * fails analysis, with the error raised on the offending expression itself. | ||
| * | ||
| * This is package visible for unit testing. | ||
| */ | ||
| private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { | ||
| table.rows.foreach { row => | ||
| row.foreach { e => | ||
| // Nondeterministic expressions are rejected here as well, because they are not foldable. | ||
| if (!e.resolved || !e.foldable) { | ||
| e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] | ||
| * into a [[LocalRelation]]. | ||
| * | ||
| * This function attempts to coerce inputs into consistent types: for every column it looks up | ||
| * a common wider type (without string promotion) of all the values in that column and fails | ||
| * analysis if there is none. A column is nullable if any of its expressions is nullable. | ||
| * | ||
| * Every expression is evaluated eagerly. A value whose type is not the same as the column type | ||
| * is wrapped in a [[Cast]] to the column type before evaluation. Any non-fatal exception thrown | ||
| * during evaluation is reported as an analysis failure on the table. | ||
| * | ||
| * This is package visible for unit testing. | ||
| */ | ||
| private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { | ||
| // For each column, traverse all the values and find a common data type and nullability. | ||
| val fields = table.rows.transpose.zip(table.names).map { case (column, name) => | ||
| val inputTypes = column.map(_.dataType) | ||
| val tpe = TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { | ||
| table.failAnalysis(s"incompatible types found in column $name for inline table") | ||
| } | ||
| StructField(name, tpe, nullable = column.exists(_.nullable)) | ||
| } | ||
| val attributes = StructType(fields).toAttributes | ||
| assert(fields.size == table.names.size) | ||
|
|
||
| val newRows: Seq[InternalRow] = table.rows.map { row => | ||
| InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => | ||
| val targetType = fields(ci).dataType | ||
| try { | ||
| if (e.dataType.sameType(targetType)) { | ||
| e.eval() | ||
| } else { | ||
| Cast(e, targetType).eval() | ||
| } | ||
| } catch { | ||
| case NonFatal(ex) => | ||
| table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}") | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| LocalRelation(attributes, newRows) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,10 +50,30 @@ case class UnresolvedRelation( | |
| } | ||
|
|
||
| /** | ||
| * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into | ||
| * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. | ||
| * | ||
| * `expressionsResolved` is true once every expression in every row is resolved; it is a lazy val | ||
| * so that the result is memoized. The node itself always reports `resolved = false` and exposes | ||
| * no output attributes. | ||
| * | ||
| * @param names list of column names | ||
| * @param rows expressions for the data, one inner sequence per row | ||
| */ | ||
| case class UnresolvedInlineTable( | ||
| names: Seq[String], | ||
| rows: Seq[Seq[Expression]]) | ||
| extends LeafNode { | ||
|
|
||
| lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is used only once. Lets move this code into that location.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do want this memoized, so a lazy val is better here. |
||
| override lazy val resolved = false | ||
| override def output: Seq[Attribute] = Nil | ||
| } | ||
|
|
||
| /** | ||
| * A table-valued function, e.g. | ||
| * {{{ | ||
| * select * from range(10); | ||
| * }}} | ||
| */ | ||
| case class UnresolvedTableValuedFunction( | ||
| functionName: String, functionArgs: Seq[Expression]) extends LeafNode { | ||
| case class UnresolvedTableValuedFunction(functionName: String, functionArgs: Seq[Expression]) | ||
| extends LeafNode { | ||
|
|
||
| override def output: Seq[Attribute] = Nil | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
...lyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.scalatest.BeforeAndAfter | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate.Count | ||
| import org.apache.spark.sql.catalyst.plans.PlanTest | ||
| import org.apache.spark.sql.types.{LongType, NullType} | ||
|
|
||
| /** | ||
| * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in | ||
| * end-to-end tests (in sql/core module) for verifying the correct error messages are shown | ||
| * in negative cases. | ||
| * | ||
| * The tests here call the package-visible validation and conversion helpers of the rule | ||
| * directly, and also apply the rule itself to an inline table with unresolved expressions. | ||
| */ | ||
| class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { | ||
|
|
||
| private def lit(v: Any): Literal = Literal(v) | ||
|
|
||
| test("validate inputs are foldable") { | ||
| ResolveInlineTables.validateInputEvaluable( | ||
| UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) | ||
|
|
||
| // a nondeterministic expression (rand) is not foldable and must be rejected | ||
| intercept[AnalysisException] { | ||
| ResolveInlineTables.validateInputEvaluable( | ||
| UnresolvedInlineTable(Seq("c1"), Seq(Seq(Rand(1))))) | ||
| } | ||
|
|
||
| // an aggregate expression must be rejected | ||
| intercept[AnalysisException] { | ||
| ResolveInlineTables.validateInputEvaluable( | ||
| UnresolvedInlineTable(Seq("c1"), Seq(Seq(Count(lit(1)))))) | ||
| } | ||
|
|
||
| // an unresolved attribute must be rejected | ||
| intercept[AnalysisException] { | ||
| ResolveInlineTables.validateInputEvaluable( | ||
| UnresolvedInlineTable(Seq("c1"), Seq(Seq(UnresolvedAttribute("A"))))) | ||
| } | ||
| } | ||
|
|
||
| test("validate input dimensions") { | ||
| ResolveInlineTables.validateInputDimension( | ||
| UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2))))) | ||
|
|
||
| // the number of column aliases differs from the number of columns in the data | ||
| intercept[AnalysisException] { | ||
| ResolveInlineTables.validateInputDimension( | ||
| UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)), Seq(lit(2))))) | ||
| } | ||
|
|
||
| // the first row matches the number of aliases, but the second row has a different size | ||
| intercept[AnalysisException] { | ||
| ResolveInlineTables.validateInputDimension( | ||
| UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(21), lit(22))))) | ||
| } | ||
| } | ||
|
|
||
| test("do not fire the rule if not all expressions are resolved") { | ||
| val table = UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A")))) | ||
| assert(ResolveInlineTables(table) == table) | ||
| } | ||
|
|
||
| test("convert") { | ||
| val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) | ||
| val converted = ResolveInlineTables.convert(table) | ||
|
|
||
| assert(converted.output.map(_.dataType) == Seq(LongType)) | ||
| assert(converted.data.size == 2) | ||
| assert(converted.data(0).getLong(0) == 1L) | ||
| assert(converted.data(1).getLong(0) == 2L) | ||
| } | ||
|
|
||
| test("nullability inference in convert") { | ||
| val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) | ||
| val converted1 = ResolveInlineTables.convert(table1) | ||
| assert(!converted1.schema.fields(0).nullable) | ||
|
|
||
| val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType)))) | ||
| val converted2 = ResolveInlineTables.convert(table2) | ||
| assert(converted2.schema.fields(0).nullable) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we just get `table.names.size` first and then iterate over the rows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea. Let me do that.