11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2117,4 +2117,15 @@ package object config {
// batch of block will be loaded in memory with memory mapping, which has higher overhead
// with small MB sized chunk of data.
.createWithDefaultString("3m")

private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
Contributor @c21 commented:
I share the concern raised by @viirya and @attilapiros. I think we should not make this a user-facing config. If we introduce a config for this at all, it would be better to start with an internal() config rather than a user-facing one.

Although spark.shuffle.manager can be an arbitrary class, in practice only a handful of implementations exist in any one company's environment, and ideally the infra developers should control which implementation is used, rather than users. Similarly, whether to mark shuffle files as lost should be controlled by the developer team, not by users.

Just to share some context: at FB, we (the Spark developers) control this kind of behavior transparently and keep it invisible to Spark users. That also makes it easier to migrate to newer implementations without worrying about users setting a wrong config. The mixed case (a query using both the customized shuffle service and the default shuffle service) happens quite a bit in production, since we rate-limit traffic on the customized service and need a fallback.
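
For reference, keeping the config non-user-facing is a one-line change to the builder chain; a minimal sketch using the existing ConfigBuilder API (doc text elided):

    private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
      ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
        .internal()  // not shown in user-facing docs; intended for infra developers
        .doc("...")
        .version("3.2.0")
        .booleanConf
        .createWithDefault(true)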

Author @hiboyang commented on Mar 4, 2021:
Hi @c21, thanks for the comment, and nice to know you from FB! We previously discussed Cosco with people from FB, but not this specific issue. Just curious: do you modify your internal Spark distribution so that an executor-lost event does not trigger the driver to delete map output?

Contributor @c21 replied:
> do you guys modify your internal Spark distribution to make executor lost event not trigger driver to delete map output?

Yeah, I think so; I can double-check with the Cosco folks tomorrow as well. By the way, nice to know you too!

Author @hiboyang replied:
Cool, that will be interesting to know, thanks!

Contributor @attilapiros commented:
@c21, may I ask what the status of open-sourcing Cosco is? I think it might help us make better decisions about Spark's shuffle plugin API if we could look into that one too.

Contributor @c21 replied:
@attilapiros - unfortunately there's no specific ETA now, but the team is working on it.

.doc("Mark shuffle files as lost when an executor is lost. If you set " +
"spark.shuffle.manager with a customized class to use a different shuffle " +
"implementation like storing shuffle files on remote storage/server or using third party " +
"remote shuffle service, and you are sure the shuffle files are not stored on the " +
"executor, consider setting this to false.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
}
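
To illustrate how a deployment would use this config: a sketch of a session wired to a custom shuffle manager whose shuffle files live off-executor (com.example.RemoteShuffleManager is a placeholder, not a real class):

    import org.apache.spark.sql.SparkSession

    // Hypothetical setup: shuffle data is written to remote storage by a custom
    // ShuffleManager, so losing an executor does not lose any shuffle files.
    val spark = SparkSession.builder()
      .appName("remote-shuffle-example")
      .config("spark.shuffle.manager", "com.example.RemoteShuffleManager") // placeholder
      .config("spark.shuffle.markFileLostOnExecutorLost", "false")
      .getOrCreate()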
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -2034,7 +2034,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
env.blockManager.markFileLostOnExecutorLost
Contributor commented:
Shouldn't we make this config take no effect when spark.shuffle.manager is sort (the default one)?
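
A sketch of what that guard could look like (the default-manager check and names here are illustrative, not part of the PR):

    // Honor the new config only when a non-default shuffle manager is configured;
    // with the built-in sort shuffle manager, keep the original behavior.
    val usesDefaultShuffleManager = sc.conf.get("spark.shuffle.manager", "sort") == "sort"
    val fileLost =
      (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
        (usesDefaultShuffleManager || env.blockManager.markFileLostOnExecutorLost)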

removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -179,6 +179,9 @@ private[spark] class BlockManager(
// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined

private[spark] val markFileLostOnExecutorLost: Boolean = conf.get(
config.MARK_FILE_LOST_ON_EXECUTOR_LOST)

private val remoteReadNioBufferConversion =
conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -2592,6 +2592,24 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with TimeLimits
assertDataStructuresEmpty()
}

test("shuffle files should not be lost when setting spark.shuffle.markFileLostOnExecutorLost " +
"to false") {
conf.set(config.MARK_FILE_LOST_ON_EXECUTOR_LOST.key, "false")
assert(!sc.env.blockManager.markFileLostOnExecutorLost)

val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
completeShuffleMapStageSuccessfully(0, 0, 1)
runEvent(ExecutorLost("hostA-exec", ExecutorKilled))
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
}

/**
* Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
* getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies