Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ private class PushBasedFetchHelper(
* 2. There is a failure when fetching remote shuffle chunks.
* 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
* (local or remote).
* 4. There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk
* (local or remote).
*/
def initiateFallbackFetchForPushMergedBlock(
blockId: BlockId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ final class ShuffleBlockFetcherIterator(
logDebug("Number of requests in flight " + reqsInFlight)
}

if (buf.size == 0) {
val in = if (buf.size == 0) {
// We will never legitimately receive a zero-size block. All blocks with zero records
// have zero size and all zero-size blocks have no records (and hence should never
// have been requested in the first place). This statement relies on behaviors of the
Expand All @@ -798,38 +798,52 @@ final class ShuffleBlockFetcherIterator(
// since the last call.
val msg = s"Received a zero-size buffer for block $blockId from $address " +
s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
}

val in = try {
val bufIn = buf.createInputStream()
if (checksumEnabled) {
val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
checkedIn = new CheckedInputStream(bufIn, checksum)
checkedIn
if (blockId.isShuffleChunk) {
// Zero-size block may come from nodes with hardware failures, For shuffle chunks,
// the original shuffle blocks that belong to that zero-size shuffle chunk is
// available and we can opt to fallback immediately.
logWarning(msg)
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
// Set result to null to trigger another iteration of the while loop to get either.
result = null
null
} else {
bufIn
throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
}
} catch {
// The exception could only be throwed by local shuffle block
case e: IOException =>
assert(buf.isInstanceOf[FileSegmentManagedBuffer])
e match {
case ce: ClosedByInterruptException =>
logError("Failed to create input stream from local block, " +
ce.getMessage)
case e: IOException => logError("Failed to create input stream from local block", e)
}
buf.release()
if (blockId.isShuffleChunk) {
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
// Set result to null to trigger another iteration of the while loop to get either.
result = null
null
} else {
try {
val bufIn = buf.createInputStream()
if (checksumEnabled) {
val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
checkedIn = new CheckedInputStream(bufIn, checksum)
checkedIn
} else {
throwFetchFailedException(blockId, mapIndex, address, e)
bufIn
}
} catch {
// The exception could only be throwed by local shuffle block
case e: IOException =>
assert(buf.isInstanceOf[FileSegmentManagedBuffer])
e match {
case ce: ClosedByInterruptException =>
logError("Failed to create input stream from local block, " +
ce.getMessage)
case e: IOException =>
logError("Failed to create input stream from local block", e)
}
buf.release()
if (blockId.isShuffleChunk) {
pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
// Set result to null to trigger another iteration of the while loop to get
// either.
result = null
null
} else {
throwFetchFailedException(blockId, mapIndex, address, e)
}
}
}

if (in != null) {
try {
input = streamWrapper(blockId, in)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1814,4 +1814,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
intercept[FetchFailedException] { iterator.next() }
}

test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " +
"is zero-size") {
val blockManager = mock(classOf[BlockManager])
val localDirs = Array("local-dir")
val blocksByAddress = prepareForFallbackToLocalBlocks(
blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
val zeroSizeBuffer = createMockManagedBuffer(0)
doReturn(Seq({zeroSizeBuffer})).when(blockManager)
.getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
verifyLocalBlocksFromFallback(iterator)
}
}