File tree Expand file tree Collapse file tree 1 file changed +12
-12
lines changed
streaming/src/main/scala/org/apache/spark/streaming Expand file tree Collapse file tree 1 file changed +12
-12
lines changed Original file line number Diff line number Diff line change @@ -487,20 +487,20 @@ class StreamingContext private[streaming] (
487487 * received data to be completed
488488 */
489489 def stop (stopSparkContext : Boolean , stopGracefully : Boolean ): Unit = synchronized {
490- // Warn (but not fail) if context is stopped twice,
491- // or context is stopped before starting
492- if (state == Initialized ) {
493- logWarning(" StreamingContext has not been started yet" )
494- return
495- }
496490 if (state == Stopped ) {
497491 logWarning(" StreamingContext has already been stopped" )
498- return
499- } // no need to throw an exception as its okay to stop twice
500- scheduler.stop(stopGracefully)
501- logInfo(" StreamingContext stopped successfully" )
502- waiter.notifyStop()
503- if (stopSparkContext) sc.stop()
492+ } else {
493+ // Even if the streaming context has not been started, we still need to stop the SparkContext:
494+ if (stopSparkContext) sc.stop()
495+ if (state == Initialized ) {
496+ logWarning(" StreamingContext has not been started yet" )
497+ } else {
498+ scheduler.stop(stopGracefully)
499+ logInfo(" StreamingContext stopped successfully" )
500+ waiter.notifyStop()
501+ }
502+ }
503+ // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
504504 state = Stopped
505505 }
506506}
You can’t perform that action at this time.
0 commit comments