diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 193a2a2cdc170..78035688e8b42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -106,6 +106,13 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)
 
+    // Need to recover partitions into the metastore so the data in the path is visible.
+    if (partitionColumnNames.nonEmpty && pathOption.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions) {
+      sparkSession.sessionState.executePlan(
+        AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+    }
+
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4d25f54caa130..02af70aac77e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -271,7 +271,7 @@ object SQLConf {
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source tables as well. " +
-        "This includes both datasource and converted Hive tables. When partition managment " +
+        "This includes both datasource and converted Hive tables. When partition management " +
         "is enabled, datasource tables store partition in the Hive metastore, and use the " +
         "metastore to prune partitions during query planning.")
       .booleanConf
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index c2ac032760780..831857c890261 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -83,15 +83,14 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
-  test("when partition management is enabled, new tables have partition provider hive") {
-    withTable("test") {
-      withTempDir { dir =>
-        withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
-          setupPartitionedDatasourceTable("test", dir)
-          spark.sql("show partitions test").count()  // check we are a new table
-          assert(spark.sql("select * from test").count() == 0)  // needs repair
-          spark.sql("msck repair table test")
-          assert(spark.sql("select * from test").count() == 5)
+  test("Return correct results no matter whether partition management is enabled") {
+    for (enabled <- Seq("true", "false")) {
+      withTable("test") {
+        withTempDir { dir =>
+          withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled) {
+            setupPartitionedDatasourceTable("test", dir)
+            assert(spark.sql("select * from test").count() == 5)
+          }
         }
       }
     }
@@ -109,14 +108,14 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
-  test("when partition management is disabled, we preserve the old behavior even for new tables") {
+  test("When partition management is disabled, we preserve the old behavior even for new tables") {
     withTable("test") {
       withTempDir { dir =>
         withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
           setupPartitionedDatasourceTable("test", dir)
           spark.sql("show partitions test").count()  // check we are a new table
           spark.sql("refresh table test")
-          assert(spark.sql("select * from test").count() == 0)
+          assert(spark.sql("select * from test").count() == 5)
         }
         // disabled
         withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
@@ -130,7 +129,7 @@ class PartitionProviderCompatibilitySuite
         // then enabled again
         withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
           spark.sql("refresh table test")
-          assert(spark.sql("select * from test").count() == 0)
+          assert(spark.sql("select * from test").count() == 5)
         }
       }
     }
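
Note for reviewers (not part of the patch): a minimal end-to-end sketch of the behavior this change enables, modeled on the suite's setupPartitionedDatasourceTable helper. The `demo` table name, temp path, and schema here are illustrative only; it assumes a spark-shell session built with Hive support, where spark.sql.hive.manageFilesourcePartitions defaults to true.

    // Sketch only: `spark` is the spark-shell session.
    import java.nio.file.Files

    val dir = Files.createTempDirectory("demo").toFile.getAbsolutePath

    // Lay out partitioned data at an external path first, the way the
    // suite's setupPartitionedDatasourceTable helper does.
    spark.range(5).selectExpr("id as fieldOne", "id as partCol")
      .write.partitionBy("partCol").mode("overwrite").parquet(dir)

    // Create a datasource table over that pre-existing path.
    spark.sql(s"""
      |create table demo (fieldOne long, partCol int)
      |using parquet
      |options (path "$dir")
      |partitioned by (partCol)""".stripMargin)

    // Before this patch: 0 rows until an explicit `msck repair table demo`.
    // With this patch: partitions are recovered automatically at creation.
    assert(spark.sql("select * from demo").count() == 5)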