[SPARK-20014] Optimize mergeSpillsWithFileStream method #17343
sitalkedia wants to merge 3 commits into apache:master
Conversation
Force-pushed e9ac76e to 1834db6
Test build #74798 has finished for PR 17343 at commit
Test build #74800 has finished for PR 17343 at commit
Test build #74802 has finished for PR 17343 at commit
Test build #74805 has finished for PR 17343 at commit
If we make flush() a no-op, then buffered (uncommitted) data won't be written to the stream; am I missing something here, or is this change broken?
Background: you need to do a flush() to ensure the indices generated are valid.
Ah, looks like I missed that CountingOutputStream was introduced after BOS and not before.
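For context, a minimal sketch of the stream nesting being discussed (file name and buffer size are assumptions, not the exact code from this PR): the CountingOutputStream wraps the BufferedOutputStream, so its byte count reflects everything handed to the buffer, and partition offsets stay valid even when per-partition flush() calls are suppressed.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.commons.io.output.CountingOutputStream;

public class StreamNestingSketch {
  public static void main(String[] args) throws IOException {
    File outputFile = new File("merged.data"); // hypothetical output file
    // CountingOutputStream sits *outside* the BufferedOutputStream, so
    // getByteCount() tracks bytes written into the buffer, not bytes that
    // have already reached disk. Offsets are therefore valid without a flush.
    CountingOutputStream counting = new CountingOutputStream(
        new BufferedOutputStream(new FileOutputStream(outputFile), 32 * 1024));
    counting.write(new byte[]{1, 2, 3});
    long offset = counting.getByteCount(); // 3, even though nothing is flushed yet
    counting.close();
    System.out.println("offset = " + offset);
  }
}
```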
Can you add some documentation inline so in the future we'd know why specific implementations were chosen?
@rxin - Updated documentation.
Force-pushed 5fe279e to 06c1909
Test build #74893 has finished for PR 17343 at commit
Test build #74897 has finished for PR 17343 at commit
LGTM, will wait a bit to allow others to comment.
ping @zsxwing.
sameeragarwal left a comment
Thanks @sitalkedia, the optimization looks solid. I have some extremely minor stylistic comments (unfortunately, Spark's style checker doesn't work on .java files, so many of these errors weren't caught automatically). Additionally, just to make sure: is this code covered by existing tests?
import java.nio.channels.FileChannel;
import java.util.Iterator;
import org.apache.spark.io.NioBufferedFileInputStream;
private class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream {
  public CloseAndFlushShieldOutputStream(OutputStream outputStream) {
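The full helper is small; a sketch of what such a class looks like, based on the fragment above plus Commons IO's CloseShieldOutputStream (which already turns close() into a no-op), not necessarily the exact code merged here:

```java
import java.io.OutputStream;

import org.apache.commons.io.output.CloseShieldOutputStream;

// Extends CloseShieldOutputStream (close() becomes a no-op) and additionally
// turns flush() into a no-op, so that closing a per-partition wrapper stream
// cannot force the shared buffered output stream to hit the disk.
class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream {
  CloseAndFlushShieldOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void flush() {
    // do nothing: the underlying stream is flushed once, when the merge finishes
  }
}
```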
final OutputStream bos = new BufferedOutputStream(
  new FileOutputStream(outputFile),
  (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
Is there a reason to introduce an extra config? Can we not use spark.shuffle.file.buffer here?
@sameeragarwal - Thanks for taking a look. The rationale behind having a separate config for the write buffer is that it is useful to have a larger write buffer than the read buffer: jobs spilling a large amount of data might create multiple spill files on disk, so we will have multiple read buffers but only one write buffer. Having a larger write buffer allows us to do the merge entirely in memory without hitting the disk frequently for writes. We have observed that this config helps speed up our large jobs significantly.
nit: please create a field to store it rather than parsing the conf for each call.
Hmm.. I am not sure if I get it. The function mergeSpillsWithFileStream will be called only once per task?
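What the nit is asking for, roughly: parse the conf once when the writer is constructed and reuse the values, instead of calling getSizeAsKb at every use site. A sketch under that assumption (the holder class and field names are illustrative, not from this PR):

```java
import org.apache.spark.SparkConf;

// Illustrative sketch of the reviewer's suggestion: read both buffer sizes
// once, in the constructor, and reuse them for every spill/merge stream.
final class ShuffleBufferSizes {
  final int inputBufferSizeInBytes;
  final int outputBufferSizeInBytes;

  ShuffleBufferSizes(SparkConf sparkConf) {
    this.inputBufferSizeInBytes =
        (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
    this.outputBufferSizeInBytes =
        (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024;
  }
}
```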
  /**
-  * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
+  * Merges spill files using Java FileStreams. This code path is typically slower than the NIO-based merge,
   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
nit: some of these lines are greater than 100 chars (in comments and code). Can you please fix those?
  for (int i = 0; i < spills.length; i++) {
-   spillInputStreams[i] = new FileInputStream(spills[i].file);
+   spillInputStreams[i] = new NioBufferedFileInputStream(spills[i].file,
+     (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024);
nit: the formatting seems a bit off
Thanks @sameeragarwal, addressed the checkstyle issues. Yes, the existing unit tests in UnsafeShuffleWriter#mergeSpillsWithTransferToAndLZF cover this code.
Test build #77382 has finished for PR 17343 at commit
cc @zsxwing Could you find some time to review this?
LGTM
zsxwing left a comment
Looks pretty good except some nits.
final OutputStream bos = new BufferedOutputStream(
  new FileOutputStream(outputFile),
  (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
nit: please create a field to store it rather than parsing the conf for each call.
-   spillInputStreams[i] = new FileInputStream(spills[i].file);
+   spillInputStreams[i] = new NioBufferedFileInputStream(
+     spills[i].file,
+     (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024);
nit: please create a field to store it rather than parsing the conf inside the loop.
LGTM pending tests.
Test build #77430 has finished for PR 17343 at commit
LGTM. Thanks! Merging to master.
What changes were proposed in this pull request?
When the individual partition size in a spill is small, the mergeSpillsWithTransferTo method does many small disk I/Os, which is really inefficient. One way to improve performance is to use the mergeSpillsWithFileStream method instead, turning off transferTo and using buffered file reads/writes to improve I/O throughput.
However, the current implementation of mergeSpillsWithFileStream does not do buffered reads/writes of the files, and in addition it unnecessarily flushes the output file for each partition.
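To make the fix concrete, here is a minimal sketch of the buffered, flush-shielded merge loop described above, using the CloseAndFlushShieldOutputStream sketched earlier. The method signature, the partition-copy step, and names like partitionLengths are assumptions for illustration; the real method also layers compression/encryption streams per partition.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.io.output.CountingOutputStream;

final class MergeLoopSketch {
  long[] merge(File outputFile, File[] spills, int numPartitions,
               int outputBufferSizeInBytes) throws IOException {
    final long[] partitionLengths = new long[numPartitions];
    // One buffered output stream for the whole merge; CountingOutputStream
    // tracks partition offsets without forcing the buffer to disk.
    final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(
        new BufferedOutputStream(new FileOutputStream(outputFile), outputBufferSizeInBytes));
    try {
      for (int partition = 0; partition < numPartitions; partition++) {
        final long initialFileLength = mergedFileOutputStream.getByteCount();
        // The shield makes flush()/close() no-ops, so per-partition wrapper
        // streams can be closed without flushing the shared buffer each time.
        OutputStream partitionOutput =
            new CloseAndFlushShieldOutputStream(mergedFileOutputStream);
        for (File spill : spills) {
          // copy this partition's bytes from the spill file into partitionOutput
        }
        partitionOutput.close(); // no-op on the underlying stream
        partitionLengths[partition] =
            mergedFileOutputStream.getByteCount() - initialFileLength;
      }
    } finally {
      mergedFileOutputStream.close(); // single real flush + close at the end
    }
    return partitionLengths;
  }
}
```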
How was this patch tested?
Tested this change by running a job on the cluster; the map stage run time was reduced by around 20%.