-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after FileChannel.transferTo #18091
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after FileChannel.transferTo #18091
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -425,14 +425,8 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th | |
| long bytesToTransfer = partitionLengthInSpill; | ||
| final FileChannel spillInputChannel = spillInputChannels[i]; | ||
| final long writeStartTime = System.nanoTime(); | ||
| while (bytesToTransfer > 0) { | ||
| final long actualBytesTransferred = spillInputChannel.transferTo( | ||
| spillInputChannelPositions[i], | ||
| bytesToTransfer, | ||
| mergedFileOutputChannel); | ||
| spillInputChannelPositions[i] += actualBytesTransferred; | ||
| bytesToTransfer -= actualBytesTransferred; | ||
| } | ||
| Utils.copyFileStreamNIO(spillInputChannel, mergedFileOutputChannel, bytesToTransfer); | ||
|
||
| spillInputChannelPositions[i] += bytesToTransfer; | ||
| writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); | ||
| bytesWrittenToMergedFile += partitionLengthInSpill; | ||
| partitionLengths[partition] += partitionLengthInSpill; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInf | |
| import java.math.{MathContext, RoundingMode} | ||
| import java.net._ | ||
| import java.nio.ByteBuffer | ||
| import java.nio.channels.Channels | ||
| import java.nio.channels.{Channels, FileChannel} | ||
| import java.nio.charset.StandardCharsets | ||
| import java.nio.file.{Files, Paths} | ||
| import java.util.{Locale, Properties, Random, UUID} | ||
|
|
@@ -60,7 +60,6 @@ import org.apache.spark.internal.Logging | |
| import org.apache.spark.internal.config._ | ||
| import org.apache.spark.network.util.JavaUtils | ||
| import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} | ||
| import org.apache.spark.util.logging.RollingFileAppender | ||
|
|
||
| /** CallSite represents a place in user code. It can have a short and a long form. */ | ||
| private[spark] case class CallSite(shortForm: String, longForm: String) | ||
|
|
@@ -319,41 +318,22 @@ private[spark] object Utils extends Logging { | |
| * copying is disabled by default unless explicitly set transferToEnabled as true, | ||
| * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. | ||
| */ | ||
| def copyStream(in: InputStream, | ||
| out: OutputStream, | ||
| closeStreams: Boolean = false, | ||
| transferToEnabled: Boolean = false): Long = | ||
| { | ||
| var count = 0L | ||
| def copyStream( | ||
| in: InputStream, | ||
| out: OutputStream, | ||
| closeStreams: Boolean = false, | ||
| transferToEnabled: Boolean = false): Long = { | ||
| tryWithSafeFinally { | ||
| if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream] | ||
| && transferToEnabled) { | ||
| // When both streams are File stream, use transferTo to improve copy performance. | ||
| val inChannel = in.asInstanceOf[FileInputStream].getChannel() | ||
| val outChannel = out.asInstanceOf[FileOutputStream].getChannel() | ||
| val initialPos = outChannel.position() | ||
| val size = inChannel.size() | ||
|
|
||
| // In case transferTo method transferred less data than we have required. | ||
| while (count < size) { | ||
| count += inChannel.transferTo(count, size - count, outChannel) | ||
| } | ||
|
|
||
| // Check the position after transferTo loop to see if it is in the right position and | ||
| // give user information if not. | ||
| // Position will not be increased to the expected length after calling transferTo in | ||
| // kernel version 2.6.32, this issue can be seen in | ||
| // https://bugs.openjdk.java.net/browse/JDK-7052359 | ||
| // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). | ||
| val finalPos = outChannel.position() | ||
| assert(finalPos == initialPos + size, | ||
| s""" | ||
| |Current position $finalPos do not equal to expected position ${initialPos + size} | ||
| |after transferTo, please check your kernel version to see if it is 2.6.32, | ||
| |this is a kernel bug which will lead to unexpected behavior when using transferTo. | ||
| |You can set spark.file.transferTo = false to disable this NIO feature. | ||
| """.stripMargin) | ||
| copyFileStreamNIO(inChannel, outChannel, size) | ||
| size | ||
| } else { | ||
| var count = 0L | ||
| val buf = new Array[Byte](8192) | ||
| var n = 0 | ||
| while (n != -1) { | ||
|
|
@@ -363,8 +343,8 @@ private[spark] object Utils extends Logging { | |
| count += n | ||
| } | ||
| } | ||
| count | ||
| } | ||
| count | ||
| } { | ||
| if (closeStreams) { | ||
| try { | ||
|
|
@@ -376,6 +356,31 @@ private[spark] object Utils extends Logging { | |
| } | ||
| } | ||
|
|
||
| def copyFileStreamNIO(input: FileChannel, output: FileChannel, bytesToCopy: Long): Unit = { | ||
| val initialPos = output.position() | ||
| var count = 0L | ||
| // In case transferTo method transferred less data than we have required. | ||
| while (count < bytesToCopy) { | ||
| count += input.transferTo(count, bytesToCopy - count, output) | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we add an |
||
| // Check the position after transferTo loop to see if it is in the right position and | ||
| // give user information if not. | ||
| // Position will not be increased to the expected length after calling transferTo in | ||
| // kernel version 2.6.32, this issue can be seen in | ||
| // https://bugs.openjdk.java.net/browse/JDK-7052359 | ||
| // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). | ||
| val finalPos = output.position() | ||
| val expectedPos = initialPos + bytesToCopy | ||
| assert(finalPos == expectedPos, | ||
| s""" | ||
| |Current position $finalPos do not equal to expected position $expectedPos | ||
| |after transferTo, please check your kernel version to see if it is 2.6.32, | ||
| |this is a kernel bug which will lead to unexpected behavior when using transferTo. | ||
| |You can set spark.file.transferTo = false to disable this NIO feature. | ||
| """.stripMargin) | ||
| } | ||
|
|
||
| /** | ||
| * Construct a URI container information used for authentication. | ||
| * This also sets the default authenticator to properly negotiation the | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit: we don't need this
`bytesToTransfer` anymore. We can remove this mutable variable and just use `partitionLengthInSpill` in its place.