diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index bfd3e446a2f..73c8ef0d242 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -26,6 +26,8 @@ */ public interface InterpreterClient { + String getInterpreterGroupId(); + String getInterpreterSettingName(); void start(String userName) throws IOException; diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index b406ec3afd9..ff6d69a8fa8 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -78,6 +78,7 @@ public InterpreterClient online(HashMap result) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, intpTserverPort); @@ -149,6 +150,7 @@ public InterpreterClient online(HashMap result) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, intpTserverPort); diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index 2d648984ff9..23e426249ea 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -154,6 +154,11 @@ public DockerInterpreterProcess( DOCKER_HOST = (dockerHost == null) ? defDockerHost : dockerHost; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public String getInterpreterSettingName() { return interpreterSettingName; diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 864f660bb32..120500b4213 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -94,6 +94,11 @@ String getPodName() { return podName; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public String getInterpreterSettingName() { return interpreterSettingName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index d78cb2cb6db..ff60b39a0dc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -58,6 +58,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, option.getHost(), option.getPort()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index bef2c8fd2a6..1b660ac24aa 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -110,7 +110,7 @@ public Map restore() throws IOException { int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( - interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); + interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh clients.put(groupId, client); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 69d82b6808c..84cab149a2e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -141,7 +141,7 @@ public void start(String userName) throws IOException { public void stop() { if (isRunning()) { - LOGGER.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId()); try { callRemoteFunction(new RemoteFunction() { @Override @@ -157,10 +157,9 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { // Shutdown connection shutdown(); this.interpreterProcessLauncher.stop(); + this.interpreterProcessLauncher = null; + LOGGER.info("Remote process of interpreter group: {} is terminated", getInterpreterGroupId()); } - - interpreterProcessLauncher = null; - LOGGER.info("Remote process terminated"); } @Override @@ -196,6 +195,7 @@ public String getInterpreterSettingName() { return interpreterSettingName; } + @Override public String getInterpreterGroupId() { return interpreterGroupId; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index c2efcf4af95..d78bfcaeff8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.interpreter.remote; -import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,19 +24,23 @@ * This class connects to existing process */ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; private final int port; private final String interpreterSettingName; + private final String interpreterGroupId; public RemoteInterpreterRunningProcess( String interpreterSettingName, + String interpreterGroupId, int connectTimeout, String host, int port ) { super(connectTimeout); this.interpreterSettingName = interpreterSettingName; + this.interpreterGroupId = interpreterGroupId; this.host = host; this.port = port; } @@ -57,6 +60,11 @@ public String getInterpreterSettingName() { return interpreterSettingName; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public void start(String userName) { // assume process is externally managed. nothing to do @@ -68,7 +76,7 @@ public void stop() { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - logger.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId); try { callRemoteFunction(new RemoteFunction() { @Override @@ -78,8 +86,12 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { } }); } catch (Exception e) { - logger.warn("ignore the exception when shutting down interpreter process.", e); + LOGGER.warn("ignore the exception when shutting down interpreter process.", e); } + + // Shutdown connection + shutdown(); + LOGGER.info("Remote process of interpreter group: {} is terminated.", getInterpreterGroupId()); } } }