From fc06d6a65d5f9fa5f33f085723a763fbe869a174 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Wed, 22 May 2024 22:45:57 +0800
Subject: [PATCH 1/7] fix

---
 .../org/apache/spark/MapOutputTracker.scala   | 15 ++++--
 .../apache/spark/MapOutputTrackerSuite.scala  | 53 +++++++++++++++++++
 2 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index fdc2b0a4c20f..36586daa2c7f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -44,7 +44,6 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
 import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
 /**
@@ -153,8 +152,10 @@ private class ShuffleStatus(
   /**
    * Mapping from a mapId to the mapIndex, this is required to reduce the searching overhead within
    * the function updateMapOutput(mapId, bmAddress).
+   *
+   * Exposed for testing.
    */
-  private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()
+  private[spark] val mapIdToMapIndex = new HashMap[Long, Int]()
 
   /**
    * Register a map output. If there is already a registered location for the map output then it
@@ -164,6 +165,8 @@ private class ShuffleStatus(
     if (mapStatuses(mapIndex) == null) {
       _numAvailableMapOutputs += 1
       invalidateSerializedMapOutputStatusCache()
+    } else {
+      mapIdToMapIndex.remove(mapStatuses(mapIndex).mapId)
     }
     mapStatuses(mapIndex) = status
     mapIdToMapIndex(status.mapId) = mapIndex
@@ -224,7 +227,9 @@ private class ShuffleStatus(
     logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
     if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
       _numAvailableMapOutputs -= 1
-      mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
+      val currentMapStatus = mapStatuses(mapIndex)
+      mapIdToMapIndex.remove(currentMapStatus.mapId)
+      mapStatusesDeleted(mapIndex) = currentMapStatus
       mapStatuses(mapIndex) = null
       invalidateSerializedMapOutputStatusCache()
     }
@@ -292,7 +297,9 @@ private class ShuffleStatus(
     for (mapIndex <- mapStatuses.indices) {
       if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) {
         _numAvailableMapOutputs -= 1
-        mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
+        val currentMapStatus = mapStatuses(mapIndex)
+        mapIdToMapIndex.remove(currentMapStatus.mapId)
+        mapStatusesDeleted(mapIndex) = currentMapStatus
         mapStatuses(mapIndex) = null
         invalidateSerializedMapOutputStatusCache()
       }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7aec8eeaad42..db35e1620515 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1110,4 +1110,57 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       rpcEnv.shutdown()
     }
   }
+
+  test("mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    try {
+      tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+        new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+      tracker.registerShuffle(0, 1, 1)
+      tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", "hostA", 1000),
+        Array(2L), 0))
+      tracker.removeOutputsOnHost("hostA")
+      assert(tracker.shuffleStatuses(0).mapIdToMapIndex.filter(_._2 == 0).size == 0)
+    } finally {
+      tracker.stop()
+      rpcEnv.shutdown()
+    }
+  }
+
+  test("mapIdToMapIndex should cleanup unused mapIndexes after unregisterMapOutput") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    try {
+      tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+        new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+      tracker.registerShuffle(0, 1, 1)
+      tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", "hostA", 1000),
+        Array(2L), 0))
+      tracker.unregisterMapOutput(0, 0, BlockManagerId("exec-1", "hostA", 1000))
+      assert(tracker.shuffleStatuses(0).mapIdToMapIndex.filter(_._2 == 0).size == 0)
+    } finally {
+      tracker.stop()
+      rpcEnv.shutdown()
+    }
+  }
+
+  test("mapIdToMapIndex should cleanup unused mapIndexes after registerMapOutput") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    try {
+      tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+        new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+      tracker.registerShuffle(0, 1, 1)
+      tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", "hostA", 1000),
+        Array(2L), 0))
+      // Another task also finished working on partition 0.
+      tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-2", "hostB", 1000),
+        Array(2L), 1))
+      assert(tracker.shuffleStatuses(0).mapIdToMapIndex.filter(_._2 == 0).size == 1)
+    } finally {
+      tracker.stop()
+      rpcEnv.shutdown()
+    }
+  }
 }

From a52388bdfb929611374c39cd0ba0fd2fe5f201e4 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Thu, 23 May 2024 10:31:01 +0800
Subject: [PATCH 2/7] address comments

---
 .../scala/org/apache/spark/MapOutputTracker.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 36586daa2c7f..594eede2fda5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -162,11 +162,12 @@ private class ShuffleStatus(
    * will be replaced by the new location.
    */
   def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock {
-    if (mapStatuses(mapIndex) == null) {
+    val currentMapStatus = mapStatuses(mapIndex)
+    if (currentMapStatus == null) {
       _numAvailableMapOutputs += 1
       invalidateSerializedMapOutputStatusCache()
     } else {
-      mapIdToMapIndex.remove(mapStatuses(mapIndex).mapId)
+      mapIdToMapIndex.remove(currentMapStatus.mapId)
     }
     mapStatuses(mapIndex) = status
     mapIdToMapIndex(status.mapId) = mapIndex
@@ -225,9 +226,9 @@ private class ShuffleStatus(
    */
   def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock {
     logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
-    if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
+    val currentMapStatus = mapStatuses(mapIndex)
+    if (currentMapStatus != null && currentMapStatus.location == bmAddress) {
       _numAvailableMapOutputs -= 1
-      val currentMapStatus = mapStatuses(mapIndex)
       mapIdToMapIndex.remove(currentMapStatus.mapId)
       mapStatusesDeleted(mapIndex) = currentMapStatus
       mapStatuses(mapIndex) = null
@@ -295,9 +296,9 @@ private class ShuffleStatus(
    */
   def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock {
     for (mapIndex <- mapStatuses.indices) {
-      if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) {
+      val currentMapStatus = mapStatuses(mapIndex)
+      if (currentMapStatus != null && f(currentMapStatus.location)) {
         _numAvailableMapOutputs -= 1
-        val currentMapStatus = mapStatuses(mapIndex)
         mapIdToMapIndex.remove(currentMapStatus.mapId)
         mapStatusesDeleted(mapIndex) = currentMapStatus
         mapStatuses(mapIndex) = null

From ab2cb35f4a5145ca844be6124123d43ef7d0f25c Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Thu, 23 May 2024 10:35:18 +0800
Subject: [PATCH 3/7] remove the dependency of mapIdToMapIndex for mapStatusesDeleted

---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 594eede2fda5..a660bccd2e68 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -197,8 +197,8 @@ private class ShuffleStatus(
           mapStatus.updateLocation(bmAddress)
           invalidateSerializedMapOutputStatusCache()
         case None =>
-          if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) {
-            val index = mapIndex.get
+          val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
+          if (index >= 0 && mapStatuses(index) == null) {
             val mapStatus = mapStatusesDeleted(index)
             mapStatus.updateLocation(bmAddress)
             mapStatuses(index) = mapStatus

From 541f0afa620555cc54adf8ea563ae188527e5518 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Fri, 24 May 2024 10:28:32 +0800
Subject: [PATCH 4/7] Update core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

---
 .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index db35e1620515..132d38e22434 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1111,7 +1111,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter") {
+  test("SPARK-48394: mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter") {
     val rpcEnv = createRpcEnv("test")
     val tracker = newTrackerMaster()
     try {

From 6ba0a180ffc887f6085200525ee30b3edf503412 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Fri, 24 May 2024 10:28:39 +0800
Subject: [PATCH 5/7] Update core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

---
 .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 132d38e22434..96af2d251381 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1128,7 +1128,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("mapIdToMapIndex should cleanup unused mapIndexes after unregisterMapOutput") {
+  test("SPARK-48394: mapIdToMapIndex should cleanup unused mapIndexes after unregisterMapOutput") {
     val rpcEnv = createRpcEnv("test")
     val tracker = newTrackerMaster()
     try {

From 656c1d51a213eee95c634d3f89004bac0686b586 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Fri, 24 May 2024 10:28:46 +0800
Subject: [PATCH 6/7] Update core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

---
 .../src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 96af2d251381..0f9fd2ce379d 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1145,7 +1145,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("mapIdToMapIndex should cleanup unused mapIndexes after registerMapOutput") {
+  test("SPARK-48394: mapIdToMapIndex should cleanup unused mapIndexes after registerMapOutput") {
     val rpcEnv = createRpcEnv("test")
     val tracker = newTrackerMaster()
     try {

From ee79518f4eea7f0525e8791b750bd74e574cadd5 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Fri, 24 May 2024 13:54:59 +0800
Subject: [PATCH 7/7] fix fmt

---
 .../test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 0f9fd2ce379d..26dc218c30c7 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1111,7 +1111,9 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("SPARK-48394: mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter") {
+  test(
+    "SPARK-48394: mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter"
+  ) {
     val rpcEnv = createRpcEnv("test")
     val tracker = newTrackerMaster()
     try {
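
Taken together, the seven patches enforce one invariant: every entry in ShuffleStatus.mapIdToMapIndex must point at a live entry in mapStatuses, so the map must be cleaned up whenever a slot is overwritten (addMapOutput), unregistered (removeMapOutput), or bulk-removed (removeOutputsByFilter). The standalone Scala sketch below illustrates that invariant in miniature; ToyShuffleStatus and ToyMapStatus are hypothetical stand-ins written for this note, not Spark's classes, and only mirror the shape of the patched methods.

import scala.collection.mutable

// Hypothetical stand-in for MapStatus; illustrative only.
final case class ToyMapStatus(mapId: Long, location: String)

final class ToyShuffleStatus(numPartitions: Int) {
  // One slot per partition; null means "no map output registered".
  val mapStatuses = new Array[ToyMapStatus](numPartitions)
  // Reverse index from mapId to partition index, as in the patched ShuffleStatus.
  val mapIdToMapIndex = new mutable.HashMap[Long, Int]()

  // Mirrors the shape of the patched addMapOutput: before overwriting a slot,
  // drop the previous attempt's mapId so no stale entry survives.
  def addMapOutput(mapIndex: Int, status: ToyMapStatus): Unit = {
    val currentMapStatus = mapStatuses(mapIndex)
    if (currentMapStatus != null) {
      mapIdToMapIndex.remove(currentMapStatus.mapId) // the cleanup the patches add
    }
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  // Mirrors the shape of the patched removeMapOutput.
  def removeMapOutput(mapIndex: Int, location: String): Unit = {
    val currentMapStatus = mapStatuses(mapIndex)
    if (currentMapStatus != null && currentMapStatus.location == location) {
      mapIdToMapIndex.remove(currentMapStatus.mapId)
      mapStatuses(mapIndex) = null
    }
  }
}

object ToyShuffleStatusDemo {
  def main(args: Array[String]): Unit = {
    val shuffle = new ToyShuffleStatus(numPartitions = 1)
    shuffle.addMapOutput(0, ToyMapStatus(mapId = 0L, location = "exec-1"))
    // A speculative or retried task finishes partition 0 under a new mapId.
    shuffle.addMapOutput(0, ToyMapStatus(mapId = 1L, location = "exec-2"))
    // Without the remove() in addMapOutput, mapId 0 would still point at
    // index 0 here, leaking one entry per re-registration.
    assert(shuffle.mapIdToMapIndex == mutable.HashMap(1L -> 0))
    shuffle.removeMapOutput(0, "exec-2")
    assert(shuffle.mapIdToMapIndex.isEmpty)
    println("mapIdToMapIndex stayed consistent with mapStatuses")
  }
}

The three SPARK-48394 tests added in patch 1 assert the same property against the real tracker, checking that mapIdToMapIndex.filter(_._2 == 0) is empty (or holds exactly one entry after re-registration) once the corresponding output is removed or replaced.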