
Commit 544ac86

Clean up broadcast blocks through BlockManager*
1 parent d0edef3 commit 544ac86

9 files changed: +53, -14 lines


core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 1 addition & 1 deletion
@@ -186,7 +186,7 @@ private[spark] object HttpBroadcast extends Logging {
    * and delete the associated broadcast file.
    */
   def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
-    //SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
     if (removeFromDriver) {
       val file = new File(broadcastDir, BroadcastBlockId(id).name)
       files.remove(file.toString)

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 1 addition & 1 deletion
@@ -232,7 +232,7 @@ private[spark] object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
-    //SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
   }

 }
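
Taken together, the two unpersist changes above simply enable the previously commented-out delegation to the driver's BlockManagerMaster. Below is a minimal, self-contained sketch of that call path, using stand-in classes in place of SparkEnv and BlockManagerMaster (names like FakeBlockManagerMaster are illustrative only, not Spark API):

// Stand-ins for SparkEnv / BlockManagerMaster, just to show the delegation.
object BroadcastCleanupSketch {

  class FakeBlockManagerMaster {
    def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean): Unit =
      println(s"master: remove blocks of broadcast $broadcastId (removeFromMaster=$removeFromMaster)")
  }

  object FakeHttpBroadcast {
    private val master = new FakeBlockManagerMaster

    // Mirrors HttpBroadcast.unpersist after this commit: delegate to the master,
    // then optionally clean up the driver-local broadcast file.
    def unpersist(id: Long, removeFromDriver: Boolean): Unit = synchronized {
      master.removeBroadcast(id, removeFromDriver)
      if (removeFromDriver) {
        println(s"driver: also delete local broadcast file for id $id")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    FakeHttpBroadcast.unpersist(42L, removeFromDriver = true)
  }
}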

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 13 additions & 1 deletion
@@ -820,10 +820,22 @@ private[spark] class BlockManager(
     // from RDD.id to blocks.
     logInfo("Removing RDD " + rddId)
     val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
-    blocksToRemove.foreach(blockId => removeBlock(blockId, tellMaster = false))
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
   }

+  /**
+   * Remove all blocks belonging to the given broadcast.
+   */
+  def removeBroadcast(broadcastId: Long) {
+    logInfo("Removing broadcast " + broadcastId)
+    val blocksToRemove = blockInfo.keys.filter(_.isBroadcast).collect {
+      case bid: BroadcastBlockId if bid.broadcastId == broadcastId => bid
+      case bid: BroadcastHelperBlockId if bid.broadcastId.broadcastId == broadcastId => bid
+    }
+    blocksToRemove.foreach { blockId => removeBlock(blockId) }
+  }
+
   /**
    * Remove a block from both memory and disk.
    */
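
The new BlockManager.removeBroadcast selects both plain broadcast blocks and their helper blocks by broadcast id. A runnable sketch of that matching logic, with simplified stand-ins for the BlockId hierarchy (the real classes carry more fields and methods):

// Simplified stand-ins for the BlockId hierarchy, to show how removeBroadcast
// picks out both broadcast blocks and broadcast helper blocks for a given id.
object RemoveBroadcastMatchSketch {

  sealed trait BlockId { def isBroadcast: Boolean = false }
  case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
  case class BroadcastBlockId(broadcastId: Long) extends BlockId {
    override def isBroadcast = true
  }
  case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
    override def isBroadcast = true
  }

  // Same filter + collect shape as BlockManager.removeBroadcast in the diff above.
  def blocksForBroadcast(blocks: Iterable[BlockId], broadcastId: Long): Iterable[BlockId] =
    blocks.filter(_.isBroadcast).collect {
      case bid: BroadcastBlockId if bid.broadcastId == broadcastId => bid
      case bid: BroadcastHelperBlockId if bid.broadcastId.broadcastId == broadcastId => bid
    }

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      RDDBlockId(0, 0),
      BroadcastBlockId(7),
      BroadcastHelperBlockId(BroadcastBlockId(7), "meta"),
      BroadcastBlockId(8))
    // Keeps only the two blocks tied to broadcast 7.
    blocksForBroadcast(blocks, 7L).foreach(println)
  }
}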

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 7 additions & 0 deletions
@@ -126,6 +126,13 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     askDriverWithReply(RemoveShuffle(shuffleId))
   }

+  /**
+   * Remove all blocks belonging to the given broadcast.
+   */
+  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
+    askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
+  }
+
   /**
    * Return the memory status for each block manager, in the form of a map from
    * the block manager's id to two long values. The first value is the maximum

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 13 additions & 3 deletions
@@ -100,6 +100,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       removeShuffle(shuffleId)
       sender ! true

+    case RemoveBroadcast(broadcastId, removeFromDriver) =>
+      removeBroadcast(broadcastId, removeFromDriver)
+      sender ! true
+
     case RemoveBlock(blockId) =>
       removeBlockFromWorkers(blockId)
       sender ! true
@@ -151,9 +155,15 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   private def removeShuffle(shuffleId: Int) {
     // Nothing to do in the BlockManagerMasterActor data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    blockManagerInfo.values.foreach { bm =>
-      bm.slaveActor ! removeMsg
-    }
+    blockManagerInfo.values.foreach { bm => bm.slaveActor ! removeMsg }
+  }
+
+  private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
+    // TODO(aor): Consolidate usages of <driver>
+    val removeMsg = RemoveBroadcast(broadcastId)
+    blockManagerInfo.values
+      .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
+      .foreach { bm => bm.slaveActor ! removeMsg }
   }

   private def removeBlockManager(blockManagerId: BlockManagerId) {
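
The master-side removeBroadcast fans the message out to every registered block manager, skipping the driver's own block manager unless removeFromDriver is set. A small standalone model of that filter (BlockManagerId here is a stand-in, not the real class):

// Models the fan-out in BlockManagerMasterActor.removeBroadcast: every
// registered block manager receives RemoveBroadcast, except the driver's
// own block manager when removeFromDriver is false.
object RemoveBroadcastFanOutSketch {

  case class BlockManagerId(executorId: String, host: String)

  def targets(registered: Seq[BlockManagerId], removeFromDriver: Boolean): Seq[BlockManagerId] =
    registered.filter { id => removeFromDriver || id.executorId != "<driver>" }

  def main(args: Array[String]): Unit = {
    val registered = Seq(
      BlockManagerId("<driver>", "host0"),
      BlockManagerId("1", "host1"),
      BlockManagerId("2", "host2"))

    // With removeFromDriver = false, the driver's block manager is skipped.
    println(targets(registered, removeFromDriver = false))
    // With removeFromDriver = true, all three would receive the message.
    println(targets(registered, removeFromDriver = true))
  }
}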

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 10 additions & 3 deletions
@@ -22,9 +22,11 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import akka.actor.ActorRef

 private[storage] object BlockManagerMessages {
+
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
   //////////////////////////////////////////////////////////////////////////////////
+
   sealed trait ToBlockManagerSlave

   // Remove a block from the slaves that have it. This can only be used to remove
@@ -37,10 +39,15 @@ private[storage] object BlockManagerMessages {
   // Remove all blocks belonging to a specific shuffle.
   case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

+  // Remove all blocks belonging to a specific broadcast.
+  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
+    extends ToBlockManagerSlave
+

   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
+
   sealed trait ToBlockManagerMaster

   case class RegisterBlockManager(
@@ -57,8 +64,7 @@ private[storage] object BlockManagerMessages {
       var storageLevel: StorageLevel,
       var memSize: Long,
       var diskSize: Long)
-    extends ToBlockManagerMaster
-    with Externalizable {
+    extends ToBlockManagerMaster with Externalizable {

     def this() = this(null, null, null, 0, 0) // For deserialization only

@@ -80,7 +86,8 @@ private[storage] object BlockManagerMessages {
   }

   object UpdateBlockInfo {
-    def apply(blockManagerId: BlockManagerId,
+    def apply(
+        blockManagerId: BlockManagerId,
         blockId: BlockId,
         storageLevel: StorageLevel,
         memSize: Long,

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala

Lines changed: 3 additions & 0 deletions
@@ -46,5 +46,8 @@ class BlockManagerSlaveActor(
       if (mapOutputTracker != null) {
         mapOutputTracker.unregisterShuffle(shuffleId)
       }
+
+    case RemoveBroadcast(broadcastId, _) =>
+      blockManager.removeBroadcast(broadcastId)
   }
 }
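
End to end, the flow is: broadcast unpersist → BlockManagerMaster → RemoveBroadcast sent to each slave actor → local BlockManager.removeBroadcast. Below is a condensed, actor-free sketch of that round trip; the Fake* names are stand-ins for the Akka actors shown in the diffs above:

// Condensed model of the removal round trip, without Akka: the driver-side
// master fans RemoveBroadcast out to slave endpoints, and each slave asks its
// local block manager to drop the broadcast's blocks.
object BroadcastRemovalFlowSketch {

  sealed trait ToBlockManagerSlave
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave

  class FakeBlockManager(name: String) {
    def removeBroadcast(broadcastId: Long): Unit =
      println(s"$name: dropping blocks of broadcast $broadcastId")
  }

  // Plays the role of BlockManagerSlaveActor.receive for this one message type.
  class FakeSlave(blockManager: FakeBlockManager) {
    def receive(msg: ToBlockManagerSlave): Unit = msg match {
      case RemoveBroadcast(broadcastId, _) => blockManager.removeBroadcast(broadcastId)
    }
  }

  def main(args: Array[String]): Unit = {
    val slaves = Seq(
      new FakeSlave(new FakeBlockManager("executor-1")),
      new FakeSlave(new FakeBlockManager("executor-2")))
    // Plays the role of BlockManagerMasterActor.removeBroadcast.
    val msg = RemoveBroadcast(42L, removeFromDriver = false)
    slaves.foreach(_.receive(msg))
  }
}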

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 4 deletions
@@ -461,10 +461,10 @@ private[spark] object Utils extends Logging {
   private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

   def parseHostPort(hostPort: String): (String, Int) = {
-    {
-      // Check cache first.
-      val cached = hostPortParseResults.get(hostPort)
-      if (cached != null) return cached
+    // Check cache first.
+    val cached = hostPortParseResults.get(hostPort)
+    if (cached != null) {
+      return cached
     }

     val indx: Int = hostPort.lastIndexOf(':')
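
The Utils change just flattens a redundant inner block around the cache lookup. Roughly the shape of parseHostPort after the cleanup, as a standalone sketch (the port-parsing fallback here is simplified and not the exact Spark behavior):

import java.util.concurrent.ConcurrentHashMap

// Check the cache, return early on a hit, otherwise parse and memoize.
object ParseHostPortSketch {

  private val cache = new ConcurrentHashMap[String, (String, Int)]()

  def parseHostPort(hostPort: String): (String, Int) = {
    // Check cache first.
    val cached = cache.get(hostPort)
    if (cached != null) {
      return cached
    }

    val idx = hostPort.lastIndexOf(':')
    val parsed =
      if (idx < 0) (hostPort, 0)  // no port given; simplified fallback
      else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
    cache.putIfAbsent(hostPort, parsed)
    parsed
  }

  def main(args: Array[String]): Unit = {
    println(parseHostPort("worker-3:7077"))  // parsed and cached
    println(parseHostPort("worker-3:7077"))  // served from the cache
  }
}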

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -28,8 +28,8 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.SparkContext._
-import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}

 class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
