diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 32ff2c90bfa28..22fd69b2ce0da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -78,11 +78,11 @@ class UnivocityParser(
     new CsvParser(parserSetting)
   }
 
-  // Pre-allocated Seq to avoid the overhead of the seq builder.
-  private val requiredRow = Seq(new GenericInternalRow(requiredSchema.length))
+  // Pre-allocated Some to avoid the overhead of building a Some for each row.
+  private val requiredRow = Some(new GenericInternalRow(requiredSchema.length))
   // Pre-allocated empty sequence returned when the parsed row cannot pass filters.
-  // We preallocate it avoid unnecessary invokes of the seq builder.
-  private val noRows = Seq.empty[InternalRow]
+  // We preallocate it to avoid unnecessary allocations.
+  private val noRows = None
 
   private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
@@ -206,7 +206,7 @@ class UnivocityParser(
   private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
     // If `columnPruning` enabled and partition attributes scanned only,
     // `schema` gets empty.
-    (_: String) => Seq(InternalRow.empty)
+    (_: String) => Some(InternalRow.empty)
   } else {
     // parse if the columnPruning is disabled or requiredSchema is nonEmpty
     (input: String) => convert(tokenizer.parseLine(input))
   }
@@ -216,7 +216,7 @@ class UnivocityParser(
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): Seq[InternalRow] = doParse(input)
+  def parse(input: String): Option[InternalRow] = doParse(input)
 
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -224,7 +224,7 @@ class UnivocityParser(
     (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
   }
 
-  private def convert(tokens: Array[String]): Seq[InternalRow] = {
+  private def convert(tokens: Array[String]): Option[InternalRow] = {
     if (tokens == null) {
       throw BadRecordException(
         () => getCurrentInput,
@@ -251,7 +251,7 @@ class UnivocityParser(
     // 1. Convert the tokens that correspond to the required schema.
     // 2. Apply the pushdown filters to `requiredRow`.
     var i = 0
-    val row = requiredRow.head
+    val row = requiredRow.get
     var skipRow = false
     while (i < requiredSchema.length) {
       try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index ead26665bd6ea..4824b0c860cb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -68,7 +68,7 @@ class JacksonParser(
    * to a value according to a desired schema. This is a wrapper for the method
    * `makeConverter()` to handle a row wrapped with an array.
    */
-  private def makeRootConverter(dt: DataType): JsonParser => Seq[InternalRow] = {
+  private def makeRootConverter(dt: DataType): JsonParser => Iterable[InternalRow] = {
     dt match {
       case st: StructType => makeStructRootConverter(st)
       case mt: MapType => makeMapRootConverter(mt)
@@ -76,11 +76,11 @@ class JacksonParser(
     }
   }
 
-  private def makeStructRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
+  private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = {
     val elementConverter = makeConverter(st)
     val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
-    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
-      case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil
+    (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
+      case START_OBJECT => Some(convertObject(parser, st, fieldConverters))
         // SPARK-3308: support reading top level JSON arrays and take every element
         // in such an array as a row
         //
@@ -99,26 +99,26 @@ class JacksonParser(
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
-          Nil
+          Array.empty[InternalRow]
         } else {
-          array.toArray[InternalRow](schema).toSeq
+          array.toArray[InternalRow](schema)
         }
       case START_ARRAY =>
         throw new RuntimeException("Parsing JSON arrays as structs is forbidden.")
     }
   }
 
-  private def makeMapRootConverter(mt: MapType): JsonParser => Seq[InternalRow] = {
+  private def makeMapRootConverter(mt: MapType): JsonParser => Iterable[InternalRow] = {
     val fieldConverter = makeConverter(mt.valueType)
-    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, mt) {
-      case START_OBJECT => Seq(InternalRow(convertMap(parser, fieldConverter)))
+    (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, mt) {
+      case START_OBJECT => Some(InternalRow(convertMap(parser, fieldConverter)))
     }
   }
 
-  private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
+  private def makeArrayRootConverter(at: ArrayType): JsonParser => Iterable[InternalRow] = {
     val elemConverter = makeConverter(at.elementType)
-    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
-      case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
+    (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, at) {
+      case START_ARRAY => Some(InternalRow(convertArray(parser, elemConverter)))
       case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
         // This handles the case when an input JSON object is a structure but
         // the specified schema is an array of structures. In that case, the input JSON is
@@ -140,7 +140,7 @@ class JacksonParser(
         //
         val st = at.elementType.asInstanceOf[StructType]
         val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
-        Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
+        Some(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
     }
   }
 
@@ -395,13 +395,13 @@ class JacksonParser(
   def parse[T](
       record: T,
       createParser: (JsonFactory, T) => JsonParser,
-      recordLiteral: T => UTF8String): Seq[InternalRow] = {
+      recordLiteral: T => UTF8String): Iterable[InternalRow] = {
     try {
       Utils.tryWithResource(createParser(factory, record)) { parser =>
         // a null first token is equivalent to testing for input.trim.isEmpty
         // but it works on any token stream and not just strings
         parser.nextToken() match {
-          case null => Nil
+          case null => None
           case _ => rootConverter.apply(parser) match {
             case null => throw new RuntimeException("Root converter returned null")
             case rows => rows
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 361c8b29db33d..bc5e9be324bb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
 class FailureSafeParser[IN](
-    rawParser: IN => Seq[InternalRow],
+    rawParser: IN => Iterable[InternalRow],
     mode: ParseMode,
     schema: StructType,
     columnNameOfCorruptRecord: String) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index bd4b2529f8b92..77a2ca7e4a828 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -276,7 +276,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
         dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
         requiredSchema: StructType = StructType.fromDDL("i INTEGER"),
         filters: Seq[Filter],
-        expected: Seq[InternalRow]): Unit = {
+        expected: Option[InternalRow]): Unit = {
       Seq(false, true).foreach { columnPruning =>
         val options = new CSVOptions(Map.empty[String, String], columnPruning, "GMT")
         val parser = new UnivocityParser(dataSchema, requiredSchema, options, filters)
@@ -285,26 +285,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       }
     }
 
-    check(filters = Seq(), expected = Seq(InternalRow(1)))
-    check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1)))
-    check(filters = Seq(EqualTo("i", 2)), expected = Seq())
+    check(filters = Seq(), expected = Some(InternalRow(1)))
+    check(filters = Seq(EqualTo("i", 1)), expected = Some(InternalRow(1)))
+    check(filters = Seq(EqualTo("i", 2)), expected = None)
     check(
       requiredSchema = StructType.fromDDL("s STRING"),
       filters = Seq(StringStartsWith("s", "b")),
-      expected = Seq())
+      expected = None)
     check(
       requiredSchema = StructType.fromDDL("i INTEGER, s STRING"),
       filters = Seq(StringStartsWith("s", "a")),
-      expected = Seq(InternalRow(1, UTF8String.fromString("a"))))
+      expected = Some(InternalRow(1, UTF8String.fromString("a"))))
     check(
       input = "1,a,3.14",
       dataSchema = StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"),
StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"), requiredSchema = StructType.fromDDL("i INTEGER, d DOUBLE"), filters = Seq(EqualTo("d", 3.14)), - expected = Seq(InternalRow(1, 3.14))) + expected = Some(InternalRow(1, 3.14))) val errMsg = intercept[IllegalArgumentException] { - check(filters = Seq(EqualTo("invalid attr", 1)), expected = Seq()) + check(filters = Seq(EqualTo("invalid attr", 1)), expected = None) }.getMessage assert(errMsg.contains("invalid attr does not exist")) @@ -313,7 +313,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { dataSchema = new StructType(), requiredSchema = new StructType(), filters = Seq(EqualTo("i", 1)), - expected = Seq(InternalRow.empty)) + expected = Some(InternalRow.empty)) }.getMessage assert(errMsg2.contains("i does not exist")) }