Skip to content

Commit 012afa3

Browse files
committed
Bug fix
1 parent 89f91a0 commit 012afa3

File tree

1 file changed

+7
-7
lines changed

1 file changed

+7
-7
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ private[spark] class BlockManager(
112112
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
113113
private val broadcastCleaner = new MetadataCleaner(
114114
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
115-
private val cachedPeers = new mutable.HashSet[BlockManagerId]
115+
@volatile private var cachedPeers: Seq[BlockManagerId] = _
116+
private val peerFetchLock = new Object
116117
private var lastPeerFetchTime = 0L
117118

118119
initialize()
@@ -792,18 +793,17 @@ private[spark] class BlockManager(
792793
/**
793794
* Get peer block managers in the system.
794795
*/
795-
private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = {
796-
cachedPeers.synchronized {
796+
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
797+
peerFetchLock.synchronized {
797798
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
798799
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
799-
if (cachedPeers.isEmpty || forceFetch || timeout) {
800-
cachedPeers.clear()
801-
cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
800+
if (cachedPeers == null || forceFetch || timeout) {
801+
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
802802
lastPeerFetchTime = System.currentTimeMillis
803803
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
804804
}
805+
cachedPeers
805806
}
806-
cachedPeers
807807
}
808808

809809
/**

0 commit comments

Comments
 (0)