Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ public static enum ConfVars {
ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
// Notebook list and contents will be always loaded from repository if set true.
// If set false, modified notebooks or new notebooks added on file system level
// won't be reflected on Zeppelin till user restarts Zeppelin.
ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE("zeppelin.notebook.reloadAllNotesFromStorage", false),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
// Decide when new note is created, interpreter settings will be binded automatically or not.
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,23 @@ private void loadAllNotes() throws IOException {
}
}

/**
* Reload all notes from repository after clearing `notes`
* to reflect the changes of added/deleted/modified notebooks on file system level.
*
* @return
* @throws IOException
*/
private void reloadAllNotes() throws IOException {
synchronized (notes) {
notes.clear();
}
List<NoteInfo> noteInfos = notebookRepo.list();
for (NoteInfo info : noteInfos) {
loadNoteFromRepo(info.getId());
}
}

class SnapshotAngularObject {
String intpGroupId;
AngularObject angularObject;
Expand All @@ -288,6 +305,13 @@ public Date getLastUpdate() {
}

public List<Note> getAllNotes() {
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE)) {
try {
reloadAllNotes();
} catch (IOException e) {
logger.error("Cannot reload notes from storage", e);
}
}
synchronized (notes) {
List<Note> noteList = new ArrayList<Note>(notes.values());
Collections.sort(noteList, new Comparator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
Expand All @@ -46,120 +48,161 @@

public class NotebookTest implements JobListenerFactory{

private File tmpDir;
private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
private File notebookDir;
private Notebook notebook;
private NotebookRepo notebookRepo;
private File tmpDir;
private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
private File notebookDir;
private Notebook notebook;
private NotebookRepo notebookRepo;
private InterpreterFactory factory;

@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
notebookDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()+"/notebook");
notebookDir.mkdirs();
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
notebookDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()+"/notebook");
notebookDir.mkdirs();

System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");

conf = ZeppelinConfiguration.create();
conf = ZeppelinConfiguration.create();

this.schedulerFactory = new SchedulerFactory();
this.schedulerFactory = new SchedulerFactory();

MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");

factory = new InterpreterFactory(conf, new InterpreterOption(false), null);

notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
}

@After
public void tearDown() throws Exception {
delete(tmpDir);
}

@Test
public void testSelectingReplImplementation() throws IOException {
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());

// run with defatul repl
Paragraph p1 = note.addParagraph();
p1.setText("hello world");
note.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
assertEquals("repl1: hello world", p1.getResult().message());

// run with specific repl
Paragraph p2 = note.addParagraph();
p2.setText("%mock2 hello world");
note.run(p2.getId());
while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals("repl2: hello world", p2.getResult().message());
}

@Test
public void testPersist() throws IOException, SchedulerException{
Note note = notebook.createNote();

// run with default repl
Paragraph p1 = note.addParagraph();
p1.setText("hello world");
note.persist();

Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
assertEquals(1, notebook2.getAllNotes().size());
}

@Test
public void testRunAll() throws IOException {
Note note = notebook.createNote();
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
}

@After
public void tearDown() throws Exception {
delete(tmpDir);
}

@Test
public void testSelectingReplImplementation() throws IOException {
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());

Paragraph p1 = note.addParagraph();
p1.setText("p1");
Paragraph p2 = note.addParagraph();
p2.setText("p2");
assertEquals(null, p2.getResult());
note.runAll();

while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals("repl1: p2", p2.getResult().message());
}

@Test
public void testSchedule() throws InterruptedException, IOException{
// create a note and a paragraph
Note note = notebook.createNote();
// run with defatul repl
Paragraph p1 = note.addParagraph();
p1.setText("hello world");
note.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
assertEquals("repl1: hello world", p1.getResult().message());

// run with specific repl
Paragraph p2 = note.addParagraph();
p2.setText("%mock2 hello world");
note.run(p2.getId());
while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals("repl2: hello world", p2.getResult().message());
}

@Test
public void testGetAllNotes() throws IOException {
// get all notes after copy the {notebookId}/note.json into notebookDir
File srcDir = new File("src/test/resources/2A94M5J1Z");
File destDir = new File(notebookDir.getAbsolutePath() + "/2A94M5J1Z");

try {
FileUtils.copyDirectory(srcDir, destDir);
} catch (IOException e) {
e.printStackTrace();
}

Note copiedNote = notebookRepo.get("2A94M5J1Z");

// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be false
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "false");
List<Note> notes = notebook.getAllNotes();
assertEquals(notes.size(), 0);

// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be true
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "true");
notes = notebook.getAllNotes();
assertEquals(notes.size(), 1);
assertEquals(notes.get(0).id(), copiedNote.id());
assertEquals(notes.get(0).getName(), copiedNote.getName());
assertEquals(notes.get(0).getParagraphs(), copiedNote.getParagraphs());

// get all notes after remove the {notebookId}/note.json from notebookDir
// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be false
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "false");
// delete the notebook
FileUtils.deleteDirectory(destDir);
notes = notebook.getAllNotes();
assertEquals(notes.size(), 1);

// when ZEPPELIN_NOTEBOOK_GET_FROM_REPO set to be true
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_RELOAD_FROM_STORAGE.getVarName(), "true");
notes = notebook.getAllNotes();
assertEquals(notes.size(), 0);
}

@Test
public void testPersist() throws IOException, SchedulerException{
Note note = notebook.createNote();

// run with default repl
Paragraph p1 = note.addParagraph();
p1.setText("hello world");
note.persist();

Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
assertEquals(1, notebook2.getAllNotes().size());
}

@Test
public void testRunAll() throws IOException {
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());

Paragraph p = note.addParagraph();
p.setText("p1");
Date dateFinished = p.getDateFinished();
assertNull(dateFinished);

// set cron scheduler, once a second
Map<String, Object> config = note.getConfig();
config.put("cron", "* * * * * ?");
note.setConfig(config);
notebook.refreshCron(note.id());
Thread.sleep(1*1000);
dateFinished = p.getDateFinished();
assertNotNull(dateFinished);

// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.id());
Thread.sleep(1*1000);
assertEquals(dateFinished, p.getDateFinished());
}
Paragraph p1 = note.addParagraph();
p1.setText("p1");
Paragraph p2 = note.addParagraph();
p2.setText("p2");
assertEquals(null, p2.getResult());
note.runAll();

while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals("repl1: p2", p2.getResult().message());
}

@Test
public void testSchedule() throws InterruptedException, IOException{
// create a note and a paragraph
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());

Paragraph p = note.addParagraph();
p.setText("p1");
Date dateFinished = p.getDateFinished();
assertNull(dateFinished);

// set cron scheduler, once a second
Map<String, Object> config = note.getConfig();
config.put("cron", "* * * * * ?");
note.setConfig(config);
notebook.refreshCron(note.id());
Thread.sleep(1*1000);
dateFinished = p.getDateFinished();
assertNotNull(dateFinished);

// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.id());
Thread.sleep(1*1000);
assertEquals(dateFinished, p.getDateFinished());
}

@Test
public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException,
Expand All @@ -184,7 +227,7 @@ public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedExcepti
assertNull(registry.get("o1", note.id()));
// global object sould be remained
assertNotNull(registry.get("o2", null));
}
}

@Test
public void testAngularObjectRemovalOnInterpreterRestart() throws InterruptedException,
Expand Down Expand Up @@ -214,34 +257,34 @@ public void testAngularObjectRemovalOnInterpreterRestart() throws InterruptedExc
notebook.removeNote(note.id());
}

private void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){
File [] files = file.listFiles();
if(files!=null && files.length>0){
for(File f : files){
delete(f);
}
}
file.delete();
}
}

@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){

@Override
public void onProgressUpdate(Job job, int progress) {
}

@Override
public void beforeStatusChange(Job job, Status before, Status after) {
}

@Override
public void afterStatusChange(Job job, Status before, Status after) {
}
};
}
private void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){
File [] files = file.listFiles();
if(files!=null && files.length>0){
for(File f : files){
delete(f);
}
}
file.delete();
}
}

@Override
public JobListener getParagraphJobListener(Note note) {
return new JobListener(){

@Override
public void onProgressUpdate(Job job, int progress) {
}

@Override
public void beforeStatusChange(Job job, Status before, Status after) {
}

@Override
public void afterStatusChange(Job job, Status before, Status after) {
}
};
}
}
Loading