From adb9b2251faa4e847dbdc954b5f8f04b84d8c8aa Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Thu, 13 Feb 2020 20:24:16 +0900 Subject: [PATCH 1/2] Parses and converts a CSV Dataset having different column from 'value' in csv(dataset) API --- .../spark/sql/execution/datasources/csv/CSVUtils.scala | 5 ++--- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 7 +++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 21fabac472f4b..bf0ab7c2b5200 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -32,10 +32,9 @@ object CSVUtils { // Note that this was separately made by SPARK-18362. Logically, this should be the same // with the one below, `filterCommentAndEmpty` but execution path is different. One of them // might have to be removed in the near future if possible. 
- import lines.sqlContext.implicits._ - val nonEmptyLines = lines.filter(length(trim($"value")) > 0) + val nonEmptyLines = lines.filter(length(trim(col(lines.schema.fieldNames.head))) > 0) if (options.isCommentSet) { - nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)) + nonEmptyLines.filter(!col(lines.schema.fieldNames.head).startsWith(options.comment.toString)) } else { nonEmptyLines } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 97dfbbdb7fd2f..c10a88875393d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2294,6 +2294,13 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa } } } + + test("SPARK-30810: parses and convert a CSV Dataset having different column from 'value'") { + val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String] + val csv = spark.read.option("header", true).option("inferSchema", true).csv(ds) + assert(csv.schema.fieldNames === Seq("a", "b", "0")) + checkAnswer(csv, Row("a", "b", 1)) + } } class CSVv1Suite extends CSVSuite { From 4e9ddf1ecc82a9d43d50c261dadf2be2d2b60395 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 14 Feb 2020 11:39:00 +0900 Subject: [PATCH 2/2] Another approach --- .../spark/sql/execution/datasources/csv/CSVUtils.scala | 8 +++++--- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index bf0ab7c2b5200..d8b52c503ad34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -32,11 +32,13 @@ object CSVUtils { // Note that this was separately made by SPARK-18362. Logically, this should be the same // with the one below, `filterCommentAndEmpty` but execution path is different. One of them // might have to be removed in the near future if possible. - val nonEmptyLines = lines.filter(length(trim(col(lines.schema.fieldNames.head))) > 0) + import lines.sqlContext.implicits._ + val aliased = lines.toDF("value") + val nonEmptyLines = aliased.filter(length(trim($"value")) > 0) if (options.isCommentSet) { - nonEmptyLines.filter(!col(lines.schema.fieldNames.head).startsWith(options.comment.toString)) + nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String] } else { - nonEmptyLines + nonEmptyLines.as[String] } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c10a88875393d..c6f0263e8653c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2296,7 +2296,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa } test("SPARK-30810: parses and convert a CSV Dataset having different column from 'value'") { - val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String] + val ds = spark.range(2).selectExpr("concat('a,b,', id) AS `a.text`").as[String] val csv = spark.read.option("header", true).option("inferSchema", true).csv(ds) assert(csv.schema.fieldNames === Seq("a", "b", "0")) checkAnswer(csv, Row("a", "b", 1))