From 836608240a6fe1ef44cefc4c208e952fb3b0772c Mon Sep 17 00:00:00 2001
From: gaoyajun02
Date: Fri, 21 Oct 2022 20:15:56 +0800
Subject: [PATCH 1/5] [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

---
 .../spark/storage/ShuffleBlockFetcherIterator.scala  | 10 +++++++++-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala   | 13 +++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b5f20522e91f6..cfb6bf636e4c4 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -798,7 +798,15 @@ final class ShuffleBlockFetcherIterator(
       // since the last call.
       val msg = s"Received a zero-size buffer for block $blockId from $address " +
         s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-      throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+      if (blockId.isShuffleChunk) {
+        logWarning(msg)
+        pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+        // Set result to null to trigger another iteration of the while loop to get either.
+        result = null
+        null
+      } else {
+        throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
+      }
 
     val in = try {
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index f8fe28c0512b7..64b6c93bf52cb 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1814,4 +1814,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
+  test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " +
+    "is zero-size") {
+    val blockManager = mock(classOf[BlockManager])
+    val localDirs = Array("local-dir")
+    val blocksByAddress = prepareForFallbackToLocalBlocks(
+      blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
+    val zeroSizeBuffer = createMockManagedBuffer(0)
+    doReturn(Seq({zeroSizeBuffer})).when(blockManager)
+      .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+      blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
+    verifyLocalBlocksFromFallback(iterator)
+  }
 }

From 15e0bce1237ee6e667c95013072bf4b35af4bbf5 Mon Sep 17 00:00:00 2001
From: gaoyajun02
Date: Fri, 4 Nov 2022 17:07:23 +0800
Subject: [PATCH 2/5] update comment

---
 .../scala/org/apache/spark/storage/PushBasedFetchHelper.scala  | 2 ++
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 3 +++
 2 files changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index dd81c860ba33d..8cc1b865207d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -285,6 +285,8 @@ private class PushBasedFetchHelper(
   * 2. There is a failure when fetching remote shuffle chunks.
   * 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
   *    (local or remote).
+  * 4. There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk
+  *    (local or remote).
   */
  def initiateFallbackFetchForPushMergedBlock(
      blockId: BlockId,
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index cfb6bf636e4c4..8193d584bf53e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -799,6 +799,9 @@ final class ShuffleBlockFetcherIterator(
       val msg = s"Received a zero-size buffer for block $blockId from $address " +
         s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
       if (blockId.isShuffleChunk) {
+        // Zero-size block may come from nodes with hardware failures, For shuffle chunks,
+        // the original shuffle blocks that belong to that zero-size shuffle chunk is
+        // available and we can opt to fallback immediately.
         logWarning(msg)

From b992830386628d561bb1d221b73889de0ddf7f85 Mon Sep 17 00:00:00 2001
From: gaoyajun02
Date: Sat, 12 Nov 2022 20:09:04 +0800
Subject: [PATCH 3/5] fixup

---
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 8193d584bf53e..c234ec85ff1d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -806,7 +806,6 @@ final class ShuffleBlockFetcherIterator(
         pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
         // Set result to null to trigger another iteration of the while loop to get either.
         result = null
-        null
       } else {
         throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
       }

From c570799b12d5184e8ef6ae020e1b708199275806 Mon Sep 17 00:00:00 2001
From: gaoyajun02
Date: Fri, 18 Nov 2022 17:17:50 +0800
Subject: [PATCH 4/5] fixup

---
 .../storage/ShuffleBlockFetcherIterator.scala | 62 ++++++++++---------
 1 file changed, 33 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c234ec85ff1d5..d1e2e270711ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -782,7 +782,7 @@ final class ShuffleBlockFetcherIterator(
       logDebug("Number of requests in flight " + reqsInFlight)
     }
 
-    if (buf.size == 0) {
+    val in = if (buf.size == 0) {
       // We will never legitimately receive a zero-size block. All blocks with zero records
       // have zero size and all zero-size blocks have no records (and hence should never
       // have been requested in the first place).
This statement relies on behaviors of the @@ -806,40 +806,44 @@ final class ShuffleBlockFetcherIterator( pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) // Set result to null to trigger another iteration of the while loop to get either. result = null + null } else { throwFetchFailedException(blockId, mapIndex, address, new IOException(msg)) } - } - - val in = try { - val bufIn = buf.createInputStream() - if (checksumEnabled) { - val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm) - checkedIn = new CheckedInputStream(bufIn, checksum) - checkedIn - } else { - bufIn - } - } catch { - // The exception could only be throwed by local shuffle block - case e: IOException => - assert(buf.isInstanceOf[FileSegmentManagedBuffer]) - e match { - case ce: ClosedByInterruptException => - logError("Failed to create input stream from local block, " + - ce.getMessage) - case e: IOException => logError("Failed to create input stream from local block", e) - } - buf.release() - if (blockId.isShuffleChunk) { - pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) - // Set result to null to trigger another iteration of the while loop to get either. - result = null - null + } else { + try { + val bufIn = buf.createInputStream() + if (checksumEnabled) { + val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm) + checkedIn = new CheckedInputStream(bufIn, checksum) + checkedIn } else { - throwFetchFailedException(blockId, mapIndex, address, e) + bufIn } + } catch { + // The exception could only be throwed by local shuffle block + case e: IOException => + assert(buf.isInstanceOf[FileSegmentManagedBuffer]) + e match { + case ce: ClosedByInterruptException => + logError("Failed to create input stream from local block, " + + ce.getMessage) + case e: IOException => logError( + "Failed to create input stream from local block", e) + } + buf.release() + if (blockId.isShuffleChunk) { + pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) + // Set result to null to trigger another iteration of the while loop to get + // either. + result = null + null + } else { + throwFetchFailedException(blockId, mapIndex, address, e) + } + } } + if (in != null) { try { input = streamWrapper(blockId, in) From b58a8fc031b945a9261377f856d9ddbe49d54c7c Mon Sep 17 00:00:00 2001 From: gaoyajun02 Date: Fri, 18 Nov 2022 17:20:55 +0800 Subject: [PATCH 5/5] update --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index d1e2e270711ad..e35144756b591 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -828,8 +828,8 @@ final class ShuffleBlockFetcherIterator( case ce: ClosedByInterruptException => logError("Failed to create input stream from local block, " + ce.getMessage) - case e: IOException => logError( - "Failed to create input stream from local block", e) + case e: IOException => + logError("Failed to create input stream from local block", e) } buf.release() if (blockId.isShuffleChunk) {
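
Illustrative sketch (outside the patches above): a minimal, self-contained Scala model of the behavior these commits add to ShuffleBlockFetcherIterator. A zero-size buffer for a push-merged shuffle chunk triggers a fallback to the original shuffle blocks, while a zero-size buffer for a regular shuffle block still fails the fetch. The names ZeroSizeChunkFallbackSketch, FallbackToOriginalBlocks, FailFetch and Proceed are hypothetical stand-ins rather than Spark APIs; only isShuffleChunk and the fallback versus FetchFailedException split mirror the patch.

// Sketch only; the simplified BlockId classes below stand in for Spark's real ones.
object ZeroSizeChunkFallbackSketch {

  sealed trait BlockId { def isShuffleChunk: Boolean }
  case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
    val isShuffleChunk = false
  }
  case class ShuffleBlockChunkId(shuffleId: Int, reduceId: Int, chunkId: Int) extends BlockId {
    val isShuffleChunk = true
  }

  sealed trait Action
  case class FallbackToOriginalBlocks(chunk: BlockId) extends Action
  case class FailFetch(block: BlockId, reason: String) extends Action
  case object Proceed extends Action

  // Mirrors the branch the patch adds: zero-size + shuffle chunk => fall back to the
  // original blocks; zero-size + regular block => fetch failure; otherwise keep going.
  def onBufferReceived(blockId: BlockId, bufSize: Long): Action =
    if (bufSize == 0) {
      if (blockId.isShuffleChunk) FallbackToOriginalBlocks(blockId)
      else FailFetch(blockId, s"Received a zero-size buffer for block $blockId")
    } else {
      Proceed
    }

  def main(args: Array[String]): Unit = {
    println(onBufferReceived(ShuffleBlockChunkId(0, 0, 2), bufSize = 0))   // FallbackToOriginalBlocks(...)
    println(onBufferReceived(ShuffleBlockId(0, 1L, 2), bufSize = 0))       // FailFetch(...)
    println(onBufferReceived(ShuffleBlockChunkId(0, 0, 2), bufSize = 64))  // Proceed
  }
}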