Skip to content

Commit 8512612

Browse files
committed
Changed TimeStampedHashMap to use WrappedJavaHashMap.
1 parent e427a9e commit 8512612

File tree

6 files changed

+154
-104
lines changed

6 files changed

+154
-104
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ private[spark] class BlockManager(
225225
* the slave needs to re-register.
226226
*/
227227
private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
228+
logInfo("Reporting " + blockId)
228229
val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
229230
info.level match {
230231
case null =>
@@ -770,7 +771,7 @@ private[spark] class BlockManager(
770771
val iterator = blockInfo.internalMap.entrySet().iterator()
771772
while (iterator.hasNext) {
772773
val entry = iterator.next()
773-
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
774+
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
774775
if (time < cleanupTime && shouldDrop(id)) {
775776
info.synchronized {
776777
val level = info.level

core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.spark.util
219

320
import scala.collection.mutable.{ArrayBuffer, SynchronizedMap}
@@ -7,16 +24,20 @@ import java.util.Map.{Entry => JMapEntry}
724
import scala.reflect.ClassTag
825

926
/**
10-
* A map that bounds the number of key-value pairs present in it. It can be configured to
11-
* drop least recently inserted or used pair. It exposes a scala.collection.mutable.Map interface
12-
* to allow it to be a drop-in replacement of Scala HashMaps. Internally, a Java LinkedHashMap is
13-
* used to get insert-order or access-order behavior. Note that the LinkedHashMap is not
14-
* thread-safe and hence, it is wrapped in a Collections.synchronizedMap.
15-
* However, getting the Java HashMap's iterator and using it can still lead to
16-
* ConcurrentModificationExceptions. Hence, the iterator() function is overridden to copy the
17-
* all pairs into an ArrayBuffer and then return the iterator to the ArrayBuffer. Also,
18-
* the class apply the trait SynchronizedMap which ensures that all calls to the Scala Map API
19-
* are synchronized. This together ensures that ConcurrentModificationException is never thrown.
27+
* A map that upper bounds the number of key-value pairs present in it. It can be configured to
28+
* drop the least recently used pair or the earliest inserted pair. It exposes a
29+
* scala.collection.mutable.Map interface to allow it to be a drop-in replacement for Scala
30+
* HashMaps.
31+
*
32+
* Internally, a Java LinkedHashMap is used to get insert-order or access-order behavior.
33+
* Note that the LinkedHashMap is not thread-safe and hence, it is wrapped in a
34+
* Collections.synchronizedMap. However, getting the Java HashMap's iterator and
35+
* using it can still lead to ConcurrentModificationExceptions. Hence, the iterator()
36+
* function is overridden to copy all the pairs into an ArrayBuffer and then return the
37+
* iterator to the ArrayBuffer. Also, the class applies the trait SynchronizedMap which
38+
* ensures that all calls to the Scala Map API are synchronized. This together ensures
39+
* that ConcurrentModificationException is never thrown.
40+
*
2041
* @param bound max number of key-value pairs
2142
* @param useLRU true = least recently used/accessed will be dropped when bound is reached,
2243
* false = earliest inserted will be dropped
@@ -37,7 +58,8 @@ private[spark] class BoundedHashMap[A, B](bound: Int, useLRU: Boolean)
3758

3859
/**
3960
* Overriding iterator to make sure that the internal Java HashMap's iterator
40-
* is not concurrently modified.
61+
* is not concurrently modified. This can be a performance issue and this should be overridden
62+
* if it is known that this map will not be used in a multi-threaded environment.
4163
*/
4264
override def iterator: Iterator[(A, B)] = {
4365
(new ArrayBuffer[(A, B)] ++= super.iterator).iterator

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 35 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -18,108 +18,66 @@
1818
package org.apache.spark.util
1919

2020
import java.util.concurrent.ConcurrentHashMap
21-
import scala.collection.JavaConversions
22-
import scala.collection.mutable.Map
23-
import scala.collection.immutable
24-
import org.apache.spark.scheduler.MapStatus
21+
2522
import org.apache.spark.Logging
2623

24+
private[util] case class TimeStampedValue[T](timestamp: Long, value: T)
25+
2726
/**
28-
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
29-
* timestamp along with each key-value pair. If specified, the timestamp of each pair can be
30-
* updated every time it is accessed. Key-value pairs whose timestamp are older than a particular
31-
* threshold time can then be removed using the clearOldValues method. This is intended to
32-
* be a drop-in replacement of scala.collection.mutable.HashMap.
27+
* A map that stores the timestamp of when a key was inserted along with the value. If specified,
28+
* the timestamp of each pair can be updated every time it is accessed.
29+
* Key-value pairs whose timestamps are older than a particular
30+
* threshold time can then be removed using the clearOldValues method. It exposes a
31+
* scala.collection.mutable.Map interface to allow it to be a drop-in replacement for Scala
32+
* HashMaps.
33+
*
34+
* Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
35+
*
3336
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
3437
* updated when it is accessed
3538
*/
36-
class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
37-
extends Map[A, B]() with Logging {
38-
val internalMap = new ConcurrentHashMap[A, (B, Long)]()
39-
40-
def get(key: A): Option[B] = {
41-
val value = internalMap.get(key)
42-
if (value != null && updateTimeStampOnGet) {
43-
internalMap.replace(key, value, (value._1, currentTime))
44-
}
45-
Option(value).map(_._1)
46-
}
47-
48-
def iterator: Iterator[(A, B)] = {
49-
val jIterator = internalMap.entrySet().iterator()
50-
JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
51-
}
52-
53-
override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
54-
val newMap = new TimeStampedHashMap[A, B1]
55-
newMap.internalMap.putAll(this.internalMap)
56-
newMap.internalMap.put(kv._1, (kv._2, currentTime))
57-
newMap
58-
}
59-
60-
override def - (key: A): Map[A, B] = {
61-
val newMap = new TimeStampedHashMap[A, B]
62-
newMap.internalMap.putAll(this.internalMap)
63-
newMap.internalMap.remove(key)
64-
newMap
65-
}
39+
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
40+
extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging {
6641

67-
override def += (kv: (A, B)): this.type = {
68-
internalMap.put(kv._1, (kv._2, currentTime))
69-
this
70-
}
42+
protected[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
7143

72-
// Should we return previous value directly or as Option ?
73-
def putIfAbsent(key: A, value: B): Option[B] = {
74-
val prev = internalMap.putIfAbsent(key, (value, currentTime))
75-
if (prev != null) Some(prev._1) else None
44+
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
45+
new TimeStampedHashMap[K1, V1]()
7646
}
7747

48+
def internalMap = internalJavaMap
7849

79-
override def -= (key: A): this.type = {
80-
internalMap.remove(key)
81-
this
82-
}
83-
84-
override def update(key: A, value: B) {
85-
this += ((key, value))
50+
override def get(key: A): Option[B] = {
51+
val timeStampedValue = internalMap.get(key)
52+
if (updateTimeStampOnGet && timeStampedValue != null) {
53+
internalJavaMap.replace(key, timeStampedValue, TimeStampedValue(currentTime, timeStampedValue.value))
54+
}
55+
Option(timeStampedValue).map(_.value)
8656
}
87-
88-
override def apply(key: A): B = {
89-
val value = internalMap.get(key)
90-
if (value == null) throw new NoSuchElementException()
91-
value._1
57+
@inline override protected def externalValueToInternalValue(v: B): TimeStampedValue[B] = {
58+
new TimeStampedValue(currentTime, v)
9259
}
9360

94-
override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
95-
JavaConversions.mapAsScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
61+
@inline override protected def internalValueToExternalValue(iv: TimeStampedValue[B]): B = {
62+
iv.value
9663
}
9764

98-
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
99-
100-
override def size: Int = internalMap.size
101-
102-
override def foreach[U](f: ((A, B)) => U) {
103-
val iterator = internalMap.entrySet().iterator()
104-
while(iterator.hasNext) {
105-
val entry = iterator.next()
106-
val kv = (entry.getKey, entry.getValue._1)
107-
f(kv)
108-
}
65+
/** Atomically put if a key is absent. This exposes the existing API of ConcurrentHashMap. */
66+
def putIfAbsent(key: A, value: B): Option[B] = {
67+
val prev = internalJavaMap.putIfAbsent(key, TimeStampedValue(currentTime, value))
68+
Option(prev).map(_.value)
10969
}
11070

111-
def toMap: immutable.Map[A, B] = iterator.toMap
112-
11371
/**
11472
* Removes old key-value pairs that have timestamp earlier than `threshTime`,
11573
* calling the supplied function on each such entry before removing.
11674
*/
11775
def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
118-
val iterator = internalMap.entrySet().iterator()
76+
val iterator = internalJavaMap.entrySet().iterator()
11977
while (iterator.hasNext) {
12078
val entry = iterator.next()
121-
if (entry.getValue._2 < threshTime) {
122-
f(entry.getKey, entry.getValue._1)
79+
if (entry.getValue.timestamp < threshTime) {
80+
f(entry.getKey, entry.getValue.value)
12381
logDebug("Removing key " + entry.getKey)
12482
iterator.remove()
12583
}
@@ -134,5 +92,4 @@ class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
13492
}
13593

13694
private def currentTime: Long = System.currentTimeMillis()
137-
13895
}

core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.spark.util
219

320
import scala.collection.{JavaConversions, immutable}
@@ -12,8 +29,19 @@ private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: Wea
1229
def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
1330
}
1431

32+
/**
33+
* A map that stores the timestamp of when a key was inserted along with the value,
34+
* while ensuring that the values are weakly referenced. If the value is garbage collected and
35+
* the weak reference is null, the get() operation reports the key as non-existent. However,
36+
* the key is actually not removed in the current implementation. Key-value pairs whose
37+
* timestamps are older than a particular threshold time can then be removed using the
38+
* clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a
39+
* drop-in replacement for Scala HashMaps.
40+
*
41+
* Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe.
42+
*/
1543

16-
private[spark] class TimeStampedWeakValueHashMap[A, B]
44+
private[spark] class TimeStampedWeakValueHashMap[A, B]()
1745
extends WrappedJavaHashMap[A, B, A, TimeStampedWeakValue[B]] with Logging {
1846

1947
protected[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {

core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.spark.util
219

320
import scala.collection.mutable.Map
@@ -8,8 +25,8 @@ import scala.reflect.ClassTag
825

926
/**
1027
* Convenient wrapper class for exposing Java HashMaps as Scala Maps even if the
11-
* exposed key-value type is different from the internal type. This allows Scala HashMaps to be
12-
* hot replaceable with these Java HashMaps.
28+
* exposed key-value type is different from the internal type. This allows these
29+
* implementations of WrappedJavaHashMap to be drop-in replacements for Scala HashMaps.
1330
*
1431
* While Java <-> Scala conversion methods exist, it's hard to understand the performance
1532
* implications and thread safety of the Scala wrapper. This class allows you to convert
@@ -62,7 +79,7 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]
6279
(internalKeyToExternalKey(ip.getKey), internalValueToExternalValue(ip.getValue) )
6380
}
6481

65-
/* Implicit functions to convert the types. */
82+
/* Implicit methods to convert the types. */
6683

6784
@inline implicit private def convExtKeyToIntKey(k: K) = externalKeyToInternalKey(k)
6885

@@ -76,6 +93,8 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]
7693
internalPairToExternalPair(ip)
7794
}
7895

96+
/* Methods that must be implemented for a scala.collection.mutable.Map */
97+
7998
def get(key: K): Option[V] = {
8099
Option(internalJavaMap.get(key))
81100
}
@@ -85,6 +104,8 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]
85104
JavaConversions.asScalaIterator(jIterator).map(kv => convIntPairToExtPair(kv))
86105
}
87106

107+
/* Other methods that are implemented to ensure performance. */
108+
88109
def +=(kv: (K, V)): this.type = {
89110
internalJavaMap.put(kv._1, kv._2)
90111
this
@@ -109,8 +130,9 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]
109130
}
110131

111132
override def foreach[U](f: ((K, V)) => U) {
112-
while(iterator.hasNext) {
113-
f(iterator.next())
133+
val jIterator = internalJavaMap.entrySet().iterator()
134+
while(jIterator.hasNext) {
135+
f(jIterator.next())
114136
}
115137
}
116138

0 commit comments

Comments
 (0)