Skip to content

Commit 18ce1b8

Browse files
committed
Add capability to inherit SessionState (SQL conf, temp tables, registered functions) when forking a new SparkSession.
1 parent 40a4cfc commit 18ce1b8

File tree

4 files changed

+105
-17
lines changed

4 files changed

+105
-17
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,4 +1180,25 @@ class SessionCatalog(
11801180
}
11811181
}
11821182

1183+
/**
1184+
* Get an identical copy of the `SessionCatalog`.
1185+
* The temporary tables and function registry are retained.
1186+
* The table relation cache will not be populated.
1187+
*/
1188+
override def clone: SessionCatalog = {
1189+
val catalog = new SessionCatalog(
1190+
externalCatalog,
1191+
globalTempViewManager,
1192+
functionResourceLoader,
1193+
functionRegistry,
1194+
conf,
1195+
hadoopConf,
1196+
parser)
1197+
1198+
catalog.currentDb = currentDb
1199+
tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) // copy over temporary tables
1200+
1201+
catalog
1202+
}
1203+
11831204
}

sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import scala.collection.mutable.ListBuffer
21+
2022
import org.apache.spark.annotation.{Experimental, InterfaceStability}
2123
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2224
import org.apache.spark.sql.catalyst.rules.Rule
@@ -46,4 +48,23 @@ class ExperimentalMethods private[sql]() {
4648

4749
@volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
4850

51+
/**
 * Get an identical copy of this `ExperimentalMethods` instance.
 *
 * The strategy and optimization sequences are snapshotted into immutable lists, so
 * later reassignments on either instance do not affect the other.
 *
 * @note This is used when forking a `SparkSession`.
 * `clone` is provided here instead of implementing equivalent functionality
 * in `SparkSession.clone` since it needs to be updated
 * as the class `ExperimentalMethods` is extended or modified.
 */
override def clone: ExperimentalMethods = {
  val result = new ExperimentalMethods
  // List(xs: _*) materializes an immutable snapshot. The previous ListBuffer-based
  // helper handed back a mutable buffer typed as Seq, leaking mutability.
  result.extraStrategies = List(extraStrategies: _*)
  result.extraOptimizations = List(extraOptimizations: _*)
  result
}
4970
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ import org.apache.spark.util.Utils
7272
@InterfaceStability.Stable
7373
class SparkSession private(
7474
@transient val sparkContext: SparkContext,
75-
@transient private val existingSharedState: Option[SharedState])
75+
@transient private val existingSharedState: Option[SharedState],
76+
existingSessionState: Option[SessionState] = None)
7677
extends Serializable with Closeable with Logging { self =>
7778

7879
private[sql] def this(sc: SparkContext) {
@@ -107,9 +108,9 @@ class SparkSession private(
107108
*/
108109
// Use the SessionState supplied at construction time (set when this session was
// forked with inherited state); otherwise build one reflectively from the class
// named by the Spark configuration.
@transient
private[sql] lazy val sessionState: SessionState = existingSessionState.getOrElse {
  SparkSession.reflect[SessionState, SparkSession](
    SparkSession.sessionStateClassName(sparkContext.conf),
    self)
}
114115

115116
/**
@@ -201,6 +202,8 @@ class SparkSession private(
201202
/**
202203
* Start a new session with isolated SQL configurations, temporary tables, and registered
203204
* functions, sharing the underlying `SparkContext` and cached data.
205+
* If `inheritSessionState` is enabled, then SQL configurations, temporary tables, and
206+
* registered functions are copied over from the parent `SparkSession`.
204207
*
205208
* @note Other than the `SparkContext`, all shared state is initialized lazily.
206209
* This method will force the initialization of the shared state to ensure that parent
@@ -209,8 +212,12 @@ class SparkSession private(
209212
*
210213
* @since 2.0.0
211214
*/
212-
def newSession(inheritSessionState: Boolean = false): SparkSession = {
  // Decide once whether the child session starts from a copy of this session's
  // state; the SparkSession constructor treats None as "create fresh state lazily".
  val inherited = if (inheritSessionState) Some(sessionState.clone) else None
  new SparkSession(sparkContext, Some(sharedState), inherited)
}
215222

216223

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,27 @@ import org.apache.spark.sql.util.ExecutionListenerManager
3838

3939
/**
4040
* A class that holds all session-specific state in a given [[SparkSession]].
41+
* If an `existingSessionState` is supplied, then its members will be copied over.
4142
*/
42-
private[sql] class SessionState(sparkSession: SparkSession) {
43+
private[sql] class SessionState(
44+
sparkSession: SparkSession,
45+
existingSessionState: Option[SessionState] = None) {
4346

4447
// Note: These are all lazy vals because they depend on each other (e.g. conf) and we
4548
// want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
4649

4750
/**
 * SQL-specific key-value configurations.
 *
 * Starts from the defaults and, when an `existingSessionState` was supplied, copies
 * every non-null entry of the parent session's `SQLConf` into the new instance.
 */
lazy val conf: SQLConf = {
  val result = new SQLConf
  // Inherit the parent session's settings, if any. A no-yield `for` over the Option
  // avoids the nonEmpty/.get anti-pattern; null values are skipped as before.
  for {
    parent <- existingSessionState
    (k, v) <- parent.conf.getAllConfs
    if v ne null
  } result.setConfString(k, v)
  result
}
}
5162

5263
def newHadoopConf(): Configuration = {
5364
val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
@@ -65,12 +76,29 @@ private[sql] class SessionState(sparkSession: SparkSession) {
6576
hadoopConf
6677
}
6778

68-
// Experimental methods: cloned from the parent session state when one was supplied,
// otherwise a fresh instance.
lazy val experimentalMethods: ExperimentalMethods =
  existingSessionState.fold(new ExperimentalMethods)(_.experimentalMethods.clone)
6984

7085
/**
 * Internal catalog for managing functions registered by the user.
 *
 * Starts from a copy of the built-in registry and, when an `existingSessionState`
 * was supplied, re-registers every function from the parent session's registry.
 */
lazy val functionRegistry: FunctionRegistry = {
  val registry = FunctionRegistry.builtin.copy()

  // foreach over the Option replaces the nonEmpty/.get anti-pattern.
  existingSessionState.foreach { parent =>
    val sourceRegistry = parent.functionRegistry
    sourceRegistry.listFunction().foreach { name =>
      // NOTE(review): both lookups are expected to succeed for every name reported
      // by listFunction(); the .get would throw if the registry were mutated
      // concurrently — confirm the parent registry is quiescent during cloning.
      registry.registerFunction(
        name,
        sourceRegistry.lookupFunction(name).get,
        sourceRegistry.lookupFunctionBuilder(name).get)
    }
  }
  registry
}
74102

75103
/**
76104
* A class for loading resources specified by a function.
@@ -93,14 +121,18 @@ private[sql] class SessionState(sparkSession: SparkSession) {
93121
/**
 * Internal catalog for managing table and database states.
 *
 * Cloned from the parent session state when one was supplied; otherwise built fresh
 * against the shared external catalog.
 */
lazy val catalog: SessionCatalog = existingSessionState.fold {
  new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf(),
    sqlParser)
}(_.catalog.clone)
104136

105137
/**
106138
* Interface exposed to the user for registering user-defined functions.
@@ -165,6 +197,13 @@ private[sql] class SessionState(sparkSession: SparkSession) {
165197
conf.setConfString(k, v)
166198
}
167199

200+
/**
 * Get an identical copy of the `SessionState`, built by handing `this` to the
 * constructor as the state to inherit.
 */
override def clone: SessionState = new SessionState(sparkSession, Some(this))
206+
168207
// ------------------------------------------------------
169208
// Helper methods, partially leftover from pre-2.0 days
170209
// ------------------------------------------------------

0 commit comments

Comments
 (0)