Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,23 +240,25 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
csv: Dataset[String],
maybeFirstLine: Option[String],
parsedOptions: CSVOptions): StructType = maybeFirstLine match {
case Some(firstLine) =>
val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
val tokenRDD = sampled.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}
CSVInferSchema.infer(tokenRDD, header, parsedOptions)
case None =>
// If the first line could not be read, just return the empty schema.
StructType(Nil)
parsedOptions: CSVOptions): StructType = {
val csvParser = new CsvParser(parsedOptions.asParserSettings)
maybeFirstLine.map(csvParser.parseLine(_)) match {
case Some(firstRow) if firstRow != null =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
val tokenRDD = sampled.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}
CSVInferSchema.infer(tokenRDD, header, parsedOptions)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk, BTW what happens if the second line is a malformed record and the parser returns null? From a cursory look, schema inference looks like it is going to throw an NPE.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon I have checked this with the following input (and with a header too):

    val input = spark.createDataset(Seq("1", "\u0000\u0000\u0001234"))

    val df = spark.read.option("inferSchema", true).csv(input)
    df.printSchema()
    df.show()
root
 |-- _c0: integer (nullable = true)

+----+
| _c0|
+----+
|   1|
|null|
+----+

In the debugger, I didn't observe null in

/**
 * Folds one more row of string values into the array of data types accumulated so far.
 *
 * For every index `i` present in both arrays, `rowSoFar(i)` is overwritten with
 * `inferField(rowSoFar(i), next(i), options)`. Indices beyond the shorter of the two
 * arrays are left untouched.
 *
 * @param options CSV options passed through unchanged to `inferField`
 * @param rowSoFar data types accumulated so far; updated in place
 * @param next string values of the next row
 * @return the same `rowSoFar` array instance, after the in-place update
 */
private def inferRowType(options: CSVOptions)
(rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = {
var i = 0
while (i < math.min(rowSoFar.length, next.length)) { // May have columns on right missing.
rowSoFar(i) = inferField(rowSoFar(i), next(i), options)
i+=1
}
rowSoFar
}
.

case _ =>
// If the first line could not be read, just return the empty schema.
StructType(Nil)
}
}

private def createBaseDataset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ class UnivocityParser(
}

private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != parsedSchema.length) {
if (tokens == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will we hit this?

Copy link
Copy Markdown
Member Author

@MaxGekk MaxGekk Sep 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it on a CSV file with some marks (a couple of zero bytes) at the beginning, but the uniVocity parser returns null in many cases when it cannot read/parse the input, for example: https://github.com/uniVocity/univocity-parsers/blob/f616d151b48150bc9cb98943f9b6f8353b704359/src/main/java/com/univocity/parsers/common/AbstractParser.java#L663

throw BadRecordException(
() => getCurrentInput,
() => None,
new RuntimeException("Malformed CSV record"))
} else if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
Expand Down Expand Up @@ -1700,4 +1700,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
checkCount(2)
countForMalformedCSV(0, Seq(""))
}

test("SPARK-25387: bad input should not cause NPE") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, in this title, what does bad CSV mean (bad unicode?)? In this case, the CSV parser returns null and, in another case, it throws com.univocity.parsers.common.TextParsingException? I just want to know the behaviour of the parser.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


checkAnswer(spark.read.schema(schema).csv(input), Row(null))
checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
assert(spark.read.csv(input).collect().toSet == Set(Row()))
}
}