diff --git a/.travis.yml b/.travis.yml index e1b3c5dd7e1..a1b467d9971 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,34 +22,34 @@ before_install: - "sh -e /etc/init.d/xvfb start" install: - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn package -DskipTests -Phadoop-2.3 -Ppyspark -B + - mvn package -DskipTests -Phadoop-2.3 -Ppyspark -B before_script: - script: # spark 1.4 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn package -Pbuild-distr -Phadoop-2.3 -Ppyspark -B + - mvn package -Pbuild-distr -Phadoop-2.3 -Ppyspark -B - ./testing/startSparkCluster.sh 1.4.0 2.3 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn verify -Pusing-packaged-distr -Phadoop-2.3 -Ppyspark -B + - mvn verify -Pusing-packaged-distr -Phadoop-2.3 -Ppyspark -B - ./testing/stopSparkCluster.sh 1.4.0 2.3 # spark 1.3 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn clean package -DskipTests -Pspark-1.3 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn package -Pbuild-distr -Pspark-1.3 -Phadoop-2.3 -B + - rm -rf `pwd`/interpreter/spark + - mvn package -DskipTests -Pspark-1.3 -Phadoop-2.3 -Ppyspark -B -pl 'zeppelin-interpreter,spark-dependencies,spark' - ./testing/startSparkCluster.sh 1.3.1 2.3 - - mvn verify -Pspark-1.3 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' + - mvn package -Pspark-1.3 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,zeppelin-zengine,zeppelin-server' -Dtest=org.apache.zeppelin.rest.*Test -DfailIfNoTests=false - ./testing/stopSparkCluster.sh 1.3.1 2.3 -# spark 1.2 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn clean package -DskipTests -Pspark-1.2 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn package -Pbuild-distr -Pspark-1.2 -Phadoop-2.3 -B + # spark 1.2 + - rm -rf `pwd`/interpreter/spark + - mvn package -Pspark-1.2 -Phadoop-2.3 -Ppyspark -B -pl 'zeppelin-interpreter,spark-dependencies,spark' - ./testing/startSparkCluster.sh 1.2.1 2.3 - - mvn verify -Pspark-1.2 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' + - mvn package -Pspark-1.2 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,zeppelin-zengine,zeppelin-server' -Dtest=org.apache.zeppelin.rest.*Test -DfailIfNoTests=false - ./testing/stopSparkCluster.sh 1.2.1 2.3 # spark 1.1 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn clean package -DskipTests -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' - - mvn package -Pbuild-distr -Pspark-1.1 -Phadoop-2.3 -B + - rm -rf `pwd`/interpreter/spark + - mvn package -Pspark-1.1 -Phadoop-2.3 -Ppyspark -B -pl 'zeppelin-interpreter,spark-dependencies,spark' - ./testing/startSparkCluster.sh 1.1.1 2.3 - - /bin/bash ./dev/travis/travis-install.sh `pwd` mvn verify -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' + - mvn package -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,zeppelin-zengine,zeppelin-server' -Dtest=org.apache.zeppelin.rest.*Test -DfailIfNoTests=false - ./testing/stopSparkCluster.sh 1.1.1 2.3 after_failure: diff --git a/README.md b/README.md index d5658587f63..a3bc0ad1ce8 100644 --- a/README.md +++ b/README.md @@ -38,50 +38,52 @@ sudo apt-get install npm ### Build If you want to build Zeppelin from the source, please first clone this repository. 
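For example, cloning typically looks like this (the repository URL below is illustrative; use the actual location of the Zeppelin repository):
```
git clone https://github.com/apache/incubator-zeppelin.git
cd incubator-zeppelin
```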
And then: ``` -mvn clean package +mvn clean package -DskipTests ``` -Build with specific version -Spark 1.1.x -``` -mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests -``` -Spark 1.2.x +Build with specific Spark version + +Spark 1.4.x ``` -mvn clean package -Pspark-1.2 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests +mvn clean package -Pspark-1.4 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests ``` Spark 1.3.x ``` mvn clean package -Pspark-1.3 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests ``` -Spark 1.4.x +Spark 1.2.x ``` -mvn clean package -Pspark-1.4 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests +mvn clean package -Pspark-1.2 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests +``` +Spark 1.1.x +``` +mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests ``` CDH 5.X ``` mvn clean package -Pspark-1.2 -Dhadoop.version=2.5.0-cdh5.3.0 -Phadoop-2.4 -DskipTests ``` -Yarn (Hadoop 2.2.x) +Yarn (Hadoop 2.7.x) ``` -mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -Pyarn -DskipTests +mvn clean package -Pspark-1.4 -Dspark.version=1.4.1 -Dhadoop.version=2.7.0 -Phadoop-2.6 -Pyarn -DskipTests ``` -Yarn (Hadoop 2.3.x) +Yarn (Hadoop 2.6.x) ``` -mvn clean package -Pspark-1.1 -Dhadoop.version=2.3.0 -Phadoop-2.3 -Pyarn -DskipTests +mvn clean package -Pspark-1.1 -Dhadoop.version=2.6.0 -Phadoop-2.6 -Pyarn -DskipTests ``` Yarn (Hadoop 2.4.x) ``` mvn clean package -Pspark-1.1 -Dhadoop.version=2.4.0 -Phadoop-2.4 -Pyarn -DskipTests ``` -Yarn (Hadoop 2.6.x) +Yarn (Hadoop 2.3.x) ``` -mvn clean package -Pspark-1.1 -Dhadoop.version=2.6.0 -Phadoop-2.6 -Pyarn -DskipTests +mvn clean package -Pspark-1.1 -Dhadoop.version=2.3.0 -Phadoop-2.3 -Pyarn -DskipTests ``` -Yarn (Hadoop 2.7.x) +Yarn (Hadoop 2.2.x) ``` -mvn clean package -Pspark-1.4 -Dspark.version=1.4.1 -Dhadoop.version=2.7.0 -Phadoop-2.6 -Pyarn -DskipTests +mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -Pyarn -DskipTests ``` + Ignite (1.1.0-incubating and later) ``` mvn clean package -Dignite.version=1.1.0-incubating -DskipTests @@ -96,6 +98,19 @@ If you wish to configure Zeppelin option (like port number), configure the follo (You can copy ```./conf/zeppelin-env.sh.template``` into ```./conf/zeppelin-env.sh```. Same for ```zeppelin-site.xml```.) + +#### Setting SPARK_HOME and HADOOP_HOME + +Without SPARK_HOME and HADOOP_HOME, Zeppelin uses the embedded Spark and Hadoop binaries selected by the mvn build options. +If you want to use a system-provided Spark and Hadoop instead, export SPARK_HOME and HADOOP_HOME in zeppelin-env.sh. +You can then use any supported version of Spark without rebuilding Zeppelin. + +``` +# ./conf/zeppelin-env.sh +export SPARK_HOME=... +export HADOOP_HOME=... +``` + #### External cluster configuration Mesos diff --git a/bin/common.sh b/bin/common.sh index 7aab870af14..188ff863966 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -80,22 +80,10 @@ function addEachJarInDir(){ function addJarInDir(){ if [[ -d "${1}" ]]; then - export ZEPPELIN_CLASSPATH="${1}/*:${ZEPPELIN_CLASSPATH}" + ZEPPELIN_CLASSPATH="${1}/*:${ZEPPELIN_CLASSPATH}" fi } -if [[ ! -z "${SPARK_HOME}" ]] && [[ -d "${SPARK_HOME}" ]]; then - addJarInDir "${SPARK_HOME}" -fi - -if [[ ! -z "${HADOOP_HOME}" ]] && [[ -d "${HADOOP_HOME}" ]]; then - addJarInDir "${HADOOP_HOME}" -fi - -if [[ ! 
-z "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then - ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}" -fi - export ZEPPELIN_CLASSPATH # Text encoding for diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 93ae1e55088..e1b4f4acbbb 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -57,9 +57,6 @@ fi addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib" addJarInDir "${INTERPRETER_DIR}" -export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}" -CLASSPATH+=":${ZEPPELIN_CLASSPATH}" - HOSTNAME=$(hostname) ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer @@ -73,19 +70,76 @@ if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then $(mkdir -p "${ZEPPELIN_LOG_DIR}") fi -if [[ ! -z "${SPARK_HOME}" ]]; then - PYSPARKPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.8.2.1-src.zip" -else - PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip" -fi +# set spark related env variables +if [[ "${INTERPRETER_ID}" == "spark" ]]; then + # add Hadoop jars into classpath + if [[ -n "${HADOOP_HOME}" ]]; then + # Apache + addEachJarInDir "${HADOOP_HOME}/share" -if [[ x"" == x"${PYTHONPATH}" ]]; then - export PYTHONPATH="${PYSPARKPATH}" -else - export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}" + # CDH + addJarInDir "${HADOOP_HOME}" + addJarInDir "${HADOOP_HOME}/lib" + fi + + # autodetect HADOOP_CONF_HOME by heuristic + if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then + if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then + export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" + elif [[ -d "/etc/hadoop/conf" ]]; then + export HADOOP_CONF_DIR="/etc/hadoop/conf" + fi + fi + + if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then + ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}" + fi + + # add Spark jars into classpath + if [[ -n "${SPARK_HOME}" ]]; then + addJarInDir "${SPARK_HOME}/lib" + PYSPARKPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.8.2.1-src.zip" + else + addJarInDir "${INTERPRETER_DIR}/dep" + PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip" + fi + + if [[ x"" == x"${PYTHONPATH}" ]]; then + export PYTHONPATH="${PYSPARKPATH}" + else + export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}" + fi + + + # autodetect SPARK_CONF_DIR + if [[ -n "${SPARK_HOME}" ]] && [[ -z "${SPARK_CONF_DIR}" ]]; then + if [[ -d "${SPARK_HOME}/conf" ]]; then + SPARK_CONF_DIR="${SPARK_HOME}/conf" + fi + fi + + # read spark-*.conf if exists + if [[ -d "${SPARK_CONF_DIR}" ]]; then + ls ${SPARK_CONF_DIR}/spark-*.conf > /dev/null 2>&1 + if [[ "$?" -eq 0 ]]; then + for file in ${SPARK_CONF_DIR}/spark-*.conf; do + while read -r line; do + echo "${line}" | grep -e "^spark[.]" > /dev/null + if [ "$?" -ne 0 ]; then + # skip the line not started with 'spark.' + continue; + fi + SPARK_CONF_KEY=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\1/g'` + SPARK_CONF_VALUE=`echo "${line}" | sed -e 's/\(^spark[^ ]*\)[ \t]*\(.*\)/\2/g'` + export ZEPPELIN_JAVA_OPTS+=" -D${SPARK_CONF_KEY}=\"${SPARK_CONF_VALUE}\"" + done < "${file}" + done + fi + fi fi -unset PYSPARKPATH +export SPARK_CLASSPATH+=":${ZEPPELIN_CLASSPATH}" +CLASSPATH+=":${ZEPPELIN_CLASSPATH}" ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} & pid=$! 
diff --git a/dev/travis/save-logs.py b/dev/travis/save-logs.py index 5f4ad28bade..d0480e8f456 100755 --- a/dev/travis/save-logs.py +++ b/dev/travis/save-logs.py @@ -42,8 +42,7 @@ def main(file, cmd): errcode = process.wait() diff = datetime.now() - start sys.stdout.write("\r%d seconds %d log lines"%(diff.seconds, count)) - print - print cmd, "done", errcode + sys.stdout.write("\n" + str(cmd) + " done " + str(errcode) + "\n") return errcode if __name__ == "__main__": diff --git a/pom.xml b/pom.xml index 9b7cd4610be..ff651603f0e 100755 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ zeppelin-interpreter zeppelin-zengine + spark-dependencies spark markdown angular diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml new file mode 100644 index 00000000000..c451c391cf7 --- /dev/null +++ b/spark-dependencies/pom.xml @@ -0,0 +1,791 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + .. + + + org.apache.zeppelin + zeppelin-spark-dependencies + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: Spark dependencies + Zeppelin spark support + http://zeppelin.incubator.apache.org + + + + 1.4.1 + 2.10.4 + 2.10 + + 2.3.0 + ${hadoop.version} + 1.7.7 + + 0.7.1 + 2.4.1 + + org.spark-project.akka + 2.3.4-spark + + http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz + 0.8.2.1 + + + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + + + + + org.apache.avro + avro + ${avro.version} + + + org.apache.avro + avro-ipc + ${avro.version} + + + io.netty + netty + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + servlet-api + + + org.apache.velocity + velocity + + + + + org.apache.avro + avro-mapred + ${avro.version} + ${avro.mapred.classifier} + + + io.netty + netty + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + servlet-api + + + org.apache.velocity + velocity + + + + + + + net.java.dev.jets3t + jets3t + ${jets3t.version} + runtime + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + commons-logging + commons-logging + + + + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + commons-logging + commons-logging + + + + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + commons-logging + commons-logging + + + + + + org.apache.hadoop + hadoop-yarn-client + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + commons-logging + commons-logging + + + + + + + + + + org.apache.spark + spark-core_2.10 + ${spark.version} + + + org.apache.hadoop + hadoop-client + + + + + + org.apache.spark + spark-repl_2.10 + ${spark.version} + + + + org.apache.spark + spark-sql_2.10 + ${spark.version} + + + + org.apache.spark + spark-hive_2.10 + ${spark.version} + + + + + org.apache.spark + spark-streaming_2.10 + ${spark.version} + + + + + org.apache.spark + spark-streaming-twitter_2.10 + ${spark.version} + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + + + com.google.protobuf + protobuf-java + 
${protobuf.version} + + + + ${akka.group} + akka-actor_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-remote_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-slf4j_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-testkit_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-zeromq_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-actor_${scala.binary.version} + + + + + + + + + spark-1.1 + + + + + 1.1.1 + 2.2.3-shaded-protobuf + + + + + cassandra-spark-1.1 + + + com.datastax.spark + spark-cassandra-connector_${scala.binary.version} + 1.1.1 + + + org.joda + joda-convert + + + + + + 1.1.1 + 2.2.3-shaded-protobuf + + + + + spark-1.2 + + + + 1.2.1 + + + + + cassandra-spark-1.2 + + 1.2.1 + + + + com.datastax.spark + spark-cassandra-connector_${scala.binary.version} + 1.2.1 + + + org.joda + joda-convert + + + + + + + + spark-1.3 + + + 1.3.1 + + + + + + + + + cassandra-spark-1.3 + + 1.3.0 + + + + + com.datastax.spark + spark-cassandra-connector_${scala.binary.version} + + 1.3.0-SNAPSHOT + + + org.joda + joda-convert + + + + + + + + spark-1.4 + + 1.4.1 + + + + + + + + hadoop-0.23 + + + + org.apache.avro + avro + + + + 0.23.10 + + + + + hadoop-1 + + 1.0.4 + hadoop1 + 1.8.8 + org.spark-project.akka + + + + + hadoop-2.2 + + 2.2.0 + 2.5.0 + hadoop2 + + + + + hadoop-2.3 + + 2.3.0 + 2.5.0 + 0.9.3 + hadoop2 + + + + + hadoop-2.4 + + 2.4.0 + 2.5.0 + 0.9.3 + hadoop2 + + + + + hadoop-2.6 + + 2.6.0 + 2.5.0 + 0.9.3 + hadoop2 + + + + + mapr3 + + false + + + 1.0.3-mapr-3.0.3 + 2.3.0-mapr-4.0.0-FCS + 0.7.1 + + + + + mapr4 + + false + + + 2.3.0-mapr-4.0.0-FCS + 2.3.0-mapr-4.0.0-FCS + 0.7.1 + + + + org.apache.curator + curator-recipes + 2.4.0 + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + 3.4.5-mapr-1406 + + + + + + yarn + + + org.apache.spark + spark-yarn_2.10 + ${spark.version} + + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + + + + pyspark + + http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz + + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + 1.2.1 + + + download-pyspark-files + validate + + wget + + + ${spark.download.url} + true + ${project.build.directory}/spark-dist + + + + + + maven-clean-plugin + + + + ${basedir}/../python/build + + + ${project.build.directory}/spark-dist + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.7 + + + download-and-zip-pyspark-files + generate-resources + + run + + + + + + + + + + + + + + + + + + + + org.apache.rat + apache-rat-plugin + + + **/.idea/ + **/*.iml + .gitignore + **/.settings/* + **/.classpath + **/.project + **/target/** + **/derby.log + **/metastore_db/ + **/README.md + dependency-reduced-pom.xml + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-clean-plugin + + + + ../interpreter/spark/dep + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.17 + + 1 + false + -Xmx1024m -XX:MaxPermSize=256m + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + + *:* + + org/datanucleus/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + reference.conf + + + + + + package + + shade + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/spark/dep + false + false + true + org.datanucleus + + + + 
package + + copy + + + ${project.build.directory}/../../interpreter/spark/dep + false + false + true + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + diff --git a/spark/pom.xml b/spark/pom.xml index aa076877374..59da081e2e1 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -34,23 +34,13 @@ Zeppelin spark support http://zeppelin.incubator.apache.org - 1.4.1 2.10.4 2.10 2.3.0 - ${hadoop.version} - 1.7.7 - - 0.7.1 - 2.4.1 - - org.spark-project.akka - 2.3.4-spark - - http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz + 0.8.2.1 @@ -59,192 +49,6 @@ https://repository.cloudera.com/artifactory/cloudera-repos/ - - - - - org.apache.avro - avro - ${avro.version} - - - org.apache.avro - avro-ipc - ${avro.version} - - - io.netty - netty - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - servlet-api - - - org.apache.velocity - velocity - - - - - org.apache.avro - avro-mapred - ${avro.version} - ${avro.mapred.classifier} - - - io.netty - netty - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - servlet-api - - - org.apache.velocity - velocity - - - - - - - net.java.dev.jets3t - jets3t - ${jets3t.version} - runtime - - - commons-logging - commons-logging - - - - - org.apache.hadoop - hadoop-yarn-api - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - commons-logging - commons-logging - - - - - - org.apache.hadoop - hadoop-yarn-common - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - javax.servlet - servlet-api - - - commons-logging - commons-logging - - - - - - org.apache.hadoop - hadoop-yarn-server-web-proxy - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - javax.servlet - servlet-api - - - commons-logging - commons-logging - - - - - - org.apache.hadoop - hadoop-yarn-client - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - javax.servlet - servlet-api - - - commons-logging - commons-logging - - - - - @@ -264,104 +68,19 @@ provided - - - org.apache.spark - spark-core_2.10 - ${spark.version} - - - org.apache.hadoop - hadoop-client - - - - - - org.apache.spark - spark-repl_2.10 - ${spark.version} - - - - org.apache.spark - spark-sql_2.10 - ${spark.version} - - - - org.apache.spark - spark-hive_2.10 - ${spark.version} - - - - - org.apache.spark - spark-streaming_2.10 - ${spark.version} - - - - - org.apache.spark - spark-streaming-twitter_2.10 - ${spark.version} - - - - org.apache.spark - spark-catalyst_${scala.binary.version} - ${spark.version} - - - - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - - - - com.google.protobuf - protobuf-java - ${protobuf.version} + ${project.groupId} + zeppelin-spark-dependencies + ${project.version} + provided - ${akka.group} - akka-actor_${scala.binary.version} - ${akka.version} + com.google.guava + guava + 14.0.1 - - ${akka.group} - akka-remote_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-slf4j_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-testkit_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-zeromq_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-actor_${scala.binary.version} - - - - + org.apache.maven @@ -482,6 +201,92 @@ 1.1 + + + org.apache.spark + spark-core_2.10 + ${spark.version} + provided + + + + org.apache.spark + 
spark-repl_2.10 + ${spark.version} + provided + + + + org.apache.spark + spark-sql_2.10 + ${spark.version} + provided + + + + org.apache.spark + spark-hive_2.10 + ${spark.version} + provided + + + + org.apache.spark + spark-catalyst_2.10 + ${spark.version} + provided + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + org.scala-lang + scala-compiler + ${scala.version} + provided + + + + org.scala-lang + scala-reflect + ${scala.version} + provided + + + + commons-lang + commons-lang + provided + + + + org.apache.commons + commons-compress + 1.9 + provided + + + + net.sf.py4j + py4j + ${py4j.version} + + + org.scalatest @@ -497,356 +302,6 @@ - - - spark-1.1 - - - - - 1.1.1 - 2.2.3-shaded-protobuf - - - - - cassandra-spark-1.1 - - - com.datastax.spark - spark-cassandra-connector_${scala.binary.version} - 1.1.1 - - - org.joda - joda-convert - - - - - - 1.1.1 - 2.2.3-shaded-protobuf - - - - - spark-1.2 - - - - 1.2.1 - - - - - cassandra-spark-1.2 - - 1.2.1 - - - - com.datastax.spark - spark-cassandra-connector_${scala.binary.version} - 1.2.1 - - - org.joda - joda-convert - - - - - - - - spark-1.3 - - - 1.3.1 - - - - - - - - - cassandra-spark-1.3 - - 1.3.0 - - - - - com.datastax.spark - spark-cassandra-connector_${scala.binary.version} - - 1.3.0-SNAPSHOT - - - org.joda - joda-convert - - - - - - - - spark-1.4 - - 1.4.1 - - - - - - - - hadoop-0.23 - - - - org.apache.avro - avro - - - - 0.23.10 - - - - - hadoop-1 - - 1.0.4 - hadoop1 - 1.8.8 - org.spark-project.akka - - - - - hadoop-2.2 - - 2.2.0 - 2.5.0 - hadoop2 - - - - - hadoop-2.3 - - 2.3.0 - 2.5.0 - 0.9.3 - hadoop2 - - - - - hadoop-2.4 - - 2.4.0 - 2.5.0 - 0.9.3 - hadoop2 - - - - - hadoop-2.6 - - 2.6.0 - 2.5.0 - 0.9.3 - hadoop2 - - - - - mapr3 - - false - - - 1.0.3-mapr-3.0.3 - 2.3.0-mapr-4.0.0-FCS - 0.7.1 - - - - - mapr4 - - false - - - 2.3.0-mapr-4.0.0-FCS - 2.3.0-mapr-4.0.0-FCS - 0.7.1 - - - - org.apache.curator - curator-recipes - 2.4.0 - - - org.apache.zookeeper - zookeeper - - - - - org.apache.zookeeper - zookeeper - 3.4.5-mapr-1406 - - - - - - yarn - - - org.apache.spark - spark-yarn_2.10 - ${spark.version} - - - - org.apache.hadoop - hadoop-yarn-api - ${yarn.version} - - - - - - pyspark - - http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz - - - - - - com.googlecode.maven-download-plugin - download-maven-plugin - 1.2.1 - - - download-pyspark-files - validate - - wget - - - ${spark.download.url} - true - ${project.build.directory}/spark-dist - - - - - - maven-clean-plugin - - - - ${basedir}/../python/build - - - ${project.build.directory}/spark-dist - - - - - - org.apache.maven.plugins - maven-antrun-plugin - 1.7 - - - download-and-zip-pyspark-files - generate-resources - - run - - - - - - - - - - - - - - - - - - hadoop-provided - - false - - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - org.apache.hadoop - hadoop-yarn-server-web-proxy - provided - - - org.apache.hadoop - hadoop-yarn-client - provided - - - org.apache.avro - avro - provided - - - org.apache.avro - avro-ipc - provided - - - org.apache.zookeeper - zookeeper - ${zookeeper.version} - provided - - - - - @@ -889,6 +344,17 @@ + + maven-clean-plugin + + + + ../interpreter/spark + + + + + org.apache.maven.plugins maven-surefire-plugin @@ -900,40 +366,6 @@ - - org.apache.maven.plugins - maven-shade-plugin - 2.3 - - - - *:* - - 
org/datanucleus/** - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - reference.conf - - - - - - package - - shade - - - - - - org.apache.maven.plugins maven-dependency-plugin @@ -950,7 +382,7 @@ false false true - org.datanucleus + runtime @@ -963,6 +395,7 @@ false false true + runtime ${project.groupId} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 852dd335183..a0f4d9f6c2e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -92,15 +92,6 @@ public PySparkInterpreter(Properties property) { scriptPath = System.getProperty("java.io.tmpdir") + "/zeppelin_pyspark.py"; } - private String getSparkHome() { - String sparkHome = getProperty("spark.home"); - if (sparkHome == null) { - throw new InterpreterException("spark.home is undefined"); - } else { - return sparkHome; - } - } - private void createPythonScript() { ClassLoader classLoader = getClass().getClassLoader(); @@ -125,6 +116,17 @@ private void createPythonScript() { @Override public void open() { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.diagnosis()) { + SparkConfValidator validator = sparkInterpreter.getValidator(); + if (!validator.hasError()) { + validator.validatePyspark(sparkInterpreter.isYarnMode()); + } + if (validator.hasError()) { + return; + } + } + // create python script createPythonScript(); @@ -187,8 +189,12 @@ private int findRandomOpenPortOnAllLocalInterfaces() { @Override public void close() { - executor.getWatchdog().destroyProcess(); - gatewayServer.shutdown(); + if (executor != null) { + executor.getWatchdog().destroyProcess(); + } + if (gatewayServer != null) { + gatewayServer.shutdown(); + } } PythonInterpretRequest pythonInterpretRequest = null; @@ -255,6 +261,14 @@ public void onPythonScriptInitialized() { @Override public InterpreterResult interpret(String st, InterpreterContext context) { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.diagnosis()) { + SparkConfValidator validator = sparkInterpreter.getValidator(); + if (validator.hasError()) { + return new InterpreterResult(Code.ERROR, validator.getError()); + } + } + if (!pythonscriptRunning) { return new InterpreterResult(Code.ERROR, "python process not running" + outputStream.toString()); @@ -285,7 +299,6 @@ public InterpreterResult interpret(String st, InterpreterContext context) { + outputStream.toString()); } - SparkInterpreter sparkInterpreter = getSparkInterpreter(); if (!sparkInterpreter.getSparkContext().version().startsWith("1.2") && !sparkInterpreter.getSparkContext().version().startsWith("1.3") && !sparkInterpreter.getSparkContext().version().startsWith("1.4")) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkConfValidator.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkConfValidator.java new file mode 100644 index 00000000000..63b9941b529 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkConfValidator.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.spark; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Properties; +import java.util.regex.Pattern; + +/** + * Validate configurations. + */ +public class SparkConfValidator { + private String error; + private String sparkHome; + private String hadoopHome; + private String hadoopConfDir; + private String pythonPath; + + /** + * + * @param sparkHome SPARK_HOME env variable + * @param hadoopHome HADOOP_HOME env variable + * @param hadoopConfDir HADOOP_CONF_DIR env variable + * @param pythonPath PYTHONPATH env variable + */ + public SparkConfValidator(String sparkHome, String hadoopHome, + String hadoopConfDir, String pythonPath) { + clear(); + this.sparkHome = sparkHome; + this.hadoopHome = hadoopHome; + this.hadoopConfDir = hadoopConfDir; + this.pythonPath = pythonPath; + } + private void clear() { + error = ""; + } + + public boolean validateSpark() { + clear(); + + // Check classes are loaded + if (!checkSparkClassAvailability()) { + return false; + } + + return true; + } + + private boolean checkSparkClassAvailability() { + if (checkClassAvailability("org.apache.spark.SparkContext")) { + return true; + } else { + // classes are not available. 
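+      // i.e. neither the embedded spark-dependencies jars nor a valid SPARK_HOME put
+      // org.apache.spark.SparkContext on the classpath; report the most likely cause below.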
+ if (sparkHome == null) { + error = "SPARK_HOME is not defined"; + return false; + } else { + if (!new File(sparkHome).isDirectory()) { + error = "SPARK_HOME " + sparkHome + " is not a valid directory"; + return false; + } + } + + // unknown reason + error = "Spark artifacts are not available in current classpaths\n"; + printClasspath(); + return false; + } + } + + private void printClasspath() { + ClassLoader cl = getClass().getClassLoader(); + URL[] urls = ((URLClassLoader) cl).getURLs(); + for (URL url : urls) { + error += url.getFile() + "\n"; + } + } + + private boolean checkClassAvailability(String className) { + try { + getClass().getClassLoader().loadClass(className); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + public boolean validateYarn(Properties property) { + clear(); + if (!checkClassAvailability("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")) { + if (sparkHome == null) { + error += "Build Zeppelin with -Pyarn flag or set SPARK_HOME"; + return false; + } else if (!new File(sparkHome).isDirectory()) { + error += "SPARK_HOME " + sparkHome + " is not a valid directory"; + return false; + } else { + // unknown reason + error += "spark-yarn artifact is not available in current classpaths\n"; + printClasspath(); + return false; + } + } + + if (!checkClassAvailability("org.apache.hadoop.yarn.conf.YarnConfiguration")) { + if (hadoopHome == null) { + // unknown reason + error += "hadoop-yarn-api artifact is not available in current classpaths.\n"; + error += "Please rebuild Zeppelin or try to set HADOOP_HOME"; + printClasspath(); + return false; + } else if (!new File(hadoopHome).isDirectory()) { + error += "HADOOP_HOME " + hadoopHome + " is not a valid directory"; + return false; + } + } + + // check hadoop conf dir + if (hadoopConfDir == null) { + error += "HADOOP_CONF_DIR is not defined"; + return false; + } else if (!new File(hadoopConfDir).isDirectory()) { + error += "HADOOP_CONF_DIR " + hadoopConfDir + " is not a valid directory"; + return false; + } + + // check spark.yarn.jar + String sparkYarnJar = property.getProperty("spark.yarn.jar"); + if (sparkYarnJar == null || sparkYarnJar.trim().length() == 0) { + error += "spark.yarn.jar is not defined. Please set this property in Interpreter menu"; + return false; + } else if (!new File(sparkYarnJar).isFile()) { + error += "spark.yarn.jar " + sparkYarnJar + " is not a valid file"; + return false; + } + return true; + } + + public boolean validatePyspark(boolean yarnMode) { + clear(); + + if (pythonPath == null) { + error += "PYTHONPATH is not defined. It is usually configured automatically." + + "Please report this problem"; + return false; + } else { + boolean pysparkFound = false; + boolean py4jFound = false; + + for (String p : pythonPath.split(":")) { + File path = new File(p); + String name = path.getName(); + + if (Pattern.matches("py4j.*src.zip", name) && path.isFile()) { + py4jFound = true; + } else if (name.equals("pyspark.zip") && path.isFile()) { + pysparkFound = true; + } else if (name.equals("python") && sparkHome != null && + path.getAbsolutePath().equals(sparkHome + "/python")) { + pysparkFound = true; + } + } + + if (!pysparkFound) { + error += "pyspark.zip or SPARK_HOME/python directory not found"; + return false; + } + + if (!py4jFound) { + error += "py4j-x.x.x.x-src.zip is not found. 
Please check your SPARK_HOME"; + return false; + } + + if (yarnMode) { + // more test on yarn-client mode + } + return true; + } + } + + + public String getError() { + return error; + } + + public boolean hasError() { + return (error != null && error.length() > 0); + } +} diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index a4ff494ce1c..ff80271e121 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -29,6 +29,7 @@ import java.util.*; import com.google.common.base.Joiner; + import org.apache.spark.HttpServer; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; @@ -40,7 +41,7 @@ import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; -import org.apache.spark.scheduler.Stage; +import org.apache.spark.scheduler.SparkListener; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; @@ -67,6 +68,7 @@ import scala.collection.Iterator; import scala.collection.JavaConversions; import scala.collection.JavaConverters; +import scala.collection.Seq; import scala.collection.mutable.HashMap; import scala.collection.mutable.HashSet; import scala.tools.nsc.Settings; @@ -88,6 +90,9 @@ public class SparkInterpreter extends Interpreter { "spark", SparkInterpreter.class.getName(), new InterpreterPropertyBuilder() + .add("zeppelin.spark.diagnosis", + getSystemDefault("ZEPPELIN_SPARK_DIAGNOSIS", "zeppelin.spark.diagnosis", "false"), + "Self diagnosis of configuration") .add("spark.app.name", "Zeppelin", "The name of spark application.") .add("master", getSystemDefault("MASTER", "spark.master", "local[*]"), @@ -126,6 +131,7 @@ public class SparkInterpreter extends Interpreter { private Map binder; private SparkEnv env; + private SparkConfValidator sparkConfValidator; public SparkInterpreter(Properties property) { @@ -156,7 +162,14 @@ public boolean isSparkContextInitialized() { private static JobProgressListener setupListeners(SparkContext context) { JobProgressListener pl = new JobProgressListener(context.getConf()); - context.listenerBus().addListener(pl); + try { + Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); + Method m = listenerBus.getClass().getMethod("addListener", SparkListener.class); + m.invoke(listenerBus, pl); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + e.printStackTrace(); + } return pl; } @@ -164,6 +177,22 @@ private boolean useHiveContext() { return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } + public boolean diagnosis() { + if (getProperty("zeppelin.spark.diagnosis") == null) { + return false; + } else { + return Boolean.parseBoolean(getProperty("zeppelin.spark.diagnosis")); + } + } + + public boolean isYarnMode() { + return getProperty("master").equals("yarn-client") || getProperty("master").equals("yarn"); + } + + public SparkConfValidator getValidator() { + return sparkConfValidator; + } + public SQLContext getSQLContext() { if (sqlc == null) { if (useHiveContext()) { @@ -271,7 +300,7 @@ public SparkContext createSparkContext() { } //TODO(jongyoul): Move these codes into PySparkInterpreter.java - + String pysparkBasePath = 
getSystemDefault("SPARK_HOME", "spark.home", null); File pysparkPath; if (null == pysparkBasePath) { @@ -329,6 +358,23 @@ public static String getSystemDefault( @Override public void open() { + if (diagnosis()) { + sparkConfValidator = new SparkConfValidator( + getSystemDefault("SPARK_HOME", "spark.home", null), + System.getenv("HADOOP_HOME"), + System.getenv("HADOOP_CONF_DIR"), + System.getenv("PYTHONPATH") + ); + + if (!sparkConfValidator.validateSpark()) { + return; + } + + if (isYarnMode() && !sparkConfValidator.validateYarn(getProperty())) { + return; + } + } + URL[] urls = getClassloaderUrls(); // Very nice discussion about how scala compiler handle classpath @@ -547,6 +593,9 @@ private List classPath(ClassLoader cl) { @Override public List completion(String buf, int cursor) { + if (completor == null) { + return new LinkedList(); + } ScalaCompleter c = completor.completer(); Candidates ret = c.complete(buf, cursor); return scala.collection.JavaConversions.asJavaList(ret.candidates()); @@ -572,6 +621,10 @@ String getJobGroup(InterpreterContext context){ */ @Override public InterpreterResult interpret(String line, InterpreterContext context) { + if (diagnosis() && sparkConfValidator.hasError()) { + return new InterpreterResult(Code.ERROR, sparkConfValidator.getError()); + } + z.setInterpreterContext(context); if (line == null || line.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); @@ -606,7 +659,7 @@ public InterpreterResult interpretInput(String[] lines) { String incomplete = ""; for (int l = 0; l < linesToRun.length; l++) { - String s = linesToRun[l]; + String s = linesToRun[l]; // check if next line starts with "." (but not ".." or "./") it is treated as an invocation if (l + 1 < linesToRun.length) { String nextLine = linesToRun[l + 1].trim(); @@ -647,11 +700,18 @@ public InterpreterResult interpretInput(String[] lines) { @Override public void cancel(InterpreterContext context) { + if (sc == null) { + return; + } sc.cancelJobGroup(getJobGroup(context)); } @Override public int getProgress(InterpreterContext context) { + if (sc == null) { + return 0; + } + String jobGroup = getJobGroup(context); int completedTasks = 0; int totalTasks = 0; @@ -671,18 +731,26 @@ public int getProgress(InterpreterContext context) { if (jobGroup.equals(g)) { int[] progressInfo = null; - if (sc.version().startsWith("1.0")) { - progressInfo = getProgressFromStage_1_0x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.1")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.2")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.3")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.4")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else { - continue; + try { + Object finalStage = job.getClass().getMethod("finalStage").invoke(job); + if (sc.version().startsWith("1.0")) { + progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage); + } else if (sc.version().startsWith("1.1")) { + progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); + } else if (sc.version().startsWith("1.2")) { + progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); + } else if (sc.version().startsWith("1.3")) { + progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); + } else if 
(sc.version().startsWith("1.4")) { + progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); + } else { + continue; + } + } catch (IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException + | SecurityException e) { + logger.error("Can't get progress info", e); + return 0; } totalTasks += progressInfo[0]; completedTasks += progressInfo[1]; @@ -695,33 +763,27 @@ public int getProgress(InterpreterContext context) { return completedTasks * 100 / totalTasks; } - private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Stage stage) { - int numTasks = stage.numTasks(); + private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage) + throws IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException, SecurityException { + int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); int completedTasks = 0; - Method method; + int id = (int) stage.getClass().getMethod("id").invoke(stage); + Object completedTaskInfo = null; - try { - method = sparkListener.getClass().getMethod("stageIdToTasksComplete"); - completedTaskInfo = - JavaConversions.asJavaMap((HashMap) method.invoke(sparkListener)).get( - stage.id()); - } catch (NoSuchMethodException | SecurityException e) { - logger.error("Error while getting progress", e); - } catch (IllegalAccessException e) { - logger.error("Error while getting progress", e); - } catch (IllegalArgumentException e) { - logger.error("Error while getting progress", e); - } catch (InvocationTargetException e) { - logger.error("Error while getting progress", e); - } + + completedTaskInfo = JavaConversions.asJavaMap( + (HashMap) sparkListener.getClass() + .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id); if (completedTaskInfo != null) { completedTasks += (int) completedTaskInfo; } - List parents = JavaConversions.asJavaList(stage.parents()); + List parents = JavaConversions.asJavaList((Seq) stage.getClass() + .getMethod("parents").invoke(stage)); if (parents != null) { - for (Stage s : parents) { + for (Object s : parents) { int[] p = getProgressFromStage_1_0x(sparkListener, s); numTasks += p[0]; completedTasks += p[1]; @@ -731,9 +793,12 @@ private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Stage return new int[] {numTasks, completedTasks}; } - private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage stage) { - int numTasks = stage.numTasks(); + private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage) + throws IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException, SecurityException { + int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); int completedTasks = 0; + int id = (int) stage.getClass().getMethod("id").invoke(stage); try { Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData"); @@ -747,7 +812,7 @@ private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage Set> keys = JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava(); for (Tuple2 k : keys) { - if (stage.id() == (int) k._1()) { + if (id == (int) k._1()) { Object uiData = stageIdData.get(k).get(); completedTasks += (int) numCompletedTasks.invoke(uiData); } @@ -756,9 +821,10 @@ private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage logger.error("Error on getting progress information", e); } - List parents = 
JavaConversions.asJavaList(stage.parents()); + List parents = JavaConversions.asJavaList((Seq) stage.getClass() + .getMethod("parents").invoke(stage)); if (parents != null) { - for (Stage s : parents) { + for (Object s : parents) { int[] p = getProgressFromStage_1_1x(sparkListener, s); numTasks += p[0]; completedTasks += p[1]; @@ -779,10 +845,14 @@ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { @Override public void close() { - sc.stop(); - sc = null; + if (sc != null) { + sc.stop(); + sc = null; + } - intp.close(); + if (intp != null) { + intp.close(); + } } @Override diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index d3bda44cf9d..0828afabfd4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -21,21 +21,15 @@ import java.lang.reflect.Method; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.spark.SparkContext; -import org.apache.spark.scheduler.ActiveJob; -import org.apache.spark.scheduler.DAGScheduler; -import org.apache.spark.scheduler.Stage; import org.apache.spark.sql.SQLContext; -import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterUtils; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; @@ -44,13 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import scala.collection.Iterator; -import scala.collection.JavaConversions; -import scala.collection.JavaConverters; -import scala.collection.mutable.HashMap; -import scala.collection.mutable.HashSet; - /** * Spark SQL interpreter for Zeppelin. * @@ -118,9 +105,14 @@ public void close() {} @Override public InterpreterResult interpret(String st, InterpreterContext context) { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.diagnosis() && sparkInterpreter.getValidator().hasError()) { + return new InterpreterResult(Code.ERROR, sparkInterpreter.getValidator().getError()); + } + SQLContext sqlc = null; - sqlc = getSparkInterpreter().getSQLContext(); + sqlc = sparkInterpreter.getSQLContext(); SparkContext sc = sqlc.sparkContext(); if (concurrentSQL()) { @@ -129,8 +121,19 @@ public InterpreterResult interpret(String st, InterpreterContext context) { sc.setLocalProperty("spark.scheduler.pool", null); } + Object rdd = null; + try { + // method signature of sqlc.sql() is changed + // from def sql(sqlText: String): SchemaRDD (1.2 and prior) + // to def sql(sqlText: String): DataFrame (1.3 and later). + // Therefore need to use reflection to keep binary compatibility for all spark versions. 
+ Method sqlMethod = sqlc.getClass().getMethod("sql", String.class); + rdd = sqlMethod.invoke(sqlc, st); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new InterpreterException(e); + } - Object rdd = sqlc.sql(st); String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); return new InterpreterResult(Code.SUCCESS, msg); } @@ -138,8 +141,10 @@ public InterpreterResult interpret(String st, InterpreterContext context) { @Override public void cancel(InterpreterContext context) { SQLContext sqlc = getSparkInterpreter().getSQLContext(); + if (sqlc == null) { + return; + } SparkContext sc = sqlc.sparkContext(); - sc.cancelJobGroup(getJobGroup(context)); } @@ -151,117 +156,10 @@ public FormType getFormType() { @Override public int getProgress(InterpreterContext context) { - String jobGroup = getJobGroup(context); - SQLContext sqlc = getSparkInterpreter().getSQLContext(); - SparkContext sc = sqlc.sparkContext(); - JobProgressListener sparkListener = getSparkInterpreter().getJobProgressListener(); - int completedTasks = 0; - int totalTasks = 0; - - DAGScheduler scheduler = sc.dagScheduler(); - HashSet jobs = scheduler.activeJobs(); - Iterator it = jobs.iterator(); - while (it.hasNext()) { - ActiveJob job = it.next(); - String g = (String) job.properties().get("spark.jobGroup.id"); - if (jobGroup.equals(g)) { - int[] progressInfo = null; - if (sc.version().startsWith("1.0")) { - progressInfo = getProgressFromStage_1_0x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.1")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.2")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.3")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else if (sc.version().startsWith("1.4")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage()); - } else { - logger.warn("Spark {} getting progress information not supported" + sc.version()); - continue; - } - totalTasks += progressInfo[0]; - completedTasks += progressInfo[1]; - } - } - - if (totalTasks == 0) { - return 0; - } - return completedTasks * 100 / totalTasks; - } - - private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Stage stage) { - int numTasks = stage.numTasks(); - int completedTasks = 0; - - Method method; - Object completedTaskInfo = null; - try { - method = sparkListener.getClass().getMethod("stageIdToTasksComplete"); - completedTaskInfo = - JavaConversions.asJavaMap((HashMap) method.invoke(sparkListener)).get( - stage.id()); - } catch (NoSuchMethodException | SecurityException e) { - logger.error("Error while getting progress", e); - } catch (IllegalAccessException e) { - logger.error("Error while getting progress", e); - } catch (IllegalArgumentException e) { - logger.error("Error while getting progress", e); - } catch (InvocationTargetException e) { - logger.error("Error while getting progress", e); - } - - if (completedTaskInfo != null) { - completedTasks += (int) completedTaskInfo; - } - List parents = JavaConversions.asJavaList(stage.parents()); - if (parents != null) { - for (Stage s : parents) { - int[] p = getProgressFromStage_1_0x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - - return new int[] {numTasks, completedTasks}; + SparkInterpreter sparkInterpreter = 
getSparkInterpreter(); + return sparkInterpreter.getProgress(context); } - private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage stage) { - int numTasks = stage.numTasks(); - int completedTasks = 0; - - try { - Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData"); - HashMap, Object> stageIdData = - (HashMap, Object>) stageIdToData.invoke(sparkListener); - Class stageUIDataClass = - this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData"); - - Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks"); - - Set> keys = - JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava(); - for (Tuple2 k : keys) { - if (stage.id() == (int) k._1()) { - Object uiData = stageIdData.get(k).get(); - completedTasks += (int) numCompletedTasks.invoke(uiData); - } - } - } catch (Exception e) { - logger.error("Error on getting progress information", e); - } - - List parents = JavaConversions.asJavaList(stage.parents()); - if (parents != null) { - for (Stage s : parents) { - int[] p = getProgressFromStage_1_1x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - return new int[] {numTasks, completedTasks}; - } @Override public Scheduler getScheduler() { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkConfValidatorTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkConfValidatorTest.java new file mode 100644 index 00000000000..7e1e3e996a8 --- /dev/null +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkConfValidatorTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.spark; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class SparkConfValidatorTest { + + private File tmpDir; + + @Before + public void setUp() { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + tmpDir.mkdirs(); + } + + @After + public void tearDown() throws Exception { + delete(tmpDir); + } + + private void delete(File file) { + if (file.isFile()) file.delete(); + else if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null && files.length > 0) { + for (File f : files) { + delete(f); + } + } + file.delete(); + } + } + + @Test + public void testValidateSpark() { + String sparkHome = null; + String hadoopHome = null; + String hadoopConfDir = null; + String pysparkPath = null; + + SparkConfValidator scv = new SparkConfValidator( + sparkHome, hadoopHome, hadoopConfDir, pysparkPath); + + assertTrue(scv.validateSpark()); + } + + @Test + public void testValidateYarn() { + String sparkHome = null; + String hadoopHome = null; + String hadoopConfDir = "/tmp"; + String pysparkPath = null; + + SparkConfValidator scv = new SparkConfValidator( + sparkHome, hadoopHome, hadoopConfDir, pysparkPath); + + Properties p = new Properties(); + p.setProperty("spark.yarn.jar", "../README.md"); + + boolean testBuiltWithYarnEnabled = scv.validateYarn(p); + + if (testBuiltWithYarnEnabled) { + // check HADOOP_CONF_DIR is missing + scv = new SparkConfValidator( + sparkHome, hadoopHome, null, pysparkPath); + assertFalse(scv.validateYarn(p)); + + // check HADOOP_CONF_DIR not exists or a file + scv = new SparkConfValidator( + sparkHome, hadoopHome, "/notexists", pysparkPath); + assertFalse(scv.validateYarn(p)); + + scv = new SparkConfValidator( + sparkHome, hadoopHome, "../README.md", pysparkPath); + assertFalse(scv.validateYarn(p)); + + + // check spark.yarn.jar is missing + scv = new SparkConfValidator( + sparkHome, hadoopHome, hadoopConfDir, pysparkPath); + p.setProperty("spark.yarn.jar", "/notexists"); + assertFalse(scv.validateYarn(p)); + + // check spark.yarn.jar is not a file + scv = new SparkConfValidator( + sparkHome, hadoopHome, hadoopConfDir, pysparkPath); + p.setProperty("spark.yarn.jar", "/tmp"); + assertFalse(scv.validateYarn(p)); + } + } + + @Test + public void testValidatePyspark() throws IOException { + String sparkHome = null; + String hadoopHome = null; + String hadoopConfDir = null; + String pythonPath = null; + + SparkConfValidator scv = new SparkConfValidator( + sparkHome, hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + + // create mock file and directories + new File(tmpDir, "py4j-0.0.0.0-src.zip").createNewFile(); + new File(tmpDir, "pyspark.zip").createNewFile(); + new File(tmpDir, "python").mkdir(); + new File(tmpDir, "invalidpath").mkdir(); + + // when py4j and python dir is found + pythonPath = tmpDir.getAbsolutePath() + "/py4j-0.0.0.0-src.zip:" + + tmpDir.getAbsolutePath() + "/python"; + scv = new SparkConfValidator(tmpDir.getAbsolutePath(), hadoopHome, hadoopConfDir, pythonPath); + assertTrue(scv.validatePyspark(false)); + + scv = new SparkConfValidator(null, hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + + scv = new SparkConfValidator(tmpDir.getAbsolutePath() + "/invalidpath", hadoopHome, hadoopConfDir, pythonPath); + 
assertFalse(scv.validatePyspark(false)); + + // when py4j and python.zip is found + pythonPath = tmpDir.getAbsolutePath() + "/py4j-0.0.0.0-src.zip:" + + tmpDir.getAbsolutePath() + "/python"; + + scv = new SparkConfValidator(tmpDir.getAbsolutePath(), hadoopHome, hadoopConfDir, pythonPath); + assertTrue(scv.validatePyspark(false)); + + scv = new SparkConfValidator(null, hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + + scv = new SparkConfValidator(tmpDir.getAbsolutePath() + "/invalidpath", hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + + // when python dir or python.zip is not found + pythonPath = tmpDir.getAbsolutePath() + "/py4j-0.0.0.0-src.zip:" + + tmpDir.getAbsolutePath() + "/invalidpath"; + + scv = new SparkConfValidator(tmpDir.getAbsolutePath(), hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + + // when py4j can not be found + pythonPath = tmpDir.getAbsolutePath() + "/invalidpath:" + + tmpDir.getAbsolutePath() + "/python"; + scv = new SparkConfValidator(tmpDir.getAbsolutePath(), hadoopHome, hadoopConfDir, pythonPath); + assertFalse(scv.validatePyspark(false)); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index aa395aaeb2b..aab80432cb8 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -108,8 +108,12 @@ protected static void startUp() throws Exception { // ci environment runs spark cluster for testing // so configure zeppelin use spark cluster if ("true".equals(System.getenv("CI"))) { - // assume first one is spark - InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0); + InterpreterSetting sparkIntpSetting = null; + for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) { + if (intpSetting.getGroup().equals("spark")) { + sparkIntpSetting = intpSetting; + } + } // set spark master sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071"); @@ -120,8 +124,12 @@ protected static void startUp() throws Exception { ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id()); } else { - // assume first one is spark - InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterFactory().get().get(0); + InterpreterSetting sparkIntpSetting = null; + for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) { + if (intpSetting.getGroup().equals("spark")) { + sparkIntpSetting = intpSetting; + } + } String sparkHome = getSparkHome(); if (sparkHome != null) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index fd4a8b301bf..aa2476a5d5d 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -64,7 +64,7 @@ public void basicRDDTransformationAndActionTest() throws IOException { // run markdown paragraph, again Paragraph p = note.addParagraph(); - p.setText("print(sc.parallelize(1 to 10).reduce(_ + _))"); + p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))"); 
note.run(p.getId()); waitForFinish(p); assertEquals("55", p.getResult().message()); @@ -88,7 +88,7 @@ public void pySparkTest() throws IOException { } ZeppelinServer.notebook.removeNote(note.id()); } - + @Test public void pySparkAutoConvertOptionTest() throws IOException { // create new note @@ -113,11 +113,11 @@ public void zRunTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(); Paragraph p0 = note.addParagraph(); - p0.setText("z.run(1)"); + p0.setText("%spark z.run(1)"); Paragraph p1 = note.addParagraph(); - p1.setText("val a=10"); + p1.setText("%spark val a=10"); Paragraph p2 = note.addParagraph(); - p2.setText("print(a)"); + p2.setText("%spark print(a)"); note.run(p0.getId()); waitForFinish(p0); @@ -135,7 +135,7 @@ public void zRunTest() throws IOException { */ private int getSparkVersionNumber(Note note) { Paragraph p = note.addParagraph(); - p.setText("print(sc.version)"); + p.setText("%spark print(sc.version)"); note.run(p.getId()); waitForFinish(p); String sparkVersion = p.getResult().message();