diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 2b986337310..f7246b6c0e4 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -29,12 +29,15 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind; import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; import org.apache.zeppelin.rest.message.NewNotebookRequest; import org.apache.zeppelin.server.JsonResponse; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.NotebookServer; +import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,11 +169,12 @@ public Response deleteNote(@PathParam("notebookId") String notebookId) throws IO notebookServer.broadcastNoteList(); return new JsonResponse(Status.OK, "").build(); } + /** * Clone note REST API * @param * @return JSON with status.CREATED - * @throws IOException + * @throws IOException, CloneNotSupportedException, IllegalArgumentException */ @POST @Path("{notebookId}") @@ -185,4 +189,194 @@ public Response cloneNote(@PathParam("notebookId") String notebookId, String mes notebookServer.broadcastNoteList(); return new JsonResponse(Status.CREATED, "", newNote.getId()).build(); } + + /** + * Run notebook jobs REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @POST + @Path("job/{notebookId}") + public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("run notebook jobs {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + note.runAll(); + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Stop(delete) notebook jobs REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @DELETE + @Path("job/{notebookId}") + public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("stop notebook jobs {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + for (Paragraph p : note.getParagraphs()) { + if (!p.isTerminated()) { + p.abort(); + } + } + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Get notebook job status REST API + * @param + * @return JSON with status.OK + * @throws IOException, IllegalArgumentException + */ + @GET + @Path("job/{notebookId}") + public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("get notebook job status."); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + return new JsonResponse(Status.OK, null, note.generateParagraphsInfo()).build(); + } + + /** + * Run paragraph job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @POST + @Path("job/{notebookId}/{paragraphId}") + public Response runParagraph(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId) throws + IOException, IllegalArgumentException { + logger.info("run paragraph job {} {} ", notebookId, paragraphId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + if (note.getParagraph(paragraphId) == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + + note.run(paragraphId); + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Stop(delete) paragraph job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @DELETE + @Path("job/{notebookId}/{paragraphId}") + public Response stopParagraph(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId) throws + IOException, IllegalArgumentException { + logger.info("stop paragraph job {} ", notebookId); + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + p.abort(); + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Register cron job REST API + * @param message - JSON with cron expressions. + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @POST + @Path("cron/{notebookId}") + public Response registerCronJob(@PathParam("notebookId") String notebookId, String message) throws + IOException, IllegalArgumentException { + logger.info("Register cron job note={} request cron msg={}", notebookId, message); + + CronRequest request = gson.fromJson(message, + CronRequest.class); + + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + if (!CronExpression.isValidExpression(request.getCronString())) { + return new JsonResponse(Status.BAD_REQUEST, "wrong cron expressions.").build(); + } + + Map config = note.getConfig(); + config.put("cron", request.getCronString()); + note.setConfig(config); + notebook.refreshCron(note.id()); + + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Remove cron job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @DELETE + @Path("cron/{notebookId}") + public Response removeCronJob(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("Remove cron job note {}", notebookId); + + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + Map config = note.getConfig(); + config.put("cron", null); + note.setConfig(config); + notebook.refreshCron(note.id()); + + return new JsonResponse(Status.ACCEPTED).build(); + } + + /** + * Get cron job REST API + * @param + * @return JSON with status.ACCEPTED + * @throws IOException, IllegalArgumentException + */ + @GET + @Path("cron/{notebookId}") + public Response getCronJob(@PathParam("notebookId") String notebookId) throws + IOException, IllegalArgumentException { + logger.info("Get cron job note {}", notebookId); + + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + + return new JsonResponse(Status.ACCEPTED, note.getConfig().get("cron")).build(); + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/CronRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/CronRequest.java new file mode 100644 index 00000000000..5a339315e7d --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/CronRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import java.util.Map; + +import org.apache.zeppelin.interpreter.InterpreterOption; + +/** + * CronRequest rest api request message + * + */ +public class CronRequest { + String cron; + + public CronRequest (){ + + } + + public String getCronString() { + return cron; + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 1895e16cd4c..79794e65ee9 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -393,6 +393,12 @@ protected Matcher isAllowed() { protected Matcher isCreated() { return responsesWith(201); } + protected Matcher isAccepted() { return responsesWith(202); } + + protected Matcher isBadRequest() { return responsesWith(400); } + + protected Matcher isNotFound() { return responsesWith(404); } + protected Matcher isNotAllowed() { return responsesWith(405); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 8c8ff07af24..df7c50a998b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -259,7 +259,7 @@ public void testCloneNotebook() throws IOException, CloneNotSupportedException, LOG.info("testCloneNotebook"); // Create note to clone Note note = ZeppelinServer.notebook.createNote(); - assertNotNull("cant create new note", note); + assertNotNull("can't create new note", note); note.setName("source note for clone"); Paragraph paragraph = note.addParagraph(); paragraph.setText("%md This is my new paragraph in my new note"); @@ -300,5 +300,80 @@ public void testListNotebooks() throws IOException { get.releaseConnection(); } + @Test + public void testNoteJobs() throws IOException, InterruptedException { + LOG.info("testNoteJobs"); + // Create note to run test. + Note note = ZeppelinServer.notebook.createNote(); + assertNotNull("can't create new note", note); + note.setName("note for run test"); + Paragraph paragraph = note.addParagraph(); + paragraph.setText("%md This is test paragraph."); + note.persist(); + String noteID = note.getId(); + + // Call Run Notebook Jobs REST API + PostMethod postNoteJobs = httpPost("/notebook/job/" + noteID, ""); + assertThat("test notebook jobs run:", postNoteJobs, isAccepted()); + postNoteJobs.releaseConnection(); + Thread.sleep(1000); + + // Call Stop Notebook Jobs REST API + DeleteMethod deleteNoteJobs = httpDelete("/notebook/job/" + noteID); + assertThat("test notebook stop:", deleteNoteJobs, isAccepted()); + deleteNoteJobs.releaseConnection(); + Thread.sleep(1000); + + // Call Run paragraph REST API + PostMethod postParagraph = httpPost("/notebook/job/" + noteID + "/" + paragraph.getId(), ""); + assertThat("test paragraph run:", postParagraph, isAccepted()); + postParagraph.releaseConnection(); + Thread.sleep(1000); + + // Call Stop paragraph REST API + DeleteMethod deleteParagraph = httpDelete("/notebook/job/" + noteID + "/" + paragraph.getId()); + assertThat("test paragraph stop:", deleteParagraph, isAccepted()); + deleteParagraph.releaseConnection(); + Thread.sleep(1000); + + //cleanup + ZeppelinServer.notebook.removeNote(note.getId()); + } + + @Test + public void testCronJobs() throws InterruptedException, IOException{ + // create a note and a paragraph + Note note = ZeppelinServer.notebook.createNote(); + + note.setName("note for run test"); + Paragraph paragraph = note.addParagraph(); + paragraph.setText("%md This is test paragraph."); + + String jsonRequest = "{\"cron\":\"* * * * * ?\" }"; + // right cron expression but not exist note. + PostMethod postCron = httpPost("/notebook/cron/notexistnote", jsonRequest); + assertThat("", postCron, isNotFound()); + postCron.releaseConnection(); + Thread.sleep(1000); + + // right cron expression. + postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest); + assertThat("", postCron, isAccepted()); + postCron.releaseConnection(); + Thread.sleep(1000); + + // wrong cron expression. + jsonRequest = "{\"cron\":\"a * * * * ?\" }"; + postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest); + assertThat("", postCron, isBadRequest()); + postCron.releaseConnection(); + Thread.sleep(1000); + + // remove cron job. + DeleteMethod deleteCron = httpDelete("/notebook/cron/" + note.getId()); + assertThat("", deleteCron, isAccepted()); + deleteCron.releaseConnection(); + ZeppelinServer.notebook.removeNote(note.getId()); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 89a72b517c8..a9093608b4f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -294,6 +294,21 @@ public Paragraph getLastParagraph() { } } + public List> generateParagraphsInfo (){ + List> paragraphsInfo = new LinkedList<>(); + synchronized (paragraphs) { + for (Paragraph p : paragraphs) { + Map info = new HashMap<>(); + info.put("id", p.getId()); + info.put("status", p.getStatus().toString()); + info.put("started", p.getDateStarted().toString()); + info.put("finished", p.getDateFinished().toString()); + paragraphsInfo.add(info); + } + } + return paragraphsInfo; + } + /** * Run all paragraphs sequentially. *