Skip to content

Commit 620eca3

Browse files
committed
Changes based on PR comments.
1 parent f2881fd commit 620eca3

File tree

7 files changed

+27
-26
lines changed

7 files changed

+27
-26
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
7070
* their cache of map output locations if this happens.
7171
*/
7272
protected var epoch: Long = 0
73-
protected val epochLock = new java.lang.Object
73+
protected val epochLock = new AnyRef
7474

7575
/** Remembers which map output locations are currently being fetched on a worker */
7676
private val fetching = new HashSet[Int]
@@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
305305
cachedSerializedStatuses.clear()
306306
}
307307

308-
protected def cleanup(cleanupTime: Long) {
308+
private def cleanup(cleanupTime: Long) {
309309
mapStatuses.clearOldValues(cleanupTime)
310310
cachedSerializedStatuses.clearOldValues(cleanupTime)
311311
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
150150
private def removeShuffle(shuffleId: Int) {
151151
// Nothing to do in the BlockManagerMasterActor data structures
152152
val removeMsg = RemoveShuffle(shuffleId)
153-
blockManagerInfo.values.map { bm =>
153+
blockManagerInfo.values.foreach { bm =>
154154
bm.slaveActor ! removeMsg
155155
}
156156
}

core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ import scala.reflect.ClassTag
4545
private[spark] class BoundedHashMap[A, B](bound: Int, useLRU: Boolean)
4646
extends WrappedJavaHashMap[A, B, A, B] with SynchronizedMap[A, B] {
4747

48-
protected[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B](
48+
private[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B](
4949
bound / 8, (0.75).toFloat, useLRU) {
5050
override protected def removeEldestEntry(eldest: JMapEntry[A, B]): Boolean = {
5151
size() > bound
5252
}
5353
})
5454

55-
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
55+
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
5656
new BoundedHashMap[K1, V1](bound, useLRU)
5757
}
5858

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ private[util] case class TimeStampedValue[T](timestamp: Long, value: T)
3939
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
4040
extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging {
4141

42-
protected[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
42+
private[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
4343

44-
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
44+
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
4545
new TimeStampedHashMap[K1, V1]()
4646
}
4747

core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]()
5151
/** Counter for counting the number of inserts */
5252
private val insertCounts = new AtomicInteger(0)
5353

54-
protected[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
54+
private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
5555
new ConcurrentHashMap[A, TimeStampedWeakValue[B]]()
5656
}
5757

58-
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
58+
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
5959
new TimeStampedWeakValueHashMap[K1, V1]()
6060
}
6161

@@ -68,15 +68,12 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]()
6868
}
6969

7070
override def get(key: A): Option[B] = {
71-
Option(internalJavaMap.get(key)) match {
72-
case Some(weakValue) =>
73-
val value = weakValue.weakValue.get
74-
if (value == null) {
75-
internalJavaMap.remove(key)
76-
}
77-
Option(value)
78-
case None =>
79-
None
71+
Option(internalJavaMap.get(key)).flatMap { weakValue =>
72+
val value = weakValue.weakValue.get
73+
if (value == null) {
74+
internalJavaMap.remove(key)
75+
}
76+
Option(value)
8077
}
8178
}
8279

core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,15 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]
4646

4747
/* Methods that must be defined. */
4848

49-
/** Internal Java HashMap that is being wrapped. */
50-
protected[util] val internalJavaMap: JMap[IK, IV]
49+
/**
50+
* Internal Java HashMap that is being wrapped.
51+
* Scoped private[util] so that rest of Spark code cannot
52+
* directly access the internal map.
53+
*/
54+
private[util] val internalJavaMap: JMap[IK, IV]
5155

5256
/** Method to get a new instance of the internal Java HashMap. */
53-
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _]
57+
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _]
5458

5559
/*
5660
Methods that convert between internal and external types. These implementations

core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.apache.spark.util
1919

20-
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
21-
import scala.util.Random
22-
2320
import java.util
2421
import java.lang.ref.WeakReference
2522

23+
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
24+
import scala.util.Random
25+
2626
import org.scalatest.FunSuite
2727

2828
class WrappedJavaHashMapSuite extends FunSuite {
@@ -203,9 +203,9 @@ class WrappedJavaHashMapSuite extends FunSuite {
203203
}
204204

205205
class TestMap[A, B] extends WrappedJavaHashMap[A, B, A, B] {
206-
protected[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]()
206+
private[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]()
207207

208-
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
208+
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
209209
new TestMap[K1, V1]
210210
}
211211
}

0 commit comments

Comments
 (0)