Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)

// Need to recover partitions into the metastore so the data in the path is visible.
if (partitionColumnNames.nonEmpty && pathOption.nonEmpty &&
sparkSession.sessionState.conf.manageFilesourcePartitions) {
sparkSession.sessionState.executePlan(
AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
}

Seq.empty[Row]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ object SQLConf {
val HIVE_MANAGE_FILESOURCE_PARTITIONS =
SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
.doc("When true, enable metastore partition management for file source tables as well. " +
"This includes both datasource and converted Hive tables. When partition managment " +
"This includes both datasource and converted Hive tables. When partition management " +
"is enabled, datasource tables store partition in the Hive metastore, and use the " +
"metastore to prune partitions during query planning.")
.booleanConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ class PartitionProviderCompatibilitySuite
}
}

test("when partition management is enabled, new tables have partition provider hive") {
withTable("test") {
withTempDir { dir =>
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
setupPartitionedDatasourceTable("test", dir)
spark.sql("show partitions test").count() // check we are a new table
assert(spark.sql("select * from test").count() == 0) // needs repair
spark.sql("msck repair table test")
assert(spark.sql("select * from test").count() == 5)
test("Return correct results no matter whether partition management is enabled") {
for (enabled <- Seq("false")) {
withTable("test") {
withTempDir { dir =>
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled) {
setupPartitionedDatasourceTable("test", dir)
assert(spark.sql("select * from test").count() == 5)
}
}
}
}
Expand All @@ -109,14 +108,14 @@ class PartitionProviderCompatibilitySuite
}
}

test("when partition management is disabled, we preserve the old behavior even for new tables") {
test("When partition management is disabled, we preserve the old behavior even for new tables") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old behavior is to return 5 rows.

withTable("test") {
withTempDir { dir =>
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
setupPartitionedDatasourceTable("test", dir)
spark.sql("show partitions test").count() // check we are a new table
spark.sql("refresh table test")
assert(spark.sql("select * from test").count() == 0)
assert(spark.sql("select * from test").count() == 5)
}
// disabled
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
Expand All @@ -130,7 +129,7 @@ class PartitionProviderCompatibilitySuite
// then enabled again
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
spark.sql("refresh table test")
assert(spark.sql("select * from test").count() == 0)
assert(spark.sql("select * from test").count() == 5)
}
}
}
Expand Down