Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,22 @@ public void run(String paragraphId) {
}
}

/**
* Check whether all paragraphs belongs to this note has terminated
* @return
*/
public boolean isTerminated() {
synchronized (paragraphs) {
for (Paragraph p : paragraphs) {
if (!p.isTerminated()) {
return false;
}
}
}

return true;
}

public List<InterpreterCompletion> completion(String paragraphId, String buffer, int cursor) {
Paragraph p = getParagraph(paragraphId);
p.setNoteReplLoader(replLoader);
Expand Down Expand Up @@ -561,5 +577,4 @@ public void afterStatusChange(Job job, Status before, Status after) {

@Override
public void onProgressUpdate(Job job, int progress) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
Note note = notebook.getNote(noteId);
note.runAll();

while (!note.getLastParagraph().isTerminated()) {
while (!note.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ public class MockInterpreter1 extends Interpreter{
public MockInterpreter1(Properties property) {
super(property);
}
boolean open;


@Override
public void open() {
open = true;
}

@Override
public void close() {
open = false;
}


public boolean isOpen() {
return open;
}

@Override
Expand All @@ -51,6 +60,13 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
if ("getId".equals(st)) {
// get unique id of this interpreter instance
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else if (st.startsWith("sleep")) {
try {
Thread.sleep(Integer.parseInt(st.split(" ")[1]));
} catch (InterruptedException e) {
// nothing to do
}
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
} else {
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,37 @@ public MockInterpreter2(Properties property) {
super(property);
}

boolean open;

@Override
public void open() {
open = true;
}

@Override
public void close() {
open = false;
}

public boolean isOpen() {
return open;
}


@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterResult result;

if ("getId".equals(st)) {
// get unique id of this interpreter instance
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else if (st.startsWith("sleep")) {
try {
Thread.sleep(Integer.parseInt(st.split(" ")[1]));
} catch (InterruptedException e) {
// nothing to do
}
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
} else {
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.zeppelin.notebook;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

import java.io.File;
Expand Down Expand Up @@ -284,36 +280,47 @@ public void testAutoRestartInterpreterAfterSchedule() throws InterruptedExceptio
Paragraph p = note.addParagraph();
Map config = new HashMap<String, Object>();
p.setConfig(config);
p.setText("p1");
p.setText("sleep 1000");

Paragraph p2 = note.addParagraph();
p2.setConfig(config);
p2.setText("%mock2 sleep 500");

// set cron scheduler, once a second
config = note.getConfig();
config.put("enabled", true);
config.put("cron", "* * * * * ?");
config.put("releaseresource", "true");
config.put("cron", "1/3 * * * * ?");
config.put("releaseresource", true);
note.setConfig(config);
notebook.refreshCron(note.id());
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
}
Date dateFinished = p.getDateFinished();
assertNotNull(dateFinished);

// restart interpreter
for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {
notebook.getInterpreterFactory().restart(setting.id());

MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter)
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock1")).getInnerInterpreter())
.getInnerInterpreter()));

MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter)
((LazyOpenInterpreter) note.getNoteReplLoader().get("mock2")).getInnerInterpreter())
.getInnerInterpreter()));

// wait until interpreters are started
while (!mock1.isOpen() || !mock2.isOpen()) {
Thread.yield();
}

Thread.sleep(1000);
while (p.getStatus() != Status.FINISHED) {
Thread.sleep(100);
// wait until interpreters are closed
while (mock1.isOpen() || mock2.isOpen()) {
Thread.yield();
}
assertNotEquals(dateFinished, p.getDateFinished());


// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.id());

// make sure all paragraph has been executed
assertNotNull(p.getDateFinished());
assertNotNull(p2.getDateFinished());
}

@Test
Expand Down