Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
18ce1b8
Add capability to inherit SessionState (SQL conf, temp tables, regist…
kunalkhamar Feb 3, 2017
9beb78d
Add tests for forking new session with inherit config enabled. Update…
kunalkhamar Feb 6, 2017
a343d8a
Fix constructor default args for bytecode compatibility.
kunalkhamar Feb 6, 2017
4210079
Incorporate feedback. Fix association of incorrect SparkSession while…
kunalkhamar Feb 10, 2017
6da6bda
Update spark version. Rename clone to copy, in order to avoid Java Ob…
kunalkhamar Feb 10, 2017
579d0b7
Make lazy vals strict.
kunalkhamar Feb 14, 2017
2837e73
Refactor SessionState to remove passing of base SessionState, and ini…
kunalkhamar Feb 16, 2017
8c00344
Remove unused import.
kunalkhamar Feb 16, 2017
f423f74
Remove SparkSession reference from SessionState.
kunalkhamar Feb 16, 2017
b1371d8
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 16, 2017
2cee190
Fix initialization loop.
kunalkhamar Feb 17, 2017
e2bbfa8
Fix var name error.
kunalkhamar Feb 17, 2017
8ac778a
Add tests. Refactor. Temporarily disable subtest SPARK-18360: default…
kunalkhamar Feb 18, 2017
0c732ce
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 21, 2017
3c995e1
Fix copy of SessionCatalog. Changes from review.
kunalkhamar Feb 21, 2017
292011a
Merge branch 'fork-sparksession' of github.com:kunalkhamar/spark into…
kunalkhamar Feb 21, 2017
b027412
Merge branch 'master' into fork-sparksession
kunalkhamar Feb 21, 2017
295ee41
Add synchronized blocks. Ignore hive metastore tests for now.
kunalkhamar Feb 21, 2017
847b484
Merge branch 'fork-sparksession' of github.com:kunalkhamar/spark into…
kunalkhamar Feb 21, 2017
9beba84
Add tests. Force copy of session state on cloneSession.
kunalkhamar Feb 22, 2017
3d2e4a6
Rename copy to clone() to work around copy method of case classes. Mo…
kunalkhamar Feb 22, 2017
4f70d12
Fix HiveSessionState clone.
kunalkhamar Feb 22, 2017
dd2dedd
Add tests for HiveSessionState. Review feedback.
kunalkhamar Feb 23, 2017
8a8d47b
Simplify TestSQLContext. Review feedback.
kunalkhamar Feb 23, 2017
ffc2058
(attempt to) Fix tests.
kunalkhamar Feb 24, 2017
16824f9
Review.
kunalkhamar Feb 24, 2017
fd11ee2
Update test case.
kunalkhamar Feb 24, 2017
437b0bc
Add throwing exception if wrong SessionState clone is called. Update …
kunalkhamar Feb 25, 2017
300d3a0
Most of the changes from review.
kunalkhamar Feb 28, 2017
3ee271f
All but one review feedback.
kunalkhamar Mar 6, 2017
c3f052f
Merge remote-tracking branch 'origin/master' into pr16826
zsxwing Mar 6, 2017
0bdc81c
Merge remote-tracking branch 'origin/master' into pr16826
zsxwing Mar 6, 2017
2740c63
Fix tests
zsxwing Mar 6, 2017
0f167db
Clean up tests
zsxwing Mar 6, 2017
2f0b1ad
fix SessionCatalogSuite
zsxwing Mar 7, 2017
c41e7bc
More cleanup
zsxwing Mar 7, 2017
5eb6733
More tests
zsxwing Mar 7, 2017
05abcf8
Update tests and a param comment.
kunalkhamar Mar 7, 2017
4c23e7a
Merge branch 'master' into fork-sparksession
kunalkhamar Mar 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1180,4 +1180,25 @@ class SessionCatalog(
}
}

/**
* Get an identical copy of the `SessionCatalog`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct. I would like to say

Create a new SessionCatalog with `currentDb` and the existing temp tables.

* The temporary tables and function registry are retained.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temporary tables -> temporary views

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

* The table relation cache will not be populated.
*/
override def clone: SessionCatalog = {
val catalog = new SessionCatalog(
Copy link
Contributor

@tdas tdas Feb 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are copying... shouldnt all the fields be copied, instead of just being reused?
you should test that any updates to the copied catalog should not get reflected in the earlier catalog.

externalCatalog,
globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
hadoopConf,
parser)

catalog.currentDb = currentDb
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) // copy over temporary tables

