Skip to content

Commit 69b412a

Browse files
author
jinxing
committed
[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent.
1 parent 923e594 commit 69b412a

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.collection.mutable
2222
import org.apache.spark._
2323
import org.apache.spark.internal.Logging
2424
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
25+
import org.apache.spark.util.{RpcUtils, ThreadUtils}
2526

2627
private sealed trait OutputCommitCoordinationMessage extends Serializable
2728

@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
8889
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
8990
coordinatorRef match {
9091
case Some(endpointRef) =>
91-
endpointRef.askWithRetry[Boolean](msg)
92+
ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
93+
RpcUtils.askRpcTimeout(conf).duration)
9294
case None =>
9395
logError(
9496
"canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
165167
authorizedCommitters(partition) = attemptNumber
166168
true
167169
case existingCommitter =>
168-
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
169-
s"partition=$partition; existingCommitter = $existingCommitter")
170-
false
170+
// Coordinator should be idempotent when receiving AskPermissionToCommit.
171+
if (existingCommitter == attemptNumber) {
172+
logWarning(s"Authorizing duplicate request to commit for " +
173+
s"attemptNumber=$attemptNumber to commit for stage=$stage," +
174+
s" partition=$partition; existingCommitter = $existingCommitter." +
175+
s" This can indicate dropped network traffic.")
176+
true
177+
} else {
178+
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
179+
s"partition=$partition; existingCommitter = $existingCommitter")
180+
false
181+
}
171182
}
172183
case None =>
173184
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
189189
assert(
190190
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
191191
}
192+
193+
test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
194+
val rdd = sc.parallelize(Seq(1), 1)
195+
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
196+
0 until rdd.partitions.size)
197+
}
192198
}
193199

194200
/**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
221227
if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
222228
}
223229

230+
// Receiver should be idempotent for AskPermissionToCommitOutput
231+
def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
232+
val ctx = TaskContext.get()
233+
val canCommit1 = SparkEnv.get.outputCommitCoordinator
234+
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
235+
val canCommit2 = SparkEnv.get.outputCommitCoordinator
236+
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
237+
assert(canCommit1 && canCommit2)
238+
}
239+
224240
private def runCommitWithProvidedCommitter(
225241
ctx: TaskContext,
226242
iter: Iterator[Int],

0 commit comments

Comments
 (0)