UnivocityParser.scala
@@ -45,15 +45,6 @@ class UnivocityParser(
// A `ValueConverter` is responsible for converting the given value to a desired type.
private type ValueConverter = String => Any

private val tokenizer = new CsvParser(options.asParserSettings)

private val row = new GenericInternalRow(requiredSchema.length)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
}

// This parser first picks some tokens from the input tokens, according to the required schema,
// then parse these tokens and put the values in a row, with the order specified by the required
// schema.
@@ -73,11 +64,24 @@ class UnivocityParser(
// Each input token is placed in each output row's position by mapping these. In this case,
//
// output row - ["A", 2]
private val valueConverters: Array[ValueConverter] =
schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
private val valueConverters: Array[ValueConverter] = {
requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
}

private val tokenIndexArr: Array[Int] = {
requiredSchema.map(f => schema.indexOf(f)).toArray
private val tokenizer = {
val parserSetting = options.asParserSettings
if (requiredSchema.length < schema.length) {
val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f)))
parserSetting.selectIndexes(tokenIndexArr: _*)
@HyukjinKwon (Member), May 11, 2018:
I think I tried this locally a while ago, but I didn't submit a PR since the improvement was trivial and a test was broken, FWIW.

Member Author:
I tried the changes as an experiment because some of our clients have many columns (> 200) in their input CSV files. The experiment shows that the improvement can significantly reduce total execution time.

}
new CsvParser(parserSetting)
}

private val row = new GenericInternalRow(requiredSchema.length)
Member:
Seems we don't need to move this down.

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
}

/**
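Aside (not part of the PR): a minimal standalone sketch of the uniVocity column-selection API that the new tokenizer relies on, assuming the com.univocity:univocity-parsers dependency is on the classpath; the object name and the sample input line are made up for illustration.

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object SelectIndexesSketch {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    // Keep only columns 0 and 2 of every record; the other columns are not materialized.
    settings.selectIndexes(Integer.valueOf(0), Integer.valueOf(2))
    val parser = new CsvParser(settings)

    val tokens = parser.parseLine("2012,Tesla,S,No comment,")
    // With the selection above, tokens should contain only the pruned values "2012" and "S".
    println(tokens.mkString(", "))
  }
}

This mirrors what the tokenizer above does when requiredSchema is shorter than schema: the indexes of the required fields are passed to selectIndexes, so the unneeded columns are never returned to the value converters.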
@@ -185,14 +189,14 @@ class UnivocityParser(
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))

private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != schema.length) {
if (tokens.length != requiredSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
val checkedTokens = if (schema.length > tokens.length) {
tokens ++ new Array[String](schema.length - tokens.length)
val checkedTokens = if (requiredSchema.length > tokens.length) {
tokens ++ new Array[String](requiredSchema.length - tokens.length)
} else {
tokens.take(schema.length)
tokens.take(requiredSchema.length)
}
def getPartialResult(): Option[InternalRow] = {
try {
@@ -211,8 +215,7 @@
try {
var i = 0
while (i < requiredSchema.length) {
val from = tokenIndexArr(i)
row(i) = valueConverters(from).apply(tokens(from))
row(i) = valueConverters(i).apply(tokens(i))
i += 1
}
row
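Aside (not part of the PR): the convert() hunks above keep the existing recovery strategy for rows whose token count does not match the required schema. A small standalone sketch of that pad-or-truncate step, with a made-up helper name:

// Align a parsed token array to the expected number of columns before the
// per-column value converters run, mirroring the logic in convert() above.
def alignTokens(tokens: Array[String], requiredLength: Int): Array[String] = {
  if (tokens.length == requiredLength) {
    tokens
  } else if (tokens.length < requiredLength) {
    // Too few tokens: pad the tail with nulls so the remaining converters can still run.
    tokens ++ new Array[String](requiredLength - tokens.length)
  } else {
    // Too many tokens: drop the extras beyond the required schema.
    tokens.take(requiredLength)
  }
}

In the real parser such a row is still reported as malformed; the aligned tokens are only used to build a partial result, and the sketch shows just the alignment itself.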
CSVBenchmarks.scala (new file)
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.util.{Benchmark, Utils}

/**
* Benchmark to measure CSV read/write performance.
* To run this:
* spark-submit --class <this class> --jars <spark sql test jar>
*/
object CSVBenchmarks {
val conf = new SparkConf()

val spark = SparkSession.builder
.master("local[1]")
.appName("benchmark-csv-datasource")
.config(conf)
.getOrCreate()
import spark.implicits._

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

def multiColumnsBenchmark(rowsNum: Int): Unit = {
val colsNum = 1000
val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)

withTempPath { path =>
val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
val schema = StructType(fields)
val values = (0 until colsNum).map(i => i.toString).mkString(",")
val columnNames = schema.fieldNames

spark.range(rowsNum)
.select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
.write.option("header", true)
.csv(path.getAbsolutePath)

val ds = spark.read.schema(schema).csv(path.getAbsolutePath)

benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
ds.select("*").filter((row: Row) => true).count()
}
val cols100 = columnNames.take(100).map(Column(_))
benchmark.addCase(s"Select 100 columns", 3) { _ =>
ds.select(cols100: _*).filter((row: Row) => true).count()
}
benchmark.addCase(s"Select one column", 3) { _ =>
ds.select($"col1").filter((row: Row) => true).count()
}

/*
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------
Select 1000 columns 76910 / 78065 0.0 76909.8 1.0X
Select 100 columns 28625 / 32884 0.0 28625.1 2.7X
Select one column 22498 / 22669 0.0 22497.8 3.4X
Member:
BTW, I think we are already doing the column pruning by avoiding the casting cost, which is relatively expensive compared to the parsing logic.

Member Author:
You are right, avoiding unnecessary casting speeds things up more than 2x; we can see that in this benchmark even before my changes. Without the changes, selecting only one string column takes 44.5 seconds, while selecting all columns takes ~80 seconds.

> ... relatively expensive compared to the parsing logic.

As the benchmark shows, we can achieve performance improvements in parsing too: selecting only 1 out of 1000 columns takes 22.5 seconds, whereas without the PR it takes 44.5 seconds:
8809cec

*/
benchmark.run()
}
}

def main(args: Array[String]): Unit = {
multiColumnsBenchmark(rowsNum = 1000 * 1000)
}
}
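Aside (not part of the PR): for a quick manual check of the pruning effect outside the Benchmark harness, something along these lines can be run in spark-shell; the path and column name are illustrative.

// Any wide CSV file with a header works here.
val wide = spark.read.option("header", true).csv("/tmp/wide.csv")

// With column pruning pushed into the uniVocity tokenizer, selecting a single
// column should be noticeably faster than selecting all of them.
spark.time(wide.select("col1").count())
spark.time(wide.select("*").count())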
CSVSuite.scala
@@ -267,7 +267,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
.load(testFile(carsFile))

assert(cars.select("year").collect().size === 2)
assert(cars.collect().size === 2)
Member Author:
The cars.csv file has a header with 5 columns:

year,make,model,comment,blank

and 2 rows whose first 4 columns are valid and whose last column is blank:

"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",

and one more row with only 3 columns:

2015,Chevy,Volt

The previous (current) implementation drops the last row in dropmalformed mode because it parses whole rows, and the last one is malformed. If only the year column is selected, the uniVocity parser returns a value only for the first column (index 0) and does not check the correctness of the rest of the row, so cars.select("year").collect().size returns 3.

Member:
This changes behaviour; it is intentionally parsed this way to keep backward compatibility. There was an issue about the different number of counts. I think you are basically saying that cars.select("year").collect().size and cars.collect().size are different and both are correct, right?

Member Author:
> it is intentionally parsed this way to keep backward compatibility.

Right, by selecting all columns I force UnivocityParser to fall into this case:
https://github.com/MaxGekk/spark-1/blob/a4a0a549156a15011c33c7877a35f244d75b7a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala#L193-L213
where the number of returned tokens is less than required.

In the case of cars.select("year"), the uniVocity parser returns only one token, as expected.

> There was an issue about the different number of counts.

The PR changes behavior for some malformed inputs, but I believe we can provide better performance for users who have correct inputs.

> I think you are basically saying that cars.select("year").collect().size and cars.collect().size are different and both are correct, right?

Yes, you can say that. You are right that the PR proposes another interpretation of malformed rows. cars.select("year") is:

+----+
|year|
+----+
|2012|
|1997|
|2015|
+----+

and we should not reject 2015 just because there are problems in columns that were not requested. In this particular case, only the value at position 0 of the last row is requested, and it is correct.

}
}
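Aside (not part of the PR): the thread above, summarized as code. This assumes the suite's cars.csv fixture and testFile helper, and the counts are the ones claimed in the discussion for this change.

val cars = spark.read
  .format("csv")
  .options(Map("header" -> "true", "mode" -> "dropmalformed"))
  .load(testFile(carsFile))

// All 5 columns are required, so the short row "2015,Chevy,Volt" is still dropped.
assert(cars.collect().size === 2)

// Only the first column is required; the short row has a valid value for it,
// so with column pruning it is no longer treated as malformed.
assert(cars.select("year").collect().size === 3)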

@@ -1322,4 +1322,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
val sampled = spark.read.option("inferSchema", true).option("samplingRatio", 1.0).csv(ds)
assert(sampled.count() == ds.count())
}

test("SPARK-24244: Select a little of many columns") {
Contributor:
What does this test?

Member Author:
I added the test to check that requesting only a subset of all columns works correctly, and to cover the case when the ordering of fields in the required schema differs from the data schema. Previously I had a concern that if I selected columns in a different order, like select('f15, 'f10, 'f5), I would get a required schema with that field order. It turns out the required schema has the same order as the data schema. That's why I removed https://github.com/apache/spark/pull/21296/files/a4a0a549156a15011c33c7877a35f244d75b7a4f#diff-d19881aceddcaa5c60620fdcda99b4c4L214

withTempPath { path =>
import collection.JavaConverters._
val schema = new StructType()
.add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
.add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
.add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
.add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
.add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)

val odf = spark.createDataFrame(List(
Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
).asJava, schema)
odf.write.csv(path.getCanonicalPath)
val idf = spark.read
.schema(schema)
.csv(path.getCanonicalPath)
.select('f15, 'f10, 'f5)

checkAnswer(
idf,
List(Row(15, 10, 5), Row(-15, -10, -5))
)
}
}
}