
Commit f82461f

hvanhovell authored and cloud-fan committed
[SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request?

Commit ea36116 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <[email protected]>

Closes #17457 from hvanhovell/SPARK-20126.
1 parent 4fcc214 commit f82461f

14 files changed, +104 -193 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand {
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.addJar(path)
+    sparkSession.sessionState.resourceLoader.addJar(path)
     Seq(Row(0))
   }
 }
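
For orientation, `AddJarCommand` is what backs the SQL `ADD JAR` statement, so the user-facing path is unchanged by this refactor. A minimal usage sketch, assuming a local session and a jar path that actually exists on the driver (the path here is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object AddJarExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("add-jar-example")
      .getOrCreate()

    // ADD JAR is parsed into AddJarCommand, whose run() now goes through
    // sessionState.resourceLoader.addJar instead of sessionState.addJar.
    spark.sql("ADD JAR /tmp/extra-udfs.jar")

    spark.stop()
  }
}
```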

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 24 additions & 23 deletions
@@ -63,6 +63,7 @@ private[sql] class SessionState(
     val optimizer: Optimizer,
     val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
+    val resourceLoader: SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
@@ -106,27 +107,6 @@ private[sql] class SessionState(
   def refreshTable(tableName: String): Unit = {
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
-
-  /**
-   * Add a jar path to [[SparkContext]] and the classloader.
-   *
-   * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs
-   * to add the jar to its hive client for the current session. Hence, it still needs to be in
-   * [[SessionState]].
-   */
-  def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
-    }
-    sharedState.jarClassLoader.addURL(jarURL)
-    Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader)
-  }
 }
 
 private[sql] object SessionState {
@@ -160,15 +140,36 @@ class SessionStateBuilder(
  * Session shared [[FunctionResourceLoader]].
  */
 @InterfaceStability.Unstable
-class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
+class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
   override def loadResource(resource: FunctionResource): Unit = {
     resource.resourceType match {
-      case JarResource => session.sessionState.addJar(resource.uri)
+      case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
      case ArchiveResource =>
         throw new AnalysisException(
           "Archive is not allowed to be loaded. If YARN mode is used, " +
           "please use --archives options while calling spark-submit.")
     }
   }
+
+  /**
+   * Add a jar path to [[SparkContext]] and the classloader.
+   *
+   * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs
+   * to add the jar to its hive client for the current session. Hence, it still needs to be in
+   * [[SessionState]].
+   */
+  def addJar(path: String): Unit = {
+    session.sparkContext.addJar(path)
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    session.sharedState.jarClassLoader.addURL(jarURL)
+    Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
+  }
 }
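
The interesting part of the relocated `addJar` is the scheme check that decides how a path ends up on the jar classloader. Below is a stand-alone sketch of that logic, substituting `java.net.URI` for Hadoop's `Path` so it runs without Spark; the paths shown are hypothetical examples:

```scala
import java.io.File
import java.net.{URI, URL}

object JarUrlResolution {
  // Mirrors the scheme check in SessionResourceLoader.addJar: a bare local
  // path becomes a file: URL, anything that already has a scheme is kept.
  def toJarUrl(path: String): URL = {
    val uri = new URI(path)
    if (uri.getScheme == null) {
      // `path` is a local file path without a URL scheme
      new File(path).toURI.toURL
    } else {
      // `path` is already a URL with a scheme
      uri.toURL
    }
  }

  def main(args: Array[String]): Unit = {
    println(toJarUrl("/tmp/my-udfs.jar"))                     // file:/tmp/my-udfs.jar
    println(toJarUrl("https://repo.example.com/libs/my.jar")) // kept as-is
  }
}
```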

sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala

Lines changed: 7 additions & 1 deletion
@@ -109,6 +109,11 @@ abstract class BaseSessionStateBuilder(
    */
   protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
 
+  /**
+   * ResourceLoader that is used to load function resources and jars.
+   */
+  protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session)
+
   /**
    * Catalog for managing table and database states. If there is a pre-existing catalog, the state
    * of that catalog (temp tables & current database) will be copied into the new catalog.
@@ -123,7 +128,7 @@ abstract class BaseSessionStateBuilder(
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
       sqlParser,
-      new SessionFunctionResourceLoader(session))
+      resourceLoader)
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
@@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder(
       optimizer,
       planner,
       streamingQueryManager,
+      resourceLoader,
       createQueryExecution,
       createClone)
   }
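
This change hinges on `resourceLoader` being a `protected lazy val`, so a Hive-aware builder can replace it wholesale. A toy, Spark-free sketch of that override mechanism (the `Loader`/`BaseBuilder` names are made up for illustration; only the pattern mirrors `BaseSessionStateBuilder`):

```scala
// Minimal sketch of overriding a protected lazy val in a builder hierarchy.
class Loader { override def toString = "plain loader" }
class HiveLoader extends Loader { override def toString = "hive-aware loader" }

class BaseBuilder {
  // Created lazily, so a subclass override fully replaces it and the base
  // definition is never evaluated.
  protected lazy val loader: Loader = new Loader
  def build(): Loader = loader
}

class HiveBuilder extends BaseBuilder {
  override protected lazy val loader: Loader = new HiveLoader
}

object BuilderSketch extends App {
  println(new BaseBuilder().build()) // plain loader
  println(new HiveBuilder().build()) // hive-aware loader
}
```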

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala

Lines changed: 7 additions & 5 deletions
@@ -22,7 +22,7 @@ import java.io.PrintStream
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.util.Utils
 
 /** A singleton object for the master program. The slaves should not access this. */
@@ -49,10 +49,12 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext = sparkSession.sparkContext
       sqlContext = sparkSession.sqlContext
 
-      val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
-      sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
-      sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
-      sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+      val metadataHive = sparkSession
+        .sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+        .client.newSession()
+      metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+      metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+      metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
       sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
     }
   }
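
The stream wiring above is plain `java.io.PrintStream` construction: auto-flushing, UTF-8 encoded wrappers around stdout/stderr handed to the Hive client. The same construction in isolation, with no Spark or Hive involved:

```scala
import java.io.PrintStream

object PrintStreamSketch {
  def main(args: Array[String]): Unit = {
    // Same constructor as in SparkSQLEnv: wrap the standard streams so the
    // consumer flushes per write and encodes output as UTF-8.
    val out = new PrintStream(System.out, true, "UTF-8")
    val err = new PrintStream(System.err, true, "UTF-8")

    out.println("informational output goes to stdout")
    err.println("diagnostics go to stderr")
  }
}
```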

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}
 
 /**
@@ -49,8 +49,8 @@ private[thriftserver] class SparkSQLOperationManager()
     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
       s" initialized or had already closed.")
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val runInBackground = async && sessionState.hiveThriftServerAsync
+    val conf = sqlContext.sessionState.conf
+    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
       runInBackground)(sqlContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
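
For context, the replacement reads a typed config entry with a built-in default instead of a helper method on the removed class. A toy sketch of that "typed entry with default" pattern; `ConfEntry`, `ConfReader` and the key name are hypothetical and only mimic the shape of Spark's `ConfigEntry`/`SQLConf.getConf`:

```scala
// Toy model of conf.getConf(entry): parse the raw string if the key is set,
// otherwise fall back to the entry's default.
final case class ConfEntry[T](key: String, default: T, parse: String => T)

final class ConfReader(settings: Map[String, String]) {
  def getConf[T](entry: ConfEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}

object ConfSketch extends App {
  val ThriftServerAsync =
    ConfEntry[Boolean]("example.thriftServer.async", true, _.toBoolean)

  val conf = new ConfReader(Map.empty)
  val async = true
  // Falls back to the entry's default when the key is unset.
  val runInBackground = async && conf.getConf(ThriftServerAsync)
  println(s"runInBackground = $runInBackground")
}
```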

sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalLocale = Locale.getDefault
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
-  private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
+  private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 0 additions & 4 deletions
@@ -48,10 +48,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
     new HiveContext(sparkSession.newSession())
   }
 
-  protected[sql] override def sessionState: HiveSessionState = {
-    sparkSession.sessionState.asInstanceOf[HiveSessionState]
-  }
-
   /**
    * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 4 deletions
@@ -44,7 +44,7 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
-  private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+  private def sessionState = sparkSession.sessionState
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
   import HiveMetastoreCatalog._
 
@@ -281,12 +281,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   object ParquetConversions extends Rule[LogicalPlan] {
     private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
       relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.convertMetastoreParquet
+        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
     }
 
     private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
       val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
+      val mergeSchema = sessionState.conf.getConf(
+        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
       val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
 
       convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
@@ -316,7 +317,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   object OrcConversions extends Rule[LogicalPlan] {
     private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
       relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.convertMetastoreOrc
+        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
     }
 
     private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 23 additions & 121 deletions
@@ -17,121 +17,24 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
-import org.apache.spark.sql.streaming.StreamingQueryManager
-
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
 
 /**
- * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
- *
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
- * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
- * @param functionRegistry Internal catalog for managing functions registered by the user.
- * @param catalog Internal catalog for managing table and database states that uses Hive client for
- *                interacting with the metastore.
- * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param optimizer Logical query plan optimizer.
- * @param planner Planner that converts optimized logical plans to physical plans and that takes
- *                Hive-specific strategies into account.
- * @param streamingQueryManager Interface to start and stop streaming queries.
- * @param createQueryExecution Function used to create QueryExecution objects.
- * @param createClone Function used to create clones of the session state.
- * @param metadataHive The Hive metadata client.
+ * Entry object for creating a Hive aware [[SessionState]].
  */
-private[hive] class HiveSessionState(
-    sparkContext: SparkContext,
-    sharedState: SharedState,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods,
-    functionRegistry: FunctionRegistry,
-    override val catalog: HiveSessionCatalog,
-    sqlParser: ParserInterface,
-    analyzer: Analyzer,
-    optimizer: Optimizer,
-    planner: SparkPlanner,
-    streamingQueryManager: StreamingQueryManager,
-    createQueryExecution: LogicalPlan => QueryExecution,
-    createClone: (SparkSession, SessionState) => SessionState,
-    val metadataHive: HiveClient)
-  extends SessionState(
-    sparkContext,
-    sharedState,
-    conf,
-    experimentalMethods,
-    functionRegistry,
-    catalog,
-    sqlParser,
-    analyzer,
-    optimizer,
-    planner,
-    streamingQueryManager,
-    createQueryExecution,
-    createClone) {
-
-  // ------------------------------------------------------
-  // Helper methods, partially leftover from pre-2.0 days
-  // ------------------------------------------------------
-
-  override def addJar(path: String): Unit = {
-    metadataHive.addJar(path)
-    super.addJar(path)
-  }
-
-  /**
-   * When true, enables an experimental feature where metastore tables that use the parquet SerDe
-   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
-   * SerDe.
-   */
-  def convertMetastoreParquet: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-  }
-
-  /**
-   * When true, also tries to merge possibly different but compatible Parquet schemas in different
-   * Parquet data files.
-   *
-   * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
-   */
-  def convertMetastoreParquetWithSchemaMerging: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-  }
-
-  /**
-   * When true, enables an experimental feature where metastore tables that use the Orc SerDe
-   * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
-   * SerDe.
-   */
-  def convertMetastoreOrc: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-  }
-
-  /**
-   * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
-   */
-  def hiveThriftServerAsync: Boolean = {
-    conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
-  }
-}
-
 private[hive] object HiveSessionState {
   /**
-   * Create a new [[HiveSessionState]] for the given session.
+   * Create a new Hive aware [[SessionState]]. for the given session.
    */
-  def apply(session: SparkSession): HiveSessionState = {
+  def apply(session: SparkSession): SessionState = {
     new HiveSessionStateBuilder(session).build()
   }
 }
@@ -147,6 +50,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
   private def externalCatalog: HiveExternalCatalog =
     session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
   /**
    * Create a [[HiveSessionCatalog]].
    */
@@ -159,7 +70,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
       sqlParser,
-      new SessionFunctionResourceLoader(session))
+      resourceLoader)
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
@@ -217,23 +128,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
   }
 
   override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
+}
 
-  override def build(): HiveSessionState = {
-    val metadataHive: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionState(
-      session.sparkContext,
-      session.sharedState,
-      conf,
-      experimentalMethods,
-      functionRegistry,
-      catalog,
-      sqlParser,
-      analyzer,
-      optimizer,
-      planner,
-      streamingQueryManager,
-      createQueryExecution,
-      createClone,
-      metadataHive)
+class HiveSessionResourceLoader(
+    session: SparkSession,
+    client: HiveClient)
+  extends SessionResourceLoader(session) {
+  override def addJar(path: String): Unit = {
+    client.addJar(path)
+    super.addJar(path)
   }
 }
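
The new `HiveSessionResourceLoader` simply layers Hive-client registration on top of the base behaviour with a `super` call. A self-contained sketch of that layering; `FakeHiveClient`, `BaseLoader` and the println bodies are stand-ins, not Spark classes:

```scala
// Sketch of the layering: tell the (fake) Hive client about the jar first,
// then let the base loader handle the SparkContext/classloader side.
class FakeHiveClient {
  def addJar(path: String): Unit = println(s"hive client: ADD JAR $path")
}

class BaseLoader {
  def addJar(path: String): Unit = println(s"base loader: add $path to jar classloader")
}

class HiveAwareLoader(client: FakeHiveClient) extends BaseLoader {
  override def addJar(path: String): Unit = {
    client.addJar(path)  // register with the Hive session client first
    super.addJar(path)   // then perform the normal classloader work
  }
}

object LoaderSketch extends App {
  new HiveAwareLoader(new FakeHiveClient).addJar("/tmp/serde.jar")
}
```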
