diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 2b986337310..e7e185381c0 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind; import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; import org.apache.zeppelin.rest.message.NewNotebookRequest; @@ -166,11 +167,12 @@ public Response deleteNote(@PathParam("notebookId") String notebookId) throws IO notebookServer.broadcastNoteList(); return new JsonResponse(Status.OK, "").build(); } + /** * Clone note REST API * @param * @return JSON with status.CREATED - * @throws IOException + * @throws IOException, CloneNotSupportedException, IllegalArgumentException */ @POST @Path("{notebookId}") @@ -185,4 +187,117 @@ public Response cloneNote(@PathParam("notebookId") String notebookId, String mes notebookServer.broadcastNoteList(); return new JsonResponse(Status.CREATED, "", newNote.getId()).build(); } + + /** + * Run notebook jobs REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @POST + @Path("job/{notebookId}") + public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("run notebook jobs {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + note.runAll(); + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Stop(delete) notebook jobs REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @DELETE + @Path("job/{notebookId}") + public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("stop notebook jobs {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + for (Paragraph p : note.getParagraphs()) { + if (!p.isTerminated()) { + p.abort(); + } + } + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Get notebook job status REST API + * @param + * @return JSON with status.OK + * @throws IOException, IllegalArgumentException + */ + @GET + @Path("job/{notebookId}") + public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("get notebook job status."); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + return new JsonResponse(Status.OK, null, note.generateParagraphsInfo()).build(); + } + + /** + * Run paragraph job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @POST + @Path("job/{notebookId}/{paragraphId}") + public Response runParagraph(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId) throws + IOException, IllegalArgumentException { + logger.info("run paragraph job {} {} ", notebookId, paragraphId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + if (note.getParagraph(paragraphId) == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + + note.run(paragraphId); + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Stop(delete) paragraph job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @DELETE + @Path("job/{notebookId}/{paragraphId}") + public Response stopParagraph(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId) throws + IOException, IllegalArgumentException { + logger.info("stop paragraph job {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + p.abort(); + return new JsonResponse(Status.ACCEPTED).build(); + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 1895e16cd4c..0c20da065b8 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -393,6 +393,8 @@ protected Matcher isAllowed() { protected Matcher isCreated() { return responsesWith(201); } + protected Matcher isAccepted() { return responsesWith(202); } + protected Matcher isNotAllowed() { return responsesWith(405); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 8c8ff07af24..f0cff3c99c9 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -259,7 +259,7 @@ public void testCloneNotebook() throws IOException, CloneNotSupportedException, LOG.info("testCloneNotebook"); // Create note to clone Note note = ZeppelinServer.notebook.createNote(); - assertNotNull("cant create new note", note); + assertNotNull("can't create new note", note); note.setName("source note for clone"); Paragraph paragraph = note.addParagraph(); paragraph.setText("%md This is my new paragraph in my new note"); @@ -300,5 +300,49 @@ public void testListNotebooks() throws IOException { get.releaseConnection(); } + @Test + public void testNoteJobs() throws IOException, InterruptedException { + LOG.info("testNoteJobs"); + // Create note to run test. + Note note = ZeppelinServer.notebook.createNote(); + assertNotNull("can't create new note", note); + note.setName("note for run test"); + Paragraph paragraph = note.addParagraph(); + paragraph.setText("%md This is test paragraph."); + note.persist(); + String noteID = note.getId(); + + // Call Run Notebook Jobs REST API + PostMethod postNoteJobs = httpPost("/notebook/job/" + noteID, ""); + assertThat("test notebook jobs run:", postNoteJobs, isAccepted()); + postNoteJobs.releaseConnection(); + + // wait until job is finished or timeout. + int timeout = 1; + while (paragraph.getStatus() == Status.PENDING || paragraph.isTerminated()) { + Thread.sleep(1000); + if (timeout++ > 10) { + LOG.info("testNoteJobs timeout job."); + break; + } + } + // Call Stop Notebook Jobs REST API + DeleteMethod deleteNoteJobs = httpDelete("/notebook/job/" + noteID); + assertThat("test notebook stop:", deleteNoteJobs, isAccepted()); + deleteNoteJobs.releaseConnection(); + + // Call Run paragraph REST API + PostMethod postParagraph = httpPost("/notebook/job/" + noteID + "/" + paragraph.getId(), ""); + assertThat("test paragraph run:", postParagraph, isAccepted()); + postParagraph.releaseConnection(); + + // Call Stop paragraph REST API + DeleteMethod deleteParagraph = httpDelete("/notebook/job/" + noteID + "/" + paragraph.getId()); + assertThat("test paragraph stop:", deleteParagraph, isAccepted()); + deleteParagraph.releaseConnection(); + + //cleanup + ZeppelinServer.notebook.removeNote(note.getId()); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 89a72b517c8..a9093608b4f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -294,6 +294,21 @@ public Paragraph getLastParagraph() { } } + public List> generateParagraphsInfo (){ + List> paragraphsInfo = new LinkedList<>(); + synchronized (paragraphs) { + for (Paragraph p : paragraphs) { + Map info = new HashMap<>(); + info.put("id", p.getId()); + info.put("status", p.getStatus().toString()); + info.put("started", p.getDateStarted().toString()); + info.put("finished", p.getDateFinished().toString()); + paragraphsInfo.add(info); + } + } + return paragraphsInfo; + } + /** * Run all paragraphs sequentially. *