diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 3168f04787e..9a61be637ec 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -40,7 +40,7 @@ public static void setUp() { Properties p = new Properties(); flink = new FlinkInterpreter(p); flink.open(); - context = new InterpreterContext(null, null, null, null, null, null, null, null); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null); } @AfterClass diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java index c22080d57f0..c86fcf3371e 100644 --- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java +++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java @@ -79,9 +79,9 @@ public void readTest() throws IOException { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME")); + assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME")); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -101,7 +101,7 @@ public void readTestWithConfiguration() throws IOException { t.open(); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", - t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message()); + t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message()); } @Test @@ -117,13 +117,13 @@ public void jdbcRestart() throws IOException, SQLException, ClassNotFoundExcepti t.open(); InterpreterResult interpreterResult = - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); t.getConnection("default").close(); interpreterResult = - t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)); + t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)); assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message()); } @@ -139,7 +139,7 @@ public void test() throws IOException { HiveInterpreter t = new HiveInterpreter(properties); t.open(); - InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null); + InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null); //simple select test InterpreterResult result = t.interpret("select * from test_table", interpreterContext); diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java index f46b049ccb3..cf9808389e5 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -40,7 +40,7 @@ public class IgniteInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private IgniteInterpreter intp; private Ignite ignite; diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index 3843704f4ea..8ac212f8572 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -43,7 +43,7 @@ public class IgniteSqlInterpreterTest { private static final String HOST = "127.0.0.1:47500..47509"; private static final InterpreterContext INTP_CONTEXT = - new InterpreterContext(null, null, null, null, null, null, null, null); + new InterpreterContext(null, null, null, null, null, null, null, null, null); private Ignite ignite; private IgniteSqlInterpreter intp; diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index efa8fae2546..2b5613a2950 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -60,7 +60,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); } @After diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index be65c0937df..8e671c85f48 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -78,7 +78,7 @@ public void setUp() throws Exception { context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry( intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); } @After diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index bb818fd2c56..282e49b1610 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -65,7 +65,7 @@ public void setUp() throws Exception { } context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); } @After diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 0417f9108bc..e3f6b59bc41 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -29,6 +29,7 @@ public class InterpreterContext { private static final ThreadLocal threadIC = new ThreadLocal(); + public final InterpreterOutput out; public static InterpreterContext get() { return threadIC.get(); @@ -58,7 +59,8 @@ public InterpreterContext(String noteId, Map config, GUI gui, AngularObjectRegistry angularObjectRegistry, - List runners + List runners, + InterpreterOutput out ) { this.noteId = noteId; this.paragraphId = paragraphId; @@ -68,6 +70,7 @@ public InterpreterContext(String noteId, this.gui = gui; this.angularObjectRegistry = angularObjectRegistry; this.runners = runners; + this.out = out; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java new file mode 100644 index 00000000000..95f7b475c2b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; + +/** + * InterpreterOutput is OutputStream that supposed to print content on notebook + * in addition to InterpreterResult which used to return from Interpreter.interpret(). + */ +public class InterpreterOutput extends OutputStream { + private final List outList = new LinkedList(); + private InterpreterOutputChangeWatcher watcher; + + public InterpreterOutput() { + clear(); + } + + public InterpreterOutput(InterpreterOutputChangeListener listener) throws IOException { + clear(); + watcher = new InterpreterOutputChangeWatcher(listener); + watcher.start(); + } + + public void clear() { + synchronized (outList) { + outList.clear(); + if (watcher != null) { + watcher.clear(); + } + } + } + + @Override + public void write(int b) throws IOException { + synchronized (outList) { + outList.add(b); + } + } + + @Override + public void write(byte [] b) throws IOException { + synchronized (outList) { + outList.add(b); + } + } + + @Override + public void write(byte [] b, int off, int len) throws IOException { + synchronized (outList) { + byte[] buf = new byte[len]; + System.arraycopy(b, off, buf, 0, len); + outList.add(buf); + } + } + + /** + * In dev mode, it monitors file and update ZeppelinServer + * @param file + * @throws IOException + */ + public void write(File file) throws IOException { + outList.add(file); + if (watcher != null) { + watcher.watch(file); + } + } + + public void write(String string) throws IOException { + write(string.getBytes()); + } + + /** + * write contents in the resource file in the classpath + * @param url + * @throws IOException + */ + public void write(URL url) throws IOException { + if ("file".equals(url.getProtocol())) { + write(new File(url.getPath())); + } else { + outList.add(url); + } + } + + public void writeResource(String resourceName) throws IOException { + // search file under resource dir first for dev mode + File mainResource = new File("./src/main/resources/" + resourceName); + File testResource = new File("./src/test/resources/" + resourceName); + if (mainResource.isFile()) { + write(mainResource); + } else if (testResource.isFile()) { + write(testResource); + } else { + // search from classpath + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = this.getClass().getClassLoader(); + } + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } + + write(cl.getResource(resourceName)); + } + } + + public byte[] toByteArray() throws IOException { + return toByteArray(false); + } + + public byte[] toByteArray(boolean clear) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + List all = new LinkedList(); + + synchronized (outList) { + all.addAll(outList); + } + + for (Object o : all) { + if (o instanceof File) { + File f = (File) o; + FileInputStream fin = new FileInputStream(f); + copyStream(fin, out); + fin.close(); + } else if (o instanceof byte[]) { + out.write((byte[]) o); + } else if (o instanceof Integer) { + out.write((int) o); + } else if (o instanceof URL) { + InputStream fin = ((URL) o).openStream(); + copyStream(fin, out); + fin.close(); + } else { + // can not handle the object + } + } + + if (clear) { + clear(); + } + + out.close(); + return out.toByteArray(); + } + + private void copyStream(InputStream in, OutputStream out) throws IOException { + int bufferSize = 8192; + byte[] buffer = new byte[bufferSize]; + + while (true) { + int bytesRead = in.read(buffer); + if (bytesRead == -1) { + break; + } else { + out.write(buffer, 0, bytesRead); + } + } + } + + @Override + public void close() throws IOException { + if (watcher != null) { + watcher.clear(); + watcher.shutdown(); + } + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java new file mode 100644 index 00000000000..a639e0c6418 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import java.io.File; + +/** + * InterpreterOutputChangeListener + */ +public interface InterpreterOutputChangeListener { + public void fileChanged(File file); + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java new file mode 100644 index 00000000000..5fe8237c257 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; +import java.io.File; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Watch the change for the development mode support + */ +public class InterpreterOutputChangeWatcher extends Thread { + Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class); + + private WatchService watcher; + private final List watchFiles = new LinkedList(); + private final Map watchKeys = new HashMap(); + private InterpreterOutputChangeListener listener; + private boolean stop; + + public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener) + throws IOException { + watcher = FileSystems.getDefault().newWatchService(); + this.listener = listener; + } + + public void watch(File file) throws IOException { + String dirString; + if (file.isFile()) { + dirString = file.getParentFile().getAbsolutePath(); + } else { + throw new IOException(file.getName() + " is not a file"); + } + + if (dirString == null) { + dirString = "/"; + } + + Path dir = FileSystems.getDefault().getPath(dirString); + logger.info("watch " + dir); + WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + synchronized (watchKeys) { + watchKeys.put(key, new File(dirString)); + watchFiles.add(file); + } + } + + public void clear() { + synchronized (watchKeys) { + for (WatchKey key : watchKeys.keySet()) { + key.cancel(); + + } + watchKeys.clear(); + watchFiles.clear(); + } + } + + public void shutdown() throws IOException { + stop = true; + clear(); + watcher.close(); + } + + public void run() { + while (!stop) { + WatchKey key = null; + try { + key = watcher.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException | ClosedWatchServiceException e) { + break; + } + + if (key == null) { + continue; + } + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + if (kind == OVERFLOW) { + continue; + } + WatchEvent ev = (WatchEvent) event; + Path filename = ev.context(); + // search for filename + synchronized (watchKeys) { + for (File f : watchFiles) { + if (f.getName().compareTo(filename.toString()) == 0) { + File changedFile; + if (filename.isAbsolute()) { + changedFile = new File(filename.toString()); + } else { + changedFile = new File(watchKeys.get(key), filename.toString()); + } + logger.info("File change detected " + changedFile.getAbsolutePath()); + if (listener != null) { + listener.fileChanged(changedFile); + } + } + } + } + } + + boolean valid = key.reset(); + if (!valid) { + break; + } + } + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java index 593cfc76ce9..d2137961464 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java @@ -146,4 +146,8 @@ public InterpreterResult type(Type type) { this.type = type; return this; } + + public String toString() { + return "%" + type.name().toLowerCase() + " " + msg; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 46a07083468..c423af1a31b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.ClassloaderInterpreter; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterContextRunner; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -298,7 +291,25 @@ protected Object jobRun() throws Throwable { try { InterpreterContext.set(context); InterpreterResult result = interpreter.interpret(script, context); - return result; + + // prepend context.out to result for now + // later, context.out should be streamed to the front-end + String output = null; + + byte[] interpreterOutput = context.out.toByteArray(true); + if (interpreterOutput != null) { + output = new String(interpreterOutput); + } + + if (result.message() != null) { + if (output == null || output.length() == 0) { + output = result.toString(); + } else { + output += result.message(); + } + } + + return new InterpreterResult(result.code(), output); } finally { InterpreterContext.remove(); } @@ -349,7 +360,8 @@ public List completion(String className, String buf, int cursor) throws private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>(){}.getType()); + new TypeToken>() { + }.getType()); for (InterpreterContextRunner r : runners) { contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId())); @@ -364,7 +376,12 @@ private InterpreterContext convert(RemoteInterpreterContext ric) { new TypeToken>() {}.getType()), gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), - contextRunners); + contextRunners, createInterpreterOutput()); + } + + + private InterpreterOutput createInterpreterOutput() { + return new InterpreterOutput(); } @@ -468,7 +485,6 @@ public RemoteInterpreterEvent getEvent() throws TException { /** * called when object is updated in client (web) side. - * @param className * @param name * @param noteId noteId where the update issues * @param object diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java index 080bdaa4337..9c2732dd2f8 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java @@ -27,7 +27,7 @@ public class InterpreterContextTest { public void testThreadLocal() { assertNull(InterpreterContext.get()); - InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null)); + InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null)); assertNotNull(InterpreterContext.get()); InterpreterContext.remove(); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java new file mode 100644 index 00000000000..e37680905bb --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener { + private File tmpDir; + private File fileChanged; + private int numChanged; + private InterpreterOutputChangeWatcher watcher; + + @Before + public void setUp() throws Exception { + watcher = new InterpreterOutputChangeWatcher(this); + watcher.start(); + + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + fileChanged = null; + numChanged = 0; + } + + @After + public void tearDown() throws Exception { + watcher.shutdown(); + delete(tmpDir); + } + + private void delete(File file){ + if(file.isFile()) file.delete(); + else if(file.isDirectory()){ + File [] files = file.listFiles(); + if(files!=null && files.length>0){ + for(File f : files){ + delete(f); + } + } + file.delete(); + } + } + + + @Test + public void test() throws IOException, InterruptedException { + assertNull(fileChanged); + assertEquals(0, numChanged); + + Thread.sleep(1000); + // create new file + File file1 = new File(tmpDir, "test1"); + file1.createNewFile(); + + File file2 = new File(tmpDir, "test2"); + file2.createNewFile(); + + watcher.watch(file1); + Thread.sleep(1000); + + FileOutputStream out1 = new FileOutputStream(file1); + out1.write(1); + out1.close(); + + FileOutputStream out2 = new FileOutputStream(file2); + out2.write(1); + out2.close(); + + synchronized (this) { + wait(30*1000); + } + + assertNotNull(fileChanged); + assertEquals(1, numChanged); + } + + + @Override + public void fileChanged(File file) { + fileChanged = file; + numChanged++; + + synchronized(this) { + notify(); + } + } + +} \ No newline at end of file diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java new file mode 100644 index 00000000000..ecd1c9e7f79 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class InterpreterOutputTest { + private InterpreterOutput out; + + + @Before + public void setUp() { + out = new InterpreterOutput(); + } + + @After + public void tearDown() throws IOException { + out.close(); + } + + + @Test + public void testWrite() throws IOException { + out.write(1); + assertEquals(1, out.toByteArray()[0]); + } + + @Test + public void testStringWrite() throws IOException { + Writer writer = new OutputStreamWriter(out); + writer.write("hello"); + writer.flush(); + assertEquals("hello", new String(out.toByteArray())); + } +} \ No newline at end of file diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java index 007730afc4a..d7ab9e8fbe0 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java @@ -105,4 +105,9 @@ public void testComplexMagicData() { "123\n", result.message()); } + @Test + public void testToString() { + assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString()); + } + } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index 29a1fb11972..6e9773320d6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -83,7 +83,7 @@ public void setUp() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); intp.open(); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index bbda252ed21..307e35e2c83 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -113,7 +113,7 @@ public void testRemoteInterperterCall() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); intpB.open(); assertEquals(2, process.referenceCount()); @@ -153,7 +153,7 @@ public void testRemoteInterperterErrorStatus() throws TTransportException, IOExc new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals(Code.ERROR, ret.code()); } @@ -199,7 +199,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals("500", ret.message()); ret = intpB.interpret("500", @@ -211,7 +211,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); assertEquals("1000", ret.message()); long end = System.currentTimeMillis(); assertTrue(end - start >= 1000); @@ -279,7 +279,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override @@ -313,7 +313,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override @@ -388,7 +388,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); synchronized (results) { results.add(ret.message()); @@ -472,7 +472,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); synchronized (results) { results.add(ret.message()); @@ -591,7 +591,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); } @Override diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d17df4f1406..51d18a69d9f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -103,7 +103,7 @@ protected Object jobRun() throws Throwable { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList())); + new LinkedList(), null)); return "1000"; } @@ -173,7 +173,7 @@ public void testAbortOnPending() throws Exception { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); @Override public int progress() { @@ -209,7 +209,7 @@ protected boolean jobAbort() { new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), - new LinkedList()); + new LinkedList(), null); @Override public int progress() { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index ec47efdde61..3b908b808cd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -210,7 +210,20 @@ protected Object jobRun() throws Throwable { InterpreterContext context = getInterpreterContext(); InterpreterContext.set(context); InterpreterResult ret = repl.interpret(script, context); - + byte[] interpreterOutput = context.out.toByteArray(true); + + // data from context.out is prepended to ret.message for now. + // later context.out should be streamed to the front-end. + String message = null; + if (interpreterOutput != null && interpreterOutput.length > 0) { + // something printed in InterpreterOutput + message = new String(interpreterOutput); + + if (ret.message() != null) { + message += ret.message(); + } + return new InterpreterResult(ret.code(), message); + } if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { return getReturn(); } @@ -253,7 +266,8 @@ private InterpreterContext getInterpreterContext() { this.getConfig(), this.settings, registry, - runners); + runners, + new InterpreterOutput()); return interpreterContext; } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 6a69b83b2e9..d204a2a2685 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -56,7 +56,7 @@ public void setUp() throws Exception { System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); conf = new ZeppelinConfiguration(); factory = new InterpreterFactory(conf, new InterpreterOption(false), null); - context = new InterpreterContext("note", "id", "title", "text", null, null, null, null); + context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null); }