3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -103,8 +103,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

def commit() {
- SparkHadoopMapRedUtil.commitTask(
- getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}

def commitJob() {
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -154,10 +154,9 @@ case object TaskKilled extends TaskFailedReason {
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
- attemptID: Int)
- extends TaskFailedReason {
+ attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
}

/**
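For context, a minimal sketch of how caller code might consume the renamed field when inspecting task-end reasons. This helper is an assumption for illustration, not code from this diff, and it assumes the reason classes are visible to the caller (they are developer-API types in org.apache.spark).

import org.apache.spark.{Success, TaskCommitDenied, TaskEndReason, TaskFailedReason}

// Hypothetical helper: turn a TaskEndReason into a log line using the renamed field.
def describeTaskEnd(reason: TaskEndReason): String = reason match {
  case Success =>
    "task succeeded"
  case TaskCommitDenied(jobID, partitionID, attemptNumber) =>
    s"commit denied: job=$jobID, partition=$partitionID, attemptNumber=$attemptNumber"
  case failed: TaskFailedReason =>
    failed.toErrorString
}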
@@ -26,10 +26,10 @@ class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
- attemptID: Int)
+ attemptNumber: Int)
extends Exception(msg) {

- def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)
+ def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptNumber)

}

@@ -89,8 +89,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
- splitId: Int,
- attemptId: Int): Unit = {
+ splitId: Int): Unit = {

val mrTaskAttemptID = mrTaskContext.getTaskAttemptID

@@ -120,7 +119,8 @@

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+ val taskAttemptNumber = TaskContext.get().attemptNumber()
+ val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)

if (canCommit) {
performCommit()
@@ -130,7 +130,7 @@
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, jobId, splitId, attemptId)
+ throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -150,7 +150,6 @@
committer,
mrTaskContext,
sparkTaskContext.stageId(),
- sparkTaskContext.partitionId(),
- sparkTaskContext.attemptNumber())
+ sparkTaskContext.partitionId())
}
}
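For context, the two identifiers this change stops conflating can be observed from inside a task: TaskContext.attemptNumber() is the per-task retry counter that the coordinator now keys on, while TaskContext.taskAttemptId() is a cluster-wide unique ID. A self-contained sketch follows; the local job setup is an assumption for illustration, not code from this PR.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object AttemptIdsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("attempt-ids"))
    try {
      sc.parallelize(1 to 4, 2).foreachPartition { _ =>
        val ctx = TaskContext.get()
        // attemptNumber(): 0 for the first run of this partition's task, 1 for its first retry, ...
        // taskAttemptId(): a Long that is unique across all task attempts in the application.
        println(s"partition=${ctx.partitionId()} " +
          s"attemptNumber=${ctx.attemptNumber()} taskAttemptId=${ctx.taskAttemptId()}")
      }
    } finally {
      sc.stop()
    }
  }
}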
@@ -925,8 +925,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

- outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
- event.taskInfo.attempt, event.reason)
+ outputCommitCoordinator.taskCompleted(
+ stageId,
+ task.partitionId,
+ event.taskInfo.attemptNumber, // this is a task attempt number
+ event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
@@ -27,7 +27,7 @@ import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
private sealed trait OutputCommitCoordinationMessage extends Serializable

private case object StopCoordinator extends OutputCommitCoordinationMessage
- private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+ private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)

/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -49,8 +49,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private val retryInterval = AkkaUtils.retryWaitMs(conf)

private type StageId = Int
- private type PartitionId = Long
- private type TaskAttemptId = Long
+ private type PartitionId = Int
+ private type TaskAttemptNumber = Int

/**
* Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -62,7 +62,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
- private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+ private type CommittersByStageMap =
+ mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -80,14 +81,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
- * @param attempt a unique identifier for this task attempt
+ * @param attemptNumber how many times this task has been attempted
+ * (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = {
- val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+ attemptNumber: TaskAttemptNumber): Boolean = {
+ val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorActor match {
case Some(actor) =>
AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
@@ -100,7 +102,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)

// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+ authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}

// Called by DAGScheduler
@@ -112,7 +114,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId,
+ attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -122,12 +124,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
- logInfo(
- s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+ logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+ s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters.get(partition).exists(_ == attempt)) {
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
+ if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+ logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+ s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -145,21 +147,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = synchronized {
+ attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
s"existingCommitter = $existingCommitter")
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
authorizedCommitters(partition) = attempt
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
true
}
case None =>
logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
s"partition $partition to commit")
false
}
}
@@ -172,8 +176,9 @@ private[spark] object OutputCommitCoordinator {
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
- case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
- sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt)
+ case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
+ sender ! outputCommitCoordinator.handleAskPermissionToCommit(
+ stage, partition, attemptNumber)
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
context.stop(self)
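For context, a standalone toy model of the "first committer wins" bookkeeping above, keyed by partition and task attempt number. This is an illustrative assumption, not Spark's implementation.

import scala.collection.mutable

// Toy coordinator: the first attempt to ask for a (stage, partition) wins the commit
// lock; the lock is cleared only when that authorized attempt later fails.
class FirstCommitterWins {
  private val authorized = mutable.Map.empty[(Int, Int), Int] // (stage, partition) -> attemptNumber

  def canCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(_) => false // another attempt already holds the commit lock
      case None =>
        authorized((stage, partition)) = attemptNumber
        true
    }
  }

  def taskFailed(stage: Int, partition: Int, attemptNumber: Int): Unit = synchronized {
    if (authorized.get((stage, partition)).exists(_ == attemptNumber)) {
      authorized.remove((stage, partition)) // allow a later attempt to become the committer
    }
  }
}

A speculative duplicate of the same partition therefore asks with a different attempt number and is denied until the authorized attempt fails.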
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
- val attempt: Int,
+ val attemptNumber: Int,
Reviewer comment (Contributor): Maybe we need to take a look at the code to see the meaning of attempt. I found https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L451-L452. It looks like it is indeed attemptNumber.

val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}

- def id: String = s"$index.$attempt"
+ @deprecated("Use attemptNumber", "1.6.0")
+ def attempt: Int = attemptNumber
+
+ def id: String = s"$index.$attemptNumber"

def duration: Long = {
if (!finished) {
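For callers of the public TaskInfo API, migration is a rename; the old name remains as a deprecated alias. A small listener sketch, an assumption for illustration rather than code from this PR:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TaskAttemptLoggingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    // info.attempt still compiles but now warns ("Use attemptNumber", since 1.6.0).
    println(s"task ${info.taskId} (index ${info.index}, attempt ${info.attemptNumber}) " +
      s"finished with status ${info.status}")
  }
}

Register it with sc.addSparkListener(new TaskAttemptLoggingListener()) to get one line per finished task attempt.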
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -517,8 +517,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
<td sorttable_customkey={info.attempt.toString}>{
if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
<td sorttable_customkey={info.attemptNumber.toString}>{
if (info.speculative) s"${info.attemptNumber} (speculative)"
else info.attemptNumber.toString
}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
@@ -246,7 +246,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
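Note that the serialized key stays "Attempt" even though the value now comes from attemptNumber, so the event-log JSON shape is unchanged. A minimal sketch of that fragment using json4s, the DSL JsonProtocol already uses; the values here are made up for illustration.

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Illustrative values only; the key remains "Attempt", the value is TaskInfo.attemptNumber.
val taskInfoFragment =
  ("Task ID" -> 42L) ~
  ("Index" -> 3) ~
  ("Attempt" -> 1)

println(compact(render(taskInfoFragment)))  // {"Task ID":42,"Index":3,"Attempt":1}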
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.{Span, Seconds}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
import org.apache.spark.util.Utils

/**
* Integration tests for the OutputCommitCoordinator.
*
* See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
*/
class OutputCommitCoordinatorIntegrationSuite
extends SparkFunSuite
with LocalSparkContext
with Timeouts {

override def beforeAll(): Unit = {
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
val tempDir = Utils.createTempDir()
try {
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
}

private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
override def commitTask(context: TaskAttemptContext): Unit = {
val ctx = TaskContext.get()
if (ctx.attemptNumber < 1) {
throw new java.io.FileNotFoundException("Intentional exception")
}
super.commitTask(context)
}
}
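The suite above relies on task retries in local mode: a master of local[2, 4] means two worker threads and up to four task failures before the job is aborted, so an exception thrown only on the first attempt is absorbed by the retry. A standalone sketch of that behaviour, an assumption for illustration rather than part of the suite:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RetrySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2, 4]").setAppName("retry-sketch"))
    try {
      val result = sc.parallelize(1 to 4, 2).map { i =>
        // Fail every task's first attempt; the retry (attemptNumber == 1) succeeds.
        if (TaskContext.get().attemptNumber() == 0) {
          throw new RuntimeException("intentional first-attempt failure")
        }
        i
      }.collect()
      println(result.mkString(","))
    } finally {
      sc.stop()
    }
  }
}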
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
- val partition: Long = 2
- val authorizedCommitter: Long = 3
- val nonAuthorizedCommitter: Long = 100
+ val partition: Int = 2
+ val authorizedCommitter: Int = 3
+ val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
- assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
- assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
- outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}

@@ -456,7 +456,7 @@ class JsonProtocolSuite extends SparkFunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
- assert(info1.attempt === info2.attempt)
+ assert(info1.attemptNumber === info2.attemptNumber)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)