From a4fa7686f6bb81c43918cdca5511507aa75b8d39 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 1 May 2015 22:17:38 -0700
Subject: [PATCH 1/4] Clean the closure, not the RDD

---
 .../scala/org/apache/spark/streaming/dstream/DStream.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 83d41f576244..f1f8a7065599 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
+    transform((r: RDD[T], t: Time) => cleanedF(r))
   }

   /**

From 67eeff427380d4f68ba1a3b115d5cc8cf83afc67 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 1 May 2015 22:25:28 -0700
Subject: [PATCH 2/4] Add tests

---
 .../scala/org/apache/spark/util/ClosureCleaner.scala |  4 ++++
 .../org/apache/spark/util/ClosureCleanerSuite.scala  | 12 ++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index e3f52f6ff1e6..5d60cbdd5cf2 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -102,6 +102,10 @@ private[spark] object ClosureCleaner extends Logging {
   }

   def clean(func: AnyRef, checkSerializable: Boolean = true) {
+    if (!isClosure(func.getClass)) {
+      throw new IllegalArgumentException("Expected a closure; got " + func.getClass.getName)
+    }
+
     // TODO: cache outerClasses / innerClasses / accessedFields
     val outerClasses = getOuterClasses(func)
     val innerClasses = getInnerClasses(func)

diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index c47162779bbb..65b1d77883af 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -63,6 +63,18 @@ class ClosureCleanerSuite extends FunSuite {
     val result = TestObjectWithNestedReturns.run()
     assert(result == 1)
   }
+
+  test("should clean only closures") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val rdd = sc.makeRDD(1 to 100)
+      intercept[IllegalArgumentException] { sc.clean(1) }
+      intercept[IllegalArgumentException] { sc.clean("not a closure") }
+      intercept[IllegalArgumentException] { sc.clean(rdd) }
+      sc.clean(() => 1)
+      sc.clean(() => "part of a closure")
+      sc.clean(() => rdd)
+    }
+  }
 }

 // A non-serializable class we create in closures to make sure that we aren't
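Patch 1 fixes a subtle bug in DStream.transform: the old code handed SparkContext.clean the RDD produced by transformFunc(r) on every batch, rather than the function itself, so the user's closure was never actually cleaned. Patch 2 then guards clean against exactly this kind of misuse by rejecting any argument whose class is not a compiler-generated closure. In Scala-2.10/2.11-era Spark that check amounted to a class-name heuristic; the following self-contained sketch shows the idea (the exact body of isClosure is an assumption here, not quoted from this patch, and Scala 2.12+ emits lambdas differently, so the heuristic is version-specific):

    object ClosureCheckSketch {
      // Heuristic: scalac (2.10/2.11) emits anonymous function classes with
      // "$anonfun$" in their binary names, e.g. MyApp$$anonfun$1. Named
      // classes, strings, and RDDs never match, which is what the new
      // guard in ClosureCleaner.clean relies on.
      def isClosure(cls: Class[_]): Boolean =
        cls.getName.contains("$anonfun$")

      def main(args: Array[String]): Unit = {
        val f = () => 1
        println(isClosure(f.getClass))      // true when compiled with Scala 2.10/2.11
        println(isClosure(classOf[String])) // false: not a closure
      }
    }
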
From 5ee4e252d1054f42ee0fc128fc4850dd3d3894e3 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 2 May 2015 17:27:27 -0700
Subject: [PATCH 3/4] Fix tests

---
 .../scala/org/apache/spark/util/ClosureCleanerSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index ca707e760abe..497512d1d4d3 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -67,10 +67,10 @@ class ClosureCleanerSuite extends FunSuite {
   test("should clean only closures") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val rdd = sc.makeRDD(1 to 100)
-      intercept[IllegalArgumentException] { sc.clean(1) }
+      intercept[IllegalArgumentException] { sc.clean(new Integer(1)) }
       intercept[IllegalArgumentException] { sc.clean("not a closure") }
       intercept[IllegalArgumentException] { sc.clean(rdd) }
-      sc.clean(() => 1)
+      sc.clean(() => new Integer(1))
       sc.clean(() => "part of a closure")
       sc.clean(() => rdd)
     }
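The commit message for patch 3 says only "Fix tests", but the likely reason for replacing the bare literal is that SparkContext.clean bounds its type parameter by AnyRef, and Scala's Int is a value type that does not satisfy that bound, so new Integer(1) makes the boxing explicit. That reading of the signature is an inference from this change, not stated in the patch; a minimal sketch of the constraint under that assumption:

    object AnyRefBoundSketch {
      // Mirrors the assumed shape of SparkContext.clean's signature: the
      // type parameter is bounded by AnyRef, so only reference types fit.
      def clean[F <: AnyRef](f: F): F = f

      def main(args: Array[String]): Unit = {
        // clean(1)                       // rejected at compile time: Int is a value type
        val boxed = clean(new Integer(1)) // explicit boxing produces a reference type
        println(boxed.getClass.getName)   // java.lang.Integer
        println(clean(() => 1)())         // function objects are already AnyRefs
      }
    }
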
From 8e971d738fae2d3238f5452f6c015bc4365e3d0c Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 2 May 2015 19:31:37 -0700
Subject: [PATCH 4/4] Do not throw exception if object to clean is not closure

This breaks a valid use case where the user code passes in a case class
into `map`. See ml.NormalizerSuite.
---
 .../scala/org/apache/spark/util/ClosureCleaner.scala |  3 ++-
 .../org/apache/spark/util/ClosureCleanerSuite.scala  | 12 ------------
 .../org/apache/spark/streaming/ReceiverSuite.scala   |  4 ++--
 3 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 325958e76409..cd4fbd427fc7 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -180,7 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
       accessedFields: Map[Class[_], Set[String]]): Unit = {

     if (!isClosure(func.getClass)) {
-      throw new IllegalArgumentException("Expected a closure; got " + func.getClass.getName)
+      logWarning("Expected a closure; got " + func.getClass.getName)
+      return
     }

     // TODO: clean all inner closures first. This requires us to find the inner objects.

diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 497512d1d4d3..ff1bfe0774a2 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -63,18 +63,6 @@ class ClosureCleanerSuite extends FunSuite {
     val result = TestObjectWithNestedReturns.run()
     assert(result === 1)
   }
-
-  test("should clean only closures") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val rdd = sc.makeRDD(1 to 100)
-      intercept[IllegalArgumentException] { sc.clean(new Integer(1)) }
-      intercept[IllegalArgumentException] { sc.clean("not a closure") }
-      intercept[IllegalArgumentException] { sc.clean(rdd) }
-      sc.clean(() => new Integer(1))
-      sc.clean(() => "part of a closure")
-      sc.clean(() => rdd)
-    }
-  }
 }

 // A non-serializable class we create in closures to make sure that we aren't

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 393a360cfe15..5d7127627eea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     }

     withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
-      val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
-      val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
+      val receiver1 = new FakeReceiver(sendData = true)
+      val receiver2 = new FakeReceiver(sendData = true)
       val receiverStream1 = ssc.receiverStream(receiver1)
       val receiverStream2 = ssc.receiverStream(receiver2)
       receiverStream1.register()
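Patch 4 relaxes the guard from patch 2 into a warning because not every legitimate argument to clean is a compiler-generated closure: RDD.map accepts any T => U, including a named case class, and ml.NormalizerSuite exercised exactly that. The same reasoning removes the clean calls around FakeReceiver in ReceiverSuite, since a Receiver is not a closure either. Below is a sketch of the kind of use case the hard check broke; AddOffset is invented for illustration (the real trigger was the case class in ml.NormalizerSuite):

    import org.apache.spark.{SparkConf, SparkContext}

    // A named function object: a perfectly valid argument to RDD.map whose
    // class is not a compiler-generated closure, so the hard isClosure check
    // from patch 2 would have rejected it with IllegalArgumentException.
    case class AddOffset(offset: Int) extends (Int => Int) {
      def apply(x: Int): Int = x + offset
    }

    object CaseClassMapSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
        try {
          // map passes AddOffset(10) through sc.clean; with this patch that
          // only logs a warning and the job proceeds normally.
          println(sc.makeRDD(1 to 5).map(AddOffset(10)).collect().mkString(","))
          // prints: 11,12,13,14,15
        } finally {
          sc.stop()
        }
      }
    }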