[SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState #26018
|
cc @zsxwing |
|
Test build #111760 has finished for PR 26018 at commit
|
|
Test build #111783 has started for PR 26018 at commit |
| } | ||
| } | ||
|
|
||
| testQuietly( |
Just for the sake of understanding: this patch is intended to prevent starting multiple instances of the same streaming query in different sessions (which was previously allowed and would probably cause problems), right?
yes. This is the specific test which would have previously failed.
Technically, this test case doesn't fail without this patch, because it is a single-session test case. The test case above (line 276) is the one that correctly fails without this patch.
HeartSaVioR
left a comment
LGTM. It keeps active queries isolated between SparkSessions and only adds the restriction that the same streaming query cannot run concurrently across multiple SparkSessions.
|
Test build #111791 has finished for PR 26018 at commit
|
|
Test build #111790 has finished for PR 26018 at commit
|
zsxwing
left a comment
LGTM
| val activeOption = | ||
| Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) | ||
| if (activeOption.isDefined) { | ||
| if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id) { |
Just curious: are you seeing an actual case where activeOption is None but activeQueries contains such a query? I don't see such a case, though I don't think adding this check would hurt.
|
Retest this please. |
|
Test build #111984 has finished for PR 26018 at commit
|
| // Make sure no other query with same id is active across all sessions | ||
| val activeOption = | ||
| Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) | ||
| if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id) { |
It seems that one ) is missing after query.id, which causes style check and compilation failures.
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala: Expected token RPAREN but got Token(DOT,.,14050,.)
[error] Total time: 20 s, completed Oct 12, 2019 4:11:49 PM
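For reference, the condition with the closing parenthesis restored might look like the sketch below. This is a standalone illustration, not the code from the PR: `Query`, `DuplicateCheck`, and `isDuplicate` are hypothetical names, `shared` stands in for `sharedState.activeStreamingQueries`, and `sessionLocal` stands in for the manager's `activeQueries` map. The sketch stores the query itself in the shared map, whereas the snippet under review stores `this`.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a streaming query; only the id matters here.
final case class Query(id: UUID)

object DuplicateCheck {
  // Assumption: `shared` plays the role of sharedState.activeStreamingQueries and
  // `sessionLocal` the role of the per-session activeQueries map.
  // Like the reviewed snippet, this registers the query in `shared` as a side effect
  // when no entry with the same id exists yet.
  def isDuplicate(
      shared: ConcurrentHashMap[UUID, Query],
      sessionLocal: Map[UUID, Query],
      query: Query): Boolean = {
    // putIfAbsent returns null when the id was not registered, so Option(...) is None.
    val activeOption = Option(shared.putIfAbsent(query.id, query))
    // Closing parenthesis restored after `query.id`.
    activeOption.isDefined || sessionLocal.values.exists(_.id == query.id)
  }
}
```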
|
Test build #111996 has finished for PR 26018 at commit
|
|
Retest this please. |
| query2.stop() | ||
| } | ||
|
|
||
| testQuietly("can't start multiple instances of the same streaming query in the same session") { |
The test case names seem to be switched, @brkyvz.
in the same session -> in the different sessions?
| } | ||
|
|
||
| testQuietly( | ||
| "can't start multiple instances of the same streaming query in the different sessions") { |
According to the test body, in the different sessions -> in the same session?
good catch. I named them wrong :)
dongjoon-hyun
left a comment
+1, LGTM (except the switched test case names). Thank you for adding more of the missing test cases, @brkyvz.
|
Test build #112083 has finished for PR 26018 at commit
|
|
Test build #112455 has finished for PR 26018 at commit
|
HeartSaVioR
left a comment
LGTM again.
|
Merging to master! Thanks all :) |
What changes were proposed in this pull request?
This moves the tracking of active queries from per-SparkSession state to the SharedState that is shared across SparkSessions, for better safety in environments with isolated SparkSessions.
Why are the changes needed?
We have checks that prevent restarting the same stream on the same SparkSession, but we can improve on that in multi-tenant environments by putting that state in the SharedState instead of the SessionState. This allows a more comprehensive check on multi-tenant clusters.
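The idea can be illustrated with a minimal sketch, assuming a registry object shared by every session and a per-session manager that registers query ids in it. `SharedRegistry`, `SessionManager`, `tryStart`, and `stop` are hypothetical names used only for illustration; the only name taken from the reviewed code is the `activeStreamingQueries` map with its `putIfAbsent` call.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the state shared by all sessions.
final class SharedRegistry {
  val activeStreamingQueries = new ConcurrentHashMap[UUID, SessionManager]()
}

// Hypothetical stand-in for a per-session streaming query manager.
final class SessionManager(val name: String, shared: SharedRegistry) {
  // Returns true if the id was registered by this call, false if some session
  // (this one or another) already holds it. putIfAbsent is atomic and returns
  // null when no mapping existed.
  def tryStart(queryId: UUID): Boolean =
    Option(shared.activeStreamingQueries.putIfAbsent(queryId, this)).isEmpty

  // Removes the entry only if it is still owned by this manager.
  def stop(queryId: UUID): Unit =
    shared.activeStreamingQueries.remove(queryId, this)
}

object SharedRegistryDemo {
  def main(args: Array[String]): Unit = {
    val shared = new SharedRegistry
    val sessionA = new SessionManager("A", shared)
    val sessionB = new SessionManager("B", shared)
    val id = UUID.randomUUID()

    println(sessionA.tryStart(id)) // true: first start registers the id
    println(sessionB.tryStart(id)) // false: another session already runs it
    sessionA.stop(id)
    println(sessionB.tryStart(id)) // true: the id is free again after stop
  }
}
```

Because the map lives outside any single session, the duplicate check covers every session that shares it, while each session can still keep its own list of the queries it started.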
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added tests to StreamingQueryManagerSuite