-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-48394][CORE] Cleanup mapIdToMapIndex on mapoutput unregister #46706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fc06d6a
a52388b
ab2cb35
541f0af
6ba0a18
656c1d5
ee79518
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,7 +44,6 @@ import org.apache.spark.shuffle.MetadataFetchFailedException | |
| import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} | ||
| import org.apache.spark.util._ | ||
| import org.apache.spark.util.ArrayImplicits._ | ||
| import org.apache.spark.util.collection.OpenHashMap | ||
| import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} | ||
|
|
||
| /** | ||
|
|
@@ -153,17 +152,22 @@ private class ShuffleStatus( | |
| /** | ||
| * Mapping from a mapId to the mapIndex, this is required to reduce the searching overhead within | ||
| * the function updateMapOutput(mapId, bmAddress). | ||
| * | ||
| * Exposed for testing. | ||
| */ | ||
| private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]() | ||
| private[spark] val mapIdToMapIndex = new HashMap[Long, Int]() | ||
|
|
||
| /** | ||
| * Register a map output. If there is already a registered location for the map output then it | ||
| * will be replaced by the new location. | ||
| */ | ||
| def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock { | ||
| if (mapStatuses(mapIndex) == null) { | ||
| val currentMapStatus = mapStatuses(mapIndex) | ||
| if (currentMapStatus == null) { | ||
| _numAvailableMapOutputs += 1 | ||
| invalidateSerializedMapOutputStatusCache() | ||
| } else { | ||
| mapIdToMapIndex.remove(currentMapStatus.mapId) | ||
| } | ||
| mapStatuses(mapIndex) = status | ||
| mapIdToMapIndex(status.mapId) = mapIndex | ||
|
|
@@ -193,8 +197,8 @@ private class ShuffleStatus( | |
| mapStatus.updateLocation(bmAddress) | ||
| invalidateSerializedMapOutputStatusCache() | ||
| case None => | ||
| if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) { | ||
| val index = mapIndex.get | ||
| val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId) | ||
| if (index >= 0 && mapStatuses(index) == null) { | ||
| val mapStatus = mapStatusesDeleted(index) | ||
| mapStatus.updateLocation(bmAddress) | ||
| mapStatuses(index) = mapStatus | ||
|
|
@@ -222,9 +226,11 @@ private class ShuffleStatus( | |
| */ | ||
| def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = withWriteLock { | ||
| logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}") | ||
| if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) { | ||
| val currentMapStatus = mapStatuses(mapIndex) | ||
| if (currentMapStatus != null && currentMapStatus.location == bmAddress) { | ||
| _numAvailableMapOutputs -= 1 | ||
| mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex) | ||
| mapIdToMapIndex.remove(currentMapStatus.mapId) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing it here will mean we cant query for it in We should move this cleanup to when Same applies to the cases below as well.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point. But IIUC, |
||
| mapStatusesDeleted(mapIndex) = currentMapStatus | ||
| mapStatuses(mapIndex) = null | ||
| invalidateSerializedMapOutputStatusCache() | ||
| } | ||
|
|
@@ -290,9 +296,11 @@ private class ShuffleStatus( | |
| */ | ||
| def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock { | ||
| for (mapIndex <- mapStatuses.indices) { | ||
| if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) { | ||
| val currentMapStatus = mapStatuses(mapIndex) | ||
| if (currentMapStatus != null && f(currentMapStatus.location)) { | ||
| _numAvailableMapOutputs -= 1 | ||
| mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex) | ||
| mapIdToMapIndex.remove(currentMapStatus.mapId) | ||
| mapStatusesDeleted(mapIndex) = currentMapStatus | ||
| mapStatuses(mapIndex) = null | ||
| invalidateSerializedMapOutputStatusCache() | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: Why change to
HashMapfromOpenHashMap? (it is specialized forLongandInt)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for the above question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OpenHashMapdoesn't support remove operation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes !