Commit 6f0d296

kunalkhamar authored and zsxwing committed
[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group
## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs related to a query.

Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI.

## How was this patch tested?

- Unit tests
- UI screenshots:
  - Order by job id: ![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png)
  - Order by description: ![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png)
  - Order by job id (no query name): ![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png)
  - Order by description (no query name): ![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png)

Author: Kunal Khamar <[email protected]>

Closes #17765 from kunalkhamar/sc-6696.

(cherry picked from commit 6fc6cf8)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 38edb92 commit 6f0d296
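The fix builds on SparkContext's job-group API: jobs submitted from a thread after `setJobGroup` are tagged with that group id, `setJobDescription` updates the description shown in the Jobs UI, and `cancelJobGroup` cancels every active job in the group. A minimal, self-contained sketch of that pattern (the group id, description, and dummy job are made up for illustration; this is not code from the patch):

```scala
import org.apache.spark.sql.SparkSession

object JobGroupCancellationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-group-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val runner = new Thread {
      override def run(): Unit = {
        // Tag all jobs submitted from this thread with a group id and a description.
        // interruptOnCancel = true lets cancellation interrupt the running task threads.
        sc.setJobGroup("demo-group", "demo query<br/>batch = 0", interruptOnCancel = true)
        try {
          // A long-running job: each task sleeps until its thread is interrupted.
          sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000))
        } catch {
          case _: Exception => // the job was cancelled via its group
        }
      }
    }
    runner.start()

    Thread.sleep(2000)
    // Cancel every active job that was submitted under "demo-group".
    sc.cancelJobGroup("demo-group")
    runner.join()
    spark.stop()
  }
}
```

This mirrors the shape of the patch: StreamExecution sets the group on its microbatch thread using the query's `runId`, and `stop()` cancels that group from the caller's thread.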

File tree: 3 files changed, +79 −1 lines changed


core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
     val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

     // Verify that this has only anchors and span (we are wrapping in span)
-    val allowedNodeLabels = Set("a", "span")
+    val allowedNodeLabels = Set("a", "span", "br")
     val illegalNodes = xml \\ "_" filterNot { case node: Node =>
       allowedNodeLabels.contains(node.label)
     }
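This one-line change widens the whitelist UIUtils applies when rendering a job description as HTML, so the `<br/>` separators emitted by the new streaming batch description are no longer rejected. A simplified, standalone sketch of that validation step (the wrapper object and function name are made up; the body mirrors the lines shown above):

```scala
import scala.xml.{Node, XML}

object DescriptionWhitelistSketch {
  // Sketch: parse the description wrapped in a <span> and reject any element
  // other than anchors, spans, and (after this patch) line breaks.
  def hasOnlyAllowedTags(desc: String): Boolean = {
    val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
    val allowedNodeLabels = Set("a", "span", "br")
    val illegalNodes = xml \\ "_" filterNot { case node: Node =>
      allowedNodeLabels.contains(node.label)
    }
    illegalNodes.isEmpty
  }

  def main(args: Array[String]): Unit = {
    println(hasOnlyAllowedTags("memStream<br/>batch = 0")) // true once "br" is whitelisted
    println(hasOnlyAllowedTags("<b>bold</b>"))              // false: <b> is not whitelisted
  }
}
```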

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 12 additions & 0 deletions

@@ -252,6 +252,8 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
+        interruptOnCancel = true)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -289,6 +291,7 @@ class StreamExecution(
           if (currentBatchId < 0) {
             // We'll do this initialization only once
             populateStartOffsets(sparkSessionToRunBatches)
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
             logDebug(s"Stream running from $committedOffsets to $availableOffsets")
           } else {
             constructNextBatch()
@@ -308,6 +311,7 @@ class StreamExecution(
             logDebug(s"batch ${currentBatchId} committed")
             // We'll increase currentBatchId after we complete processing current batch's data
             currentBatchId += 1
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
           } else {
             currentStatus = currentStatus.copy(isDataAvailable = false)
             updateStatusMessage("Waiting for data to arrive")
@@ -684,8 +688,11 @@ class StreamExecution(
       // intentionally
       state.set(TERMINATED)
       if (microBatchThread.isAlive) {
+        sparkSession.sparkContext.cancelJobGroup(runId.toString)
         microBatchThread.interrupt()
         microBatchThread.join()
+        // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+        sparkSession.sparkContext.cancelJobGroup(runId.toString)
       }
       logInfo(s"Query $prettyIdString was stopped")
     }
@@ -825,6 +832,11 @@ class StreamExecution(
     }
   }

+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + "<br/>").getOrElse("") +
+      s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
+  }
 }

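The new `getBatchDescriptionString` builds an HTML snippet of the form `name<br/>id = ...<br/>runId = ...<br/>batch = ...`, so all batches of one query sort together in the Jobs UI and the current batch id stays visible; the "init" value covers the window before the first batch id is resolved. A standalone sketch of that formatting (the helper and its parameters are hypothetical stand-ins for the StreamExecution fields used above):

```scala
import java.util.UUID

object BatchDescriptionSketch {
  // Mirrors getBatchDescriptionString: "init" before the first batch id is known
  // (currentBatchId < 0), and the optional query name prefixed when present.
  def batchDescription(name: String, id: UUID, runId: UUID, currentBatchId: Long): String = {
    val batch = if (currentBatchId < 0) "init" else currentBatchId.toString
    Option(name).map(_ + "<br/>").getOrElse("") +
      s"id = $id<br/>runId = $runId<br/>batch = $batch"
  }

  def main(args: Array[String]): Unit = {
    val id = UUID.randomUUID()
    val runId = UUID.randomUUID()
    // Named query, first batch:   memStream<br/>id = ...<br/>runId = ...<br/>batch = 0
    println(batchDescription("memStream", id, runId, currentBatchId = 0))
    // Unnamed query, no batch yet: id = ...<br/>runId = ...<br/>batch = init
    println(batchDescription(null, id, runId, currentBatchId = -1))
  }
}
```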

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 66 additions & 0 deletions

@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable

 import org.apache.commons.io.FileUtils

+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -500,6 +502,70 @@ class StreamSuite extends StreamTest {
     }
   }

+  test("calling stop() on a query cancels related jobs") {
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map { i =>
+        while (!org.apache.spark.TaskContext.get().isInterrupted()) {
+          // keep looping till interrupted by query.stop()
+          Thread.sleep(100)
+        }
+        i
+      }
+      .writeStream
+      .format("console")
+      .start()
+
+    input.addData(1)
+    // wait for jobs to start
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
+    }
+
+    query.stop()
+    // make sure jobs are stopped
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
+    }
+  }
+
+  test("batch id is updated correctly in the job description") {
+    val queryName = "memStream"
+    @volatile var jobDescription: String = null
+    def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
+      // wait for listener event to be processed
+      spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+      assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
+    }
+
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      }
+    })
+
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(_ + 1)
+      .writeStream
+      .format("memory")
+      .queryName(queryName)
+      .start()
+
+    input.addData(1)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 0)
+    input.addData(2, 3)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 1)
+    input.addData(4)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 2)
+    query.stop()
+  }
 }

 abstract class FakeSource extends StreamSourceProvider {
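The second test reads the description back through a `SparkListener` and the `spark.job.description` property. The same listener technique could also check the job group, which this patch sets to the query's `runId`. A sketch under that assumption (the listener class is made up, and the `"spark.jobGroup.id"` key is assumed to be the local property that `SparkContext.setJobGroup` installs):

```scala
import java.util.Properties

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Sketch: record the job group of each started job, so a test could assert
// that a streaming query's jobs are tagged with its runId.
class JobGroupCapturingListener extends SparkListener {
  @volatile var lastJobGroup: String = _

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val props: Properties = jobStart.properties
    // "spark.jobGroup.id" is assumed to be the property set by setJobGroup.
    lastJobGroup = Option(props).map(_.getProperty("spark.jobGroup.id")).orNull
  }
}

// Hypothetical usage inside a StreamTest:
//   val listener = new JobGroupCapturingListener
//   spark.sparkContext.addSparkListener(listener)
//   ... start a query and process a batch ...
//   assert(listener.lastJobGroup == query.runId.toString)
```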
