From c0bb75eed56f55971eb9f9d72b08192b52dcd930 Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Sun, 7 May 2017 16:36:58 +0800
Subject: [PATCH 1/9] Remove driver when relaunching.

---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53384e737325..44313f5225cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -800,7 +800,7 @@ private[deploy] class Master(
     driver.state = DriverState.RELAUNCHING
     waitingDrivers += driver
     schedule()
-  }
+
 
   private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
       ApplicationInfo = {

From 9b599d5e90af06ee1d30eccb9f44337783c7ba2e Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Sun, 7 May 2017 17:34:44 +0800
Subject: [PATCH 2/9] Add some.

---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 44313f5225cd..53384e737325 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -800,7 +800,7 @@ private[deploy] class Master(
     driver.state = DriverState.RELAUNCHING
     waitingDrivers += driver
     schedule()
-
+  }
 
   private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
       ApplicationInfo = {
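Patches 1 and 2 above net out to a whitespace-only change: the first drops the closing brace of relaunchDriver and the second restores it. For reference, the pre-fix relaunch logic that the rest of the series rewrites can be read off the context and deleted lines of the hunks above — a sketch assembled from those lines, not a verbatim quote of Master.scala:

    // Pre-fix Master.relaunchDriver, assembled from the diff context above.
    // The same DriverInfo (and thus the same driver ID) is put back on the
    // wait queue, so a stale report carrying the old driver ID can later be
    // mistaken for the relaunched driver.
    private def relaunchDriver(driver: DriverInfo) {
      driver.worker = None
      driver.state = DriverState.RELAUNCHING
      waitingDrivers += driver
      schedule()
    }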
From 7a96a21a6703a4ca4229b42d2133da6000ccc437 Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Wed, 24 May 2017 16:57:21 +0800
Subject: [PATCH 3/9] Add a test case.

---
 .../spark/deploy/master/MasterSuite.scala | 100 ++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 539264652d7d..978249f102e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -21,6 +21,7 @@ import java.util.Date
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.language.postfixOps
@@ -499,4 +500,103 @@ class MasterSuite extends SparkFunSuite
       assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
     }
   }
+
+  test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+    val conf = new SparkConf().set("spark.worker.timeout", "1")
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+
+    val app = DeployTestUtils.createAppDesc()
+    var appId = ""
+    val driverEnv1 = RpcEnv.create("driver1", "localhost", 22344, conf, new SecurityManager(conf))
+    val fakeDriver1 = driverEnv1.setupEndpoint("driver", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = driverEnv1
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredApplication(id, _) => appId = id
+      }
+    })
+    val drivers = new HashMap[String, String]
+    val workerEnv1 = RpcEnv.create("worker1", "localhost", 12344, conf, new SecurityManager(conf))
+    val fakeWorker1 = workerEnv1.setupEndpoint("worker", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = workerEnv1
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredWorker(masterRef, _, _) =>
+          masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
+        case LaunchDriver(id, desc) =>
+          drivers(id) = id
+          master.self.send(RegisterApplication(app, fakeDriver1))
+        case KillDriver(driverId) =>
+          master.self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+          drivers.remove(driverId)
+      }
+    })
+    val worker1 = RegisterWorker(
+      "1",
+      "localhost",
+      9999,
+      fakeWorker1,
+      10,
+      1024,
+      "http://localhost:8080",
+      RpcAddress("localhost2", 10000))
+    master.self.send(worker1)
+    val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+    master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+    eventually(timeout(10.seconds)) {
+      assert(!appId.isEmpty)
+    }
+
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers(0).state == WorkerState.DEAD)
+    }
+
+    val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
+    val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = driverEnv2
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredApplication(id, _) => appId = id
+      }
+    })
+    val workerEnv2 = RpcEnv.create("worker2", "localhost", 12345, conf, new SecurityManager(conf))
+    val fakeWorker2 = workerEnv2.setupEndpoint("worker2", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = workerEnv2
+      override def receive: PartialFunction[Any, Unit] = {
+        case LaunchDriver(_, _) =>
+          master.self.send(RegisterApplication(app, fakeDriver2))
+      }
+    })
+
+    appId = ""
+    master.self.send(RegisterWorker(
+      "2",
+      "localhost",
+      9998,
+      fakeWorker2,
+      10,
+      1024,
+      "http://localhost:8081",
+      RpcAddress("localhost2", 10001)))
+    eventually(timeout(10.seconds)) {
+      assert(!appId.isEmpty)
+    }
+
+    master.self.send(worker1)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+
+      val worker = masterState.workers.filter(w => w.id == "1")
+      assert(worker.length == 1)
+      // make sure the `DriverStateChanged` arrives at Master.
+      assert(worker(0).drivers.isEmpty)
+      assert(masterState.activeDrivers.length == 1)
+      assert(masterState.activeDrivers(0).state == DriverState.RUNNING)
+      assert(masterState.activeApps.length == 1)
+    }
+  }
 }

From 64570a7fe714f78d305ddde818cb5265e716460c Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Sun, 7 May 2017 16:36:58 +0800
Subject: [PATCH 4/9] Remove driver when relaunching.

---
 .../org/apache/spark/deploy/master/Master.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 53384e737325..12136b265c9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -783,6 +783,10 @@ private[deploy] class Master(
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
+    val failedApps = apps.find(p => p.driver.address.host == worker.endpoint.address.host)
+    for (app <- failedApps) {
+      finishApplication(app)
+    }
     for (driver <- worker.drivers.values) {
       if (driver.desc.supervise) {
         logInfo(s"Re-launching ${driver.id}")
@@ -796,11 +800,14 @@ private[deploy] class Master(
   }
 
   private def relaunchDriver(driver: DriverInfo) {
-    driver.worker = None
-    driver.state = DriverState.RELAUNCHING
-    waitingDrivers += driver
+    removeDriver(driver.id, DriverState.RELAUNCHING, None)
+    val newDriver = createDriver(driver.desc)
+    persistenceEngine.addDriver(newDriver)
+    drivers.add(newDriver)
+    waitingDrivers += newDriver
+
     schedule()
-  }
+
 
   private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
       ApplicationInfo = {

From d02fee2f0f1cfce181688ac6b885f53ddbce695e Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Wed, 24 May 2017 17:23:22 +0800
Subject: [PATCH 5/9] Minor update.

---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 12136b265c9e..933209048cce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -807,7 +807,7 @@ private[deploy] class Master(
     waitingDrivers += newDriver
 
     schedule()
-
+  }
 
   private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
       ApplicationInfo = {
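Patches 4 and 5 replace the old relaunch logic wholesale: instead of re-queueing the old DriverInfo, the master removes it and schedules a brand-new driver with a fresh ID. A sketch of the resulting method, assembled from the two hunks above (removeDriver, createDriver, drivers, and persistenceEngine are the existing Master members the hunks reference):

    // Post-fix Master.relaunchDriver as of patch 5, assembled from the diffs.
    // Removing the old DriverInfo first means the relaunched driver gets a
    // new ID, so later messages about the old ID no longer match it.
    private def relaunchDriver(driver: DriverInfo) {
      removeDriver(driver.id, DriverState.RELAUNCHING, None)
      val newDriver = createDriver(driver.desc)
      persistenceEngine.addDriver(newDriver)
      drivers.add(newDriver)
      waitingDrivers += newDriver

      schedule()
    }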
From 9ea20611f28e845e9c74626aee3e191656fa01bb Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Thu, 25 May 2017 00:11:03 +0800
Subject: [PATCH 6/9] Separate clearing failed app into another pr.

---
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 933209048cce..692107d4717c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -783,10 +783,6 @@ private[deploy] class Master(
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
-    val failedApps = apps.find(p => p.driver.address.host == worker.endpoint.address.host)
-    for (app <- failedApps) {
-      finishApplication(app)
-    }
     for (driver <- worker.drivers.values) {
       if (driver.desc.supervise) {
         logInfo(s"Re-launching ${driver.id}")

From 6ab9a0f5a62e4ec3c9757f748ff5e278d4741d25 Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Thu, 25 May 2017 15:56:02 +0800
Subject: [PATCH 7/9] Remove duplicate code in tests.

---
 .../spark/deploy/master/MasterSuite.scala | 110 +++++++++---------
 1 file changed, 53 insertions(+), 57 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 978249f102e3..3daf44e99ca8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
@@ -35,7 +36,41 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc._
+
+object MockWorker {
+  val counter = new AtomicInteger(10000)
+}
+
+class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
+  val seq = MockWorker.counter.incrementAndGet()
+  val id = seq.toString
+  override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
+    conf, new SecurityManager(conf))
+  var appRegistered = false
+  def newDriver(): RpcEndpointRef = {
+    val name = s"driver_${drivers.size}"
+    rpcEnv.setupEndpoint(name, new RpcEndpoint {
+      override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredApplication(_, _) => appRegistered = true
+      }
+    })
+  }
+
+  val appDesc = DeployTestUtils.createAppDesc()
+  val drivers = new HashMap[String, String]
+  override def receive: PartialFunction[Any, Unit] = {
+    case RegisteredWorker(masterRef, _, _) =>
+      masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
+    case LaunchDriver(driverId, desc) =>
+      drivers(driverId) = driverId
+      master.send(RegisterApplication(appDesc, newDriver()))
+    case KillDriver(driverId) =>
+      master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+      drivers.remove(driverId)
+  }
+}
 
 class MasterSuite extends SparkFunSuite
   with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
@@ -509,46 +544,23 @@ class MasterSuite extends SparkFunSuite
       val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
-
-    val app = DeployTestUtils.createAppDesc()
-    var appId = ""
-    val driverEnv1 = RpcEnv.create("driver1", "localhost", 22344, conf, new SecurityManager(conf))
-    val fakeDriver1 = driverEnv1.setupEndpoint("driver", new RpcEndpoint {
-      override val rpcEnv: RpcEnv = driverEnv1
-      override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredApplication(id, _) => appId = id
-      }
-    })
-    val drivers = new HashMap[String, String]
-    val workerEnv1 = RpcEnv.create("worker1", "localhost", 12344, conf, new SecurityManager(conf))
-    val fakeWorker1 = workerEnv1.setupEndpoint("worker", new RpcEndpoint {
-      override val rpcEnv: RpcEnv = workerEnv1
-      override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredWorker(masterRef, _, _) =>
-          masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
-        case LaunchDriver(id, desc) =>
-          drivers(id) = id
-          master.self.send(RegisterApplication(app, fakeDriver1))
-        case KillDriver(driverId) =>
-          master.self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
-          drivers.remove(driverId)
-      }
-    })
-    val worker1 = RegisterWorker(
-      "1",
+    val worker1 = new MockWorker(master.self)
+    worker1.rpcEnv.setupEndpoint("worker", worker1)
+    val worker1Reg = RegisterWorker(
+      worker1.id,
       "localhost",
-      9999,
-      fakeWorker1,
+      9998,
+      worker1.self,
       10,
       1024,
       "http://localhost:8080",
       RpcAddress("localhost2", 10000))
-    master.self.send(worker1)
+    master.self.send(worker1Reg)
     val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
     master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
 
     eventually(timeout(10.seconds)) {
-      assert(!appId.isEmpty)
+      assert(!worker1.appRegistered)
     }
 
     eventually(timeout(10.seconds)) {
@@ -556,46 +568,30 @@ class MasterSuite extends SparkFunSuite
       assert(masterState.workers(0).state == WorkerState.DEAD)
     }
 
-    val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
-    val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
-      override val rpcEnv: RpcEnv = driverEnv2
-      override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredApplication(id, _) => appId = id
-      }
-    })
-    val workerEnv2 = RpcEnv.create("worker2", "localhost", 12345, conf, new SecurityManager(conf))
-    val fakeWorker2 = workerEnv2.setupEndpoint("worker2", new RpcEndpoint {
-      override val rpcEnv: RpcEnv = workerEnv2
-      override def receive: PartialFunction[Any, Unit] = {
-        case LaunchDriver(_, _) =>
-          master.self.send(RegisterApplication(app, fakeDriver2))
-      }
-    })
-
-    appId = ""
+    val worker2 = new MockWorker(master.self)
+    worker2.rpcEnv.setupEndpoint("worker", worker2)
     master.self.send(RegisterWorker(
-      "2",
+      worker2.id,
       "localhost",
-      9998,
-      fakeWorker2,
+      9999,
+      worker2.self,
       10,
       1024,
       "http://localhost:8081",
-      RpcAddress("localhost2", 10001)))
+      RpcAddress("localhost", 10001)))
     eventually(timeout(10.seconds)) {
-      assert(!appId.isEmpty)
+      assert(!worker2.appRegistered)
    }
 
-    master.self.send(worker1)
+    master.self.send(worker1Reg)
     eventually(timeout(10.seconds)) {
       val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
 
-      val worker = masterState.workers.filter(w => w.id == "1")
+      val worker = masterState.workers.filter(w => w.id == worker1.id)
       assert(worker.length == 1)
       // make sure the `DriverStateChanged` arrives at Master.
       assert(worker(0).drivers.isEmpty)
       assert(masterState.activeDrivers.length == 1)
-      assert(masterState.activeDrivers(0).state == DriverState.RUNNING)
       assert(masterState.activeApps.length == 1)
     }
   }
From da0f977f2846d7051102a32521a1704dca74ed12 Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Fri, 26 May 2017 19:44:56 +0800
Subject: [PATCH 8/9] Add some regression tests.

---
 .../spark/deploy/master/MasterSuite.scala | 30 ++++++++++++++-----
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 3daf44e99ca8..cf706fbb7ef2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.io.Source
@@ -47,28 +48,37 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   val id = seq.toString
   override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
     conf, new SecurityManager(conf))
-  var appRegistered = false
-  def newDriver(): RpcEndpointRef = {
+  var apps = new mutable.HashMap[String, String]()
+  val driverIdToAppId = new mutable.HashMap[String, String]()
+  def newDriver(driverId: String): RpcEndpointRef = {
     val name = s"driver_${drivers.size}"
     rpcEnv.setupEndpoint(name, new RpcEndpoint {
       override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
       override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredApplication(_, _) => appRegistered = true
+        case RegisteredApplication(appId, _) =>
+          apps(appId) = appId
+          driverIdToAppId(driverId) = appId
       }
     })
   }
 
   val appDesc = DeployTestUtils.createAppDesc()
-  val drivers = new HashMap[String, String]
+  val drivers = new mutable.HashMap[String, String]
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredWorker(masterRef, _, _) =>
-      masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
+      masterRef.send(WorkerLatestState(id, Nil, drivers.keys.toSeq))
     case LaunchDriver(driverId, desc) =>
       drivers(driverId) = driverId
-      master.send(RegisterApplication(appDesc, newDriver()))
+      master.send(RegisterApplication(appDesc, newDriver(driverId)))
     case KillDriver(driverId) =>
       master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
       drivers.remove(driverId)
+      driverIdToAppId.get(driverId) match {
+        case Some(appId) =>
+          apps.remove(appId)
+          master.send(UnregisterApplication(appId))
+      }
+      driverIdToAppId.remove(driverId)
   }
 }
@@ -560,7 +570,7 @@ class MasterSuite extends SparkFunSuite
     master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
 
     eventually(timeout(10.seconds)) {
-      assert(!worker1.appRegistered)
+      assert(worker1.apps.nonEmpty)
     }
 
     eventually(timeout(10.seconds)) {
@@ -580,7 +590,7 @@ class MasterSuite extends SparkFunSuite
       "http://localhost:8081",
       RpcAddress("localhost", 10001)))
     eventually(timeout(10.seconds)) {
-      assert(!worker2.appRegistered)
+      assert(worker2.apps.nonEmpty)
     }
 
     master.self.send(worker1Reg)
@@ -591,6 +601,10 @@ class MasterSuite extends SparkFunSuite
       assert(worker.length == 1)
       // make sure the `DriverStateChanged` arrives at Master.
       assert(worker(0).drivers.isEmpty)
+      assert(worker1.apps.isEmpty)
+      assert(worker1.drivers.isEmpty)
+      assert(worker2.apps.size == 1)
+      assert(worker2.drivers.size == 1)
       assert(masterState.activeDrivers.length == 1)
       assert(masterState.activeApps.length == 1)
     }

From 9ddf23af3ee5853c5d1b53a05afc38f38509a8c2 Mon Sep 17 00:00:00 2001
From: Li Yichao
Date: Wed, 14 Jun 2017 17:56:23 +0800
Subject: [PATCH 9/9] Use Set to store drivers.

---
 .../org/apache/spark/deploy/master/MasterSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index c64115ec1772..6bb0eec04078 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -66,16 +66,16 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 
   val appDesc = DeployTestUtils.createAppDesc()
-  val drivers = new mutable.HashMap[String, String]
+  val drivers = mutable.HashSet[String]()
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredWorker(masterRef, _, _) =>
-      masterRef.send(WorkerLatestState(id, Nil, drivers.keys.toSeq))
+      masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
     case LaunchDriver(driverId, desc) =>
-      drivers(driverId) = driverId
+      drivers += driverId
       master.send(RegisterApplication(appDesc, newDriver(driverId)))
    case KillDriver(driverId) =>
       master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
-      drivers.remove(driverId)
+      drivers -= driverId
       driverIdToAppId.get(driverId) match {
         case Some(appId) =>
           apps.remove(appId)
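Patches 8 and 9 touch the same KillDriver branch of MockWorker.receive in two steps; assembled from the two diffs, the net handler reads roughly as follows (a sketch of the combined result, not a block quoted from any single hunk):

    // Net KillDriver handling after patches 8 and 9: drivers is now a
    // mutable.HashSet[String], and killing a driver also unregisters the
    // application it spawned. As shown in the diffs the match has no
    // case None arm, so a kill for a driver that never registered an
    // app would throw a MatchError.
    case KillDriver(driverId) =>
      master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
      drivers -= driverId
      driverIdToAppId.get(driverId) match {
        case Some(appId) =>
          apps.remove(appId)
          master.send(UnregisterApplication(appId))
      }
      driverIdToAppId.remove(driverId)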