Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ license: |
no
</td>
<td>
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
</td>
</tr>
<tr>
Expand All @@ -331,7 +331,21 @@ license: |
no
</td>
<td>
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
</td>
</tr>
<tr>
<td>
owner
</td>
<td>
no

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's reserved for both table and namespace, or do I miss something?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, we can update when revert ALTER NAMESPACE SET OWNER.

</td>
<td>
yes
</td>
<td>
For tables, it is determined by the user who runs spark and create the table.
</td>
</tr>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_PROVIDER = "provider";

/**
* A property to specify the owner of the table.
*/
String PROP_OWNER = "owner";

/**
* The list of reserved table properties.
*/
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
List<String> RESERVED_PROPERTIES =
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER);

/**
* List the tables in a namespace from the catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,6 +2680,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" +
s" the LOCATION clause to specify it.", ctx)
case (PROP_LOCATION, _) => false
case (PROP_OWNER, _) if !legacyOn =>
throw new ParseException(s"$PROP_OWNER is a reserved table property , please use" +
" ALTER TABLE ... SET OWNER ... to specify it.", ctx)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can't be changed after table is created.

case (PROP_OWNER, _) => false
case _ => true
}
}
Expand Down Expand Up @@ -3621,5 +3625,4 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.ownerType.getText)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

private[sql] object CatalogV2Util {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
Expand Down Expand Up @@ -276,6 +277,10 @@ private[sql] object CatalogV2Util {
location.map(TableCatalog.PROP_LOCATION -> _)
}

def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
}

def createAlterTable(
originalNameParts: Seq[String],
catalog: CatalogPlugin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap}
import org.apache.spark.sql.connector.catalog.TableCatalog

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change

import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
Expand Down Expand Up @@ -114,31 +114,37 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
propsWithOwner, writeOptions, ifNotExists) :: Nil
}

case RefreshTable(catalog, ident) =>
RefreshTableExec(catalog, ident) :: Nil

case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
AtomicReplaceTableExec(
staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
case _ =>
ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
ReplaceTableExec(
catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
}

case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
Expand All @@ -148,7 +154,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
parts,
query,
planLater(query),
props,
propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
case _ =>
Expand All @@ -158,7 +164,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
parts,
query,
planLater(query),
props,
propsWithOwner,
writeOptions,
orCreate = orCreate) :: Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)

try {
catalog.alterTable(
catalogTable.copy(properties = properties, schema = schema, comment = comment))
catalogTable
.copy(properties = properties, schema = schema, owner = owner, comment = comment))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.util.Utils

class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
Expand All @@ -37,6 +38,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.sessionState.catalogManager.catalog(name).asTableCatalog
}

private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())

before {
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)

Expand Down Expand Up @@ -234,7 +237,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
assert(table.properties.isEmpty)
assert(table.properties == defaultOwnership.asJava)
}

test("Create: with using") {
Expand All @@ -249,7 +252,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
assert(table.properties === Map("provider" -> "foo").asJava)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}

test("Create: with property") {
Expand All @@ -264,7 +267,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
assert(table.properties === Map("prop" -> "value").asJava)
assert(table.properties === (Map("prop" -> "value") ++ defaultOwnership).asJava)
}

test("Create: identity partitioned table") {
Expand All @@ -279,7 +282,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
assert(table.properties.isEmpty)
assert(table.properties == defaultOwnership.asJava)
}

test("Create: partitioned by years(ts)") {
Expand Down Expand Up @@ -368,7 +371,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
assert(table.properties === Map("provider" -> "foo").asJava)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}

test("Replace: basic behavior") {
Expand All @@ -386,7 +389,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
assert(table.properties === Map("provider" -> "foo").asJava)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)

spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
Expand All @@ -405,7 +408,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning.isEmpty)
assert(replaced.properties.isEmpty)
assert(replaced.properties === defaultOwnership.asJava)
}

test("Replace: partitioned table") {
Expand All @@ -422,7 +425,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning.isEmpty)
assert(table.properties === Map("provider" -> "foo").asJava)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)

spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
Expand All @@ -441,7 +444,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning === Seq(IdentityTransform(FieldReference("id"))))
assert(replaced.properties.isEmpty)
assert(replaced.properties === defaultOwnership.asJava)
}

test("Replace: fail if table does not exist") {
Expand All @@ -465,7 +468,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(replaced.name === "testcat.table_name")
assert(replaced.schema === new StructType().add("id", LongType).add("data", StringType))
assert(replaced.partitioning.isEmpty)
assert(replaced.properties.isEmpty)
assert(replaced.properties === defaultOwnership.asJava)
}

test("CreateOrReplace: table exists") {
Expand All @@ -483,7 +486,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.name === "testcat.table_name")
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
assert(table.properties === Map("provider" -> "foo").asJava)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)

spark.table("source2")
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
Expand All @@ -502,6 +505,6 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
.add("data", StringType)
.add("even_or_odd", StringType))
assert(replaced.partitioning.isEmpty)
assert(replaced.properties.isEmpty)
assert(replaced.properties === defaultOwnership.asJava)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.TableCatalog

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary

import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -945,7 +946,7 @@ trait AlterTableTests extends SharedSparkSession {

assert(table.name === fullTableName(t))
assert(table.properties ===
Map("provider" -> v2Format, "location" -> "s3://bucket/path").asJava)
withDefaultOwnership(Map("provider" -> v2Format, "location" -> "s3://bucket/path")).asJava)
}
}

Expand All @@ -971,7 +972,8 @@ trait AlterTableTests extends SharedSparkSession {
val table = getTableMetadata(t)

assert(table.name === fullTableName(t))
assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
assert(table.properties ===
withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
}
}

Expand All @@ -983,15 +985,15 @@ trait AlterTableTests extends SharedSparkSession {
val table = getTableMetadata(t)

assert(table.name === fullTableName(t))
assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
assert(table.properties ===
withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)

sql(s"ALTER TABLE $t UNSET TBLPROPERTIES ('test')")

val updated = getTableMetadata(t)

assert(updated.name === fullTableName(t))
assert(updated.properties === Map("provider" -> v2Format).asJava)
assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava)
}
}

}
Loading