Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions bin/interpreter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}

while getopts "hp:d:l:v:u:g:" o; do
while getopts "hc:p:d:l:v:u:g:" o; do
case ${o} in
h)
usage
Expand All @@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
d)
INTERPRETER_DIR=${OPTARG}
;;
c)
CALLBACK_HOST=${OPTARG} # This will be used callback host
;;
p)
PORT=${OPTARG}
PORT=${OPTARG} # This will be used callback port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
Expand Down Expand Up @@ -202,12 +205,12 @@ fi

if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
fi
else
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} `
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
fi

if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
Expand Down
6 changes: 6 additions & 0 deletions conf/zeppelin-site.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -430,4 +430,10 @@
<description>The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks. When value is set to 1 and a cross-site scripting attack is detected, the browser will sanitize the page (remove the unsafe parts).</description>
</property>
-->
<!--
<property>
<name>zeppelin.interpreter.callback.portRange</name>
<value>10000:10010</value>
</property>
-->
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class ZeppelinDevServer extends

private DevInterpreter interpreter = null;
private InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException {
super(port);
public ZeppelinDevServer(int port) throws TException, IOException {
super(null, port);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,55 @@

package org.apache.zeppelin.interpreter.remote;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationContext;
import org.apache.zeppelin.helium.ApplicationException;
import org.apache.zeppelin.helium.ApplicationLoader;
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.*;
import org.apache.zeppelin.resource.*;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
Expand All @@ -49,8 +75,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Entry point for Interpreter process.
Expand All @@ -70,6 +110,9 @@ public class RemoteInterpreterServer
Gson gson = new Gson();

RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
private String callbackHost;
private int callbackPort;
private String host;
private int port;
private TThreadPoolServer server;

Expand All @@ -87,11 +130,34 @@ public class RemoteInterpreterServer
// Hold information for manual progress update
private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();

public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
private boolean isTest;

public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
TTransportException {
this(callbackHost, port, false);
}

public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
throws TTransportException, IOException {
if (null != callbackHost) {
this.callbackHost = callbackHost;
this.callbackPort = port;
} else {
// DevInterpreter
this.port = port;
}
this.isTest = isTest;

processor = new RemoteInterpreterService.Processor<>(this);
TServerSocket serverTransport = new TServerSocket(port);
TServerSocket serverTransport;
if (null == callbackHost) {
// Dev Interpreter
serverTransport = new TServerSocket(port);
} else {
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
this.host = RemoteInterpreterUtils.findAvailableHostAddress();
serverTransport = new TServerSocket(this.port);
}
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
Expand All @@ -100,6 +166,36 @@ public RemoteInterpreterServer(int port) throws TTransportException {

@Override
public void run() {
if (null != callbackHost && !isTest) {
new Thread(new Runnable() {
boolean interrupted = false;
@Override
public void run() {
while (!interrupted && !server.isServing()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
interrupted = true;
}
}

if (!interrupted) {
CallbackInfo callbackInfo = new CallbackInfo(host, port);
try {
RemoteInterpreterUtils
.registerInterpreter(callbackHost, callbackPort, callbackInfo);
} catch (TException e) {
logger.error("Error while registering interpreter: {}", callbackInfo, e);
try {
shutdown();
} catch (TException e1) {
logger.warn("Exception occurs while shutting down", e1);
}
}
}
}
}).start();
}
logger.info("Starting remote interpreter server on port {}", port);
server.serve();
}
Expand Down Expand Up @@ -151,13 +247,15 @@ public boolean isRunning() {


public static void main(String[] args)
throws TTransportException, InterruptedException {

throws TTransportException, InterruptedException, IOException {
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
callbackHost = args[0];
port = Integer.parseInt(args[1]);
}
RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
RemoteInterpreterServer remoteInterpreterServer =
new RemoteInterpreterServer(callbackHost, port);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,96 @@

package org.apache.zeppelin.interpreter.remote;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;

/**
*
*/
public class RemoteInterpreterUtils {
static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);


public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
return findRandomAvailablePortOnAllLocalInterfaces(":");
}

/**
* start:end
*
* @param portRange
* @return
* @throws IOException
*/
public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
throws IOException {

// ':' is the default value which means no constraints on the portRange
if (portRange == null || portRange.equals(":")) {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
socket.close();
}
return port;
}
return port;
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
int start = 1024;
int end = 49151;
String[] ports = portRange.split(":", -1);
if (!ports[0].isEmpty()) {
start = Integer.parseInt(ports[0]);
}
if (!ports[1].isEmpty()) {
end = Integer.parseInt(ports[1]);
}
for (int i = start; i <= end; ++i) {
try {
ServerSocket socket = new ServerSocket(i);
return socket.getLocalPort();
} catch (Exception e) {
// ignore this
}
}
throw new IOException("No available port in the portRange: " + portRange);
}

public static String findAvailableHostAddress() throws UnknownHostException, SocketException {
InetAddress address = InetAddress.getLocalHost();
if (address.isLoopbackAddress()) {
for (NetworkInterface networkInterface : Collections
.list(NetworkInterface.getNetworkInterfaces())) {
if (!networkInterface.isLoopback()) {
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
InetAddress a = interfaceAddress.getAddress();
if (a instanceof Inet4Address) {
return a.getHostAddress();
}
}
}
}
}
return address.getHostAddress();
}

public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
Expand Down Expand Up @@ -80,4 +149,17 @@ public static boolean isEnvString(String key) {

return key.matches("^[A-Z_0-9]*");
}

public static void registerInterpreter(String callbackHost, int callbackPort,
final CallbackInfo callbackInfo) throws TException {
LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
callbackInfo);
try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
protocol);
client.callback(callbackInfo);
}
}
}
Loading