From df6f029656c12c5d2f261da94be18162928aa0c3 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 13:51:14 -0700
Subject: [PATCH 1/9] Fixed job group and job description for streaming jobs

---
 .../org/apache/spark/util/ThreadUtils.scala   | 39 +++++++++++++++++++
 .../apache/spark/util/ThreadUtilsSuite.scala  | 23 ++++++++++-
 .../spark/streaming/StreamingContext.scala    | 13 +++++--
 .../streaming/StreamingContextSuite.scala     | 33 ++++++++++++++++
 4 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a3d8b3..60ff1aaa0f20 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -21,6 +21,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
@@ -86,4 +87,42 @@ private[spark] object ThreadUtils {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
     Executors.newSingleThreadScheduledExecutor(threadFactory)
   }
+
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true,
+      timeoutMillis: Long = 0)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join(timeoutMillis)
+
+    exception match {
+      case Some(realException) =>
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread by ${ThreadUtils.getClass.getName} ..", " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
 }

diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b14b7f..01ef8bf08b6a 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.util
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
 
 import org.apache.spark.SparkFunSuite
 
@@ -66,4 +66,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
     val futureThreadName = Await.result(f, 10.seconds)
     assert(futureThreadName === callerThreadName)
   }
+
+  test("runInNewThread") {
+    import ThreadUtils._
+    assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
+    assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
+    assert(
+      runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
+    )
+    val exception = intercept[Exception] {
+      runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
+    }
+    assert(exception.isInstanceOf[IllegalArgumentException])
+    assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
+    assert(exception.getStackTrace.mkString("\n").contains(
+      "... run in separate thread by org.apache.spark.util.ThreadUtils$ ...") === true,
+      "stack trace does not contain expected place holder"
+    )
+    assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
+      "stack trace contains unexpected references to ThreadUtils"
+    )
+  }
 }

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b496d1f341a0..30f3bd8f9fc1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{ThreadUtils, CallSite, ShutdownHookManager}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,16 @@ class StreamingContext private[streaming] (
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
-            scheduler.start()
+            ThreadUtils.runInNewThread("streaming-start") {
+              sparkContext.setCallSite(startSite.get)
+              sparkContext.setJobGroup(
+                StreamingContext.STREAMING_JOB_GROUP_ID, StreamingContext.STREAMING_JOB_DESCRIPTION, false)
+              scheduler.start()
+            }
             state = StreamingContextState.ACTIVE
           } catch {
             case NonFatal(e) =>
@@ -618,6 +622,7 @@ class StreamingContext private[streaming] (
     }
   }
 
+
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
@@ -719,6 +724,8 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
+  private[streaming] val STREAMING_JOB_GROUP_ID = "streaming"
+  private[streaming] val STREAMING_JOB_DESCRIPTION = "streaming"
   /**
    * Lock that guards activation of a StreamingContext as well as access to the singleton active
    * StreamingContext in getActiveOrCreate().

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d26894e88fc2..d96f3b2ff9db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,39 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }
 
+  test("start should set job group correctly") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+    val sc = ssc.sc
+
+    @volatile var jobGroupFound: String = null
+    @volatile var jobDescFound: String = null
+    @volatile var jobInterruptFound: String = null
+    @volatile var allFound: Boolean = false
+
+    addInputStream(ssc).foreachRDD { rdd =>
+      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+      println("Found")
+      allFound = true
+    }
+    ssc.start()
+
+    eventually(timeout(5 seconds), interval(10 milliseconds)) {
+      assert(allFound === true)
+    }
+
+    // Verify streaming jobs have expected thread-local properties
+    assert(jobGroupFound === StreamingContext.STREAMING_JOB_GROUP_ID)
+    assert(jobDescFound === StreamingContext.STREAMING_JOB_DESCRIPTION)
+    assert(jobInterruptFound === "false")
+
+    // Verify current thread's thread-local properties have not changed
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+  }
 
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
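A note on the approach in [PATCH 1/9]: a SparkContext keeps the job group, job description, interrupt-on-cancel flag, and call site in per-thread properties that child threads inherit at creation time. Calling scheduler.start() directly from the user's thread would therefore stamp the caller's properties onto every job the streaming scheduler ever submits; starting the scheduler from a dedicated, short-lived thread lets those properties be set without touching the caller. A minimal usage sketch of the runInNewThread helper introduced above (a Spark-internal API; the body here is illustrative):

    // The body runs on a fresh thread named "demo"; the caller blocks until it
    // finishes and receives the result. Thread-local state touched in the body
    // stays on the "demo" thread.
    val name: String = ThreadUtils.runInNewThread("demo") {
      Thread.currentThread().getName
    }
    assert(name == "demo")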
From 5b6ab2886816a56cf2f27b909e2776e8d5d168c9 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 14:59:49 -0700
Subject: [PATCH 2/9] Addressed scala style issue

---
 .../apache/spark/streaming/StreamingContext.scala | 12 ++++++------
 .../spark/streaming/StreamingContextSuite.scala   |  1 -
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 30f3bd8f9fc1..461652b7eec0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -585,17 +585,17 @@ class StreamingContext private[streaming] (
    * @throws IllegalStateException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
+    import StreamingContext._
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        StreamingContext.ACTIVATION_LOCK.synchronized {
-          StreamingContext.assertNoOtherContextIsActive()
+        ACTIVATION_LOCK.synchronized {
+          assertNoOtherContextIsActive()
           try {
             validate()
             ThreadUtils.runInNewThread("streaming-start") {
               sparkContext.setCallSite(startSite.get)
-              sparkContext.setJobGroup(
-                StreamingContext.STREAMING_JOB_GROUP_ID, StreamingContext.STREAMING_JOB_DESCRIPTION, false)
+              sparkContext.setJobGroup(STREAMING_JOB_GROUP_ID, STREAMING_JOB_DESCRIPTION, false)
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
@@ -606,10 +606,10 @@ class StreamingContext private[streaming] (
             state = StreamingContextState.STOPPED
             throw e
           }
-          StreamingContext.setActiveContext(this)
+          setActiveContext(this)
         }
         shutdownHookRef = ShutdownHookManager.addShutdownHook(
-          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+          SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
         env.metricsSystem.registerSource(streamingSource)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d96f3b2ff9db..115411859780 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -194,7 +194,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
       jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
       jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
-      println("Found")
       allFound = true
     }
     ssc.start()
From 05536648191251d08e52af64a27253c86493ac05 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 15:15:55 -0700
Subject: [PATCH 3/9] Added docs on runInNewThread

---
 .../org/apache/spark/util/ThreadUtils.scala  | 29 +++++++++++++++----
 .../apache/spark/util/ThreadUtilsSuite.scala |  2 +-
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 60ff1aaa0f20..fa3830ede7f4 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -88,15 +88,24 @@ private[spark] object ThreadUtils {
     Executors.newSingleThreadScheduledExecutor(threadFactory)
   }
 
+  /**
+   * Run a piece of code in a new thread, and the get result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will be like the following
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
   def runInNewThread[T](
       threadName: String,
-      isDaemon: Boolean = true,
-      timeoutMillis: Long = 0)(body: => T): T = {
+      isDaemon: Boolean = true)(body: => T): T = {
     @volatile var exception: Option[Throwable] = None
     @volatile var result: T = null.asInstanceOf[T]
 
     val thread = new Thread(threadName) {
-
       override def run(): Unit = {
         try {
           result = body
@@ -108,18 +117,28 @@ private[spark] object ThreadUtils {
     }
     thread.setDaemon(isDaemon)
     thread.start()
-    thread.join(timeoutMillis)
+    thread.join()
 
     exception match {
       case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method
         val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
           ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows methods call from this helper method
         val extraStackTrace = realException.getStackTrace.takeWhile(
           ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a place holder just specifying that there
+        // was a helper method used, without any further details of the helper
         val placeHolderStackElem = new StackTraceElement(
-          s"... run in separate thread by ${ThreadUtils.getClass.getName} ..", " ", "", -1)
+          s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
         val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
         realException.setStackTrace(finalStackTrace)
+        println(realException)
         throw realException
       case None =>
         result

diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 01ef8bf08b6a..85d3cb32a574 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -80,7 +80,7 @@ class ThreadUtilsSuite extends SparkFunSuite {
     assert(exception.isInstanceOf[IllegalArgumentException])
     assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
     assert(exception.getStackTrace.mkString("\n").contains(
-      "... run in separate thread by org.apache.spark.util.ThreadUtils$ ...") === true,
+      "... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
       "stack trace does not contain expected place holder"
     )
     assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
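The doc comment added in [PATCH 3/9] is worth unpacking: the worker thread's trace is kept only up to the point where it enters the helper, the caller's trace is kept only below its call into the helper, and a synthetic placeholder frame is spliced between the two, so the final trace reads as if the body had run directly at the call site. A standalone sketch of the same splicing technique, separate from the patch (names hypothetical):

    // Splice a worker thread's trace onto the caller's, hiding helper internals.
    def spliceTraces(
        workerTrace: Array[StackTraceElement],
        callerTrace: Array[StackTraceElement],
        helperClass: String): Array[StackTraceElement] = {
      // Keep worker frames until the helper's own run() frames begin.
      val top = workerTrace.takeWhile(!_.getClassName.contains(helperClass))
      // Skip caller frames down to the helper call, then drop that frame too.
      val bottom = callerTrace.dropWhile(!_.getClassName.contains(helperClass)).drop(1)
      val marker = new StackTraceElement(
        s"... run in separate thread by $helperClass ..", " ", "", -1)
      (top :+ marker) ++ bottom
    }

Applied as exception.setStackTrace(spliceTraces(exception.getStackTrace, Thread.currentThread().getStackTrace, "ThreadUtils")), this yields the shape shown in the doc comment above.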
From 1b347d763a9b42ffa76efaf5ef58e8147fae2942 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 15:27:31 -0700
Subject: [PATCH 4/9] Fixed compilation error

---
 .../apache/spark/streaming/StreamingContext.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 461652b7eec0..1d33a437e87f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -585,15 +585,15 @@ class StreamingContext private[streaming] (
    * @throws IllegalStateException if the StreamingContext is already stopped.
    */
   def start(): Unit = synchronized {
-    import StreamingContext._
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        ACTIVATION_LOCK.synchronized {
-          assertNoOtherContextIsActive()
+        StreamingContext.ACTIVATION_LOCK.synchronized {
+          StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
             ThreadUtils.runInNewThread("streaming-start") {
+              import StreamingContext._
               sparkContext.setCallSite(startSite.get)
               sparkContext.setJobGroup(STREAMING_JOB_GROUP_ID, STREAMING_JOB_DESCRIPTION, false)
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
@@ -606,15 +606,15 @@ class StreamingContext private[streaming] (
             state = StreamingContextState.STOPPED
             throw e
           }
-          setActiveContext(this)
+          StreamingContext.setActiveContext(this)
         }
         shutdownHookRef = ShutdownHookManager.addShutdownHook(
-          SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
         env.metricsSystem.registerSource(streamingSource)
         uiTab.foreach(_.attach())
-        logInfo("StreamingContext started")
+        this.logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
       case STOPPED =>

From c4534fdec6f2aede9c82587771479121f4a62941 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 16 Sep 2015 15:28:09 -0700
Subject: [PATCH 5/9] style fix

---
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index fa3830ede7f4..7a985dfb9dc0 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadUtils {
    *
    * SomeException: exception-message
    *   at CallerClass.body-method (sourcefile.scala)
-   *   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+   *   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    *   at CallerClass.caller-method (sourcefile.scala)
    *   ...
   */
@@ -138,7 +138,6 @@ private[spark] object ThreadUtils {
 
         // Update the stack trace and rethrow the exception in the caller thread
         realException.setStackTrace(finalStackTrace)
-        println(realException)
         throw realException
       case None =>
         result
From 2afc50e586bc9050636292aa5394248e2d4701b9 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Thu, 17 Sep 2015 03:04:21 -0700
Subject: [PATCH 6/9] Changed to suit the current master

---
 .../org/apache/spark/streaming/StreamingContext.scala  |  9 ++++-----
 .../apache/spark/streaming/StreamingContextSuite.scala | 10 +++++-----
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1d33a437e87f..207bbb97f121 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{ThreadUtils, CallSite, ShutdownHookManager}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -593,9 +593,10 @@ class StreamingContext private[streaming] (
           try {
             validate()
             ThreadUtils.runInNewThread("streaming-start") {
-              import StreamingContext._
               sparkContext.setCallSite(startSite.get)
-              sparkContext.setJobGroup(STREAMING_JOB_GROUP_ID, STREAMING_JOB_DESCRIPTION, false)
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
@@ -724,8 +725,6 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
-  private[streaming] val STREAMING_JOB_GROUP_ID = "streaming"
-  private[streaming] val STREAMING_JOB_DESCRIPTION = "streaming"
   /**
    * Lock that guards activation of a StreamingContext as well as access to the singleton active
    * StreamingContext in getActiveOrCreate().

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 115411859780..da6ffc50e9d3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -185,9 +185,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
     val sc = ssc.sc
 
-    @volatile var jobGroupFound: String = null
-    @volatile var jobDescFound: String = null
-    @volatile var jobInterruptFound: String = null
+    @volatile var jobGroupFound: String = ""
+    @volatile var jobDescFound: String = ""
+    @volatile var jobInterruptFound: String = ""
     @volatile var allFound: Boolean = false
 
     addInputStream(ssc).foreachRDD { rdd =>
@@ -203,8 +203,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     }
 
     // Verify streaming jobs have expected thread-local properties
-    assert(jobGroupFound === StreamingContext.STREAMING_JOB_GROUP_ID)
-    assert(jobDescFound === StreamingContext.STREAMING_JOB_DESCRIPTION)
+    assert(jobGroupFound === null)
+    assert(jobDescFound === null)
     assert(jobInterruptFound === "false")
 
     // Verify current thread's thread-local properties have not changed
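[PATCH 6/9] changes strategy: rather than tagging streaming jobs with a dedicated "streaming" job group, the start thread simply sheds what it inherited, leaving streaming jobs with no group or description at all and pinning interrupt-on-cancel to "false". This works because setLocalProperty(key, null) removes the property for the current thread only; the caller's copy lives in its own snapshot of the inheritable thread-local and is never touched. A sketch of that behavior in the spirit of the suite test above (SparkContext.SPARK_JOB_GROUP_ID is Spark-internal, visible to the test but not to user code):

    sc.setJobGroup("adhoc", "ad-hoc analysis", interruptOnCancel = true)
    ThreadUtils.runInNewThread("clean-slate") {
      // This thread inherited "adhoc" when it was created; clear only its copy.
      sc.setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
      assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) == null)
    }
    // Back on the calling thread, the job group is untouched.
    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) == "adhoc")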
From d8600cfec0e39f17102823ef3dd5d2ce28eb86d8 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 18 Sep 2015 03:37:09 -0700
Subject: [PATCH 7/9] Addressed comments on PR

---
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala  | 2 ++
 .../test/scala/org/apache/spark/util/ThreadUtilsSuite.scala  | 3 +--
 .../scala/org/apache/spark/streaming/StreamingContext.scala  | 3 +--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 7a985dfb9dc0..ad61db59e1a4 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -122,6 +122,8 @@ private[spark] object ThreadUtils {
     exception match {
       case Some(realException) =>
         // Remove the part of the stack that shows method calls into this helper method
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
         val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
           ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
 
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 85d3cb32a574..aeffac1d2df0 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -74,10 +74,9 @@ class ThreadUtilsSuite extends SparkFunSuite {
     assert(
       runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
     )
-    val exception = intercept[Exception] {
+    val exception = intercept[IllegalArgumentException] {
       runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
     }
-    assert(exception.isInstanceOf[IllegalArgumentException])
     assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
     assert(exception.getStackTrace.mkString("\n").contains(
       "... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 207bbb97f121..965a6d88ecae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -594,8 +594,7 @@ class StreamingContext private[streaming] (
             validate()
             ThreadUtils.runInNewThread("streaming-start") {
               sparkContext.setCallSite(startSite.get)
-              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
-              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
+              sparkContext.clearJobGroup()
               sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
               scheduler.start()
             }
From 7550490fd76f32ded7d8f855c71b4fb6d4eac68e Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 21 Sep 2015 10:50:18 -0700
Subject: [PATCH 8/9] Addressed PR comments

---
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 2 +-
 .../test/scala/org/apache/spark/util/ThreadUtilsSuite.scala | 6 ++++--
 .../org/apache/spark/streaming/StreamingContextSuite.scala  | 4 ++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ad61db59e1a4..22e291a2b48d 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -89,7 +89,7 @@ private[spark] object ThreadUtils {
   }
 
   /**
-   * Run a piece of code in a new thread, and the get result. Exception in the new thread is
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
    * thrown in the caller thread with an adjusted stack trace that removes references to this
    * method for clarity. The exception stack traces will be like the following
    *

diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index aeffac1d2df0..620e4debf4e0 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 
@@ -74,10 +75,11 @@ class ThreadUtilsSuite extends SparkFunSuite {
     assert(
       runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
     )
+    val uniqueExceptionMessage = "test" + Random.nextInt()
     val exception = intercept[IllegalArgumentException] {
-      runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
+      runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
     }
-    assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
+    assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
     assert(exception.getStackTrace.mkString("\n").contains(
       "... run in separate thread using org.apache.spark.util.ThreadUtils ...") === true,
       "stack trace does not contain expected place holder"

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index da6ffc50e9d3..3b9d0d15ea04 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,7 +180,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }
 
-  test("start should set job group correctly") {
+  test("start should set job group and description of streaming jobs correctly") {
     ssc = new StreamingContext(conf, batchDuration)
     ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
     val sc = ssc.sc
@@ -198,7 +198,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     }
     ssc.start()
 
-    eventually(timeout(5 seconds), interval(10 milliseconds)) {
+    eventually(timeout(10 seconds), interval(10 milliseconds)) {
       assert(allFound === true)
     }
 
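Two test-hygiene points in [PATCH 7/9] and [PATCH 8/9] are easy to miss. First, ScalaTest's intercept[T] already fails the test if the thrown exception is not a T, and returns it typed as T, which is why the separate isInstanceOf assertion could be deleted. Second, asserting exact equality against a randomized message proves the exception surfacing in the caller is the very object thrown in the worker thread, not a look-alike from another code path. A condensed sketch of the pattern, assuming a ScalaTest suite:

    val uniquePayload = "boom-" + Random.nextInt()
    // intercept asserts the exception type and hands back the typed exception.
    val e: IllegalArgumentException = intercept[IllegalArgumentException] {
      ThreadUtils.runInNewThread("t") { throw new IllegalArgumentException(uniquePayload) }
    }
    // Equality on a unique payload rules out coincidental matches.
    assert(e.getMessage === uniquePayload)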
From f84f479c4802a70ee0534abe75ea5dcd3bf118cb Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 21 Sep 2015 13:34:35 -0700
Subject: [PATCH 9/9] Added comment

---
 .../scala/org/apache/spark/streaming/StreamingContext.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 965a6d88ecae..6720ba4f72cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -592,6 +592,10 @@ class StreamingContext private[streaming] (
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
+
+            // Start the streaming scheduler in a new thread, so that thread local properties
+            // like call sites and job groups can be reset without affecting those of the
+            // current thread.
             ThreadUtils.runInNewThread("streaming-start") {
               sparkContext.setCallSite(startSite.get)
               sparkContext.clearJobGroup()
@@ -614,7 +618,7 @@ class StreamingContext private[streaming] (
         assert(env.metricsSystem != null)
         env.metricsSystem.registerSource(streamingSource)
         uiTab.foreach(_.attach())
-        this.logInfo("StreamingContext started")
+        logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
       case STOPPED =>
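The comment added by this final patch states the invariant the whole series enforces, and the underlying JVM mechanism can be shown without Spark at all: an InheritableThreadLocal snapshots the parent's value into each child thread at construction time, which is how a caller's job group would otherwise leak into jobs submitted from a scheduler thread created inside start(). A minimal, Spark-free sketch (hypothetical demo object):

    object InheritanceDemo extends App {
      val jobGroup = new InheritableThreadLocal[String]
      jobGroup.set("user-group")

      val child = new Thread("scheduler-like") {
        override def run(): Unit = {
          assert(jobGroup.get == "user-group") // inherited at construction time
          jobGroup.set(null)                   // resets only this thread's copy
        }
      }
      child.start()
      child.join()
      assert(jobGroup.get == "user-group")     // the parent's value is untouched
    }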