diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index d9b93f247af..b916f73e470 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -68,6 +68,14 @@ public boolean isRunning() { public boolean isPending() { return this == PENDING; } + + public boolean isError() { + return this == ERROR; + } + + public boolean isFinished() { + return this == FINISHED; + } } private String jobName; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 2796500ac86..1b7daca45dd 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -52,9 +52,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import com.google.gson.GsonBuilder; -import com.google.gson.stream.JsonReader; -import java.io.StringReader; /** * Rest api endpoint for the noteBook. */ @@ -214,7 +211,7 @@ public Response getNotebook(@PathParam("notebookId") String notebookId) throws I /** * export note REST API - * + * * @param * @return note JSON with status.OK * @throws IOException @@ -228,7 +225,7 @@ public Response exportNoteBook(@PathParam("id") String noteId) throws IOExceptio /** * import new note REST API - * + * * @param req - notebook Json * @return JSON with new note ID * @throws IOException @@ -239,7 +236,7 @@ public Response importNotebook(String req) throws IOException { Note newNote = notebook.importNote(req, null); return new JsonResponse<>(Status.CREATED, "", newNote.getId()).build(); } - + /** * Create new note REST API * @param message - JSON with new note name @@ -259,6 +256,9 @@ public Response createNote(String message) throws IOException { Paragraph p = note.addParagraph(); p.setTitle(paragraphRequest.getTitle()); p.setText(paragraphRequest.getText()); + if (paragraphRequest.getSkipOnError()) { + p.getConfig().put("skipOnError", true); + } } } note.addParagraph(); // add one paragraph to the last @@ -292,7 +292,7 @@ public Response deleteNote(@PathParam("notebookId") String notebookId) throws IO notebookServer.broadcastNoteList(); return new JsonResponse<>(Status.OK, "").build(); } - + /** * Clone note REST API * @param @@ -451,7 +451,7 @@ public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws if (note == null) { return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - + note.runAll(); return new JsonResponse<>(Status.OK).build(); } @@ -479,7 +479,7 @@ public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws } return new JsonResponse<>(Status.OK).build(); } - + /** * Get notebook job status REST API * @param @@ -498,10 +498,10 @@ public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) thr return new JsonResponse<>(Status.OK, null, note.generateParagraphsInfo()).build(); } - + /** * Run paragraph job REST API - * + * * @param message - JSON with params if user wants to update dynamic form's value * null, empty string, empty json if user doesn't want to update * @@ -510,7 +510,7 @@ public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) thr */ @POST @Path("job/{notebookId}/{paragraphId}") - public Response runParagraph(@PathParam("notebookId") String notebookId, + public Response runParagraph(@PathParam("notebookId") String notebookId, @PathParam("paragraphId") String paragraphId, String message) throws IOException, IllegalArgumentException { @@ -549,7 +549,7 @@ public Response runParagraph(@PathParam("notebookId") String notebookId, */ @DELETE @Path("job/{notebookId}/{paragraphId}") - public Response stopParagraph(@PathParam("notebookId") String notebookId, + public Response stopParagraph(@PathParam("notebookId") String notebookId, @PathParam("paragraphId") String paragraphId) throws IOException, IllegalArgumentException { LOG.info("stop paragraph job {} ", notebookId); @@ -565,7 +565,7 @@ public Response stopParagraph(@PathParam("notebookId") String notebookId, p.abort(); return new JsonResponse<>(Status.OK).build(); } - + /** * Register cron job REST API * @param message - JSON with cron expressions. @@ -580,12 +580,12 @@ public Response registerCronJob(@PathParam("notebookId") String notebookId, Stri CronRequest request = gson.fromJson(message, CronRequest.class); - + Note note = notebook.getNote(notebookId); if (note == null) { return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - + if (!CronExpression.isValidExpression(request.getCronString())) { return new JsonResponse<>(Status.BAD_REQUEST, "wrong cron expressions.").build(); } @@ -594,10 +594,10 @@ public Response registerCronJob(@PathParam("notebookId") String notebookId, Stri config.put("cron", request.getCronString()); note.setConfig(config); notebook.refreshCron(note.id()); - + return new JsonResponse<>(Status.OK).build(); } - + /** * Remove cron job REST API * @param @@ -614,15 +614,15 @@ public Response removeCronJob(@PathParam("notebookId") String notebookId) throws if (note == null) { return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - + Map config = note.getConfig(); config.put("cron", null); note.setConfig(config); notebook.refreshCron(note.id()); - + return new JsonResponse<>(Status.OK).build(); - } - + } + /** * Get cron job REST API * @param @@ -639,9 +639,9 @@ public Response getCronJob(@PathParam("notebookId") String notebookId) throws if (note == null) { return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build(); } - + return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build(); - } + } /** * Search for a Notes diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewParagraphRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewParagraphRequest.java index bde920b8337..5e138ebad60 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewParagraphRequest.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewParagraphRequest.java @@ -25,6 +25,7 @@ public class NewParagraphRequest { String title; String text; + boolean skipOnError; Double index; public NewParagraphRequest() { @@ -39,6 +40,10 @@ public String getText() { return text; } + public boolean getSkipOnError() { + return skipOnError; + } + public Double getIndex() { return index; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java index 913e184619e..f21422e21f6 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -52,6 +52,8 @@ public static enum OP { IMPORT_NOTE, // [c-s] import notebook // @param object notebook NOTE_UPDATE, + RUN_NOTE, // [c-s] run notebook + // @param id note id RUN_PARAGRAPH, // [c-s] run paragraph // @param id paragraph id @@ -100,7 +102,7 @@ public static enum OP { ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del - + ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated, ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 9412d7198d7..8d3add68831 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,33 +20,40 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import javax.servlet.http.HttpServletRequest; -import com.google.common.base.Strings; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; -import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; -import org.apache.zeppelin.notebook.*; +import org.apache.zeppelin.notebook.JobListenerFactory; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.NotebookAuthorization; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.ParagraphJobListener; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; import org.apache.zeppelin.ticket.TicketContainer; +import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.SecurityUtils; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketServlet; @@ -54,6 +61,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + /** * Zeppelin websocket service. * @@ -107,7 +118,7 @@ public void onMessage(NotebookSocket conn, String msg) { if (LOG.isTraceEnabled()) { LOG.trace("RECEIVE MSG = " + messagereceived); } - + String ticket = TicketContainer.instance.getTicket(messagereceived.principal); if (ticket != null && !ticket.equals(messagereceived.ticket)) throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket); @@ -155,6 +166,9 @@ public void onMessage(NotebookSocket conn, String msg) { case IMPORT_NOTE: importNote(conn, userAndRoles, notebook, messagereceived); break; + case RUN_NOTE: + runNote(conn, userAndRoles, notebook, messagereceived); + break; case COMMIT_PARAGRAPH: updateParagraph(conn, userAndRoles, notebook, messagereceived); break; @@ -530,6 +544,25 @@ private void removeNote(NotebookSocket conn, HashSet userAndRoles, broadcastNoteList(); } + private void runNote(NotebookSocket conn, HashSet userAndRoles, + Notebook notebook, Message fromMessage) + throws IOException { + String noteId = (String) fromMessage.get("id"); + if (noteId == null) { + return; + } + + Note note = notebook.getNote(noteId); + NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); + if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { + permissionError(conn, "remove", userAndRoles, notebookAuthorization.getOwners(noteId)); + return; + } + + note.runAll(); + broadcastNoteList(); + } + private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); @@ -1082,6 +1115,8 @@ public void beforeStatusChange(Job job, Status before, Status after) { @Override public void afterStatusChange(Job job, Status before, Status after) { + note.setExecutionStatus(job.getId(), after); + if (after == Status.ERROR) { if (job.getException() != null) { LOG.error("Error", job.getException()); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java index d1135c052fb..e51e96cf440 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java @@ -239,7 +239,7 @@ public void testDisableParagraphRunButton() throws Exception { driver.findElement(By.xpath(getParagraphXPath(1) + "//span[@class='icon-control-play']")).isDisplayed(), CoreMatchers.equalTo(false) ); - driver.findElement(By.xpath(".//*[@id='main']//button[@ng-click='runNote()']")).sendKeys(Keys.ENTER); + driver.findElement(By.xpath(".//*[@id='main']//button[@ng-click='runNote(note.id)']")).sendKeys(Keys.ENTER); sleep(1000, true); driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Run all paragraphs?')]" + "//div[@class='modal-footer']//button[contains(.,'OK')]")).click(); diff --git a/zeppelin-web/src/app/notebook/notebook-actionBar.html b/zeppelin-web/src/app/notebook/notebook-actionBar.html index f371bd318a8..85e328dc1a8 100644 --- a/zeppelin-web/src/app/notebook/notebook-actionBar.html +++ b/zeppelin-web/src/app/notebook/notebook-actionBar.html @@ -19,7 +19,7 @@