From 195589ed73fe298beffbb822d668b34431dc4dd8 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Thu, 3 Dec 2015 19:53:30 -0500 Subject: [PATCH 1/3] [SPARK-12145][SQL] Command 'Set Role [ADMIN|NONE|ALL]' doesn't work in SQL based authorization --- .../spark/sql/execution/SparkSQLParser.scala | 129 ++++++++++++++++++ .../thriftserver/SparkSQLSessionManager.scala | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 8 +- .../spark/sql/hive/client/HiveClient.scala | 3 +- .../sql/hive/client/HiveClientImpl.scala | 14 +- .../spark/sql/hive/client/HiveShim.scala | 10 +- .../hive/client/IsolatedClientLoader.scala | 6 +- 7 files changed, 154 insertions(+), 18 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala new file mode 100644 index 0000000000000..b6189e1961552 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import scala.util.parsing.combinator.RegexParsers + +import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.StringType + +/** + * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL + * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser. + * + * @param fallback A function that returns the next parser in the chain. This is a call-by-name + * parameter because this allows us to return a different dialect if we + * have to. + */ +class SparkSQLParser(fallback: => ParserInterface) extends AbstractSparkSQLParser { + + override def parseExpression(sql: String): Expression = fallback.parseExpression(sql) + + override def parseTableIdentifier(sql: String): TableIdentifier = + fallback.parseTableIdentifier(sql) + + // A parser for the key-value part of the "SET [key = [value ]]" syntax + private object SetCommandParser extends RegexParsers { + private val key: Parser[String] = "(?m)[^=]+".r + + private val value: Parser[String] = "(?m).*$".r + + private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)()) + + private val pair: Parser[LogicalPlan] = + (key ~ ("=".r ~> value).?).? ^^ { + case None => SetCommand(None) + case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim))) + } + + def apply(input: String): LogicalPlan = parseAll(pair, input) match { + case Success(plan, _) => plan + case x => sys.error(x.toString) + } + } + + protected val AS = Keyword("AS") + protected val CACHE = Keyword("CACHE") + protected val CLEAR = Keyword("CLEAR") + protected val DESCRIBE = Keyword("DESCRIBE") + protected val EXTENDED = Keyword("EXTENDED") + protected val FUNCTION = Keyword("FUNCTION") + protected val FUNCTIONS = Keyword("FUNCTIONS") + protected val IN = Keyword("IN") + protected val LAZY = Keyword("LAZY") + protected val SET = Keyword("SET") + protected val SHOW = Keyword("SHOW") + protected val TABLE = Keyword("TABLE") + protected val TABLES = Keyword("TABLES") + protected val UNCACHE = Keyword("UNCACHE") + protected val ROLE = Keyword("ROLE") + + override protected lazy val start: Parser[LogicalPlan] = + cache | uncache | setRole | set | show | desc | others + + private lazy val cache: Parser[LogicalPlan] = + CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan.map(fallback.parsePlan), isLazy.isDefined) + } + + private lazy val uncache: Parser[LogicalPlan] = + ( UNCACHE ~ TABLE ~> ident ^^ { + case tableName => UncacheTableCommand(tableName) + } + | CLEAR ~ CACHE ^^^ ClearCacheCommand + ) + + private lazy val setRole: Parser[LogicalPlan] = + SET ~ ROLE ~ ident ^^ { + case set ~ role ~ roleName => fallback(List(set, role, roleName).mkString(" ")) + } + + private lazy val set: Parser[LogicalPlan] = + SET ~> restInput ^^ { + case input => SetCommandParser(input) + } + + // It can be the following patterns: + // SHOW FUNCTIONS; + // SHOW FUNCTIONS mydb.func1; + // SHOW FUNCTIONS func1; + // SHOW FUNCTIONS `mydb.a`.`func1.aa`; + private lazy val show: Parser[LogicalPlan] = + ( SHOW ~> TABLES ~ (IN ~> ident).? ^^ { + case _ ~ dbName => ShowTablesCommand(dbName) + } + | SHOW ~ FUNCTIONS ~> ((ident <~ ".").? ~ (ident | stringLit)).? ^^ { + case Some(f) => logical.ShowFunctions(f._1, Some(f._2)) + case None => logical.ShowFunctions(None, None) + } + ) + + private lazy val desc: Parser[LogicalPlan] = + DESCRIBE ~ FUNCTION ~> EXTENDED.? ~ (ident | stringLit) ^^ { + case isExtended ~ functionName => logical.DescribeFunction(functionName, isExtended.isDefined) + } + + private lazy val others: Parser[LogicalPlan] = + wholeInput ^^ { + case input => fallback.parsePlan(input) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index de4e9c62b57a4..586403a7d0501 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -74,7 +74,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: val ctx = if (hiveContext.hiveThriftServerSingleSession) { hiveContext } else { - hiveContext.newSession() + hiveContext.newSession(username) } ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 05863ae18350d..b33cf3401513c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -100,12 +100,16 @@ class HiveContext private[hive]( * and Hive client (both of execution and metadata) with existing HiveContext. */ override def newSession(): HiveContext = { + newSession() + } + + def newSession(userName: String = null): HiveContext = { new HiveContext( sc = sc, cacheManager = cacheManager, listener = listener, - execHive = executionHive.newSession(), - metaHive = metadataHive.newSession(), + execHive = executionHive.newSession(userName), + metaHive = metadataHive.newSession(userName), isRootContext = false) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f681cc67041a1..7a1bb5698501f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -184,8 +184,7 @@ private[hive] trait HiveClient { /** Add a jar into class loader */ def addJar(path: String): Unit - /** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */ - def newSession(): HiveClient + def newSession(userName: String = null): HiveClient /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */ def withHiveState[A](f: => A): A diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index cf1ff55c96fc9..52c5285217060 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -58,6 +58,7 @@ import org.apache.spark.util.{CircularBuffer, Utils} * this [[HiveClientImpl]]. */ private[hive] class HiveClientImpl( + val userName: String = null, override val version: HiveVersion, config: Map[String, String], initClassLoader: ClassLoader, @@ -119,13 +120,14 @@ private[hive] class HiveClientImpl( } initialConf.set(k, v) } - val state = new SessionState(initialConf) + val state = new SessionState(initialConf, userName) if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } SessionState.start(state) state.out = new PrintStream(outputBuffer, true, "UTF-8") state.err = new PrintStream(outputBuffer, true, "UTF-8") + state.setIsHiveServerQuery(true) state } finally { Thread.currentThread().setContextClassLoader(original) @@ -412,13 +414,15 @@ private[hive] class HiveClientImpl( */ protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { logDebug(s"Running hiveql '$cmd'") - if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") } + if (cmd.toLowerCase.startsWith("set") && !cmd.toLowerCase.startsWith("set role ")) { + logDebug(s"Changing config: $cmd") + } try { val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") // The remainder of the command. val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc = shim.getCommandProcessor(tokens(0), conf) + val proc = shim.getCommandProcessor(tokens, conf) proc match { case driver: Driver => val response: CommandProcessorResponse = driver.run(cmd) @@ -521,8 +525,8 @@ private[hive] class HiveClientImpl( runSqlHive(s"ADD JAR $path") } - def newSession(): HiveClientImpl = { - clientLoader.createClient().asInstanceOf[HiveClientImpl] + def newSession(userName: String = null): HiveClientImpl = { + clientLoader.createClient(userName).asInstanceOf[HiveClientImpl] } def reset(): Unit = withHiveState { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 70c10be25be9f..e5be0085c3561 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -67,7 +67,7 @@ private[client] sealed abstract class Shim { def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition] - def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor + def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor def getDriverResults(driver: Driver): Seq[String] @@ -213,8 +213,8 @@ private[client] class Shim_v0_12 extends Shim with Logging { getAllPartitions(hive, table) } - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] + override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, token(0), conf).asInstanceOf[CommandProcessor] override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[String]() @@ -357,8 +357,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { partitions.asScala.toSeq } - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor] + override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[Object]() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index dca7396ee1ab4..8333276b73a6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader( } /** The isolated client interface to Hive. */ - private[hive] def createClient(): HiveClient = { + private[hive] def createClient(userName: String = null): HiveClient = { if (!isolationOn) { - return new HiveClientImpl(version, config, baseClassLoader, this) + return new HiveClientImpl(userName, version, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") @@ -246,7 +246,7 @@ private[hive] class IsolatedClientLoader( classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head - .newInstance(version, config, classLoader, this) + .newInstance(userName, version, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => From 739f1111f1e59f81ab74697fb61b16b2816aa311 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Mon, 18 Jan 2016 21:50:50 -0500 Subject: [PATCH 2/3] Rebase code --- .../scala/org/apache/spark/sql/execution/SparkSQLParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala index b6189e1961552..75fa85cf8e973 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala @@ -94,7 +94,7 @@ class SparkSQLParser(fallback: => ParserInterface) extends AbstractSparkSQLParse private lazy val setRole: Parser[LogicalPlan] = SET ~ ROLE ~ ident ^^ { - case set ~ role ~ roleName => fallback(List(set, role, roleName).mkString(" ")) + case set ~ role ~ roleName => fallback.parsePlan(List(set, role, roleName).mkString(" ")) } private lazy val set: Parser[LogicalPlan] = From 643ae61a424449c7aa2554f5b3f027642f5a6e02 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Wed, 3 Feb 2016 20:41:02 -0500 Subject: [PATCH 3/3] Update patches since changes in Parser --- .../spark/sql/execution/SparkSQLParser.scala | 129 ------------------ .../spark/sql/hive/client/HiveClient.scala | 1 + 2 files changed, 1 insertion(+), 129 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala deleted file mode 100644 index 75fa85cf8e973..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSQLParser.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import scala.util.parsing.combinator.RegexParsers - -import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, ParserInterface, TableIdentifier} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.types.StringType - -/** - * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL - * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser. - * - * @param fallback A function that returns the next parser in the chain. This is a call-by-name - * parameter because this allows us to return a different dialect if we - * have to. - */ -class SparkSQLParser(fallback: => ParserInterface) extends AbstractSparkSQLParser { - - override def parseExpression(sql: String): Expression = fallback.parseExpression(sql) - - override def parseTableIdentifier(sql: String): TableIdentifier = - fallback.parseTableIdentifier(sql) - - // A parser for the key-value part of the "SET [key = [value ]]" syntax - private object SetCommandParser extends RegexParsers { - private val key: Parser[String] = "(?m)[^=]+".r - - private val value: Parser[String] = "(?m).*$".r - - private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)()) - - private val pair: Parser[LogicalPlan] = - (key ~ ("=".r ~> value).?).? ^^ { - case None => SetCommand(None) - case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim))) - } - - def apply(input: String): LogicalPlan = parseAll(pair, input) match { - case Success(plan, _) => plan - case x => sys.error(x.toString) - } - } - - protected val AS = Keyword("AS") - protected val CACHE = Keyword("CACHE") - protected val CLEAR = Keyword("CLEAR") - protected val DESCRIBE = Keyword("DESCRIBE") - protected val EXTENDED = Keyword("EXTENDED") - protected val FUNCTION = Keyword("FUNCTION") - protected val FUNCTIONS = Keyword("FUNCTIONS") - protected val IN = Keyword("IN") - protected val LAZY = Keyword("LAZY") - protected val SET = Keyword("SET") - protected val SHOW = Keyword("SHOW") - protected val TABLE = Keyword("TABLE") - protected val TABLES = Keyword("TABLES") - protected val UNCACHE = Keyword("UNCACHE") - protected val ROLE = Keyword("ROLE") - - override protected lazy val start: Parser[LogicalPlan] = - cache | uncache | setRole | set | show | desc | others - - private lazy val cache: Parser[LogicalPlan] = - CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ { - case isLazy ~ tableName ~ plan => - CacheTableCommand(tableName, plan.map(fallback.parsePlan), isLazy.isDefined) - } - - private lazy val uncache: Parser[LogicalPlan] = - ( UNCACHE ~ TABLE ~> ident ^^ { - case tableName => UncacheTableCommand(tableName) - } - | CLEAR ~ CACHE ^^^ ClearCacheCommand - ) - - private lazy val setRole: Parser[LogicalPlan] = - SET ~ ROLE ~ ident ^^ { - case set ~ role ~ roleName => fallback.parsePlan(List(set, role, roleName).mkString(" ")) - } - - private lazy val set: Parser[LogicalPlan] = - SET ~> restInput ^^ { - case input => SetCommandParser(input) - } - - // It can be the following patterns: - // SHOW FUNCTIONS; - // SHOW FUNCTIONS mydb.func1; - // SHOW FUNCTIONS func1; - // SHOW FUNCTIONS `mydb.a`.`func1.aa`; - private lazy val show: Parser[LogicalPlan] = - ( SHOW ~> TABLES ~ (IN ~> ident).? ^^ { - case _ ~ dbName => ShowTablesCommand(dbName) - } - | SHOW ~ FUNCTIONS ~> ((ident <~ ".").? ~ (ident | stringLit)).? ^^ { - case Some(f) => logical.ShowFunctions(f._1, Some(f._2)) - case None => logical.ShowFunctions(None, None) - } - ) - - private lazy val desc: Parser[LogicalPlan] = - DESCRIBE ~ FUNCTION ~> EXTENDED.? ~ (ident | stringLit) ^^ { - case isExtended ~ functionName => logical.DescribeFunction(functionName, isExtended.isDefined) - } - - private lazy val others: Parser[LogicalPlan] = - wholeInput ^^ { - case input => fallback.parsePlan(input) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 7a1bb5698501f..be2719810521f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -184,6 +184,7 @@ private[hive] trait HiveClient { /** Add a jar into class loader */ def addJar(path: String): Unit + /** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */ def newSession(userName: String = null): HiveClient /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */