diff --git a/notebook/2C6AEZVY2/note.json b/notebook/2C6AEZVY2/note.json new file mode 100644 index 00000000000..540cc3a9434 --- /dev/null +++ b/notebook/2C6AEZVY2/note.json @@ -0,0 +1,529 @@ +{ + "paragraphs": [ + { + "text": "%md\n# Zeppelin Workflow\n#### Describes how various interpreters can be used as workflows.", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:00 PM", + "config": { + "colWidth": 12.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "markdown", + "editOnDblClick": true + }, + "editorMode": "ace/mode/markdown", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1\u003eZeppelin Workflow\u003c/h1\u003e\n\u003ch4\u003eDescribes how various interpreters can be used as workflows.\u003c/h4\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483311356894_400311605", + "id": "20170101-145556_1340440758", + "dateCreated": "Jan 1, 2017 2:55:56 PM", + "dateStarted": "Jan 1, 2017 7:00:00 PM", + "dateFinished": "Jan 1, 2017 7:00:00 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%angular\n\u003cbr /\u003e\n\u003cbr /\u003e\n\u003cdiv class\u003d\u0027well\u0027\u003e\n \u003cdiv class\u003d\u0027row\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-center text-warning\u0027\u003e\n Normaly Execute Cycle\n \u003c/div\u003e\n \u003c/div\u003e\n\u003c/div\u003e\n\u003cdiv class\u003d\u0027row\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-right text-warning\u0027\u003e\n For example ---\u003e\n \u003c/div\u003e\n\u003c/div\u003e", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:00 PM", + "config": { + "colWidth": 12.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "text", + "editOnDblClick": true + }, + "editorMode": "ace/mode/undefined", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "ANGULAR", + "data": "\u003cbr /\u003e\n\u003cbr /\u003e\n\u003cdiv class\u003d\u0027well\u0027\u003e\n \u003cdiv class\u003d\u0027row\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-center text-warning\u0027\u003e\n Normaly Execute Cycle\n \u003c/div\u003e\n \u003c/div\u003e\n\u003c/div\u003e\n\u003cdiv class\u003d\u0027row\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-right text-warning\u0027\u003e\n For example ---\u003e\n \u003c/div\u003e\n\u003c/div\u003e" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483312806703_-1252533564", + "id": "20170101-152006_1383393641", + "dateCreated": "Jan 1, 2017 3:20:06 PM", + "dateStarted": "Jan 1, 2017 7:00:01 PM", + "dateFinished": "Jan 1, 2017 7:00:01 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%spark\n\nprintln(\"%html \u003ch1 class\u003d\u0027text-primary\u0027\u003ePlease Execute this paragraph\u003c/h1\u003e\")\n\nz.run(\"2C6AEZVY2\", \"20170101-152325_935191391\")\nz.run(\"2C6AEZVY2\", \"20170101-152357_227883083\")\nz.run(\"2C6AEZVY2\", \"20170101-152408_302376653\")", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:01 PM", + "config": { + "colWidth": 3.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false + }, + "editorMode": "ace/mode/scala" + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1 class\u003d\u0027text-primary\u0027\u003ePlease Execute this paragraph\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313002204_284228519", + "id": "20170101-152322_987058228", + "dateCreated": "Jan 1, 2017 3:23:22 PM", + "dateStarted": "Jan 1, 2017 7:00:01 PM", + "dateFinished": "Jan 1, 2017 7:00:04 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%spark\nThread.sleep(3000)\n\nprintln(\"%html \u003ch1\u003eSPARK DONE\u003c/h1\u003e\")\n", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:01 PM", + "config": { + "colWidth": 2.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false + }, + "editorMode": "ace/mode/scala", + "editorHide": false, + "tableHide": false, + "title": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1\u003eSPARK DONE\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313005407_295016813", + "id": "20170101-152325_935191391", + "dateCreated": "Jan 1, 2017 3:23:25 PM", + "dateStarted": "Jan 1, 2017 7:00:09 PM", + "dateFinished": "Jan 1, 2017 7:00:15 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%md\n# MARKDOWN DONE", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:01 PM", + "config": { + "colWidth": 2.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "markdown", + "editOnDblClick": true + }, + "editorMode": "ace/mode/markdown", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1\u003eMARKDOWN DONE\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313037019_1351567941", + "id": "20170101-152357_227883083", + "dateCreated": "Jan 1, 2017 3:23:57 PM", + "dateStarted": "Jan 1, 2017 7:00:04 PM", + "dateFinished": "Jan 1, 2017 7:00:04 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%angular\n\u003ch1\u003eANGULAR DONE\u003c/h1\u003e\n", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:02 PM", + "config": { + "colWidth": 2.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "text", + "editOnDblClick": true + }, + "editorMode": "ace/mode/undefined", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "ANGULAR", + "data": "\u003ch1\u003eANGULAR DONE\u003c/h1\u003e" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313048080_-1861070051", + "id": "20170101-152408_302376653", + "dateCreated": "Jan 1, 2017 3:24:08 PM", + "dateStarted": "Jan 1, 2017 7:00:04 PM", + "dateFinished": "Jan 1, 2017 7:00:04 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%angular\n\u003cbr /\u003e\n\u003cbr /\u003e\n\u003cdiv class\u003d\u0027row well\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-center text-primary\u0027\u003e\n Workflow execute\n \u003c/div\u003e\n\u003c/div\u003e\n", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:02 PM", + "config": { + "colWidth": 12.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "text", + "editOnDblClick": true + }, + "editorMode": "ace/mode/undefined", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "ANGULAR", + "data": "\u003cbr /\u003e\n\u003cbr /\u003e\n\u003cdiv class\u003d\u0027row well\u0027\u003e\n \u003cdiv class\u003d\u0027col-md-12 text-center text-primary\u0027\u003e\n Workflow execute\n \u003c/div\u003e\n\u003c/div\u003e" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313003802_1345365986", + "id": "20170101-152323_1238754452", + "dateCreated": "Jan 1, 2017 3:23:23 PM", + "dateStarted": "Jan 1, 2017 7:00:02 PM", + "dateFinished": "Jan 1, 2017 7:00:02 PM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%spark\n\nprintln(\"%html \u003ch1 class\u003d\u0027text-primary\u0027\u003eBased Thread workflow executor\u003c/h1\u003e\")\nnew Thread(new Runnable {\n def run() {\n z.run(\"2C6AEZVY2\", \"20170101-152406_303378533\")\n var job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152406_303378533\").getJobStatus();\n while (job.isPending || job.isRunning) {\n Thread.sleep(1000)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152406_303378533\").getJobStatus()\n }\n \n z.run(\"2C6AEZVY2\", \"20170101-152404_1766974937\")\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152404_1766974937\").getJobStatus();\n \n while (job.isPending || job.isRunning) {\n Thread.sleep(500)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152404_1766974937\").getJobStatus();\n }\n z.run(\"2C6AEZVY2\", \"20170101-164138_1051383091\")\n println(\"ex Hello\")\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-164138_1051383091\").getJobStatus();\n while (job.isPending || job.isRunning) {\n Thread.sleep(500)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-164138_1051383091\").getJobStatus();\n }\n }\n}).start", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 9:16:42 AM", + "config": { + "colWidth": 3.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false + }, + "editorMode": "ace/mode/scala" + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1 class\u003d\u0027text-primary\u0027\u003eBased Thread workflow executor\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313046793_1537817057", + "id": "20170101-152406_916644710", + "dateCreated": "Jan 1, 2017 3:24:06 PM", + "dateStarted": "Jan 2, 2017 9:16:42 AM", + "dateFinished": "Jan 2, 2017 9:16:43 AM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%spark\nThread.sleep(3000)\nprintln(\"%html \u003ch1\u003eSPARK DONE\u003c/h1\u003e\")\n", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 9:00:32 AM", + "config": { + "colWidth": 3.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false + }, + "editorMode": "ace/mode/scala" + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1\u003eSPARK DONE\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313046170_1692870864", + "id": "20170101-152406_303378533", + "dateCreated": "Jan 1, 2017 3:24:06 PM", + "dateStarted": "Jan 2, 2017 9:16:44 AM", + "dateFinished": "Jan 2, 2017 9:16:47 AM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%md\n# MARKDOWN DONE", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:03 PM", + "config": { + "colWidth": 3.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "markdown", + "editOnDblClick": true + }, + "editorMode": "ace/mode/markdown", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "HTML", + "data": "\u003ch1\u003eMARKDOWN DONE\u003c/h1\u003e\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483313044517_-1645625991", + "id": "20170101-152404_1766974937", + "dateCreated": "Jan 1, 2017 3:24:04 PM", + "dateStarted": "Jan 2, 2017 9:16:49 AM", + "dateFinished": "Jan 2, 2017 9:16:49 AM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%angular\n\u003ch1\u003eANGULAR DONE\u003c/h1\u003e\n", + "user": "anonymous", + "dateUpdated": "Jan 1, 2017 7:00:03 PM", + "config": { + "colWidth": 3.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "markdown", + "editOnDblClick": true + }, + "editorMode": "ace/mode/undefined", + "editorHide": true, + "tableHide": false + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "ANGULAR", + "data": "\u003ch1\u003eANGULAR DONE\u003c/h1\u003e" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483317698107_-128478476", + "id": "20170101-164138_1051383091", + "dateCreated": "Jan 1, 2017 4:41:38 PM", + "dateStarted": "Jan 2, 2017 9:16:51 AM", + "dateFinished": "Jan 2, 2017 9:16:51 AM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "import scala.concurrent.{Await, Future, future}\nimport scala.concurrent.ExecutionContext.Implicits.global\nimport scala.util.{Failure, Success}\n\ndef printWorkflowResult(msg:String): Unit \u003d {\n println(\"HEEEE\")\n}\n\nval myWorkflow \u003d Future {\n z.run(\"2C6AEZVY2\", \"20170101-152406_303378533\")\n var job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152406_303378533\").getJobStatus();\n while (job.isPending || job.isRunning) {\n Thread.sleep(1000)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152406_303378533\").getJobStatus()\n }\n \n if (job.isFinished \u003d\u003d false) {\n new Exception(\"Fist job is fail!\")\n }\n \n println(\"dflkjsdlkfjdslk\")\n \n z.run(\"2C6AEZVY2\", \"20170101-152404_1766974937\")\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152404_1766974937\").getJobStatus();\n \n while (job.isPending || job.isRunning) {\n Thread.sleep(500)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-152404_1766974937\").getJobStatus();\n }\n \n if (job.isFinished \u003d\u003d false) {\n new Exception(\"second job is fail!\")\n }\n \n z.run(\"2C6AEZVY2\", \"20170101-164138_1051383091\")\n println(\"ex Hello\")\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-164138_1051383091\").getJobStatus();\n while (job.isPending || job.isRunning) {\n Thread.sleep(500)\n job \u003d z.getZeppelinJobStatus(\"2C6AEZVY2\", \"20170101-164138_1051383091\").getJobStatus();\n }\n\n if (job.isFinished \u003d\u003d false) {\n new Exception(\"third job is fail!\")\n }\n \n val result:String \u003d \"Done\"\n result\n}\n\nmyWorkflow.onComplete {\n case Success(msg) \u003d\u003e println(\"Got the callback, meaning \" + msg)\n case Failure(e) \u003d\u003e e.printStackTrace\n}\n\nmyWorkflow onSuccess {\n case result \u003d\u003e printWorkflowResult(\"Workflow is Done\")\n}\n\nmyWorkflow onFailure {\n case failure \u003d\u003e printWorkflowResult(\"Workflow is Fail\")\n}\n\nprintln(\"Workflow Execute\")\n\n\n\n\n\nThread.sleep(5000)", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 9:10:23 AM", + "config": { + "colWidth": 12.0, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false + }, + "editorMode": "ace/mode/scala" + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "TEXT", + "data": "\nimport scala.concurrent.{Await, Future, future}\n\nimport scala.concurrent.ExecutionContext.Implicits.global\n\nimport scala.util.{Failure, Success}\n\nprintWorkflowResult: (msg: String)Unit\n\nmyWorkflow: scala.concurrent.Future[String] \u003d scala.concurrent.impl.Promise$DefaultPromise@2f0159a2\nWorkflow Execute\n" + } + ] + }, + "apps": [], + "jobName": "paragraph_1483326003200_406855249", + "id": "20170101-190003_224666894", + "dateCreated": "Jan 1, 2017 7:00:03 PM", + "dateStarted": "Jan 2, 2017 9:10:23 AM", + "dateFinished": "Jan 2, 2017 9:10:34 AM", + "status": "FINISHED", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%md\n", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 8:44:58 AM", + "config": {}, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "jobName": "paragraph_1483375498856_2033485473", + "id": "20170102-084458_592045884", + "dateCreated": "Jan 2, 2017 8:44:58 AM", + "status": "READY", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%md\n", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 8:45:00 AM", + "config": {}, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "jobName": "paragraph_1483375500500_-1206885088", + "id": "20170102-084500_1266132957", + "dateCreated": "Jan 2, 2017 8:45:00 AM", + "status": "READY", + "progressUpdateIntervalMs": 500 + }, + { + "text": "%md\n", + "user": "anonymous", + "dateUpdated": "Jan 2, 2017 8:44:57 AM", + "config": {}, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "jobName": "paragraph_1483375497136_1427121206", + "id": "20170102-084457_301741370", + "dateCreated": "Jan 2, 2017 8:44:57 AM", + "status": "READY", + "progressUpdateIntervalMs": 500 + } + ], + "name": "Zeppelin Tutorial/Workflow Tutorial", + "id": "2C6AEZVY2", + "angularObjects": { + "2C8AHX3W9:shared_process": [], + "2C67YNHQU:shared_process": [], + "2C6B7JAYS:shared_process": [], + "2C548UHDB:shared_process": [], + "2C53F7RSM:shared_process": [], + "2C7E2MKRM:shared_process": [], + "2C5C149TR:shared_process": [], + "2C5VB3W86:shared_process": [], + "2C6Q32VJ2:shared_process": [], + "2C4G9MQGQ:shared_process": [], + "2C4QKZTAQ:shared_process": [], + "2C76ZHMJ8:shared_process": [], + "2C6KBJDXK:shared_process": [], + "2C78P62FM:shared_process": [], + "2C7YVY1JE:shared_process": [], + "2C6VZEHC3:shared_process": [], + "2C6P5GQ63:shared_process": [], + "2C851PFXZ:shared_process": [], + "2C6BHV77Q:shared_process": [] + }, + "config": { + "looknfeel": "default", + "personalizedMode": "false" + }, + "info": {} +} \ No newline at end of file diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 281a8f83cad..e458a665308 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterHookRegistry; import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -343,10 +344,21 @@ public void run(String noteId, String paragraphId, InterpreterContext context) { } + /** + * Run Zeppelin Note by note id + * @param noteId + */ + @ZeppelinApi public void runNote(String noteId) { runNote(noteId, interpreterContext); } + /** + * Run Zepppelin Note by note id + * @param noteId + * @param context + */ + @ZeppelinApi public void runNote(String noteId, InterpreterContext context) { String runningNoteId = context.getNoteId(); String runningParagraphId = context.getParagraphId(); @@ -364,6 +376,31 @@ public void runNote(String noteId, InterpreterContext context) { } } + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getZeppelinJobStatus(String noteId, String paragraphId) { + return getZeppelinJobStatus(noteId, paragraphId, interpreterContext); + } + + /** + * get job status by zeppelin note id and paragraph id + * @param noteId + * @param paragraphId + * @param context + * @return + */ + @ZeppelinApi + public RemoteZeppelinJobStatus getZeppelinJobStatus( + String noteId, String paragraphId, InterpreterContext context) { + RemoteWorksController remoteWorksController = context.getRemoteWorksController(); + return remoteWorksController.getRemoteJobStatus(noteId, paragraphId); + } + /** * get Zeppelin Paragraph Runner from zeppelin server diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java index e1410d61abb..a1fc899801f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java @@ -26,4 +26,5 @@ public interface RemoteWorksController { List getRemoteContextRunner(String noteId); List getRemoteContextRunner(String noteId, String paragraphId); + RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java new file mode 100644 index 00000000000..93f2902c60b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinJobStatus.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.scheduler.Job.Status; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; + +/** + * Remote Zeppelin Server job status + */ +public class RemoteZeppelinJobStatus { + private String noteId; + private Status jobStatus; + private String paragraphId; + private Date lastRunningTime; + + public String getNoteId() { + return noteId; + } + + public void setNoteId(String noteId) { + this.noteId = noteId; + } + + public Status getJobStatus() { + return jobStatus; + } + + public void setJobStatus(Status jobStatus) { + this.jobStatus = jobStatus; + } + + public void setJobStatus(String jobStatusString) { + this.jobStatus = Status.valueOf(jobStatusString); + } + + public String getParagraphId() { + return paragraphId; + } + + public void setParagraphId(String paragraphId) { + this.paragraphId = paragraphId; + } + + public Date getLastRunningTime() { + return lastRunningTime; + } + + public void setLastRunningTime(Date lastRunningTime) { + this.lastRunningTime = lastRunningTime; + } + + public void setLastRunningTime(String lastRunningTimeString) { + DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH); + Date date = new Date(); + try { + date = format.parse(lastRunningTimeString); + } catch (ParseException e) { + + } finally { + this.lastRunningTime = date; + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java index b2a87aa8bd5..34bae1a4713 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteZeppelinServerResource.java @@ -25,7 +25,8 @@ public class RemoteZeppelinServerResource { * Resource Type for Zeppelin Server */ public enum Type{ - PARAGRAPH_RUNNERS + PARAGRAPH_RUNNERS, + JOB_STATUS } private String ownerKey; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 900d1ace3b6..97f08515f82 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -21,6 +21,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -68,6 +69,20 @@ public void getZeppelinServerNoteRunner( gson.toJson(eventBody))); } + public void getZeppelinServerJobStatus(String eventOwnerKey, String noteId, String paragraphId) { + RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource(); + eventBody.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + eventBody.setOwnerKey(eventOwnerKey); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + eventBody.setData(jobStatus); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS, + gson.toJson(eventBody))); + } + /** * Run paragraph * @param runner diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index bf64d9fa084..48d468c7abb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; @@ -231,6 +232,12 @@ public void run() { progressRemoteZeppelinControlEvent( reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS) { + RemoteZeppelinServerResource reqResourceBody = gson.fromJson( + event.getData(), RemoteZeppelinServerResource.class); + progressRemoteJobStatusControlEvent( + reqResourceBody.getResourceType(), listener, reqResourceBody); + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { Map metaInfos = gson.fromJson(event.getData(), new TypeToken>() { @@ -326,6 +333,66 @@ public void onError() { } } + private void progressRemoteJobStatusControlEvent( + RemoteZeppelinServerResource.Type resourceType, + RemoteInterpreterProcessListener remoteWorksEventListener, + RemoteZeppelinServerResource reqResourceBody) throws Exception { + boolean broken = false; + final Gson gson = new Gson(); + final String eventOwnerKey = reqResourceBody.getOwnerKey(); + Client interpreterServerMain = null; + try { + interpreterServerMain = interpreterProcess.getClient(); + final Client eventClient = interpreterServerMain; + if (resourceType == RemoteZeppelinServerResource.Type.JOB_STATUS) { + Map jobStatus = (Map) reqResourceBody.getData(); + + String noteId = (String) jobStatus.get("noteId"); + String paragraphId = (String) jobStatus.get("paragraphId"); + + RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent = + new RemoteInterpreterProcessListener.RemoteWorksEventListener() { + + @Override + public void onFinished(Object resultObject) { + boolean clientBroken = false; + if (resultObject != null && resultObject instanceof RemoteZeppelinJobStatus) { + + RemoteZeppelinServerResource resResource = new RemoteZeppelinServerResource(); + resResource.setOwnerKey(eventOwnerKey); + resResource.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS); + resResource.setData(resultObject); + + try { + eventClient.onReceivedZeppelinResource(gson.toJson(resResource)); + } catch (Exception e) { + clientBroken = true; + logger.error("Can't get Remote Job Status Event", e); + waitQuietly(); + } finally { + interpreterProcess.releaseClient(eventClient, clientBroken); + } + } + } + + @Override + public void onError() { + logger.info("onGetParagraphRunners onError"); + } + }; + + remoteWorksEventListener.onGetParagraphJobStatus(noteId, paragraphId, callBackEvent); + } + } catch (Exception e) { + broken = true; + logger.error("Can't get RemoteInterpreter Job Status Event", e); + waitQuietly(); + + } finally { + interpreterProcess.releaseClient(interpreterServerMain, broken); + } + } + private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) { Client client = null; boolean broken = false; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 66b08c95a1d..4ac60ffcf3e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -32,6 +32,8 @@ public void onOutputUpdated( public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception; public void onGetParagraphRunners( String noteId, String paragraphId, RemoteWorksEventListener callback); + public void onGetParagraphJobStatus( + String noteId, String paragraphId, RemoteWorksEventListener callback); /** * Remote works for Interpreter callback listener diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6baed824cea..7c411254a7b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; @@ -372,6 +373,21 @@ public void onReceivedZeppelinResource(String responseJson) throws TException { response.getOwnerKey(), intpContextRunners); } + } else if (response.getResourceType() == RemoteZeppelinServerResource.Type.JOB_STATUS) { + + Map jobStatusMap = (Map) response.getData(); + + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId((String) jobStatusMap.get("noteId")); + jobStatus.setParagraphId((String) jobStatusMap.get("paragraphId")); + jobStatus.setJobStatus((String) jobStatusMap.get("jobStatus")); + jobStatus.setLastRunningTime((String) jobStatusMap.get("lastRunningTime")); + + synchronized (this.remoteWorksResponsePool) { + this.remoteWorksResponsePool.put( + response.getOwnerKey(), + jobStatus); + } } } catch (Exception e) { throw e; @@ -718,7 +734,26 @@ public List getRemoteContextRunner( return runners; } + @Override + public RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId) { + RemoteZeppelinJobStatus jobStatus = null; + String ownerKey = generateOwnerKey(); + if (StringUtils.isBlank(noteId) || StringUtils.isBlank(paragraphId)) { + return null; + } + server.eventClient.getZeppelinServerJobStatus(ownerKey, noteId, paragraphId); + try { + this.waitForEvent(ownerKey); + } catch (Exception e) { + return null; + } + synchronized (this.remoteWorksResponsePool) { + jobStatus = (RemoteZeppelinJobStatus) this.remoteWorksResponsePool.get(ownerKey); + this.remoteWorksResponsePool.remove(ownerKey); + } + return jobStatus; + } } private RemoteInterpreterResult convert(InterpreterResult result, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 2d1c165d0ca..74d05820f45 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index 5c730491e5e..402c7d54ecd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index e721cb32978..845c6c672f7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 5f4201a3f12..57054e1f197 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index fc20cd91acb..1703b15b7c7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -42,7 +42,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { ANGULAR_REGISTRY_PUSH(11), APP_STATUS_UPDATE(12), META_INFOS(13), - REMOTE_ZEPPELIN_SERVER_RESOURCE(14); + REMOTE_ZEPPELIN_SERVER_RESOURCE(14), + REMOTE_ZEPPELIN_JOB_STATUS(15); private final int value; @@ -91,6 +92,8 @@ public static RemoteInterpreterEventType findByValue(int value) { return META_INFOS; case 14: return REMOTE_ZEPPELIN_SERVER_RESOURCE; + case 15: + return REMOTE_ZEPPELIN_JOB_STATUS; default: return null; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index ce15084c69a..4e830a1a2ce 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java index c55c72e6100..20c213ed06f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index feaebccd381..96dd2bc68cb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java index 197a1351440..44e1fc61560 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30") public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 9bb26f3acdd..d4e39778054 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -62,6 +62,18 @@ public boolean isRunning() { public boolean isPending() { return this == PENDING; } + + public boolean isFinished() { + return this == FINISHED; + } + + public boolean isError() { + return this == ERROR; + } + + public boolean isAbort() { + return this == ABORT; + } } diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 50a5eb7f2e3..8a1d6e4d672 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -55,7 +55,8 @@ enum RemoteInterpreterEventType { ANGULAR_REGISTRY_PUSH = 11, APP_STATUS_UPDATE = 12, META_INFOS = 13, - REMOTE_ZEPPELIN_SERVER_RESOURCE = 14 + REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, + REMOTE_ZEPPELIN_JOB_STATUS = 15 } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index e3dc6b4c1b3..6a1eb36953f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -182,4 +182,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d7b2007e73c..cc022741ea7 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -357,4 +357,9 @@ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorks public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { } + + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, RemoteWorksEventListener callback) { + + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 1001961d23b..4a585e7458f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -47,6 +48,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -1609,7 +1611,7 @@ public void onGetParagraphRunners(String noteId, String paragraphId, if (notebookIns == null) { LOG.info("intepreter request notebook instance is null"); - callback.onFinished(notebookIns); + callback.onFinished(runner); } try { @@ -1633,6 +1635,37 @@ public void onGetParagraphRunners(String noteId, String paragraphId, } } + @Override + public void onGetParagraphJobStatus(String noteId, String paragraphId, + RemoteWorksEventListener callback) { + Notebook notebookIns = notebook(); + RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus(); + jobStatus.setNoteId(noteId); + jobStatus.setParagraphId(paragraphId); + jobStatus.setJobStatus(Status.ERROR); + jobStatus.setLastRunningTime(new Date()); + + if (notebookIns == null) { + LOG.info("intepreter request notebook instance is null"); + callback.onFinished(jobStatus); + } + + try { + Note note = notebookIns.getNote(noteId); + if (note != null) { + if (paragraphId != null) { + Paragraph paragraph = note.getParagraph(paragraphId); + jobStatus.setJobStatus(paragraph.getStatus()); + jobStatus.setLastRunningTime(paragraph.getDateStarted()); + } + } + callback.onFinished(jobStatus); + } catch (NullPointerException e) { + LOG.warn(e.getMessage()); + callback.onError(); + } + } + @Override public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception { Notebook notebookIns = notebook();