# [SPARK-4879] Use driver to coordinate Hadoop output committing for speculative tasks #4066
Changes from 35 commits
```diff
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path

+import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
```
```diff
@@ -105,24 +106,56 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   def commit() {
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
+
+    // Called after we have decided to commit
+    def performCommit(): Unit = {
       try {
         cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
+        logInfo (s"$taID: Committed")
       } catch {
-        case e: IOException => {
+        case e: IOException =>
           logError("Error committing the output of task: " + taID.value, e)
           cmtr.abortTask(taCtxt)
           throw e
-        }
       }
     }
+
+    // First, check whether the task's output has already been committed by some other attempt
+    if (cmtr.needsTaskCommit(taCtxt)) {
+      // The task output needs to be committed, but we don't know whether some other task attempt
+      // might be racing to commit the same output partition. Therefore, coordinate with the driver
+      // in order to determine whether this attempt can commit.
```
> **Contributor:** Can you add `(see SPARK-4879)` to the comment here?
```diff
+      val shouldCoordinateWithDriver: Boolean = {
+        val sparkConf = SparkEnv.get.conf
+        // We only need to coordinate with the driver if there are multiple concurrent task
+        // attempts, which should only occur if speculation is enabled
```
> **Contributor (author):** Can anyone think of cases where this assumption would be violated? Can this ever be violated due to, say, transient network failures?

> **Contributor:** I guess that depends on the semantics of […]

> **Contributor (author):** I think it's actually more of a Spark scheduler semantics issue: I'm wondering whether it's possible to have multiple concurrently running task attempts for the same task even when speculation is disabled.

> **Contributor:** Yes, I think it is possible to have multiple concurrently running attempts for the same task when some ResultTasks hit fetch failures. Example: after one minute, one failed ResultTask reports a FetchFailed and the DAGScheduler marks the stage as failed. The DAGScheduler will then resubmit all uncompleted tasks of this result stage, but some of those uncompleted tasks were already submitted before, so at that point there are multiple concurrently running task attempts for the same task. @JoshRosen @vanzin

> **Contributor:** @lianhuiwang but by the time there's a fetch failure, isn't the first task already finished? So at any given time there's still only one running task attempt, right?
```diff
+        val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+      }
+      if (shouldCoordinateWithDriver) {
+        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+        val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+        if (canCommit) {
+          performCommit()
+        } else {
+          val msg = s"$taID: Not committed because the driver did not authorize commit"
+          logInfo(msg)
+          // We need to abort the task so that the driver can reschedule new attempts, if necessary
+          cmtr.abortTask(taCtxt)
+          throw new CommitDeniedException(msg, jobID, splitID, attemptID)
+        }
+      } else {
+        // Speculation is disabled or a user has chosen to manually bypass the commit coordination
+        performCommit()
+      }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      // Some other attempt committed the output, so we do nothing and signal success
+      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
     }
   }

   def commitJob() {
     // always ? Or if cmtr.needsTaskCommit ?
     val cmtr = getOutputCommitter()
     cmtr.commitJob(getJobContext())
   }
```
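The task-side protocol above is small enough to model outside Spark. Below is a sketch in Python (all names here are hypothetical stand-ins, not Spark or Hadoop APIs) of the decision flow in `commit()`: check whether the output still needs committing, decide whether to coordinate with the driver, ask for permission, and either commit or abort with a commit-denied error.

```python
# Illustrative model of the decision flow in SparkHadoopWriter.commit().
# All classes and names are hypothetical stand-ins, not Spark APIs.

class CommitDeniedError(Exception):
    """Models CommitDeniedException: the driver refused this attempt."""

class FirstRequestWinsCoordinator:
    """Models the driver-side coordinator: the first attempt to ask for a
    given (job, split) is authorized; later attempts are denied."""
    def __init__(self):
        self._winners = {}  # (job_id, split_id) -> winning attempt_id

    def can_commit(self, job_id, split_id, attempt_id):
        winner = self._winners.setdefault((job_id, split_id), attempt_id)
        return winner == attempt_id

class FakeCommitter:
    """Models a Hadoop OutputCommitter for one task attempt."""
    def __init__(self):
        self.committed = False
        self.aborted = False

    def needs_task_commit(self):
        return not self.committed

    def commit_task(self):
        self.committed = True

    def abort_task(self):
        self.aborted = True

def commit(committer, coordinator, job_id, split_id, attempt_id,
           speculation_enabled=False, coordination_override=None):
    # Some other attempt already committed the output: do nothing, signal success.
    if not committer.needs_task_commit():
        return "nothing-to-commit"
    # Coordination defaults to the speculation setting, mirroring the
    # spark.hadoop.outputCommitCoordination.enabled escape-hatch above.
    coordinate = (coordination_override if coordination_override is not None
                  else speculation_enabled)
    if coordinate and not coordinator.can_commit(job_id, split_id, attempt_id):
        committer.abort_task()  # let the driver reschedule new attempts if needed
        raise CommitDeniedError(f"attempt {attempt_id} not authorized for split {split_id}")
    committer.commit_task()
    return "committed"
```

With two speculative attempts racing on the same split, the first to ask the coordinator commits and the second aborts with `CommitDeniedError`, mirroring the Scala control flow above.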
New file (`@@ -0,0 +1,35 @@`):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.{TaskCommitDenied, TaskEndReason}

/**
 * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
 */
class CommitDeniedException(
    msg: String,
    jobID: Int,
    splitID: Int,
    attemptID: Int)
  extends Exception(msg) {

  def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)

}
```
```diff
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
-import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils}
+import org.apache.spark.util._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

 /**
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: Clock = SystemClock)
+    clock: org.apache.spark.util.Clock = SystemClock)
   extends Logging {

   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {

@@ -126,6 +126,8 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)

+  private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
```
> **Contributor:** Super minor, but you can just get it from the […]
```diff
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))

@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

@@ -865,6 +868,7 @@ class DAGScheduler(
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
+      outputCommitCoordinator.stageEnd(stage.id)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))

@@ -909,6 +913,9 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)

+    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
+      event.taskInfo.attempt, event.reason)
+
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.
     if (event.reason != Success) {

@@ -921,6 +928,7 @@ class DAGScheduler(
       // Skip all the actions if the stage has been cancelled.
       return
     }
+
     val stage = stageIdToStage(task.stageId)

     def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {

@@ -1073,6 +1081,9 @@ class DAGScheduler(
         handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
       }

+      case commitDenied: TaskCommitDenied =>
+        // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
+
       case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
```
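The `DAGScheduler` hooks above bound the coordinator's state: `stageStart`/`stageEnd` create and clear per-stage bookkeeping, and `taskCompleted` can release a partition's commit lock when the authorized attempt fails before committing. A sketch of that lifecycle in Python (a hypothetical model, not Spark's actual `OutputCommitCoordinator` implementation):

```python
# Hypothetical model of the driver-side commit-coordinator lifecycle wired
# into the DAGScheduler hooks above. Not Spark code.

SUCCESS = "success"  # stand-in for Spark's Success task-end reason

class OutputCommitCoordinatorModel:
    def __init__(self):
        # stage_id -> {partition: authorized attempt}
        self._authorized = {}

    def stage_start(self, stage_id):
        self._authorized[stage_id] = {}

    def stage_end(self, stage_id):
        # Drop all bookkeeping for the stage once it finishes.
        self._authorized.pop(stage_id, None)

    def can_commit(self, stage_id, partition, attempt):
        committers = self._authorized.get(stage_id)
        if committers is None:
            return False  # unknown or finished stage: refuse to authorize
        winner = committers.setdefault(partition, attempt)
        return winner == attempt

    def task_completed(self, stage_id, partition, attempt, reason):
        committers = self._authorized.get(stage_id)
        if committers is None or reason == SUCCESS:
            return
        # The authorized attempt failed before committing: free the lock so
        # a rescheduled attempt can be authorized later.
        if committers.get(partition) == attempt:
            del committers[partition]
```

The key property is that at most one attempt per partition holds the commit lock at a time, but the lock is not held forever: a failed authorized attempt releases it, so the driver can authorize a retry.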
> **Author:** Ah, I see how this could be confusing: the OutputCommitCoordinator class serves as both a server and a client interface, similar to MapOutputStatusTracker and a couple of other classes. I'll see about splitting the client functionality into a separate `OutputCommitCoordinatorClient` class that's initialized everywhere and leaving OutputCommitCoordinator as a driver-only component.

> **Author:** Actually, per offline discussion with @andrewor14, I'm going to leave this as-is for now.