From dcd9ac45673af31e59dcfb633a2b87f76f2bee03 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Thu, 16 Aug 2018 11:35:16 -0400
Subject: [PATCH 1/6] If CSV column pruning is turned on, the header should be
 checked with requiredSchema, not dataSchema

---
 .../datasources/csv/CSVDataSource.scala      | 13 ++++---
 .../datasources/csv/CSVFileFormat.scala      |  4 +-
 .../execution/datasources/csv/CSVSuite.scala | 38 +++++++++++++++++++
 3 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index b7b46c7c86a2..d1e4bb22b80e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -54,7 +54,8 @@ abstract class CSVDataSource extends Serializable {
       requiredSchema: StructType,
       // Actual schema of data in the csv file
       dataSchema: StructType,
-      caseSensitive: Boolean): Iterator[InternalRow]
+      caseSensitive: Boolean,
+      columnPruning: Boolean): Iterator[InternalRow]
 
   /**
    * Infers the schema from `inputPaths` files.
@@ -211,7 +212,8 @@ object TextInputCSVDataSource extends CSVDataSource {
       parser: UnivocityParser,
       requiredSchema: StructType,
       dataSchema: StructType,
-      caseSensitive: Boolean): Iterator[InternalRow] = {
+      caseSensitive: Boolean,
+      columnPruning: Boolean): Iterator[InternalRow] = {
     val lines = {
       val linesReader = new HadoopFileLinesReader(file, conf)
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
@@ -230,7 +232,7 @@ object TextInputCSVDataSource extends CSVDataSource {
         CSVDataSource.checkHeader(
           header,
           parser.tokenizer,
-          dataSchema,
+          if (columnPruning) requiredSchema else dataSchema,
           file.filePath,
           parser.options.enforceSchema,
           caseSensitive)
@@ -308,10 +310,11 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parser: UnivocityParser,
       requiredSchema: StructType,
       dataSchema: StructType,
-      caseSensitive: Boolean): Iterator[InternalRow] = {
+      caseSensitive: Boolean,
+      columnPruning: Boolean): Iterator[InternalRow] = {
     def checkHeader(header: Array[String]): Unit = {
       CSVDataSource.checkHeaderColumnNames(
-        dataSchema,
+        if (columnPruning) requiredSchema else dataSchema,
         header,
         file.filePath,
         parser.options.enforceSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index d59b9820bdee..9aad0bd55e73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -131,6 +131,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       )
     }
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
@@ -144,7 +145,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         parser,
         requiredSchema,
         dataSchema,
-        caseSensitive)
+        caseSensitive,
+        columnPruning)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 456b4535a0dc..f555085f34f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1603,6 +1603,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
   }
 
+  test("SPARK-23786: check header on parsing of dataset with projection and column pruning") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        Seq(("a", "b")).toDF("columnA", "columnB").write
+          .format("csv")
+          .option("header", true)
+          .save(dir)
+        checkAnswer(spark.read
+          .format("csv")
+          .option("header", true)
+          .option("enforceSchema", false)
+          .load(dir)
+          .select("columnA"),
+          Row("a"))
+      }
+    }
+  }
+
+  test("SPARK-23786: check header on parsing of dataset with projection and no column pruning") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        Seq(("a", "b")).toDF("columnA", "columnB").write
+          .format("csv")
+          .option("header", true)
+          .save(dir)
+        checkAnswer(spark.read
+          .format("csv")
+          .option("header", true)
+          .option("enforceSchema", false)
+          .load(dir)
+          .select("columnA"),
+          Row("a"))
+      }
+    }
+  }
+
   test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") {
     withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
       withTempPath { path =>
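
The crux of the first patch: when column pruning is enabled, the Univocity tokenizer is configured to select only the required fields, so parsing the header line yields just those columns, and the name check must compare against requiredSchema rather than the full dataSchema. A minimal, self-contained sketch of the selection rule using only the Spark types API (effectiveSchema is a hypothetical helper for illustration, not part of the patch):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // With pruning on, the parser only tokenizes the required columns, so the
    // header names must be validated against requiredSchema; with pruning off,
    // every column of the file's dataSchema is parsed and checked.
    def effectiveSchema(
        columnPruning: Boolean,
        requiredSchema: StructType,
        dataSchema: StructType): StructType = {
      if (columnPruning) requiredSchema else dataSchema
    }

    val dataSchema = StructType(Seq(
      StructField("columnA", StringType),
      StructField("columnB", StringType)))
    val requiredSchema = StructType(Seq(StructField("columnA", StringType)))

    assert(effectiveSchema(columnPruning = true, requiredSchema, dataSchema) == requiredSchema)
    assert(effectiveSchema(columnPruning = false, requiredSchema, dataSchema) == dataSchema)
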
From c4179a9f0a85b412178323e6cb881385fa644051 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Thu, 16 Aug 2018 11:52:02 -0400
Subject: [PATCH 2/6] Update JIRA reference in unit tests

---
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f555085f34f1..d95bc04f57b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1603,7 +1603,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
   }
 
-  test("SPARK-23786: check header on parsing of dataset with projection and column pruning") {
+  test("SPARK-25134: check header on parsing of dataset with projection and column pruning") {
     withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
       withTempPath { path =>
         val dir = path.getAbsolutePath
@@ -1622,7 +1622,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
-  test("SPARK-23786: check header on parsing of dataset with projection and no column pruning") {
+  test("SPARK-25134: check header on parsing of dataset with projection and no column pruning") {
     withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
       withTempPath { path =>
         val dir = path.getAbsolutePath
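
Both tests drive the same flag that the first patch threads down to the readers. Outside the test harness the toggle is a session conf; a usage sketch, assuming a live SparkSession named spark (the key shown is what SQLConf.CSV_PARSER_COLUMN_PRUNING resolves to on this branch):

    // Turn CSV parser column pruning off for the session. With the flag off,
    // the parser materializes every column of dataSchema even when the query
    // selects only a subset, and the header is checked against dataSchema.
    spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
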
From 09c986c7e9586346255ba7631db83f2f88fe1625 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Fri, 17 Aug 2018 00:14:27 -0400
Subject: [PATCH 3/6] Remove test for header check with projection when column
 pruning is disabled

---
 .../execution/datasources/csv/CSVSuite.scala | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d95bc04f57b7..9db50c7a6ff1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1622,25 +1622,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
-  test("SPARK-25134: check header on parsing of dataset with projection and no column pruning") {
-    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
-      withTempPath { path =>
-        val dir = path.getAbsolutePath
-        Seq(("a", "b")).toDF("columnA", "columnB").write
-          .format("csv")
-          .option("header", true)
-          .save(dir)
-        checkAnswer(spark.read
-          .format("csv")
-          .option("header", true)
-          .option("enforceSchema", false)
-          .load(dir)
-          .select("columnA"),
-          Row("a"))
-      }
-    }
-  }
-
   test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") {
     withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
       withTempPath { path =>

From cd18ed282966ba6031973f895266ba3bb1297ff6 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 19 Aug 2018 09:36:55 -0400
Subject: [PATCH 4/6] Also check the multiLine code path and selecting an
 empty schema with count

---
 .../execution/datasources/csv/CSVSuite.scala | 40 +++++++++++++------
 1 file changed, 27 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 9db50c7a6ff1..8f3d89289653 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1605,19 +1605,33 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
 
   test("SPARK-25134: check header on parsing of dataset with projection and column pruning") {
     withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
-      withTempPath { path =>
-        val dir = path.getAbsolutePath
-        Seq(("a", "b")).toDF("columnA", "columnB").write
-          .format("csv")
-          .option("header", true)
-          .save(dir)
-        checkAnswer(spark.read
-          .format("csv")
-          .option("header", true)
-          .option("enforceSchema", false)
-          .load(dir)
-          .select("columnA"),
-          Row("a"))
+      Seq(false, true).foreach { multiLine =>
+        withTempPath { path =>
+          val dir = path.getAbsolutePath
+          Seq(("a", "b")).toDF("columnA", "columnB").write
+            .format("csv")
+            .option("header", true)
+            .save(dir)
+
+          // schema with one column
+          checkAnswer(spark.read
+            .format("csv")
+            .option("header", true)
+            .option("enforceSchema", false)
+            .option("multiLine", multiLine)
+            .load(dir)
+            .select("columnA"),
+            Row("a"))
+
+          // empty schema
+          assert(spark.read
+            .format("csv")
+            .option("header", true)
+            .option("enforceSchema", false)
+            .option("multiLine", multiLine)
+            .load(dir)
+            .count() === 1L)
+        }
       }
     }
   }
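
The empty-schema assertion added in patch 4 is the interesting case: count() needs no columns at all, so with pruning enabled requiredSchema is empty and header validation should pass trivially rather than demand every name in dataSchema. A sketch of the triggering query, assuming the header CSV written by the test above and a hypothetical dir path:

    // count() requires no columns, so under column pruning the parser's
    // required schema is empty; before this series the header check could
    // still fail spuriously against the full two-column dataSchema.
    val rows = spark.read
      .format("csv")
      .option("header", true)
      .option("enforceSchema", false)
      .option("multiLine", true)  // also exercises MultiLineCSVDataSource
      .load(dir)                  // hypothetical path
      .count()
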
From f2eb1df8256de6f38d72840a5cf70cd4bb7ec643 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Mon, 20 Aug 2018 10:23:53 -0400
Subject: [PATCH 5/6] Remove checkHeader and use checkHeaderColumnNames
 directly

---
 .../apache/spark/sql/DataFrameReader.scala    |  5 ++--
 .../datasources/csv/CSVDataSource.scala       | 24 ++-----------------
 2 files changed, 4 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9bd113419ae4..4a6b35e6b43d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -505,10 +505,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
     val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
-      CSVDataSource.checkHeader(
-        firstLine,
-        new CsvParser(parsedOptions.asParserSettings),
+      CSVDataSource.checkHeaderColumnNames(
         actualSchema,
+        new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine),
         csvDataset.getClass.getCanonicalName,
         parsedOptions.enforceSchema,
         sparkSession.sessionState.conf.caseSensitiveAnalysis)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index d1e4bb22b80e..2548766a8152 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -182,25 +182,6 @@ object CSVDataSource extends Logging {
       }
     }
   }
-
-  /**
-   * Checks that CSV header contains the same column names as fields names in the given schema
-   * by taking into account case sensitivity.
-   */
-  def checkHeader(
-      header: String,
-      parser: CsvParser,
-      schema: StructType,
-      fileName: String,
-      enforceSchema: Boolean,
-      caseSensitive: Boolean): Unit = {
-    checkHeaderColumnNames(
-      schema,
-      parser.parseLine(header),
-      fileName,
-      enforceSchema,
-      caseSensitive)
-  }
 }
 
 object TextInputCSVDataSource extends CSVDataSource {
@@ -229,10 +210,9 @@ object TextInputCSVDataSource extends CSVDataSource {
         // Note: if there are only comments in the first block, the header would probably
         // be not extracted.
         CSVUtils.extractHeader(lines, parser.options).foreach { header =>
-          CSVDataSource.checkHeader(
-            header,
-            parser.tokenizer,
+          CSVDataSource.checkHeaderColumnNames(
             if (columnPruning) requiredSchema else dataSchema,
+            parser.tokenizer.parseLine(header),
             file.filePath,
             parser.options.enforceSchema,
             caseSensitive)
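
With the String-based checkHeader wrapper gone, every call site tokenizes the header itself and funnels into checkHeaderColumnNames. For readers without the helper at hand, a simplified, self-contained model of the comparison it performs (the real method additionally uses enforceSchema to decide between warning and failing, and reports the file name):

    // Positional comparison of header tokens against schema field names,
    // folding case when the analysis is case-insensitive.
    def headerMatchesSchema(
        header: Array[String],
        fieldNames: Seq[String],
        caseSensitive: Boolean): Boolean = {
      def norm(s: String): String = if (caseSensitive) s else s.toLowerCase
      header.length == fieldNames.length &&
        header.zip(fieldNames).forall { case (h, f) => norm(h) == norm(f) }
    }

    assert(headerMatchesSchema(Array("columnA"), Seq("ColumnA"), caseSensitive = false))
    assert(!headerMatchesSchema(Array("columnA", "columnB"), Seq("columnA"), caseSensitive = true))
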
From 667db3c8ec6249321de10b4c48be545dcd19784a Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Mon, 20 Aug 2018 12:53:10 -0400
Subject: [PATCH 6/6] Use style convention of explicit vals

---
 .../scala/org/apache/spark/sql/DataFrameReader.scala  | 4 +++-
 .../sql/execution/datasources/csv/CSVDataSource.scala | 9 ++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4a6b35e6b43d..90cf15f9f722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -505,9 +505,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
     val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+      val parser = new CsvParser(parsedOptions.asParserSettings)
+      val columnNames = parser.parseLine(firstLine)
       CSVDataSource.checkHeaderColumnNames(
         actualSchema,
-        new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine),
+        columnNames,
         csvDataset.getClass.getCanonicalName,
         parsedOptions.enforceSchema,
         sparkSession.sessionState.conf.caseSensitiveAnalysis)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 2548766a8152..2b86054c0ffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -210,9 +210,11 @@ object TextInputCSVDataSource extends CSVDataSource {
         // Note: if there are only comments in the first block, the header would probably
         // be not extracted.
         CSVUtils.extractHeader(lines, parser.options).foreach { header =>
+          val schema = if (columnPruning) requiredSchema else dataSchema
+          val columnNames = parser.tokenizer.parseLine(header)
           CSVDataSource.checkHeaderColumnNames(
-            if (columnPruning) requiredSchema else dataSchema,
-            parser.tokenizer.parseLine(header),
+            schema,
+            columnNames,
             file.filePath,
             parser.options.enforceSchema,
             caseSensitive)
@@ -293,8 +295,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
       caseSensitive: Boolean,
       columnPruning: Boolean): Iterator[InternalRow] = {
     def checkHeader(header: Array[String]): Unit = {
+      val schema = if (columnPruning) requiredSchema else dataSchema
       CSVDataSource.checkHeaderColumnNames(
-        if (columnPruning) requiredSchema else dataSchema,
+        schema,
         header,
         file.filePath,
         parser.options.enforceSchema,