From 6708db68d0a9008e74119f0fc90af02eaf1273d4 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 15 Jan 2020 21:38:30 +0800 Subject: [PATCH 01/15] tmp --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 + .../sql/connector/catalog/TableCatalog.java | 13 +++- .../sql/catalyst/catalog/interface.scala | 3 +- .../sql/catalyst/parser/AstBuilder.scala | 17 +++++ .../catalyst/plans/logical/v2Commands.scala | 10 +++ .../sql/catalyst/parser/DDLParserSuite.scala | 13 ++++ .../datasources/v2/CreateTableExec.scala | 7 ++- .../datasources/v2/DataSourceV2Strategy.scala | 5 ++ .../v2/DescribeNamespaceExec.scala | 2 +- .../datasources/v2/V2SessionCatalog.scala | 4 +- .../command/PlanResolutionSuite.scala | 25 ++++++-- .../sql/hive/client/HiveClientImpl.scala | 21 ++++--- .../spark/sql/hive/client/HiveShim.scala | 28 +++++++++ .../sql/hive/execution/HiveDDLSuite.scala | 62 ++++++++++++++++--- 14 files changed, 185 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 645d0d709e063..77252e5c69623 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -181,6 +181,8 @@ statement DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions | ALTER TABLE multipartIdentifier (partitionSpec)? SET locationSpec #setTableLocation + | ALTER TABLE multipartIdentifier + SET OWNER ownerType=(USER | ROLE | GROUP) identifier #setTableOwner | ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable | DROP VIEW (IF EXISTS)? multipartIdentifier #dropView diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 32c6f8f2cde16..04e87d9560f1a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -56,10 +56,21 @@ public interface TableCatalog extends CatalogPlugin { */ String PROP_PROVIDER = "provider"; + /** + * A property to specify the owner of the table. + */ + String PROP_OWNER_NAME = "ownerName"; + + /** + * A property to specify the type of the table's owner. + */ + String PROP_OWNER_TYPE = "ownerType"; + /** * The list of reserved table properties. */ - List RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER); + List RESERVED_PROPERTIES = + Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER_NAME, PROP_OWNER_TYPE); /** * List the tables in a namespace from the catalog. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index d51690367bf35..af4bd2a068fad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -344,6 +344,7 @@ case class CatalogTable( identifier.database.foreach(map.put("Database", _)) map.put("Table", identifier.table) if (owner != null && owner.nonEmpty) map.put("Owner", owner) + properties.get(TableCatalog.PROP_OWNER_TYPE).foreach(map.put("Owner Type", _)) map.put("Created Time", new Date(createTime).toString) map.put("Last Access", lastAccess) map.put("Created By", "Spark " + createVersion) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 5ddeff50e47b4..c6148cd932026 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3622,4 +3622,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create an [[AlterTableSetOwner]] logical plan. + * + * For example: + * {{{ + * ALTER TABLE tableName SET OWNER (USER|ROLE|GROUP) identityName; + * }}} + */ + override def visitSetTableOwner(ctx: SetTableOwnerContext): LogicalPlan = { + withOrigin(ctx) { + val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) + AlterTableSetOwner( + UnresolvedTable(nameParts), + ctx.identifier.getText, + ctx.ownerType.getText) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index e98b2cf7abfcc..8d3b8b5594d7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -302,6 +302,16 @@ case class AlterNamespaceSetOwner( override def children: Seq[LogicalPlan] = child :: Nil } +/** + * ALTER TABLE ... SET OWNER command, as parsed from SQL. + */ +case class AlterTableSetOwner( + child: LogicalPlan, + ownerName: String, + ownerType: String) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} + /** * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 3a4c08235731e..3045c3a12b1f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1356,6 +1356,19 @@ class DDLParserSuite extends AnalysisTest { AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "group1", "GROUP")) } + test("set table owner") { + comparePlans( + parsePlan("ALTER TABLE a.b.c SET OWNER USER user1"), + AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "user1", "USER")) + + comparePlans( + parsePlan("ALTER TABLE a.b.c SET OWNER ROLE role1"), + AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "role1", "ROLE")) + comparePlans( + parsePlan("ALTER TABLE a.b.c SET OWNER GROUP group1"), + AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "group1", "GROUP")) + } + test("show databases: basic") { comparePlans( parsePlan("SHOW DATABASES"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index 511cd8a9a438f..efb00df0a64b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils case class CreateTableExec( catalog: TableCatalog, @@ -38,7 +39,11 @@ case class CreateTableExec( override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { - catalog.createTable(identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + val ownership = Map( + TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), + TableCatalog.PROP_OWNER_TYPE -> "USER") + catalog.createTable(identifier, tableSchema, partitioning.toArray, + (tableProperties ++ ownership).asJava) } catch { case _: TableAlreadyExistsException if ignoreIfExists => logWarning(s"Table ${identifier.quoted} was created concurrently. Ignoring.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b452b66e03813..940ebf4d1d9e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -257,6 +257,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Map(SupportsNamespaces.PROP_OWNER_NAME -> name, SupportsNamespaces.PROP_OWNER_TYPE -> typ) AlterNamespaceSetPropertiesExec(catalog, namespace, properties) :: Nil + case AlterTableSetOwner(ResolvedTable(catalog, ident, _), name, typ) => + val changes = TableChange.setProperty(TableCatalog.PROP_OWNER_NAME, name) :: + TableChange.setProperty(TableCatalog.PROP_OWNER_TYPE, typ) :: Nil + AlterTableExec(catalog, ident, changes) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index 979d740efa8ff..1cc04fa30d077 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -50,7 +50,7 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Location", _) } Option(metadata.get(PROP_OWNER_NAME)).foreach { - rows += toCatalystRow("Owner Name", _) + rows += toCatalystRow("Owner", _) } Option(metadata.get(PROP_OWNER_TYPE)).foreach { rows += toCatalystRow("Owner Type", _) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index ddb2926eb6c9a..2940b9b409cfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes) val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes) val comment = properties.get(TableCatalog.PROP_COMMENT) + val owner = properties.getOrElse(TableCatalog.PROP_OWNER_NAME, catalogTable.owner) try { catalog.alterTable( - catalogTable.copy(properties = properties, schema = schema, comment = comment)) + catalogTable + .copy(properties = properties, schema = schema, owner = owner, comment = comment)) } catch { case _: NoSuchTableException => throw new NoSuchTableException(ident) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 0901c66cccceb..d430f6ec81926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{CharType, DoubleType, HIVE_TYPE_STRING, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.util.Utils class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ @@ -401,7 +402,9 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment") + "comment" -> "table comment", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql) match { case create: CreateV2Table => @@ -440,7 +443,9 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment") + "comment" -> "table comment", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql, withDefault = true) match { case create: CreateV2Table => @@ -478,7 +483,9 @@ class PlanResolutionSuite extends AnalysisTest { "p2" -> "v2", "provider" -> v2Format, "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") + "comment" -> "This is the staging page view table", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql) match { case create: CreateV2Table => @@ -515,7 +522,9 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment") + "comment" -> "table comment", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql) match { case ctas: CreateTableAsSelect => @@ -549,7 +558,9 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment") + "comment" -> "table comment", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql, withDefault = true) match { case ctas: CreateTableAsSelect => @@ -582,7 +593,9 @@ class PlanResolutionSuite extends AnalysisTest { "p2" -> "v2", "provider" -> v2Format, "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") + "comment" -> "This is the staging page view table", + "ownerName" -> Utils.getCurrentUserName(), + "ownerType" -> "USER") parseAndResolve(sql) match { case ctas: CreateTableAsSelect => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f196e94a83f97..0904b076f184d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -54,6 +54,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} @@ -512,7 +513,7 @@ private[hive] class HiveClientImpl( val filteredProperties = properties.filterNot { case (key, _) => excludedTableProperties.contains(key) - } + } ++ Option(shim.getTableOwnerType(h)).map(TableCatalog.PROP_OWNER_TYPE -> _) val comment = properties.get("comment") CatalogTable( @@ -569,7 +570,10 @@ private[hive] class HiveClientImpl( override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { verifyColumnDataType(table.dataSchema) - client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists) + val hiveTable = toHiveTable(table, Some(userName)) + shim.setTableOwnerType(hiveTable, + table.properties.getOrElse(TableCatalog.PROP_OWNER_TYPE, PrincipalType.USER.name)) + client.createTable(hiveTable, ignoreIfExists) } override def dropTable( @@ -590,7 +594,8 @@ private[hive] class HiveClientImpl( // these user-specified values. verifyColumnDataType(table.dataSchema) val hiveTable = toHiveTable( - table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName)) + table.copy(properties = table.ignoredProperties ++ table.properties)) + table.properties.get(TableCatalog.PROP_OWNER_TYPE).foreach(shim.setTableOwnerType(hiveTable, _)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"$dbName.$tableName" shim.alterTable(client, qualifiedTableName, hiveTable) @@ -687,7 +692,7 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = toHiveTable(catalogTable, Some(userName)) + val hiveTable = toHiveTable(catalogTable) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } @@ -707,7 +712,7 @@ private[hive] class HiveClientImpl( val original = state.getCurrentDatabase try { setCurrentDatabaseRaw(db) - val hiveTable = toHiveTable(getTable(db, table), Some(userName)) + val hiveTable = toHiveTable(getTable(db, table)) shim.alterPartitions(client, table, newParts.map { toHivePartition(_, hiveTable) }.asJava) } finally { state.setCurrentDatabase(original) @@ -738,7 +743,7 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(userName)) + val hiveTable = toHiveTable(table) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(fromHivePartition) } @@ -750,7 +755,7 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(userName)) + val hiveTable = toHiveTable(table) val partSpec = spec match { case None => CatalogTypes.emptyTablePartitionSpec case Some(s) => @@ -765,7 +770,7 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table, Some(userName)) + val hiveTable = toHiveTable(table) val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index d8078a534a874..189c70cc39322 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -161,6 +161,10 @@ private[client] sealed abstract class Shim { def setDatabaseOwnerType(db: Database, ownerType: String): Unit + def getTableOwnerType(table: Table): String + + def setTableOwnerType(table: Table, ownerType: String): Unit + protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { val method = findMethod(klass, name, args: _*) require(Modifier.isStatic(method.getModifiers()), @@ -471,6 +475,10 @@ private[client] class Shim_v0_12 extends Shim with Logging { override def getDatabaseOwnerType(db: Database): String = "" override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = {} + + override def getTableOwnerType(table: Table): String = "" + + override def setTableOwnerType(table: Table, ownerType: String): Unit = {} } private[client] class Shim_v0_13 extends Shim_v0_12 { @@ -1312,6 +1320,17 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { classOf[AcidUtils.Operation], JBoolean.TYPE) + private lazy val getTableOwnerTypeMethod = + findMethod( + classOf[Table], + "getOwnerType") + + private lazy val setTableOwnerTypeMethod = + findMethod( + classOf[Table], + "setOwnerType", + classOf[PrincipalType]) + override def loadPartition( hive: Hive, loadPath: Path, @@ -1373,6 +1392,15 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID, replace: JBoolean) } + + override def getTableOwnerType(table: Table): String = { + Option(getTableOwnerTypeMethod.invoke(table)) + .map(_.asInstanceOf[PrincipalType].name()).getOrElse("") + } + + override def setTableOwnerType(table: Table, ownerType: String): Unit = { + setTableOwnerTypeMethod.invoke(table, PrincipalType.valueOf(ownerType)) + } } private[client] class Shim_v3_1 extends Shim_v3_0 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index b3f7fc4d0557e..544dfa2aef15b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -375,7 +375,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA } } - private def checkOwner(db: String, expectedOwnerName: String, expectedOwnerType: String): Unit = { + private def checkDbOwner(db: String, + expectedOwnerName: String, expectedOwnerType: String): Unit = { val df = sql(s"DESCRIBE DATABASE EXTENDED $db") val owner = df.where("database_description_item='Owner Name'") .collect().head.getString(1) @@ -394,26 +395,71 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA val currentUser = Utils.getCurrentUserName() sql(s"CREATE DATABASE $db1") - checkOwner(db1, currentUser, "USER") + checkDbOwner(db1, currentUser, "USER") sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a')") - checkOwner(db1, currentUser, "USER") + checkDbOwner(db1, currentUser, "USER") val e = intercept[ParseException](sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a'," + s"'ownerName'='$owner','ownerType'='XXX')")) assert(e.getMessage.contains("ownerName")) sql(s"ALTER DATABASE $db1 SET OWNER ROLE $owner") - checkOwner(db1, owner, "ROLE") + checkDbOwner(db1, owner, "ROLE") val e2 = intercept[ParseException]( sql(s"CREATE DATABASE $db2 WITH DBPROPERTIES('ownerName'='$owner')")) assert(e2.getMessage.contains("ownerName")) sql(s"CREATE DATABASE $db2") - checkOwner(db2, currentUser, "USER") + checkDbOwner(db2, currentUser, "USER") sql(s"ALTER DATABASE $db2 SET OWNER GROUP $owner") - checkOwner(db2, owner, "GROUP") + checkDbOwner(db2, owner, "GROUP") sql(s"ALTER DATABASE $db2 SET OWNER GROUP `$owner`") - checkOwner(db2, owner, "GROUP") + checkDbOwner(db2, owner, "GROUP") sql(s"ALTER DATABASE $db2 SET OWNER GROUP OWNER") - checkOwner(db2, "OWNER", "GROUP") + checkDbOwner(db2, "OWNER", "GROUP") + } finally { + catalog.reset() + } + } + + private def checkTblOwner(table: String, + expectedOwnerName: String, expectedOwnerType: String): Unit = { + val df = sql(s"DESCRIBE TABLE EXTENDED $table") + val owner = df.where("col_name='Owner'") + .collect().head.getString(1) + val typ = df.where("col_name='Owner Type'") + .collect().head.getString(1) + assert(owner === expectedOwnerName) + assert(typ === expectedOwnerType) + } + + test("Table Ownership") { + val catalog = spark.sessionState.catalog + try { + val table1 = "spark_30019_1" + val table2 = "spark_30019_2" + val owner = "spark_30019" + val currentUser = Utils.getCurrentUserName() + + sql(s"CREATE TABLE $table1(k int)") + checkTblOwner(table1, currentUser, "") + sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a')") + checkTblOwner(table1, currentUser, "") +// val e = intercept[ParseException](sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a'," +// + s"'ownerName'='$owner','ownerType'='XXX')")) +// assert(e.getMessage.contains("ownerName")) + sql(s"ALTER TABLE $table1 SET OWNER ROLE $owner") + checkTblOwner(table1, owner, "") + +// val e2 = intercept[ParseException]( +// sql(s"CREATE TABLE $table2 WITH TBLPROPERTIES('ownerName'='$owner')")) +// assert(e2.getMessage.contains("ownerName")) + sql(s"CREATE TABLE $table2(k int)") + checkTblOwner(table2, currentUser, "") + sql(s"ALTER TABLE $table2 SET OWNER GROUP $owner") + checkTblOwner(table2, owner, "") + sql(s"ALTER TABLE $table2 SET OWNER ROLE `$owner`") + checkTblOwner(table2, owner, "") + sql(s"ALTER TABLE $table2 SET OWNER USER OWNER") + checkTblOwner(table2, "OWNER", "") } finally { catalog.reset() } From 30b91f545918bdc52e01803c93a719cabf579525 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 11:56:30 +0800 Subject: [PATCH 02/15] [SPARK-30019][SQL] Add ALTER TABLE SET OWNER syntax --- .../sql/catalyst/catalog/interface.scala | 3 +- .../sql/catalyst/parser/AstBuilder.scala | 7 +++ .../sql/connector/catalog/CatalogV2Util.scala | 7 +++ .../datasources/v2/CreateTableExec.scala | 8 ++-- .../v2/DescribeNamespaceExec.scala | 2 +- .../v2/WriteToDataSourceV2Exec.scala | 13 ++--- .../spark/sql/connector/AlterTableTests.scala | 28 +++++++++-- .../sql/connector/DataSourceV2SQLSuite.scala | 48 +++++++++++-------- .../sql/execution/command/DDLSuite.scala | 9 +++- .../command/PlanResolutionSuite.scala | 24 +++------- .../sql/hive/client/HiveClientImpl.scala | 8 ++-- .../spark/sql/hive/client/HiveShim.scala | 17 ------- .../sql/hive/execution/HiveDDLSuite.scala | 27 ++++++----- 13 files changed, 109 insertions(+), 92 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index af4bd2a068fad..5fa3c87c68ccb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -335,7 +335,8 @@ case class CatalogTable( def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { val map = new mutable.LinkedHashMap[String, String]() - val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + val tableProperties = properties.filterKeys(_ != TableCatalog.PROP_OWNER_TYPE) + .map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") val lastAccess = { if (lastAccessTime <= 0) "UNKNOWN" else new Date(lastAccessTime).toString diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c6148cd932026..28facf930b97b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2680,6 +2680,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" + s" the LOCATION clause to specify it.", ctx) case (PROP_LOCATION, _) => false + case (ownership, _) if ownership == PROP_OWNER_NAME || ownership == PROP_OWNER_TYPE => + if (legacyOn) { + false + } else { + throw new ParseException(s"$ownership is a reserved table property , please use" + + " ALTER TABLE ... SET OWNER ... to specify it.", ctx) + } case _ => true } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index f8fc4bc7ce23f..5c0814090434d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils private[sql] object CatalogV2Util { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -276,6 +277,12 @@ private[sql] object CatalogV2Util { location.map(TableCatalog.PROP_LOCATION -> _) } + def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = { + properties ++ + Map(TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), + TableCatalog.PROP_OWNER_TYPE -> "USER") + } + def createAlterTable( originalNameParts: Seq[String], catalog: CatalogPlugin, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index efb00df0a64b3..812ff01a109fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils case class CreateTableExec( catalog: TableCatalog, @@ -39,11 +39,9 @@ case class CreateTableExec( override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { - val ownership = Map( - TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), - TableCatalog.PROP_OWNER_TYPE -> "USER") + val propertiesWithOwner = withDefaultOwnership(tableProperties) catalog.createTable(identifier, tableSchema, partitioning.toArray, - (tableProperties ++ ownership).asJava) + propertiesWithOwner.asJava) } catch { case _: TableAlreadyExistsException if ignoreIfExists => logWarning(s"Table ${identifier.quoted} was created concurrently. Ignoring.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index 1cc04fa30d077..979d740efa8ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -50,7 +50,7 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Location", _) } Option(metadata.get(PROP_OWNER_NAME)).foreach { - rows += toCatalystRow("Owner", _) + rows += toCatalystRow("Owner Name", _) } Option(metadata.get(PROP_OWNER_TYPE)).foreach { rows += toCatalystRow("Owner Type", _) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index e360a9e656a16..7e9e8b59e619f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,13 +31,13 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} - /** * Deprecated logical plan for writing data into data source v2. This is being replaced by more * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. @@ -82,7 +82,7 @@ case class CreateTableAsSelectExec( Utils.tryWithSafeFinallyAndFailureCallbacks({ val schema = query.schema.asNullable catalog.createTable( - ident, schema, partitioning.toArray, properties.asJava) match { + ident, schema, partitioning.toArray, withDefaultOwnership(properties).asJava) match { case table: SupportsWrite => val info = LogicalWriteInfoImpl( queryId = UUID.randomUUID().toString, @@ -134,7 +134,7 @@ case class AtomicCreateTableAsSelectExec( throw new TableAlreadyExistsException(ident) } val stagedTable = catalog.stageCreate( - ident, query.schema.asNullable, partitioning.toArray, properties.asJava) + ident, query.schema.asNullable, partitioning.toArray, withDefaultOwnership(properties).asJava) writeToStagedTable(stagedTable, writeOptions, ident) } } @@ -177,7 +177,7 @@ case class ReplaceTableAsSelectExec( } val schema = query.schema.asNullable val createdTable = catalog.createTable( - ident, schema, partitioning.toArray, properties.asJava) + ident, schema, partitioning.toArray, withDefaultOwnership(properties).asJava) Utils.tryWithSafeFinallyAndFailureCallbacks({ createdTable match { case table: SupportsWrite => @@ -227,13 +227,14 @@ case class AtomicReplaceTableAsSelectExec( override protected def doExecute(): RDD[InternalRow] = { val schema = query.schema.asNullable + val propertiesWithOwner = withDefaultOwnership(properties).asJava val staged = if (orCreate) { catalog.stageCreateOrReplace( - ident, schema, partitioning.toArray, properties.asJava) + ident, schema, partitioning.toArray, propertiesWithOwner) } else if (catalog.tableExists(ident)) { try { catalog.stageReplace( - ident, schema, partitioning.toArray, properties.asJava) + ident, schema, partitioning.toArray, propertiesWithOwner) } catch { case e: NoSuchTableException => throw new CannotReplaceMissingTableException(ident, Some(e)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index d304d5b2ca6a2..334f4c56f751a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -945,7 +946,7 @@ trait AlterTableTests extends SharedSparkSession { assert(table.name === fullTableName(t)) assert(table.properties === - Map("provider" -> v2Format, "location" -> "s3://bucket/path").asJava) + withDefaultOwnership(Map("provider" -> v2Format, "location" -> "s3://bucket/path")).asJava) } } @@ -971,7 +972,8 @@ trait AlterTableTests extends SharedSparkSession { val table = getTableMetadata(t) assert(table.name === fullTableName(t)) - assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava) + assert(table.properties === + withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava) } } @@ -983,15 +985,33 @@ trait AlterTableTests extends SharedSparkSession { val table = getTableMetadata(t) assert(table.name === fullTableName(t)) - assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava) + assert(table.properties === + withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava) sql(s"ALTER TABLE $t UNSET TBLPROPERTIES ('test')") val updated = getTableMetadata(t) assert(updated.name === fullTableName(t)) - assert(updated.properties === Map("provider" -> v2Format).asJava) + assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava) } } + test("AlterTable: set table owner") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (id int) USING $v2Format") + assert(getTableMetadata(t).properties === + withDefaultOwnership(Map("provider" -> v2Format)).asJava) + sql(s"ALTER TABLE $t SET OWNER ROLE kent") + assert(getTableMetadata(t).properties === + Map("provider" -> v2Format, "ownerName" -> "kent", "ownerType" -> "ROLE").asJava) + sql(s"ALTER TABLE $t SET OWNER GROUP yao") + assert(getTableMetadata(t).properties === + Map("provider" -> v2Format, "ownerName" -> "yao", "ownerType" -> "GROUP").asJava) + sql(s"ALTER TABLE $t SET OWNER USER ming") + assert(getTableMetadata(t).properties === + Map("provider" -> v2Format, "ownerName" -> "ming", "ownerType" -> "USER").asJava) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 4ee8a6803ea5e..a71d90090185c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource @@ -41,6 +42,7 @@ class DataSourceV2SQLSuite private val v2Source = classOf[FakeV2Provider].getName override protected val v2Format = v2Source override protected val catalogAndNamespace = "testcat.ns1.ns2." + private val defaultUser: String = Utils.getCurrentUserName() private def catalog(name: String): CatalogPlugin = { spark.sessionState.catalogManager.catalog(name) @@ -94,7 +96,7 @@ class DataSourceV2SQLSuite assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType() .add("id", LongType, nullable = false) .add("data", StringType)) @@ -160,6 +162,8 @@ class DataSourceV2SQLSuite Array("Comment", "this is a test table", ""), Array("Location", "/tmp/testcat/table_name", ""), Array("Provider", "foo", ""), + Array("OwnerName", defaultUser, ""), + Array("OwnerType", "USER", ""), Array("Table Properties", "[bar=baz]", ""))) } @@ -172,7 +176,7 @@ class DataSourceV2SQLSuite assert(table.name == "default.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> v2Source).asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) @@ -187,7 +191,7 @@ class DataSourceV2SQLSuite val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) // run a second create query that should fail @@ -201,7 +205,7 @@ class DataSourceV2SQLSuite val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table2.name == "testcat.table_name") assert(table2.partitioning.isEmpty) - assert(table2.properties == Map("provider" -> "foo").asJava) + assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table2.schema == new StructType().add("id", LongType).add("data", StringType)) // check that the table is still empty @@ -218,7 +222,7 @@ class DataSourceV2SQLSuite assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) spark.sql("CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING bar") @@ -227,7 +231,7 @@ class DataSourceV2SQLSuite val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table2.name == "testcat.table_name") assert(table2.partitioning.isEmpty) - assert(table2.properties == Map("provider" -> "foo").asJava) + assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table2.schema == new StructType().add("id", LongType).add("data", StringType)) // check that the table is still empty @@ -244,7 +248,7 @@ class DataSourceV2SQLSuite assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) // check that the table is empty @@ -266,7 +270,7 @@ class DataSourceV2SQLSuite assert(table.name == identifier) assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -293,7 +297,7 @@ class DataSourceV2SQLSuite assert(replacedTable != originalTable, "Table should have been replaced.") assert(replacedTable.name == identifier) assert(replacedTable.partitioning.isEmpty) - assert(replacedTable.properties == Map("provider" -> "foo").asJava) + assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(replacedTable.schema == new StructType().add("id", LongType)) val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows) @@ -431,7 +435,7 @@ class DataSourceV2SQLSuite assert(table.name == "default.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> v2Source).asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -448,7 +452,7 @@ class DataSourceV2SQLSuite val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -468,7 +472,7 @@ class DataSourceV2SQLSuite val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name")) assert(table2.name == "testcat.table_name") assert(table2.partitioning.isEmpty) - assert(table2.properties == Map("provider" -> "foo").asJava) + assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table2.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -486,7 +490,7 @@ class DataSourceV2SQLSuite assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -517,7 +521,7 @@ class DataSourceV2SQLSuite assert(table.name == "testcat.table_name") assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType() .add("id", LongType) .add("data", StringType)) @@ -557,7 +561,7 @@ class DataSourceV2SQLSuite assert(table.name == identifier) assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) assert(table.schema == new StructType().add("i", "int")) val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) @@ -1059,7 +1063,7 @@ class DataSourceV2SQLSuite Row("Namespace Name", "ns2"), Row("Description", "test namespace"), Row("Location", "/tmp/ns_test"), - Row("Owner Name", Utils.getCurrentUserName()), + Row("Owner Name", defaultUser), Row("Owner Type", "USER") )) } @@ -1075,7 +1079,7 @@ class DataSourceV2SQLSuite Row("Namespace Name", "ns2"), Row("Description", "test namespace"), Row("Location", "/tmp/ns_test"), - Row("Owner Name", Utils.getCurrentUserName()), + Row("Owner Name", defaultUser), Row("Owner Type", "USER"), Row("Properties", "((a,b),(b,a),(c,c))") )) @@ -1123,7 +1127,7 @@ class DataSourceV2SQLSuite Row("Namespace Name", "ns2"), Row("Description", "test namespace"), Row("Location", "/tmp/ns_test_2"), - Row("Owner Name", Utils.getCurrentUserName()), + Row("Owner Name", defaultUser), Row("Owner Type", "USER") )) } @@ -1929,7 +1933,7 @@ class DataSourceV2SQLSuite spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " + s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')") - val properties = sql(s"SHOW TBLPROPERTIES $t") + val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key") val schema = new StructType() .add("key", StringType, nullable = false) @@ -1937,8 +1941,10 @@ class DataSourceV2SQLSuite val expected = Seq( Row("owner", owner), - Row("status", status), - Row("provider", provider)) + Row("ownerName", defaultUser), + Row("ownerType", "USER"), + Row("provider", provider), + Row("status", status)) assert(properties.schema === schema) assert(expected === properties.collect()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 11c6487e25e90..a4a15c54e740f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,6 +21,8 @@ import java.io.{File, PrintWriter} import java.net.URI import java.util.Locale +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.{Path, RawLocalFileSystem} import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission} @@ -33,6 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseE import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -1374,11 +1377,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { createDatabase(catalog, "dbx") createTable(catalog, tableIdent, isDatasourceTable) def getProps: Map[String, String] = { - if (isUsingHiveMetastore) { + val props = if (isUsingHiveMetastore) { normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties } else { catalog.getTableMetadata(tableIdent).properties } + props -- TableCatalog.RESERVED_PROPERTIES.asScala } assert(getProps.isEmpty) // set table properties @@ -1403,11 +1407,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { createDatabase(catalog, "dbx") createTable(catalog, tableIdent, isDatasourceTable) def getProps: Map[String, String] = { - if (isUsingHiveMetastore) { + val props = if (isUsingHiveMetastore) { normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties } else { catalog.getTableMetadata(tableIdent).properties } + props -- TableCatalog.RESERVED_PROPERTIES.asScala } // unset table properties sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index d430f6ec81926..9d532155451d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -402,9 +402,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "table comment") parseAndResolve(sql) match { case create: CreateV2Table => @@ -443,9 +441,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "table comment") parseAndResolve(sql, withDefault = true) match { case create: CreateV2Table => @@ -483,9 +479,7 @@ class PlanResolutionSuite extends AnalysisTest { "p2" -> "v2", "provider" -> v2Format, "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "This is the staging page view table") parseAndResolve(sql) match { case create: CreateV2Table => @@ -522,9 +516,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "table comment") parseAndResolve(sql) match { case ctas: CreateTableAsSelect => @@ -558,9 +550,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20", "provider" -> "parquet", "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "table comment") parseAndResolve(sql, withDefault = true) match { case ctas: CreateTableAsSelect => @@ -593,9 +583,7 @@ class PlanResolutionSuite extends AnalysisTest { "p2" -> "v2", "provider" -> v2Format, "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table", - "ownerName" -> Utils.getCurrentUserName(), - "ownerType" -> "USER") + "comment" -> "This is the staging page view table") parseAndResolve(sql) match { case ctas: CreateTableAsSelect => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0904b076f184d..3da52ecdaed6a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -513,7 +513,7 @@ private[hive] class HiveClientImpl( val filteredProperties = properties.filterNot { case (key, _) => excludedTableProperties.contains(key) - } ++ Option(shim.getTableOwnerType(h)).map(TableCatalog.PROP_OWNER_TYPE -> _) + } val comment = properties.get("comment") CatalogTable( @@ -570,9 +570,10 @@ private[hive] class HiveClientImpl( override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { verifyColumnDataType(table.dataSchema) + val ownerType = + table.properties.getOrElse(TableCatalog.PROP_OWNER_TYPE, PrincipalType.USER.name()) val hiveTable = toHiveTable(table, Some(userName)) - shim.setTableOwnerType(hiveTable, - table.properties.getOrElse(TableCatalog.PROP_OWNER_TYPE, PrincipalType.USER.name)) + hiveTable.setProperty(TableCatalog.PROP_OWNER_TYPE, ownerType) client.createTable(hiveTable, ignoreIfExists) } @@ -595,7 +596,6 @@ private[hive] class HiveClientImpl( verifyColumnDataType(table.dataSchema) val hiveTable = toHiveTable( table.copy(properties = table.ignoredProperties ++ table.properties)) - table.properties.get(TableCatalog.PROP_OWNER_TYPE).foreach(shim.setTableOwnerType(hiveTable, _)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"$dbName.$tableName" shim.alterTable(client, qualifiedTableName, hiveTable) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 189c70cc39322..40ba9a3e26b8c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -161,10 +161,6 @@ private[client] sealed abstract class Shim { def setDatabaseOwnerType(db: Database, ownerType: String): Unit - def getTableOwnerType(table: Table): String - - def setTableOwnerType(table: Table, ownerType: String): Unit - protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { val method = findMethod(klass, name, args: _*) require(Modifier.isStatic(method.getModifiers()), @@ -475,10 +471,6 @@ private[client] class Shim_v0_12 extends Shim with Logging { override def getDatabaseOwnerType(db: Database): String = "" override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = {} - - override def getTableOwnerType(table: Table): String = "" - - override def setTableOwnerType(table: Table, ownerType: String): Unit = {} } private[client] class Shim_v0_13 extends Shim_v0_12 { @@ -1392,15 +1384,6 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID, replace: JBoolean) } - - override def getTableOwnerType(table: Table): String = { - Option(getTableOwnerTypeMethod.invoke(table)) - .map(_.asInstanceOf[PrincipalType].name()).getOrElse("") - } - - override def setTableOwnerType(table: Table, ownerType: String): Unit = { - setTableOwnerTypeMethod.invoke(table, PrincipalType.valueOf(ownerType)) - } } private[client] class Shim_v3_1 extends Shim_v3_0 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 544dfa2aef15b..3e8c1a7b429d8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -98,6 +98,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA partitionColumnNames = partitionCols, createTime = 0L, createVersion = org.apache.spark.SPARK_VERSION, + properties = Map("ownerType" -> "USER"), tracksPartitionsInCatalog = true) } @@ -440,26 +441,26 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA val currentUser = Utils.getCurrentUserName() sql(s"CREATE TABLE $table1(k int)") - checkTblOwner(table1, currentUser, "") + checkTblOwner(table1, currentUser, "USER") sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a')") - checkTblOwner(table1, currentUser, "") -// val e = intercept[ParseException](sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a'," -// + s"'ownerName'='$owner','ownerType'='XXX')")) -// assert(e.getMessage.contains("ownerName")) + checkTblOwner(table1, currentUser, "USER") + val e = intercept[ParseException](sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a'," + + s"'ownerName'='$owner','ownerType'='XXX')")) + assert(e.getMessage.contains("ownerName")) sql(s"ALTER TABLE $table1 SET OWNER ROLE $owner") - checkTblOwner(table1, owner, "") + checkTblOwner(table1, owner, "ROLE") -// val e2 = intercept[ParseException]( -// sql(s"CREATE TABLE $table2 WITH TBLPROPERTIES('ownerName'='$owner')")) -// assert(e2.getMessage.contains("ownerName")) + val e2 = intercept[ParseException]( + sql(s"CREATE TABLE $table2 WITH TBLPROPERTIES('ownerName'='$owner')")) + assert(e2.getMessage.contains("ownerName")) sql(s"CREATE TABLE $table2(k int)") - checkTblOwner(table2, currentUser, "") + checkTblOwner(table2, currentUser, "USER") sql(s"ALTER TABLE $table2 SET OWNER GROUP $owner") - checkTblOwner(table2, owner, "") + checkTblOwner(table2, owner, "GROUP") sql(s"ALTER TABLE $table2 SET OWNER ROLE `$owner`") - checkTblOwner(table2, owner, "") + checkTblOwner(table2, owner, "ROLE") sql(s"ALTER TABLE $table2 SET OWNER USER OWNER") - checkTblOwner(table2, "OWNER", "") + checkTblOwner(table2, "OWNER", "USER") } finally { catalog.reset() } From e9ade41ba5825f16b4280339f432ed603b0954d0 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 13:38:53 +0800 Subject: [PATCH 03/15] nit --- .../datasources/v2/WriteToDataSourceV2Exec.scala | 1 + .../sql/execution/command/PlanResolutionSuite.scala | 1 - .../org/apache/spark/sql/hive/client/HiveShim.scala | 11 ----------- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 7e9e8b59e619f..f197c0e46869c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} + /** * Deprecated logical plan for writing data into data source v2. This is being replaced by more * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 9d532155451d7..0901c66cccceb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{CharType, DoubleType, HIVE_TYPE_STRING, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType} -import org.apache.spark.util.Utils class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 40ba9a3e26b8c..d8078a534a874 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -1312,17 +1312,6 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { classOf[AcidUtils.Operation], JBoolean.TYPE) - private lazy val getTableOwnerTypeMethod = - findMethod( - classOf[Table], - "getOwnerType") - - private lazy val setTableOwnerTypeMethod = - findMethod( - classOf[Table], - "setOwnerType", - classOf[PrincipalType]) - override def loadPartition( hive: Hive, loadPath: Path, From 0467b1eec3b7877236c3c054649f5453d45d3ab3 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 13:43:52 +0800 Subject: [PATCH 04/15] doc --- docs/sql-migration-guide.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index bb086ff657e41..e71f0ccf467af 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -314,10 +314,10 @@ license: | yes - no + yes - For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. + For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it. @@ -328,10 +328,10 @@ license: | yes - no + yes - For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. + For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it. From 4577e933f4c55b9fa8a9e6c7f04696457369a49d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 15:46:37 +0800 Subject: [PATCH 05/15] fix tests --- .../org/apache/spark/sql/ShowCreateTableSuite.scala | 2 ++ .../apache/spark/sql/hive/HiveExternalCatalog.scala | 11 ++++++++--- .../spark/sql/hive/execution/HiveDDLSuite.scala | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala index d7b489e4fa07d..c49c07cfbb1c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.Utils @@ -235,6 +236,7 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils { "last_modified_by", "last_modified_time", "Owner:", + TableCatalog.PROP_OWNER_TYPE, // The following are hive specific schema parameters which we do not need to match exactly. "totalNumberFiles", "maxFileSize", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 03874d005a6e6..e77213f9093ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions} import org.apache.spark.sql.hive.client.HiveClient @@ -630,17 +630,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Add old stats properties to table properties, to retain spark's stats. // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, // to retain the spark specific format if it is. + // Add old table's ownerType if we need to restore val propsFromOldTable = oldTableDef.properties.filter { case (k, v) => k.startsWith(DATASOURCE_PREFIX) || k.startsWith(STATISTICS_PREFIX) || - k.startsWith(CREATED_SPARK_VERSION) + k.startsWith(CREATED_SPARK_VERSION) || k == TableCatalog.PROP_OWNER_TYPE } val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp + + // // Add old table's owner if we need to restore + val owner = Option(tableDefinition.owner).filter(_.nonEmpty).getOrElse(oldTableDef.owner) val newDef = tableDefinition.copy( storage = newStorage, schema = oldTableDef.schema, partitionColumnNames = oldTableDef.partitionColumnNames, bucketSpec = oldTableDef.bucketSpec, - properties = newTableProps) + properties = newTableProps, + owner = owner) client.alterTable(newDef) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 3e8c1a7b429d8..29c95d7936ab5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog} import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ @@ -1603,6 +1603,7 @@ class HiveDDLSuite "last_modified_by", "last_modified_time", "Owner:", + TableCatalog.PROP_OWNER_TYPE, "totalNumberFiles", "maxFileSize", "minFileSize" From 0b5097947a8016e639b4a11fa36b0da10562d7e1 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 17:27:02 +0800 Subject: [PATCH 06/15] use planner to decide ownership --- .../datasources/v2/CreateTableExec.scala | 5 +--- .../datasources/v2/DataSourceV2Strategy.scala | 26 ++++++++++++------- .../v2/WriteToDataSourceV2Exec.scala | 12 ++++----- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index 812ff01a109fb..511cd8a9a438f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} -import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -39,9 +38,7 @@ case class CreateTableExec( override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { - val propertiesWithOwner = withDefaultOwnership(tableProperties) - catalog.createTable(identifier, tableSchema, partitioning.toArray, - propertiesWithOwner.asJava) + catalog.createTable(identifier, tableSchema, partitioning.toArray, tableProperties.asJava) } catch { case _: TableAlreadyExistsException if ignoreIfExists => logWarning(s"Table ${identifier.quoted} was created concurrently. Ignoring.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 940ebf4d1d9e8..8cc4671e0a7b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable} import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy @@ -85,31 +85,37 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) => - CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil + val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => + val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => - AtomicCreateTableAsSelectExec( - staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil + AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query), + propsWithOwner, writeOptions, ifNotExists) :: Nil case _ => - CreateTableAsSelectExec( - catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil + CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query), + propsWithOwner, writeOptions, ifNotExists) :: Nil } case RefreshTable(catalog, ident) => RefreshTableExec(catalog, ident) :: Nil case ReplaceTable(catalog, ident, schema, parts, props, orCreate) => + val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) catalog match { case staging: StagingTableCatalog => - AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil + AtomicReplaceTableExec( + staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil case _ => - ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil + ReplaceTableExec( + catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil } case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) => + val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) val writeOptions = new CaseInsensitiveStringMap(options.asJava) catalog match { case staging: StagingTableCatalog => @@ -119,7 +125,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { parts, query, planLater(query), - props, + propsWithOwner, writeOptions, orCreate = orCreate) :: Nil case _ => @@ -129,7 +135,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { parts, query, planLater(query), - props, + propsWithOwner, writeOptions, orCreate = orCreate) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f197c0e46869c..e360a9e656a16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, TableCatalog} -import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} @@ -83,7 +82,7 @@ case class CreateTableAsSelectExec( Utils.tryWithSafeFinallyAndFailureCallbacks({ val schema = query.schema.asNullable catalog.createTable( - ident, schema, partitioning.toArray, withDefaultOwnership(properties).asJava) match { + ident, schema, partitioning.toArray, properties.asJava) match { case table: SupportsWrite => val info = LogicalWriteInfoImpl( queryId = UUID.randomUUID().toString, @@ -135,7 +134,7 @@ case class AtomicCreateTableAsSelectExec( throw new TableAlreadyExistsException(ident) } val stagedTable = catalog.stageCreate( - ident, query.schema.asNullable, partitioning.toArray, withDefaultOwnership(properties).asJava) + ident, query.schema.asNullable, partitioning.toArray, properties.asJava) writeToStagedTable(stagedTable, writeOptions, ident) } } @@ -178,7 +177,7 @@ case class ReplaceTableAsSelectExec( } val schema = query.schema.asNullable val createdTable = catalog.createTable( - ident, schema, partitioning.toArray, withDefaultOwnership(properties).asJava) + ident, schema, partitioning.toArray, properties.asJava) Utils.tryWithSafeFinallyAndFailureCallbacks({ createdTable match { case table: SupportsWrite => @@ -228,14 +227,13 @@ case class AtomicReplaceTableAsSelectExec( override protected def doExecute(): RDD[InternalRow] = { val schema = query.schema.asNullable - val propertiesWithOwner = withDefaultOwnership(properties).asJava val staged = if (orCreate) { catalog.stageCreateOrReplace( - ident, schema, partitioning.toArray, propertiesWithOwner) + ident, schema, partitioning.toArray, properties.asJava) } else if (catalog.tableExists(ident)) { try { catalog.stageReplace( - ident, schema, partitioning.toArray, propertiesWithOwner) + ident, schema, partitioning.toArray, properties.asJava) } catch { case e: NoSuchTableException => throw new CannotReplaceMissingTableException(ident, Some(e)) From 0fc26925ff06b79e964441acdec9fb403b0d253d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 17 Jan 2020 19:53:19 +0800 Subject: [PATCH 07/15] fix DataFrameWriterV2Suite --- .../spark/sql/DataFrameWriterV2Suite.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index ce0a5f21fd7ec..ed1c5ae69f957 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} +import org.apache.spark.util.Utils class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -37,6 +38,10 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.sessionState.catalogManager.catalog(name).asTableCatalog } + private val defaultOwnership = + Map(TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), + TableCatalog.PROP_OWNER_TYPE -> "USER") + before { spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) @@ -234,7 +239,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning.isEmpty) - assert(table.properties.isEmpty) + assert(table.properties == defaultOwnership.asJava) } test("Create: with using") { @@ -249,7 +254,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning.isEmpty) - assert(table.properties === Map("provider" -> "foo").asJava) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) } test("Create: with property") { @@ -264,7 +269,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning.isEmpty) - assert(table.properties === Map("prop" -> "value").asJava) + assert(table.properties === (Map("prop" -> "value") ++ defaultOwnership).asJava) } test("Create: identity partitioned table") { @@ -279,7 +284,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning === Seq(IdentityTransform(FieldReference("id")))) - assert(table.properties.isEmpty) + assert(table.properties == defaultOwnership.asJava) } test("Create: partitioned by years(ts)") { @@ -368,7 +373,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning === Seq(IdentityTransform(FieldReference("id")))) - assert(table.properties === Map("provider" -> "foo").asJava) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) } test("Replace: basic behavior") { @@ -386,7 +391,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning === Seq(IdentityTransform(FieldReference("id")))) - assert(table.properties === Map("provider" -> "foo").asJava) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) spark.table("source2") .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd")) @@ -405,7 +410,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo .add("data", StringType) .add("even_or_odd", StringType)) assert(replaced.partitioning.isEmpty) - assert(replaced.properties.isEmpty) + assert(replaced.properties === defaultOwnership.asJava) } test("Replace: partitioned table") { @@ -422,7 +427,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning.isEmpty) - assert(table.properties === Map("provider" -> "foo").asJava) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) spark.table("source2") .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd")) @@ -441,7 +446,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo .add("data", StringType) .add("even_or_odd", StringType)) assert(replaced.partitioning === Seq(IdentityTransform(FieldReference("id")))) - assert(replaced.properties.isEmpty) + assert(replaced.properties === defaultOwnership.asJava) } test("Replace: fail if table does not exist") { @@ -465,7 +470,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(replaced.name === "testcat.table_name") assert(replaced.schema === new StructType().add("id", LongType).add("data", StringType)) assert(replaced.partitioning.isEmpty) - assert(replaced.properties.isEmpty) + assert(replaced.properties === defaultOwnership.asJava) } test("CreateOrReplace: table exists") { @@ -483,7 +488,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.name === "testcat.table_name") assert(table.schema === new StructType().add("id", LongType).add("data", StringType)) assert(table.partitioning === Seq(IdentityTransform(FieldReference("id")))) - assert(table.properties === Map("provider" -> "foo").asJava) + assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) spark.table("source2") .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd")) @@ -502,6 +507,6 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo .add("data", StringType) .add("even_or_odd", StringType)) assert(replaced.partitioning.isEmpty) - assert(replaced.properties.isEmpty) + assert(replaced.properties === defaultOwnership.asJava) } } From f5512fba49a57121ad5f692c560e1b8d590a22fc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 18 Jan 2020 01:01:24 +0800 Subject: [PATCH 08/15] restore ownerType for show create table --- .../org/apache/spark/sql/execution/command/tables.scala | 9 ++++++--- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 447d00c11e7cf..14e354c9e2555 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.DescribeTableSchema import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap} +import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -1022,12 +1023,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman } else { val tableMetadata = catalog.getTableMetadata(table) + val restoredTableMeta = tableMetadata.copy( + properties = tableMetadata.properties.filterKeys(_ != TableCatalog.PROP_OWNER_TYPE)) // TODO: [SPARK-28692] unify this after we unify the // CREATE TABLE syntax for hive serde and data source table. - val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { - showCreateDataSourceTable(tableMetadata) + val stmt = if (DDLUtils.isDatasourceTable(restoredTableMeta)) { + showCreateDataSourceTable(restoredTableMeta) } else { - showCreateHiveTable(tableMetadata) + showCreateHiveTable(restoredTableMeta) } Seq(Row(stmt)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 29c95d7936ab5..1f2ea7b58e510 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA "last_modified_time", "Owner:", "COLUMN_STATS_ACCURATE", + TableCatalog.PROP_OWNER_TYPE, // The following are hive specific schema parameters which we do not need to match exactly. "numFiles", "numRows", From 8e4f747c3f07378ff615c2586d111db10823cdb5 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 18 Jan 2020 01:14:33 +0800 Subject: [PATCH 09/15] nit --- .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e07978ae180e4..e31003b556352 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -330,7 +330,7 @@ class VersionsSuite extends SparkFunSuite with Logging { // mocking the owner is empty val newTable2 = originalTable.copy(owner = "") client.alterTable(newTable2) - assert(client.getTable("default", "src").owner === client.userName) + assert(client.getTable("default", "src").owner === "") } test(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") { From e618825f7e194cfc75ed3a335458f69c1061c5b4 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 19 Jan 2020 13:48:51 +0800 Subject: [PATCH 10/15] if owner is empty, let it be sparkUser instead of hive's authenticator user --- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 10 +++++----- .../apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3da52ecdaed6a..24c4977d9af76 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -595,7 +595,7 @@ private[hive] class HiveClientImpl( // these user-specified values. verifyColumnDataType(table.dataSchema) val hiveTable = toHiveTable( - table.copy(properties = table.ignoredProperties ++ table.properties)) + table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"$dbName.$tableName" shim.alterTable(client, qualifiedTableName, hiveTable) @@ -712,7 +712,7 @@ private[hive] class HiveClientImpl( val original = state.getCurrentDatabase try { setCurrentDatabaseRaw(db) - val hiveTable = toHiveTable(getTable(db, table)) + val hiveTable = toHiveTable(getTable(db, table), Some(userName)) shim.alterPartitions(client, table, newParts.map { toHivePartition(_, hiveTable) }.asJava) } finally { state.setCurrentDatabase(original) @@ -743,7 +743,7 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(userName)) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(fromHivePartition) } @@ -755,7 +755,7 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(userName)) val partSpec = spec match { case None => CatalogTypes.emptyTablePartitionSpec case Some(s) => @@ -770,7 +770,7 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(userName)) val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e31003b556352..3c869cdd59a5f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -330,7 +330,7 @@ class VersionsSuite extends SparkFunSuite with Logging { // mocking the owner is empty val newTable2 = originalTable.copy(owner = "") client.alterTable(newTable2) - assert(client.getTable("default", "src").owner === "") + assert(client.getTable("default", "src").owner === originalTable.owner) } test(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") { From 46ae85b4f7388086381cbac681d958e76633488d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 19 Jan 2020 16:28:50 +0800 Subject: [PATCH 11/15] nit --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 24c4977d9af76..4dea47cda2a96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -692,7 +692,7 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = toHiveTable(catalogTable) + val hiveTable = toHiveTable(catalogTable, Some(userName)) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } From 9bb78eb82fd3eecd401c4a0175e02a0f6aff925d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 20 Jan 2020 16:18:07 +0800 Subject: [PATCH 12/15] revert alter table set owner --- docs/sql-migration-guide.md | 20 +++++- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 - .../sql/connector/catalog/TableCatalog.java | 9 +-- .../sql/catalyst/catalog/interface.scala | 4 +- .../sql/catalyst/parser/AstBuilder.scala | 29 ++------- .../catalyst/plans/logical/v2Commands.scala | 10 --- .../sql/connector/catalog/CatalogV2Util.scala | 4 +- .../sql/catalyst/parser/DDLParserSuite.scala | 13 ---- .../spark/sql/execution/command/tables.scala | 8 +-- .../datasources/v2/DataSourceV2Strategy.scala | 5 -- .../datasources/v2/V2SessionCatalog.scala | 2 +- .../spark/sql/DataFrameWriterV2Suite.scala | 4 +- .../spark/sql/ShowCreateTableSuite.scala | 1 - .../spark/sql/connector/AlterTableTests.scala | 18 ------ .../sql/connector/DataSourceV2SQLSuite.scala | 20 +++--- .../sql/execution/command/DDLSuite.scala | 3 +- .../spark/sql/hive/HiveExternalCatalog.scala | 3 +- .../sql/hive/client/HiveClientImpl.scala | 6 +- .../spark/sql/hive/client/VersionsSuite.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 61 ++++--------------- 20 files changed, 56 insertions(+), 168 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 8139edc314384..2ddbdb9d88d71 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -314,10 +314,10 @@ license: | yes - yes + no - For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it. + For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it @@ -327,11 +327,25 @@ license: | yes + + no + + + For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it + + + + + owner + + + no + yes - For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it. + For tables, it is determined by the user who runs spark and create the table. diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 77252e5c69623..645d0d709e063 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -181,8 +181,6 @@ statement DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions | ALTER TABLE multipartIdentifier (partitionSpec)? SET locationSpec #setTableLocation - | ALTER TABLE multipartIdentifier - SET OWNER ownerType=(USER | ROLE | GROUP) identifier #setTableOwner | ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable | DROP VIEW (IF EXISTS)? multipartIdentifier #dropView diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 04e87d9560f1a..591e1c631be13 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -59,18 +59,13 @@ public interface TableCatalog extends CatalogPlugin { /** * A property to specify the owner of the table. */ - String PROP_OWNER_NAME = "ownerName"; - - /** - * A property to specify the type of the table's owner. - */ - String PROP_OWNER_TYPE = "ownerType"; + String PROP_OWNER = "owner"; /** * The list of reserved table properties. */ List RESERVED_PROPERTIES = - Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER_NAME, PROP_OWNER_TYPE); + Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER); /** * List the tables in a namespace from the catalog. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5fa3c87c68ccb..37b900c63af1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -335,8 +335,7 @@ case class CatalogTable( def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { val map = new mutable.LinkedHashMap[String, String]() - val tableProperties = properties.filterKeys(_ != TableCatalog.PROP_OWNER_TYPE) - .map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") val lastAccess = { if (lastAccessTime <= 0) "UNKNOWN" else new Date(lastAccessTime).toString @@ -345,7 +344,6 @@ case class CatalogTable( identifier.database.foreach(map.put("Database", _)) map.put("Table", identifier.table) if (owner != null && owner.nonEmpty) map.put("Owner", owner) - properties.get(TableCatalog.PROP_OWNER_TYPE).foreach(map.put("Owner Type", _)) map.put("Created Time", new Date(createTime).toString) map.put("Last Access", lastAccess) map.put("Created By", "Spark " + createVersion) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d977513256c90..d1baa93876a3c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2680,13 +2680,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" + s" the LOCATION clause to specify it.", ctx) case (PROP_LOCATION, _) => false - case (ownership, _) if ownership == PROP_OWNER_NAME || ownership == PROP_OWNER_TYPE => - if (legacyOn) { - false - } else { - throw new ParseException(s"$ownership is a reserved table property , please use" + - " ALTER TABLE ... SET OWNER ... to specify it.", ctx) - } + case (PROP_OWNER, _) if !legacyOn => + throw new ParseException(s"$PROP_OWNER is a reserved table property , please use" + + " ALTER TABLE ... SET OWNER ... to specify it.", ctx) + case (PROP_OWNER, _) => false case _ => true } } @@ -3628,22 +3625,4 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.ownerType.getText) } } - - /** - * Create an [[AlterTableSetOwner]] logical plan. - * - * For example: - * {{{ - * ALTER TABLE tableName SET OWNER (USER|ROLE|GROUP) identityName; - * }}} - */ - override def visitSetTableOwner(ctx: SetTableOwnerContext): LogicalPlan = { - withOrigin(ctx) { - val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) - AlterTableSetOwner( - UnresolvedTable(nameParts), - ctx.identifier.getText, - ctx.ownerType.getText) - } - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 7392919de38fb..e1e7eac4cc085 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -302,16 +302,6 @@ case class AlterNamespaceSetOwner( override def children: Seq[LogicalPlan] = child :: Nil } -/** - * ALTER TABLE ... SET OWNER command, as parsed from SQL. - */ -case class AlterTableSetOwner( - child: LogicalPlan, - ownerName: String, - ownerType: String) extends Command { - override def children: Seq[LogicalPlan] = child :: Nil -} - /** * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 5c0814090434d..67726c7343524 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -278,9 +278,7 @@ private[sql] object CatalogV2Util { } def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = { - properties ++ - Map(TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), - TableCatalog.PROP_OWNER_TYPE -> "USER") + properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) } def createAlterTable( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 1217654fc63ab..47387fa18411e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1356,19 +1356,6 @@ class DDLParserSuite extends AnalysisTest { AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "group1", "GROUP")) } - test("set table owner") { - comparePlans( - parsePlan("ALTER TABLE a.b.c SET OWNER USER user1"), - AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "user1", "USER")) - - comparePlans( - parsePlan("ALTER TABLE a.b.c SET OWNER ROLE role1"), - AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "role1", "ROLE")) - comparePlans( - parsePlan("ALTER TABLE a.b.c SET OWNER GROUP group1"), - AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "group1", "GROUP")) - } - test("show databases: basic") { comparePlans( parsePlan("SHOW DATABASES"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 021da648827f6..cb9d752f0a38d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -1016,14 +1016,12 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman } else { val tableMetadata = catalog.getTableMetadata(table) - val restoredTableMeta = tableMetadata.copy( - properties = tableMetadata.properties.filterKeys(_ != TableCatalog.PROP_OWNER_TYPE)) // TODO: [SPARK-28692] unify this after we unify the // CREATE TABLE syntax for hive serde and data source table. - val stmt = if (DDLUtils.isDatasourceTable(restoredTableMeta)) { - showCreateDataSourceTable(restoredTableMeta) + val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { + showCreateDataSourceTable(tableMetadata) } else { - showCreateHiveTable(restoredTableMeta) + showCreateHiveTable(tableMetadata) } Seq(Row(stmt)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 60bf7137b8027..c68bb3f1ca3a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -292,11 +292,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat Map(SupportsNamespaces.PROP_OWNER_NAME -> name, SupportsNamespaces.PROP_OWNER_TYPE -> typ) AlterNamespaceSetPropertiesExec(catalog, namespace, properties) :: Nil - case AlterTableSetOwner(ResolvedTable(catalog, ident, _), name, typ) => - val changes = TableChange.setProperty(TableCatalog.PROP_OWNER_NAME, name) :: - TableChange.setProperty(TableCatalog.PROP_OWNER_TYPE, typ) :: Nil - AlterTableExec(catalog, ident, changes) :: Nil - case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 2940b9b409cfc..8eea1cf9c06e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -125,7 +125,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes) val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes) val comment = properties.get(TableCatalog.PROP_COMMENT) - val owner = properties.getOrElse(TableCatalog.PROP_OWNER_NAME, catalogTable.owner) + val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) try { catalog.alterTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index ed1c5ae69f957..4e6381aea3c31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -38,9 +38,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo spark.sessionState.catalogManager.catalog(name).asTableCatalog } - private val defaultOwnership = - Map(TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(), - TableCatalog.PROP_OWNER_TYPE -> "USER") + private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) before { spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala index c49c07cfbb1c1..54031e975df4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala @@ -236,7 +236,6 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils { "last_modified_by", "last_modified_time", "Owner:", - TableCatalog.PROP_OWNER_TYPE, // The following are hive specific schema parameters which we do not need to match exactly. "totalNumberFiles", "maxFileSize", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 334f4c56f751a..2fc5020c39ade 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -996,22 +996,4 @@ trait AlterTableTests extends SharedSparkSession { assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava) } } - - test("AlterTable: set table owner") { - val t = s"${catalogAndNamespace}table_name" - withTable(t) { - sql(s"CREATE TABLE $t (id int) USING $v2Format") - assert(getTableMetadata(t).properties === - withDefaultOwnership(Map("provider" -> v2Format)).asJava) - sql(s"ALTER TABLE $t SET OWNER ROLE kent") - assert(getTableMetadata(t).properties === - Map("provider" -> v2Format, "ownerName" -> "kent", "ownerType" -> "ROLE").asJava) - sql(s"ALTER TABLE $t SET OWNER GROUP yao") - assert(getTableMetadata(t).properties === - Map("provider" -> v2Format, "ownerName" -> "yao", "ownerType" -> "GROUP").asJava) - sql(s"ALTER TABLE $t SET OWNER USER ming") - assert(getTableMetadata(t).properties === - Map("provider" -> v2Format, "ownerName" -> "ming", "ownerType" -> "USER").asJava) - } - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a71d90090185c..5dde21e573898 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -162,8 +162,7 @@ class DataSourceV2SQLSuite Array("Comment", "this is a test table", ""), Array("Location", "/tmp/testcat/table_name", ""), Array("Provider", "foo", ""), - Array("OwnerName", defaultUser, ""), - Array("OwnerType", "USER", ""), + Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""), Array("Table Properties", "[bar=baz]", ""))) } @@ -1927,11 +1926,11 @@ class DataSourceV2SQLSuite test("SHOW TBLPROPERTIES: v2 table") { val t = "testcat.ns1.ns2.tbl" withTable(t) { - val owner = "andrew" + val user = "andrew" val status = "new" val provider = "foo" spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " + - s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')") + s"TBLPROPERTIES ('user'='$user', 'status'='$status')") val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key") @@ -1940,11 +1939,10 @@ class DataSourceV2SQLSuite .add("value", StringType, nullable = false) val expected = Seq( - Row("owner", owner), - Row("ownerName", defaultUser), - Row("ownerType", "USER"), + Row(TableCatalog.PROP_OWNER, defaultUser), Row("provider", provider), - Row("status", status)) + Row("status", status), + Row("user", user)) assert(properties.schema === schema) assert(expected === properties.collect()) @@ -1954,11 +1952,11 @@ class DataSourceV2SQLSuite test("SHOW TBLPROPERTIES(key): v2 table") { val t = "testcat.ns1.ns2.tbl" withTable(t) { - val owner = "andrew" + val user = "andrew" val status = "new" val provider = "foo" spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " + - s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')") + s"TBLPROPERTIES ('user'='$user', 'status'='$status')") val properties = sql(s"SHOW TBLPROPERTIES $t ('status')") @@ -1973,7 +1971,7 @@ class DataSourceV2SQLSuite withTable(t) { val nonExistingKey = "nonExistingKey" spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo " + - s"TBLPROPERTIES ('owner'='andrew', 'status'='new')") + s"TBLPROPERTIES ('user'='andrew', 'status'='new')") val properties = sql(s"SHOW TBLPROPERTIES $t ('$nonExistingKey')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index a4a15c54e740f..d31043fae6f3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1377,12 +1377,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { createDatabase(catalog, "dbx") createTable(catalog, tableIdent, isDatasourceTable) def getProps: Map[String, String] = { - val props = if (isUsingHiveMetastore) { + if (isUsingHiveMetastore) { normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties } else { catalog.getTableMetadata(tableIdent).properties } - props -- TableCatalog.RESERVED_PROPERTIES.asScala } assert(getProps.isEmpty) // set table properties diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e77213f9093ac..ca292f65efeee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -630,10 +630,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Add old stats properties to table properties, to retain spark's stats. // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, // to retain the spark specific format if it is. - // Add old table's ownerType if we need to restore val propsFromOldTable = oldTableDef.properties.filter { case (k, v) => k.startsWith(DATASOURCE_PREFIX) || k.startsWith(STATISTICS_PREFIX) || - k.startsWith(CREATED_SPARK_VERSION) || k == TableCatalog.PROP_OWNER_TYPE + k.startsWith(CREATED_SPARK_VERSION) } val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 4dea47cda2a96..7b0282c43c4c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -570,11 +570,7 @@ private[hive] class HiveClientImpl( override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { verifyColumnDataType(table.dataSchema) - val ownerType = - table.properties.getOrElse(TableCatalog.PROP_OWNER_TYPE, PrincipalType.USER.name()) - val hiveTable = toHiveTable(table, Some(userName)) - hiveTable.setProperty(TableCatalog.PROP_OWNER_TYPE, ownerType) - client.createTable(hiveTable, ignoreIfExists) + client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists) } override def dropTable( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 3c869cdd59a5f..e07978ae180e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -330,7 +330,7 @@ class VersionsSuite extends SparkFunSuite with Logging { // mocking the owner is empty val newTable2 = originalTable.copy(owner = "") client.alterTable(newTable2) - assert(client.getTable("default", "src").owner === originalTable.owner) + assert(client.getTable("default", "src").owner === client.userName) } test(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 1f2ea7b58e510..3d8def8016299 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -98,7 +98,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA partitionColumnNames = partitionCols, createTime = 0L, createVersion = org.apache.spark.SPARK_VERSION, - properties = Map("ownerType" -> "USER"), tracksPartitionsInCatalog = true) } @@ -112,7 +111,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA "last_modified_time", "Owner:", "COLUMN_STATS_ACCURATE", - TableCatalog.PROP_OWNER_TYPE, // The following are hive specific schema parameters which we do not need to match exactly. "numFiles", "numRows", @@ -377,7 +375,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA } } - private def checkDbOwner(db: String, + private def checkOwner(db: String, expectedOwnerName: String, expectedOwnerType: String): Unit = { val df = sql(s"DESCRIBE DATABASE EXTENDED $db") val owner = df.where("database_description_item='Owner Name'") @@ -397,71 +395,40 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA val currentUser = Utils.getCurrentUserName() sql(s"CREATE DATABASE $db1") - checkDbOwner(db1, currentUser, "USER") + checkOwner(db1, currentUser, "USER") sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a')") - checkDbOwner(db1, currentUser, "USER") + checkOwner(db1, currentUser, "USER") val e = intercept[ParseException](sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a'," + s"'ownerName'='$owner','ownerType'='XXX')")) assert(e.getMessage.contains("ownerName")) sql(s"ALTER DATABASE $db1 SET OWNER ROLE $owner") - checkDbOwner(db1, owner, "ROLE") + checkOwner(db1, owner, "ROLE") val e2 = intercept[ParseException]( sql(s"CREATE DATABASE $db2 WITH DBPROPERTIES('ownerName'='$owner')")) assert(e2.getMessage.contains("ownerName")) sql(s"CREATE DATABASE $db2") - checkDbOwner(db2, currentUser, "USER") + checkOwner(db2, currentUser, "USER") sql(s"ALTER DATABASE $db2 SET OWNER GROUP $owner") - checkDbOwner(db2, owner, "GROUP") + checkOwner(db2, owner, "GROUP") sql(s"ALTER DATABASE $db2 SET OWNER GROUP `$owner`") - checkDbOwner(db2, owner, "GROUP") + checkOwner(db2, owner, "GROUP") sql(s"ALTER DATABASE $db2 SET OWNER GROUP OWNER") - checkDbOwner(db2, "OWNER", "GROUP") + checkOwner(db2, "OWNER", "GROUP") } finally { catalog.reset() } } - private def checkTblOwner(table: String, - expectedOwnerName: String, expectedOwnerType: String): Unit = { - val df = sql(s"DESCRIBE TABLE EXTENDED $table") - val owner = df.where("col_name='Owner'") - .collect().head.getString(1) - val typ = df.where("col_name='Owner Type'") - .collect().head.getString(1) - assert(owner === expectedOwnerName) - assert(typ === expectedOwnerType) - } - test("Table Ownership") { val catalog = spark.sessionState.catalog try { - val table1 = "spark_30019_1" - val table2 = "spark_30019_2" - val owner = "spark_30019" - val currentUser = Utils.getCurrentUserName() - - sql(s"CREATE TABLE $table1(k int)") - checkTblOwner(table1, currentUser, "USER") - sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a')") - checkTblOwner(table1, currentUser, "USER") - val e = intercept[ParseException](sql(s"ALTER TABLE $table1 SET TBLPROPERTIES ('a'='a'," - + s"'ownerName'='$owner','ownerType'='XXX')")) - assert(e.getMessage.contains("ownerName")) - sql(s"ALTER TABLE $table1 SET OWNER ROLE $owner") - checkTblOwner(table1, owner, "ROLE") - + sql(s"CREATE TABLE spark_30019(k int)") + assert(sql(s"DESCRIBE TABLE EXTENDED spark_30019").where("col_name='Owner'") + .collect().head.getString(1) === Utils.getCurrentUserName()) val e2 = intercept[ParseException]( - sql(s"CREATE TABLE $table2 WITH TBLPROPERTIES('ownerName'='$owner')")) - assert(e2.getMessage.contains("ownerName")) - sql(s"CREATE TABLE $table2(k int)") - checkTblOwner(table2, currentUser, "USER") - sql(s"ALTER TABLE $table2 SET OWNER GROUP $owner") - checkTblOwner(table2, owner, "GROUP") - sql(s"ALTER TABLE $table2 SET OWNER ROLE `$owner`") - checkTblOwner(table2, owner, "ROLE") - sql(s"ALTER TABLE $table2 SET OWNER USER OWNER") - checkTblOwner(table2, "OWNER", "USER") + sql(s"CREATE TABLE spark_30019_2 WITH TBLPROPERTIES('owner'='spark_30019')")) + assert(e2.getMessage.contains("owner")) } finally { catalog.reset() } @@ -472,7 +439,6 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { import testImplicits._ val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO") - private val reversedProperties = Seq("ownerName", "ownerType") override def afterEach(): Unit = { try { @@ -1604,7 +1570,6 @@ class HiveDDLSuite "last_modified_by", "last_modified_time", "Owner:", - TableCatalog.PROP_OWNER_TYPE, "totalNumberFiles", "maxFileSize", "minFileSize" From ce352a949759c88c6e654acf2b6806c61ab84054 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 20 Jan 2020 17:14:50 +0800 Subject: [PATCH 13/15] nit --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 4 ++-- .../scala/org/apache/spark/sql/execution/command/tables.scala | 1 - .../scala/org/apache/spark/sql/ShowCreateTableSuite.scala | 1 - .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 3 +-- 5 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d1baa93876a3c..69b70c2f53708 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2681,8 +2681,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging s" the LOCATION clause to specify it.", ctx) case (PROP_LOCATION, _) => false case (PROP_OWNER, _) if !legacyOn => - throw new ParseException(s"$PROP_OWNER is a reserved table property , please use" + - " ALTER TABLE ... SET OWNER ... to specify it.", ctx) + throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be" + + s" set to the current user by default.", ctx) case (PROP_OWNER, _) => false case _ => true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index cb9d752f0a38d..a92fbdf25975b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.DescribeTableSchema import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap} -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala index 54031e975df4f..d7b489e4fa07d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.Utils diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7b0282c43c4c7..f196e94a83f97 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -54,7 +54,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 3d8def8016299..2a75204fe225f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -375,8 +375,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA } } - private def checkOwner(db: String, - expectedOwnerName: String, expectedOwnerType: String): Unit = { + private def checkOwner(db: String, expectedOwnerName: String, expectedOwnerType: String): Unit = { val df = sql(s"DESCRIBE DATABASE EXTENDED $db") val owner = df.where("database_description_item='Owner Name'") .collect().head.getString(1) From 8e70ec1e59b81a777c8c8efdc4d663c3fbb77f9d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 20 Jan 2020 17:25:35 +0800 Subject: [PATCH 14/15] nit --- .../scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +- .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 37b900c63af1f..d51690367bf35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 69b70c2f53708..bdbb0e7e4adc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3625,4 +3625,5 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.ownerType.getText) } } + } From 97f07b86320862dd7fae5c1d8646dc74f1be26b2 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 20 Jan 2020 22:12:46 +0800 Subject: [PATCH 15/15] nit --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 6 +----- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 3 --- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d31043fae6f3e..11c6487e25e90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,8 +21,6 @@ import java.io.{File, PrintWriter} import java.net.URI import java.util.Locale -import scala.collection.JavaConverters._ - import org.apache.hadoop.fs.{Path, RawLocalFileSystem} import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission} @@ -35,7 +33,6 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseE import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -1406,12 +1403,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { createDatabase(catalog, "dbx") createTable(catalog, tableIdent, isDatasourceTable) def getProps: Map[String, String] = { - val props = if (isUsingHiveMetastore) { + if (isUsingHiveMetastore) { normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties } else { catalog.getTableMetadata(tableIdent).properties } - props -- TableCatalog.RESERVED_PROPERTIES.asScala } // unset table properties sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2a75204fe225f..59eadb844837e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -425,9 +425,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA sql(s"CREATE TABLE spark_30019(k int)") assert(sql(s"DESCRIBE TABLE EXTENDED spark_30019").where("col_name='Owner'") .collect().head.getString(1) === Utils.getCurrentUserName()) - val e2 = intercept[ParseException]( - sql(s"CREATE TABLE spark_30019_2 WITH TBLPROPERTIES('owner'='spark_30019')")) - assert(e2.getMessage.contains("owner")) } finally { catalog.reset() }