Commit 77579aa
[SPARK-25389][SQL] INSERT OVERWRITE DIRECTORY STORED AS should prevent duplicate fields
## What changes were proposed in this pull request?

Like the `INSERT OVERWRITE DIRECTORY USING` syntax, `INSERT OVERWRITE DIRECTORY STORED AS` should not generate files with duplicate fields, because Spark cannot read those files back.

**INSERT OVERWRITE DIRECTORY USING**

```scala
scala> sql("INSERT OVERWRITE DIRECTORY 'file:///tmp/parquet' USING parquet SELECT 'id', 'id2' id")
...
ERROR InsertIntoDataSourceDirCommand: Failed to write to directory ...
org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into file:/tmp/parquet: `id`;
```

**INSERT OVERWRITE DIRECTORY STORED AS**

```scala
scala> sql("INSERT OVERWRITE DIRECTORY 'file:///tmp/parquet' STORED AS parquet SELECT 'id', 'id2' id")
// It generates corrupted files
scala> spark.read.parquet("/tmp/parquet").show
18/09/09 22:09:57 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `id`;
```

## How was this patch tested?

Passed Jenkins with the newly added test cases.

Closes #22378 from dongjoon-hyun/SPARK-25389.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Parent: c9cb393

2 files changed: 29 additions & 0 deletions

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Command for writing the results of `query` to file system.
@@ -61,6 +62,10 @@ case class InsertIntoHiveDirCommand(
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)
+    SchemaUtils.checkColumnNameDuplication(
+      outputColumnNames,
+      s"when inserting into ${storage.locationUri.get}",
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
```
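The guard added above makes the `STORED AS` path reject duplicate output columns before any files are written, mirroring what the `USING` path already does. To illustrate the semantics of that check, here is a minimal, self-contained sketch in the spirit of `SchemaUtils.checkColumnNameDuplication`; the object name, exception type, and message wording below are illustrative assumptions, not Spark's exact internals:

```scala
// Hypothetical standalone sketch -- not Spark's actual SchemaUtils code.
object DuplicateColumnCheckSketch {

  // Fails if `columnNames` contains duplicates. Under case-insensitive
  // analysis (Spark's default), `id` and `ID` count as the same name.
  def checkColumnNameDuplication(
      columnNames: Seq[String],
      context: String,
      caseSensitive: Boolean): Unit = {
    val normalized =
      if (caseSensitive) columnNames else columnNames.map(_.toLowerCase)
    val duplicates = normalized.groupBy(identity).collect {
      case (name, occurrences) if occurrences.size > 1 => s"`$name`"
    }
    if (duplicates.nonEmpty) {
      throw new IllegalArgumentException(
        s"Found duplicate column(s) $context: ${duplicates.mkString(", ")}")
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes: the names are distinct when case matters.
    checkColumnNameDuplication(
      Seq("id", "ID"), "when inserting into /tmp/parquet", caseSensitive = true)
    // Throws: `id` and `ID` collide once names are lowercased.
    checkColumnNameDuplication(
      Seq("id", "ID"), "when inserting into /tmp/parquet", caseSensitive = false)
  }
}
```

Note that the command passes `sparkSession.sessionState.conf.caseSensitiveAnalysis` through, so the same `SELECT 'id', 'id2' ID` query is rejected under the default case-insensitive analysis but accepted when `spark.sql.caseSensitive` is `true`.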

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala

Lines changed: 24 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -750,4 +751,27 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       }
     }
   }
+
+  Seq("LOCAL", "").foreach { local =>
+    Seq(true, false).foreach { caseSensitivity =>
+      Seq("orc", "parquet").foreach { format =>
+        test(s"SPARK-25389 INSERT OVERWRITE $local DIRECTORY ... STORED AS with duplicated names" +
+          s"(caseSensitivity=$caseSensitivity, format=$format)") {
+          withTempDir { dir =>
+            withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitivity") {
+              val m = intercept[AnalysisException] {
+                sql(
+                  s"""
+                     |INSERT OVERWRITE $local DIRECTORY '${dir.toURI}'
+                     |STORED AS $format
+                     |SELECT 'id', 'id2' ${if (caseSensitivity) "id" else "ID"}
+                   """.stripMargin)
+              }.getMessage
+              assert(m.contains("Found duplicate column(s) when inserting into"))
+            }
+          }
+        }
+      }
+    }
+  }
 }
```
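The three nested `Seq(...).foreach` loops expand into eight test cases (2 `LOCAL` variants × 2 case-sensitivity settings × 2 formats), and the `${if (caseSensitivity) "id" else "ID"}` alias ensures the duplicate is exact under case-sensitive analysis and differs only in case otherwise. With the patch applied, the `STORED AS` reproduction from the commit message should now fail analysis the same way the `USING` variant does; a spark-shell session along these lines (exact message formatting assumed to match the `USING` output above) would be:

```scala
scala> sql("INSERT OVERWRITE DIRECTORY 'file:///tmp/parquet' STORED AS parquet SELECT 'id', 'id2' id")
// org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) when inserting into file:/tmp/parquet: `id`;
```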
