Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ public void appendOutput(String message) throws IOException {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(context);
if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Joiner;
Expand All @@ -45,6 +43,7 @@
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand Down Expand Up @@ -112,6 +111,7 @@ public class SparkInterpreter extends Interpreter {

private SparkOutputStream out;
private SparkDependencyResolver dep;
private String sparkUrl;

/**
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
Expand Down Expand Up @@ -938,6 +938,13 @@ public void open() {
numReferenceOfSparkContext.incrementAndGet();
}

private String getSparkUIUrl() {
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
SparkUI sparkUi = sparkUiOption.get();
String sparkWebUrl = sparkUi.appUIAddress();
return sparkWebUrl;
}

private Results.Result interpret(String line) {
return (Results.Result) Utils.invokeMethod(
intp,
Expand All @@ -946,6 +953,20 @@ private Results.Result interpret(String line) {
new Object[] {line});
}

public void populateSparkWebUrl(InterpreterContext ctx) {
if (sparkUrl == null) {
sparkUrl = getSparkUIUrl();
Map<String, String> infos = new java.util.HashMap<>();
if (sparkUrl != null) {
infos.put("url", sparkUrl);
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
if (ctx != null && ctx.getClient() != null) {
ctx.getClient().onMetaInfosReceived(infos);
}
}
}
}

private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
Expand Down Expand Up @@ -1085,7 +1106,7 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+ " is not supported");
}

populateSparkWebUrl(context);
z.setInterpreterContext(context);
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void open() {
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {

getSparkInterpreter().populateSparkWebUrl(interpreterContext);
String imageWidth = getProperty("zeppelin.R.image.width");

String[] sl = lines.split("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}

sparkInterpreter.populateSparkWebUrl(context);
sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
if (concurrentSQL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.util.List;
import java.util.Map;

import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.ResourcePool;

/**
Expand Down Expand Up @@ -57,6 +59,7 @@ public static void remove() {
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private String className;
private RemoteEventClientWrapper client;

public InterpreterContext(String noteId,
String paragraphId,
Expand All @@ -83,6 +86,22 @@ public InterpreterContext(String noteId,
this.out = out;
}

public InterpreterContext(String noteId,
String paragraphId,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> contextRunners,
InterpreterOutput output,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
angularObjectRegistry, resourcePool, contextRunners, output);
this.client = new RemoteEventClient(eventClient);
}

public String getNoteId() {
return noteId;
Expand Down Expand Up @@ -131,4 +150,8 @@ public String getClassName() {
public void setClassName(String className) {
this.className = className;
}

public RemoteEventClientWrapper getClient() {
return client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.apache.zeppelin.interpreter.remote;

import java.util.Map;

/**
*
* Wrapper arnd RemoteInterpreterEventClient
* to expose methods in the client
*
*/
public class RemoteEventClient implements RemoteEventClientWrapper {

private RemoteInterpreterEventClient client;

public RemoteEventClient(RemoteInterpreterEventClient client) {
this.client = client;
}

@Override
public void onMetaInfosReceived(Map<String, String> infos) {
client.onMetaInfosReceived(infos);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.zeppelin.interpreter.remote;

import java.util.Map;

/**
*
* Wrapper interface for RemoterInterpreterEventClient
* to expose only required methods from EventClient
*
*/
public interface RemoteEventClientWrapper {

public void onMetaInfosReceived(Map<String, String> infos);

}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ public void onAppStatusUpdate(String noteId, String paragraphId, String appId, S
gson.toJson(appendOutput)));
}

public void onMetaInfosReceived(Map<String, String> infos) {
sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
gson.toJson(infos)));
}

/**
* Wait for eventQueue becomes empty
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ public void run() {
String status = appStatusUpdate.get("status");

appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
}.getType());
String id = interpreterGroup.getId();
int indexOfColon = id.indexOf(":");
String settingId = id.substring(0, indexOfColon);
listener.onMetaInfosReceived(settingId, metaInfos);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
package org.apache.zeppelin.interpreter.remote;

import java.util.Map;

/**
* Event from remoteInterpreterProcess
*/
public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutp
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
interpreterGroup.getResourcePool(),
contextRunners, output);
contextRunners, output, eventClient);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
package org.apache.zeppelin.interpreter.thrift;


import java.util.Map;
import java.util.HashMap;
import org.apache.thrift.TEnum;

public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
NO_OP(1),
Expand All @@ -39,7 +36,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10),
APP_STATUS_UPDATE(11);
APP_STATUS_UPDATE(11),
META_INFOS(12);

private final int value;

Expand Down Expand Up @@ -82,6 +80,8 @@ public static RemoteInterpreterEventType findByValue(int value) {
return ANGULAR_REGISTRY_PUSH;
case 11:
return APP_STATUS_UPDATE;
case 12:
return META_INFOS;
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -154,4 +155,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) {
public void onOutputUpdated(String noteId, String paragraphId, String output) {

}

@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,9 @@ public void onOutputAppend(String noteId, String paragraphId, String output) {
public void onOutputUpdated(String noteId, String paragraphId, String output) {

}

@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
package org.apache.zeppelin.rest;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

Expand Down Expand Up @@ -215,6 +219,31 @@ public Response addRepository(String message) {
return new JsonResponse(Status.CREATED).build();
}

/**
* get the metainfo property value
*/
@GET
@Path("getmetainfos/{settingId}")
public Response getMetaInfo(@Context HttpServletRequest req,
@PathParam("settingId") String settingId) {
String propName = req.getParameter("propName");
if (propName == null) {
return new JsonResponse<>(Status.BAD_REQUEST).build();
}
String propValue = null;
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
propValue = infos.get(propName);
}
Map<String, String> respMap = new HashMap<>();
respMap.put(propName, propValue);
logger.debug("Get meta info");
logger.debug("Interpretersetting Id: {}, property Name:{}, property value: {}", settingId,
propName, propValue);
return new JsonResponse<>(Status.OK, respMap).build();
}

/**
* Delete repository
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1759,5 +1759,12 @@ private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subj
.put("interpreterSettings", availableSettings)));
}

@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
InterpreterSetting interpreterSetting = notebook().getInterpreterFactory()
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}

}

16 changes: 16 additions & 0 deletions zeppelin-web/src/app/interpreter/interpreter.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,22 @@
getRepositories();
};

$scope.showSparkUI = function(settingId) {
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId + '?propName=url')
.success(function(data, status, headers, config) {
var url = data.body.url;
if (!url) {
BootstrapDialog.alert({
message: 'No spark application running'
});
return;
}
window.open(url, '_blank');
}).error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
});
};

init();
}

Expand Down
4 changes: 4 additions & 0 deletions zeppelin-web/src/app/interpreter/interpreter.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ <h3 class="interpreter-title">{{setting.name}}

</h3>
<span style="float:right" ng-show="!valueform.$visible" >
<button class="btn btn-default btn-xs"
ng-click="showSparkUI(setting.id)"
ng-show="setting.group == 'spark'">
<span class="fa fa-external-link"></span> spark ui</button>
<button class="btn btn-default btn-xs"
ng-click="valueform.$show();
copyOriginInterpreterSettingProperties(setting.id)">
Expand Down
Loading