diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java index 328a1b2f895..b4fbb3d5489 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java @@ -193,7 +193,7 @@ public void testRunParagraphSynchronously() throws IOException { } @Test - public void testRunAllParagraph_AllSuccess() throws IOException { + public void testRunNoteBlocking() throws IOException { Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); @@ -231,6 +231,48 @@ public void testRunAllParagraph_AllSuccess() throws IOException { } } + @Test + public void testRunNoteNonBlocking() throws Exception { + Note note1 = null; + try { + note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); + // 2 paragraphs + // P1: + // %python + // import time + // time.sleep(5) + // name='hello' + // z.put('name', name) + // P2: + // %%sh(interpolate=true) + // echo '{name}' + // + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)"); + p2.setText("%sh(interpolate=true) echo '{name}'"); + + PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", ""); + assertThat(post, isAllowed()); + Map resp = gson.fromJson(post.getResponseBodyAsString(), + new TypeToken>() {}.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + + p1.waitUntilFinished(); + p2.waitUntilFinished(); + + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals(Job.Status.FINISHED, p2.getStatus()); + assertEquals("hello\n", p2.getReturn().message().get(0).getData()); + } finally { + // cleanup + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + } + } + @Test public void testRunAllParagraph_FirstFailed() throws IOException { Note note1 = null; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 692224b29f8..d1604c1bbff 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -391,11 +391,24 @@ public String call(Client client) throws Exception { public Scheduler getScheduler() { // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs // running under the scheduler of this session will be aborted. - Scheduler s = new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId, - SchedulerFactory.singleton().getExecutor(), - this); - return SchedulerFactory.singleton().createOrGetScheduler(s); + String executionMode = getProperty(".execution.mode", "paragraph"); + if (executionMode.equals("paragraph")) { + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId, + SchedulerFactory.singleton().getExecutor(), + this); + return SchedulerFactory.singleton().createOrGetScheduler(s); + } else if (executionMode.equals("note")) { + String noteId = getProperty(".noteId"); + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + "-" + noteId, + SchedulerFactory.singleton().getExecutor(), + this); + return SchedulerFactory.singleton().createOrGetScheduler(s); + } else { + throw new RuntimeException("Invalid execution mode: " + executionMode); + } + } private RemoteInterpreterContext convert(InterpreterContext ic) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index ce7049109a0..55cb4f429a4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -747,7 +747,8 @@ private void setParagraphMagic(Paragraph p, int index) { } } - public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) throws Exception { + public void runAll(AuthenticationInfo authenticationInfo, + boolean blocking) throws Exception { setRunning(true); try { for (Paragraph p : getParagraphs()) { @@ -755,6 +756,17 @@ public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) thro continue; } p.setAuthenticationInfo(authenticationInfo); + try { + Interpreter interpreter = p.getBindedInterpreter(); + if (interpreter != null) { + // set interpreter property to execution.mode to be note + // so that it could use the correct scheduler. see ZEPPELIN-4832 + interpreter.setProperty(".execution.mode", "note"); + interpreter.setProperty(".noteId", id); + } + } catch (InterpreterNotFoundException e) { + // ignore, because the following run method will fail if interpreter not found. + } if (!run(p.getId(), blocking)) { logger.warn("Skip running the remain notes because paragraph {} fails", p.getId()); throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " + @@ -787,7 +799,9 @@ public boolean run(String paragraphId, boolean blocking) { * @param ctxUser * @return */ - public boolean run(String paragraphId, boolean blocking, String ctxUser) { + public boolean run(String paragraphId, + boolean blocking, + String ctxUser) { Paragraph p = getParagraph(paragraphId); if (isPersonalizedMode() && ctxUser != null) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 0ff95c3faf2..0086a47b335 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -707,6 +707,14 @@ public void checkpointOutput() { } } + @VisibleForTesting + public void waitUntilFinished() throws Exception { + while(!isTerminated()) { + LOGGER.debug("Wait for paragraph to be finished"); + Thread.sleep(1000); + } + } + private GUI getNoteGui() { GUI gui = new GUI(); gui.setParams(this.note.getNoteParams()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 5f19df140a0..3797c8bc8d8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -47,14 +47,31 @@ public RemoteScheduler(String name, public void runJobInScheduler(Job job) { JobRunner jobRunner = new JobRunner(this, job); executor.execute(jobRunner); - // wait until it is submitted to the remote - while (!jobRunner.isJobSubmittedInRemote()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + - "queue.wait", e); + String executionMode = + remoteInterpreter.getProperty(".execution.mode", "paragraph"); + if (executionMode.equals("paragraph")) { + // wait until it is submitted to the remote + while (!jobRunner.isJobSubmittedInRemote()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + + "queue.wait", e); + } } + } else if (executionMode.equals("note")){ + // wait until it is finished + while (!jobRunner.isJobExecuted()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " + + "queue.wait", e); + } + } + } else { + throw new RuntimeException("Invalid job execution.mode: " + executionMode + + ", only 'note' and 'paragraph' are valid"); } } @@ -152,6 +169,10 @@ public boolean isJobSubmittedInRemote() { return jobSubmittedRemotely; } + public boolean isJobExecuted() { + return jobExecuted; + } + @Override public void run() { JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);