16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{TimeStampedHashMap, Utils}

/**
* :: DeveloperApi ::
@@ -123,7 +123,8 @@ private[spark] object BlockManagerId {
execId: String,
host: String,
port: Int,
-topologyInfo: Option[String] = None): BlockManagerId =
+topologyInfo: Option[String] = None,
+clearOldValues: Boolean = false): BlockManagerId =
getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo))

def apply(in: ObjectInput): BlockManagerId = {
@@ -132,10 +133,15 @@ private[spark] object BlockManagerId {
getCachedBlockManagerId(obj)
}

-val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+val blockManagerIdCache = new TimeStampedHashMap[BlockManagerId, BlockManagerId](true)
Contributor commented:

Will this cause a serious performance regression? TimeStampedHashMap will copy all the old values on update. cc @cloud-fan


-def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+def getCachedBlockManagerId(id: BlockManagerId, clearOldValues: Boolean = false): BlockManagerId =
+{
blockManagerIdCache.putIfAbsent(id, id)
-blockManagerIdCache.get(id)
+val blockManagerId = blockManagerIdCache.get(id)
+if (clearOldValues) {
+blockManagerIdCache.clearOldValues(System.currentTimeMillis - Utils.timeStringAsMs("10d"))
Member commented:

10 days? I don't think time can be a judging criterion for whether we should remove a cached id, even if you set the time threshold far less or greater than '10d'. Think about an extreme case where a block is fetched frequently during the app's whole run: it would still be removed from the cache once the time threshold passes, then re-cached the next time we get it, and so on repeatedly.

Contributor (author) commented:

@Ngone51 Thanks. I also thought about removing the entry when we delete a block.
In this case it is history replaying that triggers the problem, and we do not actually delete any block.
Maybe using a WeakReference is better, as @jiangxb1987 mentioned? WDYT?
Thanks again!
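
For illustration, a minimal sketch of the WeakReference idea using Guava's `Interners` (an assumed approach, not code from this PR; the field name is made up). A weak interner lets the GC reclaim a cached id once nothing else references it, so the cache cannot grow without bound:

```scala
import com.google.common.collect.Interners

// Hypothetical weak-reference variant: entries are held weakly, so a
// cached BlockManagerId is reclaimed once no live reference remains,
// with no time-based eviction needed.
private val blockManagerIdInterner = Interners.newWeakInterner[BlockManagerId]()

def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId =
  blockManagerIdInterner.intern(id)
```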

Contributor commented:

Can we simply use com.google.common.cache.Cache? It has a size limit, so we wouldn't need to worry about OOM.
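
A minimal sketch of that suggestion, assuming Guava's `CacheBuilder` with a made-up bound of 10000 entries (the thread does not specify a size):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Hypothetical size-bounded cache: once more than maximumSize entries
// are present, the least-recently-used ones are evicted first, so
// memory stays capped no matter how many distinct ids history replay
// produces.
private val blockManagerIdCache: LoadingCache[BlockManagerId, BlockManagerId] =
  CacheBuilder.newBuilder()
    .maximumSize(10000) // assumed bound, tune as needed
    .build(new CacheLoader[BlockManagerId, BlockManagerId] {
      override def load(id: BlockManagerId): BlockManagerId = id
    })

def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId =
  blockManagerIdCache.get(id)
```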

+}
+blockManagerId.get
}
}
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -946,7 +946,7 @@ private[spark] object JsonProtocol {
val executorId = (json \ "Executor ID").extract[String].intern()
val host = (json \ "Host").extract[String].intern()
val port = (json \ "Port").extract[Int]
-BlockManagerId(executorId, host, port)
+BlockManagerId(executorId, host, port, clearOldValues = true)
}

private object JOB_RESULT_FORMATTED_CLASS_NAMES {