@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await
 
 import akka.actor._
@@ -34,6 +34,7 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
+/** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
   extends Actor with Logging {
   def receive = {
@@ -50,28 +51,35 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
 }
 
 /**
- * Class that keeps track of the location of the location of the map output of
+ * Class that keeps track of the location of the map output of
  * a stage. This is abstract because different versions of MapOutputTracker
  * (driver and worker) use different HashMap to store its metadata.
  */
 private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
 
   private val timeout = AkkaUtils.askTimeout(conf)
 
-  // Set to the MapOutputTrackerActor living on the driver
+  /** Set to the MapOutputTrackerActor living on the driver */
   var trackerActor: ActorRef = _
 
   /** This HashMap needs to have different storage behavior for driver and worker */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
-  // Incremented every time a fetch fails so that client nodes know to clear
-  // their cache of map output locations if this happens.
+  /**
+   * Incremented every time a fetch fails so that client nodes know to clear
+   * their cache of map output locations if this happens.
+   */
   protected var epoch: Long = 0
   protected val epochLock = new java.lang.Object
 
-  // Send a message to the trackerActor and get its result within a default timeout, or
-  // throw a SparkException if this fails.
-  private def askTracker(message: Any): Any = {
+  /** Remembers which map output locations are currently being fetched on a worker */
+  private val fetching = new HashSet[Int]
+
+  /**
+   * Send a message to the trackerActor and get its result within a default timeout, or
+   * throw a SparkException if this fails.
+   */
+  protected def askTracker(message: Any): Any = {
     try {
       val future = trackerActor.ask(message)(timeout)
       Await.result(future, timeout)
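
askTracker wraps Akka's ask pattern: ask returns a Future holding the eventual reply, and Await.result blocks the calling thread until the tracker answers or the timeout fires. A minimal standalone sketch of the same pattern, assuming a hypothetical ToyTrackerActor and the classic pre-2.4 Akka API in use here:

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object GetEpoch

// Toy stand-in for MapOutputTrackerMasterActor: replies to GetEpoch with a Long.
class ToyTrackerActor extends Actor {
  private val epoch = 7L
  def receive = {
    case GetEpoch => sender ! epoch
  }
}

object AskPatternSketch extends App {
  val system = ActorSystem("sketch")
  val tracker = system.actorOf(Props[ToyTrackerActor], "tracker")
  implicit val timeout = Timeout(30.seconds)

  // Ask, then block for the reply, mirroring askTracker's Future-plus-Await shape.
  val reply = Await.result(tracker ? GetEpoch, timeout.duration)
  println("epoch from tracker: " + reply)

  system.shutdown()  // system.terminate() on Akka 2.4+
}

Blocking on the result is acceptable here because callers cannot make progress without the map output locations anyway.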
@@ -81,17 +89,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  // Send a one-way message to the trackerActor, to which we expect it to reply with true.
-  private def communicate(message: Any) {
+  /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
+  protected def sendTracker(message: Any) {
     if (askTracker(message) != true) {
       throw new SparkException("Error reply received from MapOutputTracker")
     }
   }
 
-  // Remembers which map output locations are currently being fetched on a worker
-  private val fetching = new HashSet[Int]
-
-  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
+  /**
+   * Called from executors to get the server URIs and
+   * output sizes of the map outputs of a given shuffle
+   */
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
@@ -150,22 +158,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  def stop() {
-    communicate(StopMapOutputTracker)
-    mapStatuses.clear()
-    trackerActor = null
-  }
-
-  // Called to get current epoch number
+  /** Called to get current epoch number */
   def getEpoch: Long = {
     epochLock.synchronized {
       return epoch
     }
   }
 
-  // Called on workers to update the epoch number, potentially clearing old outputs
-  // because of a fetch failure. (Each worker task calls this with the latest epoch
-  // number on the master at the time it was created.)
+  /**
+   * Called from executors to update the epoch number, potentially clearing old outputs
+   * because of a fetch failure. Each worker task calls this with the latest epoch
+   * number on the master at the time it was created.
+   */
   def updateEpoch(newEpoch: Long) {
     epochLock.synchronized {
       if (newEpoch > epoch) {
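
The epoch protocol these comments describe is the cache-invalidation backbone: the master bumps its epoch whenever map outputs are unregistered after a fetch failure, every task ships with the epoch current at its creation, and an executor that sees a newer value wipes its cached locations so they are re-fetched on demand. A self-contained illustration of the idea (EpochGuardedCache is hypothetical, not a Spark class):

import scala.collection.mutable

// Hypothetical illustration of the epoch-guarded cache idea; not Spark code.
class EpochGuardedCache[K, V] {
  private var epoch = 0L
  private val cache = mutable.HashMap.empty[K, V]
  private val lock = new Object

  def put(key: K, value: V): Unit = lock.synchronized { cache(key) = value }

  def get(key: K): Option[V] = lock.synchronized { cache.get(key) }

  // A newer epoch means some output moved, so all cached locations may be stale.
  def updateEpoch(newEpoch: Long): Unit = lock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cache.clear()  // entries repopulate lazily from the authoritative source
    }
  }
}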
@@ -175,24 +179,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       }
     }
   }
-}
 
-/**
- * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of
- * a limited number of most recently used map output information.
- */
-private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
+  /** Unregister shuffle data */
+  def unregisterShuffle(shuffleId: Int) {
+    mapStatuses.remove(shuffleId)
+  }
 
-  /**
-   * Bounded HashMap for storing serialized statuses in the worker. This allows
-   * the HashMap stay bounded in memory-usage. Things dropped from this HashMap will be
-   * automatically repopulated by fetching them again from the driver. Its okay to
-   * keep the cache size small as it unlikely that there will be a very large number of
-   * stages active simultaneously in the worker.
-   */
-  protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](
-    conf.getInt("spark.mapOutputTracker.cacheSize", 100), true
-  )
+  def stop() {
+    sendTracker(StopMapOutputTracker)
+    mapStatuses.clear()
+    trackerActor = null
+  }
 }
 
 /**
@@ -202,7 +199,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {
 
-  // Cache a serialized version of the output statuses for each shuffle to send them out faster
+  /** Cache a serialized version of the output statuses for each shuffle to send them out faster */
   private var cacheEpoch = epoch
 
   /**
@@ -211,7 +208,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    * by TTL-based cleaning (if set). Other than these two
    * scenarios, nothing should be dropped from this HashMap.
    */
-
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
 
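
TimeStampedHashMap stores an insertion timestamp next to each value so that the metadataCleaner can periodically call clearOldValues (see the cleanup method at the end of this class) and evict entries older than a TTL threshold. A rough analogue of that structure (TimestampedMap is illustrative only; retain is the Scala 2.10-era name for what 2.13 calls filterInPlace):

import scala.collection.mutable

// Rough, illustrative analogue of TimeStampedHashMap; not Spark's implementation.
class TimestampedMap[K, V] {
  private val underlying = mutable.HashMap.empty[K, (V, Long)]

  def put(key: K, value: V): Unit = synchronized {
    underlying(key) = (value, System.currentTimeMillis)
  }

  def get(key: K): Option[V] = synchronized {
    underlying.get(key).map(_._1)
  }

  // Drop everything inserted before threshTime, as clearOldValues does.
  def clearOldValues(threshTime: Long): Unit = synchronized {
    underlying.retain { case (_, (_, insertedAt)) => insertedAt >= threshTime }
  }
}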
@@ -232,13 +228,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     }
   }
 
+  /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
     mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
     if (changeEpoch) {
       incrementEpoch()
     }
   }
 
+  /** Unregister map output information of the given shuffle, mapper and block manager */
   def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
     val arrayOpt = mapStatuses.get(shuffleId)
     if (arrayOpt.isDefined && arrayOpt.get != null) {
@@ -254,11 +252,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     }
   }
 
-  def unregisterShuffle(shuffleId: Int) {
+  /** Unregister shuffle data */
+  override def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
     cachedSerializedStatuses.remove(shuffleId)
   }
 
+  /** Check if the given shuffle is being tracked */
+  def containsShuffle(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
@@ -295,26 +299,26 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     bytes
   }
 
-  def contains(shuffleId: Int): Boolean = {
-    cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
-  }
-
   override def stop() {
     super.stop()
     metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }
 
-  override def updateEpoch(newEpoch: Long) {
-    // This might be called on the MapOutputTrackerMaster if we're running in local mode.
-  }
-
   protected def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
   }
 }
 
+/**
+ * MapOutputTracker for the workers, which fetches map output information from the driver's
+ * MapOutputTrackerMaster.
+ */
+private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
+  protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+}
+
 private[spark] object MapOutputTracker {
   private val LOG_BASE = 1.1
 
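The companion object is truncated by the diff just past this point, but LOG_BASE = 1.1 is the base of the logarithmic encoding this object uses to compress each map output size into a single byte inside a MapStatus; the sizes only steer fetch scheduling, so roughly 10% accuracy suffices, while 1.1^255 still covers outputs up to about 35 GB. A sketch consistent with that constant (not copied from the file):

object SizeCodecSketch {
  private val LOG_BASE = 1.1

  // Encode a size as ceil(log base 1.1 of size), capped at 255 so it fits in
  // one unsigned byte; 0 and 1 get exact codes so empty outputs stay empty.
  def compressSize(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
  }

  // Invert the encoding; the result is within about 10% of the original size.
  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) 0
    else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }
}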