diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template index 59953dcc38c..d4be68cf45d 100644 --- a/conf/zeppelin-env.cmd.template +++ b/conf/zeppelin-env.cmd.template @@ -34,6 +34,7 @@ REM set ZEPPELIN_NOTEBOOK_S3_USER REM User in bucket where notebook REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zeppelin. $USER by default. REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0. REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading +REM set ZEPPELIN_INTERPRETER_DOWNLOAD_DIR REM Path for interpreter download REM set ZEPPELIN_NOTEBOOK_STORAGE REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote). diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template index be6f3dd83f1..7fc616d28f7 100644 --- a/conf/zeppelin-env.sh.template +++ b/conf/zeppelin-env.sh.template @@ -35,6 +35,7 @@ # export ZEPPELIN_IDENT_STRING # A string representing this instance of zeppelin. $USER by default. # export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0. # export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading +# export ZEPPELIN_INTERPRETER_DOWNLOAD_DIR # Path for interpreter download # export ZEPPELIN_NOTEBOOK_STORAGE # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote). #### Spark interpreter configuration #### diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 2d1fea1ba36..101bd618a22 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -170,6 +170,12 @@ Interpreter implementation base directory + + zeppelin.interpreter.download.dir + interpreter + Path for interpreter download + + zeppelin.interpreter.localRepo local-repo diff --git a/docs/install/install.md b/docs/install/install.md index 696f837b295..e110e0f978a 100644 --- a/docs/install/install.md +++ b/docs/install/install.md @@ -174,6 +174,12 @@ You can configure Zeppelin with both **environment variables** in `conf/zeppelin notebook The root directory where Zeppelin notebook directories are saved + + ZEPPELIN_INTERPRETER_DOWNLOAD_DIR + zeppelin.interpreter.download.dir + interpreter + Path for interpreter download + ZEPPELIN_NOTEBOOK_S3_BUCKET zeppelin.notebook.s3.bucket diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index 9af0a60cd94..56e361e8655 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -37,6 +37,7 @@ import org.apache.zeppelin.dep.Repository; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.rest.message.LoadDynamicInterpreterRequest; import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; import org.apache.zeppelin.server.JsonResponse; @@ -249,4 +250,46 @@ public Response removeRepository(@PathParam("repoId") String repoId) { } return new JsonResponse(Status.OK).build(); } + + /** + * load a downloaded interpreter via external repository + */ + @POST + @Path("load/{interpreterGroupName}/{interpreterName}") + public Response loadDynamicInterpreter( + @PathParam("interpreterGroupName") String interpreterGroupName, + @PathParam("interpreterName") String interpreterName, String message) { + logger.info("dynamic load interpreter interpreterGroupName [{}] name [{}]", + interpreterGroupName, interpreterName); + try { + LoadDynamicInterpreterRequest request = gson.fromJson( + message, LoadDynamicInterpreterRequest.class + ); + + if (request.getClassName() == null + || request.getArtifact() == null + || request.getUrl() == null) { + throw new Exception("invalid request data"); + } + + boolean result = interpreterFactory.loadDynamicInterpreter( + interpreterGroupName, + interpreterName, + request.getArtifact(), + request.getClassName(), + request.getUrl(), + request.isSnapshot() + ); + + if (result == false) { + throw new Exception("can't not found artifact"); + } + + } catch (Exception e) { + logger.error("Exception in InterpreterRestApi while adding repository - load failed ", e); + return new JsonResponse( + Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e)).build(); + } + return new JsonResponse(Status.OK).build(); + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/LoadDynamicInterpreterRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/LoadDynamicInterpreterRequest.java new file mode 100644 index 00000000000..658768be5b6 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/LoadDynamicInterpreterRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import java.util.Map; + +/** + * LoadDynamicInterpreterRequest rest api request message + */ + +public class LoadDynamicInterpreterRequest { + String artifact; + String className; + Map repository; + + public LoadDynamicInterpreterRequest() { + + } + + public String getArtifact() { return artifact; } + + public String getClassName() { return className; } + + public Map getRepository() { return repository; } + + public String getUrl() throws ClassCastException { + Object urlObj = repository.get("url"); + if (urlObj == null) { + return null; + } + return (String) urlObj; + } + + public Boolean isSnapshot() throws ClassCastException { + Object snapshotFlagObj = repository.get("snapshot"); + if (snapshotFlagObj == null) { + return false; + } + return (Boolean) snapshotFlagObj; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index fa52957c9ea..75c517478ab 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -508,6 +508,7 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), + ZEPPELIN_INTERPRETER_DOWNLOAD_DIR("zeppelin.interpreter.download.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 17728406130..d410cf7fa1d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -22,8 +22,10 @@ import com.google.gson.reflect.TypeToken; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOExceptionWithCause; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.NullArgumentException; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.dep.Dependency; @@ -66,7 +68,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { .synchronizedMap(new HashMap()); private ZeppelinConfiguration conf; - String[] interpreterClassList; + List interpreterClassList; private Map interpreterSettings = new HashMap(); @@ -105,7 +107,10 @@ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultO this.interpreterRepositories = depResolver.getRepos(); this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); - interpreterClassList = replsConf.split(","); + interpreterClassList = new ArrayList(); + for (String className : replsConf.split(",")) { + interpreterClassList.add(className); + } GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); @@ -116,6 +121,86 @@ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultO init(); } + public boolean loadDynamicInterpreter(String intpGroupName, String intpName, String artifact, + String intpClassName) { + return loadDynamicInterpreter(intpGroupName, intpName, artifact, intpClassName, null, false); + } + + public boolean loadDynamicInterpreter(String intpGroupName, String intpName, String artifact, + String intpClassName, String repositoryUrl, boolean isSnapShotRepo) { + String[] artifactItem = artifact.split(":"); + String zepInterpreterRepoDir = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_DOWNLOAD_DIR); + String zepInterpreterRepoFullPath = conf.getRelativeDir( + ConfVars.ZEPPELIN_INTERPRETER_DOWNLOAD_DIR + ); + String interpreterDesPath = String.format("%s/%s/%s/", zepInterpreterRepoDir, + intpGroupName, intpName); + String interpreterLoadPath = String.format("%s/%s/%s", zepInterpreterRepoFullPath, + intpGroupName, intpName); + + if (artifactItem.length <= 0) { + logger.error("Failed load dynamic interpreter - invalid artifact : {}", artifact); + return false; + } + + try { + if (repositoryUrl != null) { + depResolver.addRepo("dyInterpreterRepo", repositoryUrl, isSnapShotRepo); + } + logger.info("interpreter path : {}", interpreterLoadPath); + depResolver.load(artifact, interpreterDesPath); + setDynamicInterpreter(intpClassName, interpreterLoadPath); + } catch (Exception e) { + logger.error("Failed load dynamic interpreter : ", e); + return false; + } + return true; + } + + protected void setDynamicInterpreter(String interpreterClassName, String fileDirPath) + throws InterpreterException, IOException, ClassNotFoundException { + logger.info("load Dynamic Interpreter ClassName : {}", interpreterClassName); + logger.info("load Dynamic Interpreter FilePath : {}", interpreterClassName); + + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + interpreterClassList.add(interpreterClassName); + // Load classes + File interpreterDir = new File(fileDirPath); + + if (interpreterDir != null) { + URL[] urls = null; + try { + urls = recursiveBuildLibList(interpreterDir); + } catch (MalformedURLException e) { + logger.error("Can't load jars ", e); + throw new IOException(e); + } + URLClassLoader ccl = new URLClassLoader(urls, oldcl); + + try { + Class.forName(interpreterClassName, true, ccl); + Set keys = Interpreter.registeredInterpreters.keySet(); + for (String intName : keys) { + if (interpreterClassName.equals( + Interpreter.registeredInterpreters.get(intName).getClassName())) { + Interpreter.registeredInterpreters.get(intName).setPath(fileDirPath); + logger.info("Interpreter {} found. class={}", intName, fileDirPath); + cleanCl.put(fileDirPath, ccl); + } + } + } catch (ClassNotFoundException e) { + logger.error("Load error : ", e); + throw e; + } + } + + for (String settingId : interpreterSettings.keySet()) { + InterpreterSetting setting = interpreterSettings.get(settingId); + logger.info("Interpreter setting group {} : id={}, name={}", + setting.getGroup(), settingId, setting.getName()); + } + } + private void init() throws InterpreterException, IOException, RepositoryException { String interpreterJson = conf.getInterpreterJson(); ClassLoader cl = Thread.currentThread().getContextClassLoader();