
Commit 496f62d

pwendell authored and marmbrus committed
SPARK-3025 [SQL]: Allow JDBC clients to set a fair scheduler pool
This definitely needs review as I am not familiar with this part of Spark. I tested this locally and it did seem to work.

Author: Patrick Wendell <[email protected]>

Closes apache#1937 from pwendell/scheduler and squashes the following commits:

b858e33 [Patrick Wendell] SPARK-3025: Allow JDBC clients to set a fair scheduler pool

(cherry picked from commit 6bca889)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent 4da76fc commit 496f62d

File tree

3 files changed: 28 additions & 7 deletions


docs/sql-programming-guide.md

Lines changed: 5 additions & 0 deletions
@@ -605,6 +605,11 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
 
 You may also use the beeline script that comes with Hive.
 
+To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
+users can set the `spark.sql.thriftserver.scheduler.pool` variable:
+
+    SET spark.sql.thriftserver.scheduler.pool=accounting;
+
 ### Migration Guide for Shark Users
 
 #### Reducer number
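
The added documentation shows the statement a client issues over JDBC. As a concrete illustration, here is a minimal sketch of a client session that pins its statements to a pool; the localhost:10000 endpoint, the org.apache.hive.jdbc.HiveDriver class (from the hive-jdbc artifact), and the table name are assumptions for the example, not part of this commit:

    // Hypothetical JDBC client session against the Spark SQL Thrift server.
    // Assumes hive-jdbc on the classpath and a server at localhost:10000.
    import java.sql.DriverManager

    object SchedulerPoolClient {
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
        val stmt = conn.createStatement()
        // Route later statements in this session to the "accounting" pool.
        stmt.execute("SET spark.sql.thriftserver.scheduler.pool=accounting")
        val rs = stmt.executeQuery("SELECT COUNT(*) FROM transactions") // hypothetical table
        while (rs.next()) println(rs.getLong(1))
        conn.close()
      }
    }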

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 3 additions & 0 deletions
@@ -33,6 +33,9 @@ private[spark] object SQLConf {
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
+  // This is only used for the thriftserver
+  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
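
How the constant is consumed: a SET key=value statement parses into a SetCommand node in the query's logical plan, which the operation manager below matches against SQLConf.THRIFTSERVER_POOL. A minimal REPL-style sketch of that detection, assuming a hiveContext in scope and the Spark 1.1-era SetCommand(Option[String], Option[String]) shape this patch matches on (SQLConf is private[spark], so outside Spark's own packages this is illustrative only):

    // Sketch: spot a scheduler-pool SET statement in a query's logical plan.
    // Mirrors the match added to SparkSQLOperationManager below.
    import org.apache.spark.sql.SQLConf
    import org.apache.spark.sql.catalyst.plans.logical.SetCommand

    val result = hiveContext.sql("SET spark.sql.thriftserver.scheduler.pool=accounting")
    result.queryExecution.logical match {
      case SetCommand(Some(key), Some(value)) if key == SQLConf.THRIFTSERVER_POOL =>
        println(s"Session pool set to $value")
      case _ =>
        println("Not a scheduler-pool SET statement")
    }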

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 20 additions & 7 deletions
@@ -17,24 +17,24 @@
 
 package org.apache.spark.sql.hive.thriftserver.server
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.math.{random, round}
-
 import java.sql.Timestamp
 import java.util.{Map => JMap}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map}
+import scala.math.{random, round}
+
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
-
 import org.apache.spark.Logging
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -43,6 +43,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
+  // TODO: Currently this will grow infinitely, even as sessions expire
+  val sessionToActivePool = Map[HiveSession, String]()
+
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
@@ -165,8 +168,18 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage
       try {
         result = hiveContext.sql(statement)
         logDebug(result.queryExecution.toString())
+        result.queryExecution.logical match {
+          case SetCommand(Some(key), Some(value)) if (key == SQLConf.THRIFTSERVER_POOL) =>
+            sessionToActivePool(parentSession) = value
+            logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+          case _ =>
+        }
+
         val groupId = round(random * 1000000).toString
         hiveContext.sparkContext.setJobGroup(groupId, statement)
+        sessionToActivePool.get(parentSession).foreach { pool =>
+          hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+        }
         iter = {
           val resultRdd = result.queryExecution.toRdd
           val useIncrementalCollect =
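
The mechanism the second hunk relies on: "spark.scheduler.pool" is a thread-local SparkContext property that the fair scheduler consults when a job is submitted, so setting it just before a statement executes routes that statement's jobs to the named pool. A self-contained sketch of that behavior, assuming a local FAIR-mode context (pool definitions would normally come from a fairscheduler.xml allocations file; an undeclared pool is created on the fly with default settings):

    // Standalone sketch of the spark.scheduler.pool mechanism this patch uses.
    import org.apache.spark.{SparkConf, SparkContext}

    object FairPoolSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[4]")
          .setAppName("fair-pool-sketch")
          .set("spark.scheduler.mode", "FAIR")
        val sc = new SparkContext(conf)

        // Jobs submitted from this thread now land in the "accounting" pool.
        sc.setLocalProperty("spark.scheduler.pool", "accounting")
        println(sc.parallelize(1 to 1000).sum())

        // Setting the property back to null reverts to the default pool.
        sc.setLocalProperty("spark.scheduler.pool", null)
        sc.stop()
      }
    }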
