-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop() #3160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5558e70
813e471
5142517
832a7f4
03e9c40
bdbe5da
dbcc929
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] ( | |
|
|
||
| /** | ||
| * Start the execution of the streams. | ||
| * | ||
| * @throws SparkException if the context has already been started or stopped. | ||
| */ | ||
| def start(): Unit = synchronized { | ||
| // Throw exception if the context has already been started once | ||
| // or if a stopped context is being started again | ||
| if (state == Started) { | ||
| throw new SparkException("StreamingContext has already been started") | ||
| } | ||
|
|
@@ -472,8 +472,9 @@ class StreamingContext private[streaming] ( | |
| /** | ||
| * Stop the execution of the streams immediately (does not wait for all received data | ||
| * to be processed). | ||
| * @param stopSparkContext Stop the associated SparkContext or not | ||
| * | ||
| * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be | ||
| * stopped regardless of whether this StreamingContext has been started. | ||
| */ | ||
| def stop(stopSparkContext: Boolean = true): Unit = synchronized { | ||
| stop(stopSparkContext, false) | ||
|
|
@@ -482,25 +483,27 @@ class StreamingContext private[streaming] ( | |
| /** | ||
| * Stop the execution of the streams, with option of ensuring all received data | ||
| * has been processed. | ||
| * @param stopSparkContext Stop the associated SparkContext or not | ||
| * @param stopGracefully Stop gracefully by waiting for the processing of all | ||
| * | ||
| * @param stopSparkContext if true, stops the associated SparkContext. The SparkContext will be | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as above. |
||
| * stopped regardless of whether this StreamingContext has been started. | ||
| * @param stopGracefully if true, stops gracefully by waiting for the processing of all | ||
| * received data to be completed | ||
| */ | ||
| def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { | ||
| // Warn (but not fail) if context is stopped twice, | ||
| // or context is stopped before starting | ||
| if (state == Initialized) { | ||
| logWarning("StreamingContext has not been started yet") | ||
| return | ||
| } | ||
| if (state == Stopped) { | ||
| logWarning("StreamingContext has already been stopped") | ||
| return | ||
| } // no need to throw an exception as its okay to stop twice | ||
| scheduler.stop(stopGracefully) | ||
| logInfo("StreamingContext stopped successfully") | ||
| waiter.notifyStop() | ||
| if (stopSparkContext) sc.stop() | ||
| } else { | ||
| // Even if the streaming context has not been started, we still need to stop the SparkContext: | ||
| if (stopSparkContext) sc.stop() | ||
| if (state == Initialized) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the nested if necessary? Why not
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gah, my bad. It was a holdover from an earlier intermediate change. I'll just use a |
||
| logWarning("StreamingContext has not been started yet") | ||
| } else { | ||
| scheduler.stop(stopGracefully) | ||
| logInfo("StreamingContext stopped successfully") | ||
| waiter.notifyStop() | ||
| } | ||
| } | ||
| // The state should always be Stopped after calling `stop()`, even if we haven't started yet: | ||
| state = Stopped | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| after { | ||
| if (ssc != null) { | ||
| ssc.stop() | ||
| if (ssc.sc != null) { | ||
| // Calling ssc.stop() does not always stop the associated SparkContext. | ||
| ssc.sc.stop() | ||
| } | ||
| ssc = null | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This reverts a workaround that Aaron added for this issue in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 |
||
| } | ||
| if (sc != null) { | ||
|
|
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| ssc.stop() | ||
| } | ||
|
|
||
| test("stop before start and start after stop") { | ||
| test("stop before start") { | ||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| addInputStream(ssc).register() | ||
| ssc.stop() // stop before start should not throw exception | ||
| ssc.start() | ||
| } | ||
|
|
||
| test("start after stop") { | ||
| // Regression test for SPARK-4301 | ||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| addInputStream(ssc).register() | ||
| ssc.stop() | ||
| intercept[SparkException] { | ||
| ssc.start() // start after stop should throw exception | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Add "underlying SparkContext"