diff --git a/docs/development/writingzeppelininterpreter.md b/docs/development/writingzeppelininterpreter.md
index 0842fe6f6eb..e7bf6359a81 100644
--- a/docs/development/writingzeppelininterpreter.md
+++ b/docs/development/writingzeppelininterpreter.md
@@ -36,14 +36,48 @@ In 'Separate Interpreter for each note' mode, new Interpreter instance will be c
 ### Make your own Interpreter
 Creating a new interpreter is quite simple. Just extend [org.apache.zeppelin.interpreter](https://github.com/apache/incubator-zeppelin/blob/master/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java) abstract class and implement some methods.
-You can include `org.apache.zeppelin:zeppelin-interpreter:[VERSION]` artifact in your build system.
-Your interpreter name is derived from the static register method.
-
+You can include the `org.apache.zeppelin:zeppelin-interpreter:[VERSION]` artifact in your build system. You should also put your jars under your interpreter directory, in a directory with a specific name. Zeppelin server reads interpreter directories recursively and initializes interpreters, including your own interpreter.
+
+There are three locations where you can store your interpreter group, name and other information, and Zeppelin server checks them in order. First, Zeppelin tries to find the file below. Next, Zeppelin tries to find `interpreter-setting.json` inside your interpreter jar.
+```
+{ZEPPELIN_INTERPRETER_DIR}/{YOUR_OWN_INTERPRETER_DIR}/interpreter-setting.json
+```
+
+Here is an example of `interpreter-setting.json` for your own interpreter. For each property, the value is resolved in this order: the environment variable named by `envName`, the Java system property named by `propertyName`, and finally `defaultValue`.
+```json
+[
+  {
+    "interpreterGroup": "your-group",
+    "interpreterName": "your-name",
+    "interpreterClassName": "your.own.interpreter.class",
+    "properties": {
+      "properties1": {
+        "envName": null,
+        "propertyName": "property.1.name",
+        "defaultValue": "propertyDefaultValue",
+        "description": "Property description"
+      },
+      "properties2": {
+        "envName": "PROPERTIES_2",
+        "propertyName": null,
+        "defaultValue": "property2DefaultValue",
+        "description": "Property 2 description"
+      }, ...
+    }
+  },
+  {
+    ...
+  }
+]
+```
+
+Finally, Zeppelin uses static initialization with the following:
 ```
 static {
   Interpreter.register("MyInterpreterName", MyClassName.class.getName());
 }
 ```
+**Static initialization is deprecated and will only be supported until 0.6.0.**
 
 The name will appear later in the interpreter name option box during the interpreter configuration process. The name of the interpreter is what you later write to identify a paragraph which should be interpreted using this interpreter.
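+
+To tie the pieces above together, here is a minimal sketch of such an interpreter. The class and package names are placeholders, and the set of abstract methods shown (`open`, `close`, `interpret`, `cancel`, `getFormType`, `getProgress`, `completion`) reflects the `Interpreter` base class as of this change, so treat it as illustrative rather than definitive:
+
+```java
+package your.own.interpreter;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+public class MyInterpreter extends Interpreter {
+
+  public MyInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // Allocate resources here, e.g. connect to your backend,
+    // reading configuration with getProperty("property.1.name").
+  }
+
+  @Override
+  public void close() {
+    // Release whatever open() allocated.
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    // Run the paragraph text `st` and report the outcome.
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS, "echo: " + st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // Stop the running paragraph, if your backend supports it.
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0; // 0-100, shown in the paragraph progress bar
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return Collections.emptyList(); // no auto-completion in this sketch
+  }
+}
+```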
diff --git a/spark/pom.xml b/spark/pom.xml
index 3b88f3b82a8..093c5145ae0 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -313,6 +313,7 @@
             <exclude>**/metastore_db/</exclude>
             <exclude>**/README.md</exclude>
             <exclude>**/dependency-reduced-pom.xml</exclude>
+            <exclude>**/interpreter-setting.json</exclude>
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
index 2586955fa7b..f7c164c78ea 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -62,23 +62,6 @@
  *
  */
 public class DepInterpreter extends Interpreter {
-
-  static {
-    Interpreter.register(
-        "dep",
-        "spark",
-        DepInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.dep.localrepo",
-                getSystemDefault("ZEPPELIN_DEP_LOCALREPO", null, "local-repo"),
-                "local repository for dependency loader")
-            .add("zeppelin.dep.additionalRemoteRepository",
-                "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
-                "A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.")
-            .build());
-
-  }
-
   private SparkIMain intp;
   private ByteArrayOutputStream out;
   private SparkDependencyContext depc;
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index ea0054188ac..1e2ef2e55af 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -77,17 +77,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   private String scriptPath;
   boolean pythonscriptRunning = false;
 
-  static {
-    Interpreter.register(
-        "pyspark",
-        "spark",
-        PySparkInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.pyspark.python",
-                SparkInterpreter.getSystemDefault("PYSPARK_PYTHON", null, "python"),
-                "Python command to run pyspark with").build());
-  }
-
   public PySparkInterpreter(Properties property) {
     super(property);
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7d134ee1c9c..0ee89cd2c11 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -80,38 +80,6 @@ public class SparkInterpreter extends Interpreter {
   public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
 
-  static {
-    Interpreter.register(
-        "spark",
-        "spark",
-        SparkInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("spark.app.name",
-                getSystemDefault("SPARK_APP_NAME", "spark.app.name", "Zeppelin"),
-                "The name of spark application.")
-            .add("master",
-                getSystemDefault("MASTER", "spark.master", "local[*]"),
-                "Spark master uri. ex) spark://masterhost:7077")
-            .add("spark.executor.memory",
-                getSystemDefault(null, "spark.executor.memory", ""),
-                "Executor memory per worker instance. ex) 512m, 32g")
-            .add("spark.cores.max",
-                getSystemDefault(null, "spark.cores.max", ""),
-                "Total number of cores to use. Empty value uses all available core.")
-            .add("zeppelin.spark.useHiveContext",
-                getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT",
-                    "zeppelin.spark.useHiveContext", "true"),
-                "Use HiveContext instead of SQLContext if it is true.")
-            .add("zeppelin.spark.maxResult",
-                getSystemDefault("ZEPPELIN_SPARK_MAXRESULT", "zeppelin.spark.maxResult", "1000"),
-                "Max number of SparkSQL result to display.")
-            .add("args", "", "spark commandline args")
-            .add("zeppelin.spark.printREPLOutput", "true",
-                "Print REPL output")
-            .build()
-    );
-  }
-
   private ZeppelinContext z;
   private SparkILoop interpreter;
   private SparkIMain intp;
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index e0ea766212f..021c95ff413 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -42,32 +42,6 @@ public class SparkRInterpreter extends Interpreter {
   private static String renderOptions;
   private ZeppelinR zeppelinR;
 
-  static {
-    Interpreter.register(
-        "r",
-        "spark",
-        SparkRInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.R.cmd",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_R_CMD", "zeppelin.R.cmd", "R"),
-                "R repl path")
-            .add("zeppelin.R.knitr",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_R_KNITR", "zeppelin.R.knitr", "true"),
-                "whether use knitr or not")
-            .add("zeppelin.R.image.width",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_R_IMAGE_WIDTH",
-                    "zeppelin.R.image.width", "100%"),
-                "")
-            .add("zeppelin.R.render.options",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_R_RENDER_OPTIONS",
-                    "zeppelin.R.render.options",
-                    "out.format = 'html', comment = NA, "
-                        + "echo = FALSE, results = 'asis', message = F, warning = F"),
-                "")
-            .build());
-  }
-
-
   public SparkRInterpreter(Properties property) {
     super(property);
   }
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 3b850b495ee..ed2e3367e95 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -46,27 +46,6 @@ public class SparkSqlInterpreter extends Interpreter {
   Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
   AtomicInteger num = new AtomicInteger(0);
 
-  static {
-    Interpreter.register(
-        "sql",
-        "spark",
-        SparkSqlInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.spark.maxResult",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_SPARK_MAXRESULT",
-                    "zeppelin.spark.maxResult", "1000"),
-                "Max number of SparkSQL result to display.")
-            .add("zeppelin.spark.concurrentSQL",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_SPARK_CONCURRENTSQL",
-                    "zeppelin.spark.concurrentSQL", "false"),
-                "Execute multiple SQL concurrently if set true.")
-            .add("zeppelin.spark.sql.stacktrace",
-                SparkInterpreter.getSystemDefault("ZEPPELIN_SPARK_SQL_STACKTRACE",
-                    "zeppelin.spark.sql.stacktrace", "false"),
-                "Show full exception stacktrace for SQL queries if set to true.")
-            .build());
-  }
-
   private String getJobGroup(InterpreterContext context){
     return "zeppelin-" + context.getParagraphId();
   }
diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json
new file mode 100644
index 00000000000..ee7a1925781
--- /dev/null
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -0,0 +1,145 @@
+[
+  {
+    "interpreterGroup": "spark",
+    "interpreterName": "spark",
+    "interpreterClassName": "org.apache.zeppelin.spark.SparkInterpreter",
+    "properties": {
+      "spark.executor.memory": {
+        "envName": null,
+        "propertyName": "spark.executor.memory",
+        "defaultValue": "",
+        "description": "Executor memory per worker instance. ex) 512m, 32g"
+      },
+      "args": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "spark commandline args"
+      },
+      "zeppelin.spark.useHiveContext": {
+        "envName": "ZEPPELIN_SPARK_USEHIVECONTEXT",
+        "propertyName": "zeppelin.spark.useHiveContext",
+        "defaultValue": "true",
+        "description": "Use HiveContext instead of SQLContext if it is true."
+      },
+      "spark.app.name": {
+        "envName": "SPARK_APP_NAME",
+        "propertyName": "spark.app.name",
+        "defaultValue": "Zeppelin",
+        "description": "The name of spark application."
+      },
+      "zeppelin.spark.printREPLOutput": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "true",
+        "description": "Print REPL output"
+      },
+      "spark.cores.max": {
+        "envName": null,
+        "propertyName": "spark.cores.max",
+        "defaultValue": "",
+        "description": "Total number of cores to use. Empty value uses all available core."
+      },
+      "zeppelin.spark.maxResult": {
+        "envName": "ZEPPELIN_SPARK_MAXRESULT",
+        "propertyName": "zeppelin.spark.maxResult",
+        "defaultValue": "1000",
+        "description": "Max number of SparkSQL result to display."
+      },
+      "master": {
+        "envName": "MASTER",
+        "propertyName": "spark.master",
+        "defaultValue": "local[*]",
+        "description": "Spark master uri. ex) spark://masterhost:7077"
+      }
+    }
+  },
+  {
+    "interpreterGroup": "spark",
+    "interpreterName": "sql",
+    "interpreterClassName": "org.apache.zeppelin.spark.SparkSqlInterpreter",
+    "properties": {
+      "zeppelin.spark.concurrentSQL": {
+        "envName": "ZEPPELIN_SPARK_CONCURRENTSQL",
+        "propertyName": "zeppelin.spark.concurrentSQL",
+        "defaultValue": "false",
+        "description": "Execute multiple SQL concurrently if set true."
+      },
+      "zeppelin.spark.sql.stacktrace": {
+        "envName": "ZEPPELIN_SPARK_SQL_STACKTRACE",
+        "propertyName": "zeppelin.spark.sql.stacktrace",
+        "defaultValue": "false",
+        "description": "Show full exception stacktrace for SQL queries if set to true."
+      },
+      "zeppelin.spark.maxResult": {
+        "envName": "ZEPPELIN_SPARK_MAXRESULT",
+        "propertyName": "zeppelin.spark.maxResult",
+        "defaultValue": "1000",
+        "description": "Max number of SparkSQL result to display."
+      }
+    }
+  },
+  {
+    "interpreterGroup": "spark",
+    "interpreterName": "dep",
+    "interpreterClassName": "org.apache.zeppelin.spark.DepInterpreter",
+    "properties": {
+      "zeppelin.dep.localrepo": {
+        "envName": "ZEPPELIN_DEP_LOCALREPO",
+        "propertyName": null,
+        "defaultValue": "local-repo",
+        "description": "local repository for dependency loader"
+      },
+      "zeppelin.dep.additionalRemoteRepository": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
+        "description": "A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository."
+ } + } + }, + { + "interpreterGroup": "spark", + "interpreterName": "pyspark", + "interpreterClassName": "org.apache.zeppelin.spark.PySparkInterpreter", + "properties": { + "zeppelin.pyspark.python": { + "envName": "PYSPARK_PYTHON", + "propertyName": null, + "defaultValue": "python", + "description": "Python command to run pyspark with" + } + } + }, + { + "interpreterGroup": "spark", + "interpreterName": "r", + "interpreterClassName": "org.apache.zeppelin.spark.SparkRInterpreter", + "properties": { + "zeppelin.R.knitr": { + "envName": "ZEPPELIN_R_KNITR", + "propertyName": "zeppelin.R.knitr", + "defaultValue": "true", + "description": "whether use knitr or not" + }, + "zeppelin.R.cmd": { + "envName": "ZEPPELIN_R_CMD", + "propertyName": "zeppelin.R.cmd", + "defaultValue": "R", + "description": "R repl path" + }, + "zeppelin.R.image.width": { + "envName": "ZEPPELIN_R_IMAGE_WIDTH", + "propertyName": "zeppelin.R.image.width", + "defaultValue": "100%", + "description": "" + }, + "zeppelin.R.render.options": { + "envName": "ZEPPELIN_R_RENDER_OPTIONS", + "propertyName": "zeppelin.R.render.options", + "defaultValue": "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F", + "description": "" + } + } + } +] diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index dc8fd4ced7c..03ecb9efedd 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -39,6 +39,13 @@ public class DepInterpreterTest { private File tmpDir; private SparkInterpreter repl; + private Properties getTestProperties() { + Properties p = new Properties(); + p.setProperty("zeppelin.dep.localrepo", "local-repo"); + p.setProperty("zeppelin.dep.additionalRemoteRepository", "spark-packages,http://dl.bintray.com/spark-packages/maven,false;"); + return p; + } + @Before public void setUp() throws Exception { tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); @@ -46,7 +53,7 @@ public void setUp() throws Exception { tmpDir.mkdirs(); - Properties p = new Properties(); + Properties p = getTestProperties(); dep = new DepInterpreter(p); dep.open(); diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 5b132777419..409f938d63d 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -63,6 +63,16 @@ public static int getSparkVersionNumber() { return version; } + public static Properties getSparkTestProperties() { + Properties p = new Properties(); + p.setProperty("master", "local[*]"); + p.setProperty("spark.app.name", "Zeppelin Test"); + p.setProperty("zeppelin.spark.useHiveContext", "true"); + p.setProperty("zeppelin.spark.maxResult", "1000"); + + return p; + } + @Before public void setUp() throws Exception { tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); @@ -71,10 +81,9 @@ public void setUp() throws Exception { tmpDir.mkdirs(); if (repl == null) { - Properties p = new Properties(); intpGroup = new InterpreterGroup(); intpGroup.put("note", new LinkedList()); - repl = new SparkInterpreter(p); + repl = new SparkInterpreter(getSparkTestProperties()); 
      repl.setInterpreterGroup(intpGroup);
       intpGroup.get("note").add(repl);
       repl.open();
@@ -207,8 +216,7 @@ public void emptyConfigurationVariablesOnlyForNonSparkProperties() {
   @Test
   public void shareSingleSparkContext() throws InterruptedException {
     // create another SparkInterpreter
-    Properties p = new Properties();
-    SparkInterpreter repl2 = new SparkInterpreter(p);
+    SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties());
     repl2.setInterpreterGroup(intpGroup);
     intpGroup.get("note").add(repl2);
     repl2.open();
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index c2cc1e637c6..3196cf56bd8 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -46,6 +46,10 @@ public class SparkSqlInterpreterTest {
   @Before
   public void setUp() throws Exception {
     Properties p = new Properties();
+    p.putAll(SparkInterpreterTest.getSparkTestProperties());
+    p.setProperty("zeppelin.spark.maxResult", "1000");
+    p.setProperty("zeppelin.spark.concurrentSQL", "false");
+    p.setProperty("zeppelin.spark.sql.stacktrace", "false");
 
     if (repl == null) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index c8454786128..9de2879351f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.gson.annotations.SerializedName;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
@@ -129,6 +130,7 @@ public void destroy() {
   protected Properties property;
 
   public Interpreter(Properties property) {
+    logger.debug("Properties: {}", property);
     this.property = property;
   }
 
@@ -140,13 +142,16 @@ public Properties getProperty() {
     Properties p = new Properties();
     p.putAll(property);
 
-    Map<String, InterpreterProperty> defaultProperties = Interpreter
-        .findRegisteredInterpreterByClassName(getClassName()).getProperties();
-    for (String k : defaultProperties.keySet()) {
-      if (!p.containsKey(k)) {
-        String value = defaultProperties.get(k).getDefaultValue();
-        if (value != null) {
-          p.put(k, defaultProperties.get(k).getDefaultValue());
+    RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
+        getClassName());
+    if (null != registeredInterpreter) {
+      Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
+      for (String k : defaultProperties.keySet()) {
+        if (!p.containsKey(k)) {
+          String value = defaultProperties.get(k).getValue();
+          if (value != null) {
+            p.put(k, defaultProperties.get(k).getValue());
+          }
         }
       }
     }
@@ -155,17 +160,9 @@ public String getProperty(String key) {
-    if (property.containsKey(key)) {
-      return property.getProperty(key);
-    }
-
-    Map<String, InterpreterProperty> defaultProperties = Interpreter
-        .findRegisteredInterpreterByClassName(getClassName()).getProperties();
-    if (defaultProperties.containsKey(key)) {
-      return defaultProperties.get(key).getDefaultValue();
-    }
+    logger.debug("key: {}, value: {}", key, getProperty().getProperty(key));
 
-    return null;
+    return getProperty().getProperty(key);
   }
 
@@ -228,8 +225,11 @@ public static enum FormType {
    * Represent registered interpreter class
    */
   public static class RegisteredInterpreter {
-    private String name;
+    @SerializedName("interpreterGroup")
     private String group;
+    @SerializedName("interpreterName")
+    private String name;
+    @SerializedName("interpreterClassName")
     private String className;
     private Map<String, InterpreterProperty> properties;
     private String path;
@@ -267,6 +267,10 @@ public String getPath() {
       return path;
     }
 
+    public String getInterpreterKey() {
+      return getGroup() + "." + getName();
+    }
+
   }
 
   /**
@@ -287,10 +291,21 @@ public static void register(String name, String group, String className) {
     register(name, group, className, new HashMap<String, InterpreterProperty>());
   }
 
+  @Deprecated
   public static void register(String name, String group, String className,
-      Map<String, InterpreterProperty> properties) {
-    registeredInterpreters.put(group + "." + name, new RegisteredInterpreter(
-        name, group, className, properties));
+      Map<String, InterpreterProperty> properties) {
+    logger.error("Static initialization is deprecated. You should change it to use "
+        + "interpreter-setting.json in your jar or "
+        + "interpreter/{interpreter}/interpreter-setting.json");
+    register(new RegisteredInterpreter(name, group, className, properties));
+  }
+
+  public static void register(RegisteredInterpreter registeredInterpreter) {
+    // TODO(jongyoul): Raise an error when the same interpreter key is registered
+    // with different settings
+    String interpreterKey = registeredInterpreter.getInterpreterKey();
+    if (!registeredInterpreters.containsKey(interpreterKey)) {
+      registeredInterpreters.put(interpreterKey, registeredInterpreter);
+    }
   }
 
   public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
index cc13aceebae..488f2a1fa93 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
@@ -21,16 +21,39 @@
  * Represent property of interpreter
  */
 public class InterpreterProperty {
+  String envName;
+  String propertyName;
   String defaultValue;
   String description;
 
-  public InterpreterProperty(String defaultValue,
-      String description) {
-    super();
+  public InterpreterProperty(String envName, String propertyName, String defaultValue,
+      String description) {
+    this.envName = envName;
+    this.propertyName = propertyName;
     this.defaultValue = defaultValue;
     this.description = description;
   }
 
+  public InterpreterProperty(String defaultValue, String description) {
+    this(null, null, defaultValue, description);
+  }
+
+  public String getEnvName() {
+    return envName;
+  }
+
+  public void setEnvName(String envName) {
+    this.envName = envName;
+  }
+
+  public String getPropertyName() {
+    return propertyName;
+  }
+
+  public void setPropertyName(String propertyName) {
+    this.propertyName = propertyName;
+  }
+
   public String getDefaultValue() {
     return defaultValue;
   }
@@ -46,4 +69,28 @@ public String getDescription() {
   public void setDescription(String description) {
     this.description = description;
   }
+
+  public String getValue() {
+    //TODO(jongyoul): Remove SparkInterpreter's getSystemDefault method
+    if (envName != null && !envName.isEmpty()) {
+      String envValue = System.getenv().get(envName);
+      if (envValue != null) {
+        return envValue;
+      }
+    }
+
+    if (propertyName != null && !propertyName.isEmpty()) {
+      String propValue = System.getProperty(propertyName);
+      if (propValue != null) {
+        return propValue;
+      }
+    }
+
+    return defaultValue;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{envName=%s, propertyName=%s, defaultValue=%s, description=%20s}",
+        envName, propertyName, defaultValue, description);
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index e4273c4935d..18291621249 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -177,9 +177,10 @@ public synchronized void init() {
       }
     } catch (TException e) {
-      broken = true;
+      logger.error("Failed to create interpreter: {}", getClassName());
       throw new InterpreterException(e);
     } finally {
+      // TODO(jongyoul): Fix this when not all interpreters in the same interpreter group
+      // are broken
       interpreterProcess.releaseClient(client, broken);
     }
   }
@@ -195,12 +196,18 @@ public void open() {
     synchronized (interpreterGroup) {
       // initialize all interpreters in this interpreter group
      List<Interpreter> interpreters = interpreterGroup.get(noteId);
-      for (Interpreter intp : interpreters) {
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
         Interpreter p = intp;
         while (p instanceof WrappedInterpreter) {
           p = ((WrappedInterpreter) p).getInnerInterpreter();
         }
-        ((RemoteInterpreter) p).init();
+        try {
+          ((RemoteInterpreter) p).init();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
       }
     }
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 541aae1397b..a5fba383bc4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -342,6 +342,10 @@ public String getInterpreterDir() {
     return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
   }
 
+  public String getInterpreterJson() {
+    return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
+  }
+
   public String getInterpreterSettingPath() {
     return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
   }
@@ -480,6 +484,7 @@ public static enum ConfVars {
         + "org.apache.zeppelin.scalding.ScaldingInterpreter,"
         + "org.apache.zeppelin.jdbc.JDBCInterpreter,"
         + "org.apache.zeppelin.hbase.HbaseInterpreter"),
+    ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 08e64656d65..17728406130 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -20,6 +20,7 @@
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.NullArgumentException;
@@ -45,9 +46,14 @@
 import java.io.*;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 
 /**
@@ -111,31 +117,42 @@ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultO
   }
 
   private void init() throws InterpreterException, IOException, RepositoryException {
-    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+    String interpreterJson = conf.getInterpreterJson();
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+    Path interpretersDir = Paths.get(conf.getInterpreterDir());
+    if (Files.exists(interpretersDir)) {
+      for (Path interpreterDir : Files.newDirectoryStream(interpretersDir,
+          new DirectoryStream.Filter<Path>() {
+            @Override
+            public boolean accept(Path entry) throws IOException {
+              return Files.exists(entry) && Files.isDirectory(entry);
+            }
+          })) {
+        String interpreterDirString = interpreterDir.toString();
 
-    // Load classes
-    File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
-    if (interpreterDirs != null) {
-      for (File path : interpreterDirs) {
-        logger.info("Reading " + path.getAbsolutePath());
-        URL[] urls = null;
-        try {
-          urls = recursiveBuildLibList(path);
-        } catch (MalformedURLException e1) {
-          logger.error("Can't load jars ", e1);
-        }
-        URLClassLoader ccl = new URLClassLoader(urls, oldcl);
+        registerInterpreterFromPath(interpreterDirString, interpreterJson);
+
+        registerInterpreterFromResource(cl, interpreterDirString, interpreterJson);
+        /**
+         * TODO(jongyoul)
+         * - Remove the legacy code below
+         * - Support ThreadInterpreter
+         */
+        URLClassLoader ccl = new URLClassLoader(recursiveBuildLibList(interpreterDir.toFile()), cl);
         for (String className : interpreterClassList) {
           try {
+            // Load classes
             Class.forName(className, true, ccl);
-            Set<String> keys = Interpreter.registeredInterpreters.keySet();
-            for (String intName : keys) {
+            Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
+            for (String interpreterKey : interpreterKeys) {
              if (className.equals(
-                  Interpreter.registeredInterpreters.get(intName).getClassName())) {
-                Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath());
-                logger.info("Interpreter " + intName + " found. class=" + className);
-                cleanCl.put(path.getAbsolutePath(), ccl);
+                  Interpreter.registeredInterpreters.get(interpreterKey).getClassName())) {
+                Interpreter.registeredInterpreters.get(interpreterKey).setPath(
+                    interpreterDirString);
+                logger.info("Interpreter " + interpreterKey + " found. class=" + className);
+                cleanCl.put(interpreterDirString, ccl);
               }
             }
           } catch (ClassNotFoundException e) {
@@ -145,13 +162,19 @@
       }
     }
 
+    for (RegisteredInterpreter registeredInterpreter :
+        Interpreter.registeredInterpreters.values()) {
+      logger.debug("Registered: {} -> {}. Properties: {}",
+          registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
+          registeredInterpreter.getProperties());
+    }
+
     loadFromFile();
 
     // if no interpreter settings are loaded, create default set
     synchronized (interpreterSettings) {
       if (interpreterSettings.size() == 0) {
-        HashMap<String, List<String>> groupClassNameMap =
-            new HashMap<String, List<String>>();
+        HashMap<String, List<String>> groupClassNameMap = new HashMap<>();
 
         for (String k : Interpreter.registeredInterpreters.keySet()) {
           RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k);
@@ -175,17 +198,13 @@
           }
 
           for (String k : info.getProperties().keySet()) {
-            p.put(k, info.getProperties().get(k).getDefaultValue());
+            p.put(k, info.getProperties().get(k).getValue());
           }
         }
 
         if (found) {
           // add all interpreters in group
-          add(groupName,
-              groupName,
-              new LinkedList<Dependency>(),
-              defaultOption,
-              p);
+          add(groupName, groupName, new LinkedList<Dependency>(), defaultOption, p);
           groupClassNameMap.remove(groupName);
           break;
         }
@@ -196,11 +215,70 @@
     for (String settingId : interpreterSettings.keySet()) {
       InterpreterSetting setting = interpreterSettings.get(settingId);
-      logger.info("Interpreter setting group {} : id={}, name={}",
-          setting.getGroup(), settingId, setting.getName());
+      logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId,
+          setting.getName());
+    }
+  }
+
+  private void registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
+      String interpreterJson) throws MalformedURLException {
+    URL[] urls = recursiveBuildLibList(new File(interpreterDir));
+    ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
+
+    InputStream inputStream = tempClassLoader.getResourceAsStream(interpreterJson);
+
+    if (null != inputStream) {
+      logger.debug("Reading {} from resources in {}", interpreterJson, interpreterDir);
+      List<RegisteredInterpreter> registeredInterpreterList = getInterpreterListFromJson(
+          inputStream);
+      registerInterpreters(registeredInterpreterList, interpreterDir);
     }
   }
 
+  private void registerInterpreterFromPath(String interpreterDir,
+      String interpreterJson) throws IOException {
+
+    Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
+    if (Files.exists(interpreterJsonPath)) {
+      logger.debug("Reading {}", interpreterJsonPath);
+      List<RegisteredInterpreter> registeredInterpreterList = getInterpreterListFromJson(
+          interpreterJsonPath);
+      registerInterpreters(registeredInterpreterList, interpreterDir);
+    }
+  }
+
+  private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
+      throws FileNotFoundException {
+    return getInterpreterListFromJson(new FileInputStream(filename.toFile()));
+  }
+
+  private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
+    Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
+    }.getType();
+    return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
+  }
+
+  private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
+      String absolutePath) {
+    for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
+      String className = registeredInterpreter.getClassName();
+      if (validateRegisterInterpreter(registeredInterpreter) &&
+          null == Interpreter.findRegisteredInterpreterByClassName(className)) {
+        registeredInterpreter.setPath(absolutePath);
+        Interpreter.register(registeredInterpreter);
+        logger.debug("Registered. key: {}, className: {}, path: {}",
+            registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
+            registeredInterpreter.getPath());
+      }
+    }
+  }
+
+  private boolean validateRegisterInterpreter(RegisteredInterpreter registeredInterpreter) {
+    return null != registeredInterpreter.getGroup() && null != registeredInterpreter.getName() &&
+        null != registeredInterpreter.getClassName();
+  }
+
   private void loadFromFile() throws IOException {
     GsonBuilder builder = new GsonBuilder();
     builder.setPrettyPrinting();
@@ -745,6 +823,8 @@ private Interpreter createRepl(String dirName, String className,
       throws InterpreterException {
     logger.info("Create repl {} from {}", className, dirName);
 
+    updatePropertiesFromRegisteredInterpreter(property, className);
+
     ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
     try {
@@ -806,6 +886,9 @@ private Interpreter createRemoteRepl(String interpreterPath, String noteId, Stri
     int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
     String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
     int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
+
+    updatePropertiesFromRegisteredInterpreter(property, className);
+
     LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
         property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
         interpreterPath, localRepoPath, connectTimeout,
@@ -813,6 +896,22 @@
     return intp;
   }
 
+  private Properties updatePropertiesFromRegisteredInterpreter(Properties properties,
+      String className) {
+    RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
+        className);
+    if (null != registeredInterpreter) {
+      Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
+      for (String key : defaultProperties.keySet()) {
+        if (!properties.containsKey(key) && null != defaultProperties.get(key).getValue()) {
+          properties.setProperty(key, defaultProperties.get(key).getValue());
+        }
+      }
+    }
+
+    return properties;
+  }
+
   private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
     URL[] urls = new URL[0];