diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 458ffc00d47..f23ca823e62 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB fi eval $INTERPRETER_RUN_COMMAND & - pid=$! + if [[ -z "${pid}" ]]; then exit 1; else diff --git a/bin/stop-interpreter.sh b/bin/stop-interpreter.sh new file mode 100755 index 00000000000..e6ff16e9e9f --- /dev/null +++ b/bin/stop-interpreter.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Stop Zeppelin Interpreter Processes +# + +bin=$(dirname "${BASH_SOURCE-$0}") +bin=$(cd "${bin}">/dev/null; pwd) + +. 
"${bin}/common.sh" + +export ZEPPELIN_FORCE_STOP=1 + +ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter +ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log" +JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}" + +if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then + ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" +fi + +if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then + ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" +fi + +addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib" +addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib" +addJarInDir "${ZEPPELIN_HOME}/lib" +addJarInDir "${ZEPPELIN_HOME}/lib/interpreter" + +CLASSPATH+=":${ZEPPELIN_CLASSPATH}" +$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_STOP_INTERPRETER_MAIN ${@} diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh index 5982aee2e0f..e8988497513 100755 --- a/bin/zeppelin-daemon.sh +++ b/bin/zeppelin-daemon.sh @@ -217,18 +217,6 @@ function stop() { action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}" fi fi - - # list all pid that used in remote interpreter and kill them - for f in ${ZEPPELIN_PID_DIR}/*.pid; do - if [[ ! 
-f ${f} ]]; then - continue; - fi - - pid=$(cat ${f}) - wait_for_zeppelin_to_die $pid 20 - $(rm -f ${f}) - done - } function find_zeppelin_process() { diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 3c5bbeae59a..d566a717884 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -480,4 +480,45 @@ 10000:10010 --> + + + + + + + + + + + + diff --git a/docs/usage/interpreter/overview.md b/docs/usage/interpreter/overview.md index dd5ed220c88..035c381b8a9 100644 --- a/docs/usage/interpreter/overview.md +++ b/docs/usage/interpreter/overview.md @@ -144,3 +144,11 @@ So users needs to understand the ([interpreter mode setting ](../usage/interpret In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR) + +## Interpreter Process Recovery + +Before 0.8.0, shutting down Zeppelin also meant shutting down all the running interpreter processes. Usually an admin shuts down the Zeppelin server for maintenance or an upgrade, but does not want to shut down the running interpreter processes. +In such cases, interpreter process recovery is necessary. Starting from 0.8.0, users can enable interpreter process recovery by setting `zeppelin.recovery.storage.class` to +`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or another implementation if one becomes available in the future. By default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`, + which means recovery is not enabled. Enabling recovery means that shutting down Zeppelin does not terminate the interpreter processes, +and when Zeppelin is restarted, it tries to reconnect to the existing running interpreter processes. 
If you want to kill all the interpreter processes after terminating Zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh` diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json index 485f6950df0..d656532eb07 100644 --- a/spark/src/main/resources/interpreter-setting.json +++ b/spark/src/main/resources/interpreter-setting.json @@ -61,7 +61,7 @@ "description": "Spark master uri. ex) spark://masterhost:7077", "type": "string" }, - "zeppelin.spark.unSupportedVersionCheck": { + "zeppelin.spark.enableSupportedVersionCheck": { "envName": null, "propertyName": "zeppelin.spark.enableSupportedVersionCheck", "defaultValue": true, diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 438c661f8bc..77279edcd39 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -355,6 +355,19 @@ public String getNotebookDir() { return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); } + public String getRecoveryDir() { + return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR); + } + + public String getRecoveryStorageClass() { + return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS); + } + + public boolean isRecoveryEnabled() { + return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals( + "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"); + } + public String getUser() { return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER); } @@ -658,6 +671,10 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", 
"recovery"), + ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class", + "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"), + // use specified notebook (id) as homescreen ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null), // whether homescreen notebook will be hidden from notebook list or not diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index b991079feca..813dad86881 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -19,8 +19,20 @@ /** * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component - * that is used to for the communication fromzeppelin-server process to zeppelin interpreter process + * that is used to for the communication from zeppelin-server process to zeppelin interpreter + * process. 
*/ public interface InterpreterClient { + String getInterpreterSettingName(); + + void start(String userName, Boolean isUserImpersonate); + + void stop(); + + String getHost(); + + int getPort(); + + boolean isRunning(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java index 9e253555a90..6901e2c7a62 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java @@ -30,6 +30,7 @@ public class InterpreterLaunchContext { private Properties properties; private InterpreterOption option; private InterpreterRunner runner; + private String interpreterGroupId; private String interpreterSettingId; private String interpreterSettingGroup; private String interpreterSettingName; @@ -37,12 +38,14 @@ public class InterpreterLaunchContext { public InterpreterLaunchContext(Properties properties, InterpreterOption option, InterpreterRunner runner, + String interpreterGroupId, String interpreterSettingId, String interpreterSettingGroup, String interpreterSettingName) { this.properties = properties; this.option = option; this.runner = runner; + this.interpreterGroupId = interpreterGroupId; this.interpreterSettingId = interpreterSettingId; this.interpreterSettingGroup = interpreterSettingGroup; this.interpreterSettingName = interpreterSettingName; @@ -60,6 +63,10 @@ public InterpreterRunner getRunner() { return runner; } + public String getInterpreterGroupId() { + return interpreterGroupId; + } + public String getInterpreterSettingId() { return interpreterSettingId; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 5d0acf3515a..1cee20e7a04 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.interpreter.launcher; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import java.io.IOException; import java.util.Properties; @@ -29,9 +30,11 @@ public abstract class InterpreterLauncher { protected ZeppelinConfiguration zConf; protected Properties properties; + protected RecoveryStorage recoveryStorage; - public InterpreterLauncher(ZeppelinConfiguration zConf) { + public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { this.zConf = zConf; + this.recoveryStorage = recoveryStorage; } public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java new file mode 100644 index 00000000000..8bbe8302fcf --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.recovery; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.launcher.InterpreterClient; + +import java.io.IOException; +import java.util.Map; + + +/** + * Abstract base class for storing interpreter process recovery metadata. + * + */ +public abstract class RecoveryStorage { + + protected ZeppelinConfiguration zConf; + protected Map restoredClients; + + public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { + this.zConf = zConf; + } + + /** + * Update RecoveryStorage when a new InterpreterClient is started + * @param client + * @throws IOException + */ + public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException; + + /** + * Update RecoveryStorage when an InterpreterClient is stopped + * @param client + * @throws IOException + */ + public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException; + + /** + * + * It is only called when Zeppelin Server is started. 
+ * + * @return + * @throws IOException + */ + public abstract Map restore() throws IOException; + + + /** + * It is called after constructor + * + * @throws IOException + */ + public void init() throws IOException { + this.restoredClients = restore(); + } + + public InterpreterClient getInterpreterClient(String interpreterGroupId) { + if (restoredClients.containsKey(interpreterGroupId)) { + return restoredClients.get(interpreterGroupId); + } else { + return null; + } + } +} diff --git a/zeppelin-server/notebook/.python.recovery.crc b/zeppelin-server/notebook/.python.recovery.crc new file mode 100644 index 00000000000..6bd3e7ae43b Binary files /dev/null and b/zeppelin-server/notebook/.python.recovery.crc differ diff --git a/zeppelin-server/notebook/python.recovery b/zeppelin-server/notebook/python.recovery new file mode 100644 index 00000000000..eaf4938fdad --- /dev/null +++ b/zeppelin-server/notebook/python.recovery @@ -0,0 +1 @@ +2CZA1DVUG:shared_process 192.168.3.2:55410 \ No newline at end of file diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 08ede293e4d..925c637fcfc 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -349,6 +349,21 @@ + + maven-surefire-plugin + ${plugin.surefire.version} + + -Xmx2g -Xms1g -Dfile.encoding=UTF-8 + + ${tests.to.exclude} + + + 1 + + + + + org.scala-tools maven-scala-plugin diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 0b66a437d5b..f8625c2357c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -162,7 +162,7 @@ public ZeppelinServer() throws Exception { public static void main(String[] args) throws InterruptedException { - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + final ZeppelinConfiguration conf = 
ZeppelinConfiguration.create(); conf.setProperty("args", args); jettyWebServer = setupJettyServer(conf); @@ -199,7 +199,9 @@ public static void main(String[] args) throws InterruptedException { LOG.info("Shutting down Zeppelin Server ... "); try { jettyWebServer.stop(); - notebook.getInterpreterSettingManager().close(); + if (!conf.isRecoveryEnabled()) { + ZeppelinServer.notebook.getInterpreterSettingManager().close(); + } notebook.close(); Thread.sleep(3000); } catch (Exception e) { @@ -222,7 +224,9 @@ public static void main(String[] args) throws InterruptedException { } jettyWebServer.join(); - ZeppelinServer.notebook.getInterpreterSettingManager().close(); + if (!conf.isRecoveryEnabled()) { + ZeppelinServer.notebook.getInterpreterSettingManager().close(); + } } private static Server setupJettyServer(ZeppelinConfiguration conf) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java new file mode 100644 index 00000000000..37277ee0c36 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.recovery; + +import com.google.common.io.Files; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage; +import org.apache.zeppelin.interpreter.recovery.StopInterpreter; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.rest.AbstractTestRestApi; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class RecoveryTest extends AbstractTestRestApi { + + private Gson gson = new Gson(); + private static File recoveryDir = null; + + @BeforeClass + public static void init() throws Exception { + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(), + FileSystemRecoveryStorage.class.getName()); + recoveryDir = Files.createTempDir(); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath()); + startUp(RecoveryTest.class.getSimpleName()); + } + + @AfterClass + public static void destroy() throws Exception { + shutDown(); + FileUtils.deleteDirectory(recoveryDir); + } + + @Test + public void testRecovery() throws Exception { + Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); + + // run python interpreter and create new variable `user` + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + 
p1.setText("%python user='abc'"); + PostMethod post = httpPost("/notebook/job/" + note1.getId(), ""); + assertThat(post, isAllowed()); + Map resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken>() { + }.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.FINISHED, p1.getStatus()); + + // shutdown zeppelin and restart it + shutDown(); + startUp(RecoveryTest.class.getSimpleName()); + + // run the paragraph again, but change the text to print variable `user` + note1 = ZeppelinServer.notebook.getNote(note1.getId()); + p1 = note1.getParagraph(p1.getId()); + p1.setText("%python print(user)"); + post = httpPost("/notebook/job/" + note1.getId(), ""); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals("abc\n", p1.getResult().message().get(0).getData()); + } + + @Test + public void testRecovery_2() throws Exception { + Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); + + // run python interpreter and create new variable `user` + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python user='abc'"); + PostMethod post = httpPost("/notebook/job/" + note1.getId(), ""); + assertThat(post, isAllowed()); + Map resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken>() { + }.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.FINISHED, p1.getStatus()); + + // restart the python interpreter + ZeppelinServer.notebook.getInterpreterSettingManager().restart( + ((ManagedInterpreterGroup) p1.getBindedInterpreter().getInterpreterGroup()) + .getInterpreterSetting().getId() + ); + + // shutdown zeppelin and restart it + shutDown(); + startUp(RecoveryTest.class.getSimpleName()); + + // run the paragraph again, but change the text to print variable `user`. 
+ // can not recover the python interpreter, because it has been shutdown. + note1 = ZeppelinServer.notebook.getNote(note1.getId()); + p1 = note1.getParagraph(p1.getId()); + p1.setText("%python print(user)"); + post = httpPost("/notebook/job/" + note1.getId(), ""); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.ERROR, p1.getStatus()); + } + + @Test + public void testRecovery_3() throws Exception { + Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); + + // run python interpreter and create new variable `user` + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python user='abc'"); + PostMethod post = httpPost("/notebook/job/" + note1.getId(), ""); + assertThat(post, isAllowed()); + Map resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken>() { + }.getType()); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.FINISHED, p1.getStatus()); + + // shutdown zeppelin and restart it + shutDown(); + StopInterpreter.main(new String[]{}); + + startUp(RecoveryTest.class.getSimpleName()); + + // run the paragraph again, but change the text to print variable `user`. + // can not recover the python interpreter, because it has been shutdown. 
+ note1 = ZeppelinServer.notebook.getNote(note1.getId()); + p1 = note1.getParagraph(p1.getId()); + p1.setText("%python print(user)"); + post = httpPost("/notebook/job/" + note1.getId(), ""); + assertEquals(resp.get("status"), "OK"); + post.releaseConnection(); + assertEquals(Job.Status.ERROR, p1.getStatus()); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 431e3647b27..7c083650814 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -318,8 +318,10 @@ protected static void shutDown() throws Exception { if (!wasRunning) { // restart interpreter to stop all interpreter processes List settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get(); - for (InterpreterSetting setting : settingList) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); + if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) { + for (InterpreterSetting setting : settingList) { + ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); + } } if (shiroIni != null) { FileUtils.deleteQuietly(shiroIni); @@ -350,7 +352,12 @@ protected static void shutDown() throws Exception { .clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName()); } - FileUtils.deleteDirectory(confDir); + if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) { + // don't delete interpreter.json when recovery is enabled. 
otherwise the interpreter setting + // id will change after zeppelin restart, then we can not recover interpreter process + // properly + FileUtils.deleteDirectory(confDir); + } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index d5ff947ad0c..424aa27a166 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher; import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher; import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager; +import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; @@ -144,6 +146,9 @@ public class InterpreterSetting { + private transient RecoveryStorage recoveryStorage; + /////////////////////////////////////////////////////////////////////////////////////////// + /** * Builder class for InterpreterSetting */ @@ -242,6 +247,11 @@ public Builder setLifecycleManager(LifecycleManager lifecycleManager) { return this; } + public Builder setRecoveryStorage(RecoveryStorage recoveryStorage) { + interpreterSetting.recoveryStorage = recoveryStorage; + return this; + } + public InterpreterSetting create() { // post processing interpreterSetting.postProcessing(); @@ -261,6 +271,13 @@ void postProcessing() { if (this.lifecycleManager == null) { this.lifecycleManager = new NullLifecycleManager(conf); } + if (this.recoveryStorage == null) { + try { + this.recoveryStorage = new 
NullRecoveryStorage(conf, interpreterSettingManager); + } catch (IOException e) { + // ignore this exception as NullRecoveryStorage will do nothing. + } + } } /** @@ -285,9 +302,9 @@ public InterpreterSetting(InterpreterSetting o) { private void createLauncher() { if (group.equals("spark")) { - this.launcher = new SparkInterpreterLauncher(this.conf); + this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage); } else { - this.launcher = new ShellScriptLauncher(this.conf); + this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage); } } @@ -344,6 +361,15 @@ public InterpreterSetting setLifecycleManager(LifecycleManager lifecycleManager) return this; } + public InterpreterSetting setRecoveryStorage(RecoveryStorage recoveryStorage) { + this.recoveryStorage = recoveryStorage; + return this; + } + + public RecoveryStorage getRecoveryStorage() { + return recoveryStorage; + } + public LifecycleManager getLifecycleManager() { return lifecycleManager; } @@ -408,7 +434,12 @@ public ManagedInterpreterGroup getOrCreateInterpreterGroup(String user, String n } void removeInterpreterGroup(String groupId) { - this.interpreterGroups.remove(groupId); + try { + interpreterGroupWriteLock.lock(); + this.interpreterGroups.remove(groupId); + } finally { + interpreterGroupWriteLock.unlock(); + } } public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) { @@ -425,7 +456,6 @@ ManagedInterpreterGroup getInterpreterGroup(String groupId) { return interpreterGroups.get(groupId); } - @VisibleForTesting public ArrayList getAllInterpreterGroups() { try { interpreterGroupReadLock.lock(); @@ -668,16 +698,19 @@ List createInterpreters(String user, String interpreterGroupId, Str return interpreters; } - synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties) + synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId, + Properties properties) throws IOException { if (launcher == null) 
{ createLauncher(); } InterpreterLaunchContext launchContext = new - InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name); + InterpreterLaunchContext(properties, option, interpreterRunner, + interpreterGroupId, id, group, name); RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext); process.setRemoteInterpreterEventPoller( new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener)); + recoveryStorage.onInterpreterClientStart(process); return process; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 0b7efd5db63..42f82fad214 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -34,12 +34,16 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage; +import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.repository.Authentication; @@ -118,6 +122,7 @@ public class 
InterpreterSettingManager { private ApplicationEventListener appEventListener; private DependencyResolver dependencyResolver; private LifecycleManager lifecycleManager; + private RecoveryStorage recoveryStorage; public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, AngularObjectRegistryListener angularObjectRegistryListener, @@ -154,13 +159,17 @@ public InterpreterSettingManager(ZeppelinConfiguration conf, this.angularObjectRegistryListener = angularObjectRegistryListener; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; this.appEventListener = appEventListener; - try { - this.lifecycleManager = (LifecycleManager) - Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class) - .newInstance(conf); - } catch (Exception e) { - throw new IOException("Fail to create LifecycleManager", e); - } + + this.recoveryStorage = ReflectionUtils.createClazzInstance(conf.getRecoveryStorageClass(), + new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, + new Object[] {conf, this}); + this.recoveryStorage.init(); + LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName()); + + this.lifecycleManager = ReflectionUtils.createClazzInstance(conf.getLifecycleManagerClass(), + new Class[] {ZeppelinConfiguration.class}, + new Object[] {conf}); + LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName()); init(); } @@ -174,6 +183,7 @@ private void initInterpreterSetting(InterpreterSetting interpreterSetting) { .setAppEventListener(appEventListener) .setDependencyResolver(dependencyResolver) .setLifecycleManager(lifecycleManager) + .setRecoveryStorage(recoveryStorage) .postProcessing(); } @@ -307,8 +317,16 @@ public boolean accept(Path entry) throws IOException { saveToFile(); } + public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() { + return remoteInterpreterProcessListener; + } + + public ApplicationEventListener 
getAppEventListener() { + return appEventListener; + } + private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, - String interpreterJson) throws IOException { + String interpreterJson) throws IOException { URL[] urls = recursiveBuildLibList(new File(interpreterDir)); ClassLoader tempClassLoader = new URLClassLoader(urls, null); @@ -507,6 +525,10 @@ public List call(RemoteInterpreterService.Client client) throws Exceptio return resourceSet; } + public RecoveryStorage getRecoveryStorage() { + return recoveryStorage; + } + public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) { for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) { ResourceSet resourceSet = new ResourceSet(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index 2378f140daa..641c0ac23ef 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -55,15 +55,31 @@ public InterpreterSetting getInterpreterSetting() { return interpreterSetting; } - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties) + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName, + Properties properties) throws IOException { if (remoteInterpreterProcess == null) { LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId()); - remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties); + remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties); + synchronized (remoteInterpreterProcess) { + if (!remoteInterpreterProcess.isRunning()) { + remoteInterpreterProcess.start(userName, false); + 
remoteInterpreterProcess.getRemoteInterpreterEventPoller() + .setInterpreterProcess(remoteInterpreterProcess); + remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this); + remoteInterpreterProcess.getRemoteInterpreterEventPoller().start(); + getInterpreterSetting().getRecoveryStorage() + .onInterpreterClientStart(remoteInterpreterProcess); + } + } } return remoteInterpreterProcess; } + public RemoteInterpreterProcess getInterpreterProcess() { + return remoteInterpreterProcess; + } + public RemoteInterpreterProcess getRemoteInterpreterProcess() { return remoteInterpreterProcess; } @@ -94,6 +110,11 @@ public synchronized void close(String sessionId) { if (remoteInterpreterProcess != null) { LOGGER.info("Kill RemoteInterpreterProcess"); remoteInterpreterProcess.stop(); + try { + interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess); + } catch (IOException e) { + LOGGER.error("Fail to store recovery data", e); + } remoteInterpreterProcess = null; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java index 8c86129f64a..6ddcacf275a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java @@ -21,50 +21,68 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterRunner; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Interpreter Launcher which use shell script to launch the interpreter process. - * */ public class ShellScriptLauncher extends InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class); - public ShellScriptLauncher(ZeppelinConfiguration zConf) { - super(zConf); + public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { + super(zConf, recoveryStorage); } @Override - public InterpreterClient launch(InterpreterLaunchContext context) { + public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); this.properties = context.getProperties(); InterpreterOption option = context.getOption(); InterpreterRunner runner = context.getRunner(); String groupName = context.getInterpreterSettingGroup(); String name = context.getInterpreterSettingName(); - int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess( + context.getInterpreterSettingName(), connectTimeout, option.getHost(), option.getPort()); } else { + // try to recover it first + if (zConf.isRecoveryEnabled()) { + InterpreterClient recoveredClient = + recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); + if (recoveredClient != null) { + if (recoveredClient.isRunning()) { + LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" + + recoveredClient.getPort()); + return recoveredClient; + } else { + LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":" + + recoveredClient.getPort() + ", as it is already terminated."); + } + } + } + // create new remote process String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + 
context.getInterpreterSettingId(); return new RemoteInterpreterManagedProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(), + zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + groupName, localRepoPath, buildEnvFromProperties(), connectTimeout, name); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 32a0530af18..e8a9cdf881e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +36,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class); - public SparkInterpreterLauncher(ZeppelinConfiguration zConf) { - super(zConf); + public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { + super(zConf, recoveryStorage); } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java new file mode 100644 index 00000000000..5a0c8adf6cd --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -0,0 +1,139 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.recovery; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.apache.zeppelin.interpreter.launcher.InterpreterClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.notebook.FileSystemStorage; +import org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import 
java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * Hadoop compatible FileSystem based RecoveryStorage implementation. + * + * Save InterpreterProcess in the format of: + * InterpreterGroupId host:port + */ +public class FileSystemRecoveryStorage extends RecoveryStorage { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class); + + private InterpreterSettingManager interpreterSettingManager; + private FileSystemStorage fs; + private Path recoveryDir; + + public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, + InterpreterSettingManager interpreterSettingManager) + throws IOException { + super(zConf); + this.interpreterSettingManager = interpreterSettingManager; + this.zConf = zConf; + this.fs = FileSystemStorage.get(zConf); + this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir())); + LOGGER.info("Using folder {} to store recovery data", recoveryDir); + this.fs.tryMkDir(recoveryDir); + } + + @Override + public void onInterpreterClientStart(InterpreterClient client) throws IOException { + save(client.getInterpreterSettingName()); + } + + @Override + public void onInterpreterClientStop(InterpreterClient client) throws IOException { + save(client.getInterpreterSettingName()); + } + + private void save(String interpreterSettingName) throws IOException { + InterpreterSetting interpreterSetting = + interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName); + List recoveryContent = new ArrayList<>(); + for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { + RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess(); + if (interpreterProcess != null) { + recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + 
":" + + interpreterProcess.getPort()); + } + } + LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName); + LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator())); + Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery"); + fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true); + } + + @Override + public Map restore() throws IOException { + Map clients = new HashMap<>(); + List paths = fs.list(new Path(recoveryDir + "/*.recovery")); + + for (Path path : paths) { + String fileName = path.getName(); + String interpreterSettingName = fileName.substring(0, + fileName.length() - ".recovery".length()); + String recoveryContent = fs.readFile(path); + if (!StringUtils.isBlank(recoveryContent)) { + for (String line : recoveryContent.split(System.lineSeparator())) { + String[] tokens = line.split("\t"); + String groupId = tokens[0]; + String[] hostPort = tokens[1].split(":"); + int connectTimeout = + zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( + interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); + // interpreterSettingManager may be null when this class is used when it is used + // stop-interpreter.sh + if (interpreterSettingManager != null) { + client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller( + interpreterSettingManager.getRemoteInterpreterProcessListener(), + interpreterSettingManager.getAppEventListener())); + } + clients.put(groupId, client); + LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]); + } + } + } + + return clients; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java new file mode 100644 index 00000000000..3a7d12c70f3 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.recovery; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.launcher.InterpreterClient; + +import java.io.IOException; +import java.util.Map; + + +/** + * RecoveryStorage that does nothing, used when recovery is not enabled.
+ * + */ +public class NullRecoveryStorage extends RecoveryStorage { + + public NullRecoveryStorage(ZeppelinConfiguration zConf, + InterpreterSettingManager interpreterSettingManager) + throws IOException { + super(zConf); + } + + @Override + public void onInterpreterClientStart(InterpreterClient client) throws IOException { + + } + + @Override + public void onInterpreterClientStop(InterpreterClient client) throws IOException { + + } + + @Override + public Map restore() throws IOException { + return null; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java new file mode 100644 index 00000000000..d74b1621e7e --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java @@ -0,0 +1,40 @@ +package org.apache.zeppelin.interpreter.recovery; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import org.apache.zeppelin.interpreter.launcher.InterpreterClient; +import org.apache.zeppelin.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + + +/** + * Utility class for stopping interpreters, for when you want to stop all the + * interpreter processes even though recovery is enabled, or you want to kill interpreter processes + * to avoid orphaned processes.
+ */ +public class StopInterpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(StopInterpreter.class); + + public static void main(String[] args) throws IOException { + ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); + RecoveryStorage recoveryStorage = null; + + recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(), + new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, + new Object[] {zConf, null}); + + LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName()); + Map restoredClients = recoveryStorage.restore(); + if (restoredClients != null) { + for (InterpreterClient client : restoredClients.values()) { + LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort()); + client.stop(); + } + } + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 6defd9ba825..bda8010d93c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -102,16 +102,7 @@ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() thr return this.interpreterProcess; } ManagedInterpreterGroup intpGroup = getInterpreterGroup(); - this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties); - synchronized (interpreterProcess) { - if (!interpreterProcess.isRunning()) { - interpreterProcess.start(this.getUserName(), false); - interpreterProcess.getRemoteInterpreterEventPoller() - .setInterpreterProcess(interpreterProcess); - interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup); - interpreterProcess.getRemoteInterpreterEventPoller().start(); - } - } + this.interpreterProcess = 
intpGroup.getOrCreateInterpreterProcess(getUserName(), properties); return interpreterProcess; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 27e826c70a1..3dd5bfa3493 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -214,7 +214,7 @@ public void stop() { callbackServer.stop(); } if (isRunning()) { - logger.info("kill interpreter process"); + logger.info("Kill interpreter process"); try { callRemoteFunction(new RemoteFunction() { @Override @@ -263,7 +263,6 @@ public String getInterpreterDir() { return interpreterDir; } - @VisibleForTesting public String getInterpreterSettingName() { return interpreterSettingName; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 88cc4894bed..08653ae390f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -51,12 +51,6 @@ public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPo this.remoteInterpreterEventPoller = eventPoller; } - public abstract String getHost(); - public abstract int getPort(); - public abstract void start(String userName, Boolean isUserImpersonate); - public abstract void stop(); - public abstract boolean isRunning(); - public int getConnectTimeout() { return connectTimeout; } diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index d8715a0d499..0e87e4f7d4f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,13 +28,16 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); private final String host; private final int port; + private final String interpreterSettingName; public RemoteInterpreterRunningProcess( + String interpreterSettingName, int connectTimeout, String host, int port ) { super(connectTimeout); + this.interpreterSettingName = interpreterSettingName; this.host = host; this.port = port; } @@ -48,6 +52,11 @@ public int getPort() { return port; } + @Override + public String getInterpreterSettingName() { + return interpreterSettingName; + } + @Override public void start(String userName, Boolean isUserImpersonate) { // assume process is externally managed. nothing to do @@ -55,7 +64,24 @@ public void start(String userName, Boolean isUserImpersonate) { @Override public void stop() { - // assume process is externally managed. nothing to do + // assume process is externally managed. nothing to do. But will kill it + // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. 
+ if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { + if (isRunning()) { + logger.info("Kill interpreter process"); + try { + callRemoteFunction(new RemoteFunction() { + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.shutdown(); + return null; + } + }); + } catch (Exception e) { + logger.warn("ignore the exception when shutting down interpreter process.", e); + } + } + } } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java new file mode 100644 index 00000000000..6f3d3f97f5c --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java @@ -0,0 +1,168 @@ +package org.apache.zeppelin.notebook; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + + +/** + * Hadoop FileSystem wrapper. 
Support both secure and no-secure mode + */ +public class FileSystemStorage { + + private static Logger LOGGER = LoggerFactory.getLogger(FileSystemStorage.class); + + private static FileSystemStorage instance; + + private ZeppelinConfiguration zConf; + private Configuration hadoopConf; + private boolean isSecurityEnabled = false; + private FileSystem fs; + + private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException { + this.zConf = zConf; + this.hadoopConf = new Configuration(); + this.hadoopConf.set("fs.file.impl", RawLocalFileSystem.class.getName()); + this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + + if (isSecurityEnabled) { + String keytab = zConf.getString( + ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB); + String principal = zConf.getString( + ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL); + if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) { + throw new IOException("keytab and principal can not be empty, keytab: " + keytab + + ", principal: " + principal); + } + UserGroupInformation.loginUserFromKeytab(principal, keytab); + } + + try { + this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), this.hadoopConf); + LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + public static synchronized FileSystemStorage get(ZeppelinConfiguration zConf) throws IOException { + if (instance == null) { + instance = new FileSystemStorage(zConf); + } + return instance; + } + + public Path makeQualified(Path path) { + return fs.makeQualified(path); + } + + public void tryMkDir(final Path dir) throws IOException { + callHdfsOperation(new HdfsOperation() { + @Override + public Void call() throws IOException { + if (!fs.exists(dir)) { + fs.mkdirs(dir); + LOGGER.info("Create dir {} in hdfs", dir.toString()); + } + if (fs.isFile(dir)) { + throw new IOException(dir.toString() + " is file 
instead of directory, please remove " + + "it or specify another directory"); + } + fs.mkdirs(dir); + return null; + } + }); + } + + public List list(final Path path) throws IOException { + return callHdfsOperation(new HdfsOperation>() { + @Override + public List call() throws IOException { + List paths = new ArrayList<>(); + for (FileStatus status : fs.globStatus(path)) { + paths.add(status.getPath()); + } + return paths; + } + }); + } + + public boolean delete(final Path path) throws IOException { + return callHdfsOperation(new HdfsOperation() { + @Override + public Boolean call() throws IOException { + return fs.delete(path, true); + } + }); + } + + public String readFile(final Path file) throws IOException { + return callHdfsOperation(new HdfsOperation() { + @Override + public String call() throws IOException { + LOGGER.debug("Read from file: " + file); + ByteArrayOutputStream noteBytes = new ByteArrayOutputStream(); + IOUtils.copyBytes(fs.open(file), noteBytes, hadoopConf); + return new String(noteBytes.toString( + zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))); + } + }); + } + + public void writeFile(final String content, final Path file, boolean writeTempFileFirst) + throws IOException { + callHdfsOperation(new HdfsOperation() { + @Override + public Void call() throws IOException { + InputStream in = new ByteArrayInputStream(content.getBytes( + zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))); + Path tmpFile = new Path(file.toString() + ".tmp"); + IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf); + fs.delete(file, true); + fs.rename(tmpFile, file); + return null; + } + }); + } + + private interface HdfsOperation { + T call() throws IOException; + } + + public synchronized T callHdfsOperation(final HdfsOperation func) throws IOException { + if (isSecurityEnabled) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + try { + return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction() { 
+ @Override + public T run() throws Exception { + return func.call(); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } else { + return func.call(); + } + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java index ba858e69255..d8ec0e5400b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java @@ -8,6 +8,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.FileSystemStorage; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.user.AuthenticationInfo; @@ -37,108 +38,45 @@ public class FileSystemNotebookRepo implements NotebookRepo { private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class); - private Configuration hadoopConf; - private ZeppelinConfiguration zConf; - private boolean isSecurityEnabled = false; - private FileSystem fs; + private FileSystemStorage fs; private Path notebookDir; public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException { - this.zConf = zConf; - this.hadoopConf = new Configuration(); - - this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); - if (isSecurityEnabled) { - String keytab = zConf.getString( - ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB); - String principal = zConf.getString( - ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL); - if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) { - throw new IOException("keytab and principal can not be empty, keytab: " + keytab - + ", principal: " + 
principal); - } - UserGroupInformation.loginUserFromKeytab(principal, keytab); - } + this.fs = FileSystemStorage.get(zConf); + this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir())); + LOGGER.info("Using folder {} to store notebook", notebookDir); + this.fs.tryMkDir(notebookDir); - try { - this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration()); - LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName()); - this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir())); - LOGGER.info("Using folder {} to store notebook", notebookDir); - } catch (URISyntaxException e) { - throw new IOException(e); - } - if (!fs.exists(notebookDir)) { - fs.mkdirs(notebookDir); - LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString()); - } - if (fs.isFile(notebookDir)) { - throw new IOException("notebookDir {} is file instead of directory, please remove it or " + - "specify another directory"); - } } @Override public List list(AuthenticationInfo subject) throws IOException { - return callHdfsOperation(new HdfsOperation>() { - @Override - public List call() throws IOException { - List noteInfos = new ArrayList<>(); - for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) { - NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null); - noteInfos.add(noteInfo); - } - return noteInfos; - } - }); + List notePaths = fs.list(new Path(notebookDir, "*/note.json")); + List noteInfos = new ArrayList<>(); + for (Path path : notePaths) { + NoteInfo noteInfo = new NoteInfo(path.getParent().getName(), "", null); + noteInfos.add(noteInfo); + } + return noteInfos; } @Override public Note get(final String noteId, AuthenticationInfo subject) throws IOException { - return callHdfsOperation(new HdfsOperation() { - @Override - public Note call() throws IOException { - Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json"); - LOGGER.debug("Read note from 
file: " + notePath); - ByteArrayOutputStream noteBytes = new ByteArrayOutputStream(); - IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf); - return Note.fromJson(new String(noteBytes.toString( - zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)))); - } - }); + String content = this.fs.readFile( + new Path(notebookDir.toString() + "/" + noteId + "/note.json")); + return Note.fromJson(content); } @Override public void save(final Note note, AuthenticationInfo subject) throws IOException { - callHdfsOperation(new HdfsOperation() { - @Override - public Void call() throws IOException { - Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"); - Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json"); - LOGGER.debug("Saving note to file: " + notePath); - if (fs.exists(tmpNotePath)) { - fs.delete(tmpNotePath, true); - } - InputStream in = new ByteArrayInputStream(note.toJson().getBytes( - zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))); - IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf); - fs.delete(notePath, true); - fs.rename(tmpNotePath, notePath); - return null; - } - }); + this.fs.writeFile(note.toJson(), + new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"), + true); } @Override public void remove(final String noteId, AuthenticationInfo subject) throws IOException { - callHdfsOperation(new HdfsOperation() { - @Override - public Void call() throws IOException { - Path noteFolder = new Path(notebookDir.toString() + "/" + noteId); - fs.delete(noteFolder, true); - return null; - } - }); + this.fs.delete(new Path(notebookDir.toString() + "/" + noteId)); } @Override @@ -182,26 +120,4 @@ public List getSettings(AuthenticationInfo subject) { public void updateSettings(Map settings, AuthenticationInfo subject) { LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo"); } - - private interface HdfsOperation { - T call() throws 
IOException; - } - - public synchronized T callHdfsOperation(final HdfsOperation func) throws IOException { - if (isSecurityEnabled) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - try { - return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction() { - @Override - public T run() throws Exception { - return func.call(); - } - }); - } catch (InterruptedException e) { - throw new IOException(e); - } - } else { - return func.call(); - } - } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java new file mode 100644 index 00000000000..ca09992a7d8 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.util; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + + +/** + * Utility class for creating instances via java reflection. 
+ * + */ +public class ReflectionUtils { + + public static Class getClazz(String className) throws IOException { + Class clazz = null; + try { + clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Unable to load class: " + className, e); + } + + return clazz; + } + + private static T getNewInstance(Class clazz) throws IOException { + T instance; + try { + instance = clazz.newInstance(); + } catch (InstantiationException e) { + throw new IOException( + "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); + } catch (IllegalAccessException e) { + throw new IOException( + "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); + } + return instance; + } + + private static T getNewInstance(Class clazz, + Class[] parameterTypes, + Object[] parameters) + throws IOException { + T instance; + try { + Constructor constructor = clazz.getConstructor(parameterTypes); + instance = constructor.newInstance(parameters); + } catch (InstantiationException e) { + throw new IOException( + "Unable to instantiate class with " + parameters.length + " arguments: " + + clazz.getName(), e); + } catch (IllegalAccessException e) { + throw new IOException( + "Unable to instantiate class with " + parameters.length + " arguments: " + + clazz.getName(), e); + } catch (NoSuchMethodException e) { + throw new IOException( + "Unable to instantiate class with " + parameters.length + " arguments: " + + clazz.getName(), e); + } catch (InvocationTargetException e) { + throw new IOException( + "Unable to instantiate class with " + parameters.length + " arguments: " + + clazz.getName(), e); + } + return instance; + } + + public static T createClazzInstance(String className) throws IOException { + Class clazz = getClazz(className); + @SuppressWarnings("unchecked") + T instance = (T) getNewInstance(clazz); + return instance; + } + + public static T createClazzInstance(String 
className, + Class[] parameterTypes, + Object[] parameters) throws IOException { + Class clazz = getClazz(className); + T instance = (T) getNewInstance(clazz, parameterTypes, parameters); + return instance; + } + + +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java index 9df402d35e8..16c8c1d8cec 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -33,7 +33,7 @@ public abstract class AbstractInterpreterTest { protected File interpreterDir; protected File confDir; protected File notebookDir; - protected ZeppelinConfiguration conf = new ZeppelinConfiguration(); + protected ZeppelinConfiguration conf; @Before public void setUp() throws Exception { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java index 0c7f4baacfb..f7988e35701 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.junit.Test; +import java.io.IOException; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -30,14 +31,14 @@ public class ShellScriptLauncherTest { @Test - public void testLauncher() { + public void testLauncher() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - ShellScriptLauncher launcher = new ShellScriptLauncher(zConf); + ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null); 
Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); InterpreterOption option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "groupName", "name"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "groupName", "name"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index b788ebdeeea..3d7e251b079 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.junit.Test; +import java.io.IOException; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -30,9 +31,9 @@ public class SparkInterpreterLauncherTest { @Test - public void testLocalMode() { + public void testLocalMode() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", "/user/spark"); properties.setProperty("property_1", "value_1"); @@ -41,7 +42,7 @@ public void testLocalMode() { properties.setProperty("spark.jars", "jar_1"); InterpreterOption 
option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -55,9 +56,9 @@ public void testLocalMode() { } @Test - public void testYarnClientMode_1() { + public void testYarnClientMode_1() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", "/user/spark"); properties.setProperty("property_1", "value_1"); @@ -66,7 +67,7 @@ public void testYarnClientMode_1() { properties.setProperty("spark.jars", "jar_1"); InterpreterOption option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -80,9 +81,9 @@ public void testYarnClientMode_1() { } @Test - public void testYarnClientMode_2() { + public void testYarnClientMode_2() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf); + SparkInterpreterLauncher launcher = new 
SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", "/user/spark"); properties.setProperty("property_1", "value_1"); @@ -92,7 +93,7 @@ public void testYarnClientMode_2() { properties.setProperty("spark.jars", "jar_1"); InterpreterOption option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -106,9 +107,9 @@ public void testYarnClientMode_2() { } @Test - public void testYarnClusterMode_1() { + public void testYarnClusterMode_1() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", "/user/spark"); properties.setProperty("property_1", "value_1"); @@ -117,7 +118,7 @@ public void testYarnClusterMode_1() { properties.setProperty("spark.jars", "jar_1"); InterpreterOption option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -132,9 +133,9 @@ public void 
testYarnClusterMode_1() { } @Test - public void testYarnClusterMode_2() { + public void testYarnClusterMode_2() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", "/user/spark"); properties.setProperty("property_1", "value_1"); @@ -144,7 +145,7 @@ public void testYarnClusterMode_2() { properties.setProperty("spark.jars", "jar_1"); InterpreterOption option = new InterpreterOption(); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark"); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark"); InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java new file mode 100644 index 00000000000..cf1899c13e5 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java @@ -0,0 +1,92 @@ +package org.apache.zeppelin.interpreter.recovery; + +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.AbstractInterpreterTest; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import 
org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest { + + private File recoveryDir = null; + + @Before + public void setUp() throws Exception { + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(), + FileSystemRecoveryStorage.class.getName()); + recoveryDir = Files.createTempDir(); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath()); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + FileUtils.deleteDirectory(recoveryDir); + } + + @Test + public void testSingleInterpreterProcess() throws InterpreterException, IOException { + InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test"); + interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), + new GUI(), null, null, new ArrayList(), null); + remoteInterpreter1.interpret("hello", context1); + + assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size()); + + interpreterSetting.close(); + 
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size()); + } + + @Test + public void testMultipleInterpreterProcess() throws InterpreterException, IOException { + InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test"); + interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED); + + Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1"); + RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1; + InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), + new GUI(), null, null, new ArrayList(), null); + remoteInterpreter1.interpret("hello", context1); + assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size()); + + Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2"); + RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2; + InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), + new GUI(), null, null, new ArrayList(), null); + remoteInterpreter2.interpret("hello", context2); + + assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size()); + + interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size()); + + interpreterSetting.close(); + assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size()); + } + +}