@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private type StageId = Int
   private type PartitionId = Int
   private type TaskAttemptNumber = Int
-
   private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+  private case class StageState(numPartitions: Int) {
+    val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
+    val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+  }
 
   /**
-   * Map from active stages's id => partition id => task attempt with exclusive lock on committing
-   * output for that partition.
+   * Map from active stage's id => authorized task attempts for each partition id, which hold an
+   * exclusive lock on committing task output for that partition, as well as any known failed
+   * attempts in the stage.
    *
    * Entries are added to the top-level map when stages start and are removed when they finish
    * (either successfully or unsuccessfully).
    *
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
-  private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()
+  private val stageStates = mutable.Map[StageId, StageState]()
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
    */
   def isEmpty: Boolean = {
-    authorizedCommittersByStage.isEmpty
+    stageStates.isEmpty
   }
 
   /**
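Review note: to make the new bookkeeping concrete, here is a minimal standalone sketch of the per-stage state introduced above (simplified types and a hypothetical test harness, not the Spark source itself). Each partition has at most one authorized committer, and any attempt recorded in `failures` is barred from ever committing that partition.

    import scala.collection.mutable

    object StageStateSketch {
      type PartitionId = Int
      type TaskAttemptNumber = Int
      val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1

      // Mirrors the StageState added above: one commit-lock holder per
      // partition, plus the set of attempts known to have failed.
      case class StageState(numPartitions: Int) {
        val authorizedCommitters =
          Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
        val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
      }

      def main(args: Array[String]): Unit = {
        val state = StageState(numPartitions = 3)
        state.authorizedCommitters(0) = 5                      // attempt 5 holds the lock for partition 0
        state.failures.getOrElseUpdate(1, mutable.Set()) += 2  // attempt 2 failed on partition 1
        assert(state.authorizedCommitters(0) == 5)
        assert(state.failures(1).contains(2))
      }
    }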
@@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+    stageStates(stage) = new StageState(maxPartitionId + 1)
   }
 
   // Called by DAGScheduler
   private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage.remove(stage)
+    stageStates.remove(stage)
   }
 
   // Called by DAGScheduler
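Review note: stageStart also changes its locking slightly — the array allocation used to happen before the synchronized block, while registration now happens entirely inside it. A hedged sketch of the driver-side lifecycle under that scheme (hypothetical harness; only the map registration is modeled):

    import scala.collection.mutable

    object StageLifecycleSketch {
      private val stageStates = mutable.Map[Int, Array[Int]]()

      // Register commit state sized maxPartitionId + 1, all slots unclaimed (-1).
      def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
        stageStates(stage) = Array.fill(maxPartitionId + 1)(-1)
      }

      // Drop all state for the stage once it finishes, successfully or not.
      def stageEnd(stage: Int): Unit = synchronized {
        stageStates.remove(stage)
      }

      def main(args: Array[String]): Unit = {
        stageStart(stage = 0, maxPartitionId = 9)
        assert(stageStates(0).length == 10)
        stageEnd(0)
        assert(stageStates.isEmpty)
      }
    }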
@@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: PartitionId,
       attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
-    val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
+    val stageState = stageStates.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
       return
     })
@@ -137,10 +135,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
           s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters(partition) == attemptNumber) {
+        // Mark the attempt as failed to blacklist it from future commit attempts
+        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
+        if (stageState.authorizedCommitters(partition) == attemptNumber) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+          stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
         }
     }
   }
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     if (isDriver) {
       coordinatorRef.foreach(_ send StopCoordinator)
       coordinatorRef = None
-      authorizedCommittersByStage.clear()
+      stageStates.clear()
     }
   }
 
@@ -158,13 +158,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       stage: StageId,
       partition: PartitionId,
       attemptNumber: TaskAttemptNumber): Boolean = synchronized {
-    authorizedCommittersByStage.get(stage) match {
-      case Some(authorizedCommitters) =>
-        authorizedCommitters(partition) match {
+    stageStates.get(stage) match {
+      case Some(state) if attemptFailed(state, partition, attemptNumber) =>
+        logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+          s"partition=$partition as task attempt $attemptNumber has already failed.")
+        false
+      case Some(state) =>
+        state.authorizedCommitters(partition) match {
           case NO_AUTHORIZED_COMMITTER =>
             logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
               s"partition=$partition")
-            authorizedCommitters(partition) = attemptNumber
+            state.authorizedCommitters(partition) = attemptNumber
             true
           case existingCommitter =>
             // Coordinator should be idempotent when receiving AskPermissionToCommit.
@@ -181,11 +185,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
             }
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of " +
-          s"partition $partition to commit")
+        logDebug(s"Stage $stage has completed, so not allowing " +
+          s"attempt number $attemptNumber of partition $partition to commit")
         false
     }
   }
+
+  private def attemptFailed(
+      stageState: StageState,
+      partition: PartitionId,
+      attempt: TaskAttemptNumber): Boolean = synchronized {
+    stageState.failures.get(partition).exists(_.contains(attempt))
+  }
 }
 
 private[spark] object OutputCommitCoordinator {
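Review note: taken together, the change means a task attempt that has ever failed can no longer win the commit lock, even after the previously authorized committer's lock is cleared. A condensed end-to-end sketch of that protocol (an illustrative stand-in, not the real OutputCommitCoordinator API):

    import scala.collection.mutable

    object CommitProtocolSketch {
      private val authorized = mutable.Map[Int, Int]()            // partition -> attempt holding the lock
      private val failures = mutable.Map[Int, mutable.Set[Int]]() // partition -> failed attempts

      def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
        if (failures.get(partition).exists(_.contains(attempt))) {
          false // attempt already failed; deny even if no one holds the lock
        } else authorized.get(partition) match {
          case None => authorized(partition) = attempt; true
          case Some(existing) => existing == attempt // idempotent for the current holder
        }
      }

      def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
        failures.getOrElseUpdate(partition, mutable.Set()) += attempt
        if (authorized.get(partition).contains(attempt)) authorized.remove(partition)
      }

      def main(args: Array[String]): Unit = {
        assert(canCommit(0, 1))  // attempt 1 acquires the commit lock
        taskFailed(0, 1)         // ... then fails; lock cleared, attempt blacklisted
        assert(!canCommit(0, 1)) // a retry of the same attempt number is denied
        assert(canCommit(0, 2))  // a fresh attempt may commit
      }
    }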