diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 3168f04787e..9a61be637ec 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -40,7 +40,7 @@ public static void setUp() { Properties p = new Properties(); flink = new FlinkInterpreter(p); flink.open(); - context = new InterpreterContext(null, null, null, null, null, null, null, null); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null); } @AfterClass diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java index c22080d57f0..c86fcf3371e 100644 --- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java +++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java @@ -79,9 +79,9 @@ public void readTest() throws IOException { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME")); + assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME")); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -101,7 +101,7 @@ public void readTestWithConfiguration() throws IOException { t.open(); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -117,13 +117,13 @@ public void jdbcRestart() throws IOException, SQLException, ClassNotFoundExcepti t.open(); InterpreterResult interpreterResult = - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); t.getConnection("default").close(); interpreterResult = - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); } @@ -139,7 +139,7 @@ public void test() throws IOException { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null); + InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null); //simple select test InterpreterResult result = t.interpret("select * from test_table", interpreterContext); diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java index f46b049ccb3..cf9808389e5 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -40,7 +40,7 @@ public class IgniteInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private IgniteInterpreter intp; private Ignite ignite; diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index d6915ac1572..7574d673719 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private Ignite ignite; private IgniteSqlInterpreter intp; diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 7a753fa7cd5..606d4d965bc 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -65,7 +65,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry( intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); } @After diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 8c4ba877f75..c5441ab9baf 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private GatewayServer gatewayServer; private DefaultExecutor executor; private int port; - private ByteArrayOutputStream outputStream; - private ByteArrayOutputStream errStream; + private SparkOutputStream outputStream; private BufferedWriter ins; private PipedInputStream in; private ByteArrayOutputStream input; @@ -173,7 +172,7 @@ private void createGatewayServerAndStartScript() { cmd.addArgument(Integer.toString(port), false); cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); - outputStream = new ByteArrayOutputStream(); + outputStream = new SparkOutputStream(); PipedOutputStream ps = new PipedOutputStream(); in = null; try { @@ -274,7 +273,6 @@ public void setStatementsFinished(String out, boolean error) { statementError = error; statementFinishedNotifier.notify(); } - } boolean pythonScriptInitialized = false; @@ -287,6 +285,10 @@ public void onPythonScriptInitialized() { } } + public void appendOutput(String message) throws IOException { + outputStream.getInterpreterOutput().write(message); + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { SparkInterpreter sparkInterpreter = getSparkInterpreter(); @@ -300,7 +302,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { + outputStream.toString()); } - outputStream.reset(); + outputStream.setInterpreterOutput(context.out); synchronized (pythonScriptInitializeNotifier) { long startTime = System.currentTimeMillis(); @@ -314,15 +316,24 @@ public InterpreterResult interpret(String st, InterpreterContext context) { } } + String errorMessage = ""; + try { + context.out.flush(); + errorMessage = new String(context.out.toByteArray()); + } catch (IOException e) { + throw new InterpreterException(e); + } + + if (pythonscriptRunning == false) { // python script failed to initialize and terminated return new InterpreterResult(Code.ERROR, "failed to start pyspark" - + outputStream.toString()); + + errorMessage); } if (pythonScriptInitialized == false) { // timeout. didn't get initialized message return new InterpreterResult(Code.ERROR, "pyspark is not responding " - + outputStream.toString()); + + errorMessage); } if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) { @@ -352,7 +363,14 @@ public InterpreterResult interpret(String st, InterpreterContext context) { if (statementError) { return new InterpreterResult(Code.ERROR, statementOutput); } else { - return new InterpreterResult(Code.SUCCESS, statementOutput); + + try { + context.out.flush(); + } catch (IOException e) { + throw new InterpreterException(e); + } + + return new InterpreterResult(Code.SUCCESS); } } @@ -389,8 +407,6 @@ public List completion(String buf, int cursor) { return new LinkedList(); } - outputStream.reset(); - pythonInterpretRequest = new PythonInterpretRequest(completionCommand, ""); statementOutput = null; diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index d9757919cb8..7ee6d7ccb60 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -17,9 +17,7 @@ package org.apache.zeppelin.spark; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.PrintStream; import java.io.PrintWriter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -41,7 +39,6 @@ import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; -import org.apache.spark.scheduler.SparkListener; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; @@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter { private SparkILoop interpreter; private SparkIMain intp; private SparkContext sc; - private ByteArrayOutputStream out; + private SparkOutputStream out; private SQLContext sqlc; private SparkDependencyResolver dep; private SparkJLineCompletion completor; @@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter { public SparkInterpreter(Properties property) { super(property); - out = new ByteArrayOutputStream(); + out = new SparkOutputStream(); } public SparkInterpreter(Properties property, SparkContext sc) { @@ -452,10 +449,9 @@ public void open() { b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - PrintStream printStream = new PrintStream(out); - /* spark interpreter */ this.interpreter = new SparkILoop(null, new PrintWriter(out)); + interpreter.settings_$eq(settings); interpreter.createInterpreter(); @@ -481,7 +477,7 @@ public void open() { dep = getDependencyResolver(); - z = new ZeppelinContext(sc, sqlc, null, dep, printStream, + z = new ZeppelinContext(sc, sqlc, null, dep, Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); @@ -489,7 +485,6 @@ public void open() { binder.put("sc", sc); binder.put("sqlc", sqlc); binder.put("z", z); - binder.put("out", printStream); intp.interpret("@transient val z = " + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); @@ -675,13 +670,13 @@ public InterpreterResult interpret(String[] lines, InterpreterContext context) { synchronized (this) { z.setGui(context.getGui()); sc.setJobGroup(getJobGroup(context), "Zeppelin", false); - InterpreterResult r = interpretInput(lines); + InterpreterResult r = interpretInput(lines, context); sc.clearJobGroup(); return r; } } - public InterpreterResult interpretInput(String[] lines) { + public InterpreterResult interpretInput(String[] lines, InterpreterContext context) { SparkEnv.set(env); // add print("") to make sure not finishing with comment @@ -692,8 +687,9 @@ public InterpreterResult interpretInput(String[] lines) { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut((java.io.PrintStream) binder.get("out")); - out.reset(); + Console.setOut(context.out); + out.setInterpreterOutput(context.out); + context.out.clear(); Code r = null; String incomplete = ""; @@ -713,6 +709,7 @@ public InterpreterResult interpretInput(String[] lines) { res = intp.interpret(incomplete + s); } catch (Exception e) { sc.clearJobGroup(); + out.setInterpreterOutput(null); logger.info("Interpreter exception", e); return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); } @@ -721,7 +718,8 @@ public InterpreterResult interpretInput(String[] lines) { if (r == Code.ERROR) { sc.clearJobGroup(); - return new InterpreterResult(r, out.toString()); + out.setInterpreterOutput(null); + return new InterpreterResult(r, ""); } else if (r == Code.INCOMPLETE) { incomplete += s + "\n"; } else { @@ -730,9 +728,13 @@ public InterpreterResult interpretInput(String[] lines) { } if (r == Code.INCOMPLETE) { + sc.clearJobGroup(); + out.setInterpreterOutput(null); return new InterpreterResult(r, "Incomplete expression"); } else { - return new InterpreterResult(r, out.toString()); + sc.clearJobGroup(); + out.setInterpreterOutput(null); + return new InterpreterResult(Code.SUCCESS); } } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java new file mode 100644 index 00000000000..98a4090b117 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.spark; + +import org.apache.zeppelin.interpreter.InterpreterOutput; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * InterpreterOutput can be attached / detached. + */ +public class SparkOutputStream extends OutputStream { + InterpreterOutput interpreterOutput; + + public SparkOutputStream() { + } + + public InterpreterOutput getInterpreterOutput() { + return interpreterOutput; + } + + public void setInterpreterOutput(InterpreterOutput interpreterOutput) { + this.interpreterOutput = interpreterOutput; + } + + @Override + public void write(int b) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b); + } + } + + @Override + public void write(byte [] b) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b); + } + } + + @Override + public void write(byte [] b, int offset, int len) throws IOException { + if (interpreterOutput != null) { + interpreterOutput.write(b, offset, len); + } + } + + @Override + public void close() throws IOException { + if (interpreterOutput != null) { + interpreterOutput.close(); + } + } + + @Override + public void flush() throws IOException { + if (interpreterOutput != null) { + interpreterOutput.flush(); + } + } +} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index af806bf02f5..7c301d79b0f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaIterable; import static scala.collection.JavaConversions.collectionAsScalaIterable; +import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -54,19 +55,17 @@ */ public class ZeppelinContext extends HashMap { private SparkDependencyResolver dep; - private PrintStream out; private InterpreterContext interpreterContext; private int maxResult; public ZeppelinContext(SparkContext sc, SQLContext sql, InterpreterContext interpreterContext, - SparkDependencyResolver dep, PrintStream printStream, + SparkDependencyResolver dep, int maxResult) { this.sc = sc; this.sqlContext = sql; this.interpreterContext = interpreterContext; this.dep = dep; - this.out = printStream; this.maxResult = maxResult; } @@ -273,10 +272,15 @@ public void show(Object o, int maxResult) { throw new InterpreterException("Can not road DataFrame/SchemaRDD class"); } - if (cls.isInstance(o)) { - out.print(showDF(sc, interpreterContext, o, maxResult)); - } else { - out.print(o.toString()); + + try { + if (cls.isInstance(o)) { + interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult)); + } else { + interpreterContext.out.write(o.toString()); + } + } catch (IOException e) { + throw new InterpreterException(e); } } diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 62f0a82bd46..7da0f4e8caf 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -36,10 +36,7 @@ def __init__(self): self.out = "" def write(self, message): - self.out = self.out + message - - def get(self): - return self.out + intp.appendOutput(message) def reset(self): self.out = "" @@ -224,7 +221,7 @@ def getCompletion(self, text_value): sc.setJobGroup(jobGroup, "Zeppelin") eval(compiledCode) - intp.setStatementsFinished(output.get(), False) + intp.setStatementsFinished("", False) except Py4JJavaError: excInnerError = traceback.format_exc() # format_tb() does not return the inner exception innerErrorStart = excInnerError.find("Py4JJavaError:") diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index efa8fae2546..2b5613a2950 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -60,7 +60,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); } @After diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index b6299786ead..778966f2ba9 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -28,10 +28,7 @@ import org.apache.spark.SparkContext; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.junit.After; import org.junit.Before; @@ -79,9 +76,21 @@ public void setUp() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); context = new InterpreterContext("note", "id", "title", "text", - new HashMap(), new GUI(), new AngularObjectRegistry( - intpGroup.getId(), null), - new LinkedList()); + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList(), + new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + + } + })); } @After diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 4688cf8347e..731eab61fb9 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -25,10 +25,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.junit.After; import org.junit.Before; @@ -69,7 +66,17 @@ public void setUp() throws Exception { } context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + + } + })); } @After diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 0417f9108bc..e3f6b59bc41 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -29,6 +29,7 @@ public class InterpreterContext { private static final ThreadLocal threadIC = new ThreadLocal(); + public final InterpreterOutput out; public static InterpreterContext get() { return threadIC.get(); @@ -58,7 +59,8 @@ public InterpreterContext(String noteId, Map config, GUI gui, AngularObjectRegistry angularObjectRegistry, - List runners + List runners, + InterpreterOutput out ) { this.noteId = noteId; this.paragraphId = paragraphId; @@ -68,6 +70,7 @@ public InterpreterContext(String noteId, this.gui = gui; this.angularObjectRegistry = angularObjectRegistry; this.runners = runners; + this.out = out; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java new file mode 100644 index 00000000000..42ebe485e1b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; + +/** + * InterpreterOutput is OutputStream that supposed to print content on notebook + * in addition to InterpreterResult which used to return from Interpreter.interpret(). + */ +public class InterpreterOutput extends OutputStream { + Logger logger = LoggerFactory.getLogger(InterpreterOutput.class); + private final int NEW_LINE_CHAR = '\n'; + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + private final List outList = new LinkedList(); + private InterpreterOutputChangeWatcher watcher; + private final InterpreterOutputListener flushListener; + private InterpreterResult.Type type = InterpreterResult.Type.TEXT; + private boolean firstWrite = true; + + public InterpreterOutput(InterpreterOutputListener flushListener) { + this.flushListener = flushListener; + clear(); + } + + public InterpreterOutput(InterpreterOutputListener flushListener, + InterpreterOutputChangeListener listener) throws IOException { + this.flushListener = flushListener; + clear(); + watcher = new InterpreterOutputChangeWatcher(listener); + watcher.start(); + } + + public InterpreterResult.Type getType() { + return type; + } + + public void setType(InterpreterResult.Type type) { + if (this.type != type) { + clear(); + flushListener.onUpdate(this, new byte[]{}); + this.type = type; + } + } + + public void clear() { + synchronized (outList) { + type = InterpreterResult.Type.TEXT; + buffer.reset(); + outList.clear(); + if (watcher != null) { + watcher.clear(); + } + } + } + + @Override + public void write(int b) throws IOException { + synchronized (outList) { + buffer.write(b); + if (b == NEW_LINE_CHAR) { + // first time use of this outputstream. + if (firstWrite) { + // clear the output on gui + flushListener.onUpdate(this, new byte[]{}); + firstWrite = false; + } + + flush(); + } + } + } + + private byte [] detectTypeFromLine(byte [] byteArray) { + // check output type directive + String line = new String(byteArray); + for (InterpreterResult.Type t : InterpreterResult.Type.values()) { + String typeString = '%' + t.name().toLowerCase(); + if ((typeString + "\n").equals(line)) { + setType(t); + byteArray = null; + break; + } else if (line.startsWith(typeString + " ")) { + setType(t); + byteArray = line.substring(typeString.length() + 1).getBytes(); + break; + } + } + + return byteArray; + } + + @Override + public void write(byte [] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte [] b, int off, int len) throws IOException { + synchronized (outList) { + for (int i = off; i < len; i++) { + write(b[i]); + } + } + } + + /** + * In dev mode, it monitors file and update ZeppelinServer + * @param file + * @throws IOException + */ + public void write(File file) throws IOException { + outList.add(file); + if (watcher != null) { + watcher.watch(file); + } + } + + public void write(String string) throws IOException { + write(string.getBytes()); + } + + /** + * write contents in the resource file in the classpath + * @param url + * @throws IOException + */ + public void write(URL url) throws IOException { + if ("file".equals(url.getProtocol())) { + write(new File(url.getPath())); + } else { + outList.add(url); + } + } + + public void writeResource(String resourceName) throws IOException { + // search file under resource dir first for dev mode + File mainResource = new File("./src/main/resources/" + resourceName); + File testResource = new File("./src/test/resources/" + resourceName); + if (mainResource.isFile()) { + write(mainResource); + } else if (testResource.isFile()) { + write(testResource); + } else { + // search from classpath + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = this.getClass().getClassLoader(); + } + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } + + write(cl.getResource(resourceName)); + } + } + + public byte[] toByteArray() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + List all = new LinkedList(); + + synchronized (outList) { + all.addAll(outList); + } + + for (Object o : all) { + if (o instanceof File) { + File f = (File) o; + FileInputStream fin = new FileInputStream(f); + copyStream(fin, out); + fin.close(); + } else if (o instanceof byte[]) { + out.write((byte[]) o); + } else if (o instanceof Integer) { + out.write((int) o); + } else if (o instanceof URL) { + InputStream fin = ((URL) o).openStream(); + copyStream(fin, out); + fin.close(); + } else { + // can not handle the object + } + } + out.close(); + return out.toByteArray(); + } + + public void flush() throws IOException { + synchronized (outList) { + buffer.flush(); + byte[] bytes = buffer.toByteArray(); + bytes = detectTypeFromLine(bytes); + if (bytes != null) { + outList.add(bytes); + if (type == InterpreterResult.Type.TEXT) { + flushListener.onAppend(this, bytes); + } + } + buffer.reset(); + } + } + + private void copyStream(InputStream in, OutputStream out) throws IOException { + int bufferSize = 8192; + byte[] buffer = new byte[bufferSize]; + + while (true) { + int bytesRead = in.read(buffer); + if (bytesRead == -1) { + break; + } else { + out.write(buffer, 0, bytesRead); + } + } + } + + @Override + public void close() throws IOException { + flush(); + + if (watcher != null) { + watcher.clear(); + watcher.shutdown(); + } + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java new file mode 100644 index 00000000000..a639e0c6418 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import java.io.File; + +/** + * InterpreterOutputChangeListener + */ +public interface InterpreterOutputChangeListener { + public void fileChanged(File file); + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java new file mode 100644 index 00000000000..5fe8237c257 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; +import java.io.File; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Watch the change for the development mode support + */ +public class InterpreterOutputChangeWatcher extends Thread { + Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class); + + private WatchService watcher; + private final List watchFiles = new LinkedList(); + private final Map watchKeys = new HashMap(); + private InterpreterOutputChangeListener listener; + private boolean stop; + + public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener) + throws IOException { + watcher = FileSystems.getDefault().newWatchService(); + this.listener = listener; + } + + public void watch(File file) throws IOException { + String dirString; + if (file.isFile()) { + dirString = file.getParentFile().getAbsolutePath(); + } else { + throw new IOException(file.getName() + " is not a file"); + } + + if (dirString == null) { + dirString = "/"; + } + + Path dir = FileSystems.getDefault().getPath(dirString); + logger.info("watch " + dir); + WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + synchronized (watchKeys) { + watchKeys.put(key, new File(dirString)); + watchFiles.add(file); + } + } + + public void clear() { + synchronized (watchKeys) { + for (WatchKey key : watchKeys.keySet()) { + key.cancel(); + + } + watchKeys.clear(); + watchFiles.clear(); + } + } + + public void shutdown() throws IOException { + stop = true; + clear(); + watcher.close(); + } + + public void run() { + while (!stop) { + WatchKey key = null; + try { + key = watcher.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException | ClosedWatchServiceException e) { + break; + } + + if (key == null) { + continue; + } + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + if (kind == OVERFLOW) { + continue; + } + WatchEvent ev = (WatchEvent) event; + Path filename = ev.context(); + // search for filename + synchronized (watchKeys) { + for (File f : watchFiles) { + if (f.getName().compareTo(filename.toString()) == 0) { + File changedFile; + if (filename.isAbsolute()) { + changedFile = new File(filename.toString()); + } else { + changedFile = new File(watchKeys.get(key), filename.toString()); + } + logger.info("File change detected " + changedFile.getAbsolutePath()); + if (listener != null) { + listener.fileChanged(changedFile); + } + } + } + } + } + + boolean valid = key.reset(); + if (!valid) { + break; + } + } + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java new file mode 100644 index 00000000000..bdb262ae294 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +/** + * Listen InterpreterOutput buffer flush + */ +public interface InterpreterOutputListener { + /** + * called when newline is detected + * @param line + */ + public void onAppend(InterpreterOutput out, byte[] line); + + /** + * when entire output is updated. eg) after detecting new display system + * @param output + */ + public void onUpdate(InterpreterOutput out, byte[] output); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java index 593cfc76ce9..d2137961464 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java @@ -146,4 +146,8 @@ public InterpreterResult type(Type type) { this.type = type; return this; } + + public String toString() { + return "%" + type.name().toLowerCase() + " " + msg; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 455156ce113..d2a24e85a39 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -48,6 +48,7 @@ * */ public class RemoteInterpreter extends Interpreter { + private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); Gson gson = new Gson(); private String interpreterRunner; @@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter { private int connectTimeout; public RemoteInterpreter(Properties property, - String className, - String interpreterRunner, - String interpreterPath, - int connectTimeout) { + String className, + String interpreterRunner, + String interpreterPath, + int connectTimeout, + RemoteInterpreterProcessListener remoteInterpreterProcessListener) { super(property); - this.className = className; initialized = false; this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; env = new HashMap(); this.connectTimeout = connectTimeout; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; } public RemoteInterpreter(Properties property, - String className, - String interpreterRunner, - String interpreterPath, - Map env, - int connectTimeout) { + String className, + String interpreterRunner, + String interpreterPath, + Map env, + int connectTimeout, + RemoteInterpreterProcessListener remoteInterpreterProcessListener) { super(property); this.className = className; this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; this.env = env; this.connectTimeout = connectTimeout; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; } @Override @@ -103,7 +107,8 @@ public RemoteInterpreterProcess getInterpreterProcess() { if (intpGroup.getRemoteInterpreterProcess() == null) { // create new remote process RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess( - interpreterRunner, interpreterPath, env, connectTimeout); + interpreterRunner, interpreterPath, env, connectTimeout, + remoteInterpreterProcessListener); intpGroup.setRemoteInterpreterProcess(remoteProcess); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index c39e0feb0af..61862053fce 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -18,29 +18,35 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** * */ public class RemoteInterpreterEventPoller extends Thread { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + private final RemoteInterpreterProcessListener listener; private volatile boolean shutdown; private RemoteInterpreterProcess interpreterProcess; private InterpreterGroup interpreterGroup; - public RemoteInterpreterEventPoller() { + public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) { + this.listener = listener; shutdown = false; } @@ -110,6 +116,24 @@ public void run() { interpreterProcess.getInterpreterContextRunnerPool().run( runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId()); + } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) { + // on output append + Map outputAppend = gson.fromJson( + event.getData(), new TypeToken>() {}.getType()); + String noteId = outputAppend.get("noteId"); + String paragraphId = outputAppend.get("paragraphId"); + String outputToAppend = outputAppend.get("data"); + + listener.onOutputAppend(noteId, paragraphId, outputToAppend); + } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) { + // on output update + Map outputAppend = gson.fromJson( + event.getData(), new TypeToken>() {}.getType()); + String noteId = outputAppend.get("noteId"); + String paragraphId = outputAppend.get("paragraphId"); + String outputToUpdate = outputAppend.get("data"); + + listener.onOutputUpdated(noteId, paragraphId, outputToUpdate); } logger.debug("Event from remoteproceess {}", event.getType()); } catch (Exception e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 2c195dcd106..56b54850c7a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { private int connectTimeout; public RemoteInterpreterProcess(String intpRunner, - String intpDir, - Map env, - int connectTimeout) { - this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout); + String intpDir, + Map env, + int connectTimeout, + RemoteInterpreterProcessListener listener) { + this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout); } RemoteInterpreterProcess(String intpRunner, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java new file mode 100644 index 00000000000..da6ac63c51d --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +/** + * Event from remoteInterpreterProcess + */ +public interface RemoteInterpreterProcessListener { + public void onOutputAppend(String noteId, String paragraphId, String output); + public void onOutputUpdated(String noteId, String paragraphId, String output); +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index a8da8c02d48..728d2108cf4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.ClassloaderInterpreter; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -300,7 +293,26 @@ protected Object jobRun() throws Throwable { try { InterpreterContext.set(context); InterpreterResult result = interpreter.interpret(script, context); - return result; + + // data from context.out is prepended to InterpreterResult if both defined + String message = ""; + + context.out.flush(); + InterpreterResult.Type outputType = context.out.getType(); + byte[] interpreterOutput = context.out.toByteArray(); + context.out.clear(); + + if (interpreterOutput != null && interpreterOutput.length > 0) { + message = new String(interpreterOutput); + } + + String interpreterResultMessage = result.message(); + if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) { + message += interpreterResultMessage; + return new InterpreterResult(result.code(), result.type(), message); + } else { + return new InterpreterResult(result.code(), outputType, message); + } } finally { InterpreterContext.remove(); } @@ -351,7 +363,8 @@ public List completion(String className, String buf, int cursor) throws private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>(){}.getType()); + new TypeToken>() { + }.getType()); for (InterpreterContextRunner r : runners) { contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId())); @@ -366,7 +379,40 @@ private InterpreterContext convert(RemoteInterpreterContext ric) { new TypeToken>() {}.getType()), gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), - contextRunners); + contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId())); + } + + + private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) { + return new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + Map appendOutput = new HashMap(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("data", new String(line)); + + Gson gson = new Gson(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.OUTPUT_APPEND, + gson.toJson(appendOutput))); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + Map appendOutput = new HashMap(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("data", new String(output)); + + Gson gson = new Gson(); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.OUTPUT_UPDATE, + gson.toJson(appendOutput))); + } + }); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index a55d5de5df9..175f482cadb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 96a49b512a4..79203fbec29 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 9a7d142b7bb..d6503183d06 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { ANGULAR_OBJECT_ADD(2), ANGULAR_OBJECT_UPDATE(3), ANGULAR_OBJECT_REMOVE(4), - RUN_INTERPRETER_CONTEXT_RUNNER(5); + RUN_INTERPRETER_CONTEXT_RUNNER(5), + OUTPUT_APPEND(6), + OUTPUT_UPDATE(7); private final int value; @@ -64,6 +66,10 @@ public static RemoteInterpreterEventType findByValue(int value) { return ANGULAR_OBJECT_REMOVE; case 5: return RUN_INTERPRETER_CONTEXT_RUNNER; + case 6: + return OUTPUT_APPEND; + case 7: + return OUTPUT_UPDATE; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 36c0f252eb9..cc50f9c89fe 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index 6e6730ea9be..738b453d0e2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 144784c539e..65fd0a7027e 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -42,7 +42,9 @@ enum RemoteInterpreterEventType { ANGULAR_OBJECT_ADD = 2, ANGULAR_OBJECT_UPDATE = 3, ANGULAR_OBJECT_REMOVE = 4, - RUN_INTERPRETER_CONTEXT_RUNNER = 5 + RUN_INTERPRETER_CONTEXT_RUNNER = 5, + OUTPUT_APPEND = 6, + OUTPUT_UPDATE = 7 } struct RemoteInterpreterEvent { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 080bdaa4337..9c2732dd2f8 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java new file mode 100644 index 00000000000..e37680905bb --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener { + private File tmpDir; + private File fileChanged; + private int numChanged; + private InterpreterOutputChangeWatcher watcher; + + @Before + public void setUp() throws Exception { + watcher = new InterpreterOutputChangeWatcher(this); + watcher.start(); + + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + fileChanged = null; + numChanged = 0; + } + + @After + public void tearDown() throws Exception { + watcher.shutdown(); + delete(tmpDir); + } + + private void delete(File file){ + if(file.isFile()) file.delete(); + else if(file.isDirectory()){ + File [] files = file.listFiles(); + if(files!=null && files.length>0){ + for(File f : files){ + delete(f); + } + } + file.delete(); + } + } + + + @Test + public void test() throws IOException, InterruptedException { + assertNull(fileChanged); + assertEquals(0, numChanged); + + Thread.sleep(1000); + // create new file + File file1 = new File(tmpDir, "test1"); + file1.createNewFile(); + + File file2 = new File(tmpDir, "test2"); + file2.createNewFile(); + + watcher.watch(file1); + Thread.sleep(1000); + + FileOutputStream out1 = new FileOutputStream(file1); + out1.write(1); + out1.close(); + + FileOutputStream out2 = new FileOutputStream(file2); + out2.write(1); + out2.close(); + + synchronized (this) { + wait(30*1000); + } + + assertNotNull(fileChanged); + assertEquals(1, numChanged); + } + + + @Override + public void fileChanged(File file) { + fileChanged = file; + numChanged++; + + synchronized(this) { + notify(); + } + } + +} \ No newline at end of file diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java new file mode 100644 index 00000000000..f8f4809a5eb --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class InterpreterOutputTest implements InterpreterOutputListener { + private InterpreterOutput out; + int numAppendEvent; + int numUpdateEvent; + + @Before + public void setUp() { + out = new InterpreterOutput(this); + numAppendEvent = 0; + numUpdateEvent = 0; + } + + @After + public void tearDown() throws IOException { + out.close(); + } + + @Test + public void testDetectNewline() throws IOException { + out.write("hello\nworld"); + assertEquals("hello\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.write("\n"); + assertEquals("hello\nworld\n", new String(out.toByteArray())); + assertEquals(2, numAppendEvent); + assertEquals(1, numUpdateEvent); + } + + @Test + public void testFlush() throws IOException { + out.write("hello\nworld"); + assertEquals("hello\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.flush(); + assertEquals("hello\nworld", new String(out.toByteArray())); + assertEquals(2, numAppendEvent); + assertEquals(1, numUpdateEvent); + + out.clear(); + out.write("%html div"); + assertEquals("", new String(out.toByteArray())); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + + out.flush(); + out.write("%html div"); + assertEquals("div", new String(out.toByteArray())); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + } + + @Test + public void testType() throws IOException { + // default output stream type is TEXT + out.write("Text\n"); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + assertEquals("Text\n", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(1, numUpdateEvent); + + // change type + out.write("%html\n"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + assertEquals("", new String(out.toByteArray())); + assertEquals(1, numAppendEvent); + assertEquals(2, numUpdateEvent); + + // none TEXT type output stream does not generate append event + out.write("
html
\n"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + assertEquals(1, numAppendEvent); + assertEquals(2, numUpdateEvent); + assertEquals("
html
\n", new String(out.toByteArray())); + + // change type to text again + out.write("%text hello\n"); + assertEquals(InterpreterResult.Type.TEXT, out.getType()); + assertEquals(2, numAppendEvent); + assertEquals(3, numUpdateEvent); + assertEquals("hello\n", new String(out.toByteArray())); + } + + @Test + public void testType2() throws IOException { + out.write("%html\nHello"); + assertEquals(InterpreterResult.Type.HTML, out.getType()); + } + + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + numAppendEvent++; + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + numUpdateEvent++; + } +} \ No newline at end of file diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java index 007730afc4a..d7ab9e8fbe0 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java @@ -105,4 +105,9 @@ public void testComplexMagicData() { "123\n", result.message()); } + @Test + public void testToString() { + assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString()); + } + } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 29a1fb11972..906878d2b7e 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -64,12 +64,13 @@ public void setUp() throws Exception { Properties p = new Properties(); intp = new RemoteInterpreter( - p, - MockInterpreterAngular.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterAngular.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intp); @@ -83,7 +84,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); intp.open(); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java new file mode 100644 index 00000000000..623a0379496 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + + +/** + * Test for remote interpreter output stream + */ +public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { + private InterpreterGroup intpGroup; + private HashMap env; + + @Before + public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + env = new HashMap(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + } + + @After + public void tearDown() throws Exception { + intpGroup.close(); + intpGroup.destroy(); + } + + private RemoteInterpreter createMockInterpreter() { + RemoteInterpreter intp = new RemoteInterpreter( + new Properties(), + MockInterpreterOutputStream.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + this); + + + intpGroup.add(intp); + intp.setInterpreterGroup(intpGroup); + return intp; + } + + private InterpreterContext createInterpreterContext() { + return new InterpreterContext( + "noteId", + "id", + "title", + "text", + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList(), null); + } + + @Test + public void testInterpreterResultOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult", ret.message()); + + ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult2", ret.message()); + + ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("staticresult3", ret.message()); + } + + @Test + public void testInterpreterOutputStreamOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamresult", ret.message()); + + ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("streamresult2", ret.message()); + } + + @Test + public void testInterpreterResultOutputStreamMixed() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamstatic", ret.message()); + } + + @Test + public void testOutputType() { + RemoteInterpreter intp = createMockInterpreter(); + + InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.type()); + assertEquals("hello", ret.message()); + + ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.type()); + assertEquals("hello", ret.message()); + + ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); + assertEquals(InterpreterResult.Type.ANGULAR, ret.type()); + assertEquals("helloworld", ret.message()); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, String output) { + + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index ea5397ed6d9..abee5b80207 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -34,7 +34,7 @@ public void testStartStop() { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterProcess rip = new RemoteInterpreterProcess( "../bin/interpreter.sh", "nonexists", new HashMap(), - 10 * 1000); + 10 * 1000, null); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); assertEquals(1, rip.reference(intpGroup)); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index c938ff36622..034a676c2ed 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -63,30 +63,38 @@ public void tearDown() throws Exception { intpGroup.destroy(); } + private RemoteInterpreter createMockInterpreterA(Properties p) { + return new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null); + } + + private RemoteInterpreter createMockInterpreterB(Properties p) { + return new RemoteInterpreter( + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null); + } + @Test public void testRemoteInterperterCall() throws TTransportException, IOException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); @@ -113,7 +121,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); intpB.open(); assertEquals(2, process.referenceCount()); @@ -131,14 +139,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -153,7 +154,7 @@ public void testRemoteInterperterErrorStatus() throws TTransportException, IOExc new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals(Code.ERROR, ret.code()); } @@ -163,24 +164,26 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException Properties p = new Properties(); RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpB); @@ -199,7 +202,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals("500", ret.message()); ret = intpB.interpret("500", @@ -211,7 +214,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals("1000", ret.message()); long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); @@ -225,26 +228,12 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { Properties p = new Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - final RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); @@ -276,7 +265,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override @@ -310,7 +299,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override @@ -340,14 +329,7 @@ protected boolean jobAbort() { public void testRunOrderPreserved() throws InterruptedException { Properties p = new Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -382,7 +364,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); synchronized (results) { results.add(ret.message()); @@ -421,14 +403,7 @@ public void testRunParallel() throws InterruptedException { Properties p = new Properties(); p.put("parallel", "true"); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -466,7 +441,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); synchronized (results) { results.add(ret.message()); @@ -501,14 +476,7 @@ protected boolean jobAbort() { public void testInterpreterGroupResetBeforeProcessStarts() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpA.setInterpreterGroup(intpGroup); RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); @@ -523,14 +491,7 @@ public void testInterpreterGroupResetBeforeProcessStarts() { public void testInterpreterGroupResetAfterProcessFinished() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpA.setInterpreterGroup(intpGroup); RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); @@ -548,14 +509,7 @@ public void testInterpreterGroupResetAfterProcessFinished() { public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { Properties p = new Properties(); - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + final RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); @@ -585,7 +539,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override @@ -616,26 +570,12 @@ protected boolean jobAbort() { public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpA = createMockInterpreterA(p); intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreter intpB = new RemoteInterpreter( - p, - MockInterpreterB.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 - ); + RemoteInterpreter intpB = createMockInterpreterB(p); intpGroup.add(intpB); intpB.setInterpreterGroup(intpGroup); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java new file mode 100644 index 00000000000..bc1859f9647 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * MockInterpreter to test outputstream + */ +public class MockInterpreterOutputStream extends Interpreter { + static { + Interpreter.register( + "interpreterOutputStream", + "group1", + MockInterpreterA.class.getName(), + new InterpreterPropertyBuilder().build()); + + } + + private String lastSt; + + public MockInterpreterOutputStream(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] ret = st.split(":"); + try { + if (ret[1] != null) { + context.out.write(ret[1]); + } + } catch (IOException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? + ret[2] : ""); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d17df4f1406..05bc6763363 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -64,12 +64,13 @@ public void test() throws Exception { env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); @@ -103,7 +104,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); return "1000"; } @@ -147,12 +148,13 @@ public void testAbortOnPending() throws Exception { env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); final RemoteInterpreter intpA = new RemoteInterpreter( - p, - MockInterpreterA.class.getName(), - new File("../bin/interpreter.sh").getAbsolutePath(), - "fake", - env, - 10 * 1000 + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000, + null ); intpGroup.add(intpA); @@ -173,7 +175,7 @@ public void testAbortOnPending() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); @Override public int progress() { @@ -209,7 +211,7 @@ protected boolean jobAbort() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); @Override public int progress() { diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 9e7a97cf862..dff75c7d729 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -81,7 +81,8 @@ public ZeppelinServer() throws Exception { this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO)); this.schedulerFactory = new SchedulerFactory(); - this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver); + this.replFactory = new InterpreterFactory(conf, notebookWsServer, + notebookWsServer, depResolver); this.notebookRepo = new NotebookRepoSync(conf); this.notebookIndex = new LuceneSearch(); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index 0142df2c6f7..4296e93d912 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -93,6 +93,8 @@ public static enum OP { PARAGRAPH_REMOVE, PARAGRAPH_CLEAR_OUTPUT, + PARAGRAPH_APPEND_OUTPUT, // [s-c] append output + PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output PING, ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 3dfdca3cd8d..64698fcdfd4 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -57,7 +56,8 @@ * */ public class NotebookServer extends WebSocketServlet implements - NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, + RemoteInterpreterProcessListener { private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); Gson gson = new Gson(); final Map> noteSocketMap = new HashMap<>(); @@ -748,15 +748,47 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, } } + /** + * This callback is for the paragraph that runs on ZeppelinServer + * @param noteId + * @param paragraphId + * @param output output to append + */ + @Override + public void onOutputAppend(String noteId, String paragraphId, String output) { + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) + .put("noteId", noteId) + .put("paragraphId", paragraphId) + .put("data", output); + Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId); + broadcast(noteId, msg); + } + + /** + * This callback is for the paragraph that runs on ZeppelinServer + * @param noteId + * @param paragraphId + * @param output output to update (replace) + */ + @Override + public void onOutputUpdated(String noteId, String paragraphId, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) + .put("noteId", noteId) + .put("paragraphId", paragraphId) + .put("data", output); + Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId); + broadcast(noteId, msg); + } + /** * Need description here. * */ - public static class ParagraphJobListener implements JobListener { + public static class ParagraphListenerImpl implements ParagraphJobListener { private NotebookServer notebookServer; private Note note; - public ParagraphJobListener(NotebookServer notebookServer, Note note) { + public ParagraphListenerImpl(NotebookServer notebookServer, Note note) { this.notebookServer = notebookServer; this.note = note; } @@ -791,11 +823,43 @@ public void afterStatusChange(Job job, Status before, Status after) { } notebookServer.broadcastNote(note); } + + /** + * This callback is for praragraph that runs on RemoteInterpreterProcess + * @param paragraph + * @param out + * @param output + */ + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) + .put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()) + .put("data", output); + + notebookServer.broadcast(paragraph.getNote().getId(), msg); + } + + /** + * This callback is for paragraph that runs on RemoteInterpreterProcess + * @param paragraph + * @param out + * @param output + */ + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) + .put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()) + .put("data", output); + + notebookServer.broadcast(paragraph.getNote().getId(), msg); + } } @Override - public JobListener getParagraphJobListener(Note note) { - return new ParagraphJobListener(this, note); + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphListenerImpl(this, note); } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html index 644b8e044b3..39eb29e2334 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html @@ -23,24 +23,22 @@ ng-bind-html="paragraph.result.comment"> -
-
+ ng-if="getResultType() == 'TEXT'">
+ ng-if="getResultType() == 'HTML'">
+ ng-if="getResultType() == 'ANGULAR'">
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 6a70f48ee3c..2b4c2b2567e 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -54,11 +54,13 @@ angular.module('zeppelinWebApp') $scope.renderHtml(); } else if ($scope.getResultType() === 'ANGULAR') { $scope.renderAngular(); + } else if ($scope.getResultType() === 'TEXT') { + $scope.renderText(); } }; - $scope.renderHtml = function() { - var retryRenderer = function() { + $scope.renderHtml = function() { + var retryRenderer = function() { if (angular.element('#p' + $scope.paragraph.id + '_html').length) { try { angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg); @@ -93,6 +95,42 @@ angular.module('zeppelinWebApp') $timeout(retryRenderer); }; + $scope.renderText = function() { + var retryRenderer = function() { + + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + // clear all lines before render + $scope.clearTextOutput(); + + if ($scope.paragraph.result && $scope.paragraph.result.msg) { + $scope.appendTextOutput($scope.paragraph.result.msg); + } + } else { + $timeout(retryRenderer, 10); + } + }; + $timeout(retryRenderer); + }; + + $scope.clearTextOutput = function() { + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + textEl.children().remove(); + } + }; + + $scope.appendTextOutput = function(msg) { + var textEl = angular.element('#p' + $scope.paragraph.id + '_text'); + if (textEl.length) { + var lines = msg.split('\n'); + for (var i=0; i < lines.length; i++) { + textEl.append(angular.element('
').text(lines[i])); + } + } + }; + + var initializeDefault = function() { var config = $scope.paragraph.config; @@ -156,6 +194,10 @@ angular.module('zeppelinWebApp') } }); + var isEmpty = function (object) { + return !object; + }; + // TODO: this may have impact on performance when there are many paragraphs in a note. $scope.$on('updateParagraph', function(event, data) { if (data.paragraph.id === $scope.paragraph.id && @@ -166,6 +208,7 @@ angular.module('zeppelinWebApp') data.paragraph.status !== $scope.paragraph.status || data.paragraph.jobName !== $scope.paragraph.jobName || data.paragraph.title !== $scope.paragraph.title || + isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) || data.paragraph.errorMessage !== $scope.paragraph.errorMessage || !angular.equals(data.paragraph.settings, $scope.paragraph.settings) || !angular.equals(data.paragraph.config, $scope.paragraph.config)) @@ -175,7 +218,8 @@ angular.module('zeppelinWebApp') var newType = $scope.getResultType(data.paragraph); var oldGraphMode = $scope.getGraphMode(); var newGraphMode = $scope.getGraphMode(data.paragraph); - var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished); + var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result); + var statusChanged = (data.paragraph.status !== $scope.paragraph.status); //console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode); @@ -234,6 +278,8 @@ angular.module('zeppelinWebApp') $scope.renderHtml(); } else if (newType === 'ANGULAR' && resultRefreshed) { $scope.renderAngular(); + } else if (newType === 'TEXT' && resultRefreshed) { + $scope.renderText(); } if (statusChanged || resultRefreshed) { @@ -252,6 +298,19 @@ angular.module('zeppelinWebApp') }); + $scope.$on('appendParagraphOutput', function(event, data) { + if ($scope.paragraph.id === data.paragraphId) { + $scope.appendTextOutput(data.data); + } + }); + + $scope.$on('updateParagraphOutput', function(event, data) { + if ($scope.paragraph.id === data.paragraphId) { + $scope.clearTextOutput(data.data); + $scope.appendTextOutput(data.data); + } + }); + $scope.isRunning = function() { if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') { return true; diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index bb99d562709..800d450d50d 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $rootScope.$broadcast('setNoteMenu', data.notes); } else if (op === 'PARAGRAPH') { $rootScope.$broadcast('updateParagraph', data); + } else if (op === 'PARAGRAPH_APPEND_OUTPUT') { + $rootScope.$broadcast('appendParagraphOutput', data); + } else if (op === 'PARAGRAPH_UPDATE_OUTPUT') { + $rootScope.$broadcast('updateParagraphOutput', data); } else if (op === 'PROGRESS') { $rootScope.$broadcast('updateProgress', data); } else if (op === 'COMPLETION_LIST') { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 4ff0cc3ad59..039d970a207 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; @@ -65,25 +66,30 @@ public class InterpreterFactory { private InterpreterOption defaultOption; AngularObjectRegistryListener angularObjectRegistryListener; + private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; DependencyResolver depResolver; public InterpreterFactory(ZeppelinConfiguration conf, AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, DependencyResolver depResolver) throws InterpreterException, IOException { - this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver); + this(conf, new InterpreterOption(true), angularObjectRegistryListener, + remoteInterpreterProcessListener, depResolver); } public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, DependencyResolver depResolver) throws InterpreterException, IOException { this.conf = conf; this.defaultOption = defaultOption; this.angularObjectRegistryListener = angularObjectRegistryListener; this.depResolver = depResolver; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); @@ -500,7 +506,8 @@ public List getNoteInterpreterSettingBinding(String noteId) { /** * Change interpreter property and restart - * @param name + * @param id + * @param option * @param properties * @throws IOException */ @@ -659,7 +666,7 @@ private Interpreter createRemoteRepl(String interpreterPath, String className, int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( property, className, conf.getInterpreterRemoteRunnerPath(), - interpreterPath, connectTimeout)); + interpreterPath, connectTimeout, remoteInterpreterProcessListener)); return intp; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java index 5a7e966c99e..13877304810 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java @@ -17,11 +17,9 @@ package org.apache.zeppelin.notebook; -import org.apache.zeppelin.scheduler.JobListener; - /** * TODO(moon): provide description. */ public interface JobListenerFactory { - public JobListener getParagraphJobListener(Note note); + public ParagraphJobListener getParagraphJobListener(Note note); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 392b9682ab2..10f080dd03a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -19,39 +19,43 @@ import java.io.IOException; import java.io.Serializable; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.search.SearchService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Binded interpreters for a note */ public class Note implements Serializable, JobListener { + static Logger logger = LoggerFactory.getLogger(Note.class); private static final long serialVersionUID = 7920699076577612429L; + // threadpool for delayed persist of note + private static final ScheduledThreadPoolExecutor delayedPersistThreadPool = + new ScheduledThreadPoolExecutor(0); + static { + delayedPersistThreadPool.setRemoveOnCancelPolicy(true); + } + final List paragraphs = new LinkedList<>(); + private String name = ""; private String id; @@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener { private transient JobListenerFactory jobListenerFactory; private transient NotebookRepo repo; private transient SearchService index; + private transient ScheduledFuture delayedPersist; /** * note configurations. @@ -144,9 +149,8 @@ public Map> getAngularObjects() { /** * Add paragraph last. - * - * @param p */ + public Paragraph addParagraph() { Paragraph p = new Paragraph(this, this, replLoader); synchronized (paragraphs) { @@ -187,7 +191,6 @@ public void addCloneParagraph(Paragraph srcParagraph) { * Insert paragraph in given index. * * @param index - * @param p */ public Paragraph insertParagraph(int index) { Paragraph p = new Paragraph(this, this, replLoader); @@ -339,8 +342,6 @@ public List> generateParagraphsInfo (){ /** * Run all paragraphs sequentially. - * - * @param jobListener */ public void runAll() { synchronized (paragraphs) { @@ -400,15 +401,55 @@ private void snapshotAngularObjectRegistry() { } public void persist() throws IOException { + stopDelayedPersistTimer(); snapshotAngularObjectRegistry(); index.updateIndexDoc(this); repo.save(this); } + /** + * Persist this note with maximum delay. + * @param maxDelaySec + */ + public void persist(int maxDelaySec) { + startDelayedPersistTimer(maxDelaySec); + } + public void unpersist() throws IOException { repo.remove(id()); } + + private void startDelayedPersistTimer(int maxDelaySec) { + synchronized (this) { + if (delayedPersist != null) { + return; + } + + delayedPersist = delayedPersistThreadPool.schedule(new Runnable() { + + @Override + public void run() { + try { + persist(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + }, maxDelaySec, TimeUnit.SECONDS); + } + } + + private void stopDelayedPersistTimer() { + synchronized (this) { + if (delayedPersist == null) { + return; + } + + delayedPersist.cancel(false); + } + } + public Map getConfig() { if (config == null) { config = new HashMap<>(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 433095b379c..65210f52bdb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.*; @@ -213,7 +214,29 @@ protected Object jobRun() throws Throwable { if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { return getReturn(); } - return ret; + + String message = ""; + + context.out.flush(); + InterpreterResult.Type outputType = context.out.getType(); + byte[] interpreterOutput = context.out.toByteArray(); + context.out.clear(); + + if (interpreterOutput != null && interpreterOutput.length > 0) { + message = new String(interpreterOutput); + } + + if (message.isEmpty()) { + return ret; + } else { + String interpreterResultMessage = ret.message(); + if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) { + message += interpreterResultMessage; + return new InterpreterResult(ret.code(), ret.type(), message); + } else { + return new InterpreterResult(ret.code(), outputType, message); + } + } } finally { InterpreterContext.remove(); } @@ -244,6 +267,7 @@ private InterpreterContext getInterpreterContext() { runners.add(new ParagraphRunner(note, note.id(), p.getId())); } + final Paragraph self = this; InterpreterContext interpreterContext = new InterpreterContext( note.id(), getId(), @@ -252,7 +276,34 @@ private InterpreterContext getInterpreterContext() { this.getConfig(), this.settings, registry, - runners); + runners, + new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, + new String(output)); + } + + private void updateParagraphResult(InterpreterOutput out) { + // update paragraph result + Throwable t = null; + String message = null; + try { + message = new String(out.toByteArray()); + } catch (IOException e) { + logger().error(e.getMessage(), e); + t = e; + } + setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); + } + })); return interpreterContext; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java new file mode 100644 index 00000000000..f6404d76ae7 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.scheduler.JobListener; + +/** + * Listen paragraph update + */ +public interface ParagraphJobListener extends JobListener { + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output); + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output); +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index abd0e3bf11b..17d91ccb86f 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -55,8 +55,8 @@ public void setUp() throws Exception { System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); conf = new ZeppelinConfiguration(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null); } @@ -140,7 +140,7 @@ public void testSaveLoad() throws InterpreterException, IOException { factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties()); assertEquals(3, factory.get().size()); - InterpreterFactory factory2 = new InterpreterFactory(conf, null, null); + InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, null); assertEquals(3, factory2.get().size()); } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java index a0455eb7ccd..05451e14006 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java @@ -58,7 +58,7 @@ public void setUp() throws Exception { MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11"); MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); } @After diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 34f7a1bb108..8529f9c20ab 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -38,6 +38,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; @@ -85,7 +86,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -172,7 +173,8 @@ public void testPersist() throws IOException, SchedulerException{ note.persist(); Notebook notebook2 = new Notebook( - conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null); + conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null), this, + null); assertEquals(1, notebook2.getAllNotes().size()); } @@ -411,8 +413,16 @@ else if(file.isDirectory()){ } @Override - public JobListener getParagraphJobListener(Note note) { - return new JobListener(){ + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener(){ + + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + } @Override public void onProgressUpdate(Job job, int progress) { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 60b3ba35566..31970afd133 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -87,7 +85,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepoSync = new NotebookRepoSync(conf); @@ -224,8 +222,16 @@ else if(file.isDirectory()){ } @Override - public JobListener getParagraphJobListener(Note note) { - return new JobListener(){ + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener(){ + + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + } @Override public void onProgressUpdate(Job job, int progress) { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java index cff086dc843..2e2801c6635 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java @@ -30,10 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; @@ -76,7 +73,7 @@ public void setUp() throws Exception { MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); this.schedulerFactory = new SchedulerFactory(); - factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null); + factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -140,7 +137,7 @@ public void run() { } @Override - public JobListener getParagraphJobListener(Note note) { + public ParagraphJobListener getParagraphJobListener(Note note) { return null; } }