-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop() #3160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
5558e70
813e471
5142517
832a7f4
03e9c40
bdbe5da
dbcc929
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| after { | ||
| if (ssc != null) { | ||
| ssc.stop() | ||
| if (ssc.sc != null) { | ||
| // Calling ssc.stop() does not always stop the associated SparkContext. | ||
| ssc.sc.stop() | ||
| } | ||
| ssc = null | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reverts a workaround that Aaron added for this issue in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 |
||
| } | ||
| if (sc != null) { | ||
|
|
@@ -137,11 +133,28 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| ssc.stop() | ||
| } | ||
|
|
||
| test("stop before start and start after stop") { | ||
| test("stop before start") { | ||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| addInputStream(ssc).register() | ||
| ssc.stop() // stop before start should not throw exception | ||
| ssc.start() | ||
| } | ||
|
|
||
| test("stop(stopContext=true) after stopSparkContext(stopContext=false)") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor nit: This unit test should be logically after the "stop only streaming context", as that tests the |
||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| addInputStream(ssc).register() | ||
| ssc.stop(stopSparkContext = false) | ||
| assert(ssc.sc.makeRDD(1 to 100).collect().size === 100) | ||
| ssc.stop(stopSparkContext = true) | ||
| // Check that the SparkContext is actually stopped: | ||
| intercept[Exception] { | ||
| ssc.sc.makeRDD(1 to 100).collect() | ||
| } | ||
| } | ||
|
|
||
| test("start after stop") { | ||
| // Regression test for SPARK-4301 | ||
| ssc = new StreamingContext(master, appName, batchDuration) | ||
| addInputStream(ssc).register() | ||
| ssc.stop() | ||
| intercept[SparkException] { | ||
| ssc.start() // start after stop should throw exception | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the nested if necessary? Why not
if (state == Stopped) { ... } else if (state == Initialized) { ... } else { ... }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gah, my bad. It was a holdover from an earlier intermediate change. I'll just use a
matchstatement.