diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 5f7d5696b71a..aa545e1a0c00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -183,11 +183,19 @@ class UnivocityParser( } } + private val doParse = if (schema.nonEmpty) { + (input: String) => convert(tokenizer.parseLine(input)) + } else { + // If `columnPruning` enabled and partition attributes scanned only, + // `schema` gets empty. + (_: String) => InternalRow.empty + } + /** * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): InternalRow = doParse(input) private def convert(tokens: Array[String]): InternalRow = { if (tokens.length != schema.length) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index d2f166c7d187..d7a99a022185 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1602,4 +1602,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(testAppender2.events.asScala .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema"))) } + + test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") { + withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") { + withTempPath { path => + val dir = path.getAbsolutePath + spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir) + checkAnswer(spark.read.csv(dir).selectExpr("sum(p)"), Row(5)) + } + } + } }