Skip to content

Commit 1a527bd

Browse files
gatorsmilecloud-fan
authored andcommitted
[SPARK-20976][SQL] Unify Error Messages for FAILFAST mode
### What changes were proposed in this pull request? Before 2.2, we indicate the job was terminated because of `FAILFAST` mode. ``` Malformed line in FAILFAST mode: {"a":{, b:3} ``` If possible, we should keep it. This PR is to unify the error messages. ### How was this patch tested? Modified the existing messages. Author: Xiao Li <[email protected]> Closes #18196 from gatorsmile/messFailFast.
1 parent 55b8cfe commit 1a527bd

File tree

4 files changed

+21
-14
lines changed

4 files changed

+21
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ class JacksonParser(
278278
// We cannot parse this token based on the given data type. So, we throw a
279279
// RuntimeException and this exception will be caught by `parse` method.
280280
throw new RuntimeException(
281-
s"Failed to parse a value for data type $dataType (current token: $token).")
281+
s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
282282
}
283283

284284
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources
1919

20+
import org.apache.spark.SparkException
2021
import org.apache.spark.sql.catalyst.InternalRow
2122
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
2223
import org.apache.spark.sql.catalyst.util._
@@ -65,7 +66,8 @@ class FailureSafeParser[IN](
6566
case DropMalformedMode =>
6667
Iterator.empty
6768
case FailFastMode =>
68-
throw e.cause
69+
throw new SparkException("Malformed records are detected in record parsing. " +
70+
s"Parse Mode: ${FailFastMode.name}.", e.cause)
6971
}
7072
}
7173
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.util.Comparator
2121

2222
import com.fasterxml.jackson.core._
2323

24+
import org.apache.spark.SparkException
2425
import org.apache.spark.rdd.RDD
2526
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
2627
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
@@ -61,7 +62,8 @@ private[sql] object JsonInferSchema {
6162
case DropMalformedMode =>
6263
None
6364
case FailFastMode =>
64-
throw e
65+
throw new SparkException("Malformed records are detected in schema inference. " +
66+
s"Parse Mode: ${FailFastMode.name}.", e)
6567
}
6668
}
6769
}
@@ -231,8 +233,9 @@ private[sql] object JsonInferSchema {
231233

232234
case FailFastMode =>
233235
// If `other` is not struct type, consider it as malformed one and throws an exception.
234-
throw new RuntimeException("Failed to infer a common schema. Struct types are expected" +
235-
s" but ${other.catalogString} was found.")
236+
throw new SparkException("Malformed records are detected in schema inference. " +
237+
s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " +
238+
s"Struct types are expected, but `${other.catalogString}` was found.")
236239
}
237240

238241
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
10361036
}
10371037

10381038
test("Corrupt records: FAILFAST mode") {
1039-
val schema = StructType(
1040-
StructField("a", StringType, true) :: Nil)
10411039
// `FAILFAST` mode should throw an exception for corrupt records.
10421040
val exceptionOne = intercept[SparkException] {
10431041
spark.read
10441042
.option("mode", "FAILFAST")
10451043
.json(corruptRecords)
1046-
}
1047-
assert(exceptionOne.getMessage.contains("JsonParseException"))
1044+
}.getMessage
1045+
assert(exceptionOne.contains(
1046+
"Malformed records are detected in schema inference. Parse Mode: FAILFAST."))
10481047

10491048
val exceptionTwo = intercept[SparkException] {
10501049
spark.read
10511050
.option("mode", "FAILFAST")
1052-
.schema(schema)
1051+
.schema("a string")
10531052
.json(corruptRecords)
10541053
.collect()
1055-
}
1056-
assert(exceptionTwo.getMessage.contains("JsonParseException"))
1054+
}.getMessage
1055+
assert(exceptionTwo.contains(
1056+
"Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
10571057
}
10581058

10591059
test("Corrupt records: DROPMALFORMED mode") {
@@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
19441944
.option("mode", "FAILFAST")
19451945
.json(path)
19461946
}
1947-
assert(exceptionOne.getMessage.contains("Failed to infer a common schema"))
1947+
assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " +
1948+
"inference. Parse Mode: FAILFAST."))
19481949

19491950
val exceptionTwo = intercept[SparkException] {
19501951
spark.read
@@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
19541955
.json(path)
19551956
.collect()
19561957
}
1957-
assert(exceptionTwo.getMessage.contains("Failed to parse a value"))
1958+
assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " +
1959+
"parsing. Parse Mode: FAILFAST."))
19581960
}
19591961
}
19601962

0 commit comments

Comments
 (0)