From 31f2efa9b93f581f48fcd941d7d167d30fbad85f Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Apr 2018 07:37:50 +0800
Subject: [PATCH 1/7] validate managed table location

---
 .../sql/catalyst/catalog/SessionCatalog.scala |  21 ++++++-
 .../command/createDataSourceTables.scala      |   5 +-
 .../spark/sql/StatisticsCollectionSuite.scala |   7 +++
 .../sql/execution/command/DDLSuite.scala      |  54 ++++++++++++++++++
 .../sql/hive/execution/HiveDDLSuite.scala     |  55 +++++++++++++++++++
 5 files changed, 138 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 64e7ca11270b..a0c0146c89f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -289,7 +289,11 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
     validateName(table)
+    if (!ignoreIfExists) {
+      validateTableLocation(tableDefinition, tableIdentifier)
+    }
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
       && !tableDefinition.storage.locationUri.get.isAbsolute) {
@@ -298,15 +302,28 @@ class SessionCatalog(
         makeQualifiedPath(tableDefinition.storage.locationUri.get)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
-        identifier = TableIdentifier(table, Some(db)))
+        identifier = tableIdentifier)
     } else {
-      tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+      tableDefinition.copy(identifier = tableIdentifier)
     }
 
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
 
+  def validateTableLocation(table: CatalogTable, tableIdentifier: TableIdentifier): Unit = {
+    // SPARK-19724: the default location of a managed table should be non-existent or empty.
+    if (table.tableType == CatalogTableType.MANAGED) {
+      val tableLocation = new Path(defaultTablePath(tableIdentifier))
+      val fs = tableLocation.getFileSystem(hadoopConf)
+
+      if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
+        throw new AnalysisException(s"Can not create the managed table('${tableIdentifier}')" +
+          s". The associated location('${tableLocation.toString}') already exists.")
+      }
+    }
+  }
+
   /**
    * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e9747769dfcf..f4bd2dcb2c13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
         sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
-
+      sparkSession.sessionState.catalog.validateTableLocation(table, table.identifier)
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
-      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      // Table location is already validated. No need to check it again during table creation.
+      sessionState.catalog.createTable(newTable, ignoreIfExists = true)
 
       result match {
         case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ed4ea0231f1a..14a565863d66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,6 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -242,6 +245,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
 
   test("change stats after set location command") {
     val table = "change_stats_set_location_table"
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier(table)))
     Seq(false, true).foreach { autoUpdate =>
       withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
         withTable(table) {
@@ -269,6 +273,9 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
           assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
         } else {
           checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+          // SPARK-19724: clean up the previous table location.
+          waitForTasksToFinish()
+          Utils.deleteRecursively(tableLoc)
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4df8fbfe1c0d..060ea8b267c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -103,6 +103,60 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
     }
   }
 
+  test("CTAS a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        var ex = intercept[AnalysisException] {
+          sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
+        }.getMessage
+        assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
+
+        ex = intercept[AnalysisException] {
+          sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
+        }.getMessage
+        assert(ex.contains(
+          "Can not create the managed table('`default`.`tab1`'). The associated location"))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
   test("Create Hive Table As Select") {
     import testImplicits._
     withTable("t", "t1") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index db76ec9d084c..3039fdf0db62 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -437,6 +437,61 @@ class HiveDDLSuite
     }
   }
 
+  test("CTAS a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        var ex = intercept[AnalysisException] {
+          sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
+        }.getMessage
+        assert(ex.contains(
+          "Can not create the managed table('`default`.`tab1`'). The associated location"))
+
+        ex = intercept[AnalysisException] {
+          sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
+        }.getMessage
+        assert(ex.contains(
+          "Can not create the managed table('`default`.`tab1`'). The associated location"))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
   test("create table: partition column names exist in table definition") {
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")

From f26c08c1638e2e0caef6283c20cab8364109f0e0 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Apr 2018 08:34:34 +0800
Subject: [PATCH 2/7] address comments

---
 .../sql/catalyst/catalog/SessionCatalog.scala |  13 +-
 .../command/createDataSourceTables.scala      |   2 +-
 .../sql/execution/command/DDLSuite.scala      | 115 ++++++++++--------
 .../sql/hive/execution/HiveDDLSuite.scala     |  55 ---------
 4 files changed, 69 insertions(+), 116 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a0c0146c89f7..5a8d5d517fcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -291,9 +291,6 @@ class SessionCatalog(
     val table = formatTableName(tableDefinition.identifier.table)
     val tableIdentifier = TableIdentifier(table, Some(db))
     validateName(table)
-    if (!ignoreIfExists) {
-      validateTableLocation(tableDefinition, tableIdentifier)
-    }
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
       && !tableDefinition.storage.locationUri.get.isAbsolute) {
@@ -308,17 +305,21 @@ class SessionCatalog(
     }
 
     requireDbExists(db)
+    if (!ignoreIfExists) {
+      validateTableLocation(newTableDefinition)
+    }
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
 
-  def validateTableLocation(table: CatalogTable, tableIdentifier: TableIdentifier): Unit = {
+  def validateTableLocation(table: CatalogTable): Unit = {
     // SPARK-19724: the default location of a managed table should be non-existent or empty.
     if (table.tableType == CatalogTableType.MANAGED) {
-      val tableLocation = new Path(defaultTablePath(tableIdentifier))
+      val tableLocation =
+        new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
       val fs = tableLocation.getFileSystem(hadoopConf)
 
       if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
-        throw new AnalysisException(s"Can not create the managed table('${tableIdentifier}')" +
+        throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
           s". The associated location('${tableLocation.toString}') already exists.")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f4bd2dcb2c13..f7c3e9b01925 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
         sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
-      sparkSession.sessionState.catalog.validateTableLocation(table, table.identifier)
+      sparkSession.sessionState.catalog.validateTableLocation(table)
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 060ea8b267c3..49487d89cbf2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -103,60 +103,6 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
     }
   }
 
-  test("CTAS a managed table with the existing empty directory") {
-    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-    try {
-      tableLoc.mkdir()
-      withTable("tab1") {
-        sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
-        checkAnswer(spark.table("tab1"), Row(1, "a"))
-      }
-    } finally {
-      waitForTasksToFinish()
-      Utils.deleteRecursively(tableLoc)
-    }
-  }
-
-  test("create a managed table with the existing empty directory") {
-    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-    try {
-      tableLoc.mkdir()
-      withTable("tab1") {
-        sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
-        sql("INSERT INTO tab1 VALUES (1, 'a')")
-        checkAnswer(spark.table("tab1"), Row(1, "a"))
-      }
-    } finally {
-      waitForTasksToFinish()
-      Utils.deleteRecursively(tableLoc)
-    }
-  }
-
-  test("create a managed table with the existing non-empty directory") {
-    withTable("tab1") {
-      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-      try {
-        // create an empty hidden file
-        tableLoc.mkdir()
-        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
-        hiddenGarbageFile.createNewFile()
-        var ex = intercept[AnalysisException] {
-          sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1, 'a'")
-        }.getMessage
-        assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
-
-        ex = intercept[AnalysisException] {
-          sql("CREATE TABLE tab1 (col1 int, col2 string) USING PARQUET")
-        }.getMessage
-        assert(ex.contains(
-          "Can not create the managed table('`default`.`tab1`'). The associated location"))
-      } finally {
-        waitForTasksToFinish()
-        Utils.deleteRecursively(tableLoc)
-      }
-    }
-  }
-
   test("Create Hive Table As Select") {
     import testImplicits._
     withTable("t", "t1") {
@@ -234,6 +180,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
   private val escapedIdentifier = "`(.+)`".r
 
+  private def dataSource: String = {
+    if (isUsingHiveMetastore) {
+      "HIVE"
+    } else {
+      "PARQUET"
+    }
+  }
   protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
 
   private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
@@ -419,6 +372,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("CTAS a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        var ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        }.getMessage
+        assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
+
+        ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        }.getMessage
+        assert(ex.contains(
+          "Can not create the managed table('`default`.`tab1`'). The associated location"))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
   private def checkSchemaInCreatedDataSourceTable(
       path: File,
       userSpecifiedSchema: Option[String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3039fdf0db62..db76ec9d084c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -437,61 +437,6 @@ class HiveDDLSuite
     }
   }
 
-  test("CTAS a managed table with the existing empty directory") {
-    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-    try {
-      tableLoc.mkdir()
-      withTable("tab1") {
-        sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
-        checkAnswer(spark.table("tab1"), Row(1, "a"))
-      }
-    } finally {
-      waitForTasksToFinish()
-      Utils.deleteRecursively(tableLoc)
-    }
-  }
-
-  test("create a managed table with the existing empty directory") {
-    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-    try {
-      tableLoc.mkdir()
-      withTable("tab1") {
-        sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
-        sql("INSERT INTO tab1 VALUES (1, 'a')")
-        checkAnswer(spark.table("tab1"), Row(1, "a"))
-      }
-    } finally {
-      waitForTasksToFinish()
-      Utils.deleteRecursively(tableLoc)
-    }
-  }
-
-  test("create a managed table with the existing non-empty directory") {
-    withTable("tab1") {
-      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
-      try {
-        // create an empty hidden file
-        tableLoc.mkdir()
-        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
-        hiddenGarbageFile.createNewFile()
-        var ex = intercept[AnalysisException] {
-          sql("CREATE TABLE tab1 USING HIVE AS SELECT 1, 'a'")
-        }.getMessage
-        assert(ex.contains(
-          "Can not create the managed table('`default`.`tab1`'). The associated location"))
-
-        ex = intercept[AnalysisException] {
-          sql("CREATE TABLE tab1 (col1 int, col2 string) USING HIVE")
-        }.getMessage
-        assert(ex.contains(
-          "Can not create the managed table('`default`.`tab1`'). The associated location"))
-      } finally {
-        waitForTasksToFinish()
-        Utils.deleteRecursively(tableLoc)
-      }
-    }
-  }
-
   test("create table: partition column names exist in table definition") {
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")

From 97804384cf7cd2dceb1aa1af34ada843cf556c10 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Apr 2018 16:22:00 +0800
Subject: [PATCH 3/7] fix test failure

---
 .../spark/sql/execution/command/DDLSuite.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 49487d89cbf2..4304d0b6f6b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -409,16 +409,22 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         tableLoc.mkdir()
         val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
         hiddenGarbageFile.createNewFile()
+        val exMsg = "Can not create the managed table('`tab1`'). The associated location"
+        val exMsgWithDefaultDB =
+          "Can not create the managed table('`default`.`tab1`'). The associated location"
         var ex = intercept[AnalysisException] {
          sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
        }.getMessage
-        assert(ex.contains("Can not create the managed table('`tab1`'). The associated location"))
+        if (isUsingHiveMetastore) {
+          assert(ex.contains(exMsgWithDefaultDB))
+        } else {
+          assert(ex.contains(exMsg))
+        }
 
         ex = intercept[AnalysisException] {
           sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
         }.getMessage
-        assert(ex.contains(
-          "Can not create the managed table('`default`.`tab1`'). The associated location"))
+        assert(ex.contains(exMsgWithDefaultDB))
       } finally {
         waitForTasksToFinish()
         Utils.deleteRecursively(tableLoc)

From 4d2cc31e10bc634f01f39d2fb22e8652e8b4b8e1 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Apr 2018 16:49:15 +0800
Subject: [PATCH 4/7] add config spark.sql.allowNonemptyManagedTableLocation
 and modify migration guide

---
 docs/sql-programming-guide.md                          | 1 +
 .../spark/sql/catalyst/catalog/SessionCatalog.scala    | 2 +-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  | 9 +++++++++
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 2b393f30d143..60f36f89850b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1809,6 +1809,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
  - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
  - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema.
  - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.
+ - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowNonemptyManagedTableLocation` restores the previous behavior. This option will be removed in Spark 3.0.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 5a8d5d517fcb..de0e9fe775ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -313,7 +313,7 @@ class SessionCatalog(
 
   def validateTableLocation(table: CatalogTable): Unit = {
     // SPARK-19724: the default location of a managed table should be non-existent or empty.
-    if (table.tableType == CatalogTableType.MANAGED) {
+    if (table.tableType == CatalogTableType.MANAGED && !conf.allowNonemptyManagedTableLocation) {
       val tableLocation =
         new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
       val fs = tableLocation.getFileSystem(hadoopConf)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9cb03b5bb615..80d327447a03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1152,6 +1152,13 @@
       .booleanConf
       .createWithDefault(false)
 
+  val ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION =
+    buildConf("spark.sql.allowNonemptyManagedTableLocation")
+      .doc("When this option is set to true, creating managed tables with nonempty location " +
+        "is allowed. Otherwise, an analysis exception is thrown. ")
+      .booleanConf
+      .createWithDefault(false)
+
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
       .internal()
@@ -1572,6 +1579,8 @@ class SQLConf extends Serializable with Logging {
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
 
+  def allowNonemptyManagedTableLocation: Boolean = getConf(ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION)
+
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 

From 13b6633d8194d1cad71bea1ceb699716aa1ea402 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Apr 2018 17:06:36 +0800
Subject: [PATCH 5/7] fix indent

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index de0e9fe775ec..410048473c69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -313,16 +313,16 @@ class SessionCatalog(
 
   def validateTableLocation(table: CatalogTable): Unit = {
     // SPARK-19724: the default location of a managed table should be non-existent or empty.
-    if (table.tableType == CatalogTableType.MANAGED && !conf.allowNonemptyManagedTableLocation) {
-      val tableLocation =
-        new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
-      val fs = tableLocation.getFileSystem(hadoopConf)
-
-      if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
-        throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
-          s". The associated location('${tableLocation.toString}') already exists.")
-      }
-    }
+    if (table.tableType == CatalogTableType.MANAGED && !conf.allowNonemptyManagedTableLocation) {
+      val tableLocation =
+        new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
+      val fs = tableLocation.getFileSystem(hadoopConf)
+
+      if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
+        throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
+          s". The associated location('${tableLocation.toString}') already exists.")
+      }
+    }
   }
 
   /**

From 7a3311c2cbd3d9f7399abb38bd877bbd23ca836e Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 4 Apr 2018 07:08:17 +0800
Subject: [PATCH 6/7] fix scala style

---
 .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 410048473c69..e1b67a962562 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -317,7 +317,7 @@ class SessionCatalog(
       val tableLocation =
         new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
       val fs = tableLocation.getFileSystem(hadoopConf)
-      
+
       if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
         throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
           s". The associated location('${tableLocation.toString}') already exists.")

From 2b2973a9db7a8fa228bfc939604feca4cc2c6a59 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 4 Apr 2018 13:35:27 +0800
Subject: [PATCH 7/7] address comments

---
 docs/sql-programming-guide.md                          | 2 +-
 .../spark/sql/catalyst/catalog/SessionCatalog.scala    | 3 ++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  | 8 +++++---
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 60f36f89850b..9822d669050d 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1809,7 +1809,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
  - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
  - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema.
  - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.
- - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowNonemptyManagedTableLocation` restores the previous behavior. This option will be removed in Spark 3.0.
+ - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` restores the previous behavior. This option will be removed in Spark 3.0.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e1b67a962562..52ed89ef8d78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -313,7 +313,8 @@ class SessionCatalog(
 
   def validateTableLocation(table: CatalogTable): Unit = {
     // SPARK-19724: the default location of a managed table should be non-existent or empty.
-    if (table.tableType == CatalogTableType.MANAGED && !conf.allowNonemptyManagedTableLocation) {
+    if (table.tableType == CatalogTableType.MANAGED &&
+      !conf.allowCreatingManagedTableUsingNonemptyLocation) {
       val tableLocation =
         new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
       val fs = tableLocation.getFileSystem(hadoopConf)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 80d327447a03..4daf966a6e6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1152,8 +1152,9 @@
       .booleanConf
       .createWithDefault(false)
 
-  val ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION =
-    buildConf("spark.sql.allowNonemptyManagedTableLocation")
+  val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
+    buildConf("spark.sql.allowCreatingManagedTableUsingNonemptyLocation")
+      .internal()
       .doc("When this option is set to true, creating managed tables with nonempty location " +
         "is allowed. Otherwise, an analysis exception is thrown. ")
       .booleanConf
@@ -1579,7 +1580,8 @@ class SQLConf extends Serializable with Logging {
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
 
-  def allowNonemptyManagedTableLocation: Boolean = getConf(ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION)
+  def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
+    getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)
 
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
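
Reviewer note (not part of the patch series): the following is a minimal Scala sketch of the behavior the series introduces, mirroring the test suites above. It assumes a local SparkSession with a file-based warehouse and the default database; the table name `tab1` and the `.garbage` file are hypothetical, and the printed message is abbreviated. It uses only the public `spark.conf` API to locate the default table path, since `spark.sessionState` (used by the tests) is private[sql].

    import java.io.File
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

    // Leave garbage behind at the default location of the managed table `tab1`.
    // Assumes spark.sql.warehouse.dir is a local file: URI and `tab1` lives in `default`.
    val warehouseUri = new java.net.URI(spark.conf.get("spark.sql.warehouse.dir"))
    val tableLoc = new File(new File(warehouseUri), "tab1")
    tableLoc.mkdirs()
    new File(tableLoc, ".garbage").createNewFile() // even a hidden file makes the directory non-empty

    // Before this series, CTAS would silently write into the leftover directory.
    // With SessionCatalog.validateTableLocation in place, both CREATE TABLE and CTAS fail fast:
    spark.sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1 AS col1, 'a' AS col2")
    // org.apache.spark.sql.AnalysisException: Can not create the managed table('`tab1`').
    // The associated location('file:/.../spark-warehouse/tab1') already exists.

Note the ordering in CreateDataSourceTableAsSelectCommand: the location is validated before the query runs, and createTable is then invoked with ignoreIfExists = true so the location is not re-checked after the query has already produced data.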
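Reviewer note (not part of the patch series): PATCH 4 adds the escape hatch for the legacy behavior and PATCH 7 renames it and marks it internal. A hedged sketch of restoring the old behavior follows, continuing the session from the previous sketch; since the migration guide slates the flag for removal in Spark 3.0, it should be treated as a temporary workaround only.

    // Internal flag, defaults to false; final name as of PATCH 7.
    spark.conf.set("spark.sql.allowCreatingManagedTableUsingNonemptyLocation", "true")
    spark.sql("CREATE TABLE tab1 USING PARQUET AS SELECT 1 AS col1, 'a' AS col2") // now succeeds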