Skip to content

Commit 4bad4b6

Browse files
wangyum authored and dongjoon-hyun committed
[SPARK-45454][SQL] Set the table's default owner to current_user
### What changes were proposed in this pull request?

This PR sets the table's default owner to `CURRENT_USER`.

### Why are the changes needed?

In thrift server mode, the owner of the table is inconsistent with `SELECT CURRENT_USER();` — the owner of the table is always the user who started the thrift server.

### Does this PR introduce _any_ user-facing change?

The table owner may be changed to `CURRENT_USER`. For example:

```
Before this PR:

yumwangG9L07H60PK spark-3.5.0-bin-hadoop3 % bin/beeline -u "jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) using parquet; desc formatted t;" | grep Owner
Connecting to jdbc:hive2://localhost:10000/
Connected to: Spark SQL (version 3.5.0)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
No rows selected (0.36 seconds)
No rows selected (0.1 seconds)
| Owner | yumwang |
16 rows selected (0.055 seconds)
Beeline version 2.3.9 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000/

After this PR:

yumwangG9L07H60PK spark-4.0.0-SNAPSHOT-bin-3.3.6 % bin/beeline -u "jdbc:hive2://localhost:10000/" -n test_table_owner -e "create table t(id int) using parquet; desc formatted t;" | grep Owner
Connecting to jdbc:hive2://localhost:10000/
Connected to: Spark SQL (version 4.0.0-SNAPSHOT)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
No rows selected (0.719 seconds)
No rows selected (0.335 seconds)
| Owner | test_table_owner |
16 rows selected (0.065 seconds)
Beeline version 2.3.9 by Apache Hive
Closing: 0: jdbc:hive2://localhost:10000/
```

### How was this patch tested?

Unit test and manual test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43264 from wangyum/SPARK-45454.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2d6d09b commit 4bad4b6

File tree

7 files changed

+58
-7
lines changed

7 files changed

