Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
529 changes: 529 additions & 0 deletions notebook/2C6AEZVY2/note.json

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
Expand Down Expand Up @@ -343,10 +344,21 @@ public void run(String noteId, String paragraphId, InterpreterContext context) {

}

/**
* Run Zeppelin Note by note id
* @param noteId
*/
@ZeppelinApi
public void runNote(String noteId) {
runNote(noteId, interpreterContext);
}

/**
* Run Zepppelin Note by note id
* @param noteId
* @param context
*/
@ZeppelinApi
public void runNote(String noteId, InterpreterContext context) {
String runningNoteId = context.getNoteId();
String runningParagraphId = context.getParagraphId();
Expand All @@ -364,6 +376,31 @@ public void runNote(String noteId, InterpreterContext context) {
}
}

/**
* get job status by zeppelin note id and paragraph id
* @param noteId
* @param paragraphId
* @return
*/
@ZeppelinApi
public RemoteZeppelinJobStatus getZeppelinJobStatus(String noteId, String paragraphId) {
return getZeppelinJobStatus(noteId, paragraphId, interpreterContext);
}

/**
* get job status by zeppelin note id and paragraph id
* @param noteId
* @param paragraphId
* @param context
* @return
*/
@ZeppelinApi
public RemoteZeppelinJobStatus getZeppelinJobStatus(
String noteId, String paragraphId, InterpreterContext context) {
RemoteWorksController remoteWorksController = context.getRemoteWorksController();
return remoteWorksController.getRemoteJobStatus(noteId, paragraphId);
}


/**
* get Zeppelin Paragraph Runner from zeppelin server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@
public interface RemoteWorksController {
List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter;

import org.apache.zeppelin.scheduler.Job.Status;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
* Remote Zeppelin Server job status
*/
public class RemoteZeppelinJobStatus {
private String noteId;
private Status jobStatus;
private String paragraphId;
private Date lastRunningTime;

public String getNoteId() {
return noteId;
}

public void setNoteId(String noteId) {
this.noteId = noteId;
}

public Status getJobStatus() {
return jobStatus;
}

public void setJobStatus(Status jobStatus) {
this.jobStatus = jobStatus;
}

public void setJobStatus(String jobStatusString) {
this.jobStatus = Status.valueOf(jobStatusString);
}

public String getParagraphId() {
return paragraphId;
}

public void setParagraphId(String paragraphId) {
this.paragraphId = paragraphId;
}

public Date getLastRunningTime() {
return lastRunningTime;
}

public void setLastRunningTime(Date lastRunningTime) {
this.lastRunningTime = lastRunningTime;
}

public void setLastRunningTime(String lastRunningTimeString) {
DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH);
Date date = new Date();
try {
date = format.parse(lastRunningTimeString);
} catch (ParseException e) {

} finally {
this.lastRunningTime = date;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public class RemoteZeppelinServerResource {
* Resource Type for Zeppelin Server
*/
public enum Type{
PARAGRAPH_RUNNERS
PARAGRAPH_RUNNERS,
JOB_STATUS
}

private String ownerKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
Expand Down Expand Up @@ -68,6 +69,20 @@ public void getZeppelinServerNoteRunner(
gson.toJson(eventBody)));
}

public void getZeppelinServerJobStatus(String eventOwnerKey, String noteId, String paragraphId) {
RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
eventBody.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS);
eventBody.setOwnerKey(eventOwnerKey);
RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus();
jobStatus.setNoteId(noteId);
jobStatus.setParagraphId(paragraphId);
eventBody.setData(jobStatus);

sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS,
gson.toJson(eventBody)));
}

/**
* Run paragraph
* @param runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.RemoteZeppelinJobStatus;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
Expand Down Expand Up @@ -231,6 +232,12 @@ public void run() {
progressRemoteZeppelinControlEvent(
reqResourceBody.getResourceType(), listener, reqResourceBody);

} else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_JOB_STATUS) {
RemoteZeppelinServerResource reqResourceBody = gson.fromJson(
event.getData(), RemoteZeppelinServerResource.class);
progressRemoteJobStatusControlEvent(
reqResourceBody.getResourceType(), listener, reqResourceBody);

} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
Expand Down Expand Up @@ -326,6 +333,66 @@ public void onError() {
}
}

private void progressRemoteJobStatusControlEvent(
RemoteZeppelinServerResource.Type resourceType,
RemoteInterpreterProcessListener remoteWorksEventListener,
RemoteZeppelinServerResource reqResourceBody) throws Exception {
boolean broken = false;
final Gson gson = new Gson();
final String eventOwnerKey = reqResourceBody.getOwnerKey();
Client interpreterServerMain = null;
try {
interpreterServerMain = interpreterProcess.getClient();
final Client eventClient = interpreterServerMain;
if (resourceType == RemoteZeppelinServerResource.Type.JOB_STATUS) {
Map<String, Object> jobStatus = (Map<String, Object>) reqResourceBody.getData();

String noteId = (String) jobStatus.get("noteId");
String paragraphId = (String) jobStatus.get("paragraphId");

RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {

@Override
public void onFinished(Object resultObject) {
boolean clientBroken = false;
if (resultObject != null && resultObject instanceof RemoteZeppelinJobStatus) {

RemoteZeppelinServerResource resResource = new RemoteZeppelinServerResource();
resResource.setOwnerKey(eventOwnerKey);
resResource.setResourceType(RemoteZeppelinServerResource.Type.JOB_STATUS);
resResource.setData(resultObject);

try {
eventClient.onReceivedZeppelinResource(gson.toJson(resResource));
} catch (Exception e) {
clientBroken = true;
logger.error("Can't get Remote Job Status Event", e);
waitQuietly();
} finally {
interpreterProcess.releaseClient(eventClient, clientBroken);
}
}
}

@Override
public void onError() {
logger.info("onGetParagraphRunners onError");
}
};

remoteWorksEventListener.onGetParagraphJobStatus(noteId, paragraphId, callBackEvent);
}
} catch (Exception e) {
broken = true;
logger.error("Can't get RemoteInterpreter Job Status Event", e);
waitQuietly();

} finally {
interpreterProcess.releaseClient(interpreterServerMain, broken);
}
}

private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
Client client = null;
boolean broken = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public void onOutputUpdated(
public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
public void onGetParagraphRunners(
String noteId, String paragraphId, RemoteWorksEventListener callback);
public void onGetParagraphJobStatus(
String noteId, String paragraphId, RemoteWorksEventListener callback);

/**
* Remote works for Interpreter callback listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
Expand Down Expand Up @@ -372,6 +373,21 @@ public void onReceivedZeppelinResource(String responseJson) throws TException {
response.getOwnerKey(),
intpContextRunners);
}
} else if (response.getResourceType() == RemoteZeppelinServerResource.Type.JOB_STATUS) {

Map<String, Object> jobStatusMap = (Map<String, Object>) response.getData();

RemoteZeppelinJobStatus jobStatus = new RemoteZeppelinJobStatus();
jobStatus.setNoteId((String) jobStatusMap.get("noteId"));
jobStatus.setParagraphId((String) jobStatusMap.get("paragraphId"));
jobStatus.setJobStatus((String) jobStatusMap.get("jobStatus"));
jobStatus.setLastRunningTime((String) jobStatusMap.get("lastRunningTime"));

synchronized (this.remoteWorksResponsePool) {
this.remoteWorksResponsePool.put(
response.getOwnerKey(),
jobStatus);
}
}
} catch (Exception e) {
throw e;
Expand Down Expand Up @@ -718,7 +734,26 @@ public List<InterpreterContextRunner> getRemoteContextRunner(
return runners;
}

@Override
public RemoteZeppelinJobStatus getRemoteJobStatus(String noteId, String paragraphId) {
RemoteZeppelinJobStatus jobStatus = null;
String ownerKey = generateOwnerKey();
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(paragraphId)) {
return null;
}
server.eventClient.getZeppelinServerJobStatus(ownerKey, noteId, paragraphId);

try {
this.waitForEvent(ownerKey);
} catch (Exception e) {
return null;
}
synchronized (this.remoteWorksResponsePool) {
jobStatus = (RemoteZeppelinJobStatus) this.remoteWorksResponsePool.get(ownerKey);
this.remoteWorksResponsePool.remove(ownerKey);
}
return jobStatus;
}
}

private RemoteInterpreterResult convert(InterpreterResult result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-12-30")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

Expand Down
Loading