diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 0f302e96480..cabcabcc5ba 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -296,15 +296,31 @@ public void testIPythonPlotting() throws InterpreterException, InterruptedExcept
         "df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
         "import hvplot.pandas\n" +
         "df.hvplot()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
+        InterpreterResult.Code.SUCCESS, result.code());
     interpreterResultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(4, interpreterResultMessages.size());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
-    // docs_json is the source data of plotting which bokeh would use to render the plotting.
-    assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+
+    if (isPython2) {
+      // python 2 emits one extra output message:
+      // %text /home/travis/miniconda/lib/python2.7/site-packages/param/parameterized.py:2812:
+      // UserWarning: Config option `use_jedi` not recognized by `IPCompleter`.
+      //   return inst.__call__(*args,**params)
+      assertEquals(5, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(4).getType());
+      // docs_json is the source data that bokeh uses to render the plot.
+      assertTrue(interpreterResultMessages.get(4).getData().contains("docs_json"));
+    } else {
+      assertEquals(4, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      // docs_json is the source data that bokeh uses to render the plot.
+      assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+    }
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 6436d92800a..b1e1baf039a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -23,11 +23,13 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Map;
 import java.util.Properties;
@@ -91,25 +93,35 @@ public ZeppelinContext buildZeppelinContext() {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    InterpreterContext.set(context);
-    String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = Utils.buildJobDesc(context);
-    String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
-    InterpreterResult result = super.interpret(setJobGroupStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+    // redirect java stdout/stderr to the interpreter output, because pyspark may call java code
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      InterpreterContext.set(context);
+      String jobGroupId = Utils.buildJobGroupId(context);
+      String jobDesc = Utils.buildJobDesc(context);
+      String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
+      InterpreterResult result = super.interpret(setJobGroupStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+      }
+      String pool = "None";
+      if (context.getLocalProperties().containsKey("pool")) {
+        pool = "'" + context.getLocalProperties().get("pool") + "'";
+      }
+      String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+      result = super.interpret(setPoolStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+      }
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
     }
-    String pool = "None";
-    if (context.getLocalProperties().containsKey("pool")) {
-      pool = "'" + context.getLocalProperties().get("pool") + "'";
-    }
-    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
-    result = super.interpret(setPoolStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
-    }
-    return super.interpret(st, context);
   }
 
   @Override
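For context, here is a minimal, self-contained sketch of the redirect-and-restore pattern the interpret() method above introduces. A ByteArrayOutputStream stands in for context.out (an assumption for illustration; the real code wraps Zeppelin's InterpreterOutput), and the try/finally guarantees the process-global streams are restored even if interpret() throws:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class StdoutRedirectSketch {
      public static void main(String[] args) {
        // Save the process-global streams so they can be restored later.
        PrintStream originalStdout = System.out;
        PrintStream originalStderr = System.err;
        // Stand-in for context.out (hypothetical; Zeppelin uses InterpreterOutput).
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        try {
          System.setOut(new PrintStream(captured));
          System.setErr(new PrintStream(captured));
          // Anything printed here, including output from Java code triggered
          // via py4j, now lands in `captured` instead of the process stdout.
          System.out.println("hello world");
        } finally {
          // Restore unconditionally: System.out/err are shared JVM state.
          System.setOut(originalStdout);
          System.setErr(originalStderr);
        }
        originalStdout.println("captured: " + captured);
      }
    }

Because System.out and System.err are JVM-wide, the restore in finally is the load-bearing part of the pattern; leaking the redirected streams would send unrelated output into this paragraph's result.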
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 94073e0f86a..f180799e029 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -25,6 +25,7 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
 import org.slf4j.Logger;
@@ -32,6 +33,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -125,8 +127,18 @@ protected ZeppelinContext createZeppelinContext() {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    return super.interpret(st, context);
+    // redirect java stdout/stderr to the interpreter output, because pyspark may call java code
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
+    }
   }
 
   @Override
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index c64ba711fed..40ab8511dfd 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -217,6 +217,14 @@ public void run() {
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("sc", "sc", ""));
 
+    // python calls java via py4j
+    context = createInterpreterContext(mockIntpEventClient);
+    result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
+
     // pyspark streaming TODO(zjffdu) disable pyspark streaming test temporary
     context = createInterpreterContext(mockIntpEventClient);
     // result = interpreter.interpret(
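The new test above exercises the py4j path: sc._jvm.java.lang.System.out.println(...) executes inside the interpreter's JVM, so without the redirection in interpret() the text would end up in the interpreter process log rather than in the paragraph output. Below is a hedged sketch of why the redirection is picked up, with plain reflection standing in for py4j's gateway dispatch (an assumption for illustration; py4j resolves the System.out field at call time, so it sees the replaced stream):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.lang.reflect.Method;

    public class Py4jStdoutSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream paragraphOut = new ByteArrayOutputStream();
        PrintStream original = System.out;
        try {
          System.setOut(new PrintStream(paragraphOut));
          // Roughly what py4j does for sc._jvm.java.lang.System.out.println(...):
          // resolve the field and method by name at call time, then invoke.
          Object out = System.class.getField("out").get(null);
          Method println = out.getClass().getMethod("println", String.class);
          println.invoke(out, "hello world");
        } finally {
          System.setOut(original);
        }
        // Mirrors the test's expected paragraph output of "hello world\n".
        System.out.println("captured: " + paragraphOut);
      }
    }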
diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh
index b6cc1bbf09f..a1fdc32d592 100755
--- a/testing/install_external_dependencies.sh
+++ b/testing/install_external_dependencies.sh
@@ -36,11 +36,11 @@ if [[ -n "$PYTHON" ]] ; then
   if [[ "$PYTHON" == "2" ]] ; then
     pip install -q numpy==1.14.5 pandas==0.21.1 matplotlib==2.1.1 scipy==1.2.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 \
-        protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4
+        protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3
   else
     pip install -q pycodestyle==2.5.0
     pip install -q numpy==1.17.3 pandas==0.25.0 scipy==1.3.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 protobuf==3.10.0 \
-        pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 pycodestyle==2.5.0 apache_beam==2.15.0
+        pandasql==0.7.3 ipython==7.8.0 matplotlib==3.0.3 ipykernel==5.1.2 jupyter_client==5.3.4 bokeh==1.3.4 panel==0.6.0 holoviews==1.12.3 pycodestyle==2.5.0 apache_beam==2.15.0
   fi
 
   if [[ -n "$TENSORFLOW" ]] ; then