diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4a2281a4e8785..0878700f8bf34 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2117,4 +2117,15 @@ package object config {
      // batch of block will be loaded in memory with memory mapping, which has higher overhead
      // with small MB sized chunk of data.
      .createWithDefaultString("3m")
+
+  private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
+    ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
+      .doc("Whether to mark shuffle files as lost when an executor is lost. If " +
+        "spark.shuffle.manager is set to a customized class that uses a different shuffle " +
+        "implementation, such as storing shuffle files on remote storage or using a " +
+        "third-party remote shuffle service, and you are sure the shuffle files are not " +
+        "stored on the executor, consider setting this to false.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d711b432ae6df..2c9e63debfd1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2034,7 +2034,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
+    val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
+      env.blockManager.markFileLostOnExecutorLost
     removeExecutorAndUnregisterOutputs(
       execId = execId,
       fileLost = fileLost,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c09e1615affb..3394ee7af9f39 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -179,6 +179,9 @@ private[spark] class BlockManager(
   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
   private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
 
+  private[spark] val markFileLostOnExecutorLost: Boolean = conf.get(
+    config.MARK_FILE_LOST_ON_EXECUTOR_LOST)
+
   private val remoteReadNioBufferConversion =
     conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4c74e4fbb3728..8708a915645a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2592,6 +2592,24 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("shuffle files should not be lost when setting spark.shuffle.markFileLostOnExecutorLost " +
+    "to false") {
+    conf.set(config.MARK_FILE_LOST_ON_EXECUTOR_LOST.key, "false")
+    assert(!sc.env.blockManager.markFileLostOnExecutorLost)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+    completeShuffleMapStageSuccessfully(0, 0, 1)
+    runEvent(ExecutorLost("hostA-exec", ExecutorKilled))
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
+    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+  }
+
   /**
    * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
    * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies
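
For reviewers, a minimal spark-shell-style sketch of how an application would opt in to the new behavior. com.example.RemoteShuffleManager is a hypothetical custom ShuffleManager that keeps shuffle data off the executors; it is not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: shuffle data lives in a remote shuffle service, so losing an
// executor should not invalidate its registered map outputs.
val conf = new SparkConf()
  .setAppName("remote-shuffle-example")
  // Hypothetical ShuffleManager that writes shuffle files off-executor.
  .set("spark.shuffle.manager", "com.example.RemoteShuffleManager")
  // New flag from this patch: keep map outputs registered on executor loss.
  .set("spark.shuffle.markFileLostOnExecutorLost", "false")
val sc = new SparkContext(conf)

With the flag left at its default (true), the DAGScheduler keeps the existing behavior and unregisters the lost executor's shuffle output whenever the external shuffle service is disabled or the whole worker is lost.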