Skip to content

Commit 6c124a9

Browse files
committed
Clean function in several RDD methods
1 parent 14502d5 commit 6c124a9

File tree

1 file changed

+8
-4
lines changed
  • core/src/main/scala/org/apache/spark/rdd

1 file changed

+8
-4
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -741,9 +741,10 @@ abstract class RDD[T: ClassTag](
741741
def mapWith[A, U: ClassTag]
742742
(constructA: Int => A, preservesPartitioning: Boolean = false)
743743
(f: (T, A) => U): RDD[U] = withScope {
744+
val cleanF = sc.clean(f)
744745
mapPartitionsWithIndex((index, iter) => {
745746
val a = constructA(index)
746-
iter.map(t => f(t, a))
747+
iter.map(t => cleanF(t, a))
747748
}, preservesPartitioning)
748749
}
749750

@@ -756,9 +757,10 @@ abstract class RDD[T: ClassTag](
756757
def flatMapWith[A, U: ClassTag]
757758
(constructA: Int => A, preservesPartitioning: Boolean = false)
758759
(f: (T, A) => Seq[U]): RDD[U] = withScope {
760+
val cleanF = sc.clean(f)
759761
mapPartitionsWithIndex((index, iter) => {
760762
val a = constructA(index)
761-
iter.flatMap(t => f(t, a))
763+
iter.flatMap(t => cleanF(t, a))
762764
}, preservesPartitioning)
763765
}
764766

@@ -769,9 +771,10 @@ abstract class RDD[T: ClassTag](
769771
*/
770772
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
771773
def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
774+
val cleanF = sc.clean(f)
772775
mapPartitionsWithIndex { (index, iter) =>
773776
val a = constructA(index)
774-
iter.map(t => {f(t, a); t})
777+
iter.map(t => {cleanF(t, a); t})
775778
}
776779
}
777780

@@ -901,7 +904,8 @@ abstract class RDD[T: ClassTag](
901904
* Return an RDD that contains all matching values by applying `f`.
902905
*/
903906
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
904-
filter(f.isDefinedAt).map(f)
907+
val cleanF = sc.clean(f)
908+
filter(cleanF.isDefinedAt).map(cleanF)
905909
}
906910

907911
/**

0 commit comments

Comments
 (0)