
Commit cb0a5a6

Fixed docs and styles.
1 parent a24fefc commit cb0a5a6

File tree

4 files changed: +23 -9 lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 6 deletions
@@ -30,7 +30,7 @@ private[spark] trait CleanerListener {
 }
 
 /**
- * Cleans RDDs and shuffle data. This should be instantiated only on the driver.
+ * Cleans RDDs and shuffle data.
  */
 private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
 
@@ -62,12 +62,13 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
     cleaningThread.interrupt()
   }
 
-  /** Clean all data and metadata related to a RDD, including shuffle files and metadata */
+  /** Clean (unpersist) RDD data. */
   def cleanRDD(rdd: RDD[_]) {
     enqueue(CleanRDD(rdd.sparkContext, rdd.id))
     logDebug("Enqueued RDD " + rdd + " for cleaning up")
   }
 
+  /** Clean shuffle data. */
   def cleanShuffle(shuffleId: Int) {
     enqueue(CleanShuffle(shuffleId))
     logDebug("Enqueued shuffle " + shuffleId + " for cleaning up")
@@ -102,16 +103,16 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
 
   /** Perform RDD cleaning */
   private def doCleanRDD(sc: SparkContext, rddId: Int) {
-    logDebug("Cleaning rdd "+ rddId)
+    logDebug("Cleaning rdd " + rddId)
     sc.env.blockManager.master.removeRdd(rddId, false)
     sc.persistentRdds.remove(rddId)
     listeners.foreach(_.rddCleaned(rddId))
-    logInfo("Cleaned rdd "+ rddId)
+    logInfo("Cleaned rdd " + rddId)
   }
 
   /** Perform shuffle cleaning */
   private def doCleanShuffle(shuffleId: Int) {
-    logDebug("Cleaning shuffle "+ shuffleId)
+    logDebug("Cleaning shuffle " + shuffleId)
     mapOutputTrackerMaster.unregisterShuffle(shuffleId)
     blockManager.master.removeShuffle(shuffleId)
     listeners.foreach(_.shuffleCleaned(shuffleId))
@@ -123,4 +124,4 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
   private def blockManager = env.blockManager
 
   private def isStopped = synchronized { stopped }
-}
+}
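
Note for readers skimming the diff: cleanRDD and cleanShuffle only enqueue work, and a daemon cleaning thread later performs the actual cleanup via doCleanRDD and doCleanShuffle. Below is a minimal, self-contained sketch of that enqueue-and-drain pattern; SimpleCleaner and the task fields are simplified stand-ins, not Spark's actual API.

    import java.util.concurrent.LinkedBlockingQueue

    // Simplified stand-ins for the cleaning tasks named in the diff; the real
    // CleanRDD also carries a SparkContext, which this sketch omits.
    sealed trait CleaningTask
    case class CleanRDD(rddId: Int) extends CleaningTask
    case class CleanShuffle(shuffleId: Int) extends CleaningTask

    class SimpleCleaner {
      private val queue = new LinkedBlockingQueue[CleaningTask]()
      @volatile private var stopped = false

      // A daemon thread drains the queue, mirroring ContextCleaner's cleaningThread.
      private val cleaningThread = new Thread("cleaning-thread") {
        override def run(): Unit = {
          try {
            while (!stopped) {
              queue.take() match {
                case CleanRDD(id)     => println(s"Cleaning rdd $id")
                case CleanShuffle(id) => println(s"Cleaning shuffle $id")
              }
            }
          } catch {
            case _: InterruptedException => // stop() interrupts a blocked take()
          }
        }
      }
      cleaningThread.setDaemon(true)
      cleaningThread.start()

      // Callers never block on the cleanup itself, matching cleanRDD/cleanShuffle.
      def enqueue(task: CleaningTask): Unit = queue.put(task)

      def stop(): Unit = { stopped = true; cleaningThread.interrupt() }
    }

The design choice this mirrors: a caller pays only the cost of a queue insert, while the slower BlockManager calls run off the caller's thread.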

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 13 additions & 1 deletion
@@ -54,6 +54,11 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
   }
 }
 
+/**
+ * Class that keeps track of the location of the map output of a stage.
+ * This is abstract because different versions of MapOutputTracker
+ * (driver and worker) use different HashMaps to store the metadata.
+ */
 private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
 
   private val timeout = AkkaUtils.askTimeout(conf)
@@ -181,6 +186,10 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   }
 }
 
+/**
+ * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of
+ * a limited number of the most recently used map output statuses.
+ */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
 
   /**
@@ -192,7 +201,10 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](MAX_MAP_STATUSES, true)
 }
 
-
+/**
+ * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
+ * output information, so that old output information can be removed based on a TTL.
+ */
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {
 

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 2 additions & 1 deletion
@@ -50,7 +50,8 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
   override def get(key: A): Option[B] = {
     val timeStampedValue = internalMap.get(key)
     if (updateTimeStampOnGet && timeStampedValue != null) {
-      internalJavaMap.replace(key, timeStampedValue, TimeStampedValue(currentTime, timeStampedValue.value))
+      internalJavaMap.replace(key, timeStampedValue,
+        TimeStampedValue(currentTime, timeStampedValue.value))
     }
     Option(timeStampedValue).map(_.value)
   }
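
The rewrapped call above is worth dwelling on: when updateTimeStampOnGet is set, every read re-stamps the entry, and it does so with the three-argument ConcurrentHashMap.replace, which only swaps if the stored entry is still the one just read, so a concurrent put is never clobbered. A self-contained sketch of that behaviour; TinyTimeStampedMap and its members are illustrative names, not Spark's.

    import java.util.concurrent.ConcurrentHashMap

    // Mirrors the case class used by TimeStampedHashMap in the diff.
    case class TimeStampedValue[B](timestamp: Long, value: B)

    class TinyTimeStampedMap[A, B](updateTimeStampOnGet: Boolean = false) {
      private val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()

      def put(key: A, value: B): Unit =
        internalJavaMap.put(key, TimeStampedValue(System.currentTimeMillis(), value))

      def get(key: A): Option[B] = {
        val tsv = internalJavaMap.get(key)
        if (updateTimeStampOnGet && tsv != null) {
          // Conditional replace: only swaps if the entry still equals tsv, so
          // a concurrent update between get() and replace() wins rather than
          // being silently overwritten.
          internalJavaMap.replace(key, tsv,
            TimeStampedValue(System.currentTimeMillis(), tsv.value))
        }
        Option(tsv).map(_.value)
      }

      // Drop entries whose timestamp is older than the given threshold time.
      def clearOldValues(threshTime: Long): Unit = {
        val it = internalJavaMap.entrySet().iterator()
        while (it.hasNext) {
          if (it.next().getValue.timestamp < threshTime) it.remove()
        }
      }
    }

With updateTimeStampOnGet = true this gives TTL-by-last-access rather than TTL-by-insertion: clearOldValues only evicts keys that have not been read or written since threshTime.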

core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: Wea
  * A map that stores the timestamp of when a key was inserted along with the value,
  * while ensuring that the values are weakly referenced. If the value is garbage collected and
  * the weak reference is null, the get() operation treats the key as non-existent. However,
- * the key is actually not remmoved in the current implementation. Key-value pairs whose
+ * the key is actually not removed in the current implementation. Key-value pairs whose
  * timestamps are older than a particular threshold time can then be removed using the
  * clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a
  * drop-in replacement for Scala HashMaps.
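
The corrected comment describes the map's one subtle behaviour: values are held through weak references, so once a value is garbage collected, get() returns None even though the key itself lingers until a clearOldValues-style sweep. A minimal sketch of that behaviour under those assumptions; TinyWeakValueMap is an illustrative name, not Spark's class.

    import java.lang.ref.WeakReference
    import scala.collection.mutable

    class TinyWeakValueMap[A, B <: AnyRef] {
      // Pairs each weak reference with an insertion timestamp, like the
      // TimeStampedWeakValue case class shown in the diff.
      private case class Entry(timestamp: Long, ref: WeakReference[B])
      private val internalMap = new mutable.HashMap[A, Entry]()

      def put(key: A, value: B): Unit =
        internalMap(key) = Entry(System.currentTimeMillis(), new WeakReference(value))

      // ref.get() returns null once the value has been collected, so Option(...)
      // turns a dead entry into None without actually removing the key.
      def get(key: A): Option[B] =
        internalMap.get(key).flatMap(entry => Option(entry.ref.get()))

      // Entries older than the threshold are removed here, keys included; this
      // is what eventually reclaims keys whose values died long ago.
      def clearOldValues(threshTime: Long): Unit =
        internalMap.retain((_, entry) => entry.timestamp >= threshTime)
    }

The timestamp lives alongside the WeakReference because the reference alone records nothing about age; the pairing is what lets clearOldValues evict by threshold time.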
