diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index c9685b866774..1a496d773557 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -109,8 +109,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
 
-  - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
-
+  - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. In addition, this makes Spark's Hive table read behavior more consistent across different formats and with the behavior of `spark.read.load`. For example, for both ORC and Parquet Hive tables, `LOCATION '/table/*'` is required instead of `LOCATION '/table/'` to create an external table that reads its direct sub-directories such as `'/table/dir'`. Setting `spark.sql.hive.convertMetastoreOrc` to `false` restores the previous behavior.
+
   - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
 
   - Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.statistics.parallelFileListingInStatsComputation.enabled` to `False`.
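A minimal spark-shell sketch of the wildcard-LOCATION behavior the new paragraph above describes (illustrative only: the table name `t_sub`, the `/tmp/spark_t` paths, and a session built with `enableHiveSupport()` are assumptions, not part of this patch):

    // Illustrative sketch: data lands in a sub-directory of the table location,
    // as in the Hive tables this change targets.
    spark.range(2).toDF("id").write.parquet("/tmp/spark_t/dir")

    // With spark.sql.hive.convertMetastoreParquet (and, since 2.4, convertMetastoreOrc)
    // left at the default `true`, a trailing wildcard is needed for the converted table
    // to pick up the files under the direct sub-directory.
    spark.sql("""
      CREATE EXTERNAL TABLE t_sub (id BIGINT)
      STORED AS PARQUET
      LOCATION '/tmp/spark_t/*'
    """)
    spark.sql("SELECT * FROM t_sub").show()

Setting the corresponding `spark.sql.hive.convertMetastore*` flag to `false` falls back to the Hive SerDe read path (the pre-2.4 default for ORC), as the migration note says.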
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
index de588768cfde..4b0344836888 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
@@ -18,13 +18,13 @@ package org.apache.spark.sql.hive
 
 import java.io.File
+import java.io.IOException
 
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 /**
  * A suite of tests for the Parquet support through the data sources API.
  */
@@ -222,4 +222,115 @@ class HiveParquetSourceSuite extends ParquetPartitioningTest {
       assert(df4.columns === Array("str", "max_int"))
     }
   }
+
+  test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
+    Seq("true", "false").foreach { parquetConversion =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
+        withTempPath { path =>
+          withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
+            val someDF1 = Seq((1, 1, "parq1"), (2, 2, "parq2")).
+              toDF("c1", "c2", "c3").repartition(1)
+            val someDF2 = Seq((3, 3, "parq3"), (4, 4, "parq4")).
+              toDF("c1", "c2", "c3").repartition(1)
+            val someDF3 = Seq((5, 5, "parq5"), (6, 6, "parq6")).
+              toDF("c1", "c2", "c3").repartition(1)
+            someDF1.write.parquet(s"${path.getCanonicalPath}/l1/")
+            someDF2.write.parquet(s"${path.getCanonicalPath}/l1/l2/")
+            someDF3.write.parquet(s"${path.getCanonicalPath}/l1/l2/l3/")
+
+            val topDirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl1(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${s"${path.getCanonicalPath}"}'""".stripMargin
+            sql(topDirStatement)
+            if (parquetConversion == "true") {
+              checkAnswer(sql("select * from tbl1"), Nil)
+            } else {
+              intercept[IOException](sql("select * from tbl1").show())
+            }
+
+            val l1DirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl2(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${s"${path.getCanonicalPath}/l1/"}'""".stripMargin
+            sql(l1DirStatement)
+            if (parquetConversion == "true") {
+              checkAnswer(sql("select * from tbl2"),
+                (1 to 2).map(i => Row(i, i, s"parq$i")))
+            } else {
+              intercept[IOException](sql("select * from tbl2").show())
+            }
+
+            val l2DirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl3(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${s"${path.getCanonicalPath}/l1/l2/"}'""".stripMargin
+            sql(l2DirStatement)
+            if (parquetConversion == "true") {
+              checkAnswer(sql("select * from tbl3"),
+                (3 to 4).map(i => Row(i, i, s"parq$i")))
+            } else {
+              intercept[IOException](sql("select * from tbl3").show())
+            }
+
+            val wildcardTopDirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl4(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${new File(s"${path}/*").toURI}'""".stripMargin
+            sql(wildcardTopDirStatement)
+            if (parquetConversion == "true") {
+              checkAnswer(sql("select * from tbl4"),
+                (1 to 2).map(i => Row(i, i, s"parq$i")))
+            } else {
+              intercept[IOException](sql("select * from tbl4").show())
+            }
+
+            val wildcardL1DirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl5(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${new File(s"${path}/l1/*").toURI}'""".stripMargin
+            sql(wildcardL1DirStatement)
+            if (parquetConversion == "true") {
+              checkAnswer(sql("select * from tbl5"),
+                (1 to 4).map(i => Row(i, i, s"parq$i")))
+            } else {
+              intercept[IOException](sql("select * from tbl5").show())
+            }
+
+            val wildcardL2DirStatement =
+              s"""
+                 |CREATE EXTERNAL TABLE tbl6(
+                 |  c1 int,
+                 |  c2 int,
+                 |  c3 string)
+                 |STORED AS parquet
+                 |LOCATION '${new File(s"${path}/l1/l2/*").toURI}'""".stripMargin
+            sql(wildcardL2DirStatement)
+            checkAnswer(sql("select * from tbl6"),
+              (3 to 6).map(i => Row(i, i, s"parq$i")))
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 7fefaf53939b..216898fd7b83 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -190,4 +190,155 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
+    Seq(true, false).foreach { convertMetastore =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") {
+        withTempDir { dir =>
+          try {
+            hiveClient.runSqlHive("USE default")
+            hiveClient.runSqlHive(
+              """
+                |CREATE EXTERNAL TABLE hive_orc(
+                |  C1 INT,
+                |  C2 INT,
+                |  C3 STRING)
+                |STORED AS orc""".stripMargin)
+            // Hive throws an exception if the location is assigned in the CREATE TABLE statement.
+            hiveClient.runSqlHive(
+              s"ALTER TABLE hive_orc SET LOCATION " +
+                s"'${new File(s"${dir.getCanonicalPath}/l1/").toURI}'")
+            hiveClient.runSqlHive(
+              """
+                |INSERT INTO TABLE hive_orc
+                |VALUES (1, 1, 'orc1'), (2, 2, 'orc2')""".stripMargin)
+
+            hiveClient.runSqlHive(
+              s"ALTER TABLE hive_orc SET LOCATION " +
+                s"'${new File(s"${dir.getCanonicalPath}/l1/l2/").toURI}'")
+            hiveClient.runSqlHive(
+              """
+                |INSERT INTO TABLE hive_orc
+                |VALUES (3, 3, 'orc3'), (4, 4, 'orc4')""".stripMargin)
+
+            hiveClient.runSqlHive(
+              s"ALTER TABLE hive_orc SET LOCATION " +
+                s"'${new File(s"${dir.getCanonicalPath}/l1/l2/l3/").toURI}'")
+            hiveClient.runSqlHive(
+              """
+                |INSERT INTO TABLE hive_orc
+                |VALUES (5, 5, 'orc5'), (6, 6, 'orc6')""".stripMargin)
+
+            withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
+              val topDirStatement =
+                s"""
+                   |CREATE EXTERNAL TABLE tbl1(
+                   |  c1 int,
+                   |  c2 int,
+                   |  c3 string)
+                   |STORED AS orc
+                   |LOCATION '${s"${dir.getCanonicalPath}"}'""".stripMargin
+              sql(topDirStatement)
+              val topDirSqlStatement = s"select * from tbl1"
+              if (convertMetastore) {
+                checkAnswer(sql(topDirSqlStatement), Nil)
+              } else {
+                checkAnswer(sql(topDirSqlStatement),
+                  (1 to 6).map(i => Row(i, i, s"orc$i")))
+              }
+
+              val l1DirStatement =
+                s"""
+                   |CREATE EXTERNAL TABLE tbl2(
+                   |  c1 int,
+                   |  c2 int,
+                   |  c3 string)
+                   |STORED AS orc
+                   |LOCATION '${s"${dir.getCanonicalPath}/l1/"}'""".stripMargin
+              sql(l1DirStatement)
+              val l1DirSqlStatement = s"select * from tbl2"
+              if (convertMetastore) {
+                checkAnswer(sql(l1DirSqlStatement),
+                  (1 to 2).map(i => Row(i, i, s"orc$i")))
+              } else {
+                checkAnswer(sql(l1DirSqlStatement),
+                  (1 to 6).map(i => Row(i, i, s"orc$i")))
+              }
+
+              val l2DirStatement =
+                s"""
+                   |CREATE EXTERNAL TABLE tbl3(
+                   |  c1 int,
+                   |  c2 int,
+                   |  c3 string)
+                   |STORED AS orc
+                   |LOCATION '${s"${dir.getCanonicalPath}/l1/l2/"}'""".stripMargin
+              sql(l2DirStatement)
+              val l2DirSqlStatement = s"select * from tbl3"
+              if (convertMetastore) {
+                checkAnswer(sql(l2DirSqlStatement),
+                  (3 to 4).map(i => Row(i, i, s"orc$i")))
s"orc$i"))) + } else { + checkAnswer(sql(l2DirSqlStatement), + (3 to 6).map(i => Row(i, i, s"orc$i"))) + } + + val wildcardTopDirStatement = + s""" + |CREATE EXTERNAL TABLE tbl4( + | c1 int, + | c2 int, + | c3 string) + |STORED AS orc + |LOCATION '${new File(s"${dir}/*").toURI}'""".stripMargin + sql(wildcardTopDirStatement) + val wildcardTopDirSqlStatement = s"select * from tbl4" + if (convertMetastore) { + checkAnswer(sql(wildcardTopDirSqlStatement), + (1 to 2).map(i => Row(i, i, s"orc$i"))) + } else { + checkAnswer(sql(wildcardTopDirSqlStatement), Nil) + } + + val wildcardL1DirStatement = + s""" + |CREATE EXTERNAL TABLE tbl5( + | c1 int, + | c2 int, + | c3 string) + |STORED AS orc + |LOCATION '${new File(s"${dir}/l1/*").toURI}'""".stripMargin + sql(wildcardL1DirStatement) + val wildcardL1DirSqlStatement = s"select * from tbl5" + if (convertMetastore) { + checkAnswer(sql(wildcardL1DirSqlStatement), + (1 to 4).map(i => Row(i, i, s"orc$i"))) + } else { + checkAnswer(sql(wildcardL1DirSqlStatement), Nil) + } + + val wildcardL2Statement = + s""" + |CREATE EXTERNAL TABLE tbl6( + | c1 int, + | c2 int, + | c3 string) + |STORED AS orc + |LOCATION '${new File(s"${dir}/l1/l2/*").toURI}'""".stripMargin + sql(wildcardL2Statement) + val wildcardL2SqlStatement = s"select * from tbl6" + if (convertMetastore) { + checkAnswer(sql(wildcardL2SqlStatement), + (3 to 6).map(i => Row(i, i, s"orc$i"))) + } else { + checkAnswer(sql(wildcardL2SqlStatement), Nil) + } + } + } finally { + hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc") + } + } + } + } + } }