diff --git a/dev/pip-sanity-check.py b/dev/pip-sanity-check.py
deleted file mode 100644
index c491005f49719..0000000000000
--- a/dev/pip-sanity-check.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-from pyspark.sql import SparkSession
-from pyspark.ml.param import Params
-from pyspark.mllib.linalg import *
-import sys
-
-if __name__ == "__main__":
-    spark = SparkSession\
-        .builder\
-        .appName("PipSanityCheck")\
-        .getOrCreate()
-    sc = spark.sparkContext
-    rdd = sc.parallelize(range(100), 10)
-    value = rdd.reduce(lambda x, y: x + y)
-    if (value != 4950):
-        print("Value {0} did not match expected value.".format(value), file=sys.stderr)
-        sys.exit(-1)
-    print("Successfully ran pip sanity check")
-
-    spark.stop()
diff --git a/dev/run-pip-tests b/dev/run-pip-tests
index d51dde12a03c5..990e741d3fe22 100755
--- a/dev/run-pip-tests
+++ b/dev/run-pip-tests
@@ -115,9 +115,9 @@ for python in "${PYTHON_EXECS[@]}"; do
     cd /
 
     echo "Run basic sanity check on pip installed version with spark-submit"
-    spark-submit "$FWDIR"/dev/pip-sanity-check.py
+    spark-submit "$FWDIR"/python/pyspark/standalone_tests.py
     echo "Run basic sanity check with import based"
-    python "$FWDIR"/dev/pip-sanity-check.py
+    python "$FWDIR"/python/pyspark/standalone_tests.py
     echo "Run the tests for context.py"
     python "$FWDIR"/python/pyspark/context.py
 
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 7164180a6a7b0..c3d2eb4fee82a 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -381,6 +381,12 @@ def __hash__(self):
     ]
 )
 
