@@ -183,11 +183,19 @@ class UnivocityParser(
}
}

private lazy val doParse = if (schema.nonEmpty) {
Member:
Do you really need `lazy` here? In most cases, the time interval between calling the constructor and the parse() method is pretty short. I don't think we gain anything here.

Member Author:
Yeah, I missed that. Thanks!

Member Author:
Recheck: it seems `parse` is called only on the executor side, so we can keep `lazy` here to avoid unnecessary instantiation?

(input: String) => convert(tokenizer.parseLine(input))
} else {
// If `columnPruning` is enabled and only partition attributes are scanned,
// `schema` is empty.
(_: String) => InternalRow.empty
}

/**
* Parses a single CSV string and turns it into either one resulting row or no row (if the
* record is malformed).
*/
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
def parse(input: String): InternalRow = doParse(input)
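
Editor's note: a minimal standalone sketch of the dispatch pattern under review, not the Spark source. The parsing function is chosen once, based on whether any columns survive pruning, and `lazy` defers that choice until `parse` is first called, which, per the review discussion above, happens only on executors. `SimpleParser` and `SimpleRow` are hypothetical stand-ins for UnivocityParser and InternalRow.

object DoParseSketch {
  final case class SimpleRow(values: Seq[String])
  val emptyRow: SimpleRow = SimpleRow(Nil)

  class SimpleParser(schema: Seq[String]) {
    // Chosen once, on first use. Because it is `lazy`, a parser that is
    // constructed but never asked to parse (e.g. on the driver) pays nothing.
    private lazy val doParse: String => SimpleRow =
      if (schema.nonEmpty) {
        // Stand-in for convert(tokenizer.parseLine(input)).
        (input: String) => SimpleRow(input.split(",").toSeq)
      } else {
        // Column pruning left only partition attributes: skip tokenizing.
        (_: String) => emptyRow
      }

    def parse(input: String): SimpleRow = doParse(input)
  }

  def main(args: Array[String]): Unit = {
    println(new SimpleParser(Seq("a", "b")).parse("1,2")) // row with two tokens
    println(new SimpleParser(Nil).parse("1,2"))           // empty row, input never tokenized
  }
}
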

private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != schema.length) {
Expand Down
@@ -1602,4 +1602,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(testAppender2.events.asScala
.exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
}

test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") {
withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
withTempPath { path =>
val dir = path.getAbsolutePath
spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
spark.read.csv(dir).selectExpr("sum(p)").collect()
Member Author:
Oh, I forgot to add an assert here. I'll update soon.
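
Editor's note: a hypothetical sketch of the missing assertion (the actual check landed in a later revision and may differ). With ids 0..9 and p = id % 2, five rows have p = 1, so sum(p) is 5; assuming partition-column type inference makes `p` integral, the sum comes back as a Long. `checkAnswer` is the QueryTest helper this suite already inherits.

        // Hypothetical assertion, not part of this commit.
        checkAnswer(spark.read.csv(dir).selectExpr("sum(p)"), Row(5L))
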

}
}
}
}