Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/interpreter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
fi

eval $INTERPRETER_RUN_COMMAND &

pid=$!

if [[ -z "${pid}" ]]; then
exit 1;
else
Expand Down
47 changes: 47 additions & 0 deletions bin/stop-interpreter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Stop Zeppelin Interpreter Processes
#

bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)

. "${bin}/common.sh"

export ZEPPELIN_FORCE_STOP=1

ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log"
JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"

if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes"
fi

if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
fi

addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"

CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_STOP_INTERPRETER_MAIN ${@}
12 changes: 0 additions & 12 deletions bin/zeppelin-daemon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,6 @@ function stop() {
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
fi
fi

# list all pid that used in remote interpreter and kill them
for f in ${ZEPPELIN_PID_DIR}/*.pid; do
if [[ ! -f ${f} ]]; then
continue;
fi

pid=$(cat ${f})
wait_for_zeppelin_to_die $pid 20
$(rm -f ${f})
done

}

function find_zeppelin_process() {
Expand Down
41 changes: 41 additions & 0 deletions conf/zeppelin-site.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -480,4 +480,45 @@
<value>10000:10010</value>
</property>
-->

<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.class</name>
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
</property>
-->

<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
<value>6000</value>
<description>Check interval of interpreter expiration in seconds</description>
</property>
-->

<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
<value>3600000</value>
<description>Threshold of interpreter idle time in seconds, interpeter exceed this threshold will be killed</description>
</property>
-->

<!--
<property>
<name>zeppelin.recovery.storage.class</name>
<value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
<description>ReoveryStorage implementation</description>
</property>
-->

<!--
<property>
<name>zeppelin.recovery.dir</name>
<value>recovery</value>
<description>Location where recovery metadata is stored</description>
</property>
-->


</configuration>
8 changes: 8 additions & 0 deletions docs/usage/interpreter/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,11 @@ So users needs to understand the ([interpreter mode setting ](../usage/interpret
In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR)
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">


## Interpreter Process Recovery

Before 0.8.0, shutting down Zeppelin also mean to shutdown all the running interpreter processes. Usually admin will shutdown Zeppelin server for maintenance or upgrade, but don't want to shut down the running interpreter processes.
In such cases, interpreter process recovery is necessary. Starting from 0.8.0, user can enable interpreter process recovering via setting `zeppelin.recovery.storage.class` as
`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or other implementations if available in future, by default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`
which means recovery is not enabled. Enable recover means shutting down Zeppelin would not terminating interpreter process,
and when Zeppelin is restarted, it would try to reconnect to the existing running interpreter processes. If you want to kill all the interpreter processes after terminating Zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh`
2 changes: 1 addition & 1 deletion spark/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"description": "Spark master uri. ex) spark://masterhost:7077",
"type": "string"
},
"zeppelin.spark.unSupportedVersionCheck": {
"zeppelin.spark.enableSupportedVersionCheck": {
"envName": null,
"propertyName": "zeppelin.spark.enableSupportedVersionCheck",
"defaultValue": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ public String getNotebookDir() {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
}

public String getRecoveryDir() {
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
}

public String getRecoveryStorageClass() {
return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS);
}

public boolean isRecoveryEnabled() {
return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals(
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage");
}

public String getUser() {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
}
Expand Down Expand Up @@ -658,6 +671,10 @@ public static enum ConfVars {
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),

// use specified notebook (id) as homescreen
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
// whether homescreen notebook will be hidden from notebook list or not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@

/**
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
* that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
* that is used to for the communication from zeppelin-server process to zeppelin interpreter
* process.
*/
public interface InterpreterClient {

String getInterpreterSettingName();

void start(String userName, Boolean isUserImpersonate);

void stop();

String getHost();

int getPort();

boolean isRunning();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@ public class InterpreterLaunchContext {
private Properties properties;
private InterpreterOption option;
private InterpreterRunner runner;
private String interpreterGroupId;
private String interpreterSettingId;
private String interpreterSettingGroup;
private String interpreterSettingName;

public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
InterpreterRunner runner,
String interpreterGroupId,
String interpreterSettingId,
String interpreterSettingGroup,
String interpreterSettingName) {
this.properties = properties;
this.option = option;
this.runner = runner;
this.interpreterGroupId = interpreterGroupId;
this.interpreterSettingId = interpreterSettingId;
this.interpreterSettingGroup = interpreterSettingGroup;
this.interpreterSettingName = interpreterSettingName;
Expand All @@ -60,6 +63,10 @@ public InterpreterRunner getRunner() {
return runner;
}

public String getInterpreterGroupId() {
return interpreterGroupId;
}

public String getInterpreterSettingId() {
return interpreterSettingId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.launcher;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;

import java.io.IOException;
import java.util.Properties;
Expand All @@ -29,9 +30,11 @@ public abstract class InterpreterLauncher {

protected ZeppelinConfiguration zConf;
protected Properties properties;
protected RecoveryStorage recoveryStorage;

public InterpreterLauncher(ZeppelinConfiguration zConf) {
public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
this.zConf = zConf;
this.recoveryStorage = recoveryStorage;
}

public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter.recovery;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;

import java.io.IOException;
import java.util.Map;


/**
* Interface for storing interpreter process recovery metadata.
*
*/
public abstract class RecoveryStorage {

protected ZeppelinConfiguration zConf;
protected Map<String, InterpreterClient> restoredClients;

public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
}

/**
* Update RecoveryStorage when new InterpreterClient is started
* @param client
* @throws IOException
*/
public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;

/**
* Update RecoveryStorage when InterpreterClient is stopped
* @param client
* @throws IOException
*/
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;

/**
*
* It is only called when Zeppelin Server is started.
*
* @return
* @throws IOException
*/
public abstract Map<String, InterpreterClient> restore() throws IOException;


/**
* It is called after constructor
*
* @throws IOException
*/
public void init() throws IOException {
this.restoredClients = restore();
}

public InterpreterClient getInterpreterClient(String interpreterGroupId) {
if (restoredClients.containsKey(interpreterGroupId)) {
return restoredClients.get(interpreterGroupId);
} else {
return null;
}
}
}
Binary file added zeppelin-server/notebook/.python.recovery.crc
Binary file not shown.
1 change: 1 addition & 0 deletions zeppelin-server/notebook/python.recovery
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2CZA1DVUG:shared_process 192.168.3.2:55410
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be in the commit? and recovery.crc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will delete it

15 changes: 15 additions & 0 deletions zeppelin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,21 @@
</configuration>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
<configuration combine.children="append">
<argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
<excludes>
<exclude>${tests.to.exclude}</exclude>
</excludes>
<environmentVariables>
<ZEPPELIN_FORCE_STOP>1</ZEPPELIN_FORCE_STOP>
</environmentVariables>
</configuration>
</plugin>


<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public ZeppelinServer() throws Exception {

public static void main(String[] args) throws InterruptedException {

ZeppelinConfiguration conf = ZeppelinConfiguration.create();
final ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);

jettyWebServer = setupJettyServer(conf);
Expand Down Expand Up @@ -199,7 +199,9 @@ public static void main(String[] args) throws InterruptedException {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
notebook.getInterpreterSettingManager().close();
if (!conf.isRecoveryEnabled()) {
ZeppelinServer.notebook.getInterpreterSettingManager().close();
}
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {
Expand All @@ -222,7 +224,9 @@ public static void main(String[] args) throws InterruptedException {
}

jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterSettingManager().close();
if (!conf.isRecoveryEnabled()) {
ZeppelinServer.notebook.getInterpreterSettingManager().close();
}
}

private static Server setupJettyServer(ZeppelinConfiguration conf) {
Expand Down
Loading