+pyspark_standalone = Module(
+    name="pyspark-standalone",
+    dependencies=[pyspark_core],
+    source_file_regexes=["python/pyspark/(java_gateway.py|context.py)"],
+    python_test_goals=["pyspark.standalone_tests"]
+)
 
 pyspark_sql = Module(
     name="pyspark-sql",
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 3e704fe9bf6ec..c6a55b9a5df77 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -15,6 +15,9 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
+
 import atexit
 import os
 import sys
@@ -23,7 +26,10 @@
 import shlex
 import socket
 import platform
+import warnings
 from subprocess import Popen, PIPE
+from threading import Thread
+
 
 if sys.version >= '3':
     xrange = range
@@ -39,8 +45,25 @@ def launch_gateway(conf=None):
     :param conf: spark configuration passed to spark-submit
     :return:
    """
+    # If sys.stdout has been changed, the child process's JVM will not respect that,
+    # so grab the JVM output and copy it over if we are in a notebook.
+    redirect_shells = ["ZMQInteractiveShell", "StringIO"]
+    grab_jvm_output = (sys.stdout != sys.__stdout__ and
+                       sys.stdout.__class__.__name__ in redirect_shells)
+
+    if hasattr(sys, "pypy_translation_info") and grab_jvm_output:
+        warnings.warn(
+            "Unable to grab JVM output with PyPy. "
+            "JVM log messages may not be delivered to the notebook.")
+        grab_jvm_output = False
+
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
+        if grab_jvm_output:
+            warnings.warn(
+                "Gateway already launched, cannot grab output. "
+                "JVM messages may not be delivered to the notebook.",
+                RuntimeWarning)
     else:
         SPARK_HOME = _find_spark_home()
         # Launch the Py4j gateway using Spark's run command so that we pick up the
@@ -70,14 +93,38 @@ def launch_gateway(conf=None):
 
         # Launch the Java gateway.
         # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
+        proc_kwargs = {"env": env, "stdin": PIPE}
         if not on_windows:
             # Don't send ctrl-c / SIGINT to the Java gateway:
+            # However, preexec_fn is not supported on Windows
             def preexec_func():
                 signal.signal(signal.SIGINT, signal.SIG_IGN)
-            proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
-        else:
-            # preexec_fn not supported on Windows
-            proc = Popen(command, stdin=PIPE, env=env)
+
+            proc_kwargs["preexec_fn"] = preexec_func
+
+        # If we need to copy stderr/stdout through, set up a pipe.
+        if grab_jvm_output:
+            proc_kwargs["stderr"] = PIPE
+            proc_kwargs["stdout"] = PIPE
+            proc_kwargs["bufsize"] = 1
+            proc_kwargs["close_fds"] = True
+
+        proc = Popen(command, **proc_kwargs)
+
+        def connect(input_pipe, out_pipe):
+            """Connect the input pipe to the output pipe. We can't use os.dup for IPython
+            or write to it directly (see https://github.com/ipython/ipython/pull/3072/)."""
+            for line in iter(input_pipe.readline, b''):
+                print(line, file=out_pipe)
+            input_pipe.close()
+
+        if grab_jvm_output:
+            t = Thread(target=connect, args=(proc.stdout, sys.stdout))
+            t.daemon = True
+            t.start()
+            t = Thread(target=connect, args=(proc.stderr, sys.stderr))
+            t.daemon = True
+            t.start()
 
         gateway_port = None
         # We use select() here in order to avoid blocking indefinitely if the subprocess dies
diff --git a/python/pyspark/standalone_tests.py b/python/pyspark/standalone_tests.py
new file mode 100644
index 0000000000000..b9b183af9d0e5
--- /dev/null
+++ b/python/pyspark/standalone_tests.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Standalone tests for PySpark - can be used to quickly test a PySpark pip installation. When
+launched without spark-submit, this also verifies Jupyter output redirection.
+""" + +from __future__ import print_function + +import os +import sys + + +if sys.version >= "3": + from io import StringIO +else: + from StringIO import StringIO + +if __name__ == "__main__": + gateway_already_started = "PYSPARK_GATEWAY_PORT" in os.environ + try: + if not gateway_already_started and not hasattr(sys, "pypy_translation_info"): + print("Running redirection tests since not in existing gateway") + _old_stdout = sys.stdout + _old_stderr = sys.stderr + # Verify stdout/stderr overwrite support for jupyter + sys.stdout = new_stdout = StringIO() + sys.stderr = new_stderr = StringIO() + print("Redirected to {0} / {1}".format(sys.stdout, sys.stderr), file=_old_stdout) + elif hasattr(sys, "pypy_translation_info"): + print("Skipping redirection tests in pypy") + else: + print("Skipping redirection tests since gateway already exists") + + from pyspark.sql import SparkSession + if 'numpy' in sys.modules: + from pyspark.ml.param import Params + from pyspark.mllib.linalg import * + else: + print("Skipping pyspark ml import tests, missing numpy") + + spark = SparkSession\ + .builder\ + .appName("PipSanityCheck")\ + .getOrCreate() + print("Spark context created") + sc = spark.sparkContext + rdd = sc.parallelize(range(100), 10) + value = rdd.reduce(lambda x, y: x + y) + + if (value != 4950): + print("Value {0} did not match expected value.".format(value), file=sys.__stderr__) + sys.exit(-1) + + if not gateway_already_started: + try: + rdd2 = rdd.map(lambda x: str(x).startsWith("expected error")) + rdd2.collect() + except: + pass + + sys.stdout = _old_stdout + sys.stderr = _old_stderr + logs = new_stderr.getvalue() + new_stdout.getvalue() + + if logs.find("'str' object has no attribute 'startsWith'") == -1 and \ + logs.find("SystemError: unknown opcode") == -1: + print("Failed to find helpful error message, redirect failed?") + print("logs were {0}".format(logs)) + sys.exit(-1) + else: + print("Redirection tests passed") + print("Successfully ran pip sanity check") + except Exception as inst: + # If there is an uncaught exception print it, restore the stderr + print("Exception during testing, {0}".format(inst), file=sys.__stderr__) + sys.stderr = sys.__stderr__ + raise + + spark.stop() diff --git a/python/run-tests.py b/python/run-tests.py index 1341086f02db0..08031b50eb4e0 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -65,7 +65,11 @@ def print_red(text): raise Exception("Cannot find assembly build directory, please build Spark first.") -def run_individual_python_test(test_name, pyspark_python): +def run_generic_test(test_name, pyspark_python, launch_cmd): + """ + Run a generic python test. launch_cmd should be set to pyspark for normal tests or the same as + pyspark_python for standalone tests. + """ env = dict(os.environ) env.update({ 'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH, @@ -79,7 +83,7 @@ def run_individual_python_test(test_name, pyspark_python): try: per_test_output = tempfile.TemporaryFile() retcode = subprocess.Popen( - [os.path.join(SPARK_HOME, "bin/pyspark"), test_name], + [launch_cmd, test_name], stderr=per_test_output, stdout=per_test_output, env=env).wait() except: LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python) @@ -112,6 +116,20 @@ def run_individual_python_test(test_name, pyspark_python): LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration) +def run_standalone_python_test(test_name, pyspark_python): + """ + Runs a standalone python test. 
+    from the Python side rather than the JVM starting Python.
+    """
+    run_generic_test(test_name, pyspark_python, launch_cmd=pyspark_python)
+
+
+def run_individual_python_test(test_name, pyspark_python):
+    """Run a Python test, launching the JVM first."""
+    launch_cmd = os.path.join(SPARK_HOME, "bin/pyspark")
+    run_generic_test(test_name, pyspark_python, launch_cmd)
+
+
 def get_default_python_executables():
     python_execs = [x for x in ["python2.7", "python3.4", "pypy"] if which(x)]
     if "python2.7" not in python_execs:
@@ -198,7 +216,10 @@ def process_queue(task_queue):
         except Queue.Empty:
             break
         try:
-            run_individual_python_test(test_goal, python_exec)
+            if test_goal == 'pyspark.standalone_tests':
+                run_standalone_python_test(test_goal, python_exec)
+            else:
+                run_individual_python_test(test_goal, python_exec)
         finally:
             task_queue.task_done()
 