[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group #17765
Conversation
Test build #76152 has finished for PR 17765 at commit

Test build #76160 has finished for PR 17765 at commit

Test build #76162 has finished for PR 17765 at commit

Test build #76163 has finished for PR 17765 at commit
```scala
private def getBatchDescriptionString: String = {
  val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
  Option(name).map(_ + " ").getOrElse("") +
    s"[batch = $batchDescription, id = $id, runId = $runId]"
}
```
The description looks good to me. @marmbrus what do you think?
LGTM
```scala
private def runBatches(): Unit = {
  try {
    sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
```
@brkyvz is this okay?
Yes, this seems fine.
Note that here it is `setJobGroup(..., interruptOnCancel = false)`. Should we add a SQL conf for `interruptOnCancel`?
When would you want to set this to true or false?
Had this set to false due to HDFS-1208, but setting it to true since the HDFS bug is 7 years old.
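For context, `interruptOnCancel` controls whether cancelling a job group also sends `Thread.interrupt()` to the running task threads of the cancelled jobs. A toy model of that flag's effect (plain Scala, not Spark's scheduler; `Job`, `cancelJobGroup`, and the fields below are invented for illustration):

```scala
// Toy model of job-group cancellation, illustrating what interruptOnCancel
// changes. This is NOT Spark's scheduler code; all names here are made up.
case class Job(id: Int, group: String,
               var cancelled: Boolean = false,
               var taskThreadsInterrupted: Boolean = false)

def cancelJobGroup(jobs: Seq[Job], group: String, interruptOnCancel: Boolean): Unit =
  jobs.filter(_.group == group).foreach { job =>
    job.cancelled = true
    // With interruptOnCancel = true, Spark additionally interrupts the
    // job's task threads (modeled here as a flag). It was historically
    // kept false to avoid the HDFS client hang described in HDFS-1208.
    if (interruptOnCancel) job.taskThreadsInterrupted = true
  }
```

Keying jobs by the query's `runId` is what lets one streaming query's jobs be cancelled as a unit without touching other queries on the same SparkContext.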
Test build #76198 has finished for PR 17765 at commit
```diff
  // Verify that this has only anchors and span (we are wrapping in span)
- val allowedNodeLabels = Set("a", "span")
+ val allowedNodeLabels = Set("a", "span", "br")
```
@tdas Is this change okay? Need it to add line breaks in the job description cells.
```scala
if (currentBatchId < 0) {
  // We'll do this initialization only once
  populateStartOffsets(sparkSessionToRunBatches)
  sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
```
Updates the job description with the correct currentBatchId after initializing the starting offsets.
```scala
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
```
Updates the job description with the incremented currentBatchId after each batch.
```scala
private def getBatchDescriptionString: String = {
  val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
  Option(name).map(_ + " ").getOrElse("") +
    s"[batch = $batchDescription,<br/>id = $id,<br/>runId = $runId]"
}
```
I would get rid of the `[]` if you are going to use newlines.
Yes, updated.
```scala
private def getBatchDescriptionString: String = {
  val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
  Option(name).map(_ + "<br/>").getOrElse("") +
    s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}
```
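The final description logic can be exercised standalone. A minimal re-creation as a free-standing function (the fields are passed as parameters here; in Spark the real method reads `name`, `id`, `runId`, and `currentBatchId` from the `StreamExecution` instance):

```scala
// Standalone sketch of getBatchDescriptionString's logic, with the
// StreamExecution fields turned into parameters for illustration.
def batchDescriptionString(name: Option[String], id: String, runId: String,
                           currentBatchId: Long): String = {
  val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
  name.map(_ + "<br/>").getOrElse("") +
    s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}
```

For example, an uninitialized query (batch id -1) named "myQuery" with hypothetical ids `q-1`/`r-1` yields `myQuery<br/>id = q-1<br/>runId = r-1<br/>batch = init`. The `<br/>` tags render as line breaks in the Jobs UI cell, which is why "br" had to be added to `allowedNodeLabels` above.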
Test build #76246 has finished for PR 17765 at commit

Test build #76245 has finished for PR 17765 at commit

Test build #76249 has finished for PR 17765 at commit

Test build #76250 has finished for PR 17765 at commit

Test build #76251 has finished for PR 17765 at commit
LGTM! Any other comments @zsxwing?

LGTM!

Merging to master and 2.2.
Author: Kunal Khamar <[email protected]>
Closes #17765 from kunalkhamar/sc-6696.
(cherry picked from commit 6fc6cf8)
Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs related to a query.

Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI.

## How was this patch tested?

- Unit tests
- UI screenshots:
  - Order by job id: 
  - Order by description: 
  - Order by job id (no query name): 
  - Order by description (no query name): 