Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, BadRecordException, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -555,7 +555,7 @@ case class JsonToStruct(
CreateJacksonParser.utf8String,
identity[UTF8String]))
} catch {
case _: SparkSQLJsonProcessingException => null
case _: BadRecordException => null
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about creating an enum, like what we are doing for SaveMode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea this can be a good follow-up

val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
schema: StructType,
options: JSONOptions) extends Logging {
val options: JSONOptions) extends Logging {

import JacksonUtils._
import ParseModes._
import com.fasterxml.jackson.core.JsonToken._

// A `ValueConverter` is responsible for converting a value from `JsonParser`
Expand All @@ -55,107 +52,7 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
corruptFieldIndex.foreach { corrFieldIndex =>
require(schema(corrFieldIndex).dataType == StringType)
require(schema(corrFieldIndex).nullable)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above check seems to be missing in the new code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a sanity check; the same check is already done in DataFrameReader.csv/json and JsonFileFormat/CSVFileFormat.


@transient
private[this] var isWarningPrinted: Boolean = false

/**
 * Logs a warning explaining how malformed records are treated under the current parse mode.
 * A message is logged only when `options.permissive` or `options.dropMalformed` is set;
 * for any other mode this method does nothing.
 *
 * @param record thunk producing the offending record; it is invoked only while building
 *               the sample line of a warning, and never when `options.wholeFile` is set
 */
@transient
private def printWarningForMalformedRecord(record: () => UTF8String): Unit = {
// Sample line for the warning: empty in wholeFile mode, otherwise the record text.
def sampleRecord: String = {
if (options.wholeFile) {
""
} else {
s"Sample record: ${record()}\n"
}
}

// Trailing snippet that names the configured corrupt-record column.
def footer: String = {
s"""Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${options.columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin
}

// Permissive mode takes precedence; drop-malformed is only consulted when it is off.
if (options.permissive) {
logWarning(
s"""Found at least one malformed record. The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|${sampleRecord ++ footer}
|
""".stripMargin)
} else if (options.dropMalformed) {
logWarning(
s"""Found at least one malformed record. The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|${sampleRecord ++ footer}
|
""".stripMargin)
}
}

/**
 * Logs a warning about potentially very large allocations when `options.wholeFile` is
 * enabled and the schema contains the corrupt-record column (`corruptFieldIndex` is defined).
 * Does nothing otherwise.
 */
@transient
private def printWarningIfWholeFile(): Unit = {
// Both conditions must hold: wholeFile mode and a corrupt-record column in the schema.
if (options.wholeFile && corruptFieldIndex.isDefined) {
logWarning(
s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
|in very large allocations or OutOfMemoryExceptions being raised.
|
""".stripMargin)
}
}

/**
 * Handles a record that could not be parsed. It is called when an exception is caught
 * during conversion, and the outcome depends on the `mode` option:
 *  - fail-fast: throws a [[SparkSQLJsonProcessingException]];
 *  - drop-malformed: produces no rows;
 *  - otherwise: produces `emptyRow`, or, when the schema has the corrupt-record column,
 *    a fresh row holding the raw record in that column.
 *
 * A warning is logged only for the first failure, tracked by `isWarningPrinted`.
 *
 * @param record thunk producing the raw text of the failed record
 * @return the rows to emit in place of the failed record
 */
private def failedRecord(record: () => UTF8String): Seq[InternalRow] = {
  // Runs `warn` only while no warning has been recorded yet, then records it.
  def warnOnce(warn: => Unit): Unit = {
    if (!isWarningPrinted) {
      warn
      isWarningPrinted = true
    }
  }

  if (options.failFast) {
    // The record text is left out of the message in wholeFile mode.
    if (options.wholeFile) {
      throw new SparkSQLJsonProcessingException("Malformed line in FAILFAST mode")
    } else {
      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: ${record()}")
    }
  } else if (options.dropMalformed) {
    warnOnce(printWarningForMalformedRecord(record))
    Nil
  } else {
    corruptFieldIndex match {
      case Some(corruptIndex) =>
        warnOnce(printWarningIfWholeFile())
        // Every other field stays null; only the corrupt-record column is set.
        val corruptRow = new GenericInternalRow(schema.length)
        corruptRow.update(corruptIndex, record())
        Seq(corruptRow)

      case None =>
        warnOnce(printWarningForMalformedRecord(record))
        emptyRow
    }
  }
}
private val emptyRow = new GenericInternalRow(schema.length)

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
Expand Down Expand Up @@ -239,7 +136,7 @@ class JacksonParser(
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
throw new RuntimeException(s"Cannot parse $value as FloatType.")
}
}

Expand All @@ -259,7 +156,7 @@ class JacksonParser(
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
throw new RuntimeException(s"Cannot parse $value as DoubleType.")
}
}

Expand Down Expand Up @@ -391,9 +288,9 @@ class JacksonParser(

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// SparkSQLJsonProcessingException and this exception will be caught by
// RuntimeException and this exception will be caught by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RuntimeException ?

// `parse` method.
throw new SparkSQLJsonProcessingException(
throw new RuntimeException(
s"Failed to parse a value for data type $dataType (current token: $token).")
}

Expand Down Expand Up @@ -466,14 +363,14 @@ class JacksonParser(
parser.nextToken() match {
case null => Nil
case _ => rootConverter.apply(parser) match {
case null => throw new SparkSQLJsonProcessingException("Root converter returned null")
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
}
}
}
} catch {
case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
failedRecord(() => recordLiteral(record))
case e @ (_: RuntimeException | _: JsonProcessingException) =>
throw BadRecordException(() => recordLiteral(record), () => emptyRow, e)
}
}
}
Loading