core/src/main/scala/org/apache/spark/util/ThreadUtils.scala (39 additions, 0 deletions)
@@ -21,6 +21,7 @@ package org.apache.spark.util
import java.util.concurrent._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

@@ -86,4 +87,42 @@ private[spark] object ThreadUtils {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
Executors.newSingleThreadScheduledExecutor(threadFactory)
}

def runInNewThread[T](
threadName: String,
isDaemon: Boolean = true,
timeoutMillis: Long = 0)(body: => T): T = {
@volatile var exception: Option[Throwable] = None
@volatile var result: T = null.asInstanceOf[T]

val thread = new Thread(threadName) {

override def run(): Unit = {
try {
result = body
} catch {
case NonFatal(e) =>
exception = Some(e)
}
}
}
thread.setDaemon(isDaemon)
thread.start()
thread.join(timeoutMillis)

exception match {
case Some(realException) =>
val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
Member: nit: could you add a comment about the magic number 1, such as remove "java.lang.Thread.getStackTrace"?
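For reference, a hedged sketch of the comment this nit asks for (illustrative wording, not the author's; it assumes the first frame returned by getStackTrace is the getStackTrace call itself, as on typical JVMs):

    // Drop everything from the top of the stack, including the
    // java.lang.Thread.getStackTrace frame, until the first frame from
    // ThreadUtils, then drop that runInNewThread frame too (the drop(1)).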

val extraStackTrace = realException.getStackTrace.takeWhile(
! _.getClassName.contains(this.getClass.getSimpleName))
val placeHolderStackElem = new StackTraceElement(
s"... run in separate thread by ${ThreadUtils.getClass.getName} ..", " ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
realException.setStackTrace(finalStackTrace)
throw realException
case None =>
result
}
}
}
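For readers new to this helper, a minimal usage sketch (illustrative, not part of the PR): runInNewThread runs the body on a freshly created thread, joins it (timeoutMillis = 0 means wait indefinitely), and rethrows any non-fatal exception on the caller's thread with the stitched stack trace built above.

    // Hedged usage sketch; names here are illustrative, not Spark API.
    val sum = ThreadUtils.runInNewThread("adder-thread") {
      // Runs on the daemon thread "adder-thread"; the caller blocks in join().
      (1 to 10).sum
    }
    assert(sum == 55)
    // If the body throws, the exception surfaces here with the body's frames,
    // then the "... run in separate thread by ..." placeholder, then the
    // caller's frames, with the ThreadUtils frames themselves removed.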
core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala (22 additions, 1 deletion)
@@ -20,8 +20,8 @@ package org.apache.spark.util

import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import org.apache.spark.SparkFunSuite

@@ -66,4 +66,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
val futureThreadName = Await.result(f, 10.seconds)
assert(futureThreadName === callerThreadName)
}

test("runInNewThread") {
import ThreadUtils._
assert(runInNewThread("thread-name") { Thread.currentThread().getName } === "thread-name")
assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } === true)
assert(
runInNewThread("thread-name", isDaemon = false) { Thread.currentThread().isDaemon } === false
)
val exception = intercept[Exception] {
runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
Contributor: can you add some kind of more unique identifier here? I wouldn't be surprised if elsewhere we have "test" in the stack trace. Maybe just do "test-" + System.currentTimeMillis

Contributor (author): Good idea.
}
assert(exception.isInstanceOf[IllegalArgumentException])
assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
Member: You can update these 5 lines to:

    val exception = intercept[IllegalArgumentException] {
      runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
    }
    assert(exception.getMessage.contains("test"))

Contributor (author): That's true!

assert(exception.getStackTrace.mkString("\n").contains(
"... run in separate thread by org.apache.spark.util.ThreadUtils$ ...") === true,
"stack trace does not contain expected place holder"
)
assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
"stack trace contains unexpected references to ThreadUtils"
)
}
}
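Taken together, the two review suggestions above would turn the exception assertions into something like this hedged sketch (the unique marker follows the reviewer's "test-" + System.currentTimeMillis idea):

    val uniqueMessage = "test-" + System.currentTimeMillis  // unique per run
    val exception = intercept[IllegalArgumentException] {
      runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueMessage) }
    }
    assert(exception.getMessage.contains(uniqueMessage))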
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
import org.apache.spark.util.{ThreadUtils, CallSite, ShutdownHookManager}

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,16 @@ class StreamingContext private[streaming] (
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
scheduler.start()
ThreadUtils.runInNewThread("streaming-start") {
Contributor: this block definitely requires a paragraph comment. Someone new to the code will have no idea why we need to do these things in a new thread.
sparkContext.setCallSite(startSite.get)
Contributor: any reason why we need to move this in here? It's an atomic reference, so it doesn't really matter which thread reads it, right?

Contributor (author): Because this sets a thread-local variable.

Contributor: an inheritable thread-local variable, so it still doesn't matter. (Anyway, we can just keep this change, not a big deal, mainly just wondering.)

sparkContext.setJobGroup(
StreamingContext.STREAMING_JOB_GROUP_ID, StreamingContext.STREAMING_JOB_DESCRIPTION, false)
Contributor (author): I am forced to set a specific job group and desc, because there is no way to remove them (because of the default properties in the thread-local properties).

scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
@@ -618,6 +622,7 @@
}
}


/**
* Wait for the execution to stop. Any exceptions that occur during the execution
* will be thrown in this thread.
@@ -719,6 +724,8 @@

object StreamingContext extends Logging {

private[streaming] val STREAMING_JOB_GROUP_ID = "streaming"
private[streaming] val STREAMING_JOB_DESCRIPTION = "streaming"
/**
* Lock that guards activation of a StreamingContext as well as access to the singleton active
* StreamingContext in getActiveOrCreate().
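The setCallSite discussion above hinges on how SparkContext stores these values: call sites and local properties live in inheritable thread locals, which child threads copy from their parent at creation time. A self-contained sketch (plain Scala, no Spark needed) of why setting them inside a short-lived "streaming-start" thread cannot leak into the caller:

    // Illustration of the InheritableThreadLocal semantics at issue.
    object InheritanceDemo extends App {
      val prop = new InheritableThreadLocal[String] {
        override def initialValue(): String = "default"
      }
      val starter = new Thread("streaming-start") {
        override def run(): Unit = prop.set("streaming") // visible only here
      }
      starter.start(); starter.join()
      // Prints "default": a child thread's set() never flows back to the
      // parent, so threads the caller creates later are also unaffected.
      println(prop.get())
    }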
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,39 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}

test("start should set job group correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc

@volatile var jobGroupFound: String = null
@volatile var jobDescFound: String = null
@volatile var jobInterruptFound: String = null
@volatile var allFound: Boolean = false

addInputStream(ssc).foreachRDD { rdd =>
jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
println("Found")
allFound = true
}
ssc.start()

eventually(timeout(5 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}
Contributor: is it possible to use semaphores here or some kind of locking to make the test more robust? I'm worried that it could become another flaky test that we have to fix eventually anyway.

Contributor (author): How can volatile lead to flakiness?

Contributor: if for some reason we don't execute foreachRDD within the time frame

Contributor (author): Well, you have to set some time limit no matter what method you use to wait, isn't it?

Contributor: Not necessarily. If it hangs forever, Jenkins will time it out anyway. In general I'm not a huge fan of eventually's, because these are usually the ones we have to end up fixing later. If you guess a timeout wrong, then you'll have to try a slightly higher one after breaking many builds.

In this case it's probably OK to keep this as is, but I'm just saying I would have written the test differently.

Contributor (author): Well, in both cases, one has to go and fix the test :) Anyway, not going into the argument of eventually vs block permanently. I think it's better to use eventually with conservative timeouts than to block completely. I am fairly certain that this test is fine. Other tests in this suite also use the same eventually and they have not been flaky (the few times StreamingContextSuite has been flaky, it was for one test which had a real bug, not for eventually). I can make the timeout even more conservative, no harm in that.


// Verify streaming jobs have expected thread-local properties
assert(jobGroupFound === StreamingContext.STREAMING_JOB_GROUP_ID)
assert(jobDescFound === StreamingContext.STREAMING_JOB_DESCRIPTION)
assert(jobInterruptFound === "false")

// Verify current thread's thread-local properties have not changed
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
}
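As a point of comparison, a hedged sketch of the latch-based wait the reviewer has in mind; the wait still needs an upper bound, but it fires as soon as the first batch runs instead of polling (illustrative, not the test as merged):

    // Replace the @volatile allFound flag and eventually-polling with a latch.
    val batchRan = new java.util.concurrent.CountDownLatch(1)
    addInputStream(ssc).foreachRDD { rdd =>
      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
      jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
      batchRan.countDown()
    }
    ssc.start()
    assert(batchRan.await(10, java.util.concurrent.TimeUnit.SECONDS),
      "timed out waiting for the first streaming batch")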

test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)