Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testRunParagraphSynchronously() throws IOException {
}

@Test
public void testRunAllParagraph_AllSuccess() throws IOException {
public void testRunNoteBlocking() throws IOException {
Note note1 = null;
try {
note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
Expand Down Expand Up @@ -231,6 +231,48 @@ public void testRunAllParagraph_AllSuccess() throws IOException {
}
}

@Test
public void testRunNoteNonBlocking() throws Exception {
Note note1 = null;
try {
note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
// 2 paragraphs
// P1:
// %python
// import time
// time.sleep(5)
// name='hello'
// z.put('name', name)
// P2:
// %%sh(interpolate=true)
// echo '{name}'
//
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
Paragraph p2 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python import time\ntime.sleep(5)\nname='hello'\nz.put('name', name)");
p2.setText("%sh(interpolate=true) echo '{name}'");

PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?waitToFinish=false", "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
new TypeToken<Map<String, Object>>() {}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();

p1.waitUntilFinished();
p2.waitUntilFinished();

assertEquals(Job.Status.FINISHED, p1.getStatus());
assertEquals(Job.Status.FINISHED, p2.getStatus());
assertEquals("hello\n", p2.getReturn().message().get(0).getData());
} finally {
// cleanup
if (null != note1) {
TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous);
}
}
}

@Test
public void testRunAllParagraph_FirstFailed() throws IOException {
Note note1 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,24 @@ public String call(Client client) throws Exception {
public Scheduler getScheduler() {
// one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs
// running under the scheduler of this session will be aborted.
Scheduler s = new RemoteScheduler(
RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
SchedulerFactory.singleton().getExecutor(),
this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
String executionMode = getProperty(".execution.mode", "paragraph");
if (executionMode.equals("paragraph")) {
Scheduler s = new RemoteScheduler(
RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
SchedulerFactory.singleton().getExecutor(),
this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
} else if (executionMode.equals("note")) {
String noteId = getProperty(".noteId");
Scheduler s = new RemoteScheduler(
RemoteInterpreter.class.getSimpleName() + "-" + noteId,
SchedulerFactory.singleton().getExecutor(),
this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
} else {
throw new RuntimeException("Invalid execution mode: " + executionMode);
}

}

private RemoteInterpreterContext convert(InterpreterContext ic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,14 +747,26 @@ private void setParagraphMagic(Paragraph p, int index) {
}
}

public void runAll(AuthenticationInfo authenticationInfo, boolean blocking) throws Exception {
public void runAll(AuthenticationInfo authenticationInfo,
boolean blocking) throws Exception {
setRunning(true);
try {
for (Paragraph p : getParagraphs()) {
if (!p.isEnabled()) {
continue;
}
p.setAuthenticationInfo(authenticationInfo);
try {
Interpreter interpreter = p.getBindedInterpreter();
if (interpreter != null) {
// set interpreter property to execution.mode to be note
// so that it could use the correct scheduler. see ZEPPELIN-4832
interpreter.setProperty(".execution.mode", "note");
interpreter.setProperty(".noteId", id);
}
} catch (InterpreterNotFoundException e) {
// ignore, because the following run method will fail if interpreter not found.
}
if (!run(p.getId(), blocking)) {
logger.warn("Skip running the remain notes because paragraph {} fails", p.getId());
throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, " +
Expand Down Expand Up @@ -787,7 +799,9 @@ public boolean run(String paragraphId, boolean blocking) {
* @param ctxUser
* @return
*/
public boolean run(String paragraphId, boolean blocking, String ctxUser) {
public boolean run(String paragraphId,
boolean blocking,
String ctxUser) {
Paragraph p = getParagraph(paragraphId);

if (isPersonalizedMode() && ctxUser != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,14 @@ public void checkpointOutput() {
}
}

@VisibleForTesting
public void waitUntilFinished() throws Exception {
while(!isTerminated()) {
LOGGER.debug("Wait for paragraph to be finished");
Thread.sleep(1000);
}
}

private GUI getNoteGui() {
GUI gui = new GUI();
gui.setParams(this.note.getNoteParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,31 @@ public RemoteScheduler(String name,
public void runJobInScheduler(Job job) {
JobRunner jobRunner = new JobRunner(this, job);
executor.execute(jobRunner);
// wait until it is submitted to the remote
while (!jobRunner.isJobSubmittedInRemote()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
"queue.wait", e);
String executionMode =
remoteInterpreter.getProperty(".execution.mode", "paragraph");
if (executionMode.equals("paragraph")) {
// wait until it is submitted to the remote
while (!jobRunner.isJobSubmittedInRemote()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
"queue.wait", e);
}
}
} else if (executionMode.equals("note")){
// wait until it is finished
while (!jobRunner.isJobExecuted()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobExecuted " +
"queue.wait", e);
}
}
} else {
throw new RuntimeException("Invalid job execution.mode: " + executionMode +
", only 'note' and 'paragraph' are valid");
}
}

Expand Down Expand Up @@ -152,6 +169,10 @@ public boolean isJobSubmittedInRemote() {
return jobSubmittedRemotely;
}

public boolean isJobExecuted() {
return jobExecuted;
}

@Override
public void run() {
JobStatusPoller jobStatusPoller = new JobStatusPoller(job, this, 100);
Expand Down