
Commit 5b63000

Reza Safi authored and Marcelo Vanzin committed
[SPARK-22162][BRANCH-2.2] Executors and the driver should use consistent JobIDs in the RDD commit protocol
I have modified SparkHadoopMapReduceWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, the driver used the rddId as the jobId while executors used the stageId, so the two sides disagreed. With this change, executors and the driver consistently use the rddId as the jobId. Also with this change, Spark now uses the actual stageId during the Hadoop commit protocol to check whether a stage can be committed, instead of the executors' jobId as before.

In addition to the existing unit tests, a test has been added to check that executors and the driver use the same JobId. The test failed before this change and passes after applying this fix.

Author: Reza Safi <[email protected]>

Closes #19886 from rezasafi/stagerdd22.
1 parent f3f8c87 commit 5b63000
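
As a side illustration (not part of the commit), the ID mismatch described above can be sketched against Hadoop's public TaskAttemptID API alone. The tracker id and the rdd/stage id values below are hypothetical, chosen only to show that the driver and executors used to disagree on the Hadoop JobID and now agree:

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

object JobIdConsistencySketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values: jobTrackerId is derived from the write timestamp, rddId is the
    // id of the RDD being written, stageId is the id of the stage running the write tasks.
    val jobTrackerId = "20171204104800"
    val rddId = 2
    val stageId = 5

    // Before the fix: the driver built the job-level attempt from the rddId ...
    val driverAttempt = new TaskAttemptID(jobTrackerId, rddId, TaskType.MAP, 0, 0)
    // ... while executors built their task attempts from the stageId,
    // so the two sides reported different Hadoop JobIDs.
    val oldExecutorAttempt = new TaskAttemptID(jobTrackerId, stageId, TaskType.REDUCE, 0, 0)
    assert(driverAttempt.getJobID.getId != oldExecutorAttempt.getJobID.getId)

    // After the fix: both sides use the rddId (commitJobId), so the JobIDs agree.
    val newExecutorAttempt = new TaskAttemptID(jobTrackerId, rddId, TaskType.REDUCE, 0, 0)
    assert(driverAttempt.getJobID == newExecutorAttempt.getJobID)
  }
}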

File tree (3 files changed: +53 −8 lines changed)

  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala

Lines changed: 6 additions & 6 deletions
@@ -60,13 +60,13 @@ object SparkHadoopMapReduceWriter extends Logging {
       hadoopConf: Configuration): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
     val sparkConf = rdd.conf
     val conf = new SerializableConfiguration(hadoopConf)
 
     // Set up a job.
     val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
-    val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+    val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0)
     val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
     val format = jobContext.getOutputFormatClass
 
@@ -78,7 +78,7 @@ object SparkHadoopMapReduceWriter extends Logging {
 
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
-      jobId = stageId.toString,
+      jobId = commitJobId.toString,
       outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)
@@ -89,7 +89,7 @@ object SparkHadoopMapReduceWriter extends Logging {
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -112,15 +112,15 @@ object SparkHadoopMapReduceWriter extends Logging {
   private def executeTask[K, V: ClassTag](
       context: TaskContext,
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       hadoopConf: Configuration,
       outputFormat: Class[_ <: OutputFormat[K, V]],
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
-    val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+    val attemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE,
       sparkPartitionId, sparkAttemptNumber)
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 3 additions & 2 deletions
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+          throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination
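
With this change the coordination check is keyed on the stage that actually runs the task. A minimal sketch, not part of the patch, of reading those values from TaskContext inside a task (assumes a local Spark build on the classpath; the master and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object CommitCoordinationIdsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("commit-ids-sketch"))
    try {
      // Each task can see the stage, partition and attempt number it belongs to; these
      // are the values the output commit coordinator is asked about after this change.
      val ids = sc.parallelize(1 to 4, 2).mapPartitions { _ =>
        val ctx = TaskContext.get()
        Iterator((ctx.stageId(), ctx.partitionId(), ctx.attemptNumber()))
      }.collect()
      ids.foreach { case (stage, partition, attempt) =>
        println(s"stage=$stage partition=$partition attempt=$attempt")
      }
    } finally {
      sc.stop()
    }
  }
}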

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 44 additions & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
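
The test's comment about creating more than one RDD is the crux: once a few transformations are chained, the id of the RDD being written and the id of the stage writing it generally differ, which is exactly the situation where the old code diverged. A quick local check, not part of the commit, could look like the following (the printed ids will vary by job; the comparison is the point):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RddIdVsStageIdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rdd-vs-stage-id"))
    try {
      // Chain a few transformations so the written RDD's id is no longer 0,
      // mimicking what the new test does with map and filter.
      val rdd = sc.parallelize(1 to 4, 2).map(_ + 1).filter(_ > 0)
      val stageIds = rdd.mapPartitions { _ =>
        Iterator(TaskContext.get().stageId())
      }.collect().toSet
      println(s"rdd.id = ${rdd.id}, stage id(s) seen by the tasks = $stageIds")
    } finally {
      sc.stop()
    }
  }
}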
