diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 0da2baf24fbcb..bbcff4949ae87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -93,7 +93,7 @@ class JacksonParser(
       new NoopFilters
     }
     (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
-      case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters)
+      case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters, isRoot = true)
       // SPARK-3308: support reading top level JSON arrays and take every element
       // in such an array as a row
       //
@@ -383,7 +383,8 @@ class JacksonParser(
       parser: JsonParser,
       schema: StructType,
       fieldConverters: Array[ValueConverter],
-      structFilters: StructFilters = new NoopFilters()): Option[InternalRow] = {
+      structFilters: StructFilters = new NoopFilters(),
+      isRoot: Boolean = false): Option[InternalRow] = {
     val row = new GenericInternalRow(schema.length)
     var badRecordException: Option[Throwable] = None
     var skipRow = false
@@ -397,7 +398,7 @@ class JacksonParser(
            skipRow = structFilters.skipRow(row, index)
          } catch {
            case e: SparkUpgradeException => throw e
-           case NonFatal(e) =>
+           case NonFatal(e) if isRoot =>
              badRecordException = badRecordException.orElse(Some(e))
              parser.skipChildren()
          }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 03b48451c7495..5a1a3550d855b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -733,4 +733,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
         | """.stripMargin)
     checkAnswer(toDF("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), toDF("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"))
   }
+
+  test("SPARK-33134: return partial results only for root JSON objects") {
+    val st = new StructType()
+      .add("c1", LongType)
+      .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", StringType)))
+    val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
+    checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+    val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
+    checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    val df4 = Seq("""{"c2": [19]}""").toDF("c0")
+    checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
+  }
 }
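
For context beyond the patch itself, here is a minimal spark-shell sketch of the user-visible change. This is an illustration, not part of the diff: it assumes a spark-shell session where the SparkSession `spark` and its implicits are in scope, and it restates what the new test asserts. Because `isRoot = true` is passed only from the root converter, `convertObject` now swallows non-fatal field errors (producing a partial row) only at the top level; the same failure inside a nested struct, array, or map propagates up, so the whole nested value becomes null instead of a partially filled row.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// "c2" does not match this schema (19 is not an object), while "c1" does.
val st = new StructType()
  .add("c1", LongType)
  .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", StringType)))

// Root object: the partial result is kept -> Row(123456, null)
Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
  .select(from_json($"c0", st)).show(false)

// Same object nested one level: the whole nested value is nulled out -> Row(null)
Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
  .select(from_json($"c0", new StructType().add("data", st))).show(false)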