@@ -230,64 +230,55 @@ class UnivocityParser(
         () => getCurrentInput,
         () => None,
         new RuntimeException("Malformed CSV record"))
-    } else if (tokens.length != parsedSchema.length) {
+    }
+
+    var checkedTokens = tokens
+    var badRecordException: Option[Throwable] = None
+
+    if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (parsedSchema.length > tokens.length) {
+      checkedTokens = if (parsedSchema.length > tokens.length) {

Member:

Do we need this checkedTokens now?

Member Author:

It seems not. The if can be replaced by:

    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
      Some(new RuntimeException("Malformed CSV record"))
    } else None

Let me do that in a follow-up PR.

Member:

Oops, I already did at #27287. Let me address this comment there.


         tokens ++ new Array[String](parsedSchema.length - tokens.length)
       } else {
         tokens.take(parsedSchema.length)
       }
-      def getPartialResult(): Option[InternalRow] = {
-        try {
-          convert(checkedTokens).headOption
-        } catch {
-          case _: BadRecordException => None
-        }
-      }
-      // For records with less or more tokens than the schema, tries to return partial results
-      // if possible.
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => getPartialResult(),
-        new RuntimeException("Malformed CSV record"))
-    } else {
-      // When the length of the returned tokens is identical to the length of the parsed schema,
-      // we just need to:
-      // 1. Convert the tokens that correspond to the required schema.
-      // 2. Apply the pushdown filters to `requiredRow`.
-      var i = 0
-      val row = requiredRow.head
-      var skipRow = false
-      var badRecordException: Option[Throwable] = None
-      while (i < requiredSchema.length) {
-        try {
-          if (!skipRow) {
-            row(i) = valueConverters(i).apply(getToken(tokens, i))
-            if (csvFilters.skipRow(row, i)) {
-              skipRow = true
-            }
-          }
-          if (skipRow) {
-            row.setNullAt(i)
-          }
-        } catch {
-          case NonFatal(e) =>
-            badRecordException = badRecordException.orElse(Some(e))
-            row.setNullAt(i)
-        }
-        i += 1
-      }
-      if (skipRow) {
-        noRows
-      } else {
-        if (badRecordException.isDefined) {
-          throw BadRecordException(
-            () => getCurrentInput, () => requiredRow.headOption, badRecordException.get)
-        } else {
-          requiredRow
-        }
-      }
+      badRecordException = Some(new RuntimeException("Malformed CSV record"))
+    }
+    // When the length of the returned tokens is identical to the length of the parsed schema,
+    // we just need to:
+    // 1. Convert the tokens that correspond to the required schema.
+    // 2. Apply the pushdown filters to `requiredRow`.
+    var i = 0
+    val row = requiredRow.head
+    var skipRow = false
+    while (i < requiredSchema.length) {
+      try {
+        if (!skipRow) {

Member:

nit:

    if (skipRow) {
      row.setNullAt(i)
    } else {
      row(i) = valueConverters(i).apply(getToken(tokens, i))
      if (csvFilters.skipRow(row, i)) {
        skipRow = true
      }
    }


+          row(i) = valueConverters(i).apply(getToken(tokens, i))

Contributor:

If the first column is corrupted, and the predicate is "first_col is null", what will happen?

Member Author:

There are 3 cases:

1. The Univocity parser is not able to parse its input, for example because it encountered an
   invalid Unicode symbol. In that case, it returns null instead of tokens, and BadRecordException
   will be raised here:

       throw BadRecordException(
         () => getCurrentInput,
         () => None,
         new RuntimeException("Malformed CSV record"))

2. The Univocity parser returns null in the first token. In this case, we will try to convert null
   to the desired type according to requiredSchema. Most likely, the conversion raises an
   exception, which will be converted to BadRecordException here

       badRecordException = badRecordException.orElse(Some(e))

   and rethrown at the end of the method.

   2.1. If the conversion doesn't fail, the "is null" filter will be applied to the value, and the
   row can be passed to the upper layer.

3. The Univocity parser returns a valid string at index 0 in tokens, but the conversion fails at

       row(i) = valueConverters(i).apply(getToken(tokens, i))

   with some exception. Similar to case 2: the exception will be handled and transformed to
   BadRecordException.

The new implementation with filter pushdown does not change the behavior in those cases.

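As a concrete illustration of the case discussed above, here is a minimal sketch in the style of
the test added below in this PR (the file contents and column names are hypothetical, while the
helpers — withSQLConf, withTempPath, checkAnswer — are the same ones CSVSuite uses):

    // A row whose first column is empty still survives an `is null` filter on
    // that column when CSV filter pushdown is enabled: the failed conversion
    // yields null, and the pushed-down filter is evaluated against that null.
    withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      withTempPath { path =>
        Seq("1.0,2.0", ",3.0").toDF("data").repartition(1).write.text(path.getAbsolutePath)
        val schema = new StructType().add("first_col", FloatType).add("second_col", FloatType)
        val readback = spark.read
          .schema(schema)
          .csv(path.getAbsolutePath)
          .filter("first_col is null")
        // Only the row with the missing first column remains.
        checkAnswer(readback, Seq(Row(null, 3.0)))
      }
    }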

+          if (csvFilters.skipRow(row, i)) {
+            skipRow = true
+          }
+        }
+        if (skipRow) {
+          row.setNullAt(i)
+        }
+      } catch {
+        case NonFatal(e) =>

Member:

Previously we relied on nulls already existing in the array. Now we rely on
java.lang.ArrayIndexOutOfBoundsException. I don't particularly like this approach, but I'm fine
with it as it does simplify the code.

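A minimal sketch of the mechanism this comment describes (plain Scala, not the actual parser code;
the schema length and token values are made up): indexing past the end of a short token array
throws ArrayIndexOutOfBoundsException, which the NonFatal handler records as the bad-record cause
and nulls out the cell:

    import scala.util.control.NonFatal

    val tokens = Array("100.0")            // one token, but the schema expects two columns
    val row = new Array[Any](2)
    var badRecordException: Option[Throwable] = None
    for (i <- 0 until 2) {
      try {
        row(i) = tokens(i).toFloat         // i = 1 throws ArrayIndexOutOfBoundsException
      } catch {
        case NonFatal(e) =>
          badRecordException = badRecordException.orElse(Some(e))
          row(i) = null                    // stands in for row.setNullAt(i)
      }
    }
    // badRecordException now carries the ArrayIndexOutOfBoundsException that the
    // parser later wraps into a BadRecordException.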

+          badRecordException = badRecordException.orElse(Some(e))
+          row.setNullAt(i)
+      }
+      i += 1
+    }
+    if (skipRow) {
+      noRows
+    } else {
+      if (badRecordException.isDefined) {
+        throw BadRecordException(
+          () => getCurrentInput, () => requiredRow.headOption, badRecordException.get)
+      } else {
+        requiredRow
+      }
+    }
     }
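To see the new control flow in one place, here is a hypothetical, self-contained distillation in
plain Scala (simplified on purpose: float columns stand in for valueConverters, Either stands in
for throwing BadRecordException, and the pushdown-filter bookkeeping is omitted):

    import scala.util.control.NonFatal

    // Hypothetical distillation of the new convert() control flow for a schema of
    // `schemaLen` float columns. Returns Right(row) for a good record and Left(cause)
    // for a malformed one (the real parser throws BadRecordException carrying the
    // partial row instead of returning Left).
    def convertSketch(tokens: Array[String], schemaLen: Int): Either[Throwable, Array[Any]] = {
      var checkedTokens = tokens
      var badRecordException: Option[Throwable] = None

      if (tokens.length != schemaLen) {
        // Pad with nulls or drop extra tokens, and remember that the record was malformed.
        checkedTokens =
          if (schemaLen > tokens.length) tokens ++ new Array[String](schemaLen - tokens.length)
          else tokens.take(schemaLen)
        badRecordException = Some(new RuntimeException("Malformed CSV record"))
      }

      val row = new Array[Any](schemaLen)
      var i = 0
      while (i < schemaLen) {
        try {
          // A failed conversion nulls the cell and records the first error, as in the diff.
          row(i) = if (checkedTokens(i) == null) null else checkedTokens(i).toFloat
        } catch {
          case NonFatal(e) =>
            badRecordException = badRecordException.orElse(Some(e))
            row(i) = null
        }
        i += 1
      }
      badRecordException.toLeft(row)
    }

    // Example: a 3-field line against a 2-column schema is malformed, but the
    // partial row (null, 4.0f) is still produced for the pushed-down filter.
    convertSketch(Array("", "4.0", ""), 2)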
@@ -2270,4 +2270,28 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
       }
     }
   }
+
+  test("SPARK-30530: apply filters to malformed rows") {
+    withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { path =>
+        Seq(
+          "100.0,1.0,",
+          "200.0,,",
+          "300.0,3.0,",
+          "1.0,4.0,",
+          ",4.0,",
+          "500.0,,",
+          ",6.0,",
+          "-500.0,50.5").toDF("data")
+          .repartition(1)
+          .write.text(path.getAbsolutePath)
+        val schema = new StructType().add("floats", FloatType).add("more_floats", FloatType)
+        val readback = spark.read
+          .schema(schema)
+          .csv(path.getAbsolutePath)
+          .filter("floats is null")
+        checkAnswer(readback, Seq(Row(null, 4.0), Row(null, 6.0)))
+      }
+    }
+  }
 }