diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 7e0def60e531a..2ddbdb9d88d71 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -317,7 +317,7 @@ license: |
no
- For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
+ For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
|
@@ -331,7 +331,21 @@ license: |
no
|
- For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
+ For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
+ |
+
+
+ |
+ owner
+ |
+
+ no
+ |
+
+ yes
+ |
+
+ For tables, it is determined by the user who runs Spark and creates the table.
|
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 32c6f8f2cde16..591e1c631be13 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -56,10 +56,16 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_PROVIDER = "provider";
+ /**
+ * A reserved property to specify the owner of the table.
+ */
+ String PROP_OWNER = "owner";
+
/**
* The list of reserved table properties.
*/
- List RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
+ List RESERVED_PROPERTIES =
+ Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER);
/**
* List the tables in a namespace from the catalog.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d09c53ed919cb..2050ec3399303 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2680,6 +2680,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" +
s" the LOCATION clause to specify it.", ctx)
case (PROP_LOCATION, _) => false
+ case (PROP_OWNER, _) if !legacyOn =>
+ throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be" +
+ s" set to the current user by default.", ctx)
+ case (PROP_OWNER, _) => false
case _ => true
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 3ee22548ca3be..a4c7b4c3a2894 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
private[sql] object CatalogV2Util {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -275,6 +276,10 @@ private[sql] object CatalogV2Util {
location.map(TableCatalog.PROP_LOCATION -> _)
}
+ def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
+ properties.updated(TableCatalog.PROP_OWNER, Utils.getCurrentUserName())
+ }
+
def getTableProviderCatalog(
provider: SupportsCatalogOptions,
catalogManager: CatalogManager,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 7169a437359a7..c6d8a12a94549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -114,31 +114,37 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
- CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil
+ val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+ CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
+ val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
- AtomicCreateTableAsSelectExec(
- staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
+ AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
+ propsWithOwner, writeOptions, ifNotExists) :: Nil
case _ =>
- CreateTableAsSelectExec(
- catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
+ CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
+ propsWithOwner, writeOptions, ifNotExists) :: Nil
}
case RefreshTable(catalog, ident) =>
RefreshTableExec(catalog, ident) :: Nil
case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
+ val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
catalog match {
case staging: StagingTableCatalog =>
- AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
+ AtomicReplaceTableExec(
+ staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
case _ =>
- ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
+ ReplaceTableExec(
+ catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
}
case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
+ val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
@@ -148,7 +154,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
parts,
query,
planLater(query),
- props,
+ propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
case _ =>
@@ -158,7 +164,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
parts,
query,
planLater(query),
- props,
+ propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index ddb2926eb6c9a..8eea1cf9c06e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
+ val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
try {
catalog.alterTable(
- catalogTable.copy(properties = properties, schema = schema, comment = comment))
+ catalogTable
+ .copy(properties = properties, schema = schema, owner = owner, comment = comment))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index ce0a5f21fd7ec..4e6381aea3c31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.apache.spark.util.Utils
class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -37,6 +38,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.sessionState.catalogManager.catalog(name).asTableCatalog
}
+ private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
+
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
@@ -234,7 +237,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
- assert(table.properties.isEmpty)
+ assert(table.properties == defaultOwnership.asJava)
}
test("Create: with using") {
@@ -249,7 +252,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
- assert(table.properties === Map("provider" -> "foo").asJava)
+ assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}
test("Create: with property") {
@@ -264,7 +267,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
- assert(table.properties === Map("prop" -> "value").asJava)
+ assert(table.properties === (Map("prop" -> "value") ++ defaultOwnership).asJava)
}
test("Create: identity partitioned table") {
@@ -279,7 +282,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
- assert(table.properties.isEmpty)
+ assert(table.properties == defaultOwnership.asJava)
}
test("Create: partitioned by years(ts)") {
@@ -368,7 +371,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
- assert(table.properties === Map("provider" -> "foo").asJava)
+ assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}
test("Replace: basic behavior") {
@@ -386,7 +389,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
- assert(table.properties === Map("provider" -> "foo").asJava)
+ assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -405,7 +408,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning.isEmpty)
- assert(replaced.properties.isEmpty)
+ assert(replaced.properties === defaultOwnership.asJava)
}
test("Replace: partitioned table") {
@@ -422,7 +425,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
- assert(table.properties === Map("provider" -> "foo").asJava)
+ assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -441,7 +444,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning === Seq(IdentityTransform(FieldReference("id"))))
- assert(replaced.properties.isEmpty)
+ assert(replaced.properties === defaultOwnership.asJava)
}
test("Replace: fail if table does not exist") {
@@ -465,7 +468,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(replaced.name === "testcat.table_name")
assert(replaced.schema === new StructType().add("id", LongType).add("data", StringType))
assert(replaced.partitioning.isEmpty)
- assert(replaced.properties.isEmpty)
+ assert(replaced.properties === defaultOwnership.asJava)
}
test("CreateOrReplace: table exists") {
@@ -483,7 +486,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
- assert(table.properties === Map("provider" -> "foo").asJava)
+ assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -502,6 +505,6 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning.isEmpty)
- assert(replaced.properties.isEmpty)
+ assert(replaced.properties === defaultOwnership.asJava)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index d304d5b2ca6a2..2fc5020c39ade 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -945,7 +946,7 @@ trait AlterTableTests extends SharedSparkSession {
assert(table.name === fullTableName(t))
assert(table.properties ===
- Map("provider" -> v2Format, "location" -> "s3://bucket/path").asJava)
+ withDefaultOwnership(Map("provider" -> v2Format, "location" -> "s3://bucket/path")).asJava)
}
}
@@ -971,7 +972,8 @@ trait AlterTableTests extends SharedSparkSession {
val table = getTableMetadata(t)
assert(table.name === fullTableName(t))
- assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
+ assert(table.properties ===
+ withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
}
}
@@ -983,15 +985,15 @@ trait AlterTableTests extends SharedSparkSession {
val table = getTableMetadata(t)
assert(table.name === fullTableName(t))
- assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
+ assert(table.properties ===
+ withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
sql(s"ALTER TABLE $t UNSET TBLPROPERTIES ('test')")
val updated = getTableMetadata(t)
assert(updated.name === fullTableName(t))
- assert(updated.properties === Map("provider" -> v2Format).asJava)
+ assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava)
}
}
-
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4c5b1d95b12da..e65030f715204 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.SimpleScanSource
@@ -41,6 +42,7 @@ class DataSourceV2SQLSuite
private val v2Source = classOf[FakeV2Provider].getName
override protected val v2Format = v2Source
override protected val catalogAndNamespace = "testcat.ns1.ns2."
+ private val defaultUser: String = Utils.getCurrentUserName()
private def catalog(name: String): CatalogPlugin = {
spark.sessionState.catalogManager.catalog(name)
@@ -94,7 +96,7 @@ class DataSourceV2SQLSuite
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("data", StringType))
@@ -160,6 +162,7 @@ class DataSourceV2SQLSuite
Array("Comment", "this is a test table", ""),
Array("Location", "/tmp/testcat/table_name", ""),
Array("Provider", "foo", ""),
+ Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
Array("Table Properties", "[bar=baz]", "")))
}
@@ -172,7 +175,7 @@ class DataSourceV2SQLSuite
assert(table.name == "default.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> v2Source).asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
@@ -187,7 +190,7 @@ class DataSourceV2SQLSuite
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
// run a second create query that should fail
@@ -201,7 +204,7 @@ class DataSourceV2SQLSuite
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
- assert(table2.properties == Map("provider" -> "foo").asJava)
+ assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is still empty
@@ -218,7 +221,7 @@ class DataSourceV2SQLSuite
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
spark.sql("CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING bar")
@@ -227,7 +230,7 @@ class DataSourceV2SQLSuite
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
- assert(table2.properties == Map("provider" -> "foo").asJava)
+ assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is still empty
@@ -244,7 +247,7 @@ class DataSourceV2SQLSuite
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is empty
@@ -266,7 +269,7 @@ class DataSourceV2SQLSuite
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -293,7 +296,7 @@ class DataSourceV2SQLSuite
assert(replacedTable != originalTable, "Table should have been replaced.")
assert(replacedTable.name == identifier)
assert(replacedTable.partitioning.isEmpty)
- assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+ assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(replacedTable.schema == new StructType().add("id", LongType))
val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
@@ -431,7 +434,7 @@ class DataSourceV2SQLSuite
assert(table.name == "default.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> v2Source).asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -448,7 +451,7 @@ class DataSourceV2SQLSuite
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -468,7 +471,7 @@ class DataSourceV2SQLSuite
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
- assert(table2.properties == Map("provider" -> "foo").asJava)
+ assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -486,7 +489,7 @@ class DataSourceV2SQLSuite
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -517,7 +520,7 @@ class DataSourceV2SQLSuite
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
@@ -557,7 +560,7 @@ class DataSourceV2SQLSuite
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("i", "int"))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
@@ -1059,7 +1062,7 @@ class DataSourceV2SQLSuite
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test"),
- Row("Owner Name", Utils.getCurrentUserName()),
+ Row("Owner Name", defaultUser),
Row("Owner Type", "USER")
))
}
@@ -1075,7 +1078,7 @@ class DataSourceV2SQLSuite
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test"),
- Row("Owner Name", Utils.getCurrentUserName()),
+ Row("Owner Name", defaultUser),
Row("Owner Type", "USER"),
Row("Properties", "((a,b),(b,a),(c,c))")
))
@@ -1123,7 +1126,7 @@ class DataSourceV2SQLSuite
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test_2"),
- Row("Owner Name", Utils.getCurrentUserName()),
+ Row("Owner Name", defaultUser),
Row("Owner Type", "USER")
))
}
@@ -1923,22 +1926,23 @@ class DataSourceV2SQLSuite
test("SHOW TBLPROPERTIES: v2 table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
- val owner = "andrew"
+ val user = "andrew"
val status = "new"
val provider = "foo"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
- s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
+ s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
- val properties = sql(s"SHOW TBLPROPERTIES $t")
+ val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key")
val schema = new StructType()
.add("key", StringType, nullable = false)
.add("value", StringType, nullable = false)
val expected = Seq(
- Row("owner", owner),
+ Row(TableCatalog.PROP_OWNER, defaultUser),
+ Row("provider", provider),
Row("status", status),
- Row("provider", provider))
+ Row("user", user))
assert(properties.schema === schema)
assert(expected === properties.collect())
@@ -1948,11 +1952,11 @@ class DataSourceV2SQLSuite
test("SHOW TBLPROPERTIES(key): v2 table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
- val owner = "andrew"
+ val user = "andrew"
val status = "new"
val provider = "foo"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
- s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
+ s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
val properties = sql(s"SHOW TBLPROPERTIES $t ('status')")
@@ -1967,7 +1971,7 @@ class DataSourceV2SQLSuite
withTable(t) {
val nonExistingKey = "nonExistingKey"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo " +
- s"TBLPROPERTIES ('owner'='andrew', 'status'='new')")
+ s"TBLPROPERTIES ('user'='andrew', 'status'='new')")
val properties = sql(s"SHOW TBLPROPERTIES $t ('$nonExistingKey')")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 03874d005a6e6..ca292f65efeee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
import org.apache.spark.sql.hive.client.HiveClient
@@ -635,12 +635,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
k.startsWith(CREATED_SPARK_VERSION)
}
val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp
+
+ // Keep the old table's owner if the new table definition does not specify one
+ val owner = Option(tableDefinition.owner).filter(_.nonEmpty).getOrElse(oldTableDef.owner)
val newDef = tableDefinition.copy(
storage = newStorage,
schema = oldTableDef.schema,
partitionColumnNames = oldTableDef.partitionColumnNames,
bucketSpec = oldTableDef.bucketSpec,
- properties = newTableProps)
+ properties = newTableProps,
+ owner = owner)
client.alterTable(newDef)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index b3f7fc4d0557e..59eadb844837e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.functions._
@@ -418,13 +418,23 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
catalog.reset()
}
}
+
+ test("Table Ownership") {
+ val sessionCatalog = spark.sessionState.catalog
+ try {
+ sql("CREATE TABLE spark_30019(k int)")
+ val ownerRows = sql("DESCRIBE TABLE EXTENDED spark_30019").where("col_name='Owner'")
+ assert(ownerRows.collect().head.getString(1) === Utils.getCurrentUserName())
+ } finally {
+ sessionCatalog.reset()
+ }
+ }
}
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import testImplicits._
val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
- private val reversedProperties = Seq("ownerName", "ownerType")
override def afterEach(): Unit = {
try {