
Commit 78b0cbe

stczwd authored and cloud-fan committed
[SPARK-29444] Add configuration to support JacksonGenerator to keep fields with null values
### Why are the changes needed?

As mentioned in the JIRA ticket, we sometimes need to retain null columns when writing JSON. For example, sparkmagic (widely used in Jupyter with Livy) builds SQL query results with Dataset.toJSON and parses the JSON into a pandas DataFrame for display. If a column is null, columns can go missing or the query result can even come back empty; in particular, losing a null column in the first row may cause parsing exceptions or drop an entire column of data.

### Does this PR introduce any user-facing change?

Example in spark-shell:

    scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
    {"b":1}

    scala> spark.sql("set spark.sql.jsonGenerator.ignoreNullFields=false")
    res2: org.apache.spark.sql.DataFrame = [key: string, value: string]

    scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
    {"a":null,"b":1}

### How was this patch tested?

New tests were added to JacksonGeneratorSuite.

Closes apache#26098 from stczwd/json.

Lead-authored-by: stczwd <[email protected]>
Co-authored-by: Jackey Lee <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
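The same key is also read from the JSON data source options (see the JSONOptions change below), so the behavior should be switchable per write as well, without touching the session conf. A minimal sketch, with a made-up output path:

    // Sketch only: per-write override of the new option; the output path is illustrative.
    val df = spark.sql("select null as a, 1 as b")
    df.write
      .option("ignoreNullFields", "false") // same key JSONOptions reads, overriding the session conf
      .json("/tmp/spark-29444-demo")
    // expected file content: {"a":null,"b":1}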
1 parent ec5d698 commit 78b0cbe

4 files changed: +43 −0 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 5 additions & 0 deletions
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for parsing JSON data into Spark SQL rows.
@@ -76,6 +77,10 @@ private[sql] class JSONOptions(
   // Whether to ignore column of all null values or empty array/struct during schema inference
   val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)
 
+  // Whether to ignore null fields during json generating
+  val ignoreNullFields = parameters.getOrElse("ignoreNullFields",
+    SQLConf.get.jsonGeneratorIgnoreNullFields).toBoolean
+
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
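In the change above, an explicit ignoreNullFields entry in the options map takes precedence; SQLConf.get.jsonGeneratorIgnoreNullFields is only the fallback default. A small sketch of both paths, assuming the session conf is left at its default and using "GMT" as the time zone id, as the test suite below does:

    import org.apache.spark.sql.catalyst.json.JSONOptions

    // Explicit per-source option wins over the session conf.
    val explicit = new JSONOptions(Map("ignoreNullFields" -> "false"), "GMT")
    assert(!explicit.ignoreNullFields)

    // No option given: falls back to SQLConf.get.jsonGeneratorIgnoreNullFields (default "true").
    val fromConf = new JSONOptions(Map.empty[String, String], "GMT")
    assert(fromConf.ignoreNullFields)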

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 3 additions & 0 deletions
@@ -184,6 +184,9 @@ private[sql] class JacksonGenerator(
       if (!row.isNullAt(i)) {
         gen.writeFieldName(field.name)
         fieldWriters(i).apply(row, i)
+      } else if (!options.ignoreNullFields) {
+        gen.writeFieldName(field.name)
+        gen.writeNull()
       }
       i += 1
     }
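The two calls in the new else-if branch are plain Jackson API: writeFieldName followed by writeNull emits the field with a literal JSON null instead of skipping it. A standalone sketch of just those calls (not part of the commit):

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory

    val writer = new CharArrayWriter()
    val gen = new JsonFactory().createGenerator(writer)
    gen.writeStartObject()
    gen.writeFieldName("a") // same call as in the patched branch
    gen.writeNull()         // emits a JSON null for the field
    gen.writeEndObject()
    gen.flush()
    assert(writer.toString == """{"a":null}""")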

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
@@ -1187,6 +1187,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val JSON_GENERATOR_IGNORE_NULL_FIELDS =
+    buildConf("spark.sql.jsonGenerator.ignoreNullFields")
+    .doc("If false, JacksonGenerator will generate null for null fields in Struct.")
+    .stringConf
+    .createWithDefault("true")
+
   val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream sink.")
@@ -2379,6 +2385,8 @@ class SQLConf extends Serializable with Logging {
 
   def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
 
+  def jsonGeneratorIgnoreNullFields: String = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
+
   def parallelFileListingInStatsComputation: Boolean =
     getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala

Lines changed: 27 additions & 0 deletions
@@ -39,6 +39,33 @@ class JacksonGeneratorSuite extends SparkFunSuite {
     assert(writer.toString === """{"a":1}""")
   }
 
+  test("SPARK-29444: initial with StructType and write out an empty row " +
+    "with ignoreNullFields=false") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = InternalRow(null)
+    val writer = new CharArrayWriter()
+    val allowNullOption =
+      new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
+    val gen = new JacksonGenerator(dataType, writer, allowNullOption)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":null}""")
+  }
+
+  test("SPARK-29444: initial with StructType field and write out a row " +
+    "with ignoreNullFields=false and struct inner null") {
+    val fieldType = StructType(StructField("b", IntegerType) :: Nil)
+    val dataType = StructType(StructField("a", fieldType) :: Nil)
+    val input = InternalRow(InternalRow(null))
+    val writer = new CharArrayWriter()
+    val allowNullOption =
+      new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
+    val gen = new JacksonGenerator(dataType, writer, allowNullOption)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":{"b":null}}""")
+  }
+
   test("initial with StructType and write out rows") {
     val dataType = StructType(StructField("a", IntegerType) :: Nil)
     val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
