Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,63 @@

package org.apache.spark.streaming

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

private[streaming] class ContextWaiter {

private val lock = new ReentrantLock()
private val condition = lock.newCondition()

// Guarded by "lock"
private var error: Throwable = null
private var stopped: Boolean = false

def notifyError(e: Throwable) = synchronized {
error = e
notifyAll()
}
// Guarded by "lock"
private var stopped: Boolean = false

def notifyStop() = synchronized {
stopped = true
notifyAll()
def notifyError(e: Throwable): Unit = {
lock.lock()
try {
error = e
condition.signalAll()
} finally {
lock.unlock()
}
}

def waitForStopOrError(timeout: Long = -1) = synchronized {
// If already had error, then throw it
if (error != null) {
throw error
def notifyStop(): Unit = {
lock.lock()
try {
stopped = true
condition.signalAll()
} finally {
lock.unlock()
}
}

// If not already stopped, then wait
if (!stopped) {
if (timeout < 0) wait() else wait(timeout)
/**
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
* `false` if the waiting time detectably elapsed before return from the method.
*/
def waitForStopOrError(timeout: Long = -1): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered whether this should throw TimeoutException instead of signaling failure via a boolean, but I guess this is sort of like Object.wait() or Condition.wait(), both of which don't throw exceptions.

Also, it looks like the usages of this in StreamingContext support this theory:

  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   */
  def awaitTermination() {
    waiter.waitForStopOrError()
  }

  /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
   * will be thrown in this thread.
   * @param timeout time to wait in milliseconds
   */
  def awaitTermination(timeout: Long) {
    waiter.waitForStopOrError(timeout)
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the documentation for the SparkContext.awaitTermination(timeout) could be improved to convey what happens when the timeout occurs...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think if it's better to return Boolean to indicate if it's timeout? Although it will break the source compatibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to just add a new def isTerminated: Boolean method, which would let users write code like

waiter.awaitTermination(1000)
if (!waiter.isTerminated) {
  throw Exception(...)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making awaitTermination throw a TimeoutException if timeout? It looks a better API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdas what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that will break a lot of existing programs for people. A lot of time (even in our own test cases) awaitTermination(timeout) is used for wait for a short period of time before check checking status or something. Currently that times out return quietly. If instead it starts throwing exceptions, then it will completely break some applications. So I strongly advise against changing this behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hindsight, instead of modeling awaitTermination against Akka ActorSystem's awaitTermination (which return Unit) , I should have modeled it like Java ExecutorService's awaitTermination which returns a Boolean. Now its not possible to change the API without breaking compatiblity. :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hindsight, instead of modeling awaitTermination against Akka ActorSystem's awaitTermination (which return Unit) , I should have modeled it like Java ExecutorService's awaitTermination which returns a Boolean. Now its not possible to change the API without breaking compatiblity. :(

@tdas, Sorry that I forgot to reply you. You said you designed it just like Akka ActorSystem.awaitTermination. But ActorSystem.awaitTermination will throw a TimeoutException in case of timeout.

  /**
   * Block current thread until the system has been shutdown, or the specified
   * timeout has elapsed. This will block until after all on termination
   * callbacks have been run.
   *
   * @throws TimeoutException in case of timeout
   */
  def awaitTermination(timeout: Duration): Unit

lock.lock()
try {
if (timeout < 0) {
while (!stopped && error == null) {
condition.await()
}
} else {
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
while (!stopped && error == null && nanos > 0) {
nanos = condition.awaitNanos(nanos)
}
}
// If already had error, then throw it
if (error != null) throw error
// already stopped or timeout
stopped
} finally {
lock.unlock()
}
}
}