catalog
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import scala.collection.mutable.ListBuffer

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -46,4 +48,23 @@ class ExperimentalMethods private[sql]() {

@volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil

/**
* Get an identical copy of this `ExperimentalMethods` instance.
* @note This is used when forking a `SparkSession`.
* `clone` is provided here instead of implementing equivalent functionality
* in `SparkSession.clone` since it needs to be updated
* as the class `ExperimentalMethods` is extended or modified.
*/
override def clone: ExperimentalMethods = {
def cloneSeq[T](seq: Seq[T]): Seq[T] = {
val newSeq = new ListBuffer[T]
newSeq ++= seq
newSeq
}

val result = new ExperimentalMethods
result.extraStrategies = cloneSeq(extraStrategies)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to copy these two Seqs since they are not mutable Seqs.

result.extraOptimizations = cloneSeq(extraOptimizations)
result
}
}
33 changes: 28 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -72,11 +72,16 @@ import org.apache.spark.util.Utils
@InterfaceStability.Stable
class SparkSession private(
@transient val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState])
@transient private val existingSharedState: Option[SharedState],
existingSessionState: Option[SessionState])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add docs for the fields.

extends Serializable with Closeable with Logging { self =>

private[sql] def this(sc: SparkContext, existingSharedState: Option[SharedState]) {
this(sc, existingSharedState, None)
}

private[sql] def this(sc: SparkContext) {
this(sc, None)
this(sc, None, None)
}

sparkContext.assertNotStopped()
Expand Down Expand Up @@ -107,9 +112,9 @@ class SparkSession private(
*/
@transient
private[sql] lazy val sessionState: SessionState = {
SparkSession.reflect[SessionState, SparkSession](
existingSessionState.getOrElse(SparkSession.reflect[SessionState, SparkSession](
SparkSession.sessionStateClassName(sparkContext.conf),
self)
self))
}

/**
Expand Down Expand Up @@ -213,6 +218,24 @@ class SparkSession private(
new SparkSession(sparkContext, Some(sharedState))
}

/**
* Start a new session, sharing the underlying `SparkContext` and cached data.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add :: Experimental ::

* If inheritSessionState is enabled, then SQL configurations, temporary tables,
* registered functions are copied over from parent `SparkSession`.
*
* @note Other than the `SparkContext`, all shared state is initialized lazily.
* This method will force the initialization of the shared state to ensure that parent
* and child sessions are set up with the same shared state. If the underlying catalog
* implementation is Hive, this will initialize the metastore, which may take some time.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please add @Experimental and @InterfaceStability.Evolving

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not remove the boolean flag and just call this cloneSession?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems cleaner, fixed.

def newSession(inheritSessionState: Boolean): SparkSession = {
if (inheritSessionState) {
new SparkSession(sparkContext, Some(sharedState), Some(sessionState.clone))
} else {
newSession()
}
}


/* --------------------------------- *
| Methods for creating DataFrames |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,31 @@ import org.apache.spark.sql.util.ExecutionListenerManager

/**
* A class that holds all session-specific state in a given [[SparkSession]].
* If an `existingSessionState` is supplied, then its members will be copied over.
*/
private[sql] class SessionState(sparkSession: SparkSession) {
private[sql] class SessionState(
sparkSession: SparkSession,
existingSessionState: Option[SessionState]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: existingSessionState -> parentSessionState to indicate we should copy its internal states.


private[sql] def this(sparkSession: SparkSession) = {
this(sparkSession, None)
}

// Note: These are all lazy vals because they depend on each other (e.g. conf) and we
// want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.

/**
* SQL-specific key-value configurations.
*/
lazy val conf: SQLConf = new SQLConf
lazy val conf: SQLConf = {
val result = new SQLConf
if (existingSessionState.nonEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

    existingSessionState.foreach(_.conf.getAllConfs.foreach {
      case (k, v) => if (v ne null) result.setConfString(k, v)
    })

existingSessionState.get.conf.getAllConfs.foreach {
case (k, v) => if (v ne null) result.setConfString(k, v)
}
}
result
}

def newHadoopConf(): Configuration = {
val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
Expand All @@ -65,12 +80,29 @@ private[sql] class SessionState(sparkSession: SparkSession) {
hadoopConf
}

lazy val experimentalMethods = new ExperimentalMethods
lazy val experimentalMethods: ExperimentalMethods = {
existingSessionState
.map(_.experimentalMethods.clone)
.getOrElse(new ExperimentalMethods)
}

/**
* Internal catalog for managing functions registered by the user.
*/
lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
lazy val functionRegistry: FunctionRegistry = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to just add a copy method to FunctionRegistry to simplify these codes.

val registry = FunctionRegistry.builtin.copy()

if (existingSessionState.nonEmpty) {
val sourceRegistry = existingSessionState.get.functionRegistry
sourceRegistry
.listFunction()
.foreach(name => registry.registerFunction(
name,
sourceRegistry.lookupFunction(name).get,
sourceRegistry.lookupFunctionBuilder(name).get))
}
registry
}

/**
* A class for loading resources specified by a function.
Expand All @@ -93,14 +125,18 @@ private[sql] class SessionState(sparkSession: SparkSession) {
/**
* Internal catalog for managing table and database states.
*/
lazy val catalog = new SessionCatalog(
sparkSession.sharedState.externalCatalog,
sparkSession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
newHadoopConf(),
sqlParser)
lazy val catalog: SessionCatalog = {
existingSessionState
.map(_.catalog.clone)
.getOrElse(new SessionCatalog(
sparkSession.sharedState.externalCatalog,
sparkSession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
newHadoopConf(),
sqlParser))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra line.

/**
* Interface exposed to the user for registering user-defined functions.
Expand Down Expand Up @@ -165,6 +201,13 @@ private[sql] class SessionState(sparkSession: SparkSession) {
conf.setConfString(k, v)
}

/**
* Get an identical copy of the `SessionState`.
*/
override def clone: SessionState = {
new SessionState(sparkSession, Some(this))
}

// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
// ------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Test cases for the builder pattern of [[SparkSession]].
Expand Down Expand Up @@ -123,4 +125,70 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
session.stop()
}
}

test("fork new session and inherit a copy of the session state") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test doesnt test anything other than ne which is already covered by other tests

val activeSession = SparkSession.builder().master("local").getOrCreate()
val forkedSession = activeSession.newSession(inheritSessionState = true)

assert(forkedSession ne activeSession)
assert(forkedSession.sessionState ne activeSession.sessionState)

forkedSession.stop()
activeSession.stop()
}

test("fork new session and inherit sql config options") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each other these state, you have to test the following.

  • whether the state gets copied over and is usable
  • whether the original session is unaffected by changes to clones session, AND vice versa.

I think the last one is missing in these tests.

val activeSession = SparkSession
.builder()
.master("local")
.config("spark-configb", "b")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the shared state. You should use SparkSession.conf.set instead.

.getOrCreate()
val forkedSession = activeSession.newSession(inheritSessionState = true)

assert(forkedSession ne activeSession)
assert(forkedSession.conf ne activeSession.conf)
assert(forkedSession.conf.get("spark-configb") == "b")

forkedSession.stop()
activeSession.stop()
}

test("fork new session and inherit function registry and udf") {
val activeSession = SparkSession.builder().master("local").getOrCreate()
activeSession.udf.register("strlenScala", (_: String).length + (_: Int))
val forkedSession = activeSession.newSession(inheritSessionState = true)

assert(forkedSession ne activeSession)
assert(forkedSession.sessionState.functionRegistry ne
activeSession.sessionState.functionRegistry)
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)

forkedSession.stop()
activeSession.stop()
}

