
Commit ff5be9a

sryza authored and pwendell committed
SPARK-1004. PySpark on YARN
This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo.

Author: Sandy Ryza <[email protected]>

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN
1 parent 7025dda commit ff5be9a

11 files changed: 85 additions, 19 deletions

bin/pyspark

Lines changed: 1 addition & 0 deletions
```diff
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
```

bin/pyspark2.cmd

Lines changed: 1 addition & 0 deletions
```diff
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
```

core/pom.xml

Lines changed: 42 additions & 0 deletions
```diff
@@ -294,6 +294,48 @@
           </environmentVariables>
         </configuration>
       </plugin>
+      <!-- Unzip py4j so we can include its files in the jar -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
     </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
   </build>
 </project>
```
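The effect of this build change is that the pyspark and py4j sources end up inside the Spark assembly jar, which Python can import from directly because a zip (or jar) placed on `sys.path` is handled by zipimport. The toy sketch below only illustrates that mechanism; `toy.zip` and `toymod` are invented names and nothing here is part of the commit.

```python
# Hedged illustration of the mechanism the build change relies on: Python's
# zipimport can load modules straight from a zip/jar on sys.path, so the
# pyspark/*.py and py4j/*.py files bundled into the assembly jar are importable
# without being unpacked. This toy example builds its own zip to show that.
import sys
import zipfile

# Build a small zip containing a package, standing in for the assembly jar.
with zipfile.ZipFile("toy.zip", "w") as zf:
    zf.writestr("toymod/__init__.py", "GREETING = 'imported from a zip'\n")

sys.path.insert(0, "toy.zip")  # a zip/jar on sys.path is searched by zipimport
import toymod

print(toymod.GREETING)  # -> imported from a zip
```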

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 2 additions & 8 deletions
```diff
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
       // Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
       // Create and start the daemon
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       daemon = pb.start()
 
       // Redirect the stderr to ours
```
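The switch from an absolute `$SPARK_HOME/python/pyspark/worker.py` path to `python -m pyspark.worker` means the worker module is resolved through `PYTHONPATH` (which now carries the py4j zip and, on YARN, the Spark jar) rather than through `SPARK_HOME`. The following is only a rough Python analogue of what the Scala `ProcessBuilder` call now does; the `subprocess` sketch is an assumption for illustration, not code from the commit.

```python
# Hedged sketch: approximate Python analogue of the Scala ProcessBuilder change.
import os
import subprocess

def launch_worker(python_exec="python", env_vars=None):
    # Inherit the parent environment plus any extra variables, as the Scala
    # code does with workerEnv.putAll(envVars).
    env = dict(os.environ)
    env.update(env_vars or {})
    # "-m pyspark.worker" lets Python locate the module via PYTHONPATH, so no
    # SPARK_HOME-based path needs to be constructed here any more.
    return subprocess.Popen([python_exec, "-m", "pyspark.worker"], env=env)
```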

docs/python-programming-guide.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
 Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
 The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
 
+# Running PySpark on YARN
+
+To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
 
 # Interactive Use
 
```
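As a hedged illustration of the new doc text: with `MASTER=yarn-client` exported (and the usual YARN/Hadoop configuration available on the driver), a standalone PySpark script runs its executors in YARN containers. The app name and workload below are invented for the example.

```python
# Minimal sketch, assuming MASTER=yarn-client is set in the environment
# (e.g. exported before running bin/pyspark) and YARN config is available.
import os
from pyspark import SparkContext

master = os.environ.get("MASTER", "yarn-client")
sc = SparkContext(master, "yarn-client-example")

# Trivial job just to exercise executors running in YARN containers.
print(sc.parallelize(range(1000)).map(lambda x: x * x).sum())
sc.stop()
```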

python/.gitignore

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,2 +1,5 @@
 *.pyc
 docs/
+pyspark.egg-info
+build/
+dist/
```

python/lib/PY4J_VERSION.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

python/pyspark/__init__.py

Lines changed: 0 additions & 7 deletions
```diff
@@ -49,13 +49,6 @@
       Main entry point for accessing data stored in Apache Hive..
 """
 
-
-
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
-
-
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.sql import SQLContext
```

python/pyspark/java_gateway.py

Lines changed: 27 additions & 2 deletions
```diff
@@ -24,10 +24,11 @@
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
 
-SPARK_HOME = os.environ["SPARK_HOME"]
+def launch_gateway():
+    SPARK_HOME = os.environ["SPARK_HOME"]
 
+    set_env_vars_for_yarn()
 
-def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
             java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
             java_import(gateway.jvm, "scala.Tuple2")
     return gateway
+
+def set_env_vars_for_yarn():
+    # Add the spark jar, which includes the pyspark files, to the python path
+    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
+    if "PYTHONPATH" in env_map:
+        env_map["PYTHONPATH"] += ":spark.jar"
+    else:
+        env_map["PYTHONPATH"] = "spark.jar"
+
+    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
+
+def parse_env(env_str):
+    # Turns a comma-separated of env settings into a dict that maps env vars to
+    # their values.
+    env = {}
+    for var_str in env_str.split(","):
+        parts = var_str.split("=")
+        if len(parts) == 2:
+            env[parts[0]] = parts[1]
+        elif len(var_str) > 0:
+            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
+            sys.exit(1)
+
+    return env
```
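To make the intent of the two helpers concrete: `parse_env` turns the comma-separated `SPARK_YARN_USER_ENV` string into a dict, and `set_env_vars_for_yarn` appends `spark.jar` to its `PYTHONPATH` entry so executors can import the pyspark files bundled in the jar. A hedged usage sketch, assuming a working PySpark checkout on the `PYTHONPATH`; the variable values are invented.

```python
# Hypothetical usage of the helpers added above; values are examples only.
import os
from pyspark.java_gateway import parse_env, set_env_vars_for_yarn

os.environ["SPARK_YARN_USER_ENV"] = "PYTHONPATH=/opt/libs,FOO=bar"

print(parse_env(os.environ["SPARK_YARN_USER_ENV"]))
# -> {'PYTHONPATH': '/opt/libs', 'FOO': 'bar'}

set_env_vars_for_yarn()
print(os.environ["SPARK_YARN_USER_ENV"])
# -> e.g. "PYTHONPATH=/opt/libs:spark.jar,FOO=bar" (dict ordering may differ)
```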

python/pyspark/tests.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -30,10 +30,12 @@
 
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
-from pyspark.java_gateway import SPARK_HOME
 from pyspark.serializers import read_int
 
 
+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
 class PySparkTestCase(unittest.TestCase):
 
     def setUp(self):
```
