[SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled #28911
@@ -1415,10 +1415,11 @@ package object config {

   private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
     ConfigBuilder("spark.shuffle.readHostLocalDisk")
-      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
-        s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
-        "blocks requested from those block managers which are running on the same host are read " +
-        "from the disk directly instead of being fetched as remote blocks over the network.")
+      .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled), shuffle " +
+        "blocks requested from those block managers which are running on the same host are " +
+        "read from the disk directly instead of being fetched as remote blocks over the " +
+        "network. Note that for k8s workloads, this only works when nodes are using " +
+        "non-isolated container storage.")

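For readers who want to try the setting described in the doc string above, here is a minimal configuration sketch. It only uses the three configuration keys named in this diff and its context (`spark.shuffle.readHostLocalDisk`, `spark.shuffle.useOldFetchProtocol`, `spark.shuffle.service.enabled`); the object name, method name, and application name are hypothetical, and the explicit values are chosen for illustration rather than taken from the PR.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper object; not part of the PR.
object HostLocalShuffleConfSketch {
  def buildConf(): SparkConf = {
    new SparkConf()
      .setAppName("host-local-shuffle-read-sketch") // hypothetical app name
      // Read shuffle blocks of same-host executors directly from disk.
      .set("spark.shuffle.readHostLocalDisk", "true")
      // Per the doc string, the old fetch protocol must be disabled.
      .set("spark.shuffle.useOldFetchProtocol", "false")
      // The case this PR targets: no external shuffle service.
      .set("spark.shuffle.service.enabled", "false")
  }
}
```

On Kubernetes, the doc string's caveat still applies: this is only expected to help when executors on a node can see each other's local directories.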
address.host == blockManager.blockManagerId.host) {
Thank you for your clarification. If that's the case, I think we should point it out in the doc. cc @dongjoon-hyun @holdenk
@@ -113,6 +113,15 @@ class NettyBlockRpcServer(
             s"when there is not sufficient space available to store the block.")
           responseContext.onFailure(exception)
         }
+
+      case getLocalDirs: GetLocalDirsForExecutors =>
+        assert(getLocalDirs.appId == appId)
+        assert(getLocalDirs.execIds.length == 1)
+
+        val execId = getLocalDirs.execIds.head
+        val dirs = blockManager.getLocalDiskDirs
+        responseContext
+          .onSuccess(new LocalDirsForExecutors(Map(execId -> dirs).asJava).toByteBuffer)
     }
   }
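The handler added above can be illustrated in isolation with a small, self-contained sketch. It does not use Spark's classes: `GetLocalDirsRequest`, `LocalDirsReply`, and `handleGetLocalDirs` are hypothetical stand-ins for `GetLocalDirsForExecutors`, `LocalDirsForExecutors`, and the `case` branch in the diff. It also returns an `Either` instead of using `assert`, which is an illustrative variation and not what the PR does.

```scala
// Simplified sketch of the request handling shown in the diff.
// All names here are hypothetical stand-ins, not Spark APIs.
object LocalDirsSketch {
  final case class GetLocalDirsRequest(appId: String, execIds: Seq[String])
  final case class LocalDirsReply(dirsByExecutor: Map[String, Seq[String]])

  // An executor answers only for its own application and for exactly one
  // executor id, and replies with its own local disk directories.
  def handleGetLocalDirs(
      request: GetLocalDirsRequest,
      ownAppId: String,
      ownLocalDirs: Seq[String]): Either[String, LocalDirsReply] = {
    if (request.appId != ownAppId) {
      Left(s"unexpected appId ${request.appId}, expected $ownAppId")
    } else if (request.execIds.length != 1) {
      Left(s"expected exactly one executor id, got ${request.execIds.length}")
    } else {
      // Mirrors Map(execId -> dirs) in the diff.
      Right(LocalDirsReply(Map(request.execIds.head -> ownLocalDirs)))
    }
  }
}
```

A caller on the same host could then take the returned directories and resolve shuffle files from disk, which is the behavior the PR title describes.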