
Commit 5d981a4

ueshin authored and marmbrus committed
[SPARK-3063][SQL] ExistingRdd should convert Map to catalyst Map.
Currently `ExistingRdd.convertToCatalyst` doesn't convert `Map` values.

Author: Takuya UESHIN <[email protected]>

Closes #1963 from ueshin/issues/SPARK-3063 and squashes the following commits:

3ba41f2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
4d7bae2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
9321379 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063
d8a900a [Takuya UESHIN] Make ExistingRdd.convertToCatalyst be able to convert Map value.

(cherry picked from commit 6b5584e)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent: 35a5853
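For readers skimming the diff below: `convertToCatalyst` is the recursive translation from Scala values into Catalyst's internal row representation, and before this patch a `Map` fell through to the catch-all `other` case, so nested values such as `Option`s inside the map were left unconverted. A minimal standalone sketch of the pattern (not the Spark source; the `Product`/`GenericRow` case is omitted so the snippet runs without Spark on the classpath):

    object ConvertSketch {
      // Simplified stand-in for ExistingRdd.convertToCatalyst:
      def convertToCatalyst(a: Any): Any = a match {
        case o: Option[_] => o.orNull
        case s: Seq[_]    => s.map(convertToCatalyst)
        // The case this commit adds: recurse into both keys and values.
        case m: Map[_, _] => m.map { case (k, v) => convertToCatalyst(k) -> convertToCatalyst(v) }
        case other        => other
      }

      def main(args: Array[String]): Unit = {
        // With the Map case this prints: Map(1 -> 10, 2 -> null)
        // Without it, the map passed through untouched: Map(1 -> Some(10), 2 -> None)
        println(convertToCatalyst(Map(1 -> Some(10L), 2 -> None)))
      }
    }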

File tree

2 files changed: +48 −1 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 2 additions & 1 deletion
@@ -206,7 +206,8 @@ case class Sort(
 object ExistingRdd {
   def convertToCatalyst(a: Any): Any = a match {
     case o: Option[_] => o.orNull
-    case s: Seq[Any] => s.map(convertToCatalyst)
+    case s: Seq[_] => s.map(convertToCatalyst)
+    case m: Map[_, _] => m.map { case (k, v) => convertToCatalyst(k) -> convertToCatalyst(v) }
     case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
     case other => other
   }
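Two details worth noting in this hunk: the `Seq[Any]` pattern was tightened to `Seq[_]` (type erasure means the element type cannot be checked at runtime anyway, and the wildcard states that explicitly), and because every case recurses through `convertToCatalyst`, the new `Map` case composes with the existing ones for arbitrarily nested containers. A REPL-style illustration against the sketch above:

    // Using the ConvertSketch object from the earlier sketch:
    ConvertSketch.convertToCatalyst(Map("k" -> Seq(Some(1), None)))
    // => Map(k -> List(1, null)): the Map case recurses into the Seq,
    //    which in turn unwraps each Option.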

sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -21,6 +21,7 @@ import java.sql.Timestamp
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.test.TestSQLContext._
 
 case class ReflectData(
@@ -56,6 +57,22 @@ case class OptionalReflectData(
 
 case class ReflectBinary(data: Array[Byte])
 
+case class Nested(i: Option[Int], s: String)
+
+case class Data(
+    array: Seq[Int],
+    arrayContainsNull: Seq[Option[Int]],
+    map: Map[Int, Long],
+    mapContainsNul: Map[Int, Option[Long]],
+    nested: Nested)
+
+case class ComplexReflectData(
+    arrayField: Seq[Int],
+    arrayFieldContainsNull: Seq[Option[Int]],
+    mapField: Map[Int, Long],
+    mapFieldContainsNull: Map[Int, Option[Long]],
+    dataField: Data)
+
 class ScalaReflectionRelationSuite extends FunSuite {
   test("query case class RDD") {
     val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
@@ -90,4 +107,33 @@ class ScalaReflectionRelationSuite extends FunSuite {
     val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
     assert(result.toSeq === Seq[Byte](1))
   }
+
+  test("query complex data") {
+    val data = ComplexReflectData(
+      Seq(1, 2, 3),
+      Seq(Some(1), Some(2), None),
+      Map(1 -> 10L, 2 -> 20L),
+      Map(1 -> Some(10L), 2 -> Some(20L), 3 -> None),
+      Data(
+        Seq(10, 20, 30),
+        Seq(Some(10), Some(20), None),
+        Map(10 -> 100L, 20 -> 200L),
+        Map(10 -> Some(100L), 20 -> Some(200L), 30 -> None),
+        Nested(None, "abc")))
+    val rdd = sparkContext.parallelize(data :: Nil)
+    rdd.registerTempTable("reflectComplexData")
+
+    assert(sql("SELECT * FROM reflectComplexData").collect().head ===
+      new GenericRow(Array[Any](
+        Seq(1, 2, 3),
+        Seq(1, 2, null),
+        Map(1 -> 10L, 2 -> 20L),
+        Map(1 -> 10L, 2 -> 20L, 3 -> null),
+        new GenericRow(Array[Any](
+          Seq(10, 20, 30),
+          Seq(10, 20, null),
+          Map(10 -> 100L, 20 -> 200L),
+          Map(10 -> 100L, 20 -> 200L, 30 -> null),
+          new GenericRow(Array[Any](null, "abc")))))))
+  }
 }
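The expected row in the assertion mirrors the conversion rules: `Option` elements inside a `Seq` or `Map` come back as the unwrapped value or `null`, and nested case classes become nested `GenericRow`s. To poke at the new code path interactively, a minimal sketch in the same style as the suite (assuming the Spark 1.x `TestSQLContext` implicits used above; `MapHolder` is a hypothetical case class for illustration):

    import org.apache.spark.sql.test.TestSQLContext._

    // Hypothetical case class, not part of the suite:
    case class MapHolder(m: Map[Int, Option[Long]])

    val rdd = sparkContext.parallelize(MapHolder(Map(1 -> Some(10L), 2 -> None)) :: Nil)
    rdd.registerTempTable("mapHolder")

    // Before this patch the Map passed through unconverted; with it,
    // the query returns Map(1 -> 10, 2 -> null).
    println(sql("SELECT m FROM mapHolder").collect().head(0))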
