@@ -58,30 +58,28 @@ private[spark] class ExecutorRunner(
5858 override def run () { fetchAndRunExecutor() }
5959 }
6060 workerThread.start()
61-
6261 // Shutdown hook that kills the child process on shutdown.
6362 shutdownHook = new Thread () {
6463 override def run () {
65- if (process != null ) {
66- logInfo(" Shutdown hook killing child process." )
67- process.destroy()
68- process.waitFor()
69- }
64+ killProcess()
7065 }
7166 }
7267 Runtime .getRuntime.addShutdownHook(shutdownHook)
7368 }
7469
/** Destroys the executor's child process, if one has been started, and blocks until it exits. */
private def killProcess() {
  // Option(...) maps a null field to None, so the body runs only when a process exists.
  Option(process).foreach { p =>
    logInfo("Killing process!")
    p.destroy()
    p.waitFor()
  }
}
77+
7578 /** Stop this executor runner, including killing the process it launched */
7679 def kill () {
7780 if (workerThread != null ) {
7881 workerThread.interrupt()
7982 workerThread = null
80- if (process != null ) {
81- logInfo(" Killing process!" )
82- process.destroy()
83- process.waitFor()
84- }
8583 state = ExecutorState .KILLED
8684 worker ! ExecutorStateChanged (appId, execId, state, None , None )
8785 Runtime .getRuntime.removeShutdownHook(shutdownHook)
@@ -126,7 +124,6 @@ private[spark] class ExecutorRunner(
126124 // parent process for the executor command
127125 env.put(" SPARK_LAUNCH_WITH_SCALA" , " 0" )
128126 process = builder.start()
129-
130127 val header = " Spark Executor Command: %s\n %s\n\n " .format(
131128 command.mkString(" \" " , " \" \" " , " \" " ), " =" * 40 )
132129
@@ -146,9 +143,10 @@ private[spark] class ExecutorRunner(
146143 val message = " Command exited with code " + exitCode
147144 worker ! ExecutorStateChanged (appId, execId, state, Some (message), Some (exitCode))
148145 } catch {
149- case interrupted : InterruptedException =>
146+ case interrupted : InterruptedException => {
150147 logInfo(" Runner thread for executor " + fullId + " interrupted" )
151-
148+ killProcess()
149+ }
152150 case e : Exception => {
153151 logError(" Error running executor" , e)
154152 if (process != null ) {
0 commit comments