Skip to content

Commit 583716e

Browse files
committed
only reuse completed and not interrupted worker
1 parent ace2917 commit 583716e

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,16 @@ private[spark] class PythonRDD(
7575

7676
context.addTaskCompletionListener { context =>
7777
writerThread.shutdownOnTaskCompletion()
78-
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
78+
if (!context.isInterrupted) {
79+
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
80+
} else {
81+
try {
82+
worker.close()
83+
} catch {
84+
case e: Exception =>
85+
logWarning("Failed to close worker socket", e)
86+
}
87+
}
7988
}
8089

8190
writerThread.start()

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,15 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
239239
private def stopDaemon() {
240240
synchronized {
241241
if (useDaemon) {
242+
while (idleWorkers.length > 0) {
243+
val worker = idleWorkers.dequeue()
244+
try {
245+
worker.close()
246+
} catch {
247+
case e: Exception =>
248+
logWarning("Failed to close worker socket", e)
249+
}
250+
}
242251
// Request shutdown of existing daemon by sending SIGTERM
243252
if (daemon != null) {
244253
daemon.destroy()

0 commit comments

Comments
 (0)