From 6c124a9ddeb19541c381c3eea0bc13db38ce7574 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 6 May 2015 19:23:46 -0700 Subject: [PATCH 01/12] Clean function in several RDD methods --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b3b60578c92e..2ee29549ea48 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -741,9 +741,10 @@ abstract class RDD[T: ClassTag]( def mapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => U): RDD[U] = withScope { + val cleanF = sc.clean(f) mapPartitionsWithIndex((index, iter) => { val a = constructA(index) - iter.map(t => f(t, a)) + iter.map(t => cleanF(t, a)) }, preservesPartitioning) } @@ -756,9 +757,10 @@ abstract class RDD[T: ClassTag]( def flatMapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => Seq[U]): RDD[U] = withScope { + val cleanF = sc.clean(f) mapPartitionsWithIndex((index, iter) => { val a = constructA(index) - iter.flatMap(t => f(t, a)) + iter.flatMap(t => cleanF(t, a)) }, preservesPartitioning) } @@ -769,9 +771,10 @@ abstract class RDD[T: ClassTag]( */ @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope { + val cleanF = sc.clean(f) mapPartitionsWithIndex { (index, iter) => val a = constructA(index) - iter.map(t => {f(t, a); t}) + iter.map(t => {cleanF(t, a); t}) } } @@ -901,7 +904,8 @@ abstract class RDD[T: ClassTag]( * Return an RDD that contains all matching values by applying `f`. */ def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { - filter(f.isDefinedAt).map(f) + val cleanF = sc.clean(f) + filter(cleanF.isDefinedAt).map(cleanF) } /** From 6846e40ada0c7df58ba8c549b87571bb052b264e Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 14:08:55 -0700 Subject: [PATCH 02/12] Add test for flatMapWith() --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 9 ++++++--- .../org/apache/spark/util/ClosureCleanerSuite.scala | 11 +++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2ee29549ea48..e118fbfdf3fd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -742,8 +742,9 @@ abstract class RDD[T: ClassTag]( (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => U): RDD[U] = withScope { val cleanF = sc.clean(f) + val cleanA = sc.clean(constructA) mapPartitionsWithIndex((index, iter) => { - val a = constructA(index) + val a = cleanA(index) iter.map(t => cleanF(t, a)) }, preservesPartitioning) } @@ -758,8 +759,9 @@ abstract class RDD[T: ClassTag]( (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => Seq[U]): RDD[U] = withScope { val cleanF = sc.clean(f) + val cleanA = sc.clean(constructA) mapPartitionsWithIndex((index, iter) => { - val a = constructA(index) + val a = cleanA(index) iter.flatMap(t => cleanF(t, a)) }, preservesPartitioning) } @@ -772,8 +774,9 @@ abstract class RDD[T: ClassTag]( @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope { val cleanF = sc.clean(f) + val cleanA = sc.clean(constructA) mapPartitionsWithIndex { (index, iter) => - val a = constructA(index) + val a = cleanA(index) iter.map(t => {cleanF(t, a); t}) } } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 446c3f24a74d..858a2cee368e 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -92,6 +92,7 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) } @@ -260,6 +261,16 @@ private object TestUserClosuresActuallyCleaned { def testMapPartitionsWithIndex(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } + def testFlatMapWith(rdd: RDD[Int]): Unit = { + import java.util.Random + val randoms = rdd.flatMapWith( + (index: Int) => new Random(index + 42)) + {(t: Int, prng: Random) => + val random = prng.nextDouble() + Seq(random * t, random * t * 10)}. + count() + rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() + } def testZipPartitions2(rdd: RDD[Int]): Unit = { rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() } From 0c8d47e2538c08b6686b569fdfc882c47a0e91dc Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 14:13:54 -0700 Subject: [PATCH 03/12] Add test for mapWith() --- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 858a2cee368e..7a0f66eff7b7 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -93,6 +93,7 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions4(rdd) } @@ -269,7 +270,12 @@ private object TestUserClosuresActuallyCleaned { val random = prng.nextDouble() Seq(random * t, random * t * 10)}. count() - rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() + } + def testMapWith(rdd: RDD[Int]): Unit = { + import java.util.Random + val randoms = rdd.mapWith( + (index: Int) => new Random(index + 42)) + {(t: Int, prng: Random) => prng.nextDouble * t}.count() } def testZipPartitions2(rdd: RDD[Int]): Unit = { rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() From 8b50d939e273746b11e963022dc17509952c498b Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 16:17:01 -0700 Subject: [PATCH 04/12] Address Andrew's review comments --- .../main/scala/org/apache/spark/rdd/RDD.scala | 9 ++++--- .../spark/util/ClosureCleanerSuite.scala | 25 +++++++++++-------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e118fbfdf3fd..590602f60dff 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -717,7 +717,8 @@ abstract class RDD[T: ClassTag]( def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { - val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter) + val cleanedF = sc.clean(f) + val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -789,8 +790,10 @@ abstract class RDD[T: ClassTag]( @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { mapPartitionsWithIndex((index, iter) => { - val a = constructA(index) - iter.filter(t => p(t, a)) + val cleanP = sc.clean(p) + val cleanA = sc.clean(constructA) + val a = cleanA(index) + iter.filter(t => cleanP(t, a)) }, preservesPartitioning = true) } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 7a0f66eff7b7..9e0bd294f13c 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -92,7 +92,10 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) } @@ -263,19 +266,19 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testFlatMapWith(rdd: RDD[Int]): Unit = { - import java.util.Random - val randoms = rdd.flatMapWith( - (index: Int) => new Random(index + 42)) - {(t: Int, prng: Random) => - val random = prng.nextDouble() - Seq(random * t, random * t * 10)}. - count() + rdd.flatMapWith { (_, it) => return; it }.count() } def testMapWith(rdd: RDD[Int]): Unit = { - import java.util.Random - val randoms = rdd.mapWith( - (index: Int) => new Random(index + 42)) - {(t: Int, prng: Random) => prng.nextDouble * t}.count() + rdd.mapWith { (_, it) => return; it }.count() + } + def testFilterWith(rdd: RDD[Int]): Unit = { + rdd.filterWith { (_, it) => return; it }.count() + } + def testForEachWith(rdd: RDD[Int]): Unit = { + rdd.foreachWith { (_, it) => return; it }.count() + } + def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { + rdd.mapPartitionsWithContext { (_, it) => return; it }.count() } def testZipPartitions2(rdd: RDD[Int]): Unit = { rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() From 164d3e408c7d112a2a5652651dc65e0800969bd6 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 16:28:03 -0700 Subject: [PATCH 05/12] Correct variable name --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 590602f60dff..233fcecf6b5c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -717,7 +717,7 @@ abstract class RDD[T: ClassTag]( def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { - val cleanedF = sc.clean(f) + val cleanF = sc.clean(f) val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } From d92bfcf892caab87438d5b7acbd32257fa8e6c0e Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 17:38:09 -0700 Subject: [PATCH 06/12] Correct syntax in test --- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 9e0bd294f13c..928b76377e06 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -266,16 +266,16 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testFlatMapWith(rdd: RDD[Int]): Unit = { - rdd.flatMapWith { (_, it) => return; it }.count() + rdd.flatMapWith { (it) => return; it }.count() } def testMapWith(rdd: RDD[Int]): Unit = { - rdd.mapWith { (_, it) => return; it }.count() + rdd.mapWith { (it) => return; it }.count() } def testFilterWith(rdd: RDD[Int]): Unit = { - rdd.filterWith { (_, it) => return; it }.count() + rdd.filterWith { (it) => return; it }.count() } def testForEachWith(rdd: RDD[Int]): Unit = { - rdd.foreachWith { (_, it) => return; it }.count() + rdd.foreachWith { (it) => return; it }.count() } def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithContext { (_, it) => return; it }.count() From c2786dfe3c0a084c3b96ac5bdd1165047e00dfb0 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 18:48:50 -0700 Subject: [PATCH 07/12] Correct syntax --- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 928b76377e06..6c888611d214 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -266,16 +266,16 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testFlatMapWith(rdd: RDD[Int]): Unit = { - rdd.flatMapWith { (it) => return; it }.count() + rdd.flatMapWith { _ => return; Seq() }.count() } def testMapWith(rdd: RDD[Int]): Unit = { - rdd.mapWith { (it) => return; it }.count() + rdd.mapWith { _ => return; 0 }.count() } def testFilterWith(rdd: RDD[Int]): Unit = { - rdd.filterWith { (it) => return; it }.count() + rdd.filterWith { _ => return; true }.count() } def testForEachWith(rdd: RDD[Int]): Unit = { - rdd.foreachWith { (it) => return; it }.count() + rdd.foreachWith { _ => return } } def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithContext { (_, it) => return; it }.count() From 55d01eb0607c95425b096dc812a513c75eb190f4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 20:22:16 -0700 Subject: [PATCH 08/12] Try to get correct syntax --- .../org/apache/spark/util/ClosureCleanerSuite.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 6c888611d214..3fac11d03b14 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -266,16 +266,20 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testFlatMapWith(rdd: RDD[Int]): Unit = { - rdd.flatMapWith { _ => return; Seq() }.count() + import java.util.Random + rdd.flatMapWith ((index: Int) => new Random(index + 42)){ _ => return; Seq() }.count() } def testMapWith(rdd: RDD[Int]): Unit = { - rdd.mapWith { _ => return; 0 }.count() + import java.util.Random + rdd.mapWith ((index: Int) => new Random(index + 42)){ _ => return; 0 }.count() } def testFilterWith(rdd: RDD[Int]): Unit = { - rdd.filterWith { _ => return; true }.count() + import java.util.Random + rdd.filterWith ((index: Int) => new Random(index + 42)){ _ => return; true }.count() } def testForEachWith(rdd: RDD[Int]): Unit = { - rdd.foreachWith { _ => return } + import java.util.Random + rdd.foreachWith ((index: Int) => new Random(index + 42)){ _ => return } } def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithContext { (_, it) => return; it }.count() From 36feb6cbd3c8834c8ac0c1fcfc60a1f63169001f Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 May 2015 21:21:19 -0700 Subject: [PATCH 09/12] Try to get correct syntax --- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 3fac11d03b14..4e1cc08fdb59 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -267,19 +267,19 @@ private object TestUserClosuresActuallyCleaned { } def testFlatMapWith(rdd: RDD[Int]): Unit = { import java.util.Random - rdd.flatMapWith ((index: Int) => new Random(index + 42)){ _ => return; Seq() }.count() + rdd.flatMapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; Seq() }.count() } def testMapWith(rdd: RDD[Int]): Unit = { import java.util.Random - rdd.mapWith ((index: Int) => new Random(index + 42)){ _ => return; 0 }.count() + rdd.mapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; 0 }.count() } def testFilterWith(rdd: RDD[Int]): Unit = { import java.util.Random - rdd.filterWith ((index: Int) => new Random(index + 42)){ _ => return; true }.count() + rdd.filterWith ((index: Int) => new Random(index + 42)){ (_, it) => return; true }.count() } def testForEachWith(rdd: RDD[Int]): Unit = { import java.util.Random - rdd.foreachWith ((index: Int) => new Random(index + 42)){ _ => return } + rdd.foreachWith ((index: Int) => new Random(index + 42)){ (_, it) => return } } def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { rdd.mapPartitionsWithContext { (_, it) => return; it }.count() From f6014c0a3ff55b33ebb51fd8eef471bb15f58b2f Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 8 May 2015 06:41:32 -0700 Subject: [PATCH 10/12] Remove cleaning in RDD#filterWith --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 ++++---- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 233fcecf6b5c..92d75f447098 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -790,10 +790,10 @@ abstract class RDD[T: ClassTag]( @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { mapPartitionsWithIndex((index, iter) => { - val cleanP = sc.clean(p) - val cleanA = sc.clean(constructA) - val a = cleanA(index) - iter.filter(t => cleanP(t, a)) + /* val cleanP = sc.clean(p) + val cleanA = sc.clean(constructA) */ + val a = constructA(index) + iter.filter(t => p(t, a)) }, preservesPartitioning = true) } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 4e1cc08fdb59..480b5e825281 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -94,7 +94,7 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } - expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } + // expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) } From 56d7c92dde91b0d9eecf53d9833ca7ad6d1bfba2 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 8 May 2015 14:26:26 -0700 Subject: [PATCH 11/12] Consolidate import of Random --- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 480b5e825281..85cc5c6b856e 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import java.io.NotSerializableException +import java.util.Random import org.scalatest.FunSuite @@ -266,19 +267,15 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testFlatMapWith(rdd: RDD[Int]): Unit = { - import java.util.Random rdd.flatMapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; Seq() }.count() } def testMapWith(rdd: RDD[Int]): Unit = { - import java.util.Random rdd.mapWith ((index: Int) => new Random(index + 42)){ (_, it) => return; 0 }.count() } def testFilterWith(rdd: RDD[Int]): Unit = { - import java.util.Random rdd.filterWith ((index: Int) => new Random(index + 42)){ (_, it) => return; true }.count() } def testForEachWith(rdd: RDD[Int]): Unit = { - import java.util.Random rdd.foreachWith ((index: Int) => new Random(index + 42)){ (_, it) => return } } def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = { From f83d44557551bda264655f8b7073bbf49722c26e Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 8 May 2015 15:50:20 -0700 Subject: [PATCH 12/12] Move cleaning outside of mapPartitionsWithIndex --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 ++++---- .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 92d75f447098..8baf199f215f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -789,11 +789,11 @@ abstract class RDD[T: ClassTag]( */ @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { + val cleanP = sc.clean(p) + val cleanA = sc.clean(constructA) mapPartitionsWithIndex((index, iter) => { - /* val cleanP = sc.clean(p) - val cleanA = sc.clean(constructA) */ - val a = constructA(index) - iter.filter(t => p(t, a)) + val a = cleanA(index) + iter.filter(t => cleanP(t, a)) }, preservesPartitioning = true) } diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 85cc5c6b856e..e41f6ee27764 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -95,7 +95,7 @@ class ClosureCleanerSuite extends FunSuite { expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) } - // expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } + expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) } expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }