diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index c434ffef0fa..4fbd7c05e84 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -68,6 +68,9 @@ import org.apache.zeppelin.util.WatcherSecurityKey; import org.apache.zeppelin.utils.InterpreterBindingUtils; import org.apache.zeppelin.utils.SecurityUtils; +import org.apache.zeppelin.workflow.WorkflowJob; +import org.apache.zeppelin.workflow.WorkflowJobItem; +import org.apache.zeppelin.workflow.WorkflowManager; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.quartz.SchedulerException; @@ -104,6 +107,7 @@ String getKey() { Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); final Map> noteSocketMap = new HashMap<>(); final Queue connectedSockets = new ConcurrentLinkedQueue<>(); + public WorkflowManager workflowManager = new WorkflowManager(); final Map> userConnectedSockets = new ConcurrentHashMap<>(); /** @@ -1306,6 +1310,50 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not Map params = (Map) fromMessage .get("params"); p.settings.setParams(params); + + List> workflow = (List>) fromMessage.get("workflowJob"); + + if (workflow != null && workflow.size() > 0) { + synchronized (workflowManager) { + workflowManager.setWorkflow(noteId, noteId, paragraphId); + WorkflowJob workflowJob = workflowManager.getWorkflow(noteId); + WorkflowJobItem parentItem = null; + WorkflowJobItem newJob; + for (Map workflowJobItem : workflow) { + parentItem = workflowJob.getWorkflowJobItemLast(); + String newJobNotebookId = workflowJobItem.get("notebookId"); + String newJobParagraphId = workflowJobItem.get("paragraphId"); + + if (parentItem == null || newJobNotebookId == null || newJobParagraphId == null) { + continue; + } + + Note noteCheckObject = notebook.getNote(newJobNotebookId); + + if (noteCheckObject == null) { + continue; + } + + if (noteCheckObject.getParagraph(newJobParagraphId) == null) { + if (newJobParagraphId.equals("*") == false) { + continue; + } + // run to all paragraph in note. + List allParagaraphsInNote = noteCheckObject.getParagraphs(); + for (Paragraph jobParagraph : allParagaraphsInNote) { + newJob = new WorkflowJobItem(newJobNotebookId, jobParagraph.getId()); + parentItem.setOnSuccessJob(newJob); + parentItem = workflowJob.getWorkflowJobItemLast(); + } + } else { + // run to each paragraph. + newJob = new WorkflowJobItem(newJobNotebookId, newJobParagraphId); + parentItem.setOnSuccessJob(newJob); + } + } + } + } + Map config = (Map) fromMessage .get("config"); p.setConfig(config); @@ -1616,6 +1664,7 @@ public void beforeStatusChange(Job job, Status before, Status after) { @Override public void afterStatusChange(Job job, Status before, Status after) { + if (after == Status.ERROR) { if (job.getException() != null) { LOG.error("Error", job.getException()); @@ -1631,6 +1680,42 @@ public void afterStatusChange(Job job, Status before, Status after) { LOG.error(e.toString(), e); } } + + notebookServer.workflowManager.setJobNotify(after, note.getId(), job.getId()); + + if (!after.isPending() && !after.isRunning()) { + Map> waitJobLists; + WorkflowManager workflowManager = notebookServer.workflowManager; + synchronized (workflowManager) { + waitJobLists = workflowManager.getNextJob(note.getId(), job.getId()); + if (waitJobLists != null) { + for (String notebookId : waitJobLists.keySet()) { + String paragraphId = waitJobLists.get(notebookId).get(0); + Note nextNote = notebookServer.notebook().getNote(notebookId); + + if (nextNote == null) { + workflowManager.setJobNotify(Status.ERROR, notebookId, paragraphId); + continue; + } + + Paragraph paragraph = nextNote.getParagraph(paragraphId); + if (paragraph == null) { + workflowManager.setJobNotify(Status.ERROR, notebookId, paragraphId); + continue; + } + + try { + nextNote.persist(null); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + nextNote.run(paragraphId); + workflowManager.setJobNotify(Status.RUNNING, notebookId, paragraphId); + } + } + } + } + notebookServer.broadcastNote(note); try { @@ -1652,7 +1737,6 @@ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String ou .put("noteId", paragraph.getNote().getId()) .put("paragraphId", paragraph.getId()) .put("data", output); - notebookServer.broadcast(paragraph.getNote().getId(), msg); } @@ -1669,6 +1753,9 @@ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String ou .put("paragraphId", paragraph.getId()) .put("data", output); + LOG.info("append updated {} note {} para {}", + paragraph.getStatus(), paragraph.getNote().getId(), paragraph.getId()); + notebookServer.broadcast(paragraph.getNote().getId(), msg); } } diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-parameterizedQueryForm.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-parameterizedQueryForm.html index 000b4fe160c..c88af887718 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-parameterizedQueryForm.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-parameterizedQueryForm.html @@ -14,38 +14,65 @@
+ +
+
+ + +
+
+ + +
+
+ +
-
- - + - - -
- -
+ +
+
+
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index b52b6660338..829bc2c34be 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -12,6 +12,7 @@ * limitations under the License. */ 'use strict'; + (function() { angular.module('zeppelinWebApp').controller('ParagraphCtrl', ParagraphCtrl); @@ -36,6 +37,7 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $location, $timeout, $compile, $http, $q, websocketMsgSrv, baseUrlSrv, ngToast, saveAsService) { + var ANGULAR_FUNCTION_OBJECT_NAME_PREFIX = '_Z_ANGULAR_FUNC_'; $scope.parentNote = null; $scope.paragraph = null; @@ -58,7 +60,7 @@ if (filtered.length === 1) { var paragraph = filtered[0]; websocketMsgSrv.runParagraph(paragraph.id, paragraph.title, paragraph.text, - paragraph.config, paragraph.settings.params); + paragraph.config, paragraph.settings.params, paragraph.settings.workflowJob); } else { ngToast.danger({content: 'Cannot find a paragraph with id \'' + paragraphId + '\'', verticalPosition: 'top', dismissOnTimeout: false}); @@ -248,9 +250,87 @@ try { angular.element('#p' + $scope.paragraph.id + '_angular').html($scope.paragraph.result.msg); +<<<<<<< HEAD + $scope.isRunning = function() { + if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') { + return true; + } else { + return false; + } + }; + + $scope.cancelParagraph = function() { + console.log('Cancel %o', $scope.paragraph.id); + websocketMsgSrv.cancelParagraphRun($scope.paragraph.id); + }; + + $scope.runParagraph = function(data) { + websocketMsgSrv.runParagraph($scope.paragraph.id, $scope.paragraph.title, + data, $scope.paragraph.config, $scope.paragraph.settings.params, + $scope.paragraph.settings.workflowJob); + $scope.originalText = angular.copy(data); + $scope.dirtyText = undefined; + }; + + $scope.saveParagraph = function() { + if ($scope.dirtyText === undefined || $scope.dirtyText === $scope.originalText) { + return; + } + commitParagraph($scope.paragraph.title, $scope.dirtyText, $scope.paragraph.config, + $scope.paragraph.settings.params); + $scope.originalText = angular.copy($scope.dirtyText); + $scope.dirtyText = undefined; + }; + + $scope.toggleEnableDisable = function() { + $scope.paragraph.config.enabled = $scope.paragraph.config.enabled ? false : true; + var newParams = angular.copy($scope.paragraph.settings.params); + var newConfig = angular.copy($scope.paragraph.config); + commitParagraph($scope.paragraph.title, $scope.paragraph.text, newConfig, newParams); + }; + + $scope.run = function() { + var editorValue = $scope.editor.getValue(); + if (editorValue) { + if (!($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) { + $scope.runParagraph(editorValue); + } + } + }; + + $scope.moveUp = function() { + $scope.$emit('moveParagraphUp', $scope.paragraph.id); + }; + + $scope.moveDown = function() { + $scope.$emit('moveParagraphDown', $scope.paragraph.id); + }; + + $scope.insertNew = function(position) { + $scope.$emit('insertParagraph', $scope.paragraph.id, position || 'below'); + }; + + $scope.removeParagraph = function() { + var paragraphs = angular.element('div[id$="_paragraphColumn_main"]'); + if (paragraphs[paragraphs.length - 1].id.startsWith($scope.paragraph.id)) { + BootstrapDialog.alert({ + closable: true, + message: 'The last paragraph can\'t be deleted.' + }); + } else { + BootstrapDialog.confirm({ + closable: true, + title: '', + message: 'Do you want to delete this paragraph?', + callback: function(result) { + if (result) { + console.log('Remove paragraph'); + websocketMsgSrv.removeParagraph($scope.paragraph.id); +======= $compile(angular.element('#p' + $scope.paragraph.id + '_angular').contents())(paragraphScope); } catch (err) { console.log('ANGULAR rendering error %o', err); +>>>>>>> master } } else { $timeout(retryRenderer, 10); diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 5ffdaf7c259..0bbdbd4ae7c 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -12,6 +12,7 @@ * limitations under the License. */ 'use strict'; + (function() { angular.module('zeppelinWebApp').service('websocketMsgSrv', websocketMsgSrv); @@ -111,7 +112,7 @@ websocketEvents.sendNewEvent({op: 'CANCEL_PARAGRAPH', data: {id: paragraphId}}); }, - runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) { + runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams, workflowJob) { websocketEvents.sendNewEvent({ op: 'RUN_PARAGRAPH', data: { @@ -119,7 +120,8 @@ title: paragraphTitle, paragraph: paragraphData, config: paragraphConfig, - params: paragraphParams + params: paragraphParams, + workflowJob: workflowJob ? workflowJob : null } }); }, diff --git a/zeppelin-web/src/components/workflow-widget/workflow-widget.directive.js b/zeppelin-web/src/components/workflow-widget/workflow-widget.directive.js new file mode 100644 index 00000000000..ff028c6aa26 --- /dev/null +++ b/zeppelin-web/src/components/workflow-widget/workflow-widget.directive.js @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +angular.module('zeppelinWebApp').controller('workflowWidgetCtrl', function($scope, notebookListDataFactory, + $rootScope, $routeParams, websocketMsgSrv) { + console.log('data ', $scope.workflowJobResultData); + + $scope.workflowJobLists = []; + $scope.workflowJobResultData = []; + + $scope.$watchCollection('workflowJobInputData', function() { + var NOTE_NOT_FOUNT = -1; + $scope.workflowJobLists = []; + $scope.workflowJobResultData = []; + var noteLists = notebookListDataFactory.root.children; + $scope.workflowJobInputData.map(function(item) { + var workflowJob = item.value.split(':'); + if (workflowJob[0] === undefined) { + return; + } + if (workflowJob[1] === undefined) { + workflowJob[1] = '*'; + } + + var noteIndex = _.findIndex(noteLists, {'id': workflowJob[0]}); + if (noteIndex === NOTE_NOT_FOUNT) { + return; + } + + var paragraphNameValue = workflowJob[1] === '*' ? 'ALL Paragraph' : workflowJob[1]; + $scope.workflowJobLists.push( + { + noteName: noteLists[noteIndex].name, + paragraphName: paragraphNameValue, + notebookId: workflowJob[0], + paragraphId: workflowJob[1] + } + ); + $scope.workflowJobResultData.push({notebookId: workflowJob[0], paragraphId: workflowJob[1]}); + }); + }); + + $scope.removeJob = function(jobIndex) { + $scope.workflowJobInputData.splice(jobIndex, 1); + }; + +}) + .directive('workflowWidget', function() { + return { + restrict: 'E', + scope: { + 'workflowJobInputData': '=', + 'workflowJobResultData': '=' + }, + controller: 'workflowWidgetCtrl', + templateUrl: 'components/workflow-widget/workflow-widget.html', + link: function(scope, elem, attrs) { + } + }; +}); diff --git a/zeppelin-web/src/components/workflow-widget/workflow-widget.html b/zeppelin-web/src/components/workflow-widget/workflow-widget.html new file mode 100644 index 00000000000..2feb0a0f87a --- /dev/null +++ b/zeppelin-web/src/components/workflow-widget/workflow-widget.html @@ -0,0 +1,51 @@ + + diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html index fc000a9ed6f..7f93c5a0f9c 100644 --- a/zeppelin-web/src/index.html +++ b/zeppelin-web/src/index.html @@ -213,6 +213,7 @@ + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJob.java new file mode 100644 index 00000000000..077ca737291 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJob.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.workflow; + +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; + +/** + * workflow job object + * + */ +public class WorkflowJob { + static Logger logger = LoggerFactory.getLogger(WorkflowManager.class); + private String workflowId; + private WorkflowJobItem workflowJob; + + public WorkflowJob() { + this.workflowJob = null; + this.workflowId = String.valueOf(this.hashCode()); + } + + public WorkflowJob(String workflowId) { + this.workflowJob = null; + this.workflowId = workflowId; + } + + public String getWorkflowId() { + return workflowId; + } + + public void setWorkflowJobItem(WorkflowJobItem workflowItem) { + if (this.workflowJob == null) { + this.workflowId = workflowItem.getNotebookId(); + } + this.workflowJob = workflowItem; + } + + public WorkflowJobItem getWorkflowJobItemFirst() { + return this.workflowJob; + } + + public WorkflowJobItem getWorkflowJobItemLast() { + WorkflowJobItem currentJobItem = this.workflowJob; + WorkflowJobItem lastJobItem = null; + + while (currentJobItem != null) { + lastJobItem = currentJobItem; + currentJobItem = currentJobItem.getOnSuccessJob(); + } + return lastJobItem; + } + + public void removeWorkflowJobItemLast() { + List seqJobList = new LinkedList<>(); + WorkflowJobItem currentJobItem = this.workflowJob; + WorkflowJobItem lastJobItem = null; + + while (currentJobItem != null) { + seqJobList.add(currentJobItem); + currentJobItem = currentJobItem.getOnSuccessJob(); + } + + int workflowJobItemListSize = seqJobList.size(); + if (workflowJobItemListSize > 0) { + lastJobItem = seqJobList.get(workflowJobItemListSize - 1); + lastJobItem.setOnSuccessJob(null); + } + + } + + public WorkflowJobItem getWorkflowJobItemTarget(String notebookId, String paragraphId) { + if (this.workflowJob == null) { + return null; + } + + return this.workflowJob.getFindJob(notebookId, paragraphId); + } + + public WorkflowJobItem getIfNextJob(String finishiedNotebookId, String finishiedParagraphId) { + return this.workflowJob.getIfNextJob(finishiedNotebookId, finishiedParagraphId); + } + + public void notifyJobWork( + Job.Status status, String finishedNotebookId, String finishedParagraphId) { + if (this.workflowJob != null) { + this.workflowJob.notifyJobFinishied(status, finishedNotebookId, finishedParagraphId); + } + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJobItem.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJobItem.java new file mode 100644 index 00000000000..1c950d5634f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowJobItem.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.workflow; + +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.print.attribute.standard.JobState; + +/** + * workflow job item + * + */ +public class WorkflowJobItem { + static Logger logger = LoggerFactory.getLogger(WorkflowJobItem.class); + /** + * workflow job Status enum + * + */ + public enum WorkflowStatus { + WAIT, PROGRESS, ERROR, SUCCESS, ABORT + } + + private String notebookId; + private String paragaraphId; + private WorkflowJobItem onSuccessJob; + private WorkflowStatus status; + + public WorkflowJobItem() { + this.onSuccessJob = null; + this.notebookId = null; + this.paragaraphId = null; + this.status = WorkflowStatus.WAIT; + } + + public WorkflowJobItem(String notebookId, String paragaraphId) { + this(); + this.notebookId = notebookId; + this.paragaraphId = paragaraphId; + } + + public String getNotebookId() { + return notebookId; + } + + public void setNotebookId(String notebookId) { + this.notebookId = notebookId; + } + + public String getParagaraphId() { + return paragaraphId; + } + + public void setParagaraphId(String paragaraphId) { + this.paragaraphId = paragaraphId; + } + + public WorkflowStatus getStatus() { + return status; + } + + public void setStatus(WorkflowStatus status) { + this.status = status; + } + + public WorkflowJobItem getOnSuccessJob() { + return onSuccessJob; + } + + public void setOnSuccessJob(WorkflowJobItem onSuccessJob) { + this.onSuccessJob = onSuccessJob; + } + + public WorkflowJobItem getFindJob(String notebookId, String paragaraphId) { + if (isMyJob(notebookId, paragaraphId) == true) { + return this; + } + + if (this.onSuccessJob != null) { + return this.onSuccessJob.getFindJob(notebookId, paragaraphId); + } else { + return null; + } + } + + public WorkflowJobItem getIfNextJob(String finishedNotebookId, String finishedParagraphId) { + String childNotebookId = null; + String childParagraphId = null; + if (getStatus() == WorkflowStatus.SUCCESS) { + if (this.onSuccessJob != null) { + childNotebookId = this.onSuccessJob.getNotebookId(); + childParagraphId = this.onSuccessJob.getParagaraphId(); + return this.onSuccessJob.getIfNextJob(childNotebookId, childParagraphId); + } + } + + if (getStatus() == WorkflowStatus.WAIT) { + if (isMyJob(finishedNotebookId, finishedParagraphId)) { + return this; + } + } + + return null; + } + + public void notifyJobFinishied( + Status status, String finishedNotebookId, String finishedParagraphId) { + + if (isMyJob(finishedNotebookId, finishedParagraphId) && + (getStatus() == WorkflowStatus.WAIT || getStatus() == WorkflowStatus.PROGRESS)) { + if (Status.ABORT == status) { + setStatus(WorkflowStatus.ABORT); + } else if (Status.ERROR == status) { + setStatus(WorkflowStatus.ERROR); + } else if (Status.FINISHED == status) { + setStatus(WorkflowStatus.SUCCESS); + } else if (Status.RUNNING == status) { + setStatus(WorkflowStatus.PROGRESS); + } + } else { + if (this.onSuccessJob != null) { + this.onSuccessJob.notifyJobFinishied(status, finishedNotebookId, finishedParagraphId); + } + } + + } + + private boolean isMyJob(String finishedNotebookId, String finishedParagraphId) { + if (getNotebookId().equals(finishedNotebookId) && + getParagaraphId().equals(finishedParagraphId)) { + return true; + } + return false; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowManager.java new file mode 100644 index 00000000000..5af54ec4c63 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/workflow/WorkflowManager.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.workflow; + +import org.apache.zeppelin.scheduler.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * workflow manager + * + */ +public class WorkflowManager { + static Logger logger = LoggerFactory.getLogger(WorkflowManager.class); + private Map workflowJobLists; + + public WorkflowManager() { + this.workflowJobLists = new HashMap<>(); + } + + public void setWorkflow( + String workflowId, String jobRootNotebookId, String jobRootParagraphId) { + if (workflowJobLists.containsKey(workflowId) == true) { + workflowJobLists.remove(workflowId); + } + WorkflowJob newWorkflow = new WorkflowJob(workflowId); + WorkflowJobItem jobRootItem = new WorkflowJobItem(jobRootNotebookId, jobRootParagraphId); + newWorkflow.setWorkflowJobItem(jobRootItem); + workflowJobLists.put(workflowId, newWorkflow); + } + + public Map getWorkflow() { + return workflowJobLists; + } + + public WorkflowJob getWorkflow(String workflowId) { + return workflowJobLists.get(workflowId); + } + + public void setJobNotify(Job.Status jobStatus, String notebookId, String paragraphId) { + for (WorkflowJob job : workflowJobLists.values()) { + synchronized (job) { + job.notifyJobWork(jobStatus, notebookId, paragraphId); + } + } + } + + public Map> getNextJob(String notebookId, String paragraphId) { + Map> nextJobList = new HashMap<>(); + + for (WorkflowJob job : workflowJobLists.values()) { + synchronized (job) { + WorkflowJobItem nextJob = job.getIfNextJob(notebookId, paragraphId); + if (nextJob != null) { + LinkedList paragraphList = new LinkedList(); + paragraphList.add(nextJob.getParagaraphId()); + nextJobList.put(nextJob.getNotebookId(), paragraphList); + } + } + } + + return nextJobList; + } +}