Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ private[spark] object ClosureCleaner extends Logging {
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {

if (!isClosure(func.getClass)) {
logWarning("Expected a closure; got " + func.getClass.getName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pwendell Is this okay to log a warning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to make this an assertion? Isn't it simply invalid if we call this on something that is not a closure?

return
}

// TODO: clean all inner closures first. This requires us to find the inner objects.
// TODO: cache outerClasses / innerClasses / accessedFields

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
val cleanedF = context.sparkContext.clean(transformFunc, false)
transform((r: RDD[T], t: Time) => cleanedF(r))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
}

withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver1 = new FakeReceiver(sendData = true)
val receiver2 = new FakeReceiver(sendData = true)
val receiverStream1 = ssc.receiverStream(receiver1)
val receiverStream2 = ssc.receiverStream(receiver2)
receiverStream1.register()
Expand Down