Conversation

@maropu maropu commented Jun 25, 2018

What changes were proposed in this pull request?

In the master branch, when csvColumnPruning (implemented in this commit) is enabled and only partition columns are scanned, the query below throws an exception:

scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:12:51 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
java.lang.NullPointerException
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:197)  
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:190)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
        at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
        ...

This PR modifies the code to skip CSV parsing in this case.
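The idea of the fix can be sketched in plain Scala (a hypothetical, simplified stand-in for UnivocityParser, not the actual patch): when the required schema is empty because only partition columns are read, return an empty row without invoking the tokenizer at all, so a null result from the underlying parser can never be dereferenced.

```scala
// Simplified model of the parse path (hypothetical; stands in for UnivocityParser).
// tokenize returns null when no columns are selected, mimicking uniVocity's behaviour.
case class Row(values: Seq[String])

class SimpleCsvParser(requiredSchema: Seq[String]) {
  private def tokenize(line: String): Array[String] =
    if (requiredSchema.isEmpty) null else line.split(",")

  // Before the fix: the code always tokenized, so tokens.length NPEd on an
  // empty schema. After the fix: skip CSV parsing entirely when no data
  // columns are required.
  def parse(line: String): Row =
    if (requiredSchema.isEmpty) {
      Row(Seq.empty)
    } else {
      val tokens = tokenize(line)
      require(tokens.length == requiredSchema.length, "malformed record")
      Row(tokens.toSeq)
    }
}
```

With this guard, `new SimpleCsvParser(Nil).parse("6")` yields an empty row instead of throwing a NullPointerException.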

How was this patch tested?

Added tests in CSVSuite.


maropu commented Jun 25, 2018

cc: @HyukjinKwon @MaxGekk

@HyukjinKwon left a comment


@maropu, mind if I ask you to run the benchmark added in #21415 again?


maropu commented Jun 25, 2018

@HyukjinKwon sure, I'll do that.

withTempPath { path =>
  val dir = path.getAbsolutePath
  spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
  spark.read.csv(dir).selectExpr("sum(p)").collect()
Member Author


oh, I forgot to add an assert here. I'll update soon.


maropu commented Jun 25, 2018

As I described in #21625 (comment), I found what may be another bug (the case where spark.sql.csv.parser.columnPruning.enabled=false) while working on this PR:

./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=false
scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:48:46 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
        ...

I worked on this and made a patch to fix it: master...maropu:SPARK-24645-2

@gatorsmile

@maropu Could you confirm whether these two bugs are regressions in the master branch?


maropu commented Jun 25, 2018

yea, I think these are regressions because I checked that the query above passed in the master branch before this commit.

@gatorsmile

Both?


SparkQA commented Jun 25, 2018

Test build #92284 has finished for PR 21631 at commit c7362dc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 25, 2018

Test build #92282 has finished for PR 21631 at commit 59a7f14.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Jun 25, 2018

yea, I checked the two queries with/without column pruning in the master branch;

./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=true (default)
scala> val dir = "/tmp/spark-csv/csv"

// Q1: NG
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 16:16:30 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
java.lang.NullPointerException
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:193)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:190)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:305)

// Q2: OK
scala> spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)", "avg(_c0)").collect()
res11: Array[org.apache.spark.sql.Row] = Array([5,4.5])




./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=false
// Q1: NG
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
  at org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)

// Q2: NG
scala> spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)", "avg(_c0)").collect()
18/06/25 16:23:55 ERROR TaskSetManager: Task 0 in stage 14.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 32, localhost, executor driver): java.lang.ClassCastException
Caused by: java.lang.ClassCastException

Both queries are OK in v2.3.1 and in the master branch before the commit.


maropu commented Jun 25, 2018

retest this please


SparkQA commented Jun 25, 2018

Test build #92293 has finished for PR 21631 at commit c7362dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MaxGekk commented Jun 25, 2018

Looking at the NullPointerException, it comes from this line:

if (tokens.length != schema.length) {

where tokens is the null returned by parseLine of UnivocityParser. The parseLine method is implemented in such a way that it can return null in some cases:
https://github.com/uniVocity/univocity-parsers/blob/v2.6.4/src/main/java/com/univocity/parsers/common/AbstractParser.java#L636-L679

I think the right fix would be to check the result of parseLine for null explicitly and throw BadRecordException if it makes sense.

Just in case, I have checked the commit before my changes: it also doesn't check the value returned by parseLine for null.
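The suggested hardening could look like this sketch (plain Scala with a stub exception type standing in for Spark's real BadRecordException in org.apache.spark.sql.catalyst.util; the wrapper name is hypothetical): wrap parseLine and convert a null result into a catchable bad-record error instead of letting an NPE escape to user code.

```scala
// Stub standing in for Spark's BadRecordException (assumption: simplified signature).
case class BadRecordException(record: String, cause: Throwable) extends Exception(cause)

// Hypothetical wrapper around a tokenizer that may return null, as uniVocity's
// parseLine can. A null result becomes a BadRecordException, which the
// surrounding parse modes (PERMISSIVE / DROPMALFORMED / FAILFAST) know how to handle.
def safeParseLine(line: String, parseLine: String => Array[String]): Array[String] = {
  val tokens = parseLine(line)
  if (tokens == null) {
    throw BadRecordException(line, new RuntimeException("parseLine returned null"))
  }
  tokens
}
```

For example, `safeParseLine("8", _ => null)` throws BadRecordException rather than surfacing a NullPointerException later, when tokens.length is accessed.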


HyukjinKwon commented Jun 25, 2018

Wouldn't it be better to check the schema once, if that works, instead of checking values per record?


MaxGekk commented Jun 25, 2018

I have found the places inside UnivocityParser where the null comes from. It is interesting that null is returned for the valid input string "8". See the screenshot:
[screenshot: npe_parseline]

The "8" string is at the end of a file. The file doesn't contain anything unusual:

hexdump -C /private/var/folders/vt/f...c000.csv
00000000  36 0a 38 0a                                       |6.8.|
00000004


maropu commented Jun 25, 2018

@MaxGekk yea, I noticed that behaviour. Probably, in the case where we set an empty array in CommonSettings.selectIndexes, UnivocityParser seems to return null for valid input? I'm not sure setting an empty array there is valid. Anyway, to fix this bug, I think we don't need to check values record-by-record because requiredSchema is empty, as @HyukjinKwon said.


MaxGekk commented Jun 25, 2018

It seems returning null from parseLine is Univocity's normal way to indicate an error. Should we handle nulls and throw BadRecordException instead of propagating the NPE to the user's app? I mean in the other cases when null can be returned.

@HyukjinKwon

So you mean it's a bug in Univocity? That's another fix then, for a bug existing in Univocity. We could work around this bug if it's clear that it's a bug. I would suggest opening an issue there if we are not sure, in any event. FWIW, their support is quite responsive when an issue is reported with a clear reproducer and description. This issue sounds slightly orthogonal to the JIRA itself, though.


MaxGekk commented Jun 25, 2018

So you mean it's a bug in Univocity?

No, I mean we don't handle null from Univocity's parseLine at all (in other situations), and we just propagate the NullPointerException to the user instead of producing a BadRecordException. The BadRecordException can be handled differently there:

def parse(input: IN): Iterator[InternalRow] = {
  try {
    rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
  } catch {
    case e: BadRecordException => mode match {
      case PermissiveMode =>
        Iterator(toResultRow(e.partialResult(), e.record))
      case DropMalformedMode =>
        Iterator.empty
      case FailFastMode =>
        throw new SparkException("Malformed records are detected in record parsing. " +
          s"Parse Mode: ${FailFastMode.name}.", e.cause)
    }
  }
}

depending on the current mode. In DropMalformedMode, for example, we could just drop the line instead of propagating the NPE to user space.


HyukjinKwon commented Jun 25, 2018

I mean, null is returned for the valid input string "8". I thought this was a bug. If there's a valid case returning null, yes, we should of course handle null if that's the only way to handle it, but the case you mentioned sounds like a bug, which should probably be worked around in Spark (or by bumping up the version of Univocity if they happen to make a release). But first we should be sure whether it's a bug or not, in any event.


MaxGekk commented Jun 25, 2018

But first we should be sure if it's a bug or not for this anyway.

I will try to reproduce it in a small example without Spark. I am not sure what the expected behavior should be if the set of selected columns is empty. Should parseLine return null, an empty array of Strings, or something else? Will see...

}
}

private lazy val doParse = if (schema.nonEmpty) {
Member


Do you really need lazy here? In most cases, the time interval between calling the constructor and the parse() method is pretty short. I don't think we win anything here.

Member Author


yea, I missed that. Thanks!

Member Author


recheck: it seems parse is called only on the executor side, so we can add lazy here to avoid unnecessary instantiation?
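The point about lazy can be illustrated with a self-contained sketch (all names here are hypothetical, not Spark code): a lazy val defers the possibly expensive parser construction until the parse function is first used, which in Spark's case happens on the executor rather than during driver-side planning.

```scala
object LazyInitDemo {
  // Flag to observe when the "expensive" constructor actually runs.
  var constructed = false

  // Stand-in for a tokenizer set up in a parser's constructor.
  class Tokenizer {
    constructed = true
    def split(line: String): Array[String] = line.split(",")
  }

  // With `lazy`, the Tokenizer is built only when doParse is first called,
  // not when LazyInitDemo is referenced.
  lazy val doParse: String => Array[String] = {
    val t = new Tokenizer
    line => t.split(line)
  }
}
```

Touching `LazyInitDemo.constructed` alone does not build the Tokenizer; only the first call to `doParse` does, which is the deferral the review comment is weighing.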


SparkQA commented Jun 26, 2018

Test build #92317 has finished for PR 21631 at commit 39811a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MaxGekk commented Jun 26, 2018

Here is the test for the uniVocity parser: https://github.com/MaxGekk/univocity_tests . For the first line, parseLine outputs an empty array, but null for the next calls. What do you think, should I create an issue for uniVocity? I believe the output should be consistent and all calls should at least return the same thing. And I would expect an empty array for all calls.


maropu commented Jun 26, 2018

Does v2.5.9 also have the same behaviour? Anyway, it'd be better to ask the author ;) I asked before and got a quick response.


MaxGekk commented Jun 26, 2018

v2.5.9 also have the same behaviour?

yes, it is the same.

Anyway, it'd be better to ask the author ;) I asked before and I got quick response.

ok. I will create an issue for uniVocity.


maropu commented Jun 28, 2018

@HyukjinKwon BTW, can you check this?
@MaxGekk Probably, I feel it'd be better for you to file a new JIRA for the point you're looking into.

@HyukjinKwon

LGTM.

@MaxGekk please take the follow-up action. I will help and check if it's needed.

@HyukjinKwon

Merged to master.


MaxGekk commented Jun 29, 2018

please take a following action. Will help and check if it's needed.

I have opened the issue for uniVocity parser: uniVocity/univocity-parsers#250


MaxGekk commented Jun 30, 2018

The bug has already been fixed in uniVocity 2.6.5-SNAPSHOT.


maropu commented Jul 2, 2018

oh, super quick fix ;) Thanks, @MaxGekk.
In the master branch, do we still hit the bug when parsing CSV data?


MaxGekk commented Jul 27, 2018

do we still hit the bug when parsing csv data?

I have checked uniVocity 2.7.2; there is no problem in this version.

@HyukjinKwon

@MaxGekk, thanks. Mind opening a PR to upgrade?
