Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,21 @@ private[json] object InferSchema {
// record fields' types have been combined.
NullType

case VALUE_STRING => StringType
case VALUE_STRING =>
// When JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS is enabled,
// we need to do special handling for quoted non-numeric numbers.
if (configOptions.allowNonNumericNumbers) {
val value = parser.getText
val lowerCaseValue = value.toLowerCase()
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
return DoubleType
}
}
StringType
case START_OBJECT =>
val builder = Seq.newBuilder[StructField]
while (nextUntil(parser, END_OBJECT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ case class JSONOptions(
allowUnquotedFieldNames: Boolean = false,
allowSingleQuotes: Boolean = true,
allowNumericLeadingZeros: Boolean = false,
allowNonNumericNumbers: Boolean = false) {
allowNonNumericNumbers: Boolean = true) {

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ object JacksonParser {
def convertField(
factory: JsonFactory,
parser: JsonParser,
schema: DataType): Any = {
schema: DataType,
configOptions: JSONOptions): Any = {
import com.fasterxml.jackson.core.JsonToken._
(parser.getCurrentToken, schema) match {
case (null | VALUE_NULL, _) =>
null

case (FIELD_NAME, _) =>
parser.nextToken()
convertField(factory, parser, schema)
convertField(factory, parser, schema, configOptions)

case (VALUE_STRING, StringType) =>
UTF8String.fromString(parser.getText)
Expand Down Expand Up @@ -100,34 +101,42 @@ object JacksonParser {
parser.getFloatValue

case (VALUE_STRING, FloatType) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we removing the special handling for float types here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, should revert it back. BTW, do we actually test "inf" and "-inf" before? Because "inf".toFloat is not legal.

// Special case handling for NaN and Infinity.
val value = parser.getText
val lowerCaseValue = value.toLowerCase()
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toFloat
// Special case handling for quoted non-numeric numbers.
if (configOptions.allowNonNumericNumbers) {
val value = parser.getText
val lowerCaseValue = value.toLowerCase()
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
sys.error(s"Cannot parse $value as FloatType.")
}
} else {
sys.error(s"Cannot parse $value as FloatType.")
parser.getFloatValue
}

case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
parser.getDoubleValue

case (VALUE_STRING, DoubleType) =>
// Special case handling for NaN and Infinity.
val value = parser.getText
val lowerCaseValue = value.toLowerCase()
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toDouble
// Special case handling for quoted non-numeric numbers.
if (configOptions.allowNonNumericNumbers) {
val value = parser.getText
val lowerCaseValue = value.toLowerCase()
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
sys.error(s"Cannot parse $value as DoubleType.")
}
} else {
sys.error(s"Cannot parse $value as DoubleType.")
parser.getDoubleValue
}

case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
Expand All @@ -152,26 +161,26 @@ object JacksonParser {
false

case (START_OBJECT, st: StructType) =>
convertObject(factory, parser, st)
convertObject(factory, parser, st, configOptions)

case (START_ARRAY, st: StructType) =>
// SPARK-3308: support reading top level JSON arrays and take every element
// in such an array as a row
convertArray(factory, parser, st)
convertArray(factory, parser, st, configOptions)

case (START_ARRAY, ArrayType(st, _)) =>
convertArray(factory, parser, st)
convertArray(factory, parser, st, configOptions)

case (START_OBJECT, ArrayType(st, _)) =>
// the business end of SPARK-3308:
// when an object is found but an array is requested just wrap it in a list
convertField(factory, parser, st) :: Nil
convertField(factory, parser, st, configOptions) :: Nil

case (START_OBJECT, MapType(StringType, kt, _)) =>
convertMap(factory, parser, kt)
convertMap(factory, parser, kt, configOptions)

case (_, udt: UserDefinedType[_]) =>
convertField(factory, parser, udt.sqlType)
convertField(factory, parser, udt.sqlType, configOptions)

case (token, dataType) =>
sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
Expand All @@ -186,12 +195,13 @@ object JacksonParser {
private def convertObject(
factory: JsonFactory,
parser: JsonParser,
schema: StructType): InternalRow = {
schema: StructType,
configOptions: JSONOptions): InternalRow = {
val row = new GenericMutableRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
row.update(index, convertField(factory, parser, schema(index).dataType))
row.update(index, convertField(factory, parser, schema(index).dataType, configOptions))

case None =>
parser.skipChildren()
Expand All @@ -207,23 +217,25 @@ object JacksonParser {
private def convertMap(
factory: JsonFactory,
parser: JsonParser,
valueType: DataType): MapData = {
valueType: DataType,
configOptions: JSONOptions): MapData = {
val keys = ArrayBuffer.empty[UTF8String]
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
keys += UTF8String.fromString(parser.getCurrentName)
values += convertField(factory, parser, valueType)
values += convertField(factory, parser, valueType, configOptions)
}
ArrayBasedMapData(keys.toArray, values.toArray)
}

private def convertArray(
factory: JsonFactory,
parser: JsonParser,
elementType: DataType): ArrayData = {
elementType: DataType,
configOptions: JSONOptions): ArrayData = {
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_ARRAY)) {
values += convertField(factory, parser, elementType)
values += convertField(factory, parser, elementType, configOptions)
}

new GenericArrayData(values.toArray)
Expand Down Expand Up @@ -257,7 +269,7 @@ object JacksonParser {
Utils.tryWithResource(factory.createParser(record)) { parser =>
parser.nextToken()

convertField(factory, parser, schema) match {
convertField(factory, parser, schema, configOptions) match {
case null => failedRecord(record)
case row: InternalRow => row :: Nil
case array: ArrayData =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,41 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
assert(df.first().getLong(0) == 18)
}

// The following two tests are not really working - need to look into Jackson's
// JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
ignore("allowNonNumericNumbers off") {
val str = """{"age": NaN}"""
val rdd = sqlContext.sparkContext.parallelize(Seq(str))
val df = sqlContext.read.json(rdd)
test("allowNonNumericNumbers off") {
var testCases: Seq[String] = Seq("""{"age": NaN}""", """{"age": Infinity}""",
"""{"age": -Infinity}""")

assert(df.schema.head.name == "_corrupt_record")
testCases.foreach { str =>
val rdd = sqlContext.sparkContext.parallelize(Seq(str))
val df = sqlContext.read.option("allowNonNumericNumbers", "false").json(rdd)

assert(df.schema.head.name == "_corrupt_record")
}

testCases = Seq("""{"age": "NaN"}""", """{"age": "Infinity"}""",
"""{"age": "-Infinity"}""")

testCases.foreach { str =>
val rdd = sqlContext.sparkContext.parallelize(Seq(str))
val df = sqlContext.read.option("allowNonNumericNumbers", "false").json(rdd)

assert(df.schema.head.name == "age")
}
}

ignore("allowNonNumericNumbers on") {
val str = """{"age": NaN}"""
val rdd = sqlContext.sparkContext.parallelize(Seq(str))
val df = sqlContext.read.option("allowNonNumericNumbers", "true").json(rdd)
test("allowNonNumericNumbers on") {
val testCases: Seq[String] = Seq("""{"age": NaN}""", """{"age": Infinity}""",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we still read them if they are quoted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, so if we don't set JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS to false, we can't read them normally.

"""{"age": -Infinity}""", """{"age": "NaN"}""", """{"age": "Infinity"}""",
"""{"age": "-Infinity"}""")
val tests: Seq[Double => Boolean] = Seq(_.isNaN, _.isPosInfinity, _.isNegInfinity,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, I found that "Inf", "-Inf" seems not working even JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS is enabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to upgrade jackson library version in order to support "INF" and "-INF" (case-sensitive).

_.isNaN, _.isPosInfinity, _.isNegInfinity)

assert(df.schema.head.name == "age")
assert(df.first().getDouble(0).isNaN)
testCases.zipWithIndex.foreach { case (str, idx) =>
val rdd = sqlContext.sparkContext.parallelize(Seq(str))
val df = sqlContext.read.json(rdd)

assert(df.schema.head.name == "age")
assert(tests(idx)(df.first().getDouble(0)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
parser.nextToken()
JacksonParser.convertField(factory, parser, dataType)
JacksonParser.convertField(factory, parser, dataType, JSONOptions())
}
}

Expand Down