Skip to content

Commit 395656c

Browse files
JoshRosen authored and tdas committed
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop()
In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been started is a no-op which has no side-effects. This allows users to call `stop()` on a fresh StreamingContext followed by `start()`. I believe that this almost always indicates an error and is not behavior that we should support. Since we don't allow `start() stop() start()` then I don't think it makes sense to allow `stop() start()`. The current behavior can lead to resource leaks when StreamingContext constructs its own SparkContext: if I call `stop(stopSparkContext=True)`, then I expect StreamingContext's underlying SparkContext to be stopped irrespective of whether the StreamingContext has been started. This is useful when writing unit test fixtures. Prior discussions: - #3053 (diff) - #3121 (comment) Author: Josh Rosen <[email protected]> Closes #3160 from JoshRosen/SPARK-4301 and squashes the following commits: dbcc929 [Josh Rosen] Address more review comments bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before. 03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already been called. 832a7f4 [Josh Rosen] Address review comment 5142517 [Josh Rosen] Add tests; improve Scaladoc. 813e471 [Josh Rosen] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet. (cherry picked from commit 7b41b17) Signed-off-by: Tathagata Das <[email protected]>
1 parent d4aed26 commit 395656c

File tree

2 files changed

+40
-19
lines changed

2 files changed

+40
-19
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,10 @@ class StreamingContext private[streaming] (
424424

425425
/**
426426
* Start the execution of the streams.
427+
*
428+
* @throws SparkException if the context has already been started or stopped.
427429
*/
428430
def start(): Unit = synchronized {
429-
// Throw exception if the context has already been started once
430-
// or if a stopped context is being started again
431431
if (state == Started) {
432432
throw new SparkException("StreamingContext has already been started")
433433
}
@@ -459,8 +459,10 @@ class StreamingContext private[streaming] (
459459
/**
460460
* Stop the execution of the streams immediately (does not wait for all received data
461461
* to be processed).
462-
* @param stopSparkContext Stop the associated SparkContext or not
463462
*
463+
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
464+
* will be stopped regardless of whether this StreamingContext has been
465+
* started.
464466
*/
465467
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
466468
stop(stopSparkContext, false)
@@ -469,25 +471,27 @@ class StreamingContext private[streaming] (
469471
/**
470472
* Stop the execution of the streams, with option of ensuring all received data
471473
* has been processed.
472-
* @param stopSparkContext Stop the associated SparkContext or not
473-
* @param stopGracefully Stop gracefully by waiting for the processing of all
474+
*
475+
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
476+
* will be stopped regardless of whether this StreamingContext has been
477+
* started.
478+
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
474479
* received data to be completed
475480
*/
476481
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
477-
// Warn (but not fail) if context is stopped twice,
478-
// or context is stopped before starting
479-
if (state == Initialized) {
480-
logWarning("StreamingContext has not been started yet")
481-
return
482+
state match {
483+
case Initialized => logWarning("StreamingContext has not been started yet")
484+
case Stopped => logWarning("StreamingContext has already been stopped")
485+
case Started =>
486+
scheduler.stop(stopGracefully)
487+
logInfo("StreamingContext stopped successfully")
488+
waiter.notifyStop()
482489
}
483-
if (state == Stopped) {
484-
logWarning("StreamingContext has already been stopped")
485-
return
486-
} // no need to throw an exception as its okay to stop twice
487-
scheduler.stop(stopGracefully)
488-
logInfo("StreamingContext stopped successfully")
489-
waiter.notifyStop()
490+
// Even if the streaming context has not been started, we still need to stop the SparkContext.
491+
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
492+
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
490493
if (stopSparkContext) sc.stop()
494+
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
491495
state = Stopped
492496
}
493497
}

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
132132
ssc.stop()
133133
}
134134

135-
test("stop before start and start after stop") {
135+
test("stop before start") {
136136
ssc = new StreamingContext(master, appName, batchDuration)
137137
addInputStream(ssc).register
138138
ssc.stop() // stop before start should not throw exception
139-
ssc.start()
139+
}
140+
141+
test("start after stop") {
142+
// Regression test for SPARK-4301
143+
ssc = new StreamingContext(master, appName, batchDuration)
144+
addInputStream(ssc).register()
140145
ssc.stop()
141146
intercept[SparkException] {
142147
ssc.start() // start after stop should throw exception
@@ -156,6 +161,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
156161
ssc.stop()
157162
}
158163

164+
test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
165+
ssc = new StreamingContext(master, appName, batchDuration)
166+
addInputStream(ssc).register()
167+
ssc.stop(stopSparkContext = false)
168+
assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
169+
ssc.stop(stopSparkContext = true)
170+
// Check that the SparkContext is actually stopped:
171+
intercept[Exception] {
172+
ssc.sc.makeRDD(1 to 100).collect()
173+
}
174+
}
175+
159176
test("stop gracefully") {
160177
val conf = new SparkConf().setMaster(master).setAppName(appName)
161178
conf.set("spark.cleaner.ttl", "3600")

0 commit comments

Comments (0)