
Commit e4f4886

cloud-fan authored and marmbrus committed
[SPARK-2096][SQL] Correctly parse dot notations
First let me write down the current `projections` grammar of Spark SQL:

    expression           : orExpression
    orExpression         : andExpression {"or" andExpression}
    andExpression        : comparisonExpression {"and" comparisonExpression}
    comparisonExpression : termExpression | termExpression "=" termExpression | termExpression ">" termExpression | ...
    termExpression       : productExpression {("+" | "-") productExpression}
    productExpression    : baseExpression {("*" | "/" | "%") baseExpression}
    baseExpression       : expression "[" expression "]" | ... | ident | ...
    ident                : identChar {identChar | digit} | delimiters | ...
    identChar            : letter | "_" | "."
    delimiters           : "," | ";" | "(" | ")" | "[" | "]" | ...
    projection           : expression [["AS"] ident]
    projections          : projection {"," projection}

For something like `a.b.c[1]`, it will be parsed as:

(parse-tree illustration: http://img51.imgspice.com/i/03008/4iltjsnqgmtt_t.jpg)

But the current grammar can't parse something like `a[1].b` correctly. A simple solution is written in `ParquetQuerySuite#NestedSqlParser`; the changed productions are:

    delimiters     : "." | "," | ";" | "(" | ")" | "[" | "]" | ...
    identChar      : letter | "_"
    baseExpression : expression "[" expression "]" | expression "." ident | ... | ident | ...

This works well but can't cover corner cases like `select t.a.b from table as t`:

(parse-tree illustration: http://img51.imgspice.com/i/03008/v2iau3hoxoxg_t.jpg)

Under this new grammar, `t.a.b` is parsed as `GetField(GetField(UnResolved("t"), "a"), "b")` instead of `GetField(UnResolved("t.a"), "b")`. However, we can't resolve `t`, as it is not a field but the whole table. (If we could do that, then `select t from table as t` would be legal, which is unexpected.)

My solution is:

    dotExpressionHeader : ident "." ident
    baseExpression      : expression "[" expression "]" | expression "." ident | ... | dotExpressionHeader | ident | ...

I passed all test cases under sql locally and added a more complex case. "arrayOfStruct.field1 to access all values of field1" is not supported yet; since this PR has already changed a lot of code, I will open another PR for it. I'm not familiar with the later optimization phases, so please correct me if I missed something.

Author: Wenchen Fan <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes apache#2230 from cloud-fan/dot and squashes the following commits:

e1a8898 [Wenchen Fan] remove support for arbitrary nested arrays
ee8a724 [Wenchen Fan] rollback LogicalPlan, support dot operation on nested array type
a58df40 [Michael Armbrust] add regression test for doubly nested data
16bc4c6 [Wenchen Fan] some enhance
95d733f [Wenchen Fan] split long line
dc31698 [Wenchen Fan] SPARK-2096 Correctly parse dot notations
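To see the fix in isolation, here is a minimal runnable sketch of the final grammar, written against the scala-parser-combinators library (assumed on the classpath). The `Expr` ADT and parser names are stand-ins for Catalyst's `UnresolvedAttribute`/`GetField`/`GetItem`, and `rep(suffix)` replaces the real parser's left-recursive PackratParsers rules with an equivalent postfix encoding:

    import scala.util.parsing.combinator.RegexParsers

    // Illustrative AST, standing in for Catalyst's expression classes.
    sealed trait Expr
    case class Attr(name: String) extends Expr               // ~ UnresolvedAttribute
    case class Field(base: Expr, name: String) extends Expr  // ~ GetField
    case class Item(base: Expr, ordinal: Int) extends Expr   // ~ GetItem

    object DotParser extends RegexParsers {
      val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r
      val num: Parser[Int]      = """\d+""".r ^^ (_.toInt)

      // dotExpressionHeader: a leading ident "." ident ... chain stays ONE unresolved
      // name, so "t.a.b" can later resolve against table t, column a, field b.
      val dotHeader: Parser[Expr] =
        (ident <~ ".") ~ ident ~ rep("." ~> ident) ^^ {
          case i1 ~ i2 ~ rest => Attr((i1 :: i2 :: rest).mkString("."))
        }

      val base: Parser[Expr] = dotHeader | ident ^^ Attr

      // Postfix "[n]" and ".field" can then mix freely: a[1].b, a.b.c[1], ...
      val suffix: Parser[Expr => Expr] =
        "[" ~> num <~ "]" ^^ { i => (e: Expr) => Item(e, i) } |
        "." ~> ident ^^ { f => (e: Expr) => Field(e, f) }

      val expr: Parser[Expr] =
        base ~ rep(suffix) ^^ { case e ~ ops => ops.foldLeft(e)((acc, op) => op(acc)) }
    }

    object Demo extends App {
      import DotParser._
      println(parseAll(expr, "a[1].b"))   // Field(Item(Attr(a),1),b)
      println(parseAll(expr, "t.a.b"))    // Attr(t.a.b) -- one dotted header
      println(parseAll(expr, "a.b.c[1]")) // Item(Attr(a.b.c),1)
    }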
1 parent 1f4a648 commit e4f4886

File tree: 6 files changed, +88 -90 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 11 additions & 2 deletions
@@ -357,16 +357,25 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     expression ~ "[" ~ expression <~ "]" ^^ {
       case base ~ _ ~ ordinal => GetItem(base, ordinal)
     } |
+    (expression <~ ".") ~ ident ^^ {
+      case base ~ fieldName => GetField(base, fieldName)
+    } |
     TRUE ^^^ Literal(true, BooleanType) |
     FALSE ^^^ Literal(false, BooleanType) |
     cast |
     "(" ~> expression <~ ")" |
     function |
     "-" ~> literal ^^ UnaryMinus |
+    dotExpressionHeader |
     ident ^^ UnresolvedAttribute |
     "*" ^^^ Star(None) |
     literal
 
+  protected lazy val dotExpressionHeader: Parser[Expression] =
+    (ident <~ ".") ~ ident ~ rep("." ~> ident) ^^ {
+      case i1 ~ i2 ~ rest => UnresolvedAttribute(i1 + "." + i2 + rest.mkString(".", ".", ""))
+    }
+
   protected lazy val dataType: Parser[DataType] =
     STRING ^^^ StringType | TIMESTAMP ^^^ TimestampType
 }
@@ -380,7 +389,7 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
 
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
-    ",", ";", "%", "{", "}", ":", "[", "]"
+    ",", ";", "%", "{", "}", ":", "[", "]", "."
   )
 
   override lazy val token: Parser[Token] = (
@@ -401,7 +410,7 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
     | failure("illegal character")
   )
 
-  override def identChar = letter | elem('_') | elem('.')
+  override def identChar = letter | elem('_')
 
   override def whitespace: Parser[Any] = rep(
     whitespaceChar
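With these changes the lexer treats `.` as a delimiter instead of an identifier character, and the two new productions divide the work: `(expression <~ ".") ~ ident` handles field access after an arbitrary expression (e.g. after an indexing step), while `dotExpressionHeader` keeps a leading dotted chain together as a single unresolved name. Read off the diff above, the resulting trees look roughly like this (informal sketch, not verified parser output):

    a[1].b  =>  GetField(GetItem(UnresolvedAttribute("a"), Literal(1)), "b")
    t.a.b   =>  UnresolvedAttribute("t.a.b")   // split into table/column/nested fields during resolution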

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 5 deletions
@@ -104,11 +104,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
       case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
       // One match, but we also need to extract the requested nested field.
       case Seq((a, nestedFields)) =>
-        a.dataType match {
-          case StructType(fields) =>
-            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
-          case _ => None // Don't know how to resolve these field references
-        }
+        Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
      case Seq() => None // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
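The deleted `StructType` guard meant nested-field extraction succeeded only when the matched attribute was a struct; dropping it lets `resolve` build `GetField` chains over other nested shapes too (cf. the "support dot operation on nested array type" commit, ee8a724). The `foldLeft` simply wraps the resolved attribute in one `GetField` per remaining name part. A tiny self-contained illustration, with simplified stand-ins for Catalyst's classes:

    // Simplified stand-ins for Catalyst's Expression and GetField, for illustration only.
    sealed trait Expression
    case class Attribute(name: String) extends Expression
    case class GetField(child: Expression, fieldName: String) extends Expression

    // Once "t.a.b.c" has matched attribute `a`, the remaining nestedFields are Seq("b", "c"):
    val a: Expression = Attribute("a")
    val nestedFields = Seq("b", "c")
    val extracted = nestedFields.foldLeft(a)(GetField)
    // => GetField(GetField(Attribute("a"), "b"), "c")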

sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -581,4 +581,18 @@ class JsonSuite extends QueryTest {
         "this is a simple string.") :: Nil
     )
   }
+
+  test("SPARK-2096 Correctly parse dot notations") {
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
+      (true, "str1") :: Nil
+    )
+    checkAnswer(
+      sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"),
+      ("str2", 6) :: Nil
+    )
+  }
 }
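Tracing the second query by hand against `complexFieldAndType2` (defined in TestJsonData.scala below) shows why `("str2", 6)` is the expected row:

    complexArrayOfStruct[0].field1               => [{"inner1": "str1"}, {"inner2": ["str2", "str22"]}]
    complexArrayOfStruct[0].field1[1]            => {"inner2": ["str2", "str22"]}
    complexArrayOfStruct[0].field1[1].inner2[0]  => "str2"

    complexArrayOfStruct[1].field2               => [[5, 6], [7, 8]]
    complexArrayOfStruct[1].field2[0]            => [5, 6]
    complexArrayOfStruct[1].field2[0][1]         => 6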

sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala

Lines changed: 26 additions & 0 deletions
@@ -82,4 +82,30 @@ object TestJsonData {
       """{"c":[33, 44]}""" ::
       """{"d":{"field":true}}""" ::
       """{"e":"str"}""" :: Nil)
+
+  val complexFieldAndType2 =
+    TestSQLContext.sparkContext.parallelize(
+      """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
+          "complexArrayOfStruct": [
+          {
+            "field1": [
+            {
+              "inner1": "str1"
+            },
+            {
+              "inner2": ["str2", "str22"]
+            }],
+            "field2": [[1, 2], [3, 4]]
+          },
+          {
+            "field1": [
+            {
+              "inner2": ["str3", "str33"]
+            },
+            {
+              "inner1": "str4"
+            }],
+            "field2": [[5, 6], [7, 8]]
+          }]
+        }""" :: Nil)
 }

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 24 additions & 78 deletions
@@ -17,19 +17,14 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-
 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-
-import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.catalyst.types.IntegerType
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
@@ -87,11 +82,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
 
   var testRDD: SchemaRDD = null
 
-  // TODO: remove this once SqlParser can parse nested select statements
-  var nestedParserSqlContext: NestedParserSQLContext = null
-
   override def beforeAll() {
-    nestedParserSqlContext = new NestedParserSQLContext(TestSQLContext.sparkContext)
     ParquetTestData.writeFile()
     ParquetTestData.writeFilterFile()
     ParquetTestData.writeNestedFile1()
@@ -718,11 +709,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Projection in addressbook") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
     data.registerTempTable("data")
-    val query = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM data")
+    val query = sql("SELECT owner, contacts[1].name FROM data")
     val tmp = query.collect()
     assert(tmp.size === 2)
     assert(tmp(0).size === 2)
@@ -733,21 +722,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Simple query on nested int data") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir2.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir2.toString).toSchemaRDD
     data.registerTempTable("data")
-    val result1 = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect()
+    val result1 = sql("SELECT entries[0].value FROM data").collect()
     assert(result1.size === 1)
     assert(result1(0).size === 1)
     assert(result1(0)(0) === 2.5)
-    val result2 = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect()
+    val result2 = sql("SELECT entries[0] FROM data").collect()
     assert(result2.size === 1)
     val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]]
     assert(subresult1.size === 2)
     assert(subresult1(0) === 2.5)
     assert(subresult1(1) === false)
-    val result3 = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect()
+    val result3 = sql("SELECT outerouter FROM data").collect()
     val subresult2 = result3(0)(0)
       .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0)
       .asInstanceOf[CatalystConverter.ArrayScalaType[_]]
@@ -760,19 +747,18 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("nested structs") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir3.toString)
+    val data = parquetFile(ParquetTestData.testNestedDir3.toString)
       .toSchemaRDD
     data.registerTempTable("data")
-    val result1 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
+    val result1 = sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect()
     assert(result1.size === 1)
     assert(result1(0).size === 1)
     assert(result1(0)(0) === false)
-    val result2 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
+    val result2 = sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect()
     assert(result2.size === 1)
     assert(result2(0).size === 1)
     assert(result2(0)(0) === true)
-    val result3 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
+    val result3 = sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect()
     assert(result3.size === 1)
     assert(result3(0).size === 1)
     assert(result3(0)(0) === false)
@@ -796,11 +782,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("map with struct values") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir4.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
     data.registerTempTable("mapTable")
-    val result1 = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect()
+    val result1 = sql("SELECT data2 FROM mapTable").collect()
     assert(result1.size === 1)
     val entry1 = result1(0)(0)
       .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
@@ -814,7 +798,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     assert(entry2 != null)
     assert(entry2(0) === 49)
     assert(entry2(1) === null)
-    val result2 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
+    val result2 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect()
     assert(result2.size === 1)
     assert(result2(0)(0) === 42.toLong)
     assert(result2(0)(1) === "the answer")
@@ -825,15 +809,12 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     // has no effect in this test case
     val tmpdir = Utils.createTempDir()
     Utils.deleteRecursively(tmpdir)
-    val result = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir1.toString)
-      .toSchemaRDD
+    val result = parquetFile(ParquetTestData.testNestedDir1.toString).toSchemaRDD
     result.saveAsParquetFile(tmpdir.toString)
-    nestedParserSqlContext
-      .parquetFile(tmpdir.toString)
+    parquetFile(tmpdir.toString)
       .toSchemaRDD
       .registerTempTable("tmpcopy")
-    val tmpdata = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
+    val tmpdata = sql("SELECT owner, contacts[1].name FROM tmpcopy").collect()
     assert(tmpdata.size === 2)
     assert(tmpdata(0).size === 2)
     assert(tmpdata(0)(0) === "Julien Le Dem")
@@ -844,20 +825,17 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   }
 
   test("Writing out Map and reading it back in") {
-    val data = nestedParserSqlContext
-      .parquetFile(ParquetTestData.testNestedDir4.toString)
-      .toSchemaRDD
+    val data = parquetFile(ParquetTestData.testNestedDir4.toString).toSchemaRDD
     val tmpdir = Utils.createTempDir()
     Utils.deleteRecursively(tmpdir)
     data.saveAsParquetFile(tmpdir.toString)
-    nestedParserSqlContext
-      .parquetFile(tmpdir.toString)
+    parquetFile(tmpdir.toString)
      .toSchemaRDD
      .registerTempTable("tmpmapcopy")
-    val result1 = nestedParserSqlContext.sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
+    val result1 = sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect()
     assert(result1.size === 1)
     assert(result1(0)(0) === 2)
-    val result2 = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect()
+    val result2 = sql("SELECT data2 FROM tmpmapcopy").collect()
     assert(result2.size === 1)
     val entry1 = result2(0)(0)
       .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]]
@@ -871,42 +849,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     assert(entry2 != null)
     assert(entry2(0) === 49)
     assert(entry2(1) === null)
-    val result3 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
+    val result3 = sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect()
     assert(result3.size === 1)
     assert(result3(0)(0) === 42.toLong)
     assert(result3(0)(1) === "the answer")
     Utils.deleteRecursively(tmpdir)
   }
 }
-
-// TODO: the code below is needed temporarily until the standard parser is able to parse
-// nested field expressions correctly
-class NestedParserSQLContext(@transient override val sparkContext: SparkContext) extends SQLContext(sparkContext) {
-  override protected[sql] val parser = new NestedSqlParser()
-}
-
-class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) {
-  override def identChar = letter | elem('_')
-  delimiters += (".")
-}
-
-class NestedSqlParser extends SqlParser {
-  override val lexical = new NestedSqlLexical(reservedWords)
-
-  override protected lazy val baseExpression: PackratParser[Expression] =
-    expression ~ "[" ~ expression <~ "]" ^^ {
-      case base ~ _ ~ ordinal => GetItem(base, ordinal)
-    } |
-    expression ~ "." ~ ident ^^ {
-      case base ~ _ ~ fieldName => GetField(base, fieldName)
-    } |
-    TRUE ^^^ Literal(true, BooleanType) |
-    FALSE ^^^ Literal(false, BooleanType) |
-    cast |
-    "(" ~> expression <~ ")" |
-    function |
-    "-" ~> literal ^^ UnaryMinus |
-    ident ^^ UnresolvedAttribute |
-    "*" ^^^ Star(None) |
-    literal
-}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 12 additions & 5 deletions
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.reflect.ClassTag
-
-import org.apache.spark.sql.{SQLConf, QueryTest}
-import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHive._
 
+case class Nested1(f1: Nested2)
+case class Nested2(f2: Nested3)
+case class Nested3(f3: Int)
+
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
@@ -47,4 +47,11 @@ class SQLQuerySuite extends QueryTest {
         GROUP BY key, value
         ORDER BY value) a""").collect().toSeq)
   }
+
+  test("double nested data") {
+    sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil).registerTempTable("nested")
+    checkAnswer(
+      sql("SELECT f1.f2.f3 FROM nested"),
+      1)
+  }
 }
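In this regression test, `f1.f2.f3` reaches the analyzer as the single name `f1.f2.f3` (via `dotExpressionHeader`); resolution then peels off the attribute matching the head and treats the remaining parts as nested fields. A rough sketch of that split, where the names and logic are an illustrative simplification rather than Catalyst's actual `resolve` implementation:

    // Hypothetical helper: match the dotted name's head against an output attribute;
    // the remaining parts become the nested fields to extract with GetField.
    def splitDotted(name: String, output: Seq[String]): Option[(String, Seq[String])] = {
      val parts = name.split("\\.").toSeq
      if (output.contains(parts.head)) Some((parts.head, parts.tail)) else None
    }

    splitDotted("f1.f2.f3", Seq("f1", "key", "value"))  // => Some(("f1", Seq("f2", "f3")))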
