Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
Expand Down Expand Up @@ -108,7 +108,7 @@ private[spark] class BarrierCoordinator(

// The request method which is called inside this barrier sync. All tasks should make sure

@dilipbiswal dilipbiswal May 7, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ngone51 Super Nit: Perhaps change the comment here to reflect the change ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks.

// that they're calling the same method within the same barrier sync phase.
private var requestMethod: RequestMethod.Value = _
private val requestMethods = new HashSet[RequestMethod.Value]

// A timer task that ensures we may timeout for a barrier() call.
private var timerTask: TimerTask = null
Expand Down Expand Up @@ -141,17 +141,14 @@ private[spark] class BarrierCoordinator(
val taskId = request.taskAttemptId
val epoch = request.barrierEpoch
val curReqMethod = request.requestMethod

if (requesters.isEmpty) {
requestMethod = curReqMethod
} else if (requestMethod != curReqMethod) {
requesters.foreach(
_.sendFailure(new SparkException(s"$barrierId tried to use requestMethod " +
s"`$curReqMethod` during barrier epoch $barrierEpoch, which does not match " +
s"the current synchronized requestMethod `$requestMethod`"
))
)
cleanupBarrierStage(barrierId)
requestMethods.add(curReqMethod)
if (requestMethods.size > 1) {
val error = new SparkException(s"Different barrier sync types found for the " +
s"sync $barrierId: ${requestMethods.mkString(", ")}. Please use the " +

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally ignored barrier epoch here because it (-1) becomes meaningless after clear. But we can update the logic of clear if any of you think it's necessary here.

s"same barrier sync type within a single sync.")
(requesters :+ requester).foreach(_.sendFailure(error))
clear()
return
}

// Require the number of tasks is correctly set from the BarrierTaskContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
val error = intercept[SparkException] {
rdd2.collect()
}.getMessage
assert(
error.contains("does not match the current synchronized requestMethod") ||
error.contains("not properly killed")
)
assert(error.contains("Different barrier sync types found"))
}

test("successively sync with allGather and barrier") {
Expand Down