[WIP][SPARK-20629][CORE] Copy shuffle data when nodes are being shutdown #28331
Changes from 7 commits
@@ -18,15 +18,18 @@
```scala
package org.apache.spark.shuffle

import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.client.StreamCallbackWithID
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExecutorDiskUtils
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
```
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver(
```scala
  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)

  /**
   * Get the shuffle files that are stored locally. Used for block migrations.
   */
  def getStoredShuffles(): Set[(Int, Long)] = {
    // Matches ShuffleIndexBlockId name
    val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
    val rootDirs = blockManager.diskBlockManager.localDirs
    // ExecutorDiskUtils puts things inside one level of hashed sub-directories
    val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs
    val filenames = searchDirs.flatMap(_.list())
    logDebug(s"Got block files ${filenames.toList}")
    filenames.flatMap { fname =>
      pattern.findAllIn(fname).matchData.map { matched =>
        (matched.group(1).toInt, matched.group(2).toLong)
      }
    }.toSet
  }

  /**
   * Get the shuffle data file.
   *
```
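As a usage illustration of the filename convention above, here is a minimal, self-contained sketch of the parsing that `getStoredShuffles()` performs. The regex is the one from the diff; the object name and the example file names are made up for illustration.

```scala
// Standalone sketch (not PR code): extract (shuffleId, mapId) pairs from
// shuffle index file names of the form "shuffle_<shuffleId>_<mapId>_<reduceId>.index".
object ShuffleIndexNameParsing {
  private val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r

  def storedShuffles(fileNames: Seq[String]): Set[(Int, Long)] = {
    fileNames.flatMap { fname =>
      pattern.findAllIn(fname).matchData.map { m =>
        (m.group(1).toInt, m.group(2).toLong) // (shuffleId, mapId)
      }
    }.toSet
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("shuffle_0_42_0.index", "shuffle_0_42_0.data", "shuffle_3_7_0.index")
    println(storedShuffles(files)) // Set((0,42), (3,7)); .data files are ignored
  }
}
```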
@@ -148,6 +170,86 @@
```scala
    }
  }

  /**
   * Write a provided shuffle block as a stream. Used for block migrations.
   * ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
   * Requires the caller to delete any shuffle index blocks where the shuffle block fails to
   * put.
   */
  def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
      StreamCallbackWithID = {
    val file = blockId match {
      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
        getIndexFile(shuffleId, mapId)
      case ShuffleBlockBatchId(shuffleId, mapId, _, _) =>
```
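The doc comment above describes the migration write path; a hedged, caller-side sketch of the `StreamCallbackWithID` protocol it returns might look like the following. The object and method names, and the `chunks` iterator, are illustrative assumptions rather than code from this PR.

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.client.StreamCallbackWithID

// Illustrative sketch (not PR code): drive the callback returned by
// putShuffleBlockAsStream. Bytes are pushed chunk by chunk; onComplete commits
// the block, while after onFailure the caller is responsible for cleaning up
// any partially written index/data files.
object MigrationStreamWriter {
  def writeMigratedBlock(callback: StreamCallbackWithID, chunks: Iterator[ByteBuffer]): Unit = {
    val streamId = callback.getID()
    try {
      chunks.foreach(chunk => callback.onData(streamId, chunk))
      callback.onComplete(streamId)
    } catch {
      case t: Throwable =>
        callback.onFailure(streamId, t)
        throw t
    }
  }
}
```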
On `assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}")`:
The shuffle data file was probably still being served from the decommissioned executor; otherwise the recomputation would have been detected by the number of successful tasks / the accumulator.
Please consider covering this case by introducing a new boolean flag to control this setting, and add at least one shuffle migration test that exercises it.
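A sketch of that suggestion, assuming illustrative config key names (the flag actually introduced by this PR, if any, may be named differently), so a decommission test can be run with shuffle migration both on and off:

```scala
import org.apache.spark.SparkConf

// Hedged sketch (not PR code): build test configurations that toggle a boolean
// flag gating shuffle-block migration. The key names here are placeholders.
object DecommissionTestConfs {
  def testConf(migrateShuffleBlocks: Boolean): SparkConf = {
    new SparkConf()
      .setMaster("local-cluster[2,1,1024]")
      .setAppName("decommission-shuffle-migration-test")
      .set("spark.storage.decommission.enabled", "true")
      // hypothetical flag controlling whether shuffle files are copied off the node
      .set("spark.storage.decommission.shuffleBlocks.enabled", migrateShuffleBlocks.toString)
  }
}
```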
I'll write a more rigorous test that fails as-is, and then I'll fix the issue.
And thanks for the suggestion, very useful :)
When the file already exists, does that mean there is an index/data file with the same shuffle id and map id? When could that happen?
I suppose this should never happen, but I'm not sure; let me think about that.
So this mirrors the logic inside writeIndexFileAndCommit; the matching check there was introduced in SPARK-17547, which I believe handles the situation where an exception occurred during a previous write and left the filesystem in a dirty state. So I think we should keep it to be safe.
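For concreteness, the defensive pattern being referenced is roughly the following sketch (not the exact code in writeIndexFileAndCommit or in this PR):

```scala
import java.io.{File, IOException}

// Sketch of the SPARK-17547-style defensive cleanup discussed above: if a
// previous write crashed and left a stale index/data file behind, delete it
// before writing the migrated copy. The helper name is illustrative.
object StaleShuffleFileCleanup {
  def ensureCleanTarget(file: File): File = {
    if (file.exists() && !file.delete()) {
      throw new IOException(s"Failed to delete stale shuffle file ${file.getAbsolutePath}")
    }
    file
  }
}
```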
Use NOOP_REDUCE_ID instead of 0
Use NOOP_REDUCE_ID instead of 0
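A tiny illustration of that nit (Spark-internal code, since NOOP_REDUCE_ID is private[spark]; the ids below are made up):

```scala
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// Prefer the named constant over a literal 0 for the placeholder reduce id.
object NoopReduceIdExample {
  val dataBlock = ShuffleDataBlockId(0, 42L, NOOP_REDUCE_ID)
  val indexBlock = ShuffleIndexBlockId(0, 42L, NOOP_REDUCE_ID)
  // rather than e.g. ShuffleDataBlockId(0, 42L, 0)
}
```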
I think this should be private[spark] as well.