Commit 1826372

rxin authored and pwendell committed
[SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.
cc aarondav kayousterhout pwendell This should go into 1.2?

Author: Reynold Xin <[email protected]>

Closes #3579 from rxin/SPARK-4085 and squashes the following commits:

255b4fd [Reynold Xin] Updated test.
f9814d9 [Reynold Xin] Code review feedback.
2afaf35 [Reynold Xin] [SPARK-4085] Propagate FetchFailedException when Spark fails to read local shuffle file.
1 parent 96b2785 · commit 1826372

File tree

3 files changed (+40, -13 lines)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 17 additions & 11 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.storage

+import java.io.{InputStream, IOException}
 import java.util.concurrent.LinkedBlockingQueue

 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
@@ -289,17 +290,22 @@ final class ShuffleBlockFetcherIterator(
     }

     val iteratorTry: Try[Iterator[Any]] = result match {
-      case FailureFetchResult(_, e) => Failure(e)
-      case SuccessFetchResult(blockId, _, buf) => {
-        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
-        val iter = serializer.newInstance().deserializeStream(is).asIterator
-        Success(CompletionIterator[Any, Iterator[Any]](iter, {
-          // Once the iterator is exhausted, release the buffer and set currentResult to null
-          // so we don't release it again in cleanup.
-          currentResult = null
-          buf.release()
-        }))
-      }
+      case FailureFetchResult(_, e) =>
+        Failure(e)
+      case SuccessFetchResult(blockId, _, buf) =>
+        // There is a chance that createInputStream can fail (e.g. fetching a local file that does
+        // not exist, SPARK-4085). In that case, we should propagate the right exception so
+        // the scheduler gets a FetchFailedException.
+        Try(buf.createInputStream()).map { is0 =>
+          val is = blockManager.wrapForCompression(blockId, is0)
+          val iter = serializer.newInstance().deserializeStream(is).asIterator
+          CompletionIterator[Any, Iterator[Any]](iter, {
+            // Once the iterator is exhausted, release the buffer and set currentResult to null
+            // so we don't release it again in cleanup.
+            currentResult = null
+            buf.release()
+          })
+        }
     }

     (result.blockId, iteratorTry)
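
Aside (not part of the commit): the fix works because scala.util.Try captures a thrown exception as a Failure value instead of letting it unwind the stack, so the read error reaches the point where it can be rethrown as a FetchFailedException. A minimal, self-contained sketch of that pattern, using a hypothetical failing call in place of buf.createInputStream():

import java.io.{FileInputStream, InputStream}
import scala.util.{Failure, Success, Try}

object TryPatternSketch {
  // Hypothetical stand-in for buf.createInputStream(): may throw, e.g. a
  // FileNotFoundException when a local shuffle file has been deleted.
  def createInputStream(path: String): InputStream = new FileInputStream(path)

  def main(args: Array[String]): Unit = {
    // Try captures the exception as a Failure; map runs only on success,
    // so the downstream steps are skipped when the open fails.
    val result: Try[Int] = Try(createInputStream("/no/such/shuffle/file")).map { is =>
      try is.read() finally is.close()
    }

    // The caller unwraps the Try where the error can be handled properly --
    // in Spark's case, by surfacing it to the scheduler as a fetch failure.
    result match {
      case Success(byte) => println(s"read byte: $byte")
      case Failure(e)    => println(s"propagating failure: $e")
    }
  }
}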

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -17,8 +17,6 @@

 package org.apache.spark

-import java.util.concurrent.atomic.AtomicInteger
-
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.network.TransportContext

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 23 additions & 0 deletions
@@ -23,6 +23,7 @@ import org.scalatest.Matchers
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleBlockId}
 import org.apache.spark.util.MutablePair

 abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
@@ -263,6 +264,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
       }
     }
   }
+
+  test("[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file") {
+    val myConf = conf.clone().set("spark.test.noStageRetry", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
+    rdd.count()
+
+    // Delete one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    assert(hashFile.exists() || sortFile.exists())
+
+    if (hashFile.exists()) {
+      hashFile.delete()
+    }
+    if (sortFile.exists()) {
+      sortFile.delete()
+    }
+
+    // This count should retry the execution of the previous stage and rerun shuffle.
+    rdd.count()
+  }
 }

 object ShuffleSuite {
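
Reviewer note, not part of the commit: the test clones the suite conf with spark.test.noStageRetry set to false (the suite otherwise runs with stage retry disabled so fetch failures fail fast), then deletes whichever shuffle file exists, since the two shuffle managers name their output differently: hash-based shuffle writes per-reducer files keyed by ShuffleBlockId, while sort-based shuffle writes a consolidated data file keyed by ShuffleDataBlockId. A small hypothetical helper capturing that delete-whichever-exists step:

import java.io.File

object DeleteShuffleFileSketch {
  // Hypothetical helper mirroring the test body: among the candidate files
  // the two shuffle managers could have written, delete every one on disk.
  def deleteExisting(candidates: Seq[File]): Seq[File] = {
    val present = candidates.filter(_.exists())
    require(present.nonEmpty, "expected at least one shuffle file on disk")
    present.foreach(f => assert(f.delete(), s"failed to delete ${f.getPath}"))
    present
  }
}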
