Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,10 @@ license: |
yes
</td>
<td>
no
yes
</td>
<td>
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it.
</td>
</tr>
<tr>
Expand All @@ -328,10 +328,10 @@ license: |
yes
</td>
<td>
no
yes
</td>
<td>
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it. For tables, please use the ALTER TABLE ... SET OWNER syntax to modify it.
</td>
</tr>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ statement
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER TABLE multipartIdentifier
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier
SET OWNER ownerType=(USER | ROLE | GROUP) identifier #setTableOwner
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,21 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_PROVIDER = "provider";

/**
* A property to specify the owner of the table.
*/
String PROP_OWNER_NAME = "ownerName";

/**
* A property to specify the type of the table's owner.
*/
String PROP_OWNER_TYPE = "ownerType";

/**
* The list of reserved table properties.
*/
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
List<String> RESERVED_PROPERTIES =
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER_NAME, PROP_OWNER_TYPE);

/**
* List the tables in a namespace from the catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -335,7 +335,8 @@ case class CatalogTable(

def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
val tableProperties = properties.filterKeys(_ != TableCatalog.PROP_OWNER_TYPE)
.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
val lastAccess = {
if (lastAccessTime <= 0) "UNKNOWN" else new Date(lastAccessTime).toString
Expand All @@ -344,6 +345,7 @@ case class CatalogTable(
identifier.database.foreach(map.put("Database", _))
map.put("Table", identifier.table)
if (owner != null && owner.nonEmpty) map.put("Owner", owner)
properties.get(TableCatalog.PROP_OWNER_TYPE).foreach(map.put("Owner Type", _))
map.put("Created Time", new Date(createTime).toString)
map.put("Last Access", lastAccess)
map.put("Created By", "Spark " + createVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,6 +2680,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" +
s" the LOCATION clause to specify it.", ctx)
case (PROP_LOCATION, _) => false
case (ownership, _) if ownership == PROP_OWNER_NAME || ownership == PROP_OWNER_TYPE =>
if (legacyOn) {
false
} else {
throw new ParseException(s"$ownership is a reserved table property , please use" +
" ALTER TABLE ... SET OWNER ... to specify it.", ctx)
}
case _ => true
}
}
Expand Down Expand Up @@ -3622,4 +3629,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create an [[AlterTableSetOwner]] logical plan.
*
* For example:
* {{{
* ALTER TABLE tableName SET OWNER (USER|ROLE|GROUP) identityName;
* }}}
*/
override def visitSetTableOwner(ctx: SetTableOwnerContext): LogicalPlan = {
withOrigin(ctx) {
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
AlterTableSetOwner(
UnresolvedTable(nameParts),
ctx.identifier.getText,
ctx.ownerType.getText)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,16 @@ case class AlterNamespaceSetOwner(
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* ALTER TABLE ... SET OWNER command, as parsed from SQL.
*/
case class AlterTableSetOwner(
child: LogicalPlan,
ownerName: String,
ownerType: String) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

private[sql] object CatalogV2Util {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
Expand Down Expand Up @@ -276,6 +277,12 @@ private[sql] object CatalogV2Util {
location.map(TableCatalog.PROP_LOCATION -> _)
}

def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
properties ++
Map(TableCatalog.PROP_OWNER_NAME -> Utils.getCurrentUserName(),
TableCatalog.PROP_OWNER_TYPE -> "USER")
}

def createAlterTable(
originalNameParts: Seq[String],
catalog: CatalogPlugin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,19 @@ class DDLParserSuite extends AnalysisTest {
AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "group1", "GROUP"))
}

test("set table owner") {
comparePlans(
parsePlan("ALTER TABLE a.b.c SET OWNER USER user1"),
AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "user1", "USER"))

comparePlans(
parsePlan("ALTER TABLE a.b.c SET OWNER ROLE role1"),
AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "role1", "ROLE"))
comparePlans(
parsePlan("ALTER TABLE a.b.c SET OWNER GROUP group1"),
AlterTableSetOwner(UnresolvedTable(Seq("a", "b", "c")), "group1", "GROUP"))
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
Expand Down Expand Up @@ -85,31 +85,37 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
}

case RefreshTable(catalog, ident) =>
RefreshTableExec(catalog, ident) :: Nil

case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
AtomicReplaceTableExec(
staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
case _ =>
ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
ReplaceTableExec(
catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
}

case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
Expand All @@ -119,7 +125,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
parts,
query,
planLater(query),
props,
propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
case _ =>
Expand All @@ -129,7 +135,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
parts,
query,
planLater(query),
props,
propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
}
Expand Down Expand Up @@ -257,6 +263,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
Map(SupportsNamespaces.PROP_OWNER_NAME -> name, SupportsNamespaces.PROP_OWNER_TYPE -> typ)
AlterNamespaceSetPropertiesExec(catalog, namespace, properties) :: Nil

case AlterTableSetOwner(ResolvedTable(catalog, ident, _), name, typ) =>
val changes = TableChange.setProperty(TableCatalog.PROP_OWNER_NAME, name) ::
TableChange.setProperty(TableCatalog.PROP_OWNER_TYPE, typ) :: Nil
AlterTableExec(catalog, ident, changes) :: Nil

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER_NAME, catalogTable.owner)

try {
catalog.alterTable(
catalogTable.copy(properties = properties, schema = schema, comment = comment))
catalogTable
.copy(properties = properties, schema = schema, owner = owner, comment = comment))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
Expand Down
Loading