@@ -334,19 +334,22 @@ abstract class DStream[T: ClassTag] (
334334 * Get the RDD corresponding to the given time; either retrieve it from cache
335335 * or compute-and-cache it.
336336 */
337- private [streaming] final def getOrCompute (time : Time ): Option [RDD [T ]] = createRDDWith(time) {
337+ private [streaming] final def getOrCompute (time : Time ): Option [RDD [T ]] = {
338338 // If RDD was already generated, then retrieve it from HashMap,
339339 // or else compute the RDD
340340 generatedRDDs.get(time).orElse {
341341 // Compute the RDD if time is valid (e.g. correct time in a sliding window)
342342 // of RDD generation, else generate nothing.
343343 if (isTimeValid(time)) {
344- // Disable checks for existing output directories in jobs launched by the streaming
345- // scheduler, since we may need to write output to an existing directory during checkpoint
346- // recovery; see SPARK-4835 for more details. We need to have this call here because
347- // compute() might cause Spark jobs to be launched.
348- val rddOption = PairRDDFunctions .disableOutputSpecValidation.withValue(true ) {
349- compute(time)
344+
345+ val rddOption = createRDDWithLocalProperties(time) {
346+ // Disable checks for existing output directories in jobs launched by the streaming
347+ // scheduler, since we may need to write output to an existing directory during checkpoint
348+ // recovery; see SPARK-4835 for more details. We need to have this call here because
349+ // compute() might cause Spark jobs to be launched.
350+ PairRDDFunctions .disableOutputSpecValidation.withValue(true ) {
351+ compute(time)
352+ }
350353 }
351354
352355 rddOption.foreach { case newRDD =>
@@ -372,7 +375,7 @@ abstract class DStream[T: ClassTag] (
372375 * Wrap a body of code such that the call site and operation scope
373376 * information are passed to the RDDs created in this body properly.
374377 */
375- protected def createRDDWith [U ](time : Time )(body : => U ): U = {
378+ protected def createRDDWithLocalProperties [U ](time : Time )(body : => U ): U = {
376379 val scopeKey = SparkContext .RDD_SCOPE_KEY
377380 val scopeNoOverrideKey = SparkContext .RDD_SCOPE_NO_OVERRIDE_KEY
378381 // Pass this DStream's operation scope and creation site information to RDDs through
0 commit comments