
Commit 53b9936

Author: Andrew Or
Set scopes for foreachRDD properly
Previously, RDDs created inside the body of foreachRDD were not scoped properly, so low-level Spark operations surfaced on the UI. This is now fixed: such RDDs are wrapped in the `foreachRDD` scope.
1 parent 1881802 commit 53b9936
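
To illustrate the effect, here is a minimal, hypothetical driver program (the local master and socket source are assumptions for the sketch, not part of this commit). With this change, the RDD created by map inside the foreachRDD body is attached to the `foreachRDD` scope on the UI instead of surfacing as a bare low-level operation:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDScopeExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { (rdd, time) =>
      // This map() creates a new RDD inside the foreachRDD body. With this
      // commit it is wrapped in the `foreachRDD` scope, so the UI groups it
      // under foreachRDD instead of showing an unscoped map operation.
      val lengths = rdd.map(_.length)
      println(s"$time: ${lengths.collect().mkString(", ")}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}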

2 files changed: +15 -15 lines changed


streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 14 additions & 14 deletions

@@ -134,8 +134,8 @@ abstract class DStream[T: ClassTag] (
    */
   private def makeScope(time: Time): Option[RDDOperationScope] = {
     baseScope.map { bsJson =>
-      val formattedBatchTime =
-        UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
+      val formattedBatchTime = UIUtils.formatBatchTime(
+        time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
       val bs = RDDOperationScope.fromJson(bsJson)
       val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
       val scopeName =
@@ -334,14 +334,20 @@ abstract class DStream[T: ClassTag] (
    * Get the RDD corresponding to the given time; either retrieve it from cache
    * or compute-and-cache it.
    */
-  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = createRDDWith(time) {
     // If RDD was already generated, then retrieve it from HashMap,
     // or else compute the RDD
     generatedRDDs.get(time).orElse {
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        val rddOption = doCompute(time)
+        // Disable checks for existing output directories in jobs launched by the streaming
+        // scheduler, since we may need to write output to an existing directory during checkpoint
+        // recovery; see SPARK-4835 for more details. We need to have this call here because
+        // compute() might cause Spark jobs to be launched.
+        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+          compute(time)
+        }

         rddOption.foreach { case newRDD =>
           // Register the generated RDD for caching and checkpointing
@@ -363,10 +369,10 @@ abstract class DStream[T: ClassTag] (
   }

   /**
-   * Helper method to generate an RDD for the specified time.
-   * This sets and resets the relevant local variables before and after the call to compute.
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
    */
-  private def doCompute(time: Time): Option[RDD[T]] = {
+  protected def createRDDWith[U](time: Time)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
@@ -388,13 +394,7 @@ abstract class DStream[T: ClassTag] (
       ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
     }

-    // Disable checks for existing output directories in jobs launched by the streaming
-    // scheduler, since we may need to write output to an existing directory during checkpoint
-    // recovery; see SPARK-4835 for more details. We need to have this call here because
-    // compute() might cause Spark jobs to be launched.
-    PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-      compute(time)
-    }
+    body
   } finally {
     // Restore any state that was modified before returning
     ssc.sparkContext.setCallSite(prevCallSite)
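
For readers unfamiliar with the pattern, here is a standalone sketch of the save/run/restore discipline that createRDDWith follows: record the current values, set the scope properties, run the caller's body, and restore the previous values in a finally block so the pattern nests safely. The property key and helper names below are hypothetical, not the Spark source:

object CreateRDDWithSketch {
  // Stand-in for SparkContext local properties; the real implementation
  // goes through ssc.sparkContext.setLocalProperty.
  private val localProps = new java.util.Properties()

  def createWithScope[U](scopeJson: String)(body: => U): U = {
    val prevScope = localProps.getProperty("spark.rdd.scope")
    localProps.setProperty("spark.rdd.scope", scopeJson)
    try {
      body
    } finally {
      // Restore whatever was in place before, including "nothing at all".
      if (prevScope == null) localProps.remove("spark.rdd.scope")
      else localProps.setProperty("spark.rdd.scope", prevScope)
    }
  }

  def main(args: Array[String]): Unit = {
    createWithScope("""{"id":"0","name":"foreachRDD"}""") {
      println(s"inside: ${localProps.getProperty("spark.rdd.scope")}")
    }
    println(s"after: ${localProps.getProperty("spark.rdd.scope")}") // null again
  }
}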

streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => {
+        val jobFunc = () => createRDDWith(time) {
           ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
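
Why wrap jobFunc rather than rely on the scope that was active when the DStream was defined? The closure runs later, on the scheduler's thread, where that context is no longer in effect. A standalone sketch (hypothetical names) of the effect:

import scala.util.DynamicVariable

object DeferredScopeSketch {
  private val currentScope = new DynamicVariable[Option[String]](None)

  def main(args: Array[String]): Unit = {
    // Definition time: a scope is active, but the closure only captures
    // code, not the dynamic variable's current value.
    val jobFunc: () => Unit = currentScope.withValue(Some("foreachRDD")) {
      () => println(s"scope seen at run time: ${currentScope.value}")
    }

    jobFunc()                                                // prints None
    currentScope.withValue(Some("foreachRDD")) { jobFunc() } // prints Some(foreachRDD)
  }
}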
