Skip to content

Commit 4c4292c

Browse files
committed
change underlying type of JavaSchemaRDD as scala
1 parent bb0508f commit 4c4292c

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
2424
import org.apache.spark.annotation.{DeveloperApi, Experimental}
2525
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
2626
import org.apache.spark.sql.json.JsonRDD
27+
import org.apache.spark.sql.types.util.DataTypeConversions
2728
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
2829
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
2930
import org.apache.spark.sql.parquet.ParquetRelation
@@ -97,7 +98,9 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
9798
localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
9899

99100
iter.map { row =>
100-
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
101+
new GenericRow(
102+
extractors.map(e => DataTypeConversions.convertJavaToCatalyst(e.invoke(row))).toArray[Any]
103+
): ScalaRow
101104
}
102105
}
103106
new JavaSchemaRDD(sqlContext, LogicalRDD(schema, rowRdd)(sqlContext))

sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,16 @@ protected[sql] object DataTypeConversions {
110110
case structType: org.apache.spark.sql.api.java.StructType =>
111111
StructType(structType.getFields.map(asScalaStructField))
112112
}
113+
114+
/** Converts Java objects to catalyst rows / types */
115+
def convertJavaToCatalyst(a: Any): Any = a match {
116+
case d: java.math.BigDecimal => BigDecimal(d)
117+
case other => other
118+
}
119+
120+
/** Converts Java objects to catalyst rows / types */
121+
def convertCatalystToJava(a: Any): Any = a match {
122+
case d: scala.math.BigDecimal => d.underlying()
123+
case other => other
124+
}
113125
}

sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,30 @@ class JavaSQLSuite extends FunSuite {
9292
|FROM allTypes
9393
""".stripMargin).collect.head.row ===
9494
Seq("", 0, 0L, 0F, 0.0, 0.toShort, 0.toByte, false, java.sql.Date.valueOf("2014-10-10"),
95-
java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"), new java.math.BigDecimal(0)))
95+
java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"), scala.math.BigDecimal(0)))
96+
}
97+
98+
test("decimal types in JavaBeans") {
99+
val bean = new AllTypesBean
100+
bean.setStringField("")
101+
bean.setIntField(0)
102+
bean.setLongField(0)
103+
bean.setFloatField(0.0F)
104+
bean.setDoubleField(0.0)
105+
bean.setShortField(0.toShort)
106+
bean.setByteField(0.toByte)
107+
bean.setBooleanField(false)
108+
bean.setDateField(java.sql.Date.valueOf("2014-10-10"))
109+
bean.setTimestampField(java.sql.Timestamp.valueOf("2014-10-10 00:00:00.0"))
110+
bean.setBigDecimalField(new java.math.BigDecimal(0))
111+
112+
val rdd = javaCtx.parallelize(bean :: Nil)
113+
val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[AllTypesBean])
114+
schemaRDD.registerTempTable("decimalTypes")
115+
116+
assert(javaSqlCtx.sql(
117+
"select bigDecimalField + bigDecimalField from decimalTypes"
118+
).collect.head.row === Seq(scala.math.BigDecimal(0)))
96119
}
97120

98121
test("all types null in JavaBeans") {

0 commit comments

Comments
 (0)