Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.json
import scala.collection.Map
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
import scala.math.BigDecimal
import java.sql.Timestamp

import com.fasterxml.jackson.databind.ObjectMapper

Expand Down Expand Up @@ -361,6 +362,14 @@ private[sql] object JsonRDD extends Logging {
}
}

/**
 * Converts a parsed JSON value to a [[java.sql.Timestamp]].
 *
 * Integral JSON numbers (parsed by Jackson as `Integer` or `Long`) are
 * interpreted as milliseconds since the Unix epoch; strings must be in the
 * JDBC timestamp escape format (`yyyy-mm-dd hh:mm:ss[.fffffffff]`) accepted
 * by `Timestamp.valueOf`.
 *
 * Note: `null` is handled by the caller (`enforceCorrectType` short-circuits
 * on null before dispatching here); any other runtime type raises a
 * `MatchError`.
 */
private def toTimestamp(value: Any): Timestamp = {
  value match {
    // The pattern match already binds `value` as java.lang.Integer, so the
    // original `asInstanceOf[Int]` cast was redundant — unbox directly.
    case value: java.lang.Integer => new Timestamp(value.longValue())
    case value: java.lang.Long => new Timestamp(value)
    // Throws IllegalArgumentException if the string is not in JDBC escape format.
    case value: java.lang.String => Timestamp.valueOf(value)
  }
}

private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
if (value == null) {
null
Expand All @@ -377,6 +386,7 @@ private[sql] object JsonRDD extends Logging {
case ArrayType(elementType, _) =>
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
case TimestampType => toTimestamp(value)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.TestSQLContext._

import java.sql.Timestamp

class JsonSuite extends QueryTest {
import TestJsonData._
TestJsonData
Expand Down Expand Up @@ -50,6 +52,12 @@ class JsonSuite extends QueryTest {
val doubleNumber: Double = 1.7976931348623157E308d
checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
checkTypePromotion(BigDecimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType))

checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType))
checkTypePromotion(new Timestamp(intNumber.toLong),
enforceCorrectType(intNumber.toLong, TimestampType))
val strDate = "2014-09-30 12:34:56"
checkTypePromotion(Timestamp.valueOf(strDate), enforceCorrectType(strDate, TimestampType))
}

test("Get compatible type") {
Expand Down