Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File
import java.io.IOException

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
Expand Down Expand Up @@ -222,4 +223,158 @@ class HiveParquetSourceSuite extends ParquetPartitioningTest {
assert(df4.columns === Array("str", "max_int"))
}
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Seq("true", "false").foreach { parquetConversion =>
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
withTempPath { path =>
withTable("parq_tbl1", "parq_tbl2", "parq_tbl3",
"tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
val parquetTblStatement1 =
s"""
|CREATE EXTERNAL TABLE parq_tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/"}'""".stripMargin
sql(parquetTblStatement1)

val parquetTblInsertL1 =
s"INSERT INTO TABLE parq_tbl1 VALUES (1, 1, 'parq1'), (2, 2, 'parq2')".stripMargin
sql(parquetTblInsertL1)

val parquetTblStatement2 =
s"""
|CREATE EXTERNAL TABLE parq_tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(parquetTblStatement2)

val parquetTblInsertL2 =
s"INSERT INTO TABLE parq_tbl2 VALUES (3, 3, 'parq3'), (4, 4, 'parq4')".stripMargin
sql(parquetTblInsertL2)

val parquetTblStatement3 =
s"""
|CREATE EXTERNAL TABLE parq_tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/l2/l3/"}'""".stripMargin
sql(parquetTblStatement3)

val parquetTblInsertL3 =
s"INSERT INTO TABLE parq_tbl3 VALUES (5, 5, 'parq5'), (6, 6, 'parq6')".stripMargin
sql(parquetTblInsertL3)

val topDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}"}'""".stripMargin
sql(topDirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("SELECT * FROM tbl1"), Nil)
} else {
val msg = intercept[IOException] {
sql("SELECT * FROM tbl1").show()
}.getMessage
assert(msg.contains("Not a file:"))
}

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/"}'""".stripMargin
sql(l1DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("SELECT * FROM tbl2"), (1 to 2).map(i => Row(i, i, s"parq$i")))
} else {
val msg = intercept[IOException] {
sql("SELECT * FROM tbl2").show()
}.getMessage
assert(msg.contains("Not a file:"))
}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${s"${path.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(l2DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("SELECT * FROM tbl3"), (3 to 4).map(i => Row(i, i, s"parq$i")))
} else {
val msg = intercept[IOException] {
sql("SELECT * FROM tbl3").show()
}.getMessage
assert(msg.contains("Not a file:"))
}

val wildcardTopDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl4(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/*").toURI}'""".stripMargin
sql(wildcardTopDirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("SELECT * FROM tbl4"), (1 to 2).map(i => Row(i, i, s"parq$i")))
} else {
val msg = intercept[IOException] {
sql("SELECT * FROM tbl4").show()
}.getMessage
assert(msg.contains("Not a file:"))
}

val wildcardL1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl5(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/l1/*").toURI}'""".stripMargin
sql(wildcardL1DirStatement)
if (parquetConversion == "true") {
checkAnswer(sql("SELECT * FROM tbl5"), (1 to 4).map(i => Row(i, i, s"parq$i")))
} else {
val msg = intercept[IOException] {
sql("SELECT * FROM tbl5").show()
}.getMessage
assert(msg.contains("Not a file:"))
}

val wildcardL2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl6(
| c1 int,
| c2 int,
| c3 string)
|STORED AS parquet
|LOCATION '${new File(s"${path}/l1/l2/*").toURI}'""".stripMargin
sql(wildcardL2DirStatement)
checkAnswer(sql("SELECT * FROM tbl6"), (3 to 6).map(i => Row(i, i, s"parq$i")))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,154 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
test("SPARK-11412 read and merge orc schemas in parallel") {
testMergeSchemasInParallel(OrcFileOperator.readOrcSchemasInParallel)
}

test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
Seq(true, false).foreach { convertMetastore =>
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") {
withTempDir { dir =>
withTable("orc_tbl1", "orc_tbl2", "orc_tbl3") {
val orcTblStatement1 =
s"""
|CREATE EXTERNAL TABLE orc_tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/"}'""".stripMargin
sql(orcTblStatement1)

val orcTblInsertL1 =
s"INSERT INTO TABLE orc_tbl1 VALUES (1, 1, 'orc1'), (2, 2, 'orc2')".stripMargin
sql(orcTblInsertL1)

val orcTblStatement2 =
s"""
|CREATE EXTERNAL TABLE orc_tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(orcTblStatement2)

val orcTblInsertL2 =
s"INSERT INTO TABLE orc_tbl2 VALUES (3, 3, 'orc3'), (4, 4, 'orc4')".stripMargin
sql(orcTblInsertL2)

val orcTblStatement3 =
s"""
|CREATE EXTERNAL TABLE orc_tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/l2/l3/"}'""".stripMargin
sql(orcTblStatement3)

val orcTblInsertL3 =
s"INSERT INTO TABLE orc_tbl3 VALUES (5, 5, 'orc5'), (6, 6, 'orc6')".stripMargin
sql(orcTblInsertL3)

withTable("tbl1", "tbl2", "tbl3", "tbl4", "tbl5", "tbl6") {
val topDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl1(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}"}'""".stripMargin
sql(topDirStatement)
val topDirSqlStatement = s"SELECT * FROM tbl1"
if (convertMetastore) {
checkAnswer(sql(topDirSqlStatement), Nil)
} else {
checkAnswer(sql(topDirSqlStatement), (1 to 6).map(i => Row(i, i, s"orc$i")))
}

val l1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl2(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/"}'""".stripMargin
sql(l1DirStatement)
val l1DirSqlStatement = s"SELECT * FROM tbl2"
if (convertMetastore) {
checkAnswer(sql(l1DirSqlStatement), (1 to 2).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(l1DirSqlStatement), (1 to 6).map(i => Row(i, i, s"orc$i")))
}

val l2DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl3(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${s"${dir.getCanonicalPath}/l1/l2/"}'""".stripMargin
sql(l2DirStatement)
val l2DirSqlStatement = s"SELECT * FROM tbl3"
if (convertMetastore) {
checkAnswer(sql(l2DirSqlStatement), (3 to 4).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(l2DirSqlStatement), (3 to 6).map(i => Row(i, i, s"orc$i")))
}

val wildcardTopDirStatement =
s"""
|CREATE EXTERNAL TABLE tbl4(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/*").toURI}'""".stripMargin
sql(wildcardTopDirStatement)
val wildcardTopDirSqlStatement = s"SELECT * FROM tbl4"
if (convertMetastore) {
checkAnswer(sql(wildcardTopDirSqlStatement), (1 to 2).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardTopDirSqlStatement), Nil)
}

val wildcardL1DirStatement =
s"""
|CREATE EXTERNAL TABLE tbl5(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/*").toURI}'""".stripMargin
sql(wildcardL1DirStatement)
val wildcardL1DirSqlStatement = s"SELECT * FROM tbl5"
if (convertMetastore) {
checkAnswer(sql(wildcardL1DirSqlStatement), (1 to 4).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardL1DirSqlStatement), Nil)
}

val wildcardL2Statement =
s"""
|CREATE EXTERNAL TABLE tbl6(
| c1 int,
| c2 int,
| c3 string)
|STORED AS orc
|LOCATION '${new File(s"${dir}/l1/l2/*").toURI}'""".stripMargin
sql(wildcardL2Statement)
val wildcardL2SqlStatement = s"SELECT * FROM tbl6"
if (convertMetastore) {
checkAnswer(sql(wildcardL2SqlStatement), (3 to 6).map(i => Row(i, i, s"orc$i")))
} else {
checkAnswer(sql(wildcardL2SqlStatement), Nil)
}
}
}
}
}
}
}
}