[SPARK-34541][CORE] Fixed an issue where data could not be cleaned up when unregisterShuffle. #31664
Conversation
gentle ping @srowen @otterc @HyukjinKwon , thanks for taking a look.
```scala
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
import org.apache.spark.util.Utils
```
nit: redundant empty line.
```scala
test("SPARK-34541 Data could not be cleaned up when unregisterShuffle") {
  val conf = new SparkConf(loadDefaults = false)
  val tempDir: File = Utils.createTempDir()
```
use withTempDir?
```scala
    assert(!file.exists(), s"Shuffle file $file was not cleaned up")
  }
}
```
nit: unnecessary line
```scala
  )))
}

test("SPARK-34541 Data could not be cleaned up when unregisterShuffle") {
```
nit: SPARK-34541:
Updated.
Jenkins retest this please

Test build #135545 has finished for PR 31664 at commit
```scala
  )))
}

test("Data could not be cleaned up when unregisterShuffle") {
```
nit: SPARK-34541: Data ...
BTW, does this test require spark.shuffle.useOldFetchProtocol=true?
It would be better to test both true and false.
It seems that context.taskAttemptId and partitionId are the same, both increasing from 0, so I don't understand why the protocol should be differentiated on the write side.
I ran into this problem before, but I haven't been able to reproduce the scenario since.
We can run a simple job before our target job to make the taskAttemptId start from 1, e.g.,
sc.parallelize(1 to 10, 1).count().
I tried this and the issue can be reproduced.
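For intuition on why this warm-up job works: taskAttemptId is a counter that keeps increasing across all jobs in a SparkContext, while partitionId restarts at 0 for every stage. A minimal model of that behavior (hypothetical names, not Spark's actual classes):

```scala
// Hypothetical model: taskAttemptId grows globally across jobs, while
// partitionId restarts at 0 for each stage. Once any earlier job has run,
// the two diverge for the target job's tasks.
object TaskIdModel {
  private var nextTaskAttemptId = 0L

  // Run a "stage" with numPartitions tasks and return the
  // (partitionId, taskAttemptId) pair assigned to each task.
  def runStage(numPartitions: Int): Seq[(Long, Long)] =
    (0 until numPartitions).map { p =>
      val id = nextTaskAttemptId
      nextTaskAttemptId += 1
      (p.toLong, id)
    }
}
```

Running a one-partition warm-up stage first makes every subsequent task's taskAttemptId exceed its partitionId, which is exactly the divergence the reproduction needs.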
OK, thank you. I'll add the test later. And I don't understand why the protocol should be differentiated on the write side, as follows:
ShuffleMapTask#runTask

```scala
// While we use the old shuffle fetch protocol, we use partitionId as mapId in
// the ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else {
  context.taskAttemptId()
}
```

On the read side, we need to use the protocol to distinguish messages. But on the write side, the executor registers with the ExternalShuffleService via RegisterExecutor, which passes the localDirs to the shuffle service. So the shuffle service can locate the intermediate files of a shuffle, which seems unrelated to mapId.
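For intuition about why write and read must still agree: the reducer fetches a block by name, so the mapId baked into the block name at write time must match what the read side computes. A rough sketch of the naming logic (simplified stand-ins, not Spark's exact classes):

```scala
// Simplified stand-in for Spark's shuffle block naming: the read side can
// only fetch a block if it reconstructs the same name the writer used.
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}

// Mirrors the mapId choice in ShuffleMapTask#runTask: the old protocol uses
// partitionId, the new protocol uses the globally unique taskAttemptId.
def writtenBlockName(useOldProtocol: Boolean, partitionId: Long,
                     taskAttemptId: Long): String = {
  val mapId = if (useOldProtocol) partitionId else taskAttemptId
  ShuffleBlockId(0, mapId, 0).name
}
```

Under the new protocol the name embeds taskAttemptId, under the old one it embeds partitionId, so the two sides must pick mapId the same way.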
I roughly remember that's because we want to ensure the unique file name at write size. cc @xuanyuanking
Updated.
at write size -> at write side? :)
Yes, you can check the description in #25620. TL;DR: we need a unique file name to resolve the indeterminate shuffle issue.
```scala
  )))
}

test("Shuffle data can be cleaned up whether spark.shuffle.useOldFetchProtocol=true/false") {
```
We usually test different config values like this:

```scala
Seq(true, false).foreach { value =>
  test(s"SPARK-34541: shuffle data can be cleaned up whether spark.shuffle.useOldFetchProtocol=$value") {
    ...
    conf.set(spark.shuffle.useOldFetchProtocol, value)
    ...
  }
}
```

Could you follow this way?
OK, thanks very much for the code style guidance!
```scala
}

Seq("true", "false").foreach { value =>
  test(s"SPARK-34541: shuffle can be removed when spark.shuffle.useOldFetchProtocol=$value") {
```
Is it possible to add this in ShuffleSuite? If so, we don't need to consider spark.shuffle.useOldFetchProtocol, since ShuffleOldFetchProtocolSuite would cover the old protocol case.
If it's hard to move it to ShuffleSuite, I'm also OK with the current code.
xuanyuanking left a comment
Thanks for the fix! LGTM
Jenkins retest this please

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #135762 has finished for PR 31664 at commit

gentle ping @srowen .
srowen left a comment
@Ngone51 OK with you?
Ngone51 left a comment
@srowen Thanks for the ping.
Overall, the change still looks good to me. I have left another two minor comments. Let's address them and trigger another round of testing to pass the k8s tests.
```scala
  manager.unregisterShuffle(0)
}

test(s"SPARK-34541: shuffle can be removed when spark.shuffle.useOldFetchProtocol=true") {
```
nit: SPARK-34541: shuffle files should be removed normally
(spark.shuffle.useOldFetchProtocol=true is not valid for ShuffleSuite.)
ShuffleSuite is an abstract class; ShuffleOldFetchProtocolSuite would cover the spark.shuffle.useOldFetchProtocol=true case.
I mean, the test name is not correct for other extended shuffle suites. For other suites, spark.shuffle.useOldFetchProtocol should be false, right?
```scala
    mapSideCombine = true
  )))
}
```
nit: revert the unrelated change.
Line 133 was an empty line. We originally added the test in SortShuffleManagerSuite; after #31664 (comment), we moved the test to ShuffleSuite, so I changed the empty line along with it.
I know the reason, but it's still not a necessary change, especially when there are no other changes in the file.
OK, I will revert it.
LGTM

Jenkins retest this please

Kubernetes integration test starting

Kubernetes integration test status success

Test build #135848 has finished for PR 31664 at commit

Merged to master
What changes were proposed in this pull request?
Fixed an issue where shuffle data could not be cleaned up when unregisterShuffle is called.
Why are the changes needed?
While we use the old shuffle fetch protocol, we use partitionId as mapId in the ShuffleBlockId construction, but we use context.taskAttemptId() as the mapId that is cached in taskIdMapsForShuffle when we getWriter[K, V]. As a result, data could not be cleaned up when unregisterShuffle, because when we remove a shuffle's metadata from taskIdMapsForShuffle, the recorded mapIds are context.taskAttemptId() values instead of partitionId values.
Does this PR introduce any user-facing change?
Yes.
How was this patch tested?
Added new tests.
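The bookkeeping mismatch the fix addresses can be modeled without Spark. In this sketch (hypothetical names, not Spark's real API), the writer names the file by one mapId but records a different mapId for cleanup, so unregisterShuffle misses the file:

```scala
// Hypothetical model of the SPARK-34541 bug: files are named with one mapId
// (partitionId under the old fetch protocol) but the cleanup bookkeeping
// recorded context.taskAttemptId(), so unregisterShuffle deletes wrong names.
class ShuffleCleanupModel(useOldProtocol: Boolean) {
  private val filesOnDisk = scala.collection.mutable.Set.empty[Long]
  private val taskIdMapsForShuffle = scala.collection.mutable.Set.empty[Long]

  def write(partitionId: Long, taskAttemptId: Long): Unit = {
    val fileMapId = if (useOldProtocol) partitionId else taskAttemptId
    filesOnDisk += fileMapId              // file name derived from fileMapId
    taskIdMapsForShuffle += taskAttemptId // bug: always records taskAttemptId
  }

  // unregisterShuffle: delete the file for every recorded mapId and
  // return whatever files were left behind (the leak).
  def unregisterShuffle(): Set[Long] = {
    taskIdMapsForShuffle.foreach(filesOnDisk -= _)
    filesOnDisk.toSet
  }
}
```

With the old protocol and a taskAttemptId that differs from the partitionId (e.g. after a warm-up job), the file written under mapId 0 survives cleanup; the fix is to record the same mapId that was used in the file name.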