-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState #16826
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 35 commits
18ce1b8
9beb78d
a343d8a
4210079
6da6bda
579d0b7
2837e73
8c00344
f423f74
b1371d8
2cee190
e2bbfa8
8ac778a
0c732ce
3c995e1
292011a
b027412
295ee41
847b484
9beba84
3d2e4a6
4f70d12
dd2dedd
8a8d47b
ffc2058
16824f9
fd11ee2
437b0bc
300d3a0
3ee271f
c3f052f
0bdc81c
2740c63
0f167db
2f0b1ad
c41e7bc
5eb6733
05abcf8
4c23e7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,7 +75,10 @@ class SessionCatalog( | |
|
|
||
| // For testing only. | ||
| def this(externalCatalog: ExternalCatalog) { | ||
| this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true)) | ||
| this( | ||
| externalCatalog, | ||
| new SimpleFunctionRegistry, | ||
| SimpleCatalystConf(caseSensitiveAnalysis = true)) | ||
| } | ||
|
|
||
| /** List of temporary tables, mapping from table name to their logical plan. */ | ||
|
|
@@ -1182,4 +1185,34 @@ class SessionCatalog( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get an identical copy of the `SessionCatalog`. | ||
|
||
| * The temporary views and function registry are retained. | ||
| * The table relation cache will not be populated. | ||
| * @note `externalCatalog` and `globalTempViewManager` are from shared state, do not need deep | ||
| * copy. `FunctionResourceLoader` is effectively stateless, also does not need deep copy. | ||
| * All arguments passed in should be associated with a particular `SparkSession`. | ||
| */ | ||
| def clone( | ||
|
||
| conf: CatalystConf, | ||
| hadoopConf: Configuration, | ||
| functionRegistry: FunctionRegistry, | ||
| parser: ParserInterface): SessionCatalog = { | ||
| val catalog = new SessionCatalog( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are copying, shouldn't all the fields be copied, instead of just being reused? |
||
| externalCatalog, | ||
| globalTempViewManager, | ||
| functionResourceLoader, | ||
| functionRegistry, | ||
| conf, | ||
| hadoopConf, | ||
| parser) | ||
|
|
||
| synchronized { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't look like it. It's still
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right — I should mention that we decided to keep it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it should be |
||
| catalog.currentDb = currentDb | ||
| // copy over temporary tables | ||
| tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) | ||
| } | ||
|
|
||
| catalog | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.catalog | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.analysis._ | ||
|
|
@@ -1197,6 +1199,65 @@ class SessionCatalogSuite extends PlanTest { | |
| } | ||
| } | ||
|
|
||
| test("clone SessionCatalog - temp views") { | ||
| val externalCatalog = newEmptyCatalog() | ||
| val original = new SessionCatalog(externalCatalog) | ||
| val tempTable1 = Range(1, 10, 1, 10) | ||
| original.createTempView("copytest1", tempTable1, overrideIfExists = false) | ||
|
|
||
| // check if tables copied over | ||
| val clone = original.clone( | ||
| SimpleCatalystConf(caseSensitiveAnalysis = true), | ||
| new Configuration(), | ||
| new SimpleFunctionRegistry, | ||
| CatalystSqlParser) | ||
| assert(original ne clone) | ||
| assert(clone.getTempView("copytest1") == Option(tempTable1)) | ||
|
||
|
|
||
| // check if clone and original independent | ||
| clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) | ||
| assert(original.getTempView("copytest1") == Option(tempTable1)) | ||
|
|
||
| val tempTable2 = Range(1, 20, 2, 10) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should also test that changes made to the original after cloning are not visible in the clone.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a test for this. |
||
| original.createTempView("copytest2", tempTable2, overrideIfExists = false) | ||
| assert(clone.getTempView("copytest2").isEmpty) | ||
| } | ||
|
|
||
| test("clone SessionCatalog - current db") { | ||
| val externalCatalog = newEmptyCatalog() | ||
| externalCatalog.createDatabase(newDb("copytest1"), true) | ||
| externalCatalog.createDatabase(newDb("copytest2"), true) | ||
| externalCatalog.createDatabase(newDb("copytest3"), true) | ||
|
|
||
| val original = new SessionCatalog(externalCatalog) | ||
| val tempTable1 = Range(1, 10, 1, 10) | ||
| val db1 = "copytest1" | ||
| original.createTempView(db1, tempTable1, overrideIfExists = false) | ||
|
||
| original.setCurrentDatabase(db1) | ||
|
|
||
| // check if current db copied over | ||
| val clone = original.clone( | ||
| SimpleCatalystConf(caseSensitiveAnalysis = true), | ||
| new Configuration(), | ||
| new SimpleFunctionRegistry, | ||
| CatalystSqlParser) | ||
| assert(original ne clone) | ||
| assert(clone.getCurrentDatabase == db1) | ||
|
|
||
| // check if clone and original independent | ||
| val db2 = "copytest2" | ||
| val tempTable2 = Range(1, 20, 2, 20) | ||
| clone.createTempView(db2, tempTable2, overrideIfExists = false) | ||
|
||
| clone.setCurrentDatabase(db2) | ||
| assert(original.getCurrentDatabase == db1) | ||
|
|
||
| val db3 = "copytest3" | ||
| val tempTable3 = Range(1, 30, 2, 30) | ||
| original.createTempView(db3, tempTable3, overrideIfExists = false) | ||
| original.setCurrentDatabase(db3) | ||
| assert(clone.getCurrentDatabase == db2) | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? test("clone SessionCatalog - current db") {
val externalCatalog = newEmptyCatalog()
val db1 = "db1"
val db2 = "db2"
val db3 = "db3"
externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true)
externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true)
externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true)
val original = new SessionCatalog(externalCatalog)
original.setCurrentDatabase(db1)
// check if current db copied over
val clone = original.clone(
SimpleCatalystConf(caseSensitiveAnalysis = true),
new Configuration(),
new SimpleFunctionRegistry,
CatalystSqlParser)
assert(original != clone)
assert(clone.getCurrentDatabase == db1)
// check if clone and original independent
clone.setCurrentDatabase(db2)
assert(original.getCurrentDatabase == db1)
original.setCurrentDatabase(db3)
assert(clone.getCurrentDatabase == db2)
}
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
|
||
| test("SPARK-19737: detect undefined functions without triggering relation resolution") { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this supposed to be part of this PR? |
||
| import org.apache.spark.sql.catalyst.dsl.plans._ | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,4 +46,12 @@ class ExperimentalMethods private[sql]() { | |
|
|
||
| @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil | ||
|
|
||
| override def clone(): ExperimentalMethods = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It sounds like we also need to add synchronization for both `extraStrategies` and `extraOptimizations`.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, added a sync block.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gatorsmile any reason you want to add
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I reviewing the code, I just found they are not thread-safe, although they have already been declared volatile. I am fine to keep them unchanged. How about leaving a comment in the code to emphasize that it is not thread-safe. |
||
| val result = new ExperimentalMethods | ||
| synchronized { | ||
| result.extraStrategies = extraStrategies | ||
| result.extraOptimizations = extraOptimizations | ||
| } | ||
| result | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,6 @@ import java.io.Closeable | |
| import java.util.concurrent.atomic.AtomicReference | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.reflect.ClassTag | ||
| import scala.reflect.runtime.universe.TypeTag | ||
| import scala.util.control.NonFatal | ||
|
|
||
|
|
@@ -43,7 +42,7 @@ import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} | |
| import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION | ||
| import org.apache.spark.sql.sources.BaseRelation | ||
| import org.apache.spark.sql.streaming._ | ||
| import org.apache.spark.sql.types.{DataType, LongType, StructType} | ||
| import org.apache.spark.sql.types.{DataType, StructType} | ||
| import org.apache.spark.sql.util.ExecutionListenerManager | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
@@ -67,15 +66,26 @@ import org.apache.spark.util.Utils | |
| * .config("spark.some.config.option", "some-value") | ||
| * .getOrCreate() | ||
| * }}} | ||
| * | ||
| * @param sparkContext The Spark context associated with this Spark session. | ||
| * @param existingSharedState If supplied, use the existing shared state | ||
| * instead of creating a new one. | ||
| * @param parentSessionState If supplied, inherit all session state (i.e. temporary | ||
| * views, SQL config, UDFs etc) from parent. | ||
| */ | ||
| @InterfaceStability.Stable | ||
| class SparkSession private( | ||
| @transient val sparkContext: SparkContext, | ||
| @transient private val existingSharedState: Option[SharedState]) | ||
| @transient private val existingSharedState: Option[SharedState], | ||
| @transient private val parentSessionState: Option[SessionState]) | ||
| extends Serializable with Closeable with Logging { self => | ||
|
|
||
| private[sql] def this(sc: SparkContext, existingSharedState: Option[SharedState]) { | ||
| this(sc, existingSharedState, None) | ||
| } | ||
|
|
||
| private[sql] def this(sc: SparkContext) { | ||
| this(sc, None) | ||
| this(sc, None, None) | ||
| } | ||
|
|
||
| sparkContext.assertNotStopped() | ||
|
|
@@ -108,6 +118,7 @@ class SparkSession private( | |
| /** | ||
| * State isolated across sessions, including SQL configurations, temporary tables, registered | ||
| * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]]. | ||
| * If `parentSessionState` is not null, the `SessionState` will be a copy of the parent. | ||
| * | ||
| * This is internal to Spark and there is no guarantee on interface stability. | ||
| * | ||
|
|
@@ -116,9 +127,11 @@ class SparkSession private( | |
| @InterfaceStability.Unstable | ||
| @transient | ||
| lazy val sessionState: SessionState = { | ||
| SparkSession.reflect[SessionState, SparkSession]( | ||
| SparkSession.sessionStateClassName(sparkContext.conf), | ||
| self) | ||
| parentSessionState | ||
| .map(_.clone(this)) | ||
| .getOrElse(SparkSession.instantiateSessionState( | ||
| SparkSession.sessionStateClassName(sparkContext.conf), | ||
| self)) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -211,6 +224,29 @@ class SparkSession private( | |
| new SparkSession(sparkContext, Some(sharedState)) | ||
| } | ||
|
|
||
| /** | ||
| * :: Experimental :: | ||
| * Create an identical copy of this `SparkSession`, sharing the underlying `SparkContext` | ||
| * and cached data. All the state of this session (i.e. SQL configurations, temporary tables, | ||
| * registered functions) is copied over, and the cloned session is set up with the same shared | ||
| * state as this session. The cloned session is independent of this session, that is, any | ||
| * non-global change in either session is not reflected in the other. | ||
| * | ||
| * @note Other than the `SparkContext`, all shared state is initialized lazily. | ||
| * This method will force the initialization of the shared state to ensure that parent | ||
| * and child sessions are set up with the same shared state. If the underlying catalog | ||
| * implementation is Hive, this will initialize the metastore, which may take some time. | ||
| * | ||
| * @since 2.2.0 | ||
| */ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: please add
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not remove the boolean flag and just call this cloneSession?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems cleaner, fixed. |
||
| @Experimental | ||
| @InterfaceStability.Evolving | ||
| def cloneSession(): SparkSession = { | ||
| val result = new SparkSession(sparkContext, Some(sharedState), Some(sessionState)) | ||
| result.sessionState // force copy of SessionState | ||
| result | ||
| } | ||
|
|
||
|
|
||
| /* --------------------------------- * | ||
| | Methods for creating DataFrames | | ||
|
|
@@ -883,7 +919,6 @@ object SparkSession { | |
| } | ||
| }) | ||
| } | ||
|
|
||
| return session | ||
| } | ||
| } | ||
|
|
@@ -971,16 +1006,18 @@ object SparkSession { | |
| } | ||
|
|
||
| /** | ||
| * Helper method to create an instance of [[T]] using a single-arg constructor that | ||
| * accepts an [[Arg]]. | ||
| * Helper method to create an instance of `SessionState` based on `className` from conf. | ||
| * The result is either `SessionState` or `HiveSessionState`. | ||
| */ | ||
| private def reflect[T, Arg <: AnyRef]( | ||
| private def instantiateSessionState( | ||
| className: String, | ||
| ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = { | ||
| sparkSession: SparkSession): SessionState = { | ||
|
|
||
| try { | ||
| // get `SessionState.apply(SparkSession)` | ||
| val clazz = Utils.classForName(className) | ||
| val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass) | ||
| ctor.newInstance(ctorArg).asInstanceOf[T] | ||
| val method = clazz.getMethod("apply", sparkSession.getClass) | ||
| method.invoke(null, sparkSession).asInstanceOf[SessionState] | ||
| } catch { | ||
| case NonFatal(e) => | ||
| throw new IllegalArgumentException(s"Error while instantiating '$className':", e) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,7 +66,8 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { | |
| * Preprocess [[CreateTable]], to do some normalization and checking. | ||
| */ | ||
| case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] { | ||
| private val catalog = sparkSession.sessionState.catalog | ||
| // catalog is a def and not a val/lazy val as the latter would introduce a circular reference | ||
| private def catalog = sparkSession.sessionState.catalog | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comments saying that this should be a def and not a val or lazy val as this would introduce circular references.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point |
||
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| // When we CREATE TABLE without specifying the table schema, we should fail the query if | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for adding the param name