Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.apache.zeppelin.workflow.WorkflowJob;
import org.apache.zeppelin.workflow.WorkflowJobItem;
import org.apache.zeppelin.workflow.WorkflowManager;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.quartz.SchedulerException;
Expand Down Expand Up @@ -104,6 +107,7 @@ String getKey() {
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
public WorkflowManager workflowManager = new WorkflowManager();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -1306,6 +1310,50 @@ private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Not
Map<String, Object> params = (Map<String, Object>) fromMessage
.get("params");
p.settings.setParams(params);

List<Map<String, String>> workflow = (List<Map<String, String>>) fromMessage.get("workflowJob");

if (workflow != null && workflow.size() > 0) {
synchronized (workflowManager) {
workflowManager.setWorkflow(noteId, noteId, paragraphId);
WorkflowJob workflowJob = workflowManager.getWorkflow(noteId);
WorkflowJobItem parentItem = null;
WorkflowJobItem newJob;
for (Map<String, String> workflowJobItem : workflow) {
parentItem = workflowJob.getWorkflowJobItemLast();
String newJobNotebookId = workflowJobItem.get("notebookId");
String newJobParagraphId = workflowJobItem.get("paragraphId");

if (parentItem == null || newJobNotebookId == null || newJobParagraphId == null) {
continue;
}

Note noteCheckObject = notebook.getNote(newJobNotebookId);

if (noteCheckObject == null) {
continue;
}

if (noteCheckObject.getParagraph(newJobParagraphId) == null) {
if (newJobParagraphId.equals("*") == false) {
continue;
}
// run to all paragraph in note.
List<Paragraph> allParagaraphsInNote = noteCheckObject.getParagraphs();
for (Paragraph jobParagraph : allParagaraphsInNote) {
newJob = new WorkflowJobItem(newJobNotebookId, jobParagraph.getId());
parentItem.setOnSuccessJob(newJob);
parentItem = workflowJob.getWorkflowJobItemLast();
}
} else {
// run to each paragraph.
newJob = new WorkflowJobItem(newJobNotebookId, newJobParagraphId);
parentItem.setOnSuccessJob(newJob);
}
}
}
}

Map<String, Object> config = (Map<String, Object>) fromMessage
.get("config");
p.setConfig(config);
Expand Down Expand Up @@ -1616,6 +1664,7 @@ public void beforeStatusChange(Job job, Status before, Status after) {

@Override
public void afterStatusChange(Job job, Status before, Status after) {

if (after == Status.ERROR) {
if (job.getException() != null) {
LOG.error("Error", job.getException());
Expand All @@ -1631,6 +1680,42 @@ public void afterStatusChange(Job job, Status before, Status after) {
LOG.error(e.toString(), e);
}
}

notebookServer.workflowManager.setJobNotify(after, note.getId(), job.getId());

if (!after.isPending() && !after.isRunning()) {
Map<String, List<String>> waitJobLists;
WorkflowManager workflowManager = notebookServer.workflowManager;
synchronized (workflowManager) {
waitJobLists = workflowManager.getNextJob(note.getId(), job.getId());
if (waitJobLists != null) {
for (String notebookId : waitJobLists.keySet()) {
String paragraphId = waitJobLists.get(notebookId).get(0);
Note nextNote = notebookServer.notebook().getNote(notebookId);

if (nextNote == null) {
workflowManager.setJobNotify(Status.ERROR, notebookId, paragraphId);
continue;
}

Paragraph paragraph = nextNote.getParagraph(paragraphId);
if (paragraph == null) {
workflowManager.setJobNotify(Status.ERROR, notebookId, paragraphId);
continue;
}

try {
nextNote.persist(null);
} catch (IOException e) {
LOG.error(e.toString(), e);
}
nextNote.run(paragraphId);
workflowManager.setJobNotify(Status.RUNNING, notebookId, paragraphId);
}
}
}
}

notebookServer.broadcastNote(note);

try {
Expand All @@ -1652,7 +1737,6 @@ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String ou
.put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId())
.put("data", output);

notebookServer.broadcast(paragraph.getNote().getId(), msg);
}

Expand All @@ -1669,6 +1753,9 @@ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String ou
.put("paragraphId", paragraph.getId())
.put("data", output);

LOG.info("append updated {} note {} para {}",
paragraph.getStatus(), paragraph.getNote().getId(), paragraph.getId());

notebookServer.broadcast(paragraph.getNote().getId(), msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,65 @@
<form id="{{paragraph.id}}_form" role="form"
ng-show="!paragraph.config.tableHide"
class=" paragraphForm form-horizontal row">
<!-- workflow -->
<div ng-if="paragraph.settings.forms[formulaire.name].type == 'workflow'"
class="form-group col-sm-12 col-md-12 col-lg-12"
ng-repeat="formulaire in paragraph.settings.forms"
ng-init="loadForm(formulaire, paragraph.settings.params)">
<div>
<label
ng-if="paragraph.settings.forms[formulaire.name].defaultValue == ''"
class="control-label input-sm"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
>
{{formulaire.name}}
</label>
<label
ng-if="paragraph.settings.forms[formulaire.name].defaultValue != ''"
class="control-label input-sm"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
>
{{paragraph.settings.forms[formulaire.name].defaultValue}}
</label>
</div>
<div>
<workflow-widget
workflow-job-input-data="paragraph.settings.forms.workflow.options"
workflow-job-result-data="paragraph.settings.workflowJob">
</workflow-widget>
</div>
</div>
<!-- dynamic form -->
<div ng-if="paragraph.settings.forms[formulaire.name].type !== 'workflow'"
<div class="form-group col-sm-6 col-md-6 col-lg-4"
ng-repeat="formulaire in paragraph.settings.forms | toArray | orderBy:'name.toString()'"
ng-init="loadForm(formulaire, paragraph.settings.params)">
<label class="control-label input-sm" ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }">{{formulaire.name}}</label>
<div>

<input class="form-control input-sm"
ng-if="!paragraph.settings.forms[formulaire.name].options"
ng-enter="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}" />
<input class="form-control input-sm"
ng-if="!paragraph.settings.forms[formulaire.name].options"
ng-enter="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}" />

<select class="form-control input-sm"
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
ng-change="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}"
ng-options="option.value as (option.displayName||option.value) for option in paragraph.settings.forms[formulaire.name].options">
</select>

<div ng-if="paragraph.settings.forms[formulaire.name].type == 'checkbox'">
<label ng-repeat="option in paragraph.settings.forms[formulaire.name].options"
class="checkbox-item input-sm">
<input type="checkbox"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
ng-checked="paragraph.settings.params[formulaire.name].indexOf(option.value) > -1"
ng-click="toggleCheckbox(formulaire, option, false)"/>{{option.displayName||option.value}}
</label>
</div>
<select class="form-control input-sm"
ng-if="paragraph.settings.forms[formulaire.name].options && paragraph.settings.forms[formulaire.name].type != 'checkbox'"
ng-change="runParagraph(getEditorValue())"
ng-model="paragraph.settings.params[formulaire.name]"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
name="{{formulaire.name}}"
ng-options="option.value as (option.displayName||option.value) for option in paragraph.settings.forms[formulaire.name].options">
</select>

<div ng-if="paragraph.settings.forms[formulaire.name].type == 'checkbox'">
<label ng-repeat="option in paragraph.settings.forms[formulaire.name].options"
class="checkbox-item input-sm">
<input type="checkbox"
ng-class="{'disable': paragraph.status == 'RUNNING' || paragraph.status == 'PENDING' }"
ng-checked="paragraph.settings.params[formulaire.name].indexOf(option.value) > -1"
ng-click="toggleCheckbox(formulaire, option, false)"/>{{option.displayName||option.value}}
</label>
</div>

</div>
</form>
82 changes: 81 additions & 1 deletion zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/
'use strict';

(function() {

angular.module('zeppelinWebApp').controller('ParagraphCtrl', ParagraphCtrl);
Expand All @@ -36,6 +37,7 @@
function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $location,
$timeout, $compile, $http, $q, websocketMsgSrv,
baseUrlSrv, ngToast, saveAsService) {

var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_';
$scope.parentNote = null;
$scope.paragraph = null;
Expand All @@ -58,7 +60,7 @@
if (filtered.length === 1) {
var paragraph = filtered[0];
websocketMsgSrv.runParagraph(paragraph.id, paragraph.title, paragraph.text,
paragraph.config, paragraph.settings.params);
paragraph.config, paragraph.settings.params, paragraph.settings.workflowJob);
} else {
ngToast.danger({content: 'Cannot find a paragraph with id \'' + paragraphId + '\'',
verticalPosition: 'top', dismissOnTimeout: false});
Expand Down Expand Up @@ -248,9 +250,87 @@
try {
angular.element('#p' + $scope.paragraph.id + '_angular').html($scope.paragraph.result.msg);

<<<<<<< HEAD
$scope.isRunning = function() {
if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') {
return true;
} else {
return false;
}
};

$scope.cancelParagraph = function() {
console.log('Cancel %o', $scope.paragraph.id);
websocketMsgSrv.cancelParagraphRun($scope.paragraph.id);
};

$scope.runParagraph = function(data) {
websocketMsgSrv.runParagraph($scope.paragraph.id, $scope.paragraph.title,
data, $scope.paragraph.config, $scope.paragraph.settings.params,
$scope.paragraph.settings.workflowJob);
$scope.originalText = angular.copy(data);
$scope.dirtyText = undefined;
};

$scope.saveParagraph = function() {
if ($scope.dirtyText === undefined || $scope.dirtyText === $scope.originalText) {
return;
}
commitParagraph($scope.paragraph.title, $scope.dirtyText, $scope.paragraph.config,
$scope.paragraph.settings.params);
$scope.originalText = angular.copy($scope.dirtyText);
$scope.dirtyText = undefined;
};

$scope.toggleEnableDisable = function() {
$scope.paragraph.config.enabled = $scope.paragraph.config.enabled ? false : true;
var newParams = angular.copy($scope.paragraph.settings.params);
var newConfig = angular.copy($scope.paragraph.config);
commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams);
};

$scope.run = function() {
var editorValue = $scope.editor.getValue();
if (editorValue) {
if (!($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) {
$scope.runParagraph(editorValue);
}
}
};

$scope.moveUp = function() {
$scope.$emit('moveParagraphUp', $scope.paragraph.id);
};

$scope.moveDown = function() {
$scope.$emit('moveParagraphDown', $scope.paragraph.id);
};

$scope.insertNew = function(position) {
$scope.$emit('insertParagraph', $scope.paragraph.id, position || 'below');
};

$scope.removeParagraph = function() {
var paragraphs = angular.element('div[id$="_paragraphColumn_main"]');
if (paragraphs[paragraphs.length - 1].id.startsWith($scope.paragraph.id)) {
BootstrapDialog.alert({
closable: true,
message: 'The last paragraph can\'t be deleted.'
});
} else {
BootstrapDialog.confirm({
closable: true,
title: '',
message: 'Do you want to delete this paragraph?',
callback: function(result) {
if (result) {
console.log('Remove paragraph');
websocketMsgSrv.removeParagraph($scope.paragraph.id);
=======
$compile(angular.element('#p' + $scope.paragraph.id + '_angular').contents())(paragraphScope);
} catch (err) {
console.log('ANGULAR rendering error %o', err);
>>>>>>> master
}
} else {
$timeout(retryRenderer, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/
'use strict';

(function() {

angular.module('zeppelinWebApp').service('websocketMsgSrv', websocketMsgSrv);
Expand Down Expand Up @@ -111,15 +112,16 @@
websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}});
},

runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) {
runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams, workflowJob) {
websocketEvents.sendNewEvent({
op: 'RUN_PARAGRAPH',
data: {
id: paragraphId,
title: paragraphTitle,
paragraph: paragraphData,
config: paragraphConfig,
params: paragraphParams
params: paragraphParams,
workflowJob: workflowJob ? workflowJob : null
}
});
},
Expand Down
Loading