@@ -19,7 +19,7 @@ package org.apache.spark
1919
2020import java .lang .ref .WeakReference
2121
22- import scala .collection .mutable .{ArrayBuffer , HashSet , SynchronizedSet }
22+ import scala .collection .mutable .{HashSet , SynchronizedSet }
2323import scala .util .Random
2424
2525import org .scalatest .{BeforeAndAfter , FunSuite }
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._
2929
3030import org .apache .spark .SparkContext ._
3131import org .apache .spark .rdd .RDD
32- import org .apache .spark .storage .{RDDBlockId , ShuffleBlockId }
32+ import org .apache .spark .storage .{BroadcastBlockId , RDDBlockId , ShuffleBlockId }
3333
3434class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
3535
@@ -46,9 +46,9 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
4646
4747 // Explicit cleanup
4848 cleaner.cleanupRDD(rdd)
49- tester.assertCleanup
49+ tester.assertCleanup()
5050
51- // verify that RDDs can be re-executed after cleaning up
51+ // Verify that RDDs can be re-executed after cleaning up
5252 assert(rdd.collect().toList === collected)
5353 }
5454
@@ -59,87 +59,101 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
5959
6060 // Explicit cleanup
6161 shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
62- tester.assertCleanup
62+ tester.assertCleanup()
6363
6464 // Verify that shuffles can be re-executed after cleaning up
6565 assert(rdd.collect().toList === collected)
6666 }
6767
68+ test(" cleanup broadcast" ) {
69+ val broadcast = newBroadcast
70+ val tester = new CleanerTester (sc, broadcastIds = Seq (broadcast.id))
71+
72+ // Explicit cleanup
73+ cleaner.cleanupBroadcast(broadcast)
74+ tester.assertCleanup()
75+ }
76+
6877 test(" automatically cleanup RDD" ) {
6978 var rdd = newRDD.persist()
7079 rdd.count()
7180
72- // test that GC does not cause RDD cleanup due to a strong reference
81+ // Test that GC does not cause RDD cleanup due to a strong reference
7382 val preGCTester = new CleanerTester (sc, rddIds = Seq (rdd.id))
7483 runGC()
7584 intercept[Exception ] {
76- preGCTester.assertCleanup(timeout(1000 millis))
85+ preGCTester.assertCleanup()( timeout(1000 millis))
7786 }
7887
79- // test that GC causes RDD cleanup after dereferencing the RDD
88+ // Test that GC causes RDD cleanup after dereferencing the RDD
8089 val postGCTester = new CleanerTester (sc, rddIds = Seq (rdd.id))
81- rdd = null // make RDD out of scope
90+ rdd = null // Make RDD out of scope
8291 runGC()
83- postGCTester.assertCleanup
92+ postGCTester.assertCleanup()
8493 }
8594
8695 test(" automatically cleanup shuffle" ) {
8796 var rdd = newShuffleRDD
8897 rdd.count()
8998
90- // test that GC does not cause shuffle cleanup due to a strong reference
91- val preGCTester = new CleanerTester (sc, shuffleIds = Seq (0 ))
99+ // Test that GC does not cause shuffle cleanup due to a strong reference
100+ val preGCTester = new CleanerTester (sc, shuffleIds = Seq (0 ))
92101 runGC()
93102 intercept[Exception ] {
94- preGCTester.assertCleanup(timeout(1000 millis))
103+ preGCTester.assertCleanup()( timeout(1000 millis))
95104 }
96105
97- // test that GC causes shuffle cleanup after dereferencing the RDD
106+ // Test that GC causes shuffle cleanup after dereferencing the RDD
98107 val postGCTester = new CleanerTester (sc, shuffleIds = Seq (0 ))
99- rdd = null // make RDD out of scope, so that corresponding shuffle goes out of scope
108+ rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
100109 runGC()
101- postGCTester.assertCleanup
110+ postGCTester.assertCleanup()
102111 }
103112
104- test(" automatically cleanup RDD + shuffle" ) {
113+ test(" automatically cleanup broadcast" ) {
114+ var broadcast = newBroadcast
105115
106- def randomRDD : RDD [_] = {
107- val rdd : RDD [_] = Random .nextInt(3 ) match {
108- case 0 => newRDD
109- case 1 => newShuffleRDD
110- case 2 => newPairRDD.join(newPairRDD)
111- }
112- if (Random .nextBoolean()) rdd.persist()
113- rdd.count()
114- rdd
116+ // Test that GC does not cause broadcast cleanup due to a strong reference
117+ val preGCTester = new CleanerTester (sc, broadcastIds = Seq (broadcast.id))
118+ runGC()
119+ intercept[Exception ] {
120+ preGCTester.assertCleanup()(timeout(1000 millis))
115121 }
116122
117- val buffer = new ArrayBuffer [RDD [_]]
118- for (i <- 1 to 500 ) {
119- buffer += randomRDD
120- }
123+ // Test that GC causes broadcast cleanup after dereferencing the broadcast variable
124+ val postGCTester = new CleanerTester (sc, broadcastIds = Seq (broadcast.id))
125+ broadcast = null // Make broadcast variable out of scope
126+ runGC()
127+ postGCTester.assertCleanup()
128+ }
121129
130+ test(" automatically cleanup RDD + shuffle + broadcast" ) {
131+ val numRdds = 100
132+ val numBroadcasts = 4 // Broadcasts are more costly
133+ val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
134+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
122135 val rddIds = sc.persistentRdds.keys.toSeq
123136 val shuffleIds = 0 until sc.newShuffleId
137+ val broadcastIds = 0L until numBroadcasts
124138
125- val preGCTester = new CleanerTester (sc, rddIds, shuffleIds)
139+ val preGCTester = new CleanerTester (sc, rddIds, shuffleIds, broadcastIds )
126140 runGC()
127141 intercept[Exception ] {
128- preGCTester.assertCleanup(timeout(1000 millis))
142+ preGCTester.assertCleanup()( timeout(1000 millis))
129143 }
130- // test that GC causes shuffle cleanup after dereferencing the RDD
131- val postGCTester = new CleanerTester (sc, rddIds, shuffleIds)
132- buffer.clear()
144+
145+ // Test that GC triggers the cleanup of all variables after the dereferencing them
146+ val postGCTester = new CleanerTester (sc, rddIds, shuffleIds, broadcastIds)
147+ broadcastBuffer.clear()
148+ rddBuffer.clear()
133149 runGC()
134- postGCTester.assertCleanup
150+ postGCTester.assertCleanup()
135151 }
136152
137153 def newRDD = sc.makeRDD(1 to 10 )
138-
139154 def newPairRDD = newRDD.map(_ -> 1 )
140-
141155 def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
142-
156+ def newBroadcast = sc.broadcast( 1 to 100 )
143157 def newRDDWithShuffleDependencies : (RDD [_], Seq [ShuffleDependency [_, _]]) = {
144158 def getAllDependencies (rdd : RDD [_]): Seq [Dependency [_]] = {
145159 rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
@@ -149,11 +163,27 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
149163 val rdd = newShuffleRDD
150164
151165 // Get all the shuffle dependencies
152- val shuffleDeps = getAllDependencies(rdd).filter(_.isInstanceOf [ShuffleDependency [_, _]])
166+ val shuffleDeps = getAllDependencies(rdd)
167+ .filter(_.isInstanceOf [ShuffleDependency [_, _]])
153168 .map(_.asInstanceOf [ShuffleDependency [_, _]])
154169 (rdd, shuffleDeps)
155170 }
156171
172+ def randomRdd = {
173+ val rdd : RDD [_] = Random .nextInt(3 ) match {
174+ case 0 => newRDD
175+ case 1 => newShuffleRDD
176+ case 2 => newPairRDD.join(newPairRDD)
177+ }
178+ if (Random .nextBoolean()) rdd.persist()
179+ rdd.count()
180+ rdd
181+ }
182+
183+ def randomBroadcast = {
184+ sc.broadcast(Random .nextInt(Int .MaxValue ))
185+ }
186+
157187 /** Run GC and make sure it actually has run */
158188 def runGC () {
159189 val weakRef = new WeakReference (new Object ())
@@ -208,7 +238,7 @@ class CleanerTester(
208238 sc.cleaner.attachListener(cleanerListener)
209239
210240 /** Assert that all the stuff has been cleaned up */
211- def assertCleanup (implicit waitTimeout : Eventually .Timeout ) {
241+ def assertCleanup ()( implicit waitTimeout : Eventually .Timeout ) {
212242 try {
213243 eventually(waitTimeout, interval(10 millis)) {
214244 assert(isAllCleanedUp)
@@ -222,7 +252,7 @@ class CleanerTester(
222252
223253 /** Verify that RDDs, shuffles, etc. occupy resources */
224254 private def preCleanupValidate () {
225- assert(rddIds.nonEmpty || shuffleIds.nonEmpty, " Nothing to cleanup" )
255+ assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty , " Nothing to cleanup" )
226256
227257 // Verify the RDDs have been persisted and blocks are present
228258 assert(rddIds.forall(sc.persistentRdds.contains),
@@ -233,8 +263,12 @@ class CleanerTester(
233263 // Verify the shuffle ids are registered and blocks are present
234264 assert(shuffleIds.forall(mapOutputTrackerMaster.containsShuffle),
235265 " One or more shuffles have not been registered cannot start cleaner test" )
236- assert(shuffleIds.forall(shuffleId => diskBlockManager.containsBlock(shuffleBlockId(shuffleId ))),
266+ assert(shuffleIds.forall(sid => diskBlockManager.containsBlock(shuffleBlockId(sid ))),
237267 " One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test" )
268+
269+ // Verify that the broadcast is in the driver's block manager
270+ assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
271+ " One ore more broadcasts have not been persisted in the driver's block manager" )
238272 }
239273
240274 /**
@@ -247,14 +281,19 @@ class CleanerTester(
247281 attempts += 1
248282 logInfo(" Attempt: " + attempts)
249283 try {
250- // Verify all the RDDs have been unpersisted
284+ // Verify all RDDs have been unpersisted
251285 assert(rddIds.forall(! sc.persistentRdds.contains(_)))
252286 assert(rddIds.forall(rddId => ! blockManager.master.contains(rddBlockId(rddId))))
253287
254- // Verify all the shuffle have been deregistered and cleaned up
288+ // Verify all shuffles have been deregistered and cleaned up
255289 assert(shuffleIds.forall(! mapOutputTrackerMaster.containsShuffle(_)))
256- assert(shuffleIds.forall(shuffleId =>
257- ! diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
290+ assert(shuffleIds.forall(sid => ! diskBlockManager.containsBlock(shuffleBlockId(sid))))
291+
292+ // Verify all broadcasts have been unpersisted
293+ assert(broadcastIds.forall { bid =>
294+ blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
295+ })
296+
258297 return
259298 } catch {
260299 case t : Throwable =>
@@ -271,18 +310,20 @@ class CleanerTester(
271310 s """
272311 |\tRDDs = ${toBeCleanedRDDIds.mkString(" [" , " , " , " ]" )}
273312 |\tShuffles = ${toBeCleanedShuffleIds.mkString(" [" , " , " , " ]" )}
313+ |\tBroadcasts = ${toBeCleanedBroadcstIds.mkString(" [" , " , " , " ]" )}
274314 """ .stripMargin
275315 }
276316
277- private def isAllCleanedUp = toBeCleanedRDDIds.isEmpty && toBeCleanedShuffleIds.isEmpty
278-
279- private def shuffleBlockId (shuffleId : Int ) = ShuffleBlockId (shuffleId, 0 , 0 )
317+ private def isAllCleanedUp =
318+ toBeCleanedRDDIds.isEmpty &&
319+ toBeCleanedShuffleIds.isEmpty &&
320+ toBeCleanedBroadcstIds.isEmpty
280321
281322 private def rddBlockId (rddId : Int ) = RDDBlockId (rddId, 0 )
323+ private def shuffleBlockId (shuffleId : Int ) = ShuffleBlockId (shuffleId, 0 , 0 )
324+ private def broadcastBlockId (broadcastId : Long ) = BroadcastBlockId (broadcastId)
282325
283326 private def blockManager = sc.env.blockManager
284-
285327 private def diskBlockManager = blockManager.diskBlockManager
286-
287328 private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf [MapOutputTrackerMaster ]
288- }
329+ }
0 commit comments