diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 22fd69b2ce0da..61b1b437da04c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -67,11 +67,10 @@ class UnivocityParser(
   // their positions in the data schema.
   private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
 
-  val tokenizer = {
+  val tokenizer: CsvParser = {
     val parserSetting = options.asParserSettings
     // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
     // parser select a sequence of fields for reading by their positions.
-    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
     if (parsedSchema.length < dataSchema.length) {
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
@@ -203,20 +202,21 @@ class UnivocityParser(
     }
   }
 
-  private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
-    // If `columnPruning` enabled and partition attributes scanned only,
-    // `schema` gets empty.
-    (_: String) => Some(InternalRow.empty)
-  } else {
-    // parse if the columnPruning is disabled or requiredSchema is nonEmpty
-    (input: String) => convert(tokenizer.parseLine(input))
-  }
-
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * record is malformed).
    */
-  def parse(input: String): Option[InternalRow] = doParse(input)
+  val parse: String => Option[InternalRow] = {
+    // This is intentionally a val to create the function once and reuse it.
+    if (options.columnPruning && requiredSchema.isEmpty) {
+      // If `columnPruning` is enabled and only partition attributes are scanned,
+      // `requiredSchema` is empty.
+      (_: String) => Some(InternalRow.empty)
+    } else {
+      // Parse if `columnPruning` is disabled or `requiredSchema` is non-empty.
+      (input: String) => convert(tokenizer.parseLine(input))
+    }
+  }
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -232,20 +232,13 @@ class UnivocityParser(
         new RuntimeException("Malformed CSV record"))
     }
 
-    var checkedTokens = tokens
-    var badRecordException: Option[Throwable] = None
-
-    if (tokens.length != parsedSchema.length) {
+    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
-      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
-      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      checkedTokens = if (parsedSchema.length > tokens.length) {
-        tokens ++ new Array[String](parsedSchema.length - tokens.length)
-      } else {
-        tokens.take(parsedSchema.length)
-      }
-      badRecordException = Some(new RuntimeException("Malformed CSV record"))
-    }
+      // However, we still have a chance to parse some of the tokens. Parsing continues over the
+      // tokens normally and sets null whenever an `ArrayIndexOutOfBoundsException` signals a
+      // missing token.
+      Some(new RuntimeException("Malformed CSV record"))
+    } else None
     // When the length of the returned tokens is identical to the length of the parsed schema,
     // we just need to:
     // 1. Convert the tokens that correspond to the required schema.
@@ -255,15 +248,14 @@ class UnivocityParser(
     var skipRow = false
     while (i < requiredSchema.length) {
       try {
-        if (!skipRow) {
+        if (skipRow) {
+          row.setNullAt(i)
+        } else {
           row(i) = valueConverters(i).apply(getToken(tokens, i))
           if (csvFilters.skipRow(row, i)) {
             skipRow = true
           }
         }
-        if (skipRow) {
-          row.setNullAt(i)
-        }
       } catch {
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
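
The first hunk relies on Univocity's column selection: once `selectIndexes` is applied to the parser settings, `parseLine` materializes only the requested fields. A minimal, Spark-free sketch of that behavior, assuming the same univocity-parsers dependency Spark uses (the four-column input and the chosen indexes are illustrative, not from the patch):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object SelectIndexesSketch {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    // Keep only columns 0 and 2 of the underlying data schema,
    // analogous to parserSetting.selectIndexes(tokenIndexArr: _*).
    settings.selectIndexes(0, 2)

    val parser = new CsvParser(settings)
    val tokens = parser.parseLine("a,b,c,d")

    // Only the selected fields come back: Array(a, c)
    println(tokens.mkString("Array(", ", ", ")"))
  }
}
```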
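The `parse` hunk swaps a `def` delegating to a private `doParse` for a single function-typed `val`, so the branch on the options is evaluated once at construction time and every record goes straight to the chosen closure. A simplified sketch of the same pattern with stand-in types (`FakeOptions`, `Row`, and the comma-split tokenizer are not from the patch):

```scala
object ValParseSketch {
  final case class FakeOptions(columnPruning: Boolean)
  type Row = Seq[String]

  class Parser(options: FakeOptions, requiredFields: Seq[String]) {
    private def convert(tokens: Array[String]): Option[Row] = Some(tokens.toSeq)

    // Chosen once when the parser is constructed; no per-record branching.
    val parse: String => Option[Row] =
      if (options.columnPruning && requiredFields.isEmpty) {
        (_: String) => Some(Seq.empty) // only partition columns are needed
      } else {
        (input: String) => convert(input.split(",", -1))
      }
  }

  def main(args: Array[String]): Unit = {
    val p = new Parser(FakeOptions(columnPruning = true), requiredFields = Nil)
    println(p.parse("a,b,c")) // Some(List()) -- the input is never tokenized
  }
}
```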
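The third hunk drops the pad-or-truncate copy of the token array: a length mismatch now only flags the record as malformed up front, and a missing token later surfaces as an `ArrayIndexOutOfBoundsException` that the per-field `catch` turns into a null. A rough standalone sketch of that flow (`parsedSchemaLength` and the identity converter are stand-ins for the real schema and `valueConverters`):

```scala
import scala.util.control.NonFatal

object MalformedRecordSketch {
  def convert(
      tokens: Array[String],
      parsedSchemaLength: Int): (Array[Any], Option[Throwable]) = {
    // Flag the record once; do not copy or pad the tokens.
    var badRecordException: Option[Throwable] =
      if (tokens.length != parsedSchemaLength) {
        Some(new RuntimeException("Malformed CSV record"))
      } else None

    val row = new Array[Any](parsedSchemaLength)
    var i = 0
    while (i < parsedSchemaLength) {
      try {
        row(i) = tokens(i) // a real converter would parse the token here
      } catch {
        case NonFatal(e) =>
          badRecordException = badRecordException.orElse(Some(e))
          row(i) = null // missing token: index out of bounds becomes a null field
      }
      i += 1
    }
    (row, badRecordException)
  }

  def main(args: Array[String]): Unit = {
    val (row, err) = convert(Array("a", "b"), parsedSchemaLength = 3)
    println(row.mkString("[", ", ", "]")) // [a, b, null]
    println(err.map(_.getMessage))        // Some(Malformed CSV record)
  }
}
```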
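The last hunk merges the two `skipRow` checks into one `if`/`else`: once a pushed-down filter rejects the row, the remaining fields are nulled instead of converted. A stripped-down sketch of that loop shape, with a hypothetical `rejects` predicate standing in for `csvFilters.skipRow`:

```scala
object SkipRowSketch {
  def main(args: Array[String]): Unit = {
    val tokens = Array("1", "-2", "3")
    val row = new Array[Any](tokens.length)

    // Stand-in for csvFilters.skipRow: reject the row once a negative value appears.
    def rejects(value: Any): Boolean = value.asInstanceOf[Int] < 0

    var skipRow = false
    var i = 0
    while (i < tokens.length) {
      if (skipRow) {
        row(i) = null // a filter already rejected the row; skip conversion
      } else {
        row(i) = tokens(i).toInt
        if (rejects(row(i))) skipRow = true
      }
      i += 1
    }
    println(row.mkString("[", ", ", "]")) // [1, -2, null]
  }
}
```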