+58
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CurrentUserContext.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20+
import org.apache.spark.util.Utils
21+
2022
object CurrentUserContext {
2123
val CURRENT_USER: InheritableThreadLocal[String] = new InheritableThreadLocal[String] {
2224
override protected def initialValue(): String = null
2325
}
26+
27+
def getCurrentUser: String = Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
28+
29+
def getCurrentUserOrEmpty: String = Option(CURRENT_USER.get()).getOrElse("")
2430
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods._
3030

3131
import org.apache.spark.internal.Logging
3232
import org.apache.spark.sql.AnalysisException
33-
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
33+
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
3434
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedLeafNode}
3535
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
3636
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
@@ -241,7 +241,7 @@ case class CatalogTable(
241241
provider: Option[String] = None,
242242
partitionColumnNames: Seq[String] = Seq.empty,
243243
bucketSpec: Option[BucketSpec] = None,
244-
owner: String = "",
244+
owner: String = CurrentUserContext.getCurrentUserOrEmpty,
245245
createTime: Long = System.currentTimeMillis,
246246
lastAccessTime: Long = -1,
247247
createVersion: String = "",

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
1919

2020
import java.time.{Instant, LocalDateTime}
2121

22-
import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
22+
import org.apache.spark.sql.catalyst.CurrentUserContext
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.plans.logical._
2525
import org.apache.spark.sql.catalyst.rules._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
2929
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
3030
import org.apache.spark.sql.connector.catalog.CatalogManager
3131
import org.apache.spark.sql.types._
32-
import org.apache.spark.util.Utils
3332

3433

3534
/**
@@ -109,7 +108,7 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic
109108
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
110109
val currentNamespace = catalogManager.currentNamespace.quoted
111110
val currentCatalog = catalogManager.currentCatalog.name()
112-
val currentUser = Option(CURRENT_USER.get()).getOrElse(Utils.getCurrentUserName())
111+
val currentUser = CurrentUserContext.getCurrentUser
113112

114113
plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
115114
case CurrentDatabase() =>

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.util.Collections
2323
import scala.jdk.CollectionConverters._
2424

2525
import org.apache.spark.sql.AnalysisException
26+
import org.apache.spark.sql.catalyst.CurrentUserContext
2627
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
2728
import org.apache.spark.sql.catalyst.expressions.Literal
2829
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -34,7 +35,6 @@ import org.apache.spark.sql.connector.expressions.LiteralValue
3435
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3536
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
3637
import org.apache.spark.sql.util.CaseInsensitiveStringMap
37-
import org.apache.spark.util.Utils
3838

3939
private[sql] object CatalogV2Util {
4040
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -423,7 +423,7 @@ private[sql] object CatalogV2Util {
423423
}
424424

425425
def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
426-
properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
426+
properties ++ Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)
427427
}
428428

429429
def getTableProviderCatalog(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CatalogSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
2021
import org.apache.spark.sql.catalyst.TableIdentifier
2122
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
2223
import org.apache.spark.sql.types.StructType
@@ -34,4 +35,20 @@ class CatalogSuite extends AnalysisTest {
3435
provider = Some("parquet"))
3536
table.toLinkedHashMap
3637
}
38+
39+
test("SPARK-45454: Set table owner to current_user") {
40+
val testOwner = "test_table_owner"
41+
try {
42+
CURRENT_USER.set(testOwner)
43+
val table = CatalogTable(
44+
identifier = TableIdentifier("tbl", Some("db1")),
45+
tableType = CatalogTableType.MANAGED,
46+
storage = CatalogStorageFormat.empty,
47+
schema = new StructType().add("col1", "int").add("col2", "string"),
48+
provider = Some("parquet"))
49+
assert(table.owner === testOwner)
50+
} finally {
51+
CURRENT_USER.remove()
52+
}
53+
}
3754
}

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
2626

2727
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
2828
import org.apache.spark.sql._
29+
import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
2930
import org.apache.spark.sql.catalyst.InternalRow
3031
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
3132
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -3294,6 +3295,18 @@ class DataSourceV2SQLSuiteV1Filter
32943295
}
32953296
}
32963297

3298+
test("SPARK-45454: Set table owner to current_user if it is set") {
3299+
val testOwner = "test_table_owner"
3300+
try {
3301+
CURRENT_USER.set(testOwner)
3302+
spark.sql("CREATE TABLE testcat.table_name (id int) USING foo")
3303+
val table = catalog("testcat").asTableCatalog.loadTable(Identifier.of(Array(), "table_name"))
3304+
assert(table.properties.get(TableCatalog.PROP_OWNER) === testOwner)
3305+
} finally {
3306+
CURRENT_USER.remove()
3307+
}
3308+
}
3309+
32973310
private def testNotSupportedV2Command(
32983311
sqlCommand: String,
32993312
sqlParams: String,

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.hive.service.cli.{GetInfoType, HiveSQLException, OperationHand
2424

2525
import org.apache.spark.{ErrorMessageFormat, TaskKilled}
2626
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
27+
import org.apache.spark.sql.catalyst.TableIdentifier
2728
import org.apache.spark.sql.internal.SQLConf
2829

2930
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
@@ -254,6 +255,21 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
254255
assertThrows[SQLException](rs1.beforeFirst())
255256
}
256257
}
258+
259+
test("SPARK-45454: Set table owner to current_user") {
260+
val testOwner = "test_table_owner"
261+
val tableName = "t"
262+
withTable(tableName) {
263+
withCLIServiceClient(testOwner) { client =>
264+
val sessionHandle = client.openSession(testOwner, "")
265+
val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
266+
val exec: String => OperationHandle = client.executeStatement(sessionHandle, _, confOverlay)
267+
exec(s"CREATE TABLE $tableName(id int) using parquet")
268+
val owner = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).owner
269+
assert(owner === testOwner)
270+
}
271+
}
272+
}
257273
}
258274

259275
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {

0 commit comments

Comments
 (0)