Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public boolean isRunning() {
public boolean isPending() {
return this == PENDING;
}

public boolean isError() {
return this == ERROR;
}

public boolean isFinished() {
return this == FINISHED;
}
}

private String jobName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import java.io.StringReader;
/**
* Rest api endpoint for the noteBook.
*/
Expand Down Expand Up @@ -214,7 +211,7 @@ public Response getNotebook(@PathParam("notebookId") String notebookId) throws I

/**
* export note REST API
*
*
* @param
* @return note JSON with status.OK
* @throws IOException
Expand All @@ -228,7 +225,7 @@ public Response exportNoteBook(@PathParam("id") String noteId) throws IOExceptio

/**
* import new note REST API
*
*
* @param req - notebook Json
* @return JSON with new note ID
* @throws IOException
Expand All @@ -239,7 +236,7 @@ public Response importNotebook(String req) throws IOException {
Note newNote = notebook.importNote(req, null);
return new JsonResponse<>(Status.CREATED, "", newNote.getId()).build();
}

/**
* Create new note REST API
* @param message - JSON with new note name
Expand All @@ -259,6 +256,9 @@ public Response createNote(String message) throws IOException {
Paragraph p = note.addParagraph();
p.setTitle(paragraphRequest.getTitle());
p.setText(paragraphRequest.getText());
if (paragraphRequest.getSkipOnError()) {
p.getConfig().put("skipOnError", true);
}
}
}
note.addParagraph(); // add one paragraph to the last
Expand Down Expand Up @@ -292,7 +292,7 @@ public Response deleteNote(@PathParam("notebookId") String notebookId) throws IO
notebookServer.broadcastNoteList();
return new JsonResponse<>(Status.OK, "").build();
}

/**
* Clone note REST API
* @param
Expand Down Expand Up @@ -451,7 +451,7 @@ public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws
if (note == null) {
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}

note.runAll();
return new JsonResponse<>(Status.OK).build();
}
Expand Down Expand Up @@ -479,7 +479,7 @@ public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws
}
return new JsonResponse<>(Status.OK).build();
}

/**
* Get notebook job status REST API
* @param
Expand All @@ -498,10 +498,10 @@ public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) thr

return new JsonResponse<>(Status.OK, null, note.generateParagraphsInfo()).build();
}

/**
* Run paragraph job REST API
*
*
* @param message - JSON with params if user wants to update dynamic form's value
* null, empty string, empty json if user doesn't want to update
*
Expand All @@ -510,7 +510,7 @@ public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) thr
*/
@POST
@Path("job/{notebookId}/{paragraphId}")
public Response runParagraph(@PathParam("notebookId") String notebookId,
public Response runParagraph(@PathParam("notebookId") String notebookId,
@PathParam("paragraphId") String paragraphId,
String message) throws
IOException, IllegalArgumentException {
Expand Down Expand Up @@ -549,7 +549,7 @@ public Response runParagraph(@PathParam("notebookId") String notebookId,
*/
@DELETE
@Path("job/{notebookId}/{paragraphId}")
public Response stopParagraph(@PathParam("notebookId") String notebookId,
public Response stopParagraph(@PathParam("notebookId") String notebookId,
@PathParam("paragraphId") String paragraphId) throws
IOException, IllegalArgumentException {
LOG.info("stop paragraph job {} ", notebookId);
Expand All @@ -565,7 +565,7 @@ public Response stopParagraph(@PathParam("notebookId") String notebookId,
p.abort();
return new JsonResponse<>(Status.OK).build();
}

/**
* Register cron job REST API
* @param message - JSON with cron expressions.
Expand All @@ -580,12 +580,12 @@ public Response registerCronJob(@PathParam("notebookId") String notebookId, Stri

CronRequest request = gson.fromJson(message,
CronRequest.class);

Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}

if (!CronExpression.isValidExpression(request.getCronString())) {
return new JsonResponse<>(Status.BAD_REQUEST, "wrong cron expressions.").build();
}
Expand All @@ -594,10 +594,10 @@ public Response registerCronJob(@PathParam("notebookId") String notebookId, Stri
config.put("cron", request.getCronString());
note.setConfig(config);
notebook.refreshCron(note.id());

return new JsonResponse<>(Status.OK).build();
}

/**
* Remove cron job REST API
* @param
Expand All @@ -614,15 +614,15 @@ public Response removeCronJob(@PathParam("notebookId") String notebookId) throws
if (note == null) {
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}

Map<String, Object> config = note.getConfig();
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.id());

return new JsonResponse<>(Status.OK).build();
}
}

/**
* Get cron job REST API
* @param
Expand All @@ -639,9 +639,9 @@ public Response getCronJob(@PathParam("notebookId") String notebookId) throws
if (note == null) {
return new JsonResponse<>(Status.NOT_FOUND, "note not found.").build();
}

return new JsonResponse<>(Status.OK, note.getConfig().get("cron")).build();
}
}

/**
* Search for a Notes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class NewParagraphRequest {
String title;
String text;
boolean skipOnError;
Double index;

public NewParagraphRequest() {
Expand All @@ -39,6 +40,10 @@ public String getText() {
return text;
}

public boolean getSkipOnError() {
return skipOnError;
}

public Double getIndex() {
return index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static enum OP {
IMPORT_NOTE, // [c-s] import notebook
// @param object notebook
NOTE_UPDATE,
RUN_NOTE, // [c-s] run notebook
// @param id note id

RUN_PARAGRAPH, // [c-s] run paragraph
// @param id paragraph id
Expand Down Expand Up @@ -100,7 +102,7 @@ public static enum OP {

ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del

ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,

ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,51 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.servlet.http.HttpServletRequest;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.socket.Message.OP;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.utils.SecurityUtils;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketServlet;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

/**
* Zeppelin websocket service.
*
Expand Down Expand Up @@ -107,7 +118,7 @@ public void onMessage(NotebookSocket conn, String msg) {
if (LOG.isTraceEnabled()) {
LOG.trace("RECEIVE MSG = " + messagereceived);
}

String ticket = TicketContainer.instance.getTicket(messagereceived.principal);
if (ticket != null && !ticket.equals(messagereceived.ticket))
throw new Exception("Invalid ticket " + messagereceived.ticket + " != " + ticket);
Expand Down Expand Up @@ -155,6 +166,9 @@ public void onMessage(NotebookSocket conn, String msg) {
case IMPORT_NOTE:
importNote(conn, userAndRoles, notebook, messagereceived);
break;
case RUN_NOTE:
runNote(conn, userAndRoles, notebook, messagereceived);
break;
case COMMIT_PARAGRAPH:
updateParagraph(conn, userAndRoles, notebook, messagereceived);
break;
Expand Down Expand Up @@ -530,6 +544,25 @@ private void removeNote(NotebookSocket conn, HashSet<String> userAndRoles,
broadcastNoteList();
}

private void runNote(NotebookSocket conn, HashSet<String> userAndRoles,
Notebook notebook, Message fromMessage)
throws IOException {
String noteId = (String) fromMessage.get("id");
if (noteId == null) {
return;
}

Note note = notebook.getNote(noteId);
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
if (!notebookAuthorization.isOwner(noteId, userAndRoles)) {
permissionError(conn, "remove", userAndRoles, notebookAuthorization.getOwners(noteId));
return;
}

note.runAll();
broadcastNoteList();
}

private void updateParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
Notebook notebook, Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
Expand Down Expand Up @@ -1082,6 +1115,8 @@ public void beforeStatusChange(Job job, Status before, Status after) {

@Override
public void afterStatusChange(Job job, Status before, Status after) {
note.setExecutionStatus(job.getId(), after);

if (after == Status.ERROR) {
if (job.getException() != null) {
LOG.error("Error", job.getException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void testDisableParagraphRunButton() throws Exception {
driver.findElement(By.xpath(getParagraphXPath(1) + "//span[@class='icon-control-play']")).isDisplayed(), CoreMatchers.equalTo(false)
);

driver.findElement(By.xpath(".//*[@id='main']//button[@ng-click='runNote()']")).sendKeys(Keys.ENTER);
driver.findElement(By.xpath(".//*[@id='main']//button[@ng-click='runNote(note.id)']")).sendKeys(Keys.ENTER);
sleep(1000, true);
driver.findElement(By.xpath("//div[@class='modal-dialog'][contains(.,'Run all paragraphs?')]" +
"//div[@class='modal-footer']//button[contains(.,'OK')]")).click();
Expand Down
2 changes: 1 addition & 1 deletion zeppelin-web/src/app/notebook/notebook-actionBar.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ <h3>
<span class="labelBtn btn-group">
<button type="button"
class="btn btn-default btn-xs"
ng-click="runNote()"
ng-click="runNote(note.id)"
ng-class="{'disabled':isNoteRunning()}"
tooltip-placement="bottom" tooltip="Run all paragraphs">
<i class="icon-control-play"></i>
Expand Down
12 changes: 6 additions & 6 deletions zeppelin-web/src/app/notebook/notebook.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,17 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl',
document.getElementById('note.checkpoint.message').value='';
};

$scope.runNote = function() {
$scope.runNote = function(noteId) {
BootstrapDialog.confirm({
closable: true,
title: '',
message: 'Run all paragraphs?',
callback: function(result) {
if (result) {
_.forEach($scope.note.paragraphs, function (n, key) {
angular.element('#' + n.id + '_paragraphColumn_main').scope().runParagraph(n.text);
});
}
_.forEach($scope.note.paragraphs, function(n, key) {
angular.element('#' + n.id + '_paragraphColumn_main').scope().saveParagraph();
});

websocketMsgSrv.runNote(noteId);
}
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
{{paragraph.config.enabled ? "Disable" : "Enable"}} run</a>
</li>
<li>
<a ng-click="toggleSkipOnError()"><span class="icon-control-forward"></span>
{{paragraph.config.skipOnError ? "Disable" : "Enable"}} Skip On Error</a>
</li>
<li>
<a ng-click="goToSingleParagraph()"><span class="icon-share-alt"></span> Link this paragraph</a>
</li>
<li>
Expand Down
Loading