test("fork new session and inherit experimental methods") {
object DummyRule1 extends Rule[LogicalPlan] {
def apply(p: LogicalPlan): LogicalPlan = p
}
object DummyRule2 extends Rule[LogicalPlan] {
def apply(p: LogicalPlan): LogicalPlan = p
}
val optimizations = List(DummyRule1, DummyRule2)

val activeSession = SparkSession.builder().master("local").getOrCreate()
activeSession.experimental.extraOptimizations = optimizations

val forkedSession = activeSession.newSession(inheritSessionState = true)

assert(forkedSession ne activeSession)
assert(forkedSession.experimental ne activeSession.experimental)
assert(forkedSession.experimental.extraOptimizations ne
activeSession.experimental.extraOptimizations)
assert(forkedSession.experimental.extraOptimizations.toSet ==
activeSession.experimental.extraOptimizations.toSet)

forkedSession.stop()
activeSession.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,21 @@ class CatalogSuite
}
}

test("clone SessionCatalog") {
// need to test tempTables are cloned
assert(spark.catalog.listTables().collect().isEmpty)

createTempTable("my_temp_table")
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))

val forkedSession = spark.newSession(inheritSessionState = true)
assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))

dropTable("my_temp_table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not clear which session this dropTable is using. can you make it explicit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didnt test the other way round. changes in forked catalog should not reflect the original

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

}

// TODO: add tests for the rest of them
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this comment for? remove it if its just dead comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After that TODO was added, there have been many additions of tests.
This is a large interface, so the tests are scattered over this suite and GlobalTempViewSuite, CachedTableSuite, PartitionProviderCompatibilitySuite, (Hive)MetadataCacheSuite, DDLSuite, ParquetQuerySuite to name a few.
Though I do see at least one test for every method in the trait Catalog.
Checked with @cloud-fan in person, we should be fine with removing this TODO.


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.{ShutdownHookManager, Utils}

Expand Down Expand Up @@ -115,16 +113,22 @@ class TestHiveContext(
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[SharedState],
existingSessionState: Option[SessionState],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you don't need to change this file?

private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>

def this(sc: SparkContext, loadTestTables: Boolean) {
this(
sc,
existingSharedState = None,
existingSessionState = None,
loadTestTables)
}

def this(sc: SparkContext, existingSharedState: Option[SharedState], loadTestTables: Boolean) {
this(sc, existingSharedState, existingSessionState = None, loadTestTables)
}

{ // set the metastore temporary configuration
val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
Expand All @@ -151,7 +155,7 @@ private[hive] class TestHiveSparkSession(
new TestHiveSessionState(self)

override def newSession(): TestHiveSparkSession = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change it to override def newSession(inheritSessionState: Boolean) instead

new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
}

private var cacheTables: Boolean = false
Expand Down