diff --git a/.travis.yml b/.travis.yml index 0a02d080716..76125d19e2d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -83,14 +83,14 @@ matrix: - sudo: required jdk: "oraclejdk8" dist: trusty - env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" + env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pspark-scala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" # Test selenium with spark module for spark 2.3 - jdk: "oraclejdk8" dist: trusty addons: firefox: "31.0" - env: BUILD_PLUGINS="true" CI="true" PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pscala-2.11" BUILD_FLAG="install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" CI="true" PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pspark-scala-2.11" BUILD_FLAG="install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false" # Test interpreter modules - jdk: "oraclejdk8" @@ -99,41 +99,47 @@ matrix: # Run Spark integration test and unit test separately for each spark version - # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 + # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.11) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pscala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3 + # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.12) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pscala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests 
-DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2 + # ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3 (Scala-2.11) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pscala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1 + # ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2 (Scala-2.10) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # ZeppelinSparkClusterTest20, SparkIntegrationTest20, Unit test of Spark 2.0 + # ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1 (Scala-2.10) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 
-Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6 + # ZeppelinSparkClusterTest20, SparkIntegrationTest20, Unit test of Spark 2.0 (Scala-2.10) - sudo: required jdk: "oraclejdk8" dist: trusty - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + + # ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6 (Scala-2.10) + - sudo: required + jdk: "oraclejdk8" + dist: trusty + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test python/pyspark with python 2, livy 0.5 - sudo: required diff --git a/pom.xml b/pom.xml index a3aad15b076..afb702d556c 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ 1.8 2.10.5 2.10 - 2.2.4 + 3.0.7 1.12.5 diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index c58324f04a5..b58cc56804e 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -48,7 +48,7 @@ 3.2.6 3.2.10 - ${scala.version} + ${spark.scala.version} **/PySparkInterpreterMatplotlibTest.java **/*Test.* @@ -59,18 +59,20 @@ org.apache.zeppelin zeppelin-display ${project.version} - - - - org.apache.zeppelin - spark-scala-2.10 - ${project.version} - - - - org.apache.zeppelin - spark-scala-2.11 - ${project.version} + + + org.scala-lang + scala-library + + + org.scala-lang + scala-compiler + + + org.scala-lang + scalap + + @@ -126,14 +128,14 @@ org.apache.spark - spark-repl_${scala.binary.version} + spark-repl_${spark.scala.binary.version} ${spark.version} provided org.apache.spark - spark-core_${scala.binary.version} + spark-core_${spark.scala.binary.version} ${spark.version} provided @@ -153,7 +155,7 @@ org.apache.spark - spark-hive_${scala.binary.version} + spark-hive_${spark.scala.binary.version} ${spark.version} provided @@ -172,21 +174,21 @@ org.scala-lang scala-library - ${scala.version} + ${spark.scala.version} provided org.scala-lang scala-compiler - ${scala.version} + ${spark.scala.version} provided org.scala-lang scala-reflect - ${scala.version} + ${spark.scala.version} provided @@ -212,7 +214,7 @@ org.scalatest - scalatest_${scala.binary.version} + 
scalatest_${spark.scala.binary.version} ${scalatest.version} test diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java new file mode 100644 index 00000000000..a4bac1fd81e --- /dev/null +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import java.util.List; + +/** + * This is a bridge class which bridges the communication between the java side and the scala side. + * The java side relies on this abstract class, which is implemented by different scala versions. 
+ */ +public abstract class AbstractSparkScalaInterpreter { + + public abstract SparkContext getSparkContext(); + + public abstract SQLContext getSqlContext(); + + public abstract Object getSparkSession(); + + public abstract String getSparkUrl(); + + public abstract BaseZeppelinContext getZeppelinContext(); + + public int getProgress(InterpreterContext context) throws InterpreterException { + return getProgress(Utils.buildJobGroupId(context), context); + } + + public abstract int getProgress(String jobGroup, + InterpreterContext context) throws InterpreterException; + + public void cancel(InterpreterContext context) throws InterpreterException { + getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context)); + } + + public Interpreter.FormType getFormType() throws InterpreterException { + return Interpreter.FormType.SIMPLE; + } + + public abstract void open(); + + public abstract void close(); + + public abstract InterpreterResult interpret(String st, InterpreterContext context); + + public abstract List completion(String buf, + int cursor, + InterpreterContext interpreterContext); +} diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index d1433e80f27..960227a58d2 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -140,7 +140,7 @@ protected void preCallPython(InterpreterContext context) { if (context.getLocalProperties().containsKey("pool")) { pool = "'" + context.getLocalProperties().get("pool") + "'"; } - String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")"; + String setPoolStmt = "if 'sc' in locals():\n\tsc.setLocalProperty('spark.scheduler.pool', " + pool + ")"; callPython(new PythonInterpretRequest(setPoolStmt, false, false)); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 1b5b9f623cb..33769be8255 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -17,52 +17,50 @@ package org.apache.zeppelin.spark; -import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; /** - * 
SparkInterpreter of Java implementation. It is just wrapper of Spark211Interpreter - * and Spark210Interpreter. + * SparkInterpreter of Java implementation. It delegates to different scala version AbstractSparkScalaInterpreter. + * */ public class SparkInterpreter extends AbstractInterpreter { private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class); - private BaseSparkScalaInterpreter innerInterpreter; + private static AtomicInteger SESSION_NUM = new AtomicInteger(0); + private AbstractSparkScalaInterpreter innerInterpreter; private Map innerInterpreterClassMap = new HashMap<>(); private SparkContext sc; private JavaSparkContext jsc; private SQLContext sqlContext; private Object sparkSession; - private SparkZeppelinContext z; private SparkVersion sparkVersion; private boolean enableSupportedVersionCheck; private String sparkUrl; - private SparkShims sparkShims; - - private static InterpreterHookRegistry hooks; public SparkInterpreter(Properties properties) { @@ -75,13 +73,12 @@ public SparkInterpreter(Properties properties) { properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter"); innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter"); + innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter"); } @Override public void open() throws InterpreterException { try { - String scalaVersion = extractScalaVersion(); - LOGGER.info("Using Scala Version: " + scalaVersion); SparkConf conf = new SparkConf(); for (Map.Entry entry : getProperties().entrySet()) { if (!StringUtils.isBlank(entry.getValue().toString())) { @@ -99,16 +96,10 @@ public void open() throws InterpreterException { } // use local mode for embedded spark mode when spark.master is not found conf.setIfMissing("spark.master", "local"); - - String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion); - Class clazz = Class.forName(innerIntpClassName); - this.innerInterpreter = (BaseSparkScalaInterpreter) - clazz.getConstructor(SparkConf.class, List.class, Boolean.class) - .newInstance(conf, getDependencyFiles(), - Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput", "true"))); + this.innerInterpreter = loadSparkScalaInterpreter(conf); this.innerInterpreter.open(); - sc = this.innerInterpreter.sc(); + sc = this.innerInterpreter.getSparkContext(); jsc = JavaSparkContext.fromSparkContext(sc); sparkVersion = SparkVersion.fromVersionString(sc.version()); if (enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion()) { @@ -116,38 +107,72 @@ public void open() throws InterpreterException { + "\nYou can set zeppelin.spark.enableSupportedVersionCheck to false if you really" + " want to try this version of spark."); } - sqlContext = this.innerInterpreter.sqlContext(); - sparkSession = this.innerInterpreter.sparkSession(); - hooks = getInterpreterGroup().getInterpreterHookRegistry(); - sparkUrl = this.innerInterpreter.sparkUrl(); + sqlContext = this.innerInterpreter.getSqlContext(); + sparkSession = this.innerInterpreter.getSparkSession(); + sparkUrl = this.innerInterpreter.getSparkUrl(); String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", ""); if (!StringUtils.isBlank(sparkUrlProp)) { sparkUrl = sparkUrlProp; } - sparkShims = SparkShims.getInstance(sc.version(), getProperties()); - sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get()); - z = new SparkZeppelinContext(sc, 
sparkShims, hooks, - Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); - this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z, - Lists.newArrayList("@transient")); + SESSION_NUM.incrementAndGet(); } catch (Exception e) { LOGGER.error("Fail to open SparkInterpreter", e); throw new InterpreterException("Fail to open SparkInterpreter", e); } } + /** + * Load AbstractSparkScalaInterpreter based on the runtime scala version. + * Load AbstractSparkScalaInterpreter from the following location: + * + * SparkScala210Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.10 + * SparkScala211Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.11 + * SparkScala212Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.12 + * + * @param conf + * @return AbstractSparkScalaInterpreter + * @throws Exception + */ + private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf) throws Exception { + String scalaVersion = extractScalaVersion(); + ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader(); + + String zeppelinHome = System.getenv("ZEPPELIN_HOME"); + if (zeppelinHome != null) { + // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader. + // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala- + + File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion); + List urls = new ArrayList<>(); + for (File file : scalaJarFolder.listFiles()) { + LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: " + + scalaJarFolder); + urls.add(file.toURI().toURL()); + } + scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]), + Thread.currentThread().getContextClassLoader()); + } + + String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion); + Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName); + return (AbstractSparkScalaInterpreter) + clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class) + .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader); + } + @Override - public void close() { + public void close() throws InterpreterException { LOGGER.info("Close SparkInterpreter"); - if (innerInterpreter != null) { + if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) { innerInterpreter.close(); innerInterpreter = null; } } @Override - public InterpreterResult internalInterpret(String st, InterpreterContext context) { + public InterpreterResult internalInterpret(String st, + InterpreterContext context) throws InterpreterException { context.out.clear(); sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); // set spark.scheduler.pool to null to clear the pool assosiated with this paragraph @@ -158,15 +183,14 @@ public InterpreterResult internalInterpret(String st, InterpreterContext context } @Override - public void cancel(InterpreterContext context) { - sc.cancelJobGroup(Utils.buildJobGroupId(context)); + public void cancel(InterpreterContext context) throws InterpreterException { + innerInterpreter.cancel(context); } @Override public List completion(String buf, int cursor, - InterpreterContext interpreterContext) { - LOGGER.debug("buf: " + buf + ", cursor:" + cursor); + InterpreterContext interpreterContext) throws InterpreterException { return innerInterpreter.completion(buf, cursor, interpreterContext); } @@ -176,12 
+200,12 @@ public FormType getFormType() { } @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context); } - public SparkZeppelinContext getZeppelinContext() { - return this.z; + public BaseZeppelinContext getZeppelinContext() { + return this.innerInterpreter.getZeppelinContext(); } public SparkContext getSparkContext() { @@ -204,19 +228,20 @@ public SparkVersion getSparkVersion() { return sparkVersion; } - private String extractScalaVersion() throws IOException, InterruptedException { + private String extractScalaVersion() throws InterpreterException { String scalaVersionString = scala.util.Properties.versionString(); + LOGGER.info("Using Scala: " + scalaVersionString); if (scalaVersionString.contains("version 2.10")) { return "2.10"; - } else { + } else if (scalaVersionString.contains("version 2.11")) { return "2.11"; + } else if (scalaVersionString.contains("version 2.12")) { + return "2.12"; + } else { + throw new InterpreterException("Unsupported scala version: " + scalaVersionString); } } - public boolean isSparkContextInitialized() { - return this.sc != null; - } - private List getDependencyFiles() throws InterpreterException { List depFiles = new ArrayList<>(); // add jar from local repo diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 3b14eedcfb4..4afb4849115 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -101,11 +101,12 @@ public void open() throws InterpreterException { ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); } ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); - ZeppelinRContext.setZeppelinContext((SparkZeppelinContext) sparkInterpreter.getZeppelinContext()); + ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this); try { zeppelinR.open(); + logger.info("ZeppelinR is opened successfully."); } catch (IOException e) { throw new InterpreterException("Exception while opening SparkRInterpreter", e); } @@ -167,7 +168,7 @@ public InterpreterResult interpret(String lines, InterpreterContext interpreterC return new InterpreterResult( rDisplay.code(), - rDisplay.type(), + rDisplay.typ(), rDisplay.content() ); } else { @@ -183,8 +184,9 @@ public InterpreterResult interpret(String lines, InterpreterContext interpreterC } @Override - public void close() { + public void close() throws InterpreterException { zeppelinR.close(); + this.sparkInterpreter.close(); } @Override diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java index 744e5329bf8..723a983d81c 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -39,7 +39,6 @@ class Utils { "%html Spark lower than 2.2 is deprecated, " + "if you don't want to see this message, please set " + "zeppelin.spark.deprecateMsg.show to false."; - private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion(); static Object 
invokeMethod(Object o, String name) { return invokeMethod(o, name, new Class[]{}, new Object[]{}); @@ -106,45 +105,6 @@ static boolean isScala2_10() { } } - static boolean isScala2_11() { - return !isScala2_10(); - } - - static boolean isCompilerAboveScala2_11_7() { - if (isScala2_10() || SCALA_COMPILER_VERSION == null) { - return false; - } - Pattern p = Pattern.compile("([0-9]+)[.]([0-9]+)[.]([0-9]+)"); - Matcher m = p.matcher(SCALA_COMPILER_VERSION); - if (m.matches()) { - int major = Integer.parseInt(m.group(1)); - int minor = Integer.parseInt(m.group(2)); - int bugfix = Integer.parseInt(m.group(3)); - return (major > 2 || (major == 2 && minor > 11) || (major == 2 && minor == 11 && bugfix > 7)); - } - return false; - } - - private static String evaluateScalaCompilerVersion() { - String version = null; - try { - Properties p = new Properties(); - Class completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion"); - if (completionClass != null) { - try (java.io.InputStream in = completionClass.getClass() - .getResourceAsStream("/compiler.properties")) { - p.load(in); - version = p.getProperty("version.number"); - } catch (java.io.IOException e) { - logger.error("Failed to evaluate Scala compiler version", e); - } - } - } catch (RuntimeException e) { - logger.error("Failed to evaluate Scala compiler version", e); - } - return version; - } - static boolean isSpark2() { try { Class.forName("org.apache.spark.sql.SparkSession"); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 71f35688a40..60c5b17b02c 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -151,7 +151,7 @@ public void open() throws IOException, InterpreterException { cmd.addArgument(SparkRBackend.socketSecret()); } // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes - logger.debug(cmd.toString()); + logger.debug("R Command: " + cmd.toString()); executor = new DefaultExecutor(); outputStream = new SparkRInterpreterOutputStream(logger, sparkRInterpreter); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java index 80ea03b9fcd..59a1e6f2e3f 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -20,6 +20,7 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; /** * Contains the Spark and Zeppelin Contexts made available to SparkR. 
@@ -27,7 +28,7 @@ public class ZeppelinRContext { private static SparkContext sparkContext; private static SQLContext sqlContext; - private static SparkZeppelinContext zeppelinContext; + private static BaseZeppelinContext zeppelinContext; private static Object sparkSession; private static JavaSparkContext javaSparkContext; @@ -35,7 +36,7 @@ public static void setSparkContext(SparkContext sparkContext) { ZeppelinRContext.sparkContext = sparkContext; } - public static void setZeppelinContext(SparkZeppelinContext zeppelinContext) { + public static void setZeppelinContext(BaseZeppelinContext zeppelinContext) { ZeppelinRContext.zeppelinContext = zeppelinContext; } @@ -55,7 +56,7 @@ public static SQLContext getSqlContext() { return sqlContext; } - public static SparkZeppelinContext getZeppelinContext() { + public static BaseZeppelinContext getZeppelinContext() { return zeppelinContext; } diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala index a9014c2abbf..9880691747a 100644 --- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala +++ b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala @@ -29,7 +29,7 @@ import org.jsoup.safety.Whitelist import scala.collection.JavaConversions._ import scala.util.matching.Regex -case class RDisplay(content: String, `type`: Type, code: Code) +class RDisplay(val content: String, val typ: Type, val code: Code) object ZeppelinRDisplay { @@ -42,7 +42,7 @@ object ZeppelinRDisplay { val body = document.body() - if (body.getElementsByTag("p").isEmpty) return RDisplay(body.html(), HTML, SUCCESS) + if (body.getElementsByTag("p").isEmpty) return new RDisplay(body.html(), HTML, SUCCESS) val bodyHtml = body.html() @@ -70,21 +70,21 @@ object ZeppelinRDisplay { // remove HTML tag while preserving whitespaces and newlines val text = Jsoup.clean(body.html(), "", Whitelist.none(), new OutputSettings().prettyPrint(false)) - RDisplay(text, TEXT, SUCCESS) + new RDisplay(text, TEXT, SUCCESS) } private def tableDisplay(body: Element): RDisplay = { val p = body.getElementsByTag("p").first().html.replace("“%table " , "").replace("”", "") val r = (pattern findFirstIn p).getOrElse("") val table = p.replace(r, "").replace("\\t", "\t").replace("\\n", "\n") - RDisplay(table, TABLE, SUCCESS) + new RDisplay(table, TABLE, SUCCESS) } private def imgDisplay(body: Element): RDisplay = { val p = body.getElementsByTag("p").first().html.replace("“%img " , "").replace("”", "") val r = (pattern findFirstIn p).getOrElse("") val img = p.replace(r, "") - RDisplay(img, IMG, SUCCESS) + new RDisplay(img, IMG, SUCCESS) } private def htmlDisplay(body: Element, imageWidth: String): RDisplay = { @@ -112,6 +112,6 @@ object ZeppelinRDisplay { image.attr("width", imageWidth) } - RDisplay(body.html, HTML, SUCCESS) + new RDisplay(body.html, HTML, SUCCESS) } } diff --git a/spark/interpreter/src/test/resources/log4j.properties b/spark/interpreter/src/test/resources/log4j.properties index edd13e463ba..38ba9e1ce46 100644 --- a/spark/interpreter/src/test/resources/log4j.properties +++ b/spark/interpreter/src/test/resources/log4j.properties @@ -44,7 +44,7 @@ log4j.logger.DataNucleus.Datastore=ERROR log4j.logger.org.hibernate.type=ALL log4j.logger.org.apache.zeppelin.interpreter=WARN -log4j.logger.org.apache.zeppelin.spark=INFO +log4j.logger.org.apache.zeppelin.spark=DEBUG log4j.logger.org.apache.zeppelin.python=DEBUG 
log4j.logger.org.apache.spark.repl.Main=WARN diff --git a/spark/pom.xml b/spark/pom.xml index 91b9c2fb438..b83281df288 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -44,8 +44,10 @@ 2.15.2 - 2.2.0 - 0.10.4 + 2.2.3 + 2.11 + 2.11.12 + 0.10.7 spark-${spark.version} @@ -61,6 +63,7 @@ spark-scala-parent scala-2.10 scala-2.11 + scala-2.12 spark-dependencies spark-shims spark1-shims @@ -84,13 +87,6 @@ log4j - - org.scalatest - scalatest_${scala.binary.version} - ${scalatest.version} - test - - junit junit @@ -117,24 +113,6 @@ - - org.scalatest - scalatest-maven-plugin - - ${project.build.directory}/surefire-reports - . - WDF TestSuite.txt - - - - test - - test - - - - - net.alchim31.maven scala-maven-plugin @@ -189,10 +167,40 @@ + + + + spark-scala-2.12 + + 2.12.7 + 2.12 + + + + + spark-scala-2.11 + + true + + + 2.11.8 + 2.11 + + + + + spark-scala-2.10 + + 2.10.5 + 2.10 + + + + spark-2.4 - 2.4.0 + 2.4.3 2.5.0 0.10.7 @@ -201,7 +209,7 @@ spark-2.3 - 2.3.2 + 2.3.3 2.5.0 0.10.7 @@ -213,8 +221,8 @@ true - 2.2.1 - 0.10.4 + 2.2.3 + 0.10.7 diff --git a/spark/scala-2.10/pom.xml b/spark/scala-2.10/pom.xml index 2c8e8d90973..f0e2f5b9cc0 100644 --- a/spark/scala-2.10/pom.xml +++ b/spark/scala-2.10/pom.xml @@ -33,10 +33,29 @@ Zeppelin: Spark Interpreter Scala_2.10 - 2.2.0 - 2.10.5 - 2.10 - ${scala.version} + 2.2.3 + 2.10.5 + 2.10 + ${spark.scala.version} + + + + maven-resources-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala index 9d2ac83e01c..eb0e29756c5 100644 --- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala +++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala @@ -18,14 +18,16 @@ package org.apache.zeppelin.spark import java.io.File +import java.net.URLClassLoader import java.nio.file.{Files, Paths} +import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.spark.repl.SparkILoop._ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} import org.slf4j.{Logger, LoggerFactory} import scala.tools.nsc.Settings @@ -36,8 +38,10 @@ import scala.tools.nsc.interpreter._ */ class SparkScala210Interpreter(override val conf: SparkConf, override val depFiles: java.util.List[String], - override val printReplOutput: java.lang.Boolean) - extends BaseSparkScalaInterpreter(conf, depFiles, printReplOutput) { + override val properties: Properties, + override val interpreterGroup: InterpreterGroup, + override val sparkInterpreterClassLoader: URLClassLoader) + extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) @@ -64,10 +68,10 @@ class SparkScala210Interpreter(override val conf: SparkConf, } val settings = new Settings() - settings.embeddedDefaults(Thread.currentThread().getContextClassLoader()) + settings.embeddedDefaults(sparkInterpreterClassLoader) settings.usejavacp.value = true settings.classpath.value = 
getUserJars.mkString(File.pathSeparator) - if (printReplOutput) { + if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) { Console.setOut(interpreterOutput) } sparkILoop = new SparkILoop() @@ -80,13 +84,18 @@ class SparkScala210Interpreter(override val conf: SparkConf, "org$apache$spark$repl$SparkILoop$$chooseReader", Array(settings.getClass), Array(settings)).asInstanceOf[InteractiveReader] setDeclaredField(sparkILoop, "org$apache$spark$repl$SparkILoop$$in", reader) - scalaCompleter = reader.completion.completer() + this.scalaCompletion = reader.completion createSparkContext() + createZeppelinContext() } - override def close(): Unit = { - super.close() + protected def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates + .map(e => new InterpreterCompletion(e, e, null)) + scala.collection.JavaConversions.seqAsJavaList(completions) } def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = diff --git a/spark/scala-2.11/pom.xml b/spark/scala-2.11/pom.xml index fcee7c48133..23b7461f2dd 100644 --- a/spark/scala-2.11/pom.xml +++ b/spark/scala-2.11/pom.xml @@ -33,10 +33,29 @@ Zeppelin: Spark Interpreter Scala_2.11 - 2.4.0 - 2.11.12 - 2.11 - ${scala.version} + 2.4.3 + 2.11.12 + 2.11 + ${spark.scala.version} + + + + maven-resources-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index 8465145e60e..7d99a0b8a08 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -20,12 +20,13 @@ package org.apache.zeppelin.spark import java.io.{BufferedReader, File} import java.net.URLClassLoader import java.nio.file.{Files, Paths} +import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} import org.slf4j.LoggerFactory import org.slf4j.Logger @@ -37,8 +38,10 @@ import scala.tools.nsc.interpreter._ */ class SparkScala211Interpreter(override val conf: SparkConf, override val depFiles: java.util.List[String], - override val printReplOutput: java.lang.Boolean) - extends BaseSparkScalaInterpreter(conf, depFiles, printReplOutput) { + override val properties: Properties, + override val interpreterGroup: InterpreterGroup, + override val sparkInterpreterClassLoader: URLClassLoader) + extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { import SparkScala211Interpreter._ @@ -66,10 +69,11 @@ class SparkScala211Interpreter(override val conf: SparkConf, val settings = new Settings() settings.processArguments(List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.embeddedDefaults(Thread.currentThread().getContextClassLoader()) + 
settings.embeddedDefaults(sparkInterpreterClassLoader) settings.usejavacp.value = true settings.classpath.value = getUserJars.mkString(File.pathSeparator) + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean val replOut = if (printReplOutput) { new JPrintWriter(interpreterOutput, true) } else { @@ -85,18 +89,29 @@ class SparkScala211Interpreter(override val conf: SparkConf, sparkILoop.in = reader sparkILoop.initializeSynchronous() loopPostInit(this) - this.scalaCompleter = reader.completion.completer() + this.scalaCompletion = reader.completion createSparkContext() + createZeppelinContext() + } + + protected override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates + .map(e => new InterpreterCompletion(e, e, null)) + scala.collection.JavaConversions.seqAsJavaList(completions) } protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { sparkILoop.beQuietDuring { - sparkILoop.bind(name, tpe, value, modifier) + val result = sparkILoop.bind(name, tpe, value, modifier) + if (result != IR.Success) { + throw new RuntimeException("Fail to bind variable: " + name) + } } } - override def close(): Unit = { super.close() if (sparkILoop != null) { diff --git a/spark/scala-2.12/pom.xml b/spark/scala-2.12/pom.xml new file mode 100644 index 00000000000..086203bdde2 --- /dev/null +++ b/spark/scala-2.12/pom.xml @@ -0,0 +1,61 @@ + + + + + org.apache.zeppelin + spark-scala-parent + 0.9.0-SNAPSHOT + ../spark-scala-parent/pom.xml + + + 4.0.0 + org.apache.zeppelin + spark-scala-2.12 + 0.9.0-SNAPSHOT + jar + Zeppelin: Spark Interpreter Scala_2.12 + + + 2.4.3 + 2.12.8 + 2.12 + ${spark.scala.version} + + + + + + maven-resources-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + diff --git a/spark/scala-2.12/spark-scala-parent b/spark/scala-2.12/spark-scala-parent new file mode 120000 index 00000000000..e5e899e58cf --- /dev/null +++ b/spark/scala-2.12/spark-scala-parent @@ -0,0 +1 @@ +../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.12/src/main/resources/log4j.properties b/spark/scala-2.12/src/main/resources/log4j.properties new file mode 100644 index 00000000000..0c90b21ae00 --- /dev/null +++ b/spark/scala-2.12/src/main/resources/log4j.properties @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +#log4j.appender.stdout.layout.ConversionPattern= +#%5p [%t] (%F:%L) - %m%n +#%-4r [%t] %-5p %c %x - %m%n +# + +# Root logger option +log4j.rootLogger=INFO, stdout + +#mute some noisy guys +log4j.logger.org.apache.hadoop.mapred=WARN +log4j.logger.org.apache.hadoop.hive.ql=WARN +log4j.logger.org.apache.hadoop.hive.metastore=WARN +log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN +log4j.logger.org.apache.zeppelin.scheduler=WARN + +log4j.logger.org.quartz=WARN +log4j.logger.DataNucleus=WARN +log4j.logger.DataNucleus.MetaData=ERROR +log4j.logger.DataNucleus.Datastore=ERROR + +# Log all JDBC parameters +log4j.logger.org.hibernate.type=ALL + +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.spark=DEBUG + + +log4j.logger.org.apache.spark.repl.Main=INFO diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala new file mode 100644 index 00000000000..a0fe7f12553 --- /dev/null +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.spark + +import java.io.{BufferedReader, File} +import java.net.URLClassLoader +import java.nio.file.{Files, Paths} +import java.util.Properties + +import org.apache.spark.SparkConf +import org.apache.spark.repl.SparkILoop +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} +import org.slf4j.LoggerFactory +import org.slf4j.Logger + +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + +/** + * SparkInterpreter for scala-2.12 + */ +class SparkScala212Interpreter(override val conf: SparkConf, + override val depFiles: java.util.List[String], + override val properties: Properties, + override val interpreterGroup: InterpreterGroup, + override val sparkInterpreterClassLoader: URLClassLoader) + extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { + + lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) + + private var sparkILoop: SparkILoop = _ + + override val interpreterOutput = new InterpreterOutputStream(LOGGER) + + override def open(): Unit = { + super.open() + if (conf.get("spark.master", "local") == "yarn-client") { + System.setProperty("SPARK_YARN_MODE", "true") + } + // Only Spark1 requires to create http server, Spark2 removes HttpServer class. + val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir")) + val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile + outputDir.deleteOnExit() + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) + + val settings = new Settings() + settings.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) + settings.embeddedDefaults(sparkInterpreterClassLoader) + settings.usejavacp.value = true + settings.classpath.value = getUserJars.mkString(File.pathSeparator) + + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean + val replOut = if (printReplOutput) { + new JPrintWriter(interpreterOutput, true) + } else { + new JPrintWriter(Console.out, true) + } + sparkILoop = new SparkILoop(None, replOut) + sparkILoop.settings = settings + sparkILoop.createInterpreter() + val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]] + val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) + + sparkILoop.in = reader + sparkILoop.initializeSynchronous() + sparkILoop.in.postInit() + this.scalaCompletion = reader.completion + + createSparkContext() + createZeppelinContext() + } + + protected override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates + .map(e => new InterpreterCompletion(e, e, null)) + scala.collection.JavaConversions.seqAsJavaList(completions) + } + + protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { + sparkILoop.beQuietDuring { + val result = sparkILoop.bind(name, tpe, value, modifier) + if (result != IR.Success) { + throw new RuntimeException("Fail to bind variable: " + name) + } + } + } + + + override def close(): Unit = { + super.close() + if (sparkILoop != null) { + sparkILoop.closeInterpreter() + } + } + + def scalaInterpret(code: 
String): scala.tools.nsc.interpreter.IR.Result = + sparkILoop.interpret(code) + +} diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index b9d8e194a6d..dcd8ff6f928 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -56,10 +56,39 @@ + + + org.apache.zeppelin + spark-interpreter + ${project.version} + provided + + + + org.apache.zeppelin + spark-scala-2.10 + ${project.version} + provided + + + + org.apache.zeppelin + spark-scala-2.11 + ${project.version} + provided + + + + org.apache.zeppelin + spark-scala-2.12 + ${project.version} + provided + + org.apache.spark - spark-core_${scala.binary.version} + spark-core_${spark.scala.binary.version} ${spark.version} @@ -71,31 +100,31 @@ org.apache.spark - spark-repl_${scala.binary.version} + spark-repl_${spark.scala.binary.version} ${spark.version} org.apache.spark - spark-sql_${scala.binary.version} + spark-sql_${spark.scala.binary.version} ${spark.version} org.apache.spark - spark-hive_${scala.binary.version} + spark-hive_${spark.scala.binary.version} ${spark.version} org.apache.spark - spark-streaming_${scala.binary.version} + spark-streaming_${spark.scala.binary.version} ${spark.version} org.apache.spark - spark-catalyst_${scala.binary.version} + spark-catalyst_${spark.scala.binary.version} ${spark.version} @@ -105,11 +134,10 @@ hadoop-client ${hadoop.version} - - + org.apache.spark - spark-yarn_${scala.binary.version} + spark-yarn_${spark.scala.binary.version} ${spark.version} diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml index 50bedc7047e..b19afb6c652 100644 --- a/spark/spark-scala-parent/pom.xml +++ b/spark/spark-scala-parent/pom.xml @@ -36,15 +36,15 @@ 2.4.0 - 2.11 - 2.11.12 - ${scala.binary.version} + 2.11 + 2.11.12 + ${spark.scala.binary.version} org.apache.zeppelin - zeppelin-interpreter + spark-interpreter ${project.version} provided @@ -57,21 +57,34 @@ org.apache.spark - spark-repl_${scala.binary.version} + spark-repl_${spark.scala.binary.version} ${spark.version} provided org.apache.spark - spark-core_${scala.binary.version} + spark-core_${spark.scala.binary.version} ${spark.version} provided + + + com.thoughtworks.paranamer + paranamer + + + + + + com.thoughtworks.paranamer + paranamer + 2.8 + runtime org.apache.spark - spark-hive_${scala.binary.version} + spark-hive_${spark.scala.binary.version} ${spark.version} provided @@ -79,21 +92,21 @@ org.scala-lang scala-compiler - ${scala.version} + ${spark.scala.version} provided org.scala-lang scala-library - ${scala.version} + ${spark.scala.version} provided org.scala-lang scala-reflect - ${scala.version} + ${spark.scala.version} provided @@ -113,8 +126,24 @@ + + + + + org.apache.maven.plugins + maven-clean-plugin + ${plugin.clean.version} + + + + ${project.basedir}/../../interpreter/spark/scala-${spark.scala.binary.version} + false + + + + maven-resources-plugin @@ -215,7 +244,7 @@ - ${scala.compile.version} + ${spark.scala.version} -unchecked -deprecation @@ -237,7 +266,17 @@ + + org.apache.maven.plugins + maven-jar-plugin + 2.3.1 + + ${project.basedir}/../../interpreter/spark/scala-${spark.scala.binary.version} + + + + diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 3a2cd0baf1e..421d85aa301 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ 
b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -24,14 +24,13 @@ import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.SQLContext -import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext} -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion +import org.apache.spark.{SparkConf, SparkContext} import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} +import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext, InterpreterGroup, InterpreterResult} import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ -import scala.tools.nsc.interpreter.Completion.ScalaCompleter +import scala.tools.nsc.interpreter.Completion import scala.util.control.NonFatal /** @@ -40,10 +39,15 @@ import scala.util.control.NonFatal * * @param conf * @param depFiles + * @param properties + * @param interpreterGroup */ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, val depFiles: java.util.List[String], - val printReplOutput: java.lang.Boolean) { + val properties: java.util.Properties, + val interpreterGroup: InterpreterGroup, + val sparkInterpreterClassLoader: URLClassLoader) + extends AbstractSparkScalaInterpreter() { protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) @@ -59,7 +63,9 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected var sparkUrl: String = _ - protected var scalaCompleter: ScalaCompleter = _ + protected var scalaCompletion: Completion = _ + + protected var z: SparkZeppelinContext = _ protected val interpreterOutput: InterpreterOutputStream @@ -139,18 +145,20 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result - protected def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompleter.complete(buf.substring(0, cursor), cursor).candidates - .map(e => new InterpreterCompletion(e, e, null)) - scala.collection.JavaConversions.seqAsJavaList(completions) - } - protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { JobProgressUtil.progress(sc, jobGroup) } + override def getSparkContext: SparkContext = sc + + override def getSqlContext: SQLContext = sqlContext + + override def getSparkSession: AnyRef = sparkSession + + override def getSparkUrl: String = sparkUrl + + override def getZeppelinContext: BaseZeppelinContext = z + protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit // for use in java side @@ -161,20 +169,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, bind(name, tpe, value, modifier.asScala.toList) protected def close(): Unit = { - if (BaseSparkScalaInterpreter.sessionNum.decrementAndGet() == 0) { - if (sc != null) { - sc.stop() - } - if (sparkHttpServer != null) { - sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer) - } - sc = null - sqlContext = null - if (sparkSession != null) { - sparkSession.getClass.getMethod("stop").invoke(sparkSession) - sparkSession = null - } + if (sparkHttpServer != null) { + sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer) + } + if (sc != null) { + sc.stop() + } + sc = null + if (sparkSession != null) { + 
sparkSession.getClass.getMethod("stop").invoke(sparkSession) + sparkSession = null } + sqlContext = null } protected def createSparkContext(): Unit = { @@ -295,6 +301,16 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, interpret("print(\"\")") } + protected def createZeppelinContext(): Unit = { + val sparkShims = SparkShims.getInstance(sc.version, properties) + sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + + z = new SparkZeppelinContext(sc, sparkShims, + interpreterGroup.getInterpreterHookRegistry, + properties.getProperty("zeppelin.spark.maxResult").toInt) + bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) + } + private def isSparkSessionPresent(): Boolean = { try { Class.forName("org.apache.spark.sql.SparkSession") @@ -392,6 +408,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, classLoader = classLoader.getParent } } + + extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.toString) LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) extraJars } diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala index 517bed0cc93..3b44c9a0175 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.zeppelin.spark + +import org.apache.spark.SparkContext object JobProgressUtil { diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala similarity index 93% rename from spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala rename to spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index e80c1527462..83594d06605 100644 --- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -37,12 +37,11 @@ class SparkZeppelinContext(val sc: SparkContext, val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) { private val interpreterClassMap = Map( - "spark" -> "org.apache.zeppelin.spark.SparkInterpreter", - "sql" -> "org.apache.zeppelin.spark.SparkSqlInterpreter", - "dep" -> "org.apache.zeppelin.spark.DepInterpreter", - "pyspark" -> "org.apache.zeppelin.spark.PySparkInterpreter", - "ipyspark" -> "org.apache.zeppelin.spark.IPySparkInterpreter", - "r" -> "org.apache.zeppelin.spark.SparkRInterpreter" + ("spark", "org.apache.zeppelin.spark.SparkInterpreter"), + ("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter"), + ("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter"), + ("ipyspark", "org.apache.zeppelin.spark.IPySparkInterpreter"), + ("r", "org.apache.zeppelin.spark.SparkRInterpreter") ) private val supportedClasses = scala.collection.mutable.ArrayBuffer[Class[_]]() diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java index efd65fc1290..10fb9d6d7ab 100644 --- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java +++ 
b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java @@ -21,7 +21,6 @@ import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.ResultMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/spark/spark2-shims/pom.xml b/spark/spark2-shims/pom.xml index 2c2fe499734..79c6a89efed 100644 --- a/spark/spark2-shims/pom.xml +++ b/spark/spark2-shims/pom.xml @@ -35,7 +35,7 @@ 2.11 - 2.1.2 + 2.3.2 diff --git a/zeppelin-display/pom.xml b/zeppelin-display/pom.xml index ae69cb0a0ba..6c227c2528d 100644 --- a/zeppelin-display/pom.xml +++ b/zeppelin-display/pom.xml @@ -103,7 +103,7 @@ org.scala-lang.modules scala-xml_${scala.binary.version} - 1.0.2 + 1.1.0 provided diff --git a/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala b/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala index 7049e7ad5c0..66961fd6758 100644 --- a/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala +++ b/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala @@ -84,7 +84,7 @@ abstract class AbstractAngularElem(val interpreterContext: InterpreterContext, newElem( interpreterContext, name, - angularObjects + (name -> angularObject), + angularObjects + ((name, angularObject)), elem) } @@ -147,7 +147,7 @@ abstract class AbstractAngularElem(val interpreterContext: InterpreterContext, newElem( interpreterContext, modelName, - angularObjects + (eventName -> angularObject), + angularObjects + ((eventName, angularObject)), elem) } diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml index 851823d0825..18f2eb27bb9 100644 --- a/zeppelin-interpreter-integration/pom.xml +++ b/zeppelin-interpreter-integration/pom.xml @@ -59,6 +59,20 @@ + + + org.apache.zeppelin + zeppelin-zengine + ${project.version} + tests + + + com.google.guava + guava + + + + com.google.guava guava diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java index b7a7da2a9e8..ca393f2d9f1 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java @@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index d5d07a068c8..bebca317fdb 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -31,6 +31,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import 
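The `interpreterClassMap` rewrite above and the `AbstractAngularElem` change here both replace the `key -> value` sugar with explicit `((key, value))` tuples, presumably to keep the shared sources compiling cleanly across Scala 2.11 and 2.12. Both spellings produce identical `Map` entries, as this standalone snippet (not Zeppelin code) illustrates:

```scala
object TupleVsArrow extends App {
  // `k -> v` is just sugar for the tuple (k, v); the two maps are equal.
  val byArrow = Map("spark" -> "org.apache.zeppelin.spark.SparkInterpreter")
  val byTuple = Map(("spark", "org.apache.zeppelin.spark.SparkInterpreter"))
  assert(byArrow == byTuple)

  // Adding an entry with an explicit tuple avoids relying on the arrow sugar
  // inside the argument list of `+`, which is what the patch does.
  val updated = byTuple + (("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter"))
  assert(updated.size == 2)
}
```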
org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.codehaus.plexus.util.xml.pull.XmlPullParserException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -89,6 +90,7 @@ private void testInterpreterBasics() throws IOException, InterpreterException, X // add jars & packages for testing InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0"); + sparkInterpreterSetting.setProperty("SPARK_PRINT_LAUNCH_COMMAND", "true"); MavenXpp3Reader reader = new MavenXpp3Reader(); Model model = reader.read(new FileReader("pom.xml")); sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath()); @@ -134,7 +136,7 @@ private void testInterpreterBasics() throws IOException, InterpreterException, X } else { interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context); } - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType()); assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting")); } @@ -182,6 +184,27 @@ public void testYarnClientMode() throws IOException, YarnException, InterruptedE assertEquals(1, response.getApplicationList().size()); interpreterSettingManager.close(); + + waitForYarnAppCompleted(30 * 1000); + } + + private void waitForYarnAppCompleted(int timeout) throws YarnException { + long start = System.currentTimeMillis(); + boolean yarnAppCompleted = false; + while ((System.currentTimeMillis() - start) < timeout ) { + GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING)); + GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request); + if (response.getApplicationList().isEmpty()) { + yarnAppCompleted = true; + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + assertTrue("Yarn app is not completed in " + timeout + " milliseconds.", yarnAppCompleted); } @Test @@ -206,6 +229,8 @@ public void testYarnClusterMode() throws IOException, YarnException, Interrupted assertEquals(1, response.getApplicationList().size()); interpreterSettingManager.close(); + + waitForYarnAppCompleted(30 * 1000); } private boolean isSpark2() { diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java index 0441cac090d..96b484ae24c 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java @@ -33,7 +33,7 @@ public SparkIntegrationTest24(String sparkVersion) { @Parameterized.Parameters public static List data() { return Arrays.asList(new Object[][]{ - {"2.4.0"} + {"2.4.3"} }); } diff --git 
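`waitForYarnAppCompleted`, added to `SparkIntegrationTest` above, polls the mini YARN cluster's ResourceManager until no application is left in the RUNNING state, so a following test does not start while the previous Spark app still holds containers. It is essentially a bounded poll loop; a generic sketch (helper name, interval, and the `runningYarnApps` callback are placeholders, not the exact test code):

```scala
object PollUntil {
  // Re-evaluate `condition` until it holds or `timeoutMs` elapses; returns whether it held.
  def pollUntil(timeoutMs: Long, intervalMs: Long = 1000L)(condition: => Boolean): Boolean = {
    val start = System.currentTimeMillis()
    var met = condition
    while (!met && System.currentTimeMillis() - start < timeoutMs) {
      Thread.sleep(intervalMs)
      met = condition
    }
    met
  }
}

// Usage analogous to the test: wait up to 30s for the mini cluster to drain.
// val completed = PollUntil.pollUntil(30 * 1000) { runningYarnApps().isEmpty }
// assert(completed, s"Yarn app is not completed in ${30 * 1000} milliseconds.")
```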
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 321b94f9ddc..9c301b16b05 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.integration; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; @@ -25,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; @@ -36,8 +36,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +43,6 @@ import java.io.FileWriter; import java.io.IOException; import java.io.StringReader; -import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java index b7fadd468ac..4431f942e65 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java @@ -33,7 +33,7 @@ public ZeppelinSparkClusterTest24(String sparkVersion) throws Exception { @Parameterized.Parameters public static List data() { return Arrays.asList(new Object[][]{ - {"2.4.0"} + {"2.4.3"} }); } } diff --git a/zeppelin-interpreter-integration/src/test/resources/log4j.properties b/zeppelin-interpreter-integration/src/test/resources/log4j.properties index 50300f14b80..95c474c5e40 100644 --- a/zeppelin-interpreter-integration/src/test/resources/log4j.properties +++ b/zeppelin-interpreter-integration/src/test/resources/log4j.properties @@ -44,3 +44,4 @@ log4j.logger.org.hibernate.type=ALL log4j.logger.org.apache.hadoop=WARN log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.util=DEBUG diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java index 05218cef84c..bdea797f17f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java @@ -93,7 +93,6 @@ public void onTimeout() { } public void onProcessRunning() { - LOGGER.info("Process is running"); transition(State.RUNNING); } diff --git a/zeppelin-plugins/launcher/spark/pom.xml b/zeppelin-plugins/launcher/spark/pom.xml index 9ea80d2f404..5a9d41b4274 100644 --- a/zeppelin-plugins/launcher/spark/pom.xml 
+++ b/zeppelin-plugins/launcher/spark/pom.xml @@ -53,6 +53,16 @@ maven-dependency-plugin + + + maven-surefire-plugin + ${plugin.surefire.version} + + + ${project.basedir}/../../.. + + + diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 1bf446f3ef0..ea883a2f060 100644 --- a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -17,11 +17,23 @@ package org.apache.zeppelin.interpreter.launcher; +import java.io.FileInputStream; +import java.io.FilenameFilter; import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; @@ -46,7 +58,7 @@ public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec } @Override - protected Map buildEnvFromProperties(InterpreterLaunchContext context) { + protected Map buildEnvFromProperties(InterpreterLaunchContext context) throws IOException { Map env = super.buildEnvFromProperties(context); Properties sparkProperties = new Properties(); String sparkMaster = getSparkMaster(properties); @@ -77,34 +89,53 @@ protected Map buildEnvFromProperties(InterpreterLaunchContext co } else { sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties"); } + sparkProperties.put("spark.yarn.maxAppAttempts", "1"); + } + + + if (isYarnMode() + && getDeployMode().equals("cluster")) { + try { + List additionalJars = new ArrayList(); + Path localRepoPath = + Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + if (Files.exists(localRepoPath) && Files.isDirectory(localRepoPath)) { + List localRepoJars = StreamSupport.stream( + Files.newDirectoryStream(localRepoPath, entry -> Files.isRegularFile(entry)) + .spliterator(), + false) + .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList()); + additionalJars.addAll(localRepoJars); + } + + String scalaVersion = detectSparkScalaVersion(properties.getProperty("SPARK_HOME")); + Path scalaFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter/spark/scala-" + scalaVersion); + List scalaJars = StreamSupport.stream( + Files.newDirectoryStream(scalaFolder, entry -> Files.isRegularFile(entry)) + .spliterator(), + false) + .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList()); + additionalJars.addAll(scalaJars); + + if (sparkProperties.containsKey("spark.jars")) { + sparkProperties.put("spark.jars", sparkProperties.getProperty("spark.jars") + "," + + StringUtils.join(additionalJars, ",")); + } else { + sparkProperties.put("spark.jars", StringUtils.join(additionalJars, ",")); + } + } catch (Exception e) { + throw new IOException("Cannot make a list of additional jars from localRepo: {}", e); + } } + for (String name : 
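The yarn-cluster branch of `buildEnvFromProperties` above now ships extra jars with the application itself: any jars in the interpreter's local repo plus the Scala-version-specific jars under `interpreter/spark/scala-<version>` are appended to `spark.jars`, since in cluster mode the driver runs inside YARN and cannot load jars that exist only on the Zeppelin host. A rough sketch of that merge, assuming plain `java.nio.file` and `java.util.Properties` (helper names are mine):

```scala
import java.nio.file.{Files, Path}
import java.util.Properties
import scala.collection.JavaConverters._

object SparkJarsMerge {
  // Absolute paths of all regular files directly under `dir`, or Nil if it is not a directory.
  def jarsUnder(dir: Path): Seq[String] = {
    if (!Files.isDirectory(dir)) Nil
    else {
      val stream = Files.newDirectoryStream(dir)
      try stream.asScala.filter(Files.isRegularFile(_)).map(_.toAbsolutePath.toString).toList
      finally stream.close()
    }
  }

  // Append `additionalJars` to an existing comma-separated spark.jars value, if any.
  def mergeSparkJars(sparkProperties: Properties, additionalJars: Seq[String]): Unit = {
    if (additionalJars.nonEmpty) {
      val merged = Option(sparkProperties.getProperty("spark.jars")) match {
        case Some(existing) => existing + "," + additionalJars.mkString(",")
        case None           => additionalJars.mkString(",")
      }
      sparkProperties.put("spark.jars", merged)
    }
  }
}
```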
sparkProperties.stringPropertyNames()) { sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); } String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER"); if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) || - !useProxyUserEnv.equals("false"))) { + !useProxyUserEnv.equals("false"))) { sparkConfBuilder.append(" --proxy-user " + context.getUserName()); } - Path localRepoPath = - Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); - if (isYarnMode() - && getDeployMode().equals("cluster") - && Files.exists(localRepoPath) - && Files.isDirectory(localRepoPath)) { - try { - StreamSupport.stream( - Files.newDirectoryStream(localRepoPath, entry -> Files.isRegularFile(entry)) - .spliterator(), - false) - .map(jar -> jar.toAbsolutePath().toString()) - .reduce((x, y) -> x.concat(",").concat(y)) - .ifPresent(extraJars -> sparkConfBuilder.append(" --jars ").append(extraJars)); - } catch (IOException e) { - LOGGER.error("Cannot make a list of additional jars from localRepo: {}", localRepoPath, e); - } - - } env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); @@ -137,6 +168,66 @@ && getDeployMode().equals("cluster") } + private String detectSparkScalaVersion(String sparkHome) throws Exception { + ProcessBuilder builder = new ProcessBuilder(sparkHome + "/bin/spark-submit", "--version"); + File processOutputFile = File.createTempFile("zeppelin-spark", ".out"); + builder.redirectError(processOutputFile); + Process process = builder.start(); + process.waitFor(); + String processOutput = IOUtils.toString(new FileInputStream(processOutputFile)); + Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*"); + Matcher matcher = pattern.matcher(processOutput); + if (matcher.find()) { + String scalaVersion = matcher.group(1); + if (scalaVersion.startsWith("2.10")) { + return "2.10"; + } else if (scalaVersion.startsWith("2.11")) { + return "2.11"; + } else if (scalaVersion.startsWith("2.12")) { + return "2.12"; + } else { + throw new Exception("Unsupported scala version: " + scalaVersion); + } + } else { + return detectSparkScalaVersionByReplClass(sparkHome); + } + } + + private String detectSparkScalaVersionByReplClass(String sparkHome) throws Exception { + File sparkLibFolder = new File(sparkHome + "/lib"); + if (sparkLibFolder.exists()) { + // spark 1.6 if spark/lib exists + File[] sparkAssemblyJars = new File(sparkHome + "/lib").listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.contains("spark-assembly"); + } + }); + if (sparkAssemblyJars.length == 0) { + throw new Exception("No spark assembly file found in SPARK_HOME: " + sparkHome); + } + if (sparkAssemblyJars.length > 1) { + throw new Exception("Multiple spark assembly file found in SPARK_HOME: " + sparkHome); + } + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{sparkAssemblyJars[0].toURI().toURL()}); + try { + urlClassLoader.loadClass("org.apache.spark.repl.SparkCommandLine"); + return "2.10"; + } catch (ClassNotFoundException e) { + return "2.11"; + } + } else { + // spark 2.x if spark/lib doesn't exists + File sparkJarsFolder = new File(sparkHome + "/jars"); + boolean sparkRepl211Exists = + Stream.of(sparkJarsFolder.listFiles()).anyMatch(file -> file.getName().contains("spark-repl_2.11")); + if (sparkRepl211Exists) { + return "2.11"; + } else { + return "2.10"; + } + } + } /** * get environmental variable in the following order @@ -174,27 +265,29 @@ 
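`detectSparkScalaVersion` above works out which Scala build of the interpreter to ship by running `spark-submit --version` and scraping the "Using Scala version" line from its banner (the patch redirects stderr to a temp file because that is where the banner is written), falling back to `detectSparkScalaVersionByReplClass`, which probes the Spark jars for the 2.10-only `org.apache.spark.repl.SparkCommandLine` class. A condensed sketch of the first strategy, assuming `sparkHome` points at an ordinary Spark distribution (error handling trimmed):

```scala
import scala.sys.process._

object SparkScalaVersion {
  // Run `spark-submit --version` and pull the major.minor Scala version out of its banner,
  // e.g. "Using Scala version 2.11.12, ..." => "2.11".
  def detect(sparkHome: String): Option[String] = {
    val buffer = new StringBuilder
    // The banner goes to stderr, hence a logger that captures both streams.
    val logger = ProcessLogger(line => buffer.append(line).append('\n'),
                               line => buffer.append(line).append('\n'))
    Seq(sparkHome + "/bin/spark-submit", "--version").!(logger)
    """Using Scala version (\d+\.\d+)""".r.findFirstMatchIn(buffer.toString).map(_.group(1))
  }
}

// Hypothetical usage: SparkScalaVersion.detect("/opt/spark") // e.g. Some("2.11")
```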
private void mergeSparkProperty(Properties sparkProperties, String propertyName, } private void setupPropertiesForSparkR(Properties sparkProperties) { - String sparkHome = getEnv("SPARK_HOME"); - File sparkRBasePath = null; - if (sparkHome == null) { - if (!getSparkMaster(properties).startsWith("local")) { - throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + - " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + - " interpreter setting"); + if (isYarnMode()) { + String sparkHome = getEnv("SPARK_HOME"); + File sparkRBasePath = null; + if (sparkHome == null) { + if (!getSparkMaster(properties).startsWith("local")) { + throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + + " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + + " interpreter setting"); + } + String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); + sparkRBasePath = new File(zeppelinHome, + "interpreter" + File.separator + "spark" + File.separator + "R"); + } else { + sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); } - String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); - sparkRBasePath = new File(zeppelinHome, - "interpreter" + File.separator + "spark" + File.separator + "R"); - } else { - sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); - } - File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); - if (sparkRPath.exists() && sparkRPath.isFile()) { - mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", - sparkRPath.getAbsolutePath() + "#sparkr"); - } else { - LOGGER.warn("sparkr.zip is not found, SparkR may not work."); + File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); + if (sparkRPath.exists() && sparkRPath.isFile()) { + mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", + sparkRPath.getAbsolutePath() + "#sparkr"); + } else { + LOGGER.warn("sparkr.zip is not found, SparkR may not work."); + } } } diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index d7dcd0a91cd..118e7d47656 100644 --- a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -17,28 +17,37 @@ package org.apache.zeppelin.interpreter.launcher; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.util.Util; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class SparkInterpreterLauncherTest { + + private String sparkHome; + private String zeppelinHome; + @Before public void setUp() { for (final 
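`setupPropertiesForSparkR` is now applied only in YARN mode: it locates `sparkr.zip`, either under `$SPARK_HOME/R/lib` or under Zeppelin's bundled `interpreter/spark/R` when `SPARK_HOME` is unset and the master is local, and distributes it to executors through `spark.yarn.dist.archives` with the `#sparkr` alias. A small sketch of that path resolution (helper name and signature are mine):

```scala
import java.io.File

object SparkRArchive {
  // Resolve sparkr.zip and return a spark.yarn.dist.archives entry ("<path>#sparkr"),
  // or None when the archive cannot be found (the launcher only logs a warning then).
  def archiveEntry(sparkHome: Option[String], zeppelinHome: String): Option[String] = {
    val base = sparkHome match {
      case Some(home) => new File(home, "R" + File.separator + "lib")
      case None => new File(zeppelinHome,
        "interpreter" + File.separator + "spark" + File.separator + "R")
    }
    val sparkrZip = new File(base, "sparkr.zip")
    if (sparkrZip.isFile) Some(sparkrZip.getAbsolutePath + "#sparkr") else None
  }
}
```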
ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { System.clearProperty(confVar.getVarName()); } + + sparkHome = DownloadUtils.downloadSpark("2.3.2"); + zeppelinHome = ZeppelinConfiguration.create().getZeppelinHome(); } @Test @@ -46,7 +55,7 @@ public void testConnectTimeOut() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); InterpreterOption option = new InterpreterOption(); @@ -56,8 +65,8 @@ public void testConnectTimeOut() throws IOException { assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); - assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); - assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(zeppelinHome + "/interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(zeppelinHome + "/local-repo/groupId", interpreterProcess.getLocalRepoDir()); assertEquals(10000, interpreterProcess.getConnectTimeout()); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 2); @@ -69,7 +78,7 @@ public void testLocalMode() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "local[*]"); properties.setProperty("spark.files", "file_1"); @@ -85,7 +94,7 @@ public void testLocalMode() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 2); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @@ -94,7 +103,7 @@ public void testYarnClientMode_1() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "yarn-client"); properties.setProperty("spark.files", "file_1"); @@ -110,8 +119,15 @@ public void testYarnClientMode_1() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() 
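The launcher tests above stop pretending that `/user/spark` is a Spark home: `setUp` now downloads a real Spark 2.3.2 distribution via `DownloadUtils` and derives the expected interpreter and local-repo directories from the actual `ZEPPELIN_HOME`, so the assertions exercise the same path resolution used at runtime. A loose Scala paraphrase of that setup (the patch itself is JUnit/Java; field handling simplified):

```scala
import org.apache.zeppelin.conf.ZeppelinConfiguration
import org.apache.zeppelin.interpreter.integration.DownloadUtils

object LauncherTestSetup {
  // Clear any Zeppelin system properties, resolve a real SPARK_HOME by downloading
  // Spark 2.3.2 (cached after the first run), and read the effective ZEPPELIN_HOME.
  def prepare(): (String, String) = {
    ZeppelinConfiguration.ConfVars.values().foreach(v => System.clearProperty(v.getVarName))
    val sparkHome = DownloadUtils.downloadSpark("2.3.2")
    val zeppelinHome = ZeppelinConfiguration.create().getZeppelinHome
    (sparkHome, zeppelinHome)
  }
}
```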
>= 2); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); - assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); + + String sparkJars = "'jar_1'"; + String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; + String sparkFiles = "'file_1'"; + assertEquals(" --master yarn-client --conf spark.yarn.dist.archives=" + sparkrZip + + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + + " --conf spark.yarn.isPython=true", + interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @Test @@ -119,7 +135,7 @@ public void testYarnClientMode_2() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "yarn"); properties.setProperty("spark.submit.deployMode", "client"); @@ -136,8 +152,16 @@ public void testYarnClientMode_2() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 2); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); - assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); + + String sparkJars = "'jar_1'"; + String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; + String sparkFiles = "'file_1'"; + assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip + + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + + " --conf spark.submit.deployMode='client'" + + " --conf spark.yarn.isPython=true", + interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @Test @@ -145,7 +169,7 @@ public void testYarnClusterMode_1() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "yarn-cluster"); properties.setProperty("spark.files", "file_1"); @@ -161,9 +185,19 @@ public void testYarnClusterMode_1() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 3); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); - assertEquals(" --master yarn-cluster --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + String sparkJars = "'jar_1'," + + zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar"; + String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; + String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; + assertEquals(" --master yarn-cluster --conf spark.yarn.dist.archives=" + sparkrZip + + " --conf spark.yarn.maxAppAttempts=1" + + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + + " --conf spark.yarn.isPython=true" + + " --conf spark.yarn.submit.waitAppCompletion=false", + interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @Test @@ -171,7 +205,7 @@ public void testYarnClusterMode_2() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "yarn"); properties.setProperty("spark.submit.deployMode", "cluster"); @@ -194,9 +228,19 @@ public void testYarnClusterMode_2() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 3); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); - assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1 --jars " + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString(), interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + String sparkJars = "'jar_1'," + + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString() + "," + + zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar"; + String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; + String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; + assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip + + " --conf spark.yarn.maxAppAttempts=1" + + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + + " --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true" + + " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", + interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); FileUtils.deleteDirectory(localRepoPath.toFile()); } @@ -206,7 +250,7 @@ public void testYarnClusterMode_3() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); - properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("SPARK_HOME", 
sparkHome); properties.setProperty("property_1", "value_1"); properties.setProperty("master", "yarn"); properties.setProperty("spark.submit.deployMode", "cluster"); @@ -228,9 +272,19 @@ public void testYarnClusterMode_3() throws IOException { assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 3); - assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); - assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + + String sparkJars = "'jar_1'," + + zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar"; + String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; + String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; + assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip + + " --conf spark.yarn.maxAppAttempts=1" + + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + + " --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true" + + " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", + interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); FileUtils.deleteDirectory(localRepoPath.toFile()); } } diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index 47756c9901f..df9aa3176c8 100644 --- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -88,7 +88,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep } } - protected Map buildEnvFromProperties(InterpreterLaunchContext context) { + protected Map buildEnvFromProperties(InterpreterLaunchContext context) throws IOException { Map env = new HashMap<>(); for (Object key : context.getProperties().keySet()) { if (RemoteInterpreterUtils.isEnvString((String) key)) { diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml index 727acd700c0..b0216213321 100644 --- a/zeppelin-plugins/pom.xml +++ b/zeppelin-plugins/pom.xml @@ -64,6 +64,14 @@ + + ${project.groupId} + zeppelin-zengine + ${project.version} + tests + provided + + junit diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 35218a2f94e..41d414d681b 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -74,6 +74,11 @@ commons-io + + commons-cli + commons-cli + + commons-logging commons-logging diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java similarity index 98% rename from 
zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java rename to zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java index 4371b2b34e1..546790d4187 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.integration; +package org.apache.zeppelin.interpreter.integration; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -65,7 +65,7 @@ public static String downloadFlink(String version) { LOGGER.info("Skip to download flink as it is already downloaded."); return targetFlinkHomeFolder.getAbsolutePath(); } - download("flink", version, "-bin-hadoop27-scala_2.11.tgz"); + download("flink", version, "-bin-hadoop2.6.tgz"); return targetFlinkHomeFolder.getAbsolutePath(); }
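Finally, `DownloadUtils` moves from the integration module into `zeppelin-zengine`'s test sources and is consumed by other modules as a `tests` artifact (see the new dependencies in `zeppelin-interpreter-integration` and `zeppelin-plugins` above), so the Spark/Flink download helper lives in exactly one place. Its core behaviour is download-and-cache: skip the download entirely when the target folder already exists. A generic sketch of that pattern (URL handling simplified, unpacking omitted):

```scala
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils

object DownloadAndCache {
  // Download `url` next to `targetHome` only when `targetHome` does not exist yet,
  // mirroring DownloadUtils' "skip to download ... already downloaded" behaviour.
  // Unpacking the archive into `targetHome` is left out of this sketch.
  def downloadIfMissing(url: String, targetHome: File): Unit = {
    if (!targetHome.exists()) {
      val archiveName = new File(new URL(url).getPath).getName
      val archive = new File(targetHome.getParentFile, archiveName)
      if (!archive.exists()) {
        FileUtils.copyURLToFile(new URL(url), archive)
      }
    }
  }
}
```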