
Commit dafe343

andrewor14 authored and pwendell committed
[HOTFIX] Wait for EOF only for the PySpark shell
In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us an `EOF` before finishing the application. This is applicable to the PySpark shell because we terminate the application the same way. However, if we run a python application, for instance, the JVM never actually exits unless it receives a manual EOF from the user. This is causing a few tests to time out.

We only need to do this for the PySpark shell, because Spark submit runs as a python subprocess only in this case. Thus, the normal Spark shell does not need to go through this code path even though it is also a REPL.

Thanks davies for reporting this.

Author: Andrew Or <[email protected]>

Closes #2170 from andrewor14/bootstrap-hotfix and squashes the following commits:

42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
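For illustration, here is a minimal, self-contained sketch of the termination logic this fix introduces. The `RedirectThread` below is a simplified stand-in for Spark's internal helper of the same name, and `superviseChild` is a hypothetical wrapper around the launched child process; only the `PYSPARK_SHELL` environment variable and the join/destroy/waitFor sequence come from the actual patch.

```scala
import java.io.{InputStream, OutputStream}

// Simplified stand-in for Spark's RedirectThread: copies bytes from `in` to `out`
// and leaves its loop once `in` hits EOF (e.g. the parent process closed our stdin).
class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    val buf = new Array[Byte](1024)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      out.flush()
      n = in.read(buf)
    }
  }
}

object BootstrapperSketch {
  // Hypothetical helper: supervise the child JVM after it has been launched.
  def superviseChild(process: Process): Unit = {
    // Forward our stdin to the child so interactive input still reaches it.
    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
    stdinThread.start()

    // Only the PySpark shell runs spark-submit (and hence this JVM) as a python
    // subprocess, so only then does EOF on our stdin mean "the parent shell exited".
    // Block until that happens, then tear the child down. For any other application,
    // skip the wait and simply let the child run to completion.
    if (sys.env.contains("PYSPARK_SHELL")) {
      stdinThread.join()
      process.destroy()
    }
    process.waitFor()
  }
}
```

In the real bootstrapper the stdin redirection is additionally skipped on Windows, where the subprocess already reads from our stdin directly; the sketch omits that branch for brevity.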
1 parent f38fab9 commit dafe343

2 files changed: +17 -11 lines changed

bin/pyspark

Lines changed: 2 additions & 0 deletions
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

Lines changed: 15 additions & 11 deletions
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
