@@ -22,7 +22,8 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
2222
2323import scala .concurrent .ExecutionContext .Implicits .global
2424
25- import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet }
25+ import scala .collection .mutable
26+ import scala .collection .mutable .{ArrayBuffer , HashMap }
2627import scala .concurrent .{Await , Future }
2728import scala .concurrent .duration ._
2829import scala .util .Random
@@ -111,7 +112,7 @@ private[spark] class BlockManager(
111112 MetadataCleanerType .BLOCK_MANAGER , this .dropOldNonBroadcastBlocks, conf)
112113 private val broadcastCleaner = new MetadataCleaner (
113114 MetadataCleanerType .BROADCAST_VARS , this .dropOldBroadcastBlocks, conf)
114- private val cachedPeers = new HashSet [BlockManagerId ]
115+ private val cachedPeers = new mutable. HashSet [BlockManagerId ]
115116 private var lastPeerFetchTime = 0L
116117
117118 initialize()
@@ -791,11 +792,10 @@ private[spark] class BlockManager(
791792 /**
792793 * Get peer block managers in the system.
793794 */
794- private def getPeers (forceFetch : Boolean ): HashSet [BlockManagerId ] = {
795- val cachedPeersTtl = conf.getInt(" spark.storage.cachedPeersTtl" , 60 * 1000 ) // milliseconds
796- val timeout = System .currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
797-
795+ private def getPeers (forceFetch : Boolean ): mutable.HashSet [BlockManagerId ] = {
798796 cachedPeers.synchronized {
797+ val cachedPeersTtl = conf.getInt(" spark.storage.cachedPeersTtl" , 60 * 1000 ) // milliseconds
798+ val timeout = System .currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
799799 if (cachedPeers.isEmpty || forceFetch || timeout) {
800800 cachedPeers.clear()
801801 cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
@@ -812,27 +812,52 @@ private[spark] class BlockManager(
812812 private def replicate (blockId : BlockId , data : ByteBuffer , level : StorageLevel ): Unit = {
813813 val maxReplicationFailures = conf.getInt(" spark.storage.maxReplicationFailures" , 1 )
814814 val numPeersToReplicateTo = level.replication - 1
815- val peersReplicatedTo = new HashSet [BlockManagerId ]
816- val peersFailedToReplicateTo = new HashSet [BlockManagerId ]
815+ val peersForReplication = new ArrayBuffer [BlockManagerId ]
816+ val peersReplicatedTo = new ArrayBuffer [BlockManagerId ]
817+ val peersFailedToReplicateTo = new ArrayBuffer [BlockManagerId ]
817818 val tLevel = StorageLevel (
818819 level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1 )
819820 val startTime = System .nanoTime
820821 val random = new Random (blockId.hashCode)
821822
822- var forceFetchPeers = false
823+ var replicationFailed = false
823824 var failures = 0
824825 var done = false
825826
826- // Get a random peer
827+ // Get cached list of peers
828+ peersForReplication ++= getPeers(forceFetch = false )
829+
830+ // Get a random peer. Note that this selection of a peer is deterministic on the block id.
831+ // So assuming the list of peers does not change and no replication failures,
832+ // if there are multiple attempts in the same node to replicate the same block,
833+ // the same set of peers will be selected.
827834 def getRandomPeer (): Option [BlockManagerId ] = {
828- val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
829- if (! peers.isEmpty) Some (peers.toSeq(random.nextInt(peers.size))) else None
835+ // If replication had failed, then force update the cached list of peers and remove the peers
836+ // that have been already used
837+ if (replicationFailed) {
838+ peersForReplication.clear()
839+ peersForReplication ++= getPeers(forceFetch = true )
840+ peersForReplication --= peersReplicatedTo
841+ peersForReplication --= peersFailedToReplicateTo
842+ }
843+ if (! peersForReplication.isEmpty) {
844+ Some (peersForReplication(random.nextInt(peersForReplication.size)))
845+ } else {
846+ None
847+ }
830848 }
831849
832850 // One by one choose a random peer and try uploading the block to it
833851 // If replication fails (e.g., target peer is down), force the list of cached peers
834852 // to be re-fetched from driver and then pick another random peer for replication. Also
835853 // temporarily black list the peer for which replication failed.
854+ //
855+ // This selection of a peer and replication is continued in a loop until one of the
856+ // following 3 conditions is fulfilled:
857+ // (i) specified number of peers have been replicated to
858+ // (ii) too many failures in replicating to peers
859+ // (iii) no peer left to replicate to
860+ //
836861 while (! done) {
837862 getRandomPeer() match {
838863 case Some (peer) =>
@@ -845,22 +870,22 @@ private[spark] class BlockManager(
845870 logTrace(s " Replicated $blockId of ${data.limit()} bytes to $peer in %f ms "
846871 .format((System .nanoTime - onePeerStartTime) / 1e6 ))
847872 peersReplicatedTo += peer
848- forceFetchPeers = false
873+ peersForReplication -= peer
874+ replicationFailed = false
849875 if (peersReplicatedTo.size == numPeersToReplicateTo) {
850- done = true
876+ done = true // specified number of peers have been replicated to
851877 }
852878 } catch {
853879 case e : Exception =>
854880 logWarning(s " Failed to replicate $blockId to $peer, failure # $failures" , e)
855881 failures += 1
856- forceFetchPeers = true
882+ replicationFailed = true
857883 peersFailedToReplicateTo += peer
858- if (failures > maxReplicationFailures) {
884+ if (failures > maxReplicationFailures) { // too many failures in replicating to peers
859885 done = true
860886 }
861887 }
862- case None =>
863- // no peer left to replicate to
888+ case None => // no peer left to replicate to
864889 done = true
865890 }
866891 }
0 commit comments