@@ -73,18 +73,18 @@ trait SparkHadoopMapRedUtil {
 }
 
 object SparkHadoopMapRedUtil extends Logging {
-  def commitTask(
-      committer: MapReduceOutputCommitter,
-      mrTaskContext: MapReduceTaskAttemptContext,
-      sparkTaskContext: TaskContext): Unit = {
-    commitTask(
-      committer,
-      mrTaskContext,
-      sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
-  }
-
+  /**
+   * Commits a task output. Before committing the task output, we need to know whether some other
+   * task attempt might be racing to commit the same output partition. Therefore, coordinate with
+   * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
+   * details).
+   *
+   * Output commit coordinator is only contacted when the following two configurations are both set
+   * to `true`:
+   *
+   *  - `spark.speculation`
+   *  - `spark.hadoop.outputCommitCoordination.enabled`
+   */
   def commitTask(
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
@@ -109,9 +109,6 @@ object SparkHadoopMapRedUtil extends Logging {
 
     // First, check whether the task's output has already been committed by some other attempt
     if (committer.needsTaskCommit(mrTaskContext)) {
-      // The task output needs to be committed, but we don't know whether some other task attempt
-      // might be racing to commit the same output partition. Therefore, coordinate with the driver
-      // in order to determine whether this attempt can commit (see SPARK-4879).
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
         // We only need to coordinate with the driver if there are multiple concurrent task
@@ -144,4 +141,16 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
     }
   }
+
+  def commitTask(
+      committer: MapReduceOutputCommitter,
+      mrTaskContext: MapReduceTaskAttemptContext,
+      sparkTaskContext: TaskContext): Unit = {
+    commitTask(
+      committer,
+      mrTaskContext,
+      sparkTaskContext.stageId(),
+      sparkTaskContext.partitionId(),
+      sparkTaskContext.attemptNumber())
+  }
 }
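
For context beyond the diff, here is a minimal sketch of how a task-side caller could use the relocated convenience overload. SparkHadoopMapRedUtil is Spark-internal, so the wrapper name commitWrittenOutput and its placement are hypothetical; TaskContext.get() is the real accessor for the running task's context, and the two configuration keys are the ones named in the new Scaladoc.

import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.spark.TaskContext
import org.apache.spark.mapred.SparkHadoopMapRedUtil

// Hypothetical task-side commit helper. SparkHadoopMapRedUtil is internal to
// Spark, so this sketch assumes code running inside Spark (e.g. a built-in
// Hadoop-backed writer), not user application code.
//
// Driver-side commit coordination is consulted only when both flags are true:
//   sparkConf.set("spark.speculation", "true")
//   sparkConf.set("spark.hadoop.outputCommitCoordination.enabled", "true")
def commitWrittenOutput(
    committer: OutputCommitter,
    hadoopContext: TaskAttemptContext): Unit = {
  // The overload moved to the bottom of the file derives stageId, partitionId
  // and attemptNumber from the running task's TaskContext, so a caller inside
  // a task needs no extra bookkeeping.
  SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, TaskContext.get())
}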