Skip to content

Commit 8b50d93

Browse files
committed
Address Andrew's review comments
1 parent 0c8d47e commit 8b50d93

File tree

2 files changed

+20
-14
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,8 @@ abstract class RDD[T: ClassTag](
717717
def mapPartitionsWithContext[U: ClassTag](
718718
f: (TaskContext, Iterator[T]) => Iterator[U],
719719
preservesPartitioning: Boolean = false): RDD[U] = withScope {
720-
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
720+
val cleanedF = sc.clean(f)
721+
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(context, iter)
721722
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
722723
}
723724

@@ -789,8 +790,10 @@ abstract class RDD[T: ClassTag](
789790
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
790791
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
791792
mapPartitionsWithIndex((index, iter) => {
792-
val a = constructA(index)
793-
iter.filter(t => p(t, a))
793+
val cleanP = sc.clean(p)
794+
val cleanA = sc.clean(constructA)
795+
val a = cleanA(index)
796+
iter.filter(t => cleanP(t, a))
794797
}, preservesPartitioning = true)
795798
}
796799

core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ class ClosureCleanerSuite extends FunSuite {
9292
expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) }
9393
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) }
9494
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
95+
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
9596
expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
97+
expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
98+
expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
9699
expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
97100
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
98101
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) }
@@ -263,19 +266,19 @@ private object TestUserClosuresActuallyCleaned {
263266
rdd.mapPartitionsWithIndex { (_, it) => return; it }.count()
264267
}
265268
def testFlatMapWith(rdd: RDD[Int]): Unit = {
266-
import java.util.Random
267-
val randoms = rdd.flatMapWith(
268-
(index: Int) => new Random(index + 42))
269-
{(t: Int, prng: Random) =>
270-
val random = prng.nextDouble()
271-
Seq(random * t, random * t * 10)}.
272-
count()
269+
rdd.flatMapWith { (_, it) => return; it }.count()
273270
}
274271
def testMapWith(rdd: RDD[Int]): Unit = {
275-
import java.util.Random
276-
val randoms = rdd.mapWith(
277-
(index: Int) => new Random(index + 42))
278-
{(t: Int, prng: Random) => prng.nextDouble * t}.count()
272+
rdd.mapWith { (_, it) => return; it }.count()
273+
}
274+
def testFilterWith(rdd: RDD[Int]): Unit = {
275+
rdd.filterWith { (_, it) => return; it }.count()
276+
}
277+
def testForEachWith(rdd: RDD[Int]): Unit = {
278+
rdd.foreachWith { (_, it) => return; it }
279+
}
280+
def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = {
281+
rdd.mapPartitionsWithContext { (_, it) => return; it }.count()
279282
}
280283
def testZipPartitions2(rdd: RDD[Int]): Unit = {
281284
rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count()

0 commit comments

Comments
 (0)