Closed
@@ -17,30 +17,74 @@

 package org.apache.spark.streaming

+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantLock
+import javax.annotation.concurrent.GuardedBy
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  @GuardedBy("lock")
Member

Minor point: these annotations are not in the JDK but in a FindBugs library for JSR-305. The library isn't used in Spark; it just happens to be a dependency right now. Maybe not worth using it in just 1 place?

Member Author

> Maybe not worth using it in just 1 place?

So which one do you prefer?

  1. Use comments to describe such information.
  2. Use GuardedBy from now on.

Member Author

In addition, FindBugs currently does not recognize GuardedBy in Scala code.

Member Author

BTW, I turned to GuardedBy because @aarondav asked me to do it in #3634

Member

Yes, that's why I brought it up. It's not actually a standard Java annotation (unless someone tells me it just turned up in 8 or something) but part of JSR-305. This is a dependency of Spark core at the moment, but none of the annotations are used. I think we should just not use them instead of using this lib in 1 place.

Member Author

OK. I changed them to comments.

Contributor

In general, I'm a fan of the FindBugs annotations, but I've had trouble getting the various analysis tools to work well with them in Scala. +1 to this commenting convention, though; this is very helpful.
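
For reference, the commenting convention discussed above might look roughly like the sketch below. The class name `GuardedCounter` and the exact comment wording are assumptions made for illustration; they are not taken from this PR.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical example class, used only to illustrate the convention.
class GuardedCounter {
  private val lock = new ReentrantLock()

  // Guarded by "lock" (a plain comment in place of the JSR-305 @GuardedBy annotation)
  private var count: Int = 0

  /** Increments the counter while holding the lock and returns the new value. */
  def increment(): Int = {
    lock.lock()
    try {
      count += 1
      count
    } finally {
      lock.unlock()
    }
  }
}
```

The comment carries the same information for human readers as the annotation would, without depending on the JSR-305 library. Unlike the annotation, no tool can check it.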

   private var error: Throwable = null
+
+  @GuardedBy("lock")
   private var stopped: Boolean = false

-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
+  def notifyError(e: Throwable) = {
Contributor

Minor nit, but mind adding : Unit here?

+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }

-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyStop() = {
Contributor

Same here: could this have an explicit : Unit type?

+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }

-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
-    }
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
Contributor

I considered whether this should throw TimeoutException instead of signaling failure via a boolean, but I guess this is sort of like Object.wait() or Condition.await(), neither of which throws an exception on timeout.

Also, it looks like the usages of this in StreamingContext support this theory:

  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   */
  def awaitTermination() {
    waiter.waitForStopOrError()
  }

  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   * @param timeout time to wait in milliseconds
   */
  def awaitTermination(timeout: Long) {
    waiter.waitForStopOrError(timeout)
  }

Contributor

I suppose the documentation for StreamingContext.awaitTermination(timeout) could be improved to convey what happens when the timeout occurs...

Member Author

Do you think it would be better to return a Boolean to indicate whether it timed out? It would break source compatibility, though.

Contributor

Another option would be to just add a new def isTerminated: Boolean method, which would let users write code like

waiter.awaitTermination(1000)
if (!waiter.isTerminated) {
  throw new Exception(...)
}

Member Author

Agreed. Done.

Member Author

How about making awaitTermination throw a TimeoutException on timeout? It looks like a better API.

Member Author

@tdas what do you think?

Contributor

I think that will break a lot of existing programs. Often (even in our own test cases) awaitTermination(timeout) is used to wait for a short period of time before checking status or something. Currently, when it times out, it returns quietly. If it starts throwing exceptions instead, it will completely break some applications. So I strongly advise against changing this behavior.
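
To make the two behaviors under discussion concrete, here is a small sketch that contrasts them. It uses a `CountDownLatch` rather than Spark's `StreamingContext`, and the names `TimeoutStyles`, `awaitQuietly`, and `awaitOrThrow` are hypothetical, chosen only for illustration.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}

// Hypothetical helpers contrasting "return quietly" with "throw on timeout".
object TimeoutStyles {

  /** Quiet style: a timeout is reported through the return value (false). */
  def awaitQuietly(latch: CountDownLatch, timeoutMs: Long): Boolean =
    latch.await(timeoutMs, TimeUnit.MILLISECONDS)

  /** Throwing style: a timeout surfaces as a TimeoutException. */
  def awaitOrThrow(latch: CountDownLatch, timeoutMs: Long): Unit = {
    if (!awaitQuietly(latch, timeoutMs)) {
      throw new TimeoutException(s"Not terminated after $timeoutMs ms")
    }
  }
}
```

Code that polls in a loop with a short timeout works unchanged with the quiet style. With the throwing style, it would have to wrap every call in a try/catch, which is the compatibility concern raised above.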

Contributor

In hindsight, instead of modeling awaitTermination on Akka ActorSystem's awaitTermination (which returns Unit), I should have modeled it on Java ExecutorService's awaitTermination, which returns a Boolean. Now it's not possible to change the API without breaking compatibility. :(
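
As a point of comparison, the `ExecutorService` behavior mentioned here can be sketched as follows. The wrapper object `AwaitTerminationDemo` and its `demo` method are hypothetical names. A latch keeps the task running, so the first wait times out regardless of scheduling.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical demo object; only the java.util.concurrent calls are real API.
object AwaitTerminationDemo {

  /** Returns (result of a wait that times out, result of a wait after the task ends). */
  def demo(): (Boolean, Boolean) = {
    val release = new CountDownLatch(1)
    val pool = Executors.newSingleThreadExecutor()
    // The task blocks until the latch is released.
    pool.execute(new Runnable {
      override def run(): Unit = release.await()
    })
    pool.shutdown()

    // The task is still blocked, so this wait times out and returns false.
    val timedOut = pool.awaitTermination(50, TimeUnit.MILLISECONDS)

    // Let the task finish; the pool then terminates and the wait returns true.
    release.countDown()
    val finished = pool.awaitTermination(5, TimeUnit.SECONDS)
    (timedOut, finished)
  }
}
```

Here the timeout is reported through the return value, so callers can tell "terminated" from "timed out" without catching an exception.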

Member Author

> In hindsight, instead of modeling awaitTermination on Akka ActorSystem's awaitTermination (which returns Unit), I should have modeled it on Java ExecutorService's awaitTermination, which returns a Boolean. Now it's not possible to change the API without breaking compatibility. :(

@tdas, sorry that I forgot to reply to you. You said you designed it just like Akka's ActorSystem.awaitTermination, but ActorSystem.awaitTermination throws a TimeoutException in case of timeout.

  /**
   * Block current thread until the system has been shutdown, or the specified
   * timeout has elapsed. This will block until after all on termination
   * callbacks have been run.
   *
   * @throws TimeoutException in case of timeout
   */
  def awaitTermination(timeout: Duration): Unit

+    lock.lock()
+    try {
+      if (timeout < 0) {
+        while (true) {
Member

Maybe it's just me but it feels like these loops would be simpler just testing while (!stopped && error == null)? nanos would be tested in the other one too. This avoids duplication, and also avoids the unreachable return value, because you check these conditions in one place at the end.
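
A minimal sketch of that suggestion, assuming a stand-in class named `SimpleWaiter` (hypothetical, not the class in this PR). Both loops test the exit conditions in the loop header, and the outcome is decided once after the loop. The final check keeps the order used in the diff: stopped first, then error.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for ContextWaiter, for illustration only.
class SimpleWaiter {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()

  // Guarded by "lock"
  private var error: Throwable = null
  // Guarded by "lock"
  private var stopped: Boolean = false

  def notifyError(e: Throwable): Unit = {
    lock.lock()
    try {
      error = e
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  def notifyStop(): Unit = {
    lock.lock()
    try {
      stopped = true
      condition.signalAll()
    } finally {
      lock.unlock()
    }
  }

  /** Returns true if stopped, false on timeout; rethrows a reported error. */
  def waitForStopOrError(timeout: Long = -1): Boolean = {
    lock.lock()
    try {
      if (timeout < 0) {
        // No deadline: wait until stopped or an error is reported.
        while (!stopped && error == null) {
          condition.await()
        }
      } else {
        // awaitNanos returns the remaining time, so spurious wakeups
        // do not extend the overall deadline.
        var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
        while (!stopped && error == null && nanos > 0) {
          nanos = condition.awaitNanos(nanos)
        }
      }
      // Single place where the outcome is decided; no unreachable return value.
      if (stopped) true
      else if (error != null) throw error
      else false
    } finally {
      lock.unlock()
    }
  }
}
```

Written this way, the `while (true)` loops and the trailing "never reached" value are no longer needed.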

Member Author

It's cleaner now.

+          // If already stopped, then exit
+          if (stopped) return true
+          // If already had error, then throw it
+          if (error != null) throw error
+
+          condition.await()
+        }
+      } else {
+        var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
+        while (true) {
+          // If already stopped, then exit
+          if (stopped) return true
+          // If already had error, then throw it
+          if (error != null) throw error
+          // If no time remains, then exit
+          if (nanos <= 0) return false

-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
-      if (error != null) throw error
+          nanos = condition.awaitNanos(nanos)
+        }
+      }
+      // Never reached. Make the compiler happy.
+      true
+    } finally {
+      lock.unlock()
     }
   }
 }