[SPARK-23253][Core][Shuffle]Only write shuffle temporary index file when there is not an existing one #20422
|
Is this just trying to reuse a file that should have been cleaned up after prior failure? If so is that possible as a more direct solution? I wonder if there aren't corner cases here where the file exists and it is still being written to by another process. This could result in corruption. But I am not familiar with this mechanism |
|
I think it is necessary to add a unit test to verify the changes. |
|
Test build #86768 has finished for PR 20422 at commit
|
|
thanks for taking a look at this @yaooqinn . To clarify -- there is no bug you are trying to fix here, is there? It's just an optimization? From a quick glance I think the change seems correct ... but also seems like such a minor improvement that I'm not sure I see the value in changing this. |
|
thanks guys for reviewing. Yes, this is just a minor improvement. The code here did not seem very logical to me when I was trying to do some optimizations for my customer's heavy shuffle case. If it's too trivial, I can close it. |
|
I agree with @squito , unless there's a bug in it, it is risky and unnecessary to change the logic in this critical path. |
|
Jenkins, ok to test |
squito
left a comment
I actually think this is OK, though I'm still somewhat wary of changing this code for minor gains.
It is already tested in IndexShuffleBlockResolverSuite. Btw, I noticed that the comment here is wrong:
It should say "the dataFile should be the new one, since we deleted the dataFile from the first attempt".
It would also be good to expand that test to actually read the index file and make sure it has the right values.
| } | ||
| indexTmp.delete() | ||
| } else { | ||
| val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) |
move this below the comment "This is the first successful attempt".
I'd also include a comment about why we write to a temporary file, even though we're always going to rename (because in case the task dies somehow, we'd prefer to not leave a half-written index file in the final location).
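To make the temporary-file rationale concrete, here is a minimal sketch of the write-then-rename pattern in plain Scala. It is not Spark's code: `TempIndexSketch`, `writeIndex`, and the `.tmp` suffix are hypothetical names chosen for illustration. The point is only that a crash during the write leaves a stray temporary file rather than a half-written index at the final path.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, IOException}

object TempIndexSketch {
  // Hypothetical helper (not Spark's API): writes cumulative block offsets
  // to a temporary file first, then renames it into the final location.
  def writeIndex(indexFile: File, lengths: Array[Long]): Unit = {
    // Assumed naming scheme for the temporary file.
    val indexTmp = new File(indexFile.getPath + ".tmp")
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    try {
      // The index stores offsets, not lengths: 0, l0, l0 + l1, ...
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
    // Only a fully written file ever appears under the final name.
    if (indexFile.exists()) indexFile.delete()
    if (!indexTmp.renameTo(indexFile)) {
      throw new IOException(s"failed to rename $indexTmp to $indexFile")
    }
  }
}
```

If the task dies before the rename, readers never see a truncated index under the final name; cleaning up the leftover temporary file is a separate concern that this sketch does not handle.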
|
Test build #86792 has finished for PR 20422 at commit
|
|
Test build #86806 has finished for PR 20422 at commit
|
|
retest this please |
|
Test build #86831 has finished for PR 20422 at commit
|
|
Test build #86858 has finished for PR 20422 at commit
|
|
@squito I added a test for the index file. Please check it again, thanks. |
|
@jerryshao are you ok with making this change? I think our original comments crossed paths as I was taking a closer look |
squito
left a comment
thanks for the update @yaooqinn . I left some style comments, but I'm also waiting to see @jerryshao 's opinion. Sorry for the back and forth; in any case it's always useful to have more people looking at this code critically.
| val idxName = s"shuffle_${shuffleId}_${mapId}_0.index" | ||
| val resolver = new IndexShuffleBlockResolver(conf, blockManager) | ||
|
|
||
| val lengths = (1 to 2).map(_ => 8L).toArray |
you could do Array.fill(2)(8L)
| assert(firstByte2(0) === 2) | ||
| } | ||
|
|
||
| test("SPARK-23253: index files should be created properly") { |
thanks for adding this, but actually I'm not sure this is covering any cases beyond those in the previous test, is it? I was thinking of just adding something to read the actual index file, and make sure it had the right values to go with the update to the data file (or no updates in some cases).
you may have added a couple more asserts than the original test -- if so, maybe they can just be added to the original?
+1, we can add a check of the index file to the original test case.
| @@ -123,7 +123,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa | |||
| assert(dataFile.length() === 35) | |||
| assert(!dataTmp2.exists()) | |||
not related to the change, but this should be assert(!dataTmp3.exists())
| package org.apache.spark.shuffle.sort | ||
|
|
||
| import java.io.{File, FileInputStream, FileOutputStream} | ||
| import java.io._ |
nit: this change is not necessary
| assert(firstByte(0) === 0) | ||
|
|
||
| // The index file should not change | ||
| val secondBytes = new Array[Byte](8) |
nit: should have a better name, perhaps indexSecondValue ?
not sure whether we should change this, just my two cents.
|
Test build #86905 has finished for PR 20422 at commit
|
|
Test build #86906 has finished for PR 20422 at commit
|
|
Test build #86907 has finished for PR 20422 at commit
|
jerryshao
left a comment
LGTM.
squito
left a comment
lgtm
| } { | ||
| indexIn.close() | ||
| } | ||
| assert(secondValueOffset(7) === 10, "The index file should not change") |
minor: here and below, it would be clearer if you used DataInputStream.readLong() (no magic 7 offset, and you check the rest of the bytes):
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
  indexIn.readLong() // first offset is always 0
  assert(10 === indexIn.readLong(), "The index file should not change")
} {
  indexIn.close()
}
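As a self-contained illustration of the same idea outside the test suite, the sketch below reads every offset of an index file with `DataInputStream.readLong()`. `ReadIndexSketch` and `readOffsets` are hypothetical names, and the sketch assumes the file is nothing but a sequence of 8-byte big-endian longs, which is what `DataOutputStream.writeLong` produces.

```scala
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream}

object ReadIndexSketch {
  // Hypothetical helper: returns all offsets stored in the index file.
  // Assumes the file holds only 8-byte longs written by DataOutputStream.
  def readOffsets(indexFile: File): Array[Long] = {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))
    try {
      val count = (indexFile.length() / 8).toInt
      // Array.fill re-evaluates its argument for each element, in order.
      Array.fill(count)(in.readLong())
    } finally {
      in.close()
    }
  }
}
```

Comparing whole `Long` values this way checks all eight bytes of each offset, rather than asserting on a single byte at a fixed position.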
LGTM |
|
Test build #86958 has finished for PR 20422 at commit
|
|
Test build #86959 has finished for PR 20422 at commit
|
|
merged to master. thanks @yaooqinn for doing and updating the tests too |
What changes were proposed in this pull request?
The temporary shuffle index file is used to create the shuffle index file atomically. It is not needed when the index file already exists because another attempt of the same task has already written it.
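The control flow described above can be sketched roughly as follows. This is a simplified, standalone illustration rather than the actual `IndexShuffleBlockResolver` code: `CommitSketch`, `existingLengths`, `commit`, and the `.tmp` suffix are hypothetical, and the consistency check (index size matches the block count, first offset is 0, last offset equals the data file length) is an assumption made for the example.

```scala
import java.io._

object CommitSketch {
  // Returns the block lengths recorded by an earlier attempt, or None if the
  // index/data pair is missing or inconsistent (simplified check, an assumption).
  def existingLengths(indexFile: File, dataFile: File, blocks: Int): Option[Array[Long]] = {
    if (!indexFile.exists() || indexFile.length() != (blocks + 1) * 8L) {
      None
    } else {
      val in = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))
      try {
        val offsets = Array.fill(blocks + 1)(in.readLong())
        val consistent =
          offsets.head == 0L && dataFile.exists() && dataFile.length() == offsets.last
        if (consistent) Some(offsets.zip(offsets.tail).map { case (a, b) => b - a }) else None
      } finally {
        in.close()
      }
    }
  }

  // Commits one attempt's output and returns the lengths that end up on disk.
  def commit(indexFile: File, dataFile: File, dataTmp: File, lengths: Array[Long]): Array[Long] =
    synchronized {
      existingLengths(indexFile, dataFile, lengths.length) match {
        case Some(previous) =>
          // Another attempt already succeeded: discard our data and
          // never create the temporary index file at all.
          if (dataTmp != null && dataTmp.exists()) dataTmp.delete()
          previous
        case None =>
          // First successful attempt: write the index to a temporary file,
          // then rename both files into place.
          val indexTmp = new File(indexFile.getPath + ".tmp")
          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
          try {
            var offset = 0L
            out.writeLong(offset)
            lengths.foreach { len => offset += len; out.writeLong(offset) }
          } finally {
            out.close()
          }
          indexFile.delete()
          dataFile.delete()
          if (!indexTmp.renameTo(indexFile)) {
            throw new IOException(s"failed to rename $indexTmp to $indexFile")
          }
          if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
            throw new IOException(s"failed to rename $dataTmp to $dataFile")
          }
          lengths
      }
    }
}
```

With this shape, a second attempt that finds a consistent index and data file returns the earlier attempt's lengths and leaves both files untouched, which is the behavior the test discussion in this thread is checking.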
How was this patch tested?
Existing unit tests.
cc @squito