2 files changed, +5 −5 lines.

main/scala/org/apache/spark/rdd/RDD.scala
@@ -789,11 +789,11 @@ abstract class RDD[T: ClassTag](
    */
   @deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
   def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
+    val cleanP = sc.clean(p)
+    val cleanA = sc.clean(constructA)
     mapPartitionsWithIndex((index, iter) => {
-      /* val cleanP = sc.clean(p)
-         val cleanA = sc.clean(constructA) */
-      val a = constructA(index)
-      iter.filter(t => p(t, a))
+      val a = cleanA(index)
+      iter.filter(t => cleanP(t, a))
     }, preservesPartitioning = true)
   }

test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -95,7 +95,7 @@ class ClosureCleanerSuite extends FunSuite {
   expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
   expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
   expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
-  // expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
+  expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
   expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
   expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
   expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
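With filterWith now cleaning its closures, the assertion can come out of its comment. The suite's pattern, sketched below from the surrounding tests (the helper body is an assumption about the suite's internals, not part of this diff): each user closure deliberately contains a return statement, which ClosureCleaner refuses to clean, so the expected ReturnStatementInClosureException only surfaces if the operator under test actually runs sc.clean on the user's closures.

  // Sketch, assuming the suite's internals: `return` inside a closure makes
  // ClosureCleaner throw ReturnStatementInClosureException when cleaning runs.
  // If filterWith skipped sc.clean, no exception would be thrown here and
  // expectCorrectException would fail the test.
  def testFilterWith(rdd: RDD[Int]): Unit = {
    rdd.filterWith { _ => return; 0 } { case (_, _) => return; true }.count()
  }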