[SPARK-25299] Make UnsafeShuffleWriter use the new API#9
Conversation
91fea02 to
6f0c44b
Compare
c6cd4c7 to
3ee7d73
Compare
* But don't close the partition stream returned by the partition writer.
| ShufflePartitionWriter writer = null; | ||
| try { | ||
| writer = mapWriter.getNextPartitionWriter(); | ||
| writer.toStream(); |
There was a problem hiding this comment.
Without this the file does not get created and it fails UnsafeShuffleWriter.writeEmptyIterator
There was a problem hiding this comment.
When we commit all partitions, should we not check that we never created a writer; if we never did then we need to make the file?
There was a problem hiding this comment.
Agreed. I believe we talked about this in the first writer PR where the empty file was made by blockResolver.writeIndexFileAndCommit()?
There was a problem hiding this comment.
It seems that the test is expecting a file.... idk why, so maybe we should bring that create file method back?
There was a problem hiding this comment.
We've always expected a file, that's correct. The question is why do we have to call writer.toStream to ensure the file is created? I'm fairly certain the error is in how we wrote the plugin.
There was a problem hiding this comment.
Which is why, i think on commit we might need to create an empty file if a file does not exist. As we initially wrote but decided to take out
There was a problem hiding this comment.
We thought that blockIndexResolver.writeIndexFileAndCommit should create the file. Why is it not doing so in this case?
| final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos); | ||
|
|
||
| boolean threwException = true; | ||
|
|
| FileChannel mergedFileOutputChannel = null; | ||
|
|
||
| boolean threwException = true; | ||
|
|
| writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); | ||
| } | ||
| } catch (IOException e) { | ||
| throw e; |
| outputFileChannel.close(); | ||
| } | ||
|
|
||
|
|
||
| Random rand = new Random(); | ||
| TaskContext$.MODULE$.setTaskContext(new TaskContextImpl( | ||
| 0, 0, 0, rand.nextInt(10000), 0, taskMemoryManager, new Properties(), null, taskMetrics)); |
There was a problem hiding this comment.
Any reason we can't use a fixed number instead of a random one?
yifeih
left a comment
There was a problem hiding this comment.
just a few questions and comments, other than that, looks good to me!
| ShufflePartitionWriter writer = null; | ||
| try { | ||
| writer = mapWriter.getNextPartitionWriter(); | ||
| writer.toStream(); |
There was a problem hiding this comment.
Agreed. I believe we talked about this in the first writer PR where the empty file was made by blockResolver.writeIndexFileAndCommit()?
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| flush(); |
There was a problem hiding this comment.
On one hand it's inconsequential because the contract is the writer above this never closes this stream - it leaves it up to the partition writer to close this.
On the other hand, this is to preserve an optimization we discovered in UnsafeShuffleWriter that encourages us to minimize the number of times we flush the buffer to the underlying file.
We can get correctness by just flushing the merged buffered file stream at the very end when we commit all the partitions.
| when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); | ||
|
|
||
| Random rand = new Random(); | ||
| TaskContext$.MODULE$.setTaskContext(new TaskContextImpl( |
There was a problem hiding this comment.
I'm a little confused about this because there's already a taskContext object that's mocked. why can't you just set it to that?
| public void commitAllPartitions() throws IOException { | ||
| cleanUp(); | ||
| blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); | ||
| if (!outputFile.exists()) { |
There was a problem hiding this comment.
Why isn't blockResolver#writeIndexFileAndCommit creating the file?
| final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); | ||
| final int numPartitions = partitioner.numPartitions(); | ||
| long[] partitionLengths = new long[numPartitions]; | ||
| logger.error(mapWriter.toString()); |
There was a problem hiding this comment.
meep, i left a log statement :(
| // output file would have already been counted as shuffle bytes written. | ||
| Files.move(spills[0].file, outputFile); | ||
| return spills[0].partitionLengths; | ||
| ShufflePartitionWriter writer = null; |
There was a problem hiding this comment.
Do we not need to commitAllPartitions here? I'd think we wouldn't need to create getNextPartitionWriter either.
|
Ok this looks fine. Please squash and merge into your branch and we'll cherry-pick to our fork. Thanks! |
Ported from bloomberg#9. Credits to @ifilonenko!
…on-k8s#536) Ported from #9. Credits to @ifilonenko!
Ported from bloomberg#9. Credits to @ifilonenko!
What changes were proposed in this pull request?
Make UnsafeShuffleWriter use the new API
Shuffle Writes [4/6]
How was this patch tested?
Compiled and unit tests