From 7be0ce45a95b4f2449ac370baceb0b9f461e7199 Mon Sep 17 00:00:00 2001 From: attilapiros Date: Sat, 5 Jun 2021 10:46:08 +0200 Subject: [PATCH 1/2] Initial upload --- .../spark/storage/BlockManagerMasterEndpoint.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 7d068fd69529..f54d3dded970 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -276,8 +276,11 @@ class BlockManagerMasterEndpoint( val blockIdsToDel = blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService, new mutable.HashSet[RDDBlockId]()) blockIdsToDel += blockId - blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatus => - blockStatus.remove(blockId) + blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatusForId => + blockStatusForId.remove(blockId) + if (blockStatusForId.isEmpty) { + blockStatusByShuffleService.remove(bmIdForShuffleService) + } } } } @@ -309,6 +312,7 @@ class BlockManagerMasterEndpoint( Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures) } + private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { // Nothing to do in the BlockManagerMasterEndpoint data structures val removeMsg = RemoveShuffle(shuffleId) @@ -665,7 +669,7 @@ class BlockManagerMasterEndpoint( val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty) val status = locations.headOption.flatMap { bmId => if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) { - Option(blockStatusByShuffleService(bmId).get(blockId)) + blockStatusByShuffleService.get(bmId).flatMap(m => Option(m.get(blockId))) } else { aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId)) } From 6de20f10ca60c778db25ba3d5975f85e5c23aedb Mon Sep 17 00:00:00 2001 From: attilapiros Date: Mon, 7 Jun 2021 05:13:21 +0200 Subject: [PATCH 2/2] add comment --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index f54d3dded970..80dd1cb0be1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -278,6 +278,8 @@ class BlockManagerMasterEndpoint( blockIdsToDel += blockId blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatusForId => blockStatusForId.remove(blockId) + // when all blocks are removed from the block statuses then for this BM Id the whole + // blockStatusByShuffleService entry can be removed to avoid leaking memory if (blockStatusForId.isEmpty) { blockStatusByShuffleService.remove(bmIdForShuffleService) }