@@ -252,6 +252,7 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
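
Note: `SparkContext.setJobGroup` stores the group id and description in thread-local properties, so calling it once at the top of `runBatches()`, which runs on the microbatch thread, tags every job this query submits afterwards and makes them cancellable by `runId`. A minimal standalone sketch of that behavior (not part of the patch; the group and description strings are placeholders):

```scala
import org.apache.spark.sql.SparkSession

object JobGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-group-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Set once per thread, as runBatches() now does: both jobs below are
    // grouped under "run-1234" and would be cancelled together by
    // sc.cancelJobGroup("run-1234").
    sc.setJobGroup("run-1234", "[batch = init]")

    sc.setJobDescription("[batch = 0]") // per-batch refresh, group id unchanged
    sc.parallelize(1 to 100).count()

    sc.setJobDescription("[batch = 1]")
    sc.parallelize(1 to 100).count()

    spark.stop()
  }
}
```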
@@ -308,6 +309,7 @@ class StreamExecution(
             logDebug(s"batch ${currentBatchId} committed")
             // We'll increase currentBatchId after we complete processing current batch's data
             currentBatchId += 1
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
           } else {
             currentStatus = currentStatus.copy(isDataAvailable = false)
             updateStatusMessage("Waiting for data to arrive")
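
This is the steady-state refresh: once a batch commits and `currentBatchId` is bumped, the thread-local job description is updated so the next batch's jobs are labeled with the batch they actually belong to. The same one-line refresh recurs at every other point where `currentBatchId` changes: the two recovery paths in the following hunks and the first-start path after them.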
@@ -418,6 +420,7 @@ class StreamExecution(
         /* First assume that we are re-executing the latest known batch
          * in the offset log */
         currentBatchId = latestBatchId
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * point is the second latest batch id in the offset log. */
@@ -463,6 +466,7 @@ class StreamExecution(
           }
         }
         currentBatchId = latestCommittedBatchId + 1
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         committedOffsets ++= availableOffsets
         // Construct a new batch by recomputing availableOffsets
         constructNextBatch()
@@ -478,6 +482,7 @@ class StreamExecution(
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
+        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
         constructNextBatch()
     }
   }
@@ -590,8 +595,6 @@ class StreamExecution(
    * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
   private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
-    sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
-
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
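
Previously the group and description were set only here, at the start of each `runBatch()` call, so jobs submitted outside `runBatch()`, for instance while resolving offsets, carried no group tied to `runId`. With the group now set once in `runBatches()`, this per-batch call is redundant and `setJobDescription` remains the only per-batch update.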
@@ -686,8 +689,11 @@ class StreamExecution(
       // intentionally
       state.set(TERMINATED)
       if (microBatchThread.isAlive) {
+        sparkSession.sparkContext.cancelJobGroup(runId.toString)
         microBatchThread.interrupt()
         microBatchThread.join()
+        // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+        sparkSession.sparkContext.cancelJobGroup(runId.toString)
       }
       logInfo(s"Query $prettyIdString was stopped")
     }
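
The ordering in `stop()` matters: the first `cancelJobGroup` aborts any job the microbatch thread is currently blocked on, letting `interrupt()`/`join()` return promptly, and, as the new comment notes, the thread may still submit jobs between that first cancel and its termination, so a second `cancelJobGroup` after `join()` ensures nothing tagged with this `runId` outlives the query.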
@@ -828,7 +834,9 @@ class StreamExecution(
   }
 
   private def getBatchDescriptionString: String = {
-    Option(name).map(_ + " ").getOrElse("") + s"[batch = $currentBatchId, id = $id, runId = $runId]"
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + " ").getOrElse("") +
+      s"[batch = $batchDescription, id = $id, runId = $runId]"
   }
 }
 
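`currentBatchId` starts at -1 and only reaches 0 once the first batch is constructed, so the old helper could surface `[batch = -1, ...]` in the UI; the new version renders that startup phase as `init` and wraps the long line. A standalone rendition of the new logic (the `describe` helper below is hypothetical, for illustration only):

```scala
import java.util.UUID

object BatchDescriptionSketch {
  // Mirrors the new getBatchDescriptionString: negative batch ids mean the
  // query is still initializing, and an unnamed query gets no name prefix.
  def describe(name: String, currentBatchId: Long, id: UUID, runId: UUID): String = {
    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
    Option(name).map(_ + " ").getOrElse("") +
      s"[batch = $batchDescription, id = $id, runId = $runId]"
  }

  def main(args: Array[String]): Unit = {
    val (id, runId) = (UUID.randomUUID(), UUID.randomUUID())
    println(describe("myQuery", -1L, id, runId)) // myQuery [batch = init, ...]
    println(describe("myQuery", 3L, id, runId))  // myQuery [batch = 3, ...]
    println(describe(null, 0L, id, runId))       // no name prefix
  }
}
```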