Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
36f31fc
Creation of zeppelin-interpreter submodule
Leemoonsoo Feb 16, 2015
8b5991d
structure for remote interpreter using thrift
Leemoonsoo Feb 17, 2015
83be346
some interpreter method call implementation
Leemoonsoo Feb 17, 2015
6c8f30b
Remote interpreter implementation
Leemoonsoo Feb 23, 2015
abc78ef
kill interpreter process when zeppelin-daemon stops
Leemoonsoo Feb 23, 2015
94eb55b
client connection pool
Leemoonsoo Feb 23, 2015
fae3317
Handle process termination
Leemoonsoo Feb 26, 2015
cc409e8
Use className as instance id
Leemoonsoo Feb 26, 2015
2235c18
Remove unused method
Leemoonsoo Feb 26, 2015
422a550
Run job scheduler on remote process
Leemoonsoo Feb 27, 2015
79793e0
graceful shutdown inside of ide (eclipse)
Leemoonsoo Feb 27, 2015
36ca9ad
Handle output type from remote interpreter process
Leemoonsoo Feb 27, 2015
ffc35c7
take care of dynamic form
Leemoonsoo Feb 28, 2015
07422aa
setting gui for remote process
Leemoonsoo Feb 28, 2015
196c1d5
Merge branch 'master' into new/separate_process_interpreter
Leemoonsoo Feb 28, 2015
9a43ee0
Edit remote process mode setting
Leemoonsoo Feb 28, 2015
d7b7289
Env variable for interpreter process
Leemoonsoo Feb 28, 2015
8ac0074
Fix for checkstyle
Leemoonsoo Mar 1, 2015
c546abc
Refactoring around InterpreterOption
Leemoonsoo Mar 2, 2015
058eb45
Fix dep interpreter
Leemoonsoo Mar 2, 2015
ccf6f18
Aware of remote mode when interpreter reference each other
Leemoonsoo Mar 2, 2015
9a083d0
Initialize interpreters in the same group at once so they can referen…
Leemoonsoo Mar 4, 2015
32b6333
Add some test for remote interpreter
Leemoonsoo Mar 7, 2015
d4ead64
Make fork mode default
Leemoonsoo Mar 7, 2015
835bbc9
User friendly log file name for interpreter process
Leemoonsoo Mar 7, 2015
2250a5c
stop zeppelin server first, and then force interpreter process stop, …
Leemoonsoo Mar 7, 2015
927a5e2
Merge branch 'master' into new/separate_process_interpreter
Leemoonsoo Mar 8, 2015
c587c1e
Get SparkInterpreter's scheduler from SparkSqlInterpreter without ope…
Leemoonsoo Mar 8, 2015
7dade29
Move scheduler test from zeppelin-zengine to zeppelin-interpreter
Leemoonsoo Mar 8, 2015
47de680
Pass classloader to scala compiler in remote process
Leemoonsoo Mar 9, 2015
3490f2e
Prevent remote interpreter open before it's been created
Leemoonsoo Mar 9, 2015
7d3f55d
Implement RemoteScheduler
Leemoonsoo Mar 11, 2015
f3c77e8
Fix autocompletion
Leemoonsoo Mar 11, 2015
39e1217
Add some doc
Leemoonsoo Mar 12, 2015
78691e8
Remove fork option
Leemoonsoo Mar 13, 2015
8aa26e8
Fix code style
Leemoonsoo Mar 13, 2015
a3df56a
Take ZEPPELIN_JAVA_OPTS as default value of ZEPPELIN_INTP_JAVA_OPTS
Leemoonsoo Mar 13, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion bin/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ function addJarInDir(){

addJarInDir "${ZEPPELIN_HOME}"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-zengine/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-web/target/lib"
Expand Down Expand Up @@ -118,9 +119,22 @@ if [[ -z "$ZEPPELIN_MEM" ]]; then
export ZEPPELIN_MEM="-Xmx1024m -XX:MaxPermSize=512m"
fi

JAVA_OPTS+="${ZEPPELIN_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING} ${ZEPPELIN_MEM}"
JAVA_OPTS+=" ${ZEPPELIN_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING} ${ZEPPELIN_MEM}"
export JAVA_OPTS

# jvm options for interpreter process
if [[ -z "${ZEPPELIN_INTP_JAVA_OPTS}" ]]; then
export ZEPPELIN_INTP_JAVA_OPTS="${ZEPPELIN_JAVA_OPTS}"
fi

if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then
export ZEPPELIN_INTP_MEM="${ZEPPELIN_MEM}"
fi

JAVA_INTP_OPTS+=" ${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING} ${ZEPPELIN_INTP_MEM}"
export JAVA_INTP_OPTS


if [[ -n "${JAVA_HOME}" ]]; then
ZEPPELIN_RUNNER="${JAVA_HOME}/bin/java"
else
Expand Down
85 changes: 85 additions & 0 deletions bin/interpreter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/bin/bash

bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)


function usage() {
echo "usage) $0 -p <port> -d <directory to load>"
}

while getopts "hp:d:" o; do
case ${o} in
h)
usage
exit 0
;;
d)
INTERPRETER_DIR=${OPTARG}
;;
p)
PORT=${OPTARG}
;;
esac
done


if [ -z "${PORT}" ] || [ -z "${INTERPRETER_DIR}" ]; then
usage
exit 1
fi

. "${bin}/common.sh"

ZEPPELIN_CLASSPATH+=":${ZEPPELIN_CONF_DIR}"

addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${INTERPRETER_DIR}"

export CLASSPATH+=":${ZEPPELIN_CLASSPATH}"

HOSTNAME=$(hostname)
ZEPPELIN_SERVER=com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterServer

INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
ZEPPELIN_PID="${ZEPPELIN_PID_DIR}/zeppelin-interpreter-${INTERPRETER_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.pid"
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"

if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
$(mkdir -p "${ZEPPELIN_LOG_DIR}")
fi



${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
pid=$!
if [[ -z "${pid}" ]]; then
return 1;
else
echo ${pid} > ${ZEPPELIN_PID}
fi


trap 'shutdown_hook;' SIGTERM SIGINT SIGQUIT
function shutdown_hook() {
local count
count=0
while [[ "${count}" -lt 10 ]]; do
$(kill ${pid} > /dev/null 2> /dev/null)
if kill -0 ${pid} > /dev/null 2>&1; then
sleep 3
let "count+=1"
else
rm -f "${ZEPPELIN_PID}"
break
fi
if [[ "${count}" == "5" ]]; then
$(kill -9 ${pid} > /dev/null 2> /dev/null)
rm -f "${ZEPPELIN_PID}"
fi
done
}

wait
30 changes: 22 additions & 8 deletions bin/zeppelin-daemon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,32 @@ function start() {

function stop() {
local pid

# zeppelin daemon kill
if [[ ! -f "${ZEPPELIN_PID}" ]]; then
echo "${ZEPPELIN_NAME} is not running"
return 0
fi
pid=$(cat ${ZEPPELIN_PID})
if [[ -z "${pid}" ]]; then
echo "${ZEPPELIN_NAME} is not running"
else
wait_for_zeppelin_to_die $pid
$(rm -f ${ZEPPELIN_PID})
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
pid=$(cat ${ZEPPELIN_PID})
if [[ -z "${pid}" ]]; then
echo "${ZEPPELIN_NAME} is not running"
else
wait_for_zeppelin_to_die $pid
$(rm -f ${ZEPPELIN_PID})
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
fi
fi

# list all pid that used in remote interpreter and kill them
for f in ${ZEPPELIN_PID_DIR}/*.pid; do
if [[ ! -f ${f} ]]; then
continue;
fi

pid=$(cat ${f})
wait_for_zeppelin_to_die $pid
$(rm -f ${f})
done

}

function find_zeppelin_process() {
Expand Down
2 changes: 2 additions & 0 deletions conf/zeppelin-env.sh.template
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# export MASTER= # Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode
# export ZEPPELIN_JAVA_OPTS # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
# export ZEPPELIN_MEM # Zeppelin jvm mem options Default -Xmx1024m -XX:MaxPermSize=512m
# export ZEPPELIN_INTP_MEM # zeppelin interpreter process jvm mem options. Defualt = ZEPPELIN_MEM
# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS

# export ZEPPELIN_CONF_DIR # Alternate zeppelin conf dir. Default is ${ZEPPELIN_HOME}/conf.
# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default.
Expand Down
2 changes: 1 addition & 1 deletion markdown/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-zengine</artifactId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ public void open() {
@Override
public void close() {}

@Override
public Object getValue(String name) {
return null;
}

@Override
public InterpreterResult interpret(String st, InterpreterContext interpreterContext) {
String html;
Expand All @@ -58,9 +53,6 @@ public InterpreterResult interpret(String st, InterpreterContext interpreterCont
@Override
public void cancel(InterpreterContext context) {}

@Override
public void bindValue(String name, Object o) {}

@Override
public FormType getFormType() {
return FormType.SIMPLE;
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<inceptionYear>2013</inceptionYear>

<modules>
<module>zeppelin-interpreter</module>
<module>zeppelin-zengine</module>
<module>spark</module>
<module>markdown</module>
Expand Down Expand Up @@ -878,6 +879,7 @@
</goals>
<configuration>
<failOnViolation>true</failOnViolation>
<excludes>com/nflabs/zeppelin/interpreter/thrift/*</excludes>
</configuration>
</execution>
</executions>
Expand Down
2 changes: 1 addition & 1 deletion shell/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-zengine</artifactId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public void open() {}
@Override
public void close() {}

@Override
public Object getValue(String name) {
return null;
}

@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
Expand Down Expand Up @@ -77,9 +73,6 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr
@Override
public void cancel(InterpreterContext context) {}

@Override
public void bindValue(String name, Object o) {}

@Override
public FormType getFormType() {
return FormType.SIMPLE;
Expand Down
5 changes: 3 additions & 2 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-zengine</artifactId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
Expand Down
30 changes: 13 additions & 17 deletions spark/src/main/java/com/nflabs/zeppelin/spark/DepInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class DepInterpreter extends Interpreter {
"spark",
DepInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("zeppelin.dep.localrepo", "local-repo", "local repository for dependency loader")
.build());

}
Expand Down Expand Up @@ -129,7 +130,7 @@ private void createIMain() {
intp.setContextClassLoader();
intp.initializeSynchronous();

depc = new DependencyContext();
depc = new DependencyContext(getProperty("zeppelin.dep.localrepo"));
completor = new SparkJLineCompletion(intp);

intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
Expand All @@ -141,7 +142,6 @@ private void createIMain() {

}

@Override
public Object getValue(String name) {
Object ret = intp.valueOfTerm(name);
if (ret instanceof None) {
Expand All @@ -160,11 +160,8 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
out.reset();

SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (sparkInterpreter == null) {
return new InterpreterResult(Code.ERROR,
"Must be used with SparkInterpreter");
}
if (sparkInterpreter.isSparkContextInitialized()) {

if (sparkInterpreter != null && sparkInterpreter.isSparkContextInitialized()) {
return new InterpreterResult(Code.ERROR,
"Must be used before SparkInterpreter (%spark) initialized");
}
Expand Down Expand Up @@ -202,13 +199,10 @@ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
public void cancel(InterpreterContext context) {
}

@Override
public void bindValue(String name, Object o) {
}

@Override
public FormType getFormType() {
return null;
return FormType.NATIVE;
}

@Override
Expand Down Expand Up @@ -257,13 +251,15 @@ private SparkInterpreter getSparkInterpreter() {
if (intpGroup == null) {
return null;
}
for (Interpreter intp : intpGroup){
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
synchronized (intpGroup) {
for (Interpreter intp : intpGroup){
if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
return (SparkInterpreter) p;
}
return (SparkInterpreter) p;
}
}
return null;
Expand Down
Loading