```diff
@@ -67,11 +67,10 @@ class UnivocityParser(
   // their positions in the data schema.
   private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
 
-  val tokenizer = {
+  val tokenizer: CsvParser = {
     val parserSetting = options.asParserSettings
     // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
     // parser select a sequence of fields for reading by their positions.
-    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
     if (parsedSchema.length < dataSchema.length) {
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
```
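For context, univocity's `selectIndexes` is what makes the pruning above work: the parser materializes only the selected field positions while tokenizing. Below is a minimal standalone sketch of that behavior, assuming only the `com.univocity:univocity-parsers` dependency; it is not Spark code, and the object name is made up for illustration.

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object SelectIndexesSketch extends App {
  val settings = new CsvParserSettings()
  // Materialize only fields 0 and 2; field 1 is skipped during tokenization.
  // (Int arguments box to java.lang.Integer for the Java varargs parameter.)
  settings.selectIndexes(0, 2)
  val parser = new CsvParser(settings)
  val tokens: Array[String] = parser.parseLine("a,b,c")
  println(tokens.mkString(", ")) // prints: a, c
}
```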
```diff
@@ -203,20 +202,21 @@ class UnivocityParser(
     }
   }
 
-  private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
-    // If `columnPruning` enabled and partition attributes scanned only,
-    // `schema` gets empty.
-    (_: String) => Some(InternalRow.empty)
-  } else {
-    // parse if the columnPruning is disabled or requiredSchema is nonEmpty
-    (input: String) => convert(tokenizer.parseLine(input))
-  }
-
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if
    * the record is malformed).
    */
-  def parse(input: String): Option[InternalRow] = doParse(input)
+  val parse: String => Option[InternalRow] = {
+    // This is intentionally a val, so the function is created once and reused.
+    if (options.columnPruning && requiredSchema.isEmpty) {
+      // If `columnPruning` is enabled and only partition attributes are scanned,
+      // `schema` is empty.
+      (_: String) => Some(InternalRow.empty)
+    } else {
+      // Parse if columnPruning is disabled or requiredSchema is non-empty.
+      (input: String) => convert(tokenizer.parseLine(input))
+    }
+  }
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
```
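The switch from `def parse` delegating to a private `doParse` to a single `val parse` keeps behavior identical while binding the function exactly once. The sketch below, with hypothetical names, illustrates the general pattern: evaluate the configuration branch at construction time so the per-record hot path is a plain function application.

```scala
object ParseAsValSketch extends App {
  // Hypothetical stand-in for the parser class; mirrors the val-bound parse.
  class LineHandler(pruneAll: Boolean) {
    // The configuration branch runs exactly once, when the instance is
    // created; `handle` is then a stable function value reused per record.
    val handle: String => Option[Seq[String]] =
      if (pruneAll) {
        _ => Some(Seq.empty) // nothing to convert, emit an empty row
      } else {
        line => Some(line.split(",").toSeq)
      }
  }

  val handler = new LineHandler(pruneAll = false)
  println(handler.handle("a,b")) // Some(List(a, b))
}
```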
```diff
@@ -232,20 +232,13 @@ class UnivocityParser(
         new RuntimeException("Malformed CSV record"))
     }
 
-    var checkedTokens = tokens
-    var badRecordException: Option[Throwable] = None
-
-    if (tokens.length != parsedSchema.length) {
+    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
-      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
-      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      checkedTokens = if (parsedSchema.length > tokens.length) {
-        tokens ++ new Array[String](parsedSchema.length - tokens.length)
-      } else {
-        tokens.take(parsedSchema.length)
-      }
-      badRecordException = Some(new RuntimeException("Malformed CSV record"))
-    }
+      // However, we still have a chance to parse some of the tokens: parsing continues
+      // normally, and null is set when `ArrayIndexOutOfBoundsException` occurs for a
+      // missing token.
+      Some(new RuntimeException("Malformed CSV record"))
+    } else None
     // When the length of the returned tokens is identical to the length of the parsed schema,
     // we just need to:
     // 1. Convert the tokens that correspond to the required schema.
```
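The removed padding/truncation logic is replaced by relying on the per-field `try`/`catch` further down: a missing token raises `ArrayIndexOutOfBoundsException`, which is caught and turned into a null field while the first exception is recorded. A simplified sketch of that flow, with hypothetical names and a plain `Array[Any]` standing in for `InternalRow`:

```scala
import scala.util.control.NonFatal

object MalformedRecordSketch extends App {
  val schemaLength = 3
  val tokens = Array("1", "2") // malformed: one token short of the schema

  // Record the malformation up front, but keep parsing what we can.
  var badRecordException: Option[Throwable] =
    if (tokens.length != schemaLength) Some(new RuntimeException("Malformed CSV record"))
    else None

  val row = new Array[Any](schemaLength)
  var i = 0
  while (i < schemaLength) {
    try {
      row(i) = tokens(i).toInt // tokens(2) throws ArrayIndexOutOfBoundsException
    } catch {
      case NonFatal(e) =>
        row(i) = null // missing or unconvertible token becomes null
        badRecordException = badRecordException.orElse(Some(e))
    }
    i += 1
  }
  println(row.mkString("[", ", ", "]"))         // [1, 2, null]
  println(badRecordException.map(_.getMessage)) // Some(Malformed CSV record)
}
```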
```diff
@@ -255,15 +248,14 @@ class UnivocityParser(
     var skipRow = false
     while (i < requiredSchema.length) {
       try {
-        if (!skipRow) {
+        if (skipRow) {
+          row.setNullAt(i)
+        } else {
           row(i) = valueConverters(i).apply(getToken(tokens, i))
           if (csvFilters.skipRow(row, i)) {
             skipRow = true
           }
         }
-        if (skipRow) {
-          row.setNullAt(i)
-        }
       } catch {
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
```
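Reordering the loop body means that once a pushed-down filter rejects the partially built row, the remaining fields are nulled without running their converters at all, instead of being converted first and nulled afterwards in a second check. A rough sketch of that control flow, with a hypothetical predicate standing in for `csvFilters.skipRow` and an `Array[Any]` standing in for the row:

```scala
object SkipRowSketch extends App {
  val tokens = Array("5", "x", "y")

  // Hypothetical stand-in for csvFilters.skipRow: reject rows whose first
  // field, once converted to Int, is greater than 3.
  def skipAt(row: Array[Any], i: Int): Boolean =
    i == 0 && row(0).asInstanceOf[Int] > 3

  val row = new Array[Any](tokens.length)
  var skipRow = false
  var i = 0
  while (i < tokens.length) {
    if (skipRow) {
      row(i) = null // no conversion work once the row is rejected
    } else {
      row(i) = if (i == 0) tokens(i).toInt else tokens(i)
      if (skipAt(row, i)) skipRow = true
    }
    i += 1
  }
  // skipped=true row=[5, null, null]
  println(s"skipped=$skipRow row=${row.mkString("[", ", ", "]")}")
}
```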