From 51d00a3162eb20e7bd49148b3894287bb77307d4 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sun, 17 Jun 2018 14:55:07 -0700
Subject: [PATCH] update impl

---
 .../sql/catalyst/json/JacksonParser.scala   |  5 --
 .../datasources/json/JsonInferSchema.scala  | 47 +++++++------------
 .../datasources/json/JsonSuite.scala        | 18 +++----
 3 files changed, 24 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f4316b90613cc..a5a4a13eb608b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -234,11 +234,6 @@ class JacksonParser(
     case udt: UserDefinedType[_] =>
       makeConverter(udt.sqlType)
 
-    case _: NullType if options.dropFieldIfAllNull =>
-      (parser: JsonParser) => parseJsonToken[Null](parser, dataType) {
-        case _ => null
-      }
-
     case _ =>
       (parser: JsonParser) =>
         // Here, we pass empty `PartialFunction` so that this case can be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
index 2f165a80884fc..97ed1dc35c97c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
@@ -176,44 +176,33 @@ private[sql] object JsonInferSchema {
   }
 
   /**
-   * Canonicalize inferred types, e.g., convert NullType to StringType and remove StructTypes
-   * with no fields.
+   * Recursively canonicalizes inferred types, e.g., removes StructTypes with no fields,
+   * drops NullTypes or converts them to StringType based on provided options.
    */
   private def canonicalizeType(tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
-    case at @ ArrayType(elementType, _) =>
-      val canonicalizeArrayOption = for {
-        canonicalType <- canonicalizeType(elementType, options)
-      } yield {
-        at.copy(canonicalType)
-      }
-
-      canonicalizeArrayOption.map { array =>
-        if (options.dropFieldIfAllNull && array.elementType == NullType) {
-          NullType
-        } else {
-          array
-        }
-      }
+    case at: ArrayType =>
+      canonicalizeType(at.elementType, options)
+        .map(t => at.copy(elementType = t))
 
     case StructType(fields) =>
-      val canonicalFields: Array[StructField] = for {
-        field <- fields
-        if field.name.length > 0
-        canonicalType <- canonicalizeType(field.dataType, options)
-      } yield {
-        field.copy(dataType = canonicalType)
+      val canonicalFields = fields.filter(_.name.nonEmpty).flatMap { f =>
+        canonicalizeType(f.dataType, options)
+          .map(t => f.copy(dataType = t))
       }
-
-      if (canonicalFields.length > 0) {
-        Some(StructType(canonicalFields))
-      } else if (options.dropFieldIfAllNull) {
-        Some(NullType)
+      // SPARK-8093: empty structs should be deleted
+      if (canonicalFields.isEmpty) {
+        None
       } else {
-        // per SPARK-8093: empty structs should be deleted
+        Some(StructType(canonicalFields))
+      }
+
+    case NullType =>
+      if (options.dropFieldIfAllNull) {
         None
+      } else {
+        Some(StringType)
       }
 
-    case NullType if !options.dropFieldIfAllNull => Some(StringType)
     case other => Some(other)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 72fddeaa1a0a9..0e4523bfe088c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2423,10 +2423,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("dropFieldIfAllNull", true)
         .load(path)
       var expectedSchema = new StructType()
-        .add("a", NullType).add("b", LongType).add("c", StringType)
+        .add("b", LongType).add("c", StringType)
       assert(df.schema === expectedSchema)
-      checkAnswer(df, Row(null, 1, "3.0") :: Row(null, null, "string") :: Row(null, null, null)
-        :: Nil)
+      checkAnswer(df, Row(1, "3.0") :: Row(null, "string") :: Row(null, null) :: Nil)
 
       // arrays
       Seq(
@@ -2438,17 +2437,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("dropFieldIfAllNull", true)
         .load(path)
       expectedSchema = new StructType()
-        .add("a", ArrayType(LongType)).add("b", NullType).add("c", NullType).add("d", NullType)
-        .add("e", NullType)
+        .add("a", ArrayType(LongType))
       assert(df.schema === expectedSchema)
-      checkAnswer(df, Row(Array(2, 1), null, null, null, null) ::
-        Row(Array(null), null, null, null, null) ::
-        Row(null, null, null, null, null) :: Nil)
+      checkAnswer(df, Row(Array(2, 1)) :: Row(Array(null)) :: Row(null) :: Nil)
 
       // structs
       Seq(
         """{"a":{"a1": 1, "a2":"string"}, "b":{}}""",
-        """{"a":{"a1": 2, "a2":null}, "b":{}}""",
+        """{"a":{"a1": 2, "a2":null}, "b":{"b1":[null]}}""",
         """{"a":null, "b":null}""")
         .toDS().write.mode("overwrite").text(path)
       df = spark.read.format("json")
@@ -2455,12 +2451,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("dropFieldIfAllNull", true)
         .load(path)
       expectedSchema = new StructType()
         .add("a", StructType(StructField("a1", LongType) ::
           StructField("a2", StringType) :: Nil))
-        .add("b", NullType)
       assert(df.schema === expectedSchema)
-      checkAnswer(df, Row(Row(1, "string"), null) :: Row(Row(2, null), null) ::
-        Row(null, null) :: Nil)
+      checkAnswer(df, Row(Row(1, "string")) :: Row(Row(2, null)) :: Row(null) :: Nil)
     }
   }
 }
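
Usage note for reviewers (not part of the patch): the test changes above pin down the
intended behavior of the `dropFieldIfAllNull` read option. Below is a minimal,
self-contained sketch mirroring the primitive-types case; the SparkSession setup and
the output path are illustrative assumptions, not code from this change.

    import org.apache.spark.sql.SparkSession

    object DropFieldIfAllNullExample {
      def main(args: Array[String]): Unit = {
        // Assumed local session for a quick manual check.
        val spark = SparkSession.builder()
          .appName("drop-field-if-all-null")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val path = "/tmp/drop-field-example" // hypothetical scratch path

        // Column `a` is null in every record, so schema inference should drop it
        // when dropFieldIfAllNull=true; `b` and `c` keep their inferred types.
        Seq(
          """{"a":null, "b":1, "c":3.0}""",
          """{"a":null, "b":null, "c":"string"}""",
          """{"a":null, "b":null, "c":null}""")
          .toDS().write.mode("overwrite").text(path)

        val df = spark.read.format("json")
          .option("dropFieldIfAllNull", true)
          .load(path)
        df.printSchema() // expected: b: bigint, c: string (no `a`)

        spark.stop()
      }
    }

Per the canonicalizeType rewrite, the pruning applies recursively: an array whose
elements are never non-null (e.g. "b":[null, null]) and a struct left empty after
pruning (e.g. "b":{}) both canonicalize to None and disappear from the inferred
schema, as the arrays and structs cases in the test exercise.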