Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions bin/interpreter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}

while getopts "hp:d:l:v:u:g:" o; do
while getopts "hc:p:d:l:v:u:g:" o; do
case ${o} in
h)
usage
Expand All @@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
d)
INTERPRETER_DIR=${OPTARG}
;;
c)
CALLBACK_HOST=${OPTARG} # This will be used callback host
;;
p)
PORT=${OPTARG}
PORT=${OPTARG} # This will be used callback port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
Expand Down Expand Up @@ -202,12 +205,12 @@ fi

if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT}`
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
fi
else
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} `
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
fi

if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class ZeppelinDevServer extends

private DevInterpreter interpreter = null;
private InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException {
super(port);
public ZeppelinDevServer(int port) throws TException, IOException {
super(null, port);
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
<commons.collections.version>3.2.1</commons.collections.version>
<commons.logging.version>1.1.1</commons.logging.version>
<shiro.version>1.2.3</shiro.version>
<netty.version>4.1.12.Final</netty.version>

<!-- test library versions -->
<junit.version>4.12</junit.version>
Expand All @@ -132,6 +133,8 @@
<plugin.download.version>1.3.0</plugin.download.version>
<plugin.deploy.version>2.8.2</plugin.deploy.version>

<zeppelin.shade.package>zeppelin-package</zeppelin.shade.package>

<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>

Expand Down Expand Up @@ -254,6 +257,11 @@
<artifactId>shiro-config-core</artifactId>
<version>${shiro.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- Test libraries -->
<dependency>
Expand Down
64 changes: 64 additions & 0 deletions zeppelin-interpreter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -231,4 +236,63 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>io.netty:netty-all</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>${zeppelin.shade.package}.io.netty</shadedPattern>
<includes>
<include>io.netty.**</include>
</includes>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>process-test-resources</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<excludeGroupIds>io.netty</excludeGroupIds>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,77 @@

package org.apache.zeppelin.interpreter.remote;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationContext;
import org.apache.zeppelin.helium.ApplicationException;
import org.apache.zeppelin.helium.ApplicationLoader;
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.*;
import org.apache.zeppelin.resource.*;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.JobProgressPoller;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

/**
* Entry point for Interpreter process.
Expand All @@ -68,6 +107,9 @@ public class RemoteInterpreterServer
Gson gson = new Gson();

RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
private String callbackHost;
private int callbackPort;
private String host;
private int port;
private TThreadPoolServer server;

Expand All @@ -82,11 +124,26 @@ public class RemoteInterpreterServer

private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;

public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
public RemoteInterpreterServer(String callbackHost, int port)
throws TTransportException, IOException {
if (null != callbackHost) {
this.callbackHost = callbackHost;
this.callbackPort = port;
} else {
// DevInterpreter
this.port = port;
}

processor = new RemoteInterpreterService.Processor<>(this);
TServerSocket serverTransport = new TServerSocket(port);
TServerSocket serverTransport;
if (null == callbackHost) {
// Dev Interpreter
serverTransport = new TServerSocket(port);
} else {
this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
this.host = RemoteInterpreterUtils.findAvailableHostname();
serverTransport = new TServerSocket(this.port);
}
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
Expand All @@ -95,6 +152,26 @@ public RemoteInterpreterServer(int port) throws TTransportException {

@Override
public void run() {
if (null != callbackHost) {
new Thread(new Runnable() {
boolean interrupted = false;
@Override
public void run() {
while (!interrupted && !server.isServing()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
interrupted = true;
}
}

if (!interrupted) {
RemoteInterpreterUtils
.registerInterpreter(callbackHost, callbackPort, host + ":" + port);
}
}
}).start();
}
logger.info("Starting remote interpreter server on port {}", port);
server.serve();
}
Expand Down Expand Up @@ -141,13 +218,15 @@ public boolean isRunning() {


public static void main(String[] args)
throws TTransportException, InterruptedException {

throws TTransportException, InterruptedException, IOException {
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
callbackHost = args[0];
port = Integer.parseInt(args[1]);
}
RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
RemoteInterpreterServer remoteInterpreterServer =
new RemoteInterpreterServer(callbackHost, port);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);
Expand Down
Loading