Skip to content

Commit d17f6e9

Browse files
committed
Incorporating suggestions from @sameeragarwal. Removing objectives based replication strategy and constraint solver associate with it.
1 parent bdf69d6 commit d17f6e9

File tree

4 files changed

+5
-333
lines changed

4 files changed

+5
-333
lines changed

core/src/main/scala/org/apache/spark/storage/BlockReplicationObjectives.scala

Lines changed: 0 additions & 202 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -222,56 +222,3 @@ class BasicBlockReplicationPolicy
222222
}
223223

224224
}
225-
226-
@DeveloperApi
227-
class ObjectivesBasedReplicationPolicy
228-
extends BlockReplicationPolicy
229-
with Logging {
230-
val objectives: Set[BlockReplicationObjective] = Set(
231-
ReplicateToADifferentHost,
232-
ReplicateBlockOutsideRack,
233-
ReplicateBlockWithinRack,
234-
NoTwoReplicasInSameRack
235-
)
236-
237-
/**
238-
* Method to prioritize a bunch of candidate peers of a block
239-
*
240-
* @param blockManagerId Id of the current BlockManager for self identification
241-
* @param peers A list of peers of a BlockManager
242-
* @param peersReplicatedTo Set of peers already replicated to
243-
* @param blockId BlockId of the block being replicated. This can be used as a source of
244-
* randomness if needed.
245-
* @param numReplicas Number of peers we need to replicate to
246-
* @return A prioritized list of peers. Lower the index of a peer, higher its priority
247-
*/
248-
override def prioritize(
249-
blockManagerId: BlockManagerId,
250-
peers: Seq[BlockManagerId],
251-
peersReplicatedTo: mutable.HashSet[BlockManagerId],
252-
blockId: BlockId,
253-
numReplicas: Int): List[BlockManagerId] = {
254-
val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives(
255-
objectives,
256-
peers,
257-
peersReplicatedTo,
258-
blockId,
259-
blockManagerId,
260-
numReplicas
261-
)
262-
logDebug(s"BlockReplication objectives met : ${objectivesMet.mkString(", ")}")
263-
logDebug(s"Optimal peers : ${optimalPeers.mkString(", ")}")
264-
// pad optimal peers with random peers if we are short
265-
val numRemainingPeers = numReplicas - optimalPeers.size
266-
val remainingPeers = if (numRemainingPeers > 0) {
267-
val r = new Random(blockId.hashCode)
268-
BlockReplicationUtils.getRandomSample(
269-
peers.filter(p => !optimalPeers.contains(p)),
270-
numRemainingPeers,
271-
r)
272-
} else {
273-
List.empty
274-
}
275-
optimalPeers.toList ++ remainingPeers
276-
}
277-
}

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -553,14 +553,3 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
553553
classOf[DummyTopologyMapper].getName)
554554
}
555555

556-
class BlockManagerPrioritizerReplicationSuite extends BlockManagerReplicationBehavior {
557-
override val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
558-
conf.set("spark.kryoserializer.buffer", "1m")
559-
conf.set(
560-
"spark.storage.replication.policy",
561-
classOf[ObjectivesBasedReplicationPolicy].getName)
562-
conf.set(
563-
"spark.storage.replication.topologyMapper",
564-
classOf[DummyTopologyMapper].getName
565-
)
566-
}

core/src/test/scala/org/apache/spark/storage/BlockReplicationPolicySuite.scala

Lines changed: 5 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ import org.scalatest.{BeforeAndAfter, Matchers}
2424

2525
import org.apache.spark.{LocalSparkContext, SparkFunSuite}
2626

27-
trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite
27+
class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
2828
with Matchers
2929
with BeforeAndAfter
3030
with LocalSparkContext {
3131

3232
// Implicitly convert strings to BlockIds for test clarity.
3333
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
3434

35-
val replicationPolicy: BlockReplicationPolicy
35+
val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy
3636

3737
val blockId = "test-block"
3838
/**
@@ -77,7 +77,9 @@ trait RandomBlockReplicationPolicyBehavior extends SparkFunSuite
7777
}
7878
}
7979

80-
trait TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
80+
class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
81+
override val replicationPolicy = new BasicBlockReplicationPolicy
82+
8183
test("All peers in the same rack") {
8284
val racks = Seq("/default-rack")
8385
val numBlockManager = 10
@@ -123,67 +125,3 @@ trait TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplication
123125
}
124126
}
125127
}
126-
127-
class RandomBlockReplicationPolicySuite extends RandomBlockReplicationPolicyBehavior {
128-
override val replicationPolicy = new RandomBlockReplicationPolicy
129-
}
130-
131-
class BasicBlockReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior {
132-
override val replicationPolicy = new BasicBlockReplicationPolicy
133-
}
134-
135-
class ObjectivesBasedReplicationPolicySuite extends TopologyAwareBlockReplicationPolicyBehavior {
136-
override val replicationPolicy = new ObjectivesBasedReplicationPolicy
137-
138-
val objectives: Set[BlockReplicationObjective] = Set(
139-
ReplicateToADifferentHost,
140-
ReplicateBlockOutsideRack,
141-
ReplicateBlockWithinRack,
142-
NoTwoReplicasInSameRack
143-
)
144-
145-
test("peers are all in the same rack") {
146-
val blockManagerIds = generateBlockManagerIds(10, List("Default-rack"))
147-
148-
val blockId = BlockId("test_block")
149-
150-
val candidateBMId = generateBlockManagerIds(1, List("Default-rack")).head
151-
152-
val numReplicas = 2
153-
154-
val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives(
155-
objectives,
156-
blockManagerIds,
157-
mutable.HashSet.empty,
158-
blockId,
159-
candidateBMId,
160-
numReplicas)
161-
162-
logDebug(s"Optimal peers : ${optimalPeers}")
163-
logDebug(s"Objectives met : ${objectivesMet}")
164-
assert(optimalPeers.size == 1)
165-
assert(objectivesMet.size == 3)
166-
}
167-
168-
test("peers in 3 racks") {
169-
val racks = List("/Rack1", "/Rack2", "/Rack3")
170-
val blockManagerIds = generateBlockManagerIds(10, racks)
171-
val candidateBMId = generateBlockManagerIds(1, racks).head
172-
val blockId = BlockId("test_block")
173-
val numReplicas = 2
174-
val (optimalPeers, objectivesMet) = BlockReplicationOptimizer.getPeersToMeetObjectives(
175-
objectives,
176-
blockManagerIds,
177-
mutable.HashSet.empty,
178-
blockId,
179-
candidateBMId,
180-
numReplicas)
181-
182-
logDebug(s"Optimal peers : ${optimalPeers}")
183-
logDebug(s"Objectives met : ${objectivesMet}")
184-
assert(optimalPeers.size == 2)
185-
assert(objectivesMet.size == 4)
186-
assert(objectives.forall(_.isObjectiveMet(candidateBMId, optimalPeers)))
187-
}
188-
189-
}

0 commit comments

Comments
 (0)