Conversation

jiangxb1987 (Contributor) commented Jun 4, 2018

What changes were proposed in this pull request?

This PR adds a new RDDBarrier and BarrierTaskContext to support barrier scheduling in Spark. It also modifies how job scheduling works to accommodate the new feature.

Note: this is a prototype to facilitate the discussion. It is not meant to be the final design; it just shows one way that might work.
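For illustration, here is a minimal sketch of how the proposed API might be exercised from user code. The rdd.barrier() entry point and the hosts() method follow this prototype's test code; the exact names and signatures may change with the final design.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.barrier.BarrierTaskContext

object BarrierSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("barrier-sketch").setMaster("local[2]"))
    // Assumed entry point: rdd.barrier() returns the new RDDBarrier, which makes the
    // scheduler launch all tasks of the stage together.
    sc.parallelize(1 to 100, 2)
      .barrier()
      .mapPartitions { iter =>
        // Inside a barrier stage the task context is the new BarrierTaskContext.
        val tc = TaskContext.get.asInstanceOf[BarrierTaskContext]
        // All tasks of the stage are running at this point, so the peer host list is complete.
        println(s"Barrier peers: ${tc.hosts().mkString(", ")}")
        iter
      }
      .count()
    sc.stop()
  }
}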

How was this patch tested?

Simple unit test and integration test.

SparkQA commented Jun 5, 2018

Test build #91471 has finished for PR 21494 at commit 84cdc68.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

Ngone51 (Member) commented Jun 6, 2018

Hi @jiangxb1987, could you explain in more detail what barrier scheduling in Spark is, and give an example that would only work with barrier scheduling (but could not work under the current Spark scheduling mechanism), for better understanding?

jiangxb1987 (Contributor, Author) commented:

@Ngone51 You can refer to the SPIP that xiangrui proposed in SPARK-24374 for the basic background and major goals of barrier scheduling, and to SPARK-24375 for a design sketch. If you have further comments, please feel free to discuss them on the JIRA (recommended, because that works better for things we may want to revisit later) or here :)
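For intuition (a hedged sketch, not code from this PR): barrier scheduling targets stages whose tasks must all run at the same time, for example because each task blocks at a global barrier before exchanging data with its peers. Under the current scheduler, a stage's tasks can be launched in several waves when slots are scarce, so a task waiting at the barrier could wait forever on peers that have not even started; launching the whole stage as a gang avoids that. In the sketch below, rdd, trainOnPartition, waitAtGlobalBarrier and allReduce are placeholders:

// Illustrative sketch of a distributed-training style stage that needs gang scheduling.
rdd.barrier().mapPartitions { iter =>
  val tc = TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext]
  val partial = trainOnPartition(iter)      // hypothetical helper
  waitAtGlobalBarrier(tc)                   // hypothetical: blocks until every task of the stage arrives
  Iterator(allReduce(partial, tc.hosts()))  // hypothetical: exchanges results with all peer hosts
}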

galv left a comment:

I haven't understood everything yet. I'll have to return to this later to review fully.

val tc = TaskContext.get.asInstanceOf[org.apache.spark.barrier.BarrierTaskContext]
// If we don't get the expected taskInfos, the job shall abort due to stage failure.
if (tc.hosts().length != 2) {
throw new SparkException("Expected taksInfos length is 2, actual length is " +

taksInfos -> taskInfos

throw new SparkException("Expected taksInfos length is 2, actual length is " +
s"${tc.hosts().length}.")
}
// println(tc.getTaskInfos().toList)

Remove comment

shuffle.DiskBytesSpilled = 0
_accumulatorRegistry.clear()

if (isBarrier):

Style: if (isBarrier): -> if isBarrier:


if (isBarrier):
port = 25333 + 2 + 2 * taskContext._partitionId
paras = GatewayParameters(port=port)

paras -> params

_accumulatorRegistry.clear()

if (isBarrier):
port = 25333 + 2 + 2 * taskContext._partitionId

I recommend using DEFAULT_PORT and DEFAULT_PYTHON_PORT. They are exposed as part of the public API of py4j: https://github.com/bartdag/py4j/blob/216432d859de41441f0d1a0d55b31b5d8d09dd28/py4j-python/src/py4j/java_gateway.py#L54

By the way, acquiring ports like this is a little hacky and may require more thought.

// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
private[scheduler] var isZombie = false

private[scheduler] lazy val barrierCoordinator = {

I recommend adding a return type here for readability.

Contributor

+1 @galv
We also have barrierCoordinator with type RpcEndpointRef at each TaskContext, so it's better to add return type for both.
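For illustration, the annotated declaration being asked for might look like the sketch below; the endpoint name and the BarrierCoordinator constructor arguments are guesses based on the snippets in this thread, not the PR's actual code:

import org.apache.spark.rpc.RpcEndpointRef

// Explicit return type, as suggested, so the reader does not have to infer it
// from the initializer.
private[scheduler] lazy val barrierCoordinator: RpcEndpointRef = {
  val rpcEnv = sched.sc.env.rpcEnv  // assumption: reach the driver RpcEnv through the scheduler
  rpcEnv.setupEndpoint(s"barrierCoordinator-${taskSet.stageId}",
    new BarrierCoordinator(taskSet.numTasks, barrierSyncTimeoutMs, rpcEnv))  // assumed arguments
}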

import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.RpcUtils

class BarrierTaskContext(
Member

BarrierTaskContextImpl?

case IncreaseEpoch(previousEpoch) =>
if (previousEpoch == epoch) {
syncRequests.foreach(_.sendFailure(new RuntimeException(
s"The coordinator cannot get all barrier sync requests within $timeout ms.")))
Member

Have we considered incrementally increasing the timeout when we can't get all barrier sync requests within an epoch?
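If that idea were pursued, one hedged sketch of what it could look like in the coordinator (the field names and the doubling policy are purely illustrative, not part of this PR, and logInfo assumes the coordinator mixes in Logging):

// Illustrative only: back off the barrier-sync timeout after a failed epoch,
// up to some cap, instead of reusing the same fixed value every time.
private var currentTimeout: Long = timeout
private val maxTimeout: Long = 10 * timeout  // assumed cap

private def onEpochTimedOut(): Unit = {
  currentTimeout = math.min(currentTimeout * 2, maxTimeout)
  logInfo(s"Barrier sync timed out at epoch $epoch; next epoch will wait up to $currentTimeout ms.")
}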


syncRequests += context
replyIfGetAllSyncRequest()
}
Member

if (epoch == this.epoch) {
 ...
} else { // Received RpcCallContext from failed previousEpoch.
  context.sendFailure(new RuntimeException(
    s"The coordinator cannot get all barrier sync requests within $timeout ms.")))
}

taskScheduler.cancelTasks(stageId, interruptThread = false)
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
Member

Under barrier execution, will it be a problem if we cannot cancel tasks?

taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
// Skip the launch process.
Member

Should we log something here instead of silently passing?
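A hedged sketch of the kind of logging being suggested (the message wording is illustrative, not from the PR):

// Say why the barrier task set is being skipped, so a stuck-looking stage can be
// diagnosed as waiting for slots rather than silently stalled.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
  logInfo(s"Skipping barrier taskSet ${taskSet.name}: it needs ${taskSet.numTasks} slots " +
    s"to launch all tasks together, but only $availableSlots are currently available.")
} else {
  // ... existing launch path ...
}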


Is there a way to propagate this info to the scheduler and resource manager layer for preemptive scheduling?

timer.schedule(new TimerTask {
override def run(): Unit = {
// self can be null after this RPC endpoint is stopped.
if (self != null) self.send(IncreaseEpoch(currentEpoch))
Member

Once this epoch fails to sync, the stage will fail and be resubmitted. I think it will begin with a new task set, so IncreaseEpoch seems useless because it doesn't really increase the epoch?


Maybe register a sequence and hierarchy of task-level barriers?

// Write out the TaskContextInfo
val isBarrier = context.isInstanceOf[BarrierTaskContext]
dataOut.writeBoolean(isBarrier)
if (isBarrier) {
Member

So this would be language dependent? Would we need something for the R runner too?

timeout: Long,
override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

private var epoch = 0
Contributor

Will the epoch value be logged on the driver and executors? It would be useful for diagnosing the upper-level MPI program.
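For illustration, the kind of logging being asked about might look like the sketch below; the message name and its field are guesses based on the snippets in this thread, not the PR's actual code:

// Illustrative only: log the coordinator's view of the epoch when handling a sync request,
// so progress of an MPI-style program on top can be correlated with barrier syncs.
case RequestToSync(requestEpoch) =>  // assumed message shape
  logInfo(s"Received barrier sync request for epoch $requestEpoch (coordinator epoch: $epoch).")
  // ... existing handling ...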

/** Returns the default Spark timeout to use for RPC ask operations. */
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "900s")

Why hard-code this change? Couldn't you have set this at runtime if you needed it increased? I'm concerned about it breaking backwards compatibility with jobs that for whatever reason depend on the 120 second timeout.
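For reference, the timeout can also be raised per application without touching the default, using the standard configuration keys (the 900s value below just mirrors the hard-coded change):

import org.apache.spark.{SparkConf, SparkContext}

// Raise the RPC ask timeout for this application only; spark.rpc.askTimeout falls
// back to spark.network.timeout when it is not set.
val conf = new SparkConf()
  .setAppName("barrier-job")
  .set("spark.rpc.askTimeout", "900s")
val sc = new SparkContext(conf)

The same can be done on the command line with spark-submit --conf spark.rpc.askTimeout=900s.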



/**
* An RDD that supports running MPI programme.

programme -> program

jiangxb1987 (Contributor, Author) commented:

Closing this in favor of #21758 and #21898. Thanks for your comments! I hope they're addressed in the new code.
