diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 01d625abe00..de19fe00543 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -428,6 +428,22 @@ public void run(String paragraphId) { } } + /** + * Check whether all paragraphs belongs to this note has terminated + * @return + */ + public boolean isTerminated() { + synchronized (paragraphs) { + for (Paragraph p : paragraphs) { + if (!p.isTerminated()) { + return false; + } + } + } + + return true; + } + public List completion(String paragraphId, String buffer, int cursor) { Paragraph p = getParagraph(paragraphId); p.setNoteReplLoader(replLoader); @@ -561,5 +577,4 @@ public void afterStatusChange(Job job, Status before, Status after) { @Override public void onProgressUpdate(Job job, int progress) {} - } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 58a552d9ae0..30faeeafca5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -634,7 +634,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException { Note note = notebook.getNote(noteId); note.runAll(); - while (!note.getLastParagraph().isTerminated()) { + while (!note.isTerminated()) { try { Thread.sleep(1000); } catch (InterruptedException e) { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java index 1b0ec1a5025..794ab6ccba2 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java @@ -35,13 +35,22 @@ public class MockInterpreter1 extends Interpreter{ public MockInterpreter1(Properties property) { super(property); } + boolean open; + @Override public void open() { + open = true; } @Override public void close() { + open = false; + } + + + public boolean isOpen() { + return open; } @Override @@ -51,6 +60,13 @@ public InterpreterResult interpret(String st, InterpreterContext context) { if ("getId".equals(st)) { // get unique id of this interpreter instance result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else if (st.startsWith("sleep")) { + try { + Thread.sleep(Integer.parseInt(st.split(" ")[1])); + } catch (InterruptedException e) { + // nothing to do + } + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); } else { result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java index 0fe3a16c868..169bc3ce0da 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java @@ -36,14 +36,23 @@ public MockInterpreter2(Properties property) { super(property); } + boolean open; + @Override public void open() { + open = true; } @Override public void close() { + open = false; + } + + public boolean isOpen() { + return open; } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { InterpreterResult result; @@ -51,6 +60,13 @@ public InterpreterResult interpret(String st, InterpreterContext context) { if ("getId".equals(st)) { // get unique id of this interpreter instance result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode()); + } else if (st.startsWith("sleep")) { + try { + Thread.sleep(Integer.parseInt(st.split(" ")[1])); + } catch (InterruptedException e) { + // nothing to do + } + result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st); } else { result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 53749d1ddea..1f7d5c07b8c 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -17,11 +17,7 @@ package org.apache.zeppelin.notebook; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import java.io.File; @@ -284,36 +280,47 @@ public void testAutoRestartInterpreterAfterSchedule() throws InterruptedExceptio Paragraph p = note.addParagraph(); Map config = new HashMap(); p.setConfig(config); - p.setText("p1"); + p.setText("sleep 1000"); + + Paragraph p2 = note.addParagraph(); + p2.setConfig(config); + p2.setText("%mock2 sleep 500"); // set cron scheduler, once a second config = note.getConfig(); config.put("enabled", true); - config.put("cron", "* * * * * ?"); - config.put("releaseresource", "true"); + config.put("cron", "1/3 * * * * ?"); + config.put("releaseresource", true); note.setConfig(config); notebook.refreshCron(note.id()); - while (p.getStatus() != Status.FINISHED) { - Thread.sleep(100); - } - Date dateFinished = p.getDateFinished(); - assertNotNull(dateFinished); - // restart interpreter - for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) { - notebook.getInterpreterFactory().restart(setting.id()); + + MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter) + ((LazyOpenInterpreter) note.getNoteReplLoader().get("mock1")).getInnerInterpreter()) + .getInnerInterpreter())); + + MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter) + ((LazyOpenInterpreter) note.getNoteReplLoader().get("mock2")).getInnerInterpreter()) + .getInnerInterpreter())); + + // wait until interpreters are started + while (!mock1.isOpen() || !mock2.isOpen()) { + Thread.yield(); } - Thread.sleep(1000); - while (p.getStatus() != Status.FINISHED) { - Thread.sleep(100); + // wait until interpreters are closed + while (mock1.isOpen() || mock2.isOpen()) { + Thread.yield(); } - assertNotEquals(dateFinished, p.getDateFinished()); - + // remove cron scheduler. config.put("cron", null); note.setConfig(config); notebook.refreshCron(note.id()); + + // make sure all paragraph has been executed + assertNotNull(p.getDateFinished()); + assertNotNull(p2.getDateFinished()); } @Test