[SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter. #532
Because compressed output streams don't like it.
private def writeEmptyPartition(mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var partitionWriter: ShufflePartitionWriter = null
  try {
    partitionWriter = mapOutputWriter.getNextPartitionWriter
Would you need to call partitionWriter.toStream(), as in UnsafeShuffleWriter, to ensure that the outputFileStream is created and an empty file exists? It seems to be expected by the UnsafeShuffleWriterSuite; I don't know if it is the same here.
That shouldn't be necessary; writer.close() should know what to do if a stream was never created.
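To illustrate the point about close(), here is a minimal sketch of a writer that opens its stream lazily and tolerates being closed without ever having opened it. `LazyPartitionWriter`, `openStream`, and `wasOpened` are hypothetical names for illustration only; this is not Spark's ShufflePartitionWriter interface, and whether an empty file should still be produced for an empty partition is left to the implementation.

```scala
import java.io.OutputStream

// Hypothetical stand-in for a partition writer (assumption, not Spark's API).
final class LazyPartitionWriter(openStream: () => OutputStream) {
  private var stream: Option[OutputStream] = None
  private var closed = false

  /** Opens the underlying stream on first use and reuses it afterwards. */
  def toStream(): OutputStream = {
    require(!closed, "writer already closed")
    stream.getOrElse {
      val s = openStream()
      stream = Some(s)
      s
    }
  }

  /** Closes the stream if one was opened; otherwise there is nothing to release. */
  def close(): Unit = {
    if (!closed) {
      closed = true
      stream.foreach(_.close())
    }
  }

  /** True if toStream() was ever called. */
  def wasOpened: Boolean = stream.isDefined
}
```

With this shape, the caller of an empty partition can simply obtain the writer and close it, without touching toStream().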
// The iterator may have stopped short of opening a writer for every partition. So fill in the
// remaining empty partitions.
for (emptyPartition <- nextPartitionId until numPartitions) {
  writeEmptyPartition(mapOutputWriter)
Hmm wait why do we need this? Shouldn't new long[numPartitions] fill the array with default value of 0?
It depends on the contract we want to present to other plugin writers. I.e. do we make a contract that we open a writer for strictly every partition, even empty ones? Or do we say we open for the first N partitions where N is the last non-empty partition? My take is that we should have the contract that we always open a writer for every partition, empty or not, from 0 through numPartitions - 1. But, again, this shows the limitation of presenting an API that doesn't include the partition identifier explicitly when getting partition writers.
Oh I see, yeah, I'm OK keeping it like this then. It does give plugin implementers more consistency.
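The contract discussed in this thread (a writer is opened for every partition from 0 through numPartitions - 1, empty or not) could be sketched roughly as follows. `SketchPartitionWriter`, `SketchMapOutputWriter`, `RecordingOutput`, and `EveryPartitionContract` are hypothetical, simplified stand-ins and not the actual shuffle plugin API; the real writer consumes a sorted iterator rather than a grouped collection.

```scala
// Hypothetical, simplified stand-ins (assumptions, not Spark's interfaces).
trait SketchPartitionWriter {
  def write(bytes: Array[Byte]): Unit
  def close(): Long // returns the number of bytes written
}

trait SketchMapOutputWriter {
  // Note: no partition id is passed, so call order defines the partition.
  def getNextPartitionWriter(): SketchPartitionWriter
}

// Test double that counts how many partition writers were requested.
final class RecordingOutput extends SketchMapOutputWriter {
  var opened = 0
  def getNextPartitionWriter(): SketchPartitionWriter = {
    opened += 1
    new SketchPartitionWriter {
      private var n = 0L
      def write(bytes: Array[Byte]): Unit = n += bytes.length
      def close(): Long = n
    }
  }
}

object EveryPartitionContract {
  /** Opens exactly one writer per partition, in order, including empty ones. */
  def writeAll(
      out: SketchMapOutputWriter,
      numPartitions: Int,
      records: Seq[(Int, Array[Byte])]): Array[Long] = {
    val lengths = new Array[Long](numPartitions) // defaults to 0 for every slot
    val byPartition = records.groupBy(_._1)
    for (p <- 0 until numPartitions) {
      val writer = out.getNextPartitionWriter()
      try {
        byPartition.getOrElse(p, Seq.empty).foreach { case (_, bytes) => writer.write(bytes) }
      } finally {
        lengths(p) = writer.close()
      }
    }
    lengths
  }
}
```

Because the getter takes no partition identifier, the plugin can only infer which partition it is writing from the call order, which is why skipping empty partitions would be ambiguous for implementers.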
core/src/main/scala/org/apache/spark/util/collection/ShufflePartitionPairsWriter.scala
looks good to me!
Ok I'm going to merge this - we have some incoming suggestions for the shuffle writer API but we should get the implementations with the existing APIs merged first. Thanks everyone who reviewed!
svc-spark-25299
left a comment
================================================================================================
BlockStoreShuffleReader reader
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
no aggregation or sorting:                    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
local fetch                                           11006         11081        123        0.9       1100.6      1.0X
remote rpc fetch                                      10901         10977         48        0.9       1090.1      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with aggregation:                             Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
local fetch                                           28171         28412        217        0.1      14085.4      1.0X
remote rpc fetch                                      28239         28689        249        0.1      14119.5      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with sorting:                                 Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
local fetch                                           30290         30609        280        0.1      15145.2      1.0X
remote rpc fetch                                      30122         30697        369        0.1      15060.9      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
with seek:                                    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
seek to last record                                       1             1          1     2797.3          0.4      1.0X
================================================================================================
BypassMergeSortShuffleWriter write
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite without spill:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
small dataset without disk spill                          2             4          2        0.5       2030.5      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
BypassMergeSortShuffleWrite with spill:       Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
without transferTo                                     7244          7359        105        0.9       1079.4      1.0X
with transferTo                                        7252          7311         64        0.9       1080.6      1.0X
================================================================================================
SortShuffleWriter writer
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter without spills:             Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
small dataset without spills                             11            17          5        0.1      10518.0      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
SortShuffleWriter with spills:                Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
no map side combine                                   14347         14510        129        0.5       2137.9      1.0X
with map side aggregation                             14200         14277         59        0.5       2115.9      1.0X
with map side sort                                    14162         14267         82        0.5       2110.3      1.0X
================================================================================================
UnsafeShuffleWriter write
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter without spills:           Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
small dataset without spills                             20            24          5        0.1      19894.4      1.0X
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Linux 4.15.0-1014-gcp
Intel(R) Xeon(R) CPU @ 2.30GHz
UnsafeShuffleWriter with spills:              Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
without transferTo                                    15869         15965         81        0.8       1182.3      1.0X
with transferTo                                       15893         16023        144        0.8       1184.1      1.0X
#532)
* [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter.
* Remove unused
* Handle empty partitions properly.
* Adjust formatting
* Don't close streams twice. Because compressed output streams don't like it.
* Clarify comment
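One generic way to guarantee that a stream is closed at most once, as the "Don't close streams twice" commit aims for, is a small guard wrapper. This is only a sketch of the idea; `CloseOnceOutputStream` is a hypothetical helper, not something taken from this PR, and the PR may well have fixed the double close by restructuring the call sites instead.

```scala
import java.io.{FilterOutputStream, OutputStream}

// Hypothetical helper (assumption, not part of Spark): forwards close() to the
// wrapped stream at most once, so a second close() becomes a no-op.
final class CloseOnceOutputStream(underlying: OutputStream)
    extends FilterOutputStream(underlying) {

  private var closed = false

  // Delegate bulk writes directly; FilterOutputStream's default writes byte by byte.
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    underlying.write(b, off, len)

  override def close(): Unit = {
    if (!closed) {
      closed = true
      super.close() // flushes, then closes the wrapped stream
    }
  }
}
```

Wrapping a compression stream in such a guard means that a writer's own close() and an outer cleanup path can both call close() without the wrapped stream seeing it twice.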