From 33671c8e4b26eb7f138d0f401ee61637306ea0cc Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 25 Jun 2018 15:17:58 +0900
Subject: [PATCH 1/8] Fix

---
 .../execution/datasources/csv/CSVFileFormat.scala | 13 ++++++++++++-
 .../sql/execution/datasources/csv/CSVSuite.scala  | 12 ++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index fa366ccce6b61..606b194f28baa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -131,6 +132,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       )
     }
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning

     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
@@ -138,13 +140,22 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         parsedOptions)
-      CSVDataSource(parsedOptions).readFile(
+      val inputRows = CSVDataSource(parsedOptions).readFile(
         conf,
         file,
         parser,
         requiredSchema,
         dataSchema,
         caseSensitive)
+
+      if (columnPruning) {
+        inputRows
+      } else {
+        val inputAttrs = dataSchema.toAttributes
+        val outputAttrs = requiredSchema.map(dataSchema.indexOf).map(inputAttrs)
+        val outputProjection = GenerateUnsafeProjection.generate(outputAttrs, inputAttrs)
+        inputRows.map(outputProjection)
+      }
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 84b91f6309fe8..f2d8a0263bfaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1579,4 +1579,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       }
     }
   }
+
+  test("SPARK-24676 project required data from parsed data when columnPruning disabled") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
+          .option("header", "true").csv(dir)
+        val df = spark.read.option("header", "true").csv(dir).selectExpr("sum(p)", "avg(c0)")
+        checkAnswer(df, Row(5, 4.5))
+      }
+    }
+  }
 }
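Patch 1 leaves the parser emitting rows laid out in the full dataSchema whenever the csvColumnPruning flag (SQLConf.CSV_PARSER_COLUMN_PRUNING) is disabled, and then reorders them in CSVFileFormat: requiredSchema.map(dataSchema.indexOf) yields, for each required column, its position in the parsed row, and the generated unsafe projection just picks those positions. The index mapping itself can be sketched with plain Scala collections (the field names p/c0/c1 below are illustrative, not taken from the patch):

  object ProjectionSketch {
    def main(args: Array[String]): Unit = {
      val dataSchema = Seq("p", "c0", "c1")       // layout produced by the CSV parser
      val requiredSchema = Seq("c1", "p")         // layout requested by the query
      // For every required column, find its index in the parsed row.
      val outputIndexes = requiredSchema.map(dataSchema.indexOf)
      val parsedRow = Seq("0", "10", "20")        // a row in dataSchema order
      // Reorder the parsed row into requiredSchema order.
      println(outputIndexes.map(parsedRow))       // List(20, 0)
    }
  }
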
From 83ed0823fbd4219c5462f0991509b08507ee10c6 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 29 Jun 2018 10:47:24 +0900
Subject: [PATCH 2/8] Fix

---
 .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f2d8a0263bfaa..c1f2b79ed42fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1586,8 +1586,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
         val dir = path.getAbsolutePath
         spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
           .option("header", "true").csv(dir)
-        val df = spark.read.option("header", "true").csv(dir).selectExpr("sum(p)", "avg(c0)")
-        checkAnswer(df, Row(5, 4.5))
+        var df = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
+        checkAnswer(df, Row(5, 10))
+
+        // empty required column case
+        df = spark.read.option("header", true).csv(dir).selectExpr("sum(p)")
+        checkAnswer(df, Row(5))
       }
     }
   }
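Patch 2 extends the test with an empty-required-column case: selectExpr("sum(p)") only touches the partition column, so nothing has to be read from the CSV columns at all and the parser's required schema is empty. Roughly, the query shape being exercised looks like this in a spark-shell session (an assumed `spark` session and a made-up /tmp path, not part of the patch):

  spark.range(10)
    .selectExpr("id % 2 AS p", "id AS c0")
    .write.partitionBy("p")
    .option("header", "true")
    .csv("/tmp/csv_pruning_demo")

  // Only the partition column p is referenced, so no CSV column needs to be parsed.
  spark.read.option("header", "true").csv("/tmp/csv_pruning_demo")
    .selectExpr("sum(p)")
    .show()
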
From 2d11e5fdadb006aaa9b9bc7cfa6a6b39e4ca8f6c Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Wed, 4 Jul 2018 11:14:28 +0900
Subject: [PATCH 3/8] Fix

---
 .../datasources/csv/CSVFileFormat.scala       | 13 +------
 .../datasources/csv/UnivocityParser.scala     | 34 +++++++++++++------
 2 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 606b194f28baa..fa366ccce6b61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -132,7 +131,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       )
     }
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning

     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
@@ -140,22 +138,13 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         parsedOptions)
-      val inputRows = CSVDataSource(parsedOptions).readFile(
+      CSVDataSource(parsedOptions).readFile(
         conf,
         file,
         parser,
         requiredSchema,
         dataSchema,
         caseSensitive)
-
-      if (columnPruning) {
-        inputRows
-      } else {
-        val inputAttrs = dataSchema.toAttributes
-        val outputAttrs = requiredSchema.map(dataSchema.indexOf).map(inputAttrs)
-        val outputProjection = GenerateUnsafeProjection.generate(outputAttrs, inputAttrs)
-        inputRows.map(outputProjection)
-      }
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index aa545e1a0c00a..fcbf02f74c173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -38,7 +38,7 @@ class UnivocityParser(
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
-    "requiredSchema should be the subset of schema.")
+    "requiredSchema should be the subset of dataSchema.")

   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)

@@ -53,9 +53,10 @@ class UnivocityParser(
     }
     new CsvParser(parserSetting)
   }
-  private val schema = if (options.columnPruning) requiredSchema else dataSchema
-  private val row = new GenericInternalRow(schema.length)
+  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
+
+  private val row = new GenericInternalRow(requiredSchema.length)

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -82,7 +83,12 @@ class UnivocityParser(
   //
   //   output row - ["A", 2]
   private val valueConverters: Array[ValueConverter] = {
-    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+  }
+
+  // If `columnPruning` disabled, this index is used to reorder parsed tokens
+  private lazy val tokenIndexArr: Array[Int] = {
+    requiredSchema.map(f => dataSchema.indexOf(f)).toArray
   }

   /**
@@ -183,7 +189,7 @@ class UnivocityParser(
     }
   }

-  private val doParse = if (schema.nonEmpty) {
+  private val doParse = if (requiredSchema.nonEmpty) {
     (input: String) => convert(tokenizer.parseLine(input))
   } else {
     // If `columnPruning` enabled and partition attributes scanned only,
@@ -197,15 +203,21 @@ class UnivocityParser(
    */
  def parse(input: String): InternalRow = doParse(input)

+  private val getToken = if (options.columnPruning) {
+    (tokens: Array[String], index: Int) => tokens(index)
+  } else {
+    (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
+  }
+
   private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens.length != schema.length) {
+    if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (schema.length > tokens.length) {
-        tokens ++ new Array[String](schema.length - tokens.length)
+      val checkedTokens = if (parsedSchema.length > tokens.length) {
+        tokens ++ new Array[String](parsedSchema.length - tokens.length)
       } else {
-        tokens.take(schema.length)
+        tokens.take(parsedSchema.length)
       }
       def getPartialResult(): Option[InternalRow] = {
         try {
@@ -223,8 +235,8 @@ class UnivocityParser(
     } else {
       try {
         var i = 0
-        while (i < schema.length) {
-          row(i) = valueConverters(i).apply(tokens(i))
+        while (i < requiredSchema.length) {
+          row(i) = valueConverters(i).apply(getToken(tokens, i))
           i += 1
         }
         row

From 12906d80242d68ad3848fefa13911d96c14a2638 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 5 Jul 2018 08:37:07 +0900
Subject: [PATCH 4/8] Fix

---
 .../execution/datasources/csv/UnivocityParser.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index fcbf02f74c173..eb3a5f1c23ea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -45,10 +45,13 @@ class UnivocityParser(
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any

+  // This index is used to reorder parsed tokens
+  private val tokenIndexArr =
+    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
+
   val tokenizer = {
     val parserSetting = options.asParserSettings
     if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-      val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
     new CsvParser(parserSetting)
   }
@@ -86,11 +89,6 @@ class UnivocityParser(
     requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
   }

-  // If `columnPruning` disabled, this index is used to reorder parsed tokens
-  private lazy val tokenIndexArr: Array[Int] = {
-    requiredSchema.map(f => dataSchema.indexOf(f)).toArray
-  }
-
   /**
    * Create a converter which converts the string value to a value according to a desired type.
    * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`).
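Patch 4 computes the required-column indexes once, as a field, and reuses the same tokenIndexArr both for Univocity's field selection (column pruning on) and for reordering fully parsed tokens (column pruning off). The underlying univocity-parsers call is selectIndexes, which makes the parser return only the chosen columns; a standalone sketch of that API, assuming the com.univocity.parsers dependency is on the classpath and using a made-up input line:

  import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

  object SelectIndexesSketch {
    def main(args: Array[String]): Unit = {
      val settings = new CsvParserSettings()
      // Ask the parser to materialize only columns 0 and 2.
      settings.selectIndexes(Integer.valueOf(0), Integer.valueOf(2))
      val parser = new CsvParser(settings)
      val tokens = parser.parseLine("a,b,c")
      println(tokens.mkString(","))  // expected: a,c
    }
  }

The next patch simply materializes that index mapping with .toArray.
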
From d5921f08d8efa00f64f01d005e843291568c1e80 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 7 Jul 2018 14:13:28 +0900
Subject: [PATCH 5/8] Fix

---
 .../spark/sql/execution/datasources/csv/UnivocityParser.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index eb3a5f1c23ea8..c13b92ee6541d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -47,7 +47,7 @@ class UnivocityParser(

   // This index is used to reorder parsed tokens
   private val tokenIndexArr =
-    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
+    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray

   val tokenizer = {
     val parserSetting = options.asParserSettings

From dd5bb595fe282035e0cf8996cfea15724fe0f674 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Wed, 11 Jul 2018 10:06:40 +0900
Subject: [PATCH 6/8] Fix

---
 .../spark/sql/execution/datasources/csv/UnivocityParser.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index c13b92ee6541d..2fa34c1ec249f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -38,7 +38,8 @@ class UnivocityParser(
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
-    "requiredSchema should be the subset of dataSchema.")
+    s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
+      s"dataSchema (${dataSchema.catalogString}).")

   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)

From 6722fba7814dcf62cc0a4bd063a3d05296f21d5c Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 15 Jul 2018 11:01:59 +0900
Subject: [PATCH 7/8] Fix

---
 .../datasources/csv/UnivocityParser.scala     | 15 ++++++++++++++
 .../execution/datasources/csv/CSVSuite.scala  | 20 +++++++++++++++----
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 2fa34c1ec249f..e40100126ee88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -33,6 +33,14 @@ import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

+
+/**
+ * XXX.
+ *
+ * @param dataSchema
+ * @param requiredSchema
+ * @param options
+ */
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
     val options: CSVOptions) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
@@ -52,12 +60,17 @@ class UnivocityParser(
   val tokenizer = {
     val parserSetting = options.asParserSettings
+    // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
+    // parser select a sequence of fields for reading by their positions.
     if (options.columnPruning && requiredSchema.length < dataSchema.length) {
+      // if (parsedSchema.length < dataSchema.length) {
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
     new CsvParser(parserSetting)
   }

+  // When column pruning is enabled, the parser only parses the required columns based on
+  // their positions in the data schema.
   private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema

   private val row = new GenericInternalRow(requiredSchema.length)

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -233,6 +246,8 @@ class UnivocityParser(
         new RuntimeException("Malformed CSV record"))
     } else {
       try {
+        // When the length of the returned tokens is identical to the length of the parsed schema,
+        // we just need to convert the tokens that correspond to the required columns.
         var i = 0
         while (i < requiredSchema.length) {
           row(i) = valueConverters(i).apply(getToken(tokens, i))

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c1f2b79ed42fc..bcf1a8b0208a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1586,12 +1586,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
         val dir = path.getAbsolutePath
         spark.range(10).selectExpr("id % 2 AS p", "id AS c0", "id AS c1").write.partitionBy("p")
           .option("header", "true").csv(dir)
-        var df = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
-        checkAnswer(df, Row(5, 10))
+        val df1 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)", "count(c0)")
+        checkAnswer(df1, Row(5, 10))

         // empty required column case
-        df = spark.read.option("header", true).csv(dir).selectExpr("sum(p)")
-        checkAnswer(df, Row(5))
+        val df2 = spark.read.option("header", true).csv(dir).selectExpr("sum(p)")
+        checkAnswer(df2, Row(5))
+      }
+
+      // the case where tokens length != parsedSchema length
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        Seq("1,2").toDF().write.text(dir)
+        // more tokens
+        val df1 = spark.read.schema("c0 int").format("csv").load(dir)
+        checkAnswer(df1, Row())
+        // less tokens
+        val df2 = spark.read.schema("c0 int, c1 int, c2 int").format("csv").load(dir)
+        checkAnswer(df2, Row())
       }
     }
   }
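Patch 7 mostly documents the flow and adds tests for the case where the tokenizer returns a different number of tokens than parsedSchema expects: before conversion is retried, the token array is padded with nulls or truncated to the parsed schema's length. That step, taken on its own, behaves like the following plain-Scala sketch (the sample token arrays are made up):

  object CheckedTokensSketch {
    def main(args: Array[String]): Unit = {
      val parsedSchemaLength = 3

      // Mirror of the padding/truncation done for malformed records.
      def checkTokens(tokens: Array[String]): Array[String] =
        if (parsedSchemaLength > tokens.length) {
          // Too few tokens: pad the tail with nulls.
          tokens ++ new Array[String](parsedSchemaLength - tokens.length)
        } else {
          // Too many tokens: drop the extras.
          tokens.take(parsedSchemaLength)
        }

      println(checkTokens(Array("1", "2")).mkString("[", ",", "]"))            // [1,2,null]
      println(checkTokens(Array("1", "2", "3", "4")).mkString("[", ",", "]"))  // [1,2,3]
    }
  }
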
From 81b397140486fab7f7c2f7dcb15d5a9a62c99845 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 15 Jul 2018 15:52:54 +0900
Subject: [PATCH 8/8] Fix

---
 .../datasources/csv/UnivocityParser.scala     | 22 ++++++++++---------
 .../execution/datasources/csv/CSVSuite.scala  |  9 ++++----
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index e40100126ee88..79143cce4a380 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -35,11 +35,13 @@ import org.apache.spark.unsafe.types.UTF8String


 /**
- * XXX.
+ * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
  *
- * @param dataSchema
- * @param requiredSchema
- * @param options
+ * @param dataSchema The CSV data schema that is specified by the user, or inferred from underlying
+ *                   data files.
+ * @param requiredSchema The schema of the data that should be output for each row. This should be a
+ *                       subset of the columns in dataSchema.
+ * @param options Configuration options for a CSV parser.
  */
 class UnivocityParser(
     dataSchema: StructType,
@@ -58,21 +60,21 @@ class UnivocityParser(
   private val tokenIndexArr =
     requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray

+  // When column pruning is enabled, the parser only parses the required columns based on
+  // their positions in the data schema.
+  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
+
   val tokenizer = {
     val parserSetting = options.asParserSettings
     // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
     // parser select a sequence of fields for reading by their positions.
-    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-      // if (parsedSchema.length < dataSchema.length) {
+    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
+    if (parsedSchema.length < dataSchema.length) {
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
     new CsvParser(parserSetting)
   }

-  // When column pruning is enabled, the parser only parses the required columns based on
-  // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
-
   private val row = new GenericInternalRow(requiredSchema.length)

   // Retrieve the raw record string.

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bcf1a8b0208a0..ae8110fdf1709 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1599,11 +1599,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
         val dir = path.getAbsolutePath
         Seq("1,2").toDF().write.text(dir)
         // more tokens
-        val df1 = spark.read.schema("c0 int").format("csv").load(dir)
-        checkAnswer(df1, Row())
+        val df1 = spark.read.schema("c0 int").format("csv").option("mode", "permissive").load(dir)
+        checkAnswer(df1, Row(1))
         // less tokens
-        val df2 = spark.read.schema("c0 int, c1 int, c2 int").format("csv").load(dir)
-        checkAnswer(df2, Row())
+        val df2 = spark.read.schema("c0 int, c1 int, c2 int").format("csv")
+          .option("mode", "permissive").load(dir)
+        checkAnswer(df2, Row(1, 2, null))
       }
     }
   }
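The last patch pins down the permissive-mode expectations for mismatched token counts: with more tokens than schema columns the extras are discarded, and with fewer tokens the missing columns come back as null, hence Row(1) and Row(1, 2, null). The same behaviour can be reproduced interactively (an assumed spark-shell session and a throwaway /tmp path, not part of the patch):

  import spark.implicits._

  Seq("1,2").toDF().write.text("/tmp/csv_token_demo")

  // More tokens than schema columns: the extra token "2" is dropped.
  spark.read.schema("c0 int").option("mode", "permissive")
    .csv("/tmp/csv_token_demo").show()

  // Fewer tokens than schema columns: c2 is filled with null.
  spark.read.schema("c0 int, c1 int, c2 int").option("mode", "permissive")
    .csv("/tmp/csv_token_demo").show()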