@@ -381,6 +381,12 @@ class SingletonReplSuite extends SparkFunSuite {
381381 }
382382
383383 test(" SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner" ) {
384+ // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that
385+ // object contains an unused non-serializable field.
386+ // Specifically, the closure in this test case contains a directly nested closure, and the
387+ // capture is triggered by the inner closure.
388+ // `ns` should be nulled out, but `topLevelValue` should stay intact.
389+
384390 // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the
385391 // EOT control character (i.e. Ctrl+D).
386392 // Just write things on a single line to emulate :paste mode.
@@ -402,7 +408,36 @@ class SingletonReplSuite extends SparkFunSuite {
402408 |}
403409 |val r = sc.parallelize(0 to 2).map(closure).collect
404410 """ .stripMargin)
405- assertContains(" r: Array[scala.collection.immutable.IndexedSeq[String]] = Array(Vector" , output)
411+ assertContains(" r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
412+ " Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))" , output)
413+ assertDoesNotContain(" Exception" , output)
414+ }
415+
416+ test(" SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class" ) {
417+ // Similar to the previous test case, but with indirect closure nesting instead.
418+ // There's still nested closures involved, but the inner closure is indirectly nested in the
419+ // outer closure, with a level of inner class in between them.
420+ // This changes how the inner closure references/captures the outer closure/enclosing `this`
421+ // REPL line object, and covers a different code path in inner closure discovery.
422+
423+ // `ns` should be nulled out, but `topLevelValue` should stay intact.
424+
425+ val output = runInterpreter(
426+ """
427+ |class NotSerializableClass(val x: Int)
428+ |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
429+ |(j: Int) => {
430+ | class InnerFoo {
431+ | val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue }
432+ | }
433+ | val innerFoo = new InnerFoo
434+ | (1 to j).flatMap(innerFoo.innerClosure)
435+ |}
436+ |val r = sc.parallelize(0 to 2).map(closure).collect
437+ """ .stripMargin)
438+ assertContains(" r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
439+ " Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))" , output)
440+ assertDoesNotContain(" Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)" , output)
406441 assertDoesNotContain(" Exception" , output)
407442 }
408443
0 commit comments