@@ -1010,9 +1010,9 @@ private[spark] class BlockManager(
10101010 info.synchronized {
10111011 // required ? As of now, this will be invoked only for blocks which are ready
10121012 // But in case this changes in future, adding for consistency sake.
1013- if (! info.waitForReady()) {
1013+ if (blockInfo.get(blockId).isEmpty || ! info.waitForReady()) {
10141014 // If we get here, the block write failed.
1015- logWarning(s " Block $blockId was marked as failure. Nothing to drop " )
1015+ logWarning(s " Block $blockId was marked as failure or already dropped . Nothing to drop " )
10161016 return None
10171017 }
10181018
@@ -1089,15 +1089,17 @@ private[spark] class BlockManager(
10891089 val info = blockInfo.get(blockId).orNull
10901090 if (info != null ) {
10911091 info.synchronized {
1092- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1093- val removedFromMemory = memoryStore.remove(blockId)
1094- val removedFromDisk = diskStore.remove(blockId)
1095- val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
1096- if (! removedFromMemory && ! removedFromDisk && ! removedFromTachyon) {
1097- logWarning(s " Block $blockId could not be removed as it was not found in either " +
1098- " the disk, memory, or tachyon store" )
1092+ if (blockInfo.get(blockId).isEmpty) {
1093+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1094+ val removedFromMemory = memoryStore.remove(blockId)
1095+ val removedFromDisk = diskStore.remove(blockId)
1096+ val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
1097+ if (! removedFromMemory && ! removedFromDisk && ! removedFromTachyon) {
1098+ logWarning(s " Block $blockId could not be removed as it was not found in either " +
1099+ " the disk, memory, or tachyon store" )
1100+ }
1101+ blockInfo.remove(blockId)
10991102 }
1100- blockInfo.remove(blockId)
11011103 if (tellMaster && info.tellMaster) {
11021104 val status = getCurrentBlockStatus(blockId, info)
11031105 reportBlockStatus(blockId, info, status)
@@ -1126,12 +1128,14 @@ private[spark] class BlockManager(
11261128 val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
11271129 if (time < cleanupTime && shouldDrop(id)) {
11281130 info.synchronized {
1129- val level = info.level
1130- if (level.useMemory) { memoryStore.remove(id) }
1131- if (level.useDisk) { diskStore.remove(id) }
1132- if (level.useOffHeap) { tachyonStore.remove(id) }
1133- iterator.remove()
1134- logInfo(s " Dropped block $id" )
1131+ if (blockInfo.get(id).isEmpty) {
1132+ val level = info.level
1133+ if (level.useMemory) { memoryStore.remove(id) }
1134+ if (level.useDisk) { diskStore.remove(id) }
1135+ if (level.useOffHeap) { tachyonStore.remove(id) }
1136+ iterator.remove()
1137+ logInfo(s " Dropped block $id" )
1138+ }
11351139 }
11361140 val status = getCurrentBlockStatus(id, info)
11371141 reportBlockStatus(id, info, status)
0 commit comments