Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,12 @@ private[deploy] class Master(
}

private def relaunchDriver(driver: DriverInfo) {
driver.worker = None
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver
removeDriver(driver.id, DriverState.RELAUNCHING, None)
val newDriver = createDriver(driver.desc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a good reason to remove and create the driver in this case? It looks like some kind of overkill compared to the old logic.

Copy link
Contributor Author

@lycplus lycplus May 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, we must distinguish the original driver and the newly relaunched one, because there will be statusUpdate of the two versions to arrive at master. For example, when the network partitioned worker reconnects to master, it will send DriverStateChanged with the driver id, and master must recognize it is the state of the original driver and not state of the newly launched driver.

The patch simply choose a new driver id to do this, which also has some Shortcomings, however. For example, In the UI, the two versions of driver are not related, and the final state is RELAUNCHING(which seems better to be relaunched).

Another way is to add some like attemptId to driver state, and then Let DriverStateChanged bring the attemptId to indicate its entity. This seems more complex.

What's your opinion?

persistenceEngine.addDriver(newDriver)
drivers.add(newDriver)
waitingDrivers += newDriver

schedule()
}

Expand Down
100 changes: 100 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
Expand Down Expand Up @@ -499,4 +500,103 @@ class MasterSuite extends SparkFunSuite
assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
}
}

test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
val conf = new SparkConf().set("spark.worker.timeout", "1")
val master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we move this out of the eventually {...} block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, this can not be moved because MasterStateResponse is changed over time. If we move the rpc out, the masterState will never change, and the assert will fail.

See the above test SPARK-20529:..., there is a same eventually assert.

assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
}

val app = DeployTestUtils.createAppDesc()
var appId = ""
val driverEnv1 = RpcEnv.create("driver1", "localhost", 22344, conf, new SecurityManager(conf))
val fakeDriver1 = driverEnv1.setupEndpoint("driver", new RpcEndpoint {
override val rpcEnv: RpcEnv = driverEnv1
override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(id, _) => appId = id
}
})
val drivers = new HashMap[String, String]
val workerEnv1 = RpcEnv.create("worker1", "localhost", 12344, conf, new SecurityManager(conf))
val fakeWorker1 = workerEnv1.setupEndpoint("worker", new RpcEndpoint {
override val rpcEnv: RpcEnv = workerEnv1
override def receive: PartialFunction[Any, Unit] = {
case RegisteredWorker(masterRef, _, _) =>
masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
case LaunchDriver(id, desc) =>
drivers(id) = id
master.self.send(RegisterApplication(app, fakeDriver1))
case KillDriver(driverId) =>
master.self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
drivers.remove(driverId)
}
})
val worker1 = RegisterWorker(
"1",
"localhost",
9999,
fakeWorker1,
10,
1024,
"http://localhost:8080",
RpcAddress("localhost2", 10000))
master.self.send(worker1)
val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))

eventually(timeout(10.seconds)) {
assert(!appId.isEmpty)
}

eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

assert(masterState.workers(0).state == WorkerState.DEAD)
}

val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these duplicate code can be combined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated, please have a look.

override val rpcEnv: RpcEnv = driverEnv2
override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(id, _) => appId = id
}
})
val workerEnv2 = RpcEnv.create("worker2", "localhost", 12345, conf, new SecurityManager(conf))
val fakeWorker2 = workerEnv2.setupEndpoint("worker2", new RpcEndpoint {
override val rpcEnv: RpcEnv = workerEnv2
override def receive: PartialFunction[Any, Unit] = {
case LaunchDriver(_, _) =>
master.self.send(RegisterApplication(app, fakeDriver2))
}
})

appId = ""
master.self.send(RegisterWorker(
"2",
"localhost",
9998,
fakeWorker2,
10,
1024,
"http://localhost:8081",
RpcAddress("localhost2", 10001)))
eventually(timeout(10.seconds)) {
assert(!appId.isEmpty)
}

master.self.send(worker1)
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)

val worker = masterState.workers.filter(w => w.id == "1")
assert(worker.length == 1)
// make sure the `DriverStateChanged` arrives at Master.
assert(worker(0).drivers.isEmpty)
assert(masterState.activeDrivers.length == 1)
assert(masterState.activeDrivers(0).state == DriverState.RUNNING)
assert(masterState.activeApps.length == 1)
}
}
}