-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19774] StreamExecution should call stop() on sources when a stream fails #17107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
zsxwing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Just some nits.
| final val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) | ||
|
|
||
| def withMockSources(sources: Source*)(f: => Unit): Unit = { | ||
| require(sources.nonEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can use withMockSources(firstSource: Source, otherSources: Source*) to make it become a compile error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea
| def withMockSources(sources: Source*)(f: => Unit): Unit = { | ||
| require(sources.nonEmpty) | ||
| var i = 0 | ||
| val srcProvider = () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: srcProvider is not necessary. You can just assign the func to sourceProviderFunction
|
@zsxwing Addressed |
|
Test build #73617 has finished for PR 17107 at commit
|
|
Test build #73619 has finished for PR 17107 at commit
|
|
Thanks. LGTM. Merging to master and 2.1. |
…ream fails ## What changes were proposed in this pull request? We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka. ## How was this patch tested? Unit tests in `StreamingQuerySuite`. Author: Burak Yavuz <[email protected]> Closes #17107 from brkyvz/close-source. (cherry picked from commit 9314c08) Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
We call stop() on a Structured Streaming Source only when the stream is shutdown when a user calls streamingQuery.stop(). We should actually stop all sources when the stream fails as well, otherwise we may leak resources, e.g. connections to Kafka.
How was this patch tested?
Unit tests in
StreamingQuerySuite.