Commit cf7ab0a

robert3005 authored and bulldozer-bot[bot] committed
Revert "[SPARK-25040][SQL] Empty string for non string types should be disallowed" (apache#528)
1 parent c901250 commit cf7ab0a

File tree

3 files changed: +13 -49 lines changed


docs/sql-migration-guide-upgrade.md

Lines changed: 0 additions & 2 deletions
@@ -13,8 +13,6 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`.
 
-  - In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.
-
   - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
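The deleted migration note describes the behavior this revert restores: an empty string in a field declared with a non-string type such as `IntegerType` parses to null again instead of failing the query. A minimal sketch of that restored behavior, assuming a local `SparkSession`; the session setup and the sample `{"a":""}` input are illustrative and not part of this diff:

import org.apache.spark.sql.{Row, SparkSession}

object EmptyStringJsonExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("empty-string-json")
      .getOrCreate()
    import spark.implicits._

    // An empty string in a field declared as INT.
    val df = spark.read
      .schema("a INT")
      .json(Seq("""{"a":""}""").toDS)

    // With SPARK-25040 reverted, the expected result is Row(null)
    // (the 2.4-style behavior) rather than a parse error.
    assert(df.collect().toSeq == Seq(Row(null)))

    spark.stop()
  }
}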

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 13 additions & 12 deletions
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.SafeLogging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
 class JacksonParser(
     schema: DataType,
     val options: JSONOptions,
-    allowArrayAsStructs: Boolean) extends Logging {
+    allowArrayAsStructs: Boolean) extends SafeLogging {
 
   import JacksonUtils._
   import com.fasterxml.jackson.core.JsonToken._
@@ -174,7 +174,7 @@ class JacksonParser(
       case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
         parser.getFloatValue
 
-      case VALUE_STRING if parser.getTextLength >= 1 =>
+      case VALUE_STRING =>
         // Special case handling for NaN and Infinity.
         parser.getText match {
           case "NaN" => Float.NaN
@@ -190,7 +190,7 @@ class JacksonParser(
       case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
         parser.getDoubleValue
 
-      case VALUE_STRING if parser.getTextLength >= 1 =>
+      case VALUE_STRING =>
         // Special case handling for NaN and Infinity.
         parser.getText match {
           case "NaN" => Double.NaN
@@ -217,7 +217,7 @@ class JacksonParser(
 
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
-        case VALUE_STRING if parser.getTextLength >= 1 =>
+        case VALUE_STRING =>
          val stringValue = parser.getText
          // This one will lose microseconds parts.
          // See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -236,7 +236,7 @@ class JacksonParser(
 
    case DateType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
-        case VALUE_STRING if parser.getTextLength >= 1 =>
+        case VALUE_STRING =>
          val stringValue = parser.getText
          // This one will lose microseconds parts.
          // See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -319,17 +319,18 @@ class JacksonParser(
   }
 
   /**
-   * This function throws an exception for failed conversion. For empty string on data types
-   * except for string and binary types, this also throws an exception.
+   * This function throws an exception for failed conversion, but returns null for empty string,
+   * to guard the non string types.
    */
   private def failedConversion[R >: Null](
       parser: JsonParser,
       dataType: DataType): PartialFunction[JsonToken, R] = {
-
-    // SPARK-25040: Disallow empty strings for data types except for string and binary types.
     case VALUE_STRING if parser.getTextLength < 1 =>
-      throw new RuntimeException(
-        s"Failed to parse an empty string for data type ${dataType.catalogString}")
+      safeLogInfo("Would have dropped empty record. " +
+        "Likely you have empty strings for nested values")
+      // If conversion is failed, this produces `null` rather than throwing exception.
+      // This will protect the mismatch of types.
+      null
 
     case token =>
       // We cannot parse this token based on the given data type. So, we throw a
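The behavioral core of the change is the empty-string branch of `failedConversion`: a `VALUE_STRING` token with zero length is now logged and mapped to null instead of throwing a `RuntimeException`. Below is a self-contained sketch of that token-level check using only Jackson core; the `readIntOrNull` helper and its inputs are hypothetical illustrations, not code from this commit:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object EmptyStringToken {
  private val factory = new JsonFactory()

  // Hypothetical helper: read the value of field "a" as Integer,
  // mapping an empty string to null the way the reverted branch does.
  def readIntOrNull(json: String): Integer = {
    val parser = factory.createParser(json)
    try {
      var result: Integer = null
      var token = parser.nextToken()
      while (token != null) {
        if (token == JsonToken.FIELD_NAME && parser.getCurrentName == "a") {
          parser.nextToken() match {
            case JsonToken.VALUE_NUMBER_INT =>
              result = parser.getIntValue
            case JsonToken.VALUE_STRING if parser.getTextLength < 1 =>
              // Reverted behavior: an empty string becomes null instead of an error.
              result = null
            case other =>
              throw new RuntimeException(s"Cannot convert $other to an int value")
          }
        }
        token = parser.nextToken()
      }
      result
    } finally {
      parser.close()
    }
  }

  def main(args: Array[String]): Unit = {
    assert(readIntOrNull("""{"a":42}""") == 42)
    assert(readIntOrNull("""{"a":""}""") == null)
  }
}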

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 0 additions & 35 deletions
@@ -2515,41 +2515,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     countForMalformedJSON(0, Seq(""))
   }
 
-  test("SPARK-25040: empty strings should be disallowed") {
-    def failedOnEmptyString(dataType: DataType): Unit = {
-      val df = spark.read.schema(s"a ${dataType.catalogString}")
-        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
-      val errMessage = intercept[SparkException] {
-        df.collect()
-      }.getMessage
-      assert(errMessage.contains(
-        s"Failed to parse an empty string for data type ${dataType.catalogString}"))
-    }
-
-    def emptyString(dataType: DataType, expected: Any): Unit = {
-      val df = spark.read.schema(s"a ${dataType.catalogString}")
-        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
-      checkAnswer(df, Row(expected) :: Nil)
-    }
-
-    failedOnEmptyString(BooleanType)
-    failedOnEmptyString(ByteType)
-    failedOnEmptyString(ShortType)
-    failedOnEmptyString(IntegerType)
-    failedOnEmptyString(LongType)
-    failedOnEmptyString(FloatType)
-    failedOnEmptyString(DoubleType)
-    failedOnEmptyString(DecimalType.SYSTEM_DEFAULT)
-    failedOnEmptyString(TimestampType)
-    failedOnEmptyString(DateType)
-    failedOnEmptyString(ArrayType(IntegerType))
-    failedOnEmptyString(MapType(StringType, IntegerType, true))
-    failedOnEmptyString(StructType(StructField("f1", IntegerType, true) :: Nil))
-
-    emptyString(StringType, "")
-    emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
-  }
-
   test("do not produce empty files for empty partitions") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
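Deleting the SPARK-25040 test leaves the empty-string path without suite-level coverage. A hedged sketch of what a replacement check could look like, written in the style of the removed test and assuming the JsonSuite helpers (`spark`, the `toDS` implicits, `checkAnswer`); the test name and the `Row(null)` expectation for an int column follow from the reverted 2.4-style behavior, not from this commit:

test("empty strings parse leniently after reverting SPARK-25040") {
  // String columns keep the empty value.
  val stringDf = spark.read.schema("a STRING")
    .json(Seq("""{"a":""}""").toDS)
  checkAnswer(stringDf, Row("") :: Nil)

  // Assumed reverted behavior: a non-string column such as IntegerType
  // maps the empty string to null instead of failing the query.
  val intDf = spark.read.schema("a INT")
    .json(Seq("""{"a":""}""").toDS)
  checkAnswer(intDf, Row(null) :: Nil)
}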
