@@ -132,7 +132,7 @@ class SparkSession private(
@Unstable
@transient
lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
+    existingSharedState.getOrElse(SharedState.get(sparkContext, initialSessionOptions))
}

/**
@@ -171,6 +171,19 @@ private[sql] class SharedState(

object SharedState extends Logging {
@volatile private var fsUrlStreamHandlerFactoryInitialized = false
@volatile private var _sharedState: SharedState = _

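/**
 * Returns the singleton SharedState for this JVM, creating it from the given SparkContext and
 * configs on the first call; later calls return the same instance. Double-checked locking on
 * the volatile `_sharedState` field keeps creation thread-safe and one-time only.
 */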
def get(sparkContext: SparkContext, initialConfigs: scala.collection.Map[String, String])
: SharedState = {
if (_sharedState == null) {
synchronized {
if (_sharedState == null) {
_sharedState = new SharedState(sparkContext, initialConfigs)
}
}
}
_sharedState
}

private def setFsUrlStreamHandlerFactory(conf: SparkConf, hadoopConf: Configuration): Unit = {
if (!fsUrlStreamHandlerFactoryInitialized &&
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import scala.collection.JavaConverters._

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
@@ -196,6 +198,71 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(postFirstCreation == postSecondCreation)
}

test("SPARK-32165: SparkContext only register one SQLAppStatusListener") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-register-one-SQLAppStatusListener")
val context = new SparkContext(conf)
SparkSession
.builder()
.sparkContext(context)
.master("local")
.getOrCreate()
// Touches the sessionState so it initiates SQLAppStatusListener and ExecutionListenerBus
.sessionState

assert(countListener("SQLAppStatusListener", context) === 1)
assert(countListener("ExecutionListenerBus", context) === 1)
val postFirstCreation = context.listenerBus.listeners.size()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

SparkSession
.builder()
.sparkContext(context)
.master("local")
.getOrCreate()
.sessionState
assert(countListener("SQLAppStatusListener", context) === 1)
assert(countListener("ExecutionListenerBus", context) === 2)
val postSecondCreation = context.listenerBus.listeners.size()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
// Minus 1 because the listener `ExecutionListenerBus` is created per SparkSession
assert(postFirstCreation == postSecondCreation - 1)
Review thread on the assertion above:

Contributor: Is this duplicated with the check assert(countListener("ExecutionListenerBus", context) === 2)?

Member (author): No. Here we want to ensure other potential listeners won't leak across SparkSessions.

context.stop()
}

test("SPARK-32165: Ensure only initiates SharedState once") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-initiates-one-shared-state")
val context = new SparkContext(conf)
val sharedState1 = SparkSession
.builder()
.sparkContext(context)
.master("local")
.getOrCreate()
.sharedState

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

val sharedState2 = SparkSession
.builder()
.sparkContext(context)
.master("local")
.getOrCreate()
.sharedState

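// The two sessions must reuse the same SharedState instance (reference equality).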
assert(sharedState1.eq(sharedState2))
}

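// Counts the listeners registered on the SparkContext's listener bus whose simple class name matches.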
private def countListener(listener: String, context: SparkContext): Int = {
val listeners = context.listenerBus.listeners.asScala
listeners.count(_.getClass.getSimpleName === listener)
}

test("SPARK-31532: should not propagate static sql configs to the existing" +
" active/default SparkSession") {
val session = SparkSession.builder()