@@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._
4040import org .apache .spark .streaming .receiver .{ActorReceiver , ActorSupervisorStrategy , Receiver }
4141import org .apache .spark .streaming .scheduler .{JobScheduler , StreamingListener }
4242import org .apache .spark .streaming .ui .{StreamingJobProgressListener , StreamingTab }
43+ import org .apache .spark .util .CallSite
4344
4445/**
4546 * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -203,6 +204,8 @@ class StreamingContext private[streaming] (
203204 import StreamingContextState ._
204205 private [streaming] var state = Initialized
205206
207+ private val startSite = new AtomicReference [CallSite ](null )
208+
206209 /**
207210 * Return the associated Spark context
208211 */
@@ -527,7 +530,8 @@ class StreamingContext private[streaming] (
527530 throw new SparkException (" StreamingContext has already been stopped" )
528531 }
529532 validate()
530- sparkContext.setCallSite(DStream .getCreationSite())
533+ startSite.set(DStream .getCreationSite())
534+ sparkContext.setCallSite(startSite.get)
531535 ACTIVATION_LOCK .synchronized {
532536 assertNoOtherContextIsActive()
533537 scheduler.start()
@@ -625,7 +629,10 @@ object StreamingContext extends Logging {
625629 private def assertNoOtherContextIsActive (): Unit = {
626630 ACTIVATION_LOCK .synchronized {
627631 if (activeContext.get() != null ) {
628- throw new SparkException (" Only one StreamingContext may be started in this JVM" )
632+ throw new SparkException (
633+ " Only one StreamingContext may be started in this JVM. " +
634+ " Currently running StreamingContext was started at" +
635+ activeContext.get.startSite.get.longForm)
629636 }
630637 }
631638 }
0 commit comments