docs/sql-programming-guide.md (1 addition, 0 deletions)
@@ -1814,6 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains a timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. If you don't care about this problem and want to retain the previous behavior to keep your queries unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. The same happens for ORC Hive table properties such as `TBLPROPERTIES (orc.compress 'NONE')` when `spark.sql.hive.convertMetastoreOrc=true`. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy-compressed Parquet files during insertion in Spark 2.3; in Spark 2.4, the result would be uncompressed Parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. This means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3; in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. Setting `spark.sql.hive.convertMetastoreOrc` to `false` restores the previous behavior.
- In version 2.3 and earlier, every column value of a CSV row is parsed regardless of whether it is actually requested, so a row is considered malformed if the CSV parser cannot handle any of its values, even one in a column that is never used. Since Spark 2.4, only the requested column values are parsed and the other values can be ignored, so rows that were previously treated as malformed solely because of errors in unrequested columns are now returned as valid rows.
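
A minimal sketch of the behavior change described in this note; the file path, schema, and data are hypothetical, and DROPMALFORMED mode plus an existing SparkSession `spark` are assumed:

// Hypothetical file /tmp/people.csv, where the second value of the second row
// cannot be cast to INT:
//   Alice,30
//   Bob,oops
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

val df = spark.read
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .csv("/tmp/people.csv")

// Spark 2.3 and earlier: every column of every row is parsed, so the "Bob" row
// is dropped in both queries and each returns 1 row.
// Spark 2.4: only the requested columns are parsed, so the query that never
// touches "age" keeps both rows.
df.select("name", "age").collect()   // 1 row in 2.3 and 2.4: the malformed "age" is requested
df.select("name").collect()          // 1 row in 2.3, 2 rows in 2.4: "age" is never parsed
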
@HyukjinKwon (Member) commented on May 14, 2018:
Shall we add some more examples? For example, I guess df.count() with dropmalformed now gives a different number too.

A Contributor commented:

Can we follow the style of the other migration guides?

In version 2.3 and earlier, xxxx. Since Spark 2.4, xxxx. As an example, xxxx. (and talk about the flag to restore the previous behavior)


## Upgrading From Spark SQL 2.2 to 2.3

UnivocityParser.scala
@@ -45,7 +45,14 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any

-  private val tokenizer = new CsvParser(options.asParserSettings)
+  private val tokenizer = {
+    val parserSetting = options.asParserSettings
+    if (requiredSchema.length < schema.length) {
+      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f)))
+      parserSetting.selectIndexes(tokenIndexArr: _*)
+    }
+    new CsvParser(parserSetting)
+  }

   private val row = new GenericInternalRow(requiredSchema.length)

@@ -73,11 +80,8 @@ class UnivocityParser(
   // Each input token is placed in each output row's position by mapping these. In this case,
   //
   //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] =
-    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-
-  private val tokenIndexArr: Array[Int] = {
-    requiredSchema.map(f => schema.indexOf(f)).toArray
+  private val valueConverters: Array[ValueConverter] = {
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
   }

   /**
@@ -185,14 +189,14 @@ class UnivocityParser(
   def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))

   private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens.length != schema.length) {
+    if (tokens.length != requiredSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (schema.length > tokens.length) {
-        tokens ++ new Array[String](schema.length - tokens.length)
+      val checkedTokens = if (requiredSchema.length > tokens.length) {
+        tokens ++ new Array[String](requiredSchema.length - tokens.length)
       } else {
-        tokens.take(schema.length)
+        tokens.take(requiredSchema.length)
       }
       def getPartialResult(): Option[InternalRow] = {
         try {
@@ -211,8 +215,7 @@
       try {
         var i = 0
         while (i < requiredSchema.length) {
-          val from = tokenIndexArr(i)
-          row(i) = valueConverters(from).apply(tokens(from))
+          row(i) = valueConverters(i).apply(tokens(i))
           i += 1
         }
         row
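
The change above pushes column pruning into the tokenizer itself: when fewer columns are required than the schema contains, the required field positions are handed to uniVocity via selectIndexes, so parseLine returns only the requested tokens and both valueConverters and row are indexed by requiredSchema. A rough standalone sketch of the uniVocity behavior this relies on (not part of the diff; the sample line and indexes are made up):

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
// Produce only the tokens at positions 0 and 2, analogous to what the new
// tokenizer does for the required schema fields.
settings.selectIndexes(Integer.valueOf(0), Integer.valueOf(2))

val parser = new CsvParser(settings)
val tokens = parser.parseLine("2012,Tesla,S,No comment,")
// tokens is expected to be Array("2012", "S"); unselected columns are not returned.
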
CSVBenchmarks.scala (new file)
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.util.{Benchmark, Utils}

/**
* Benchmark to measure CSV read/write performance.
* To run this:
* spark-submit --class <this class> --jars <spark sql test jar>
*/
object CSVBenchmarks {
  val conf = new SparkConf()

  val spark = SparkSession.builder
    .master("local[1]")
    .appName("benchmark-csv-datasource")
    .config(conf)
    .getOrCreate()
  import spark.implicits._

  def withTempPath(f: File => Unit): Unit = {
    val path = Utils.createTempDir()
    path.delete()
    try f(path) finally Utils.deleteRecursively(path)
  }

  def multiColumnsBenchmark(rowsNum: Int): Unit = {
    val colsNum = 1000
    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)

    withTempPath { path =>
      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
      val schema = StructType(fields)
      val values = (0 until colsNum).map(i => i.toString).mkString(",")
      val columnNames = schema.fieldNames

      spark.range(rowsNum)
        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
        .write.option("header", true)
        .csv(path.getAbsolutePath)

      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)

      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
        ds.select("*").filter((row: Row) => true).count()
      }
      val cols100 = columnNames.take(100).map(Column(_))
      benchmark.addCase(s"Select 100 columns", 3) { _ =>
        ds.select(cols100: _*).filter((row: Row) => true).count()
      }
      benchmark.addCase(s"Select one column", 3) { _ =>
        ds.select($"col1").filter((row: Row) => true).count()
      }

      /*
      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
      Wide rows with 1000 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ---------------------------------------------------------------------------------------------
      Select 1000 columns                      76910 / 78065          0.0       76909.8       1.0X
      Select 100 columns                       28625 / 32884          0.0       28625.1       2.7X
      Select one column                        22498 / 22669          0.0       22497.8       3.4X
A Member commented:

BTW, I think we are already doing column pruning by avoiding the casting cost, which is relatively expensive compared to the parsing logic.

The Member Author commented:

You are right, avoiding unnecessary casting speeds things up by more than 2x. We can see that in this benchmark before my changes: without them, selecting only one string column takes 44.5 seconds, but selecting all columns takes ~80 seconds.

> ... relatively expensive compared to the parsing logic.

As the benchmark shows, we can achieve performance improvements in parsing too. Selecting only 1 out of 1000 columns takes 22.5 seconds, but without the PR it takes 44.5 seconds:
8809cec

      */
      benchmark.run()
    }
  }

  def main(args: Array[String]): Unit = {
    multiColumnsBenchmark(rowsNum = 1000 * 1000)
  }
}
CSVSuite.scala
@@ -267,7 +267,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       .options(Map("header" -> "true", "mode" -> "dropmalformed"))
       .load(testFile(carsFile))

-    assert(cars.select("year").collect().size === 2)
+    assert(cars.collect().size === 2)
The Member Author commented:
The cars.csv file has a header with 5 columns:

year,make,model,comment,blank

and 2 rows where the first 4 columns are valid and the last one is blank:

"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",

and one more row with only 3 columns:

2015,Chevy,Volt

The previous (current) implementation drops the last row in dropmalformed mode because it parses whole rows, and the last row is malformed. If only the year column is selected, the uniVocity parser returns a value for the first column (index 0) and doesn't check the correctness of the rest of the row. So cars.select("year").collect().size returns 3.

A Member commented:

This changes behaviour, and it was intentionally parsed this way to keep backward compatibility. There was an issue about the different number of counts. I think you are basically saying that cars.select("year").collect().size and cars.collect().size are different and both are correct, right?

The Member Author commented:

> it was intentionally parsed this way to keep backward compatibility.

Right, by selecting all columns I force UnivocityParser to fall into this case:
https://github.com/MaxGekk/spark-1/blob/a4a0a549156a15011c33c7877a35f244d75b7a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala#L193-L213
where the number of returned tokens is less than required.

In the case of cars.select("year"), the uniVocity parser returns only one token, as expected.

> There was an issue about the different number of counts.

The PR changes behavior for some malformed inputs, but I believe we can provide better performance for users who have correct inputs.

> I think you are basically saying that cars.select("year").collect().size and cars.collect().size are different and both are correct, right?

Yes, you can say that. You are right, it seems the PR proposes another interpretation of malformed rows. cars.select("year") is:

+----+
|year|
+----+
|2012|
|1997|
|2015|
+----+

and we should not reject 2015 only because there are problems in columns that were not requested. In this particular case, the last row contributes only one value, at position 0, and it is correct.
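
A short sketch of the two counts being discussed, assuming the cars.csv fixture described above and this PR applied; the read options mirror the test, and the path is illustrative (the test uses testFile(carsFile)):

val cars = spark.read
  .format("csv")
  .options(Map("header" -> "true", "mode" -> "dropmalformed"))
  .load("cars.csv")

// All 5 columns are required, so the 3-column row is malformed and dropped.
cars.collect().length                  // 2

// Only "year" is parsed, so the 3-column row is kept.
cars.select("year").collect().length   // 3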

}
}

@@ -1322,4 +1322,31 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
    val sampled = spark.read.option("inferSchema", true).option("samplingRatio", 1.0).csv(ds)
    assert(sampled.count() == ds.count())
  }

  test("SPARK-24244: Select a subset of all columns") {
    withTempPath { path =>
      import collection.JavaConverters._
      val schema = new StructType()
        .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType)
        .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType)
        .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType)
        .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType)
        .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType)

      val odf = spark.createDataFrame(List(
        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
        Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15)
      ).asJava, schema)
      odf.write.csv(path.getCanonicalPath)
      val idf = spark.read
        .schema(schema)
        .csv(path.getCanonicalPath)
        .select('f15, 'f10, 'f5)

      checkAnswer(
        idf,
        List(Row(15, 10, 5), Row(-15, -10, -5))
      )
    }
  }
}