Skip to content

Commit c5d1644

Browse files
committed
Address Simon (viirya)'s comments
1 parent 9301485 commit c5d1644

File tree

1 file changed

+26
-12
lines changed

1 file changed

+26
-12
lines changed

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -453,13 +453,13 @@ private[spark] object IndylambdaScalaClosures extends Logging {
453453
val maybeClosureClass = maybeClosure.getClass
454454

455455
// shortcut the fast check:
456-
// indylambda closure classes are generated by Java's LambdaMetafactory, and they're always
457-
// synthetic.
458-
if (!maybeClosureClass.isSynthetic) return None
456+
// 1. indylambda closure classes are generated by Java's LambdaMetafactory, and they're always
457+
// synthetic.
458+
// 2. We only care about Serializable closures, so let's check that as well
459+
if (!maybeClosureClass.isSynthetic || !maybeClosure.isInstanceOf[Serializable]) return None
459460

460461
val implementedInterfaces = ClassUtils.getAllInterfaces(maybeClosureClass).asScala
461-
val isClosureCandidate = implementedInterfaces.exists(_.getName == "scala.Serializable") &&
462-
implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
462+
val isClosureCandidate = implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
463463

464464
if (isClosureCandidate) {
465465
try {
@@ -480,7 +480,6 @@ private[spark] object IndylambdaScalaClosures extends Logging {
480480
def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
481481
lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
482482
lambdaProxy.getImplMethodName.contains("$anonfun$")
483-
// && implements a scala.runtime.java8 functional interface
484483
}
485484

486485
def inspect(closure: AnyRef): SerializedLambda = {
@@ -514,18 +513,24 @@ private[spark] object IndylambdaScalaClosures extends Logging {
514513
val implMethodId = MethodIdentifier(
515514
implClass, lambdaProxy.getImplMethodName, lambdaProxy.getImplMethodSignature)
516515

517-
val visited = Set[MethodIdentifier[_]](implMethodId)
516+
val visited = Set.empty[MethodIdentifier[_]]
518517
val stack = Stack[MethodIdentifier[_]](implMethodId)
519518
while (!stack.isEmpty) {
520519
val currentId = stack.pop
520+
visited += currentId
521+
521522
val currentMethodNode = methodsByName(currentId)
522523
logTrace(s" scanning $currentId")
523524
currentMethodNode.accept(new MethodVisitor(ASM7) {
525+
// FIXME: record self class name, get a Class[_] for it
526+
// val selfClassName: String =
527+
// val selfClass: Class[_] =
528+
524529
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String): Unit = {
525530
if (op == GETFIELD || op == PUTFIELD) {
526531
val ownerExternalName = owner.replace('/', '.')
527532
for (cl <- accessedFields.keys if cl.getName == ownerExternalName) {
528-
logTrace(s" found field access $name on $owner")
533+
logTrace(s" found field access $name on $ownerExternalName")
529534
accessedFields(cl) += name
530535
}
531536
}
@@ -534,9 +539,14 @@ private[spark] object IndylambdaScalaClosures extends Logging {
534539
override def visitMethodInsn(
535540
op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = {
536541
if (owner == implClassInternalName) {
537-
logTrace(s" found intra class call to $owner.$name$desc")
538-
stack.push(MethodIdentifier(implClass, name, desc))
542+
val ownerExternalName = owner.replace('/', '.')
543+
logTrace(s" found intra class call to $ownerExternalName.$name$desc")
544+
val calleeMethodId = MethodIdentifier(implClass, name, desc)
545+
if (!visited.contains(calleeMethodId)) {
546+
stack.push(calleeMethodId)
547+
}
539548
} else {
549+
// FIXME: implement findTransitively
540550
// keep the same behavior as the original ClosureCleaner
541551
logTrace(s" ignoring call to $owner.$name$desc")
542552
}
@@ -560,7 +570,11 @@ private[spark] object IndylambdaScalaClosures extends Logging {
560570
targetHandle.getDesc.startsWith(s"(L$implClassInternalName;")) {
561571
// this is a lexically nested closure that also captures the enclosing `this`
562572
logDebug(s" found inner closure $targetHandle")
563-
stack.push(MethodIdentifier(implClass, targetHandle.getName, targetHandle.getDesc))
573+
val calleeMethodId =
574+
MethodIdentifier(implClass, targetHandle.getName, targetHandle.getDesc)
575+
if (!visited.contains(calleeMethodId)) {
576+
stack.push(calleeMethodId)
577+
}
564578
}
565579
}
566580
}
@@ -577,7 +591,7 @@ private class ReturnStatementFinder(targetMethodName: Option[String] = None)
577591
override def visitMethod(access: Int, name: String, desc: String,
578592
sig: String, exceptions: Array[String]): MethodVisitor = {
579593

580-
// $anonfun$ covers Java 8 lambdas
594+
// $anonfun$ covers indylambda closures
581595
if (name.contains("apply") || name.contains("$anonfun$")) {
582596
// A method with suffix "$adapted" will be generated in cases like
583597
// { _:Int => return; Seq()} but not { _:Int => return; true}

0 commit comments

Comments
 (0)