@@ -217,7 +217,7 @@ private[spark] class BlockManager(
217217 logInfo(s " Reporting ${blockInfoManager.size} blocks to the master. " )
218218 for ((blockId, info) <- blockInfoManager.entries) {
219219 val status = getCurrentBlockStatus(blockId, info)
220- if (! tryToReportBlockStatus(blockId, info , status)) {
220+ if (info.tellMaster && ! tryToReportBlockStatus(blockId, status)) {
221221 logError(s " Failed to report $blockId to master; giving up. " )
222222 return
223223 }
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
298298
299299 /**
300300 * Get the BlockStatus for the block identified by the given ID, if it exists.
301- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store .
301+ * NOTE: This is mainly for testing.
302302 */
303303 def getStatus (blockId : BlockId ): Option [BlockStatus ] = {
304304 blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
333333 */
334334 private def reportBlockStatus (
335335 blockId : BlockId ,
336- info : BlockInfo ,
337336 status : BlockStatus ,
338337 droppedMemorySize : Long = 0L ): Unit = {
339- val needReregister = ! tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
338+ val needReregister = ! tryToReportBlockStatus(blockId, status, droppedMemorySize)
340339 if (needReregister) {
341340 logInfo(s " Got told to re-register updating block $blockId" )
342341 // Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
352351 */
353352 private def tryToReportBlockStatus (
354353 blockId : BlockId ,
355- info : BlockInfo ,
356354 status : BlockStatus ,
357355 droppedMemorySize : Long = 0L ): Boolean = {
358- if (info.tellMaster) {
359- val storageLevel = status.storageLevel
360- val inMemSize = Math .max(status.memSize, droppedMemorySize)
361- val onDiskSize = status.diskSize
362- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
363- } else {
364- true
365- }
356+ val storageLevel = status.storageLevel
357+ val inMemSize = Math .max(status.memSize, droppedMemorySize)
358+ val onDiskSize = status.diskSize
359+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
366360 }
367361
368362 /**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
374368 info.synchronized {
375369 info.level match {
376370 case null =>
377- BlockStatus ( StorageLevel . NONE , memSize = 0L , diskSize = 0L )
371+ BlockStatus .empty
378372 case level =>
379373 val inMem = level.useMemory && memoryStore.contains(blockId)
380374 val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -807,12 +801,10 @@ private[spark] class BlockManager(
807801 // Now that the block is in either the memory or disk store,
808802 // tell the master about it.
809803 info.size = size
810- if (tellMaster) {
811- reportBlockStatus(blockId, info, putBlockStatus)
812- }
813- Option (TaskContext .get()).foreach { c =>
814- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
804+ if (tellMaster && info.tellMaster) {
805+ reportBlockStatus(blockId, putBlockStatus)
815806 }
807+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
816808 }
817809 logDebug(" Put block %s locally took %s" .format(blockId, Utils .getUsedTimeMs(startTimeMs)))
818810 if (level.replication > 1 ) {
@@ -961,15 +953,12 @@ private[spark] class BlockManager(
961953 val putBlockStatus = getCurrentBlockStatus(blockId, info)
962954 val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
963955 if (blockWasSuccessfullyStored) {
964- // Now that the block is in either the memory, externalBlockStore, or disk store,
965- // tell the master about it.
956+ // Now that the block is in either the memory or disk store, tell the master about it.
966957 info.size = size
967- if (tellMaster) {
968- reportBlockStatus(blockId, info, putBlockStatus)
969- }
970- Option (TaskContext .get()).foreach { c =>
971- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
958+ if (tellMaster && info.tellMaster) {
959+ reportBlockStatus(blockId, putBlockStatus)
972960 }
961+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
973962 logDebug(" Put block %s locally took %s" .format(blockId, Utils .getUsedTimeMs(startTimeMs)))
974963 if (level.replication > 1 ) {
975964 val remoteStartTime = System .currentTimeMillis
@@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
12711260
12721261 val status = getCurrentBlockStatus(blockId, info)
12731262 if (info.tellMaster) {
1274- reportBlockStatus(blockId, info, status, droppedMemorySize)
1263+ reportBlockStatus(blockId, status, droppedMemorySize)
12751264 }
12761265 if (blockIsUpdated) {
1277- Option (TaskContext .get()).foreach { c =>
1278- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
1279- }
1266+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
12801267 }
12811268 status.storageLevel
12821269 }
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
13161303 // The block has already been removed; do nothing.
13171304 logWarning(s " Asked to remove block $blockId, which does not exist " )
13181305 case Some (info) =>
1319- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1320- val removedFromMemory = memoryStore.remove(blockId)
1321- val removedFromDisk = diskStore.remove(blockId)
1322- if (! removedFromMemory && ! removedFromDisk) {
1323- logWarning(s " Block $blockId could not be removed as it was not found in either " +
1324- " the disk, memory, or external block store" )
1325- }
1326- blockInfoManager.removeBlock(blockId)
1327- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
1328- if (tellMaster && info.tellMaster) {
1329- reportBlockStatus(blockId, info, removeBlockStatus)
1330- }
1331- Option (TaskContext .get()).foreach { c =>
1332- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
1333- }
1306+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
1307+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus .empty)
1308+ }
1309+ }
1310+
1311+ /**
1312+ * Internal version of [[removeBlock() ]] which assumes that the caller already holds a write
1313+ * lock on the block.
1314+ */
1315+ private def removeBlockInternal (blockId : BlockId , tellMaster : Boolean ): Unit = {
1316+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1317+ val removedFromMemory = memoryStore.remove(blockId)
1318+ val removedFromDisk = diskStore.remove(blockId)
1319+ if (! removedFromMemory && ! removedFromDisk) {
1320+ logWarning(s " Block $blockId could not be removed as it was not found on disk or in memory " )
1321+ }
1322+ blockInfoManager.removeBlock(blockId)
1323+ if (tellMaster) {
1324+ reportBlockStatus(blockId, BlockStatus .empty)
1325+ }
1326+ }
1327+
1328+ private def addUpdatedBlockStatusToTaskMetrics (blockId : BlockId , status : BlockStatus ): Unit = {
1329+ Option (TaskContext .get()).foreach { c =>
1330+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
13341331 }
13351332 }
13361333
0 commit comments