From ea91f127ecf0cc9d77b27366de87daa1f2237290 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Fri, 12 Jun 2020 16:54:05 +0800
Subject: [PATCH 1/6] Add duplicate partition column check

---
 .../sql/execution/datasources/PartitioningUtils.scala    | 9 +++++++++
 .../apache/spark/sql/sources/PartitionedWriteSuite.scala | 9 ++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f7e225b0cdc96..5a6de7eebe3aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -545,6 +545,15 @@ object PartitioningUtils {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
+    val existsCols = new mutable.HashSet[String]
+    partitionColumns.foreach(col => {
+      if (existsCols.contains(col)) {
+        throw new AnalysisException(s"partition ${col} is duplicate")
+      } else {
+        existsCols.add(col)
+      }
+    })
+
     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field =>
       field.dataType match {
         case _: AtomicType => // OK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 1c4e2a967b0a2..99e0208c1b284 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
@@ -156,4 +156,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-31968:duplicate partition columns check") {
+    val e = intercept[AnalysisException](Seq((3, 2)).toDF("a", "b").
+      write.mode("append")
+      .partitionBy("b", "b").saveAsTable("t"))
+    assert(e.getMessage.contains("partition b is duplicate"))
+  }
 }
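The first cut detects duplicates with a single pass over the partitionBy names, remembering what it has already seen in a mutable.HashSet. Below is a minimal self-contained sketch of that idiom; assertNoDuplicates is a hypothetical name, and a plain IllegalArgumentException stands in for Spark's AnalysisException, whose constructor is not public outside org.apache.spark.sql. Note the comparison is exact as written, so this version ignores spark.sql.caseSensitive; PATCH 3/6 addresses that.

    import scala.collection.mutable

    // Sketch only: mirrors the HashSet-based scan added in PATCH 1/6.
    def assertNoDuplicates(partitionColumns: Seq[String]): Unit = {
      val existsCols = new mutable.HashSet[String]
      partitionColumns.foreach { col =>
        // HashSet.add returns false when the element is already present.
        if (!existsCols.add(col)) {
          throw new IllegalArgumentException(s"partition $col is duplicate")
        }
      }
    }

    assertNoDuplicates(Seq("year", "month")) // passes
    assertNoDuplicates(Seq("b", "b"))        // throws: partition b is duplicate
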
From 8a14b7e78441f6e75831813e12e9b8a2005a4f67 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Fri, 12 Jun 2020 17:31:29 +0800
Subject: [PATCH 2/6] Improve error message and fix unit test

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala  | 3 ++-
 .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 5a6de7eebe3aa..844a3c4f30b0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -548,7 +548,8 @@ object PartitioningUtils {
     val existsCols = new mutable.HashSet[String]
     partitionColumns.foreach(col => {
       if (existsCols.contains(col)) {
-        throw new AnalysisException(s"partition ${col} is duplicate")
+        throw new AnalysisException(s"" +
+          s"Found partition ${col} is duplicate in ${partitionColumns}")
       } else {
         existsCols.add(col)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 99e0208c1b284..7da081df8f6a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -160,7 +160,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
   test("SPARK-31968:duplicate partition columns check") {
     val e = intercept[AnalysisException](Seq((3, 2)).toDF("a", "b").
       write.mode("append")
-      .partitionBy("b", "b").saveAsTable("t"))
-    assert(e.getMessage.contains("partition b is duplicate"))
+      .partitionBy("b", "b").csv("/tmp"))
+    assert(e.getMessage.contains("Found partition b is duplicate in WrappedArray(b, b);"))
   }
 }
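The assertion now matches the literal text "WrappedArray(b, b)" because partitionBy takes String* varargs, which Scala 2.12 (the version Spark 3.0 builds against) passes as a mutable.WrappedArray, and s"...${partitionColumns}" simply calls toString on that collection. The trailing semicolon in the expected message comes from AnalysisException.getMessage, which appends ";" plus any line and position annotations. A small illustration, using a hypothetical stand-in for the varargs parameter:

    // partitionByArgs is a hypothetical stand-in for DataFrameWriter.partitionBy's
    // String* parameter; the point is only how varargs print under interpolation.
    def partitionByArgs(colNames: String*): Seq[String] = colNames

    val cols = partitionByArgs("b", "b")
    println(s"Found partition b is duplicate in $cols")
    // On Scala 2.12 this prints: Found partition b is duplicate in WrappedArray(b, b)

This leaky message is what motivates the rework in the next commit.
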
From 739c15a09dcdebd362be4e984b95818521b7a655 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Fri, 12 Jun 2020 17:51:58 +0800
Subject: [PATCH 3/6] Handle case sensitivity and detect duplicates the way SchemaUtils does

---
 .../datasources/PartitioningUtils.scala       | 18 ++++++++++--------
 .../sql/sources/PartitionedWriteSuite.scala   |  2 +-
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 844a3c4f30b0f..f82b3f461983e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -545,15 +545,17 @@ object PartitioningUtils {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
 
-    val existsCols = new mutable.HashSet[String]
-    partitionColumns.foreach(col => {
-      if (existsCols.contains(col)) {
-        throw new AnalysisException(s"" +
-          s"Found partition ${col} is duplicate in ${partitionColumns}")
-      } else {
-        existsCols.add(col)
+    // scalastyle:off caselocale
+    val names = if (caseSensitive) partitionColumns else partitionColumns.map(_.toLowerCase)
+    // scalastyle:on caselocale
+
+    if (names.distinct.length != names.length) {
+      val duplicateColumns = names.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
       }
-    })
+      throw new AnalysisException(
+        s"Found duplicate partition column(s) ${duplicateColumns.mkString(", ")}")
+    }
 
     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field =>
       field.dataType match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 7da081df8f6a1..f26907186776f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -161,6 +161,6 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     val e = intercept[AnalysisException](Seq((3, 2)).toDF("a", "b").
       write.mode("append")
       .partitionBy("b", "b").csv("/tmp"))
-    assert(e.getMessage.contains("Found partition b is duplicate in WrappedArray(b, b);"))
+    assert(e.getMessage.contains("Found duplicate partition column(s) `b`;"))
   }
 }
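The new detection logic, extracted here as a standalone sketch with the inputs inlined: fold case first when the analyzer is case-insensitive, then group identical names so each duplicate is reported exactly once, backtick-quoted.

    val caseSensitive = false
    val partitionColumns = Seq("B", "b")

    // Lowercase up front so "B" and "b" collide when the analyzer is
    // case-insensitive (spark.sql.caseSensitive=false is Spark's default).
    val names = if (caseSensitive) partitionColumns else partitionColumns.map(_.toLowerCase)

    if (names.distinct.length != names.length) {
      // Any group with more than one occurrence is a duplicate.
      val duplicateColumns = names.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => s"`$x`"
      }
      println(s"Found duplicate partition column(s) ${duplicateColumns.mkString(", ")}")
      // prints: Found duplicate partition column(s) `b`
    }
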
write.mode("append") .partitionBy("b", "b").csv("/tmp")) - assert(e.getMessage.contains("Found partition b is duplicate in WrappedArray(b, b);")) + assert(e.getMessage.contains("Found duplicate partition column(s) `b`;")) } } From 0aab7b5adadec5cfbf788bfbfdc28b55033100dc Mon Sep 17 00:00:00 2001 From: TJX2014 Date: Fri, 12 Jun 2020 18:30:15 +0800 Subject: [PATCH 4/6] reuse SchemaUtils.checkColumnNameDuplication, may be it could be better --- .../execution/datasources/PartitioningUtils.scala | 13 ++----------- .../spark/sql/sources/PartitionedWriteSuite.scala | 10 ++++++---- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index f82b3f461983e..522cdc03c2eff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -545,17 +545,8 @@ object PartitioningUtils { partitionColumns: Seq[String], caseSensitive: Boolean): Unit = { - // scalastyle:off caselocale - val names = if (caseSensitive) partitionColumns else partitionColumns.map(_.toLowerCase) - // scalastyle:on caselocale - - if (names.distinct.length != names.length) { - val duplicateColumns = names.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => s"`$x`" - } - throw new AnalysisException( - s"Found duplicate partition column(s) ${duplicateColumns.mkString(", ")}") - } + SchemaUtils.checkColumnNameDuplication( + partitionColumns, partitionColumns.mkString(","), caseSensitive) partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field => field.dataType match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index f26907186776f..f11d00c71855a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -158,9 +158,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { } test("SPARK-31968:duplicate partition columns check") { - val e = intercept[AnalysisException](Seq((3, 2)).toDF("a", "b"). 
From ffebc0cb66182f08a9b94a41d47486e726a100af Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Sat, 13 Jun 2020 08:56:10 +0800
Subject: [PATCH 5/6] Correct code format

---
 .../execution/datasources/PartitioningUtils.scala |  2 +-
 .../spark/sql/sources/PartitionedWriteSuite.scala | 15 ++++++++-------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 522cdc03c2eff..5846d46e146ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -546,7 +546,7 @@ object PartitioningUtils {
       caseSensitive: Boolean): Unit = {
 
     SchemaUtils.checkColumnNameDuplication(
-      partitionColumns, partitionColumns.mkString(","), caseSensitive)
+      partitionColumns, partitionColumns.mkString(", "), caseSensitive)
 
     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field =>
       field.dataType match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index f11d00c71855a..41d1f14314196 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -157,12 +157,13 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-31968:duplicate partition columns check") {
-    val ds = Seq((3, 2)).toDF("a", "b")
-    val e = intercept[AnalysisException](ds
-      .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .partitionBy("b", "b").csv("/tmp/111"))
-    assert(e.getMessage.contains(
-      "Found duplicate column(s) b,b: `b`;"))
+  test("SPARK-31968: duplicate partition columns check") {
+    withTempPath { f =>
+      val ds = Seq((3, 2)).toDF("a", "b")
+      val e = intercept[AnalysisException](ds
+        .write.partitionBy("b", "b").csv(f.getAbsolutePath))
+      assert(e.getMessage.contains(
+        "Found duplicate column(s) b, b: `b`;"))
+    }
   }
 }
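Besides the message spacing, this commit moves the test off the hardcoded "/tmp/111" path and onto withTempPath from the shared SQL test helpers. A sketch of the contract it provides, not Spark's exact implementation: the body receives a fresh path that does not exist yet, and the path is removed afterwards even if the body throws.

    import java.io.File
    import org.apache.spark.util.Utils

    // Sketch only: a withTempPath-like helper built on Spark's Utils.
    def withTempPath(f: File => Unit): Unit = {
      val path = Utils.createTempDir()
      path.delete() // hand the body a path that does not exist yet
      try f(path) finally Utils.deleteRecursively(path)
    }
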
From 4f0bc9f5ca2702e5daa4bad54e3158011d5041a2 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Sat, 13 Jun 2020 09:54:08 +0800
Subject: [PATCH 6/6] Improve code style

---
 .../apache/spark/sql/sources/PartitionedWriteSuite.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 41d1f14314196..6df1c5db14c26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -159,11 +159,9 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
 
   test("SPARK-31968: duplicate partition columns check") {
     withTempPath { f =>
-      val ds = Seq((3, 2)).toDF("a", "b")
-      val e = intercept[AnalysisException](ds
-        .write.partitionBy("b", "b").csv(f.getAbsolutePath))
-      assert(e.getMessage.contains(
-        "Found duplicate column(s) b, b: `b`;"))
+      val e = intercept[AnalysisException](
+        Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath))
+      assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
     }
   }
 }
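Taken together, the series makes a duplicate partitionBy column fail fast at analysis time instead of surfacing as a confusing downstream error. A usage sketch of the final behavior, assuming a SparkSession named spark with spark.implicits._ in scope and an illustrative output path:

    import org.apache.spark.sql.AnalysisException

    val df = Seq((3, 2)).toDF("a", "b")
    try {
      df.write.partitionBy("b", "b").csv("/tmp/dup-partition-demo")
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // Found duplicate column(s) b, b: `b`;
    }
    // Under the default spark.sql.caseSensitive=false, partitionBy("b", "B")
    // fails the same way, because names are lowercased before comparison.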