
Commit 11eabbe

tdas authored and pwendell committed
[SPARK-1103] Automatic garbage collection of RDD, shuffle and broadcast data
This PR allows Spark to automatically clean up metadata and data related to persisted RDDs, shuffles, and broadcast variables when the corresponding RDDs, shuffles, and broadcast variables fall out of scope in the driver program. This is still a work in progress, as broadcast cleanup has not been implemented.

**Implementation Details**

A new class, `ContextCleaner`, is responsible for cleaning all the state. It is instantiated as part of a `SparkContext`. The RDD and ShuffleDependency classes override the `finalize()` method, which is called whenever their instances go out of scope. The `finalize()` method enqueues the object's identifier (i.e. RDD ID, shuffle ID, etc.) with the `ContextCleaner`; this is a very short and cheap operation that should not significantly affect the garbage collection mechanism. The `ContextCleaner`, on a different thread, performs the actual cleanup, whose details are given below.

*RDD cleanup:* `ContextCleaner` calls `RDD.unpersist()` to clean up persisted RDDs. Regarding metadata, the DAGScheduler automatically cleans up all metadata related to an RDD after all jobs have completed. Only `SparkContext.persistentRDDs` keeps strong references to persisted RDDs; the `TimeStampedHashMap` used for it has been replaced by a `TimeStampedWeakValueHashMap`, which keeps only weak references to the RDDs, allowing them to be garbage collected.

*Shuffle cleanup:* A new BlockManager message, `RemoveShuffle(<shuffle ID>)`, asks the `BlockManagerMaster` and the currently active `BlockManager`s to delete all the disk blocks related to the given shuffle ID. `ContextCleaner` cleans up shuffle data using this message and also cleans up the shuffle metadata in the driver's `MapOutputTracker`. The `MapOutputTracker` at the workers, which caches the shuffle metadata, maintains a `BoundedHashMap` to limit how much shuffle information it caches; refetching the shuffle information from the driver is not too costly.

*Broadcast cleanup:* To be done. [This PR](https://github.com/apache/incubator-spark/pull/543/) adds a mechanism for explicit cleanup of broadcast variables. `Broadcast.finalize()` will enqueue its own ID with the `ContextCleaner`, and that PR's mechanism will be used to unpersist the broadcast data.

*Other cleanup:* `ShuffleMapTask` and `ResultTask` cache tasks and previously used TTL-based cleanup (via `TimeStampedHashMap`), so nothing got cleaned up if no TTL was set. They now use a `BoundedHashMap` instead, which keeps a bounded amount of map output information; the cost of repopulating the cache when necessary is very small.

**Current state of implementation**

RDD and shuffle cleanup are implemented. Things left to be done:
- Cleanup of broadcast variables.
- Automatic cleanup of keys whose weak-reference values are empty in `TimeStampedWeakValueHashMap`.

Author: Tathagata Das <[email protected]>
Author: Andrew Or <[email protected]>
Author: Roman Pastukhov <[email protected]>

Closes apache#126 from tdas/state-cleanup and squashes the following commits:

61b8d6e [Tathagata Das] Fixed issue with Tachyon + new BlockManager methods.
f489fdc [Tathagata Das] Merge remote-tracking branch 'apache/master' into state-cleanup
d25a86e [Tathagata Das] Fixed stupid typo.
cff023c [Tathagata Das] Fixed issues based on Andrew's comments.
4d05314 [Tathagata Das] Scala style fix.
2b95b5e [Tathagata Das] Added more documentation on Broadcast implementations, specially which blocks are told about to the driver. Also, fixed Broadcast API to hide destroy functionality.
41c9ece [Tathagata Das] Added more unit tests for BlockManager, DiskBlockManager, and ContextCleaner.
6222697 [Tathagata Das] Fixed bug and adding unit test for removeBroadcast in BlockManagerSuite.
104a89a [Tathagata Das] Fixed failing BroadcastSuite unit tests by introducing blocking for removeShuffle and removeBroadcast in BlockManager*
a430f06 [Tathagata Das] Fixed compilation errors.
b27f8e8 [Tathagata Das] Merge pull request #3 from andrewor14/cleanup
cd72d19 [Andrew Or] Make automatic cleanup configurable (not documented)
ada45f0 [Andrew Or] Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup
a2cc8bc [Tathagata Das] Merge remote-tracking branch 'apache/master' into state-cleanup
c5b1d98 [Andrew Or] Address Patrick's comments
a6460d4 [Andrew Or] Merge github.com:apache/spark into cleanup
762a4d8 [Tathagata Das] Merge pull request #1 from andrewor14/cleanup
f0aabb1 [Andrew Or] Correct semantics for TimeStampedWeakValueHashMap + add tests
5016375 [Andrew Or] Address TD's comments
7ed72fb [Andrew Or] Fix style test fail + remove verbose test message regarding broadcast
634a097 [Andrew Or] Merge branch 'state-cleanup' of github.com:tdas/spark into cleanup
7edbc98 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into state-cleanup
8557c12 [Andrew Or] Merge github.com:apache/spark into cleanup
e442246 [Andrew Or] Merge github.com:apache/spark into cleanup
88904a3 [Andrew Or] Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap
fbfeec8 [Andrew Or] Add functionality to query executors for their local BlockStatuses
34f436f [Andrew Or] Generalize BroadcastBlockId to remove BroadcastHelperBlockId
0d17060 [Andrew Or] Import, comments, and style fixes (minor)
c92e4d9 [Andrew Or] Merge github.com:apache/spark into cleanup
f201a8d [Andrew Or] Test broadcast cleanup in ContextCleanerSuite + remove BoundedHashMap
e95479c [Andrew Or] Add tests for unpersisting broadcast
544ac86 [Andrew Or] Clean up broadcast blocks through BlockManager*
d0edef3 [Andrew Or] Add framework for broadcast cleanup
ba52e00 [Andrew Or] Refactor broadcast classes
c7ccef1 [Andrew Or] Merge branch 'bc-unpersist-merge' of github.com:ignatich/incubator-spark into cleanup
6c9dcf6 [Tathagata Das] Added missing Apache license
d2f8b97 [Tathagata Das] Removed duplicate unpersistRDD.
a007307 [Tathagata Das] Merge remote-tracking branch 'apache/master' into state-cleanup
620eca3 [Tathagata Das] Changes based on PR comments.
f2881fd [Tathagata Das] Changed ContextCleaner to use ReferenceQueue instead of finalizer
e1fba5f [Tathagata Das] Style fix
892b952 [Tathagata Das] Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanup shuffle metadata in MapOutputTrackerWorker.
a7260d3 [Tathagata Das] Added try-catch in context cleaner and null value cleaning in TimeStampedWeakValueHashMap.
e61daa0 [Tathagata Das] Modifications based on the comments on PR 126.
ae9da88 [Tathagata Das] Removed unncessary TimeStampedHashMap from DAGScheduler, added try-catches in finalize() methods, and replaced ArrayBlockingQueue to LinkedBlockingQueue to avoid blocking in Java's finalizing thread.
cb0a5a6 [Tathagata Das] Fixed docs and styles.
a24fefc [Tathagata Das] Merge remote-tracking branch 'apache/master' into state-cleanup
8512612 [Tathagata Das] Changed TimeStampedHashMap to use WrappedJavaHashMap.
e427a9e [Tathagata Das] Added ContextCleaner to automatically clean RDDs and shuffles when they fall out of scope. Also replaced TimeStampedHashMap to BoundedHashMaps and TimeStampedWeakValueHashMap for the necessary hashmap behavior.
80dd977 [Roman Pastukhov] Fix for Broadcast unpersist patch.
1e752f1 [Roman Pastukhov] Added unpersist method to Broadcast.
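Note that while the description above speaks of `finalize()`, the squashed commits show the implementation was later switched to a `ReferenceQueue` (commit f2881fd), which is what the `ContextCleaner` code in this diff uses. The following standalone sketch illustrates that pattern using nothing beyond the JDK; the names `TrackedRef` and `WeakRefDemo` are illustrative, not Spark classes. A weak reference tied to a queue is created per tracked object, and once the object becomes unreachable the collector enqueues the reference, where a cleaner thread picks it up.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for CleanupTaskWeakReference: carries the id of the
// state to clean once the referent is garbage collected.
class TrackedRef(val id: Int, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object WeakRefDemo {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    // Hold strong references to the WeakReference objects themselves, as
    // ContextCleaner's referenceBuffer does, so they are not collected
    // before they can be enqueued.
    val buffer = ArrayBuffer.empty[TrackedRef]

    var payload: AnyRef = new Array[Byte](1 << 20) // the tracked object
    buffer += new TrackedRef(1, payload, queue)

    payload = null  // drop the only strong reference to the tracked object
    System.gc()     // a hint only; collection timing is not guaranteed

    // A cleaner thread would loop here; this polls once with a timeout.
    val ref = queue.remove(1000).asInstanceOf[TrackedRef]
    if (ref != null) {
      buffer -= ref
      println("Would now clean up state for id " + ref.id)
    }
  }
}
```

This is the same shape as `CleanupTaskWeakReference` plus `keepCleaning()` in the file below, with `referenceBuffer` playing the role of `buffer`.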
1 parent 0d0493f · commit 11eabbe

40 files changed: +2571 −469 lines changed
core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

/**
 * Classes that represent cleaning tasks.
 */
private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask

/**
 * A WeakReference associated with a CleanupTask.
 *
 * When the referent object becomes only weakly reachable, the corresponding
 * CleanupTaskWeakReference is automatically added to the given reference queue.
 */
private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)

/**
 * An asynchronous cleaner for RDD, shuffle, and broadcast state.
 *
 * This maintains a weak reference for each RDD, ShuffleDependency, and Broadcast of interest,
 * to be processed when the associated object goes out of scope of the application. Actual
 * cleanup is performed in a separate daemon thread.
 */
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

  private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
    with SynchronizedBuffer[CleanupTaskWeakReference]

  private val referenceQueue = new ReferenceQueue[AnyRef]

  private val listeners = new ArrayBuffer[CleanerListener]
    with SynchronizedBuffer[CleanerListener]

  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

  /**
   * Whether the cleaning thread will block on cleanup tasks.
   * This is set to true only for tests.
   */
  private val blockOnCleanupTasks = sc.conf.getBoolean(
    "spark.cleaner.referenceTracking.blocking", false)

  @volatile private var stopped = false

  /** Attach a listener object to get information of when objects are cleaned. */
  def attachListener(listener: CleanerListener) {
    listeners += listener
  }

  /** Start the cleaner. */
  def start() {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Spark Context Cleaner")
    cleaningThread.start()
  }

  /** Stop the cleaner. */
  def stop() {
    stopped = true
  }

  /** Register an RDD for cleanup when it is garbage collected. */
  def registerRDDForCleanup(rdd: RDD[_]) {
    registerForCleanup(rdd, CleanRDD(rdd.id))
  }

  /** Register a ShuffleDependency for cleanup when it is garbage collected. */
  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
  }

  /** Register a Broadcast for cleanup when it is garbage collected. */
  def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
    registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
  }

  /** Register an object for cleanup. */
  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
    referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
  }

  /** Keep cleaning RDD, shuffle, and broadcast state. */
  private def keepCleaning() {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          referenceBuffer -= reference.get
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      } catch {
        case t: Throwable => logError("Error in cleaning thread", t)
      }
    }
  }

  /** Perform RDD cleanup. */
  def doCleanupRDD(rddId: Int, blocking: Boolean) {
    try {
      logDebug("Cleaning RDD " + rddId)
      sc.unpersistRDD(rddId, blocking)
      listeners.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
    }
  }

  /** Perform shuffle cleanup, asynchronously. */
  def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
    try {
      logDebug("Cleaning shuffle " + shuffleId)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      listeners.foreach(_.shuffleCleaned(shuffleId))
      logInfo("Cleaned shuffle " + shuffleId)
    } catch {
      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
    }
  }

  /** Perform broadcast cleanup. */
  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
    try {
      logDebug("Cleaning broadcast " + broadcastId)
      broadcastManager.unbroadcast(broadcastId, true, blocking)
      listeners.foreach(_.broadcastCleaned(broadcastId))
      logInfo("Cleaned broadcast " + broadcastId)
    } catch {
      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
    }
  }

  private def blockManagerMaster = sc.env.blockManager.master
  private def broadcastManager = sc.env.broadcastManager
  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

  // Used for testing. When blocking is enabled, the doCleanup* methods above
  // block until cleanup is complete, to make tests more reliable.
}

private object ContextCleaner {
  private val REF_QUEUE_POLL_TIMEOUT = 100
}

/**
 * Listener class used for testing when any item has been cleaned by the Cleaner class.
 */
private[spark] trait CleanerListener {
  def rddCleaned(rddId: Int)
  def shuffleCleaned(shuffleId: Int)
  def broadcastCleaned(broadcastId: Long)
}
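For context, here is a minimal, hypothetical driver program (not part of this diff) sketching how the cleaner is exercised end to end, assuming local mode and the test-only blocking flag described above. `ContextCleanerDemo` is an illustrative name, and `System.gc()` is only a hint, so cleanup timing is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ContextCleanerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("context-cleaner-demo")
      // Test-only flag read by ContextCleaner above: block on cleanup tasks.
      .set("spark.cleaner.referenceTracking.blocking", "true")
    val sc = new SparkContext(conf)

    var rdd = sc.parallelize(1 to 1000).persist()
    rdd.count()                 // materialize the persisted blocks

    val rddId = rdd.id
    println("Expecting eventual cleanup of RDD " + rddId)

    rdd = null                  // drop the driver's only strong reference
    System.gc()                 // a hint only; once the RDD is collected, the
                                // cleaner should unpersist its blocks

    sc.stop()
  }
}
```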

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ class ShuffleDependency[K, V](
     extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()
+
+  rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
 
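This one-line hook registers every `ShuffleDependency` with the cleaner at construction time. As a hypothetical test-style sketch of observing the resulting cleanup events through the `CleanerListener` hook from ContextCleaner.scala; it is placed in the `org.apache.spark` package because the `cleaner` field and the listener trait are `private[spark]` (the PR's own ContextCleanerSuite takes the same approach), and `ShuffleCleanupDemo` is an illustrative name.

```scala
package org.apache.spark

import org.apache.spark.SparkContext._ // pair-RDD implicits for reduceByKey

object ShuffleCleanupDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("shuffle-cleanup-demo"))

    // Observe cleanup events through the testing hook defined above.
    sc.cleaner.foreach(_.attachListener(new CleanerListener {
      def rddCleaned(rddId: Int) { println("RDD " + rddId + " cleaned") }
      def shuffleCleaned(shuffleId: Int) { println("shuffle " + shuffleId + " cleaned") }
      def broadcastCleaned(broadcastId: Long) { println("broadcast " + broadcastId + " cleaned") }
    }))

    // reduceByKey constructs a ShuffleDependency, which the hook above
    // registers with the cleaner.
    var sums = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)
    sums.count()

    sums = null   // drop the lineage that holds the ShuffleDependency
    System.gc()   // a hint; the cleaner should eventually fire shuffleCleaned

    sc.stop()
  }
}
```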