@@ -54,7 +54,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   def create(): Socket = {
     if (useDaemon) {
-      idleWorkers.synchronized {
+      synchronized {
         if (idleWorkers.size > 0) {
           return idleWorkers.dequeue()
         }
@@ -216,18 +216,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     override def run() {
       while (true) {
-        idleWorkers.synchronized {
+        synchronized {
           if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System.currentTimeMillis()) {
-            while (idleWorkers.length > 0) {
-              val worker = idleWorkers.dequeue()
-              try {
-                // the Python worker will exit after closing the socket
-                worker.close()
-              } catch {
-                case e: Exception =>
-                  logWarning("Failed to close worker socket", e)
-              }
-            }
+            cleanupIdleWorkers()
             lastActivity = System.currentTimeMillis()
           }
         }
@@ -236,18 +227,24 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
   }
 
+  private def cleanupIdleWorkers() {
+    while (idleWorkers.length > 0) {
+      val worker = idleWorkers.dequeue()
+      try {
+        // the worker will exit after closing the socket
+        worker.close()
+      } catch {
+        case e: Exception =>
+          logWarning("Failed to close worker socket", e)
+      }
+    }
+  }
+
   private def stopDaemon() {
     synchronized {
       if (useDaemon) {
-        while (idleWorkers.length > 0) {
-          val worker = idleWorkers.dequeue()
-          try {
-            worker.close()
-          } catch {
-            case e: Exception =>
-              logWarning("Failed to close worker socket", e)
-          }
-        }
+        cleanupIdleWorkers()
+
         // Request shutdown of existing daemon by sending SIGTERM
         if (daemon != null) {
           daemon.destroy()
@@ -266,25 +263,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   def stopWorker(worker: Socket) {
-    if (useDaemon) {
-      if (daemon != null) {
-        daemonWorkers.get(worker).foreach { pid =>
-          // tell daemon to kill worker by pid
-          val output = new DataOutputStream(daemon.getOutputStream)
-          output.writeInt(pid)
-          output.flush()
-          daemon.getOutputStream.flush()
+    synchronized {
+      if (useDaemon) {
+        if (daemon != null) {
+          daemonWorkers.get(worker).foreach { pid =>
+            // tell daemon to kill worker by pid
+            val output = new DataOutputStream(daemon.getOutputStream)
+            output.writeInt(pid)
+            output.flush()
+            daemon.getOutputStream.flush()
+          }
         }
+      } else {
+        simpleWorkers.get(worker).foreach(_.destroy())
       }
-    } else {
-      simpleWorkers.get(worker).foreach(_.destroy())
     }
     worker.close()
   }
 
   def releaseWorker(worker: Socket) {
     if (useDaemon && envVars.get("SPARK_REUSE_WORKER").isDefined) {
-      idleWorkers.synchronized {
+      synchronized {
         lastActivity = System.currentTimeMillis()
         idleWorkers.enqueue(worker)
       }
@@ -302,5 +301,5 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
 private object PythonWorkerFactory {
   val PROCESS_WAIT_TIMEOUT_MS = 10000
-  val IDLE_WORKER_TIMEOUT_MS = 60000
+  val IDLE_WORKER_TIMEOUT_MS = 60000  // kill idle workers after 1 minute
 }
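
Net effect of this commit: the idle-worker queue was previously guarded by two different locks (`idleWorkers` in `create()`, the monitor thread, and `releaseWorker()`; the factory instance in `stopDaemon()`), and `stopWorker()` mutated daemon state with no lock at all. The change serializes all of these paths on the factory instance and factors the duplicated drain loop into `cleanupIdleWorkers()`. The sketch below illustrates that pattern under hypothetical names (`ToyWorkerPool` and `drainIdle` are illustrative, not Spark's API): one lock guards the queue for acquire/release, shutdown, and the idle-reaper thread alike.

import scala.collection.mutable

// Hypothetical sketch of the locking pattern in the diff above: every path
// that touches the shared idle queue synchronizes on the pool instance, so
// acquire/release, shutdown, and the reaper can never race on the queue.
class ToyWorkerPool {
  private val idle = new mutable.Queue[String]()
  private var lastActivity = System.currentTimeMillis()

  def acquire(): String = synchronized {
    if (idle.nonEmpty) idle.dequeue() else "worker-" + System.nanoTime()
  }

  def release(worker: String): Unit = synchronized {
    lastActivity = System.currentTimeMillis()
    idle.enqueue(worker)
  }

  // Like cleanupIdleWorkers(): assumes the caller already holds the lock.
  private def drainIdle(): Unit = {
    while (idle.nonEmpty) {
      val worker = idle.dequeue()
      println(s"closing $worker")  // stand-in for worker.close()
    }
  }

  def shutdown(): Unit = synchronized {
    drainIdle()
  }

  // Reaper thread, analogous to the monitor's run() loop. Inside the
  // anonymous Thread subclass a bare `synchronized` would lock the Thread
  // object, so the outer instance must be named explicitly.
  private val reaper = new Thread("idle-reaper") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        ToyWorkerPool.this.synchronized {
          if (lastActivity + 60000 < System.currentTimeMillis()) {
            drainIdle()
            lastActivity = System.currentTimeMillis()
          }
        }
        Thread.sleep(10000)
      }
    }
  }
  reaper.start()
}

Why a single lock matters here: with two monitors guarding the same mutable queue, a worker enqueued by `releaseWorker()` while `stopDaemon()` was draining could be missed and never closed. Serializing everything on the factory instance removes that window.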