4 changes: 2 additions & 2 deletions docs/sql-migration-guide-upgrade.md
@@ -109,8 +109,8 @@ displayTitle: Spark SQL Upgrading Guide

- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This also happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` when `spark.sql.hive.convertMetastoreOrc=true`. Since Spark 2.4, Spark respects Parquet/ORC-specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, while in Spark 2.4 the result would be uncompressed parquet files.
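
  A minimal sketch of this example, assuming a Hive-enabled `SparkSession` named `spark`; note that executable SQL spells the property as `'parquet.compression'='NONE'`:

  ```scala
  // A sketch under the assumptions above, not part of the diff.
  spark.sql(
    """CREATE TABLE t(id INT) STORED AS PARQUET
      |TBLPROPERTIES ('parquet.compression'='NONE')""".stripMargin)
  spark.sql("INSERT INTO t VALUES (1)")
  // Spark 2.3: the table property is ignored, so Snappy-compressed Parquet files are
  // written. Spark 2.4: the property is respected, so the files are uncompressed.
  ```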

- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.

- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. This means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, while in Spark 2.4 it would be converted into Spark's ORC data source table and ORC vectorization would be applied. In addition, this makes Spark's Hive table read behavior more consistent across formats and with the behavior of `spark.read.load`. For example, for both ORC and Parquet Hive tables, `LOCATION '/table/*'` is required instead of `LOCATION '/table/'` to create an external table that reads its direct sub-directories such as `'/table/dir'`. Setting `spark.sql.hive.convertMetastoreOrc` to `false` restores the previous behavior.
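
  A sketch of the `LOCATION` difference, assuming a Hive-enabled `SparkSession` named `spark` and illustrative paths:

  ```scala
  // With conversion enabled (the Spark 2.4 default), a wildcard is needed to read
  // files that sit in direct sub-directories of the table path.
  spark.sql(
    """CREATE EXTERNAL TABLE t_sub(id INT) STORED AS ORC
      |LOCATION '/table/*'""".stripMargin)  // picks up e.g. '/table/dir'

  // Restore the pre-2.4 behavior (the table is handled by Hive SerDe again):
  spark.sql("SET spark.sql.hive.convertMetastoreOrc=false")
  ```
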
- In version 2.3 and earlier, CSV rows are considered malformed if at least one column value in the row is malformed. The CSV parser drops such rows in DROPMALFORMED mode or outputs an error in FAILFAST mode. Since Spark 2.4, a CSV row is considered malformed only when it contains malformed column values actually requested from the CSV datasource; other values can be ignored. As an example, consider a CSV file that contains the "id,name" header and one row "1234". In Spark 2.4, selecting the id column yields a row with the single column value 1234, but in Spark 2.3 and earlier the result is empty in DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
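
  A minimal sketch of this CSV example, assuming a `SparkSession` named `spark` and a hypothetical file `/tmp/example.csv`:

  ```scala
  // Suppose /tmp/example.csv (a hypothetical path) contains exactly:
  //   id,name
  //   1234
  val df = spark.read
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("/tmp/example.csv")

  // Spark 2.4: only `id` is parsed, so the row survives and 1234 is returned.
  // Spark 2.3 and earlier: the short row is malformed as a whole and is dropped.
  df.select("id").show()

  // Restore the previous behavior:
  spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")
  ```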

- Since Spark 2.4, file listing for statistics computation is done in parallel by default. This can be disabled by setting `spark.sql.statistics.parallelFileListingInStatsComputation.enabled` to `false`.
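
  A one-line sketch of disabling it at runtime, assuming a `SparkSession` named `spark`:

  ```scala
  // Fall back to serial file listing when computing statistics.
  spark.conf.set(
    "spark.sql.statistics.parallelFileListingInStatsComputation.enabled", "false")
  ```
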
HiveParquetSourceSuite.scala
@@ -18,13 +18,13 @@
package org.apache.spark.sql.hive

import java.io.File
import java.io.IOException

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* A suite of tests for the Parquet support through the data sources API.
@@ -222,4 +222,115 @@ class HiveParquetSourceSuite extends ParquetPartitioningTest {
assert(df4.columns === Array("str", "max_int"))
}
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Member: Please fix this first, per the first and second review comments.

Contributor Author: @dongjoon-hyun Thanks for the comments. I have made the changes for the first and second review comments: I changed both suites so they look similar, and added more test cases. For the third comment, I haven't found a place common to both suites; when you say a helper function is missing from the previous commit, could you point out what kind of helper function I missed? Thanks.
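
One possible shape for such a shared helper; the name `checkExternalTable` and its signature are hypothetical, not from this PR:

```scala
// Hypothetical sketch only: a helper both suites could share. It creates an
// external table over `location`, verifies the expected rows, and drops the table;
// `withTable`, `sql`, `checkAnswer`, and `Row` come from the shared test utilities.
def checkExternalTable(
    tableName: String,
    format: String,
    location: String,
    expected: Seq[Row]): Unit = {
  withTable(tableName) {
    sql(
      s"""
         |CREATE EXTERNAL TABLE $tableName(
         |  c1 int,
         |  c2 int,
         |  c3 string)
         |STORED AS $format
         |LOCATION '$location'""".stripMargin)
    checkAnswer(sql(s"SELECT * FROM $tableName"), expected)
  }
}
```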

Seq("true", "false").foreach { parquetConversion =>
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
withTempPath { path =>
withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
val someDF1 = Seq((1, 1, "parq1"), (2, 2, "parq2"))
  .toDF("c1", "c2", "c3").repartition(1)
val someDF2 = Seq((3, 3, "parq3"), (4, 4, "parq4"))
  .toDF("c1", "c2", "c3").repartition(1)
val someDF3 = Seq((5, 5, "parq5"), (6, 6, "parq6"))
  .toDF("c1", "c2", "c3").repartition(1)
someDF1.write.parquet(s"${path.getCanonicalPath}/l1/")
someDF2.write.parquet(s"${path.getCanonicalPath}/l1/l2/")
someDF3.write.parquet(s"${path.getCanonicalPath}/l1/l2/l3/")

val topDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${path.getCanonicalPath}'""".stripMargin
sql(topDirStatement)
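// With conversion enabled, Spark lists only files directly under LOCATION, so the
// top directory (which holds only sub-directories) yields no rows; with conversion
// disabled, Hive SerDe fails with an IOException on the nested sub-directories.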
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl1"), Nil)
} else {
intercept[IOException](sql("select * from tbl1").show())
}

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${path.getCanonicalPath}/l1/'""".stripMargin
sql(l1DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl2"),
(1 to 2).map(i => Row(i, i, s"parq$i")))
} else {
intercept[IOException](sql("select * from tbl2").show())
}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${path.getCanonicalPath}/l1/l2/'""".stripMargin
sql(l2DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl3"),
(3 to 4).map(i => Row(i, i, s"parq$i")))
} else {
intercept[IOException](sql("select * from tbl3").show())
}

val wildcardTopDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl4(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/*").toURI}'""".stripMargin
sql(wildcardTopDirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl4"),
(1 to 2).map(i => Row(i, i, s"parq$i")))
} else {
intercept[IOException](sql("select * from tbl4").show())
}

val wildcardL1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl5(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/l1/*").toURI}'""".stripMargin
sql(wildcardL1DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("select * from tbl5"),
(1 to 4).map(i => Row(i, i, s"parq$i")))
} else {
intercept[IOException](sql("select * from tbl5").show())
}

val wildcardL2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl6(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/l1/l2/*").toURI}'""".stripMargin
sql(wildcardL2DirStatement)
checkAnswer(sql("select * from tbl6"),
(3 to 6).map(i => Row(i, i, s"parq$i")))
}
}
}
}
}
}
HiveOrcSourceSuite.scala
@@ -190,4 +190,155 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
}
}
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Seq(true, false).foreach { convertMetastore =>
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") {
withTempDir { dir =>
try {
hiveClient.runSqlHive("USE default")
hiveClient.runSqlHive(
"""
|CREATE EXTERNAL TABLE hive_orc(
| C1 INT,
| C2 INT,
| C3 STRING)
|STORED AS orc""".stripMargin)
// Hive throws an exception if the location is assigned in the CREATE TABLE statement.
hiveClient.runSqlHive(
s"ALTER TABLE hive_orc SET LOCATION " +
s"'${new File(s"${dir.getCanonicalPath}/l1/").toURI}'")
hiveClient.runSqlHive(
"""
|INSERT INTO TABLE hive_orc
|VALUES (1, 1, 'orc1'), (2, 2, 'orc2')""".stripMargin)

hiveClient.runSqlHive(
s"ALTER TABLE hive_orc SET LOCATION " +
s"'${new File(s"${dir.getCanonicalPath}/l1/l2/").toURI}'")
hiveClient.runSqlHive(
"""
|INSERT INTO TABLE hive_orc
|VALUES (3, 3, 'orc3'), (4, 4, 'orc4')""".stripMargin)

hiveClient.runSqlHive(
s"ALTER TABLE hive_orc SET LOCATION " +
s"'${new File(s"${dir.getCanonicalPath}/l1/l2/l3/").toURI}'")
hiveClient.runSqlHive(
"""
|INSERT INTO TABLE hive_orc
|VALUES (5, 5, 'orc5'), (6, 6, 'orc6')""".stripMargin)

withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
val topDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${dir.getCanonicalPath}'""".stripMargin
sql(topDirStatement)
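// With conversion enabled, Spark lists only files directly under LOCATION (none
// here); with conversion disabled, Hive's ORC reader also traverses the
// sub-directories and returns all six rows.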
val topDirSqlStatement = "select * from tbl1"
if (convertMetastore) {
checkAnswer(sql(topDirSqlStatement), Nil)
} else {
checkAnswer(sql(topDirSqlStatement),
(1 to 6).map(i => Row(i, i, s"orc$i")))
}

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${dir.getCanonicalPath}/l1/'""".stripMargin
sql(l1DirStatement)
val l1DirSqlStatement = "select * from tbl2"
if (convertMetastore) {
checkAnswer(sql(l1DirSqlStatement),
(1 to 2).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(l1DirSqlStatement),
(1 to 6).map(i => Row(i, i, s"orc$i")))
}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${dir.getCanonicalPath}/l1/l2/'""".stripMargin
sql(l2DirStatement)
val l2DirSqlStatement = "select * from tbl3"
if (convertMetastore) {
checkAnswer(sql(l2DirSqlStatement),
(3 to 4).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(l2DirSqlStatement),
(3 to 6).map(i => Row(i, i, s"orc$i")))
}

val wildcardTopDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl4(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/*").toURI}'""".stripMargin
sql(wildcardTopDirStatement)
val wildcardTopDirSqlStatement = "select * from tbl4"
if (convertMetastore) {
checkAnswer(sql(wildcardTopDirSqlStatement),
(1 to 2).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardTopDirSqlStatement), Nil)
}

val wildcardL1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl5(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/*").toURI}'""".stripMargin
sql(wildcardL1DirStatement)
val wildcardL1DirSqlStatement = "select * from tbl5"
if (convertMetastore) {
checkAnswer(sql(wildcardL1DirSqlStatement),
(1 to 4).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardL1DirSqlStatement), Nil)
}

val wildcardL2Statement =
s"""
|CREATE EXTERNAL TABLE tbl6(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/l2/*").toURI}'""".stripMargin
sql(wildcardL2Statement)
val wildcardL2SqlStatement = "select * from tbl6"
if (convertMetastore) {
checkAnswer(sql(wildcardL2SqlStatement),
(3 to 6).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardL2SqlStatement), Nil)
}
}
} finally {
hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
Member: It seems that we need to clean up tbl1 ~ tbl4, too.

Contributor Author: @dongjoon-hyun At line 221, I put tbl1 ~ tbl4 inside `withTable`, so I think they will get dropped. I ran it a couple of times in IntelliJ and it seems to work fine. What do you think?

Member: Got it. I missed that.

}
}
}
}
}
}