Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ class UnivocityParser(
new CsvParser(parserSetting)
}

// Pre-allocated Seq to avoid the overhead of the seq builder.
private val requiredRow = Seq(new GenericInternalRow(requiredSchema.length))
// Pre-allocated Some to avoid the overhead of building a Some for each row.
private val requiredRow = Some(new GenericInternalRow(requiredSchema.length))
// Pre-allocated empty sequence returned when the parsed row cannot pass filters.
// We preallocate it to avoid unnecessary invocations of the seq builder.
private val noRows = Seq.empty[InternalRow]
// We preallocate it to avoid unnecessary allocations.
private val noRows = None

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
Expand Down Expand Up @@ -206,7 +206,7 @@ class UnivocityParser(
private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
// If `columnPruning` enabled and partition attributes scanned only,
// `schema` gets empty.
(_: String) => Seq(InternalRow.empty)
(_: String) => Some(InternalRow.empty)
} else {
// parse if the columnPruning is disabled or requiredSchema is nonEmpty
(input: String) => convert(tokenizer.parseLine(input))
Expand All @@ -216,15 +216,15 @@ class UnivocityParser(
* Parses a single CSV string and turns it into either one resulting row or no row (if the
* record is malformed).
*/
def parse(input: String): Seq[InternalRow] = doParse(input)
def parse(input: String): Option[InternalRow] = doParse(input)

private val getToken = if (options.columnPruning) {
(tokens: Array[String], index: Int) => tokens(index)
} else {
(tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
}

private def convert(tokens: Array[String]): Seq[InternalRow] = {
private def convert(tokens: Array[String]): Option[InternalRow] = {
if (tokens == null) {
throw BadRecordException(
() => getCurrentInput,
Expand All @@ -251,7 +251,7 @@ class UnivocityParser(
// 1. Convert the tokens that correspond to the required schema.
// 2. Apply the pushdown filters to `requiredRow`.
var i = 0
val row = requiredRow.head
val row = requiredRow.get
var skipRow = false
while (i < requiredSchema.length) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ class JacksonParser(
* to a value according to a desired schema. This is a wrapper for the method
* `makeConverter()` to handle a row wrapped with an array.
*/
private def makeRootConverter(dt: DataType): JsonParser => Seq[InternalRow] = {
private def makeRootConverter(dt: DataType): JsonParser => Iterable[InternalRow] = {
dt match {
case st: StructType => makeStructRootConverter(st)
case mt: MapType => makeMapRootConverter(mt)
case at: ArrayType => makeArrayRootConverter(at)
}
}

private def makeStructRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = {
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
(parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
case START_OBJECT => Some(convertObject(parser, st, fieldConverters))
// SPARK-3308: support reading top level JSON arrays and take every element
// in such an array as a row
//
Expand All @@ -99,26 +99,26 @@ class JacksonParser(
// Here, as we support reading top level JSON arrays and take every element
// in such an array as a row, this case is possible.
if (array.numElements() == 0) {
Nil
Array.empty[InternalRow]
} else {
array.toArray[InternalRow](schema).toSeq
array.toArray[InternalRow](schema)
}
case START_ARRAY =>
throw new RuntimeException("Parsing JSON arrays as structs is forbidden.")
}
}

private def makeMapRootConverter(mt: MapType): JsonParser => Seq[InternalRow] = {
private def makeMapRootConverter(mt: MapType): JsonParser => Iterable[InternalRow] = {
val fieldConverter = makeConverter(mt.valueType)
(parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, mt) {
case START_OBJECT => Seq(InternalRow(convertMap(parser, fieldConverter)))
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, mt) {
case START_OBJECT => Some(InternalRow(convertMap(parser, fieldConverter)))
}
}

private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
private def makeArrayRootConverter(at: ArrayType): JsonParser => Iterable[InternalRow] = {
val elemConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, at) {
case START_ARRAY => Some(InternalRow(convertArray(parser, elemConverter)))
case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
// This handles the case when an input JSON object is a structure but
// the specified schema is an array of structures. In that case, the input JSON is
Expand All @@ -140,7 +140,7 @@ class JacksonParser(
//
val st = at.elementType.asInstanceOf[StructType]
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
Some(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
}
}

Expand Down Expand Up @@ -395,13 +395,13 @@ class JacksonParser(
def parse[T](
record: T,
createParser: (JsonFactory, T) => JsonParser,
recordLiteral: T => UTF8String): Seq[InternalRow] = {
recordLiteral: T => UTF8String): Iterable[InternalRow] = {
try {
Utils.tryWithResource(createParser(factory, record)) { parser =>
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
case null => Nil
case null => None
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
rawParser: IN => Seq[InternalRow],
rawParser: IN => Iterable[InternalRow],
mode: ParseMode,
schema: StructType,
columnNameOfCorruptRecord: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
requiredSchema: StructType = StructType.fromDDL("i INTEGER"),
filters: Seq[Filter],
expected: Seq[InternalRow]): Unit = {
expected: Option[InternalRow]): Unit = {
Seq(false, true).foreach { columnPruning =>
val options = new CSVOptions(Map.empty[String, String], columnPruning, "GMT")
val parser = new UnivocityParser(dataSchema, requiredSchema, options, filters)
Expand All @@ -285,26 +285,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
}
}

check(filters = Seq(), expected = Seq(InternalRow(1)))
check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1)))
check(filters = Seq(EqualTo("i", 2)), expected = Seq())
check(filters = Seq(), expected = Some(InternalRow(1)))
check(filters = Seq(EqualTo("i", 1)), expected = Some(InternalRow(1)))
check(filters = Seq(EqualTo("i", 2)), expected = None)
check(
requiredSchema = StructType.fromDDL("s STRING"),
filters = Seq(StringStartsWith("s", "b")),
expected = Seq())
expected = None)
check(
requiredSchema = StructType.fromDDL("i INTEGER, s STRING"),
filters = Seq(StringStartsWith("s", "a")),
expected = Seq(InternalRow(1, UTF8String.fromString("a"))))
expected = Some(InternalRow(1, UTF8String.fromString("a"))))
check(
input = "1,a,3.14",
dataSchema = StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"),
requiredSchema = StructType.fromDDL("i INTEGER, d DOUBLE"),
filters = Seq(EqualTo("d", 3.14)),
expected = Seq(InternalRow(1, 3.14)))
expected = Some(InternalRow(1, 3.14)))

val errMsg = intercept[IllegalArgumentException] {
check(filters = Seq(EqualTo("invalid attr", 1)), expected = Seq())
check(filters = Seq(EqualTo("invalid attr", 1)), expected = None)
}.getMessage
assert(errMsg.contains("invalid attr does not exist"))

Expand All @@ -313,7 +313,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
dataSchema = new StructType(),
requiredSchema = new StructType(),
filters = Seq(EqualTo("i", 1)),
expected = Seq(InternalRow.empty))
expected = Some(InternalRow.empty))
}.getMessage
assert(errMsg2.contains("i does not exist"))
}
Expand Down