From a482faa3931c361661834a498baadcf87dd6784d Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 4 Jun 2020 15:36:01 +0800 Subject: [PATCH 1/3] [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess --- .../interpreter/launcher/InterpreterClient.java | 2 ++ .../launcher/StandardInterpreterLauncher.java | 1 + .../recovery/FileSystemRecoveryStorage.java | 2 +- .../remote/RemoteInterpreterManagedProcess.java | 8 ++++---- .../remote/RemoteInterpreterRunningProcess.java | 14 +++++++++++++- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index bfd3e446a2f..73c8ef0d242 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -26,6 +26,8 @@ */ public interface InterpreterClient { + String getInterpreterGroupId(); + String getInterpreterSettingName(); void start(String userName) throws IOException; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index d78cb2cb6db..ff60b39a0dc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -58,6 +58,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, option.getHost(), option.getPort()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index bef2c8fd2a6..1b660ac24aa 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -110,7 +110,7 @@ public Map restore() throws IOException { int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( - interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); + interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh clients.put(groupId, client); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 69d82b6808c..b7a0b2ef638 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -141,7 +141,7 @@ public void start(String userName) throws IOException { public void stop() { if (isRunning()) { - LOGGER.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process for interpreter group: " + getInterpreterGroupId()); try { callRemoteFunction(new RemoteFunction() { @Override @@ -157,10 +157,9 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { // Shutdown connection shutdown(); this.interpreterProcessLauncher.stop(); + this.interpreterProcessLauncher = null; + LOGGER.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); } - - interpreterProcessLauncher = null; - LOGGER.info("Remote process terminated"); } @Override @@ -196,6 +195,7 @@ public String getInterpreterSettingName() { return interpreterSettingName; } + @Override public String getInterpreterGroupId() { return interpreterGroupId; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index c2efcf4af95..7366a9bf72a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -29,15 +29,18 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { private final String host; private final int port; private final String interpreterSettingName; + private final String interpreterGroupId; public RemoteInterpreterRunningProcess( String interpreterSettingName, + String interpreterGroupId, int connectTimeout, String host, int port ) { super(connectTimeout); this.interpreterSettingName = interpreterSettingName; + this.interpreterGroupId = interpreterGroupId; this.host = host; this.port = port; } @@ -57,6 +60,11 @@ public String getInterpreterSettingName() { return interpreterSettingName; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public void start(String userName) { // assume process is externally managed. nothing to do @@ -68,7 +76,7 @@ public void stop() { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - logger.info("Kill interpreter process"); + logger.info("Kill interpreter process of interpreter group: " + interpreterGroupId); try { callRemoteFunction(new RemoteFunction() { @Override @@ -80,6 +88,10 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { } catch (Exception e) { logger.warn("ignore the exception when shutting down interpreter process.", e); } + + // Shutdown connection + shutdown(); + logger.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); } } } From 4fc92cc60e05c488c4daa41427d2bd4532a1b345 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 4 Jun 2020 16:06:22 +0800 Subject: [PATCH 2/3] address comment --- .../launcher/ClusterInterpreterLauncher.java | 2 ++ .../interpreter/launcher/DockerInterpreterProcess.java | 5 +++++ .../launcher/K8sRemoteInterpreterProcess.java | 5 +++++ .../remote/RemoteInterpreterRunningProcess.java | 10 +++++----- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index b406ec3afd9..ff6d69a8fa8 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -78,6 +78,7 @@ public InterpreterClient online(HashMap result) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, intpTserverPort); @@ -149,6 +150,7 @@ public InterpreterClient online(HashMap result) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, intpTserverPort); diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index 2d648984ff9..23e426249ea 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -154,6 +154,11 @@ public DockerInterpreterProcess( DOCKER_HOST = (dockerHost == null) ? defDockerHost : dockerHost; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public String getInterpreterSettingName() { return interpreterSettingName; diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 864f660bb32..120500b4213 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -94,6 +94,11 @@ String getPodName() { return podName; } + @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + @Override public String getInterpreterSettingName() { return interpreterSettingName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index 7366a9bf72a..e5f2a04bf32 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.interpreter.remote; -import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +24,8 @@ * This class connects to existing process */ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; private final int port; private final String interpreterSettingName; @@ -76,7 +76,7 @@ public void stop() { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - logger.info("Kill interpreter process of interpreter group: " + interpreterGroupId); + LOGGER.info("Kill interpreter process of interpreter group: " + interpreterGroupId); try { callRemoteFunction(new RemoteFunction() { @Override @@ -86,12 +86,12 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { } }); } catch (Exception e) { - logger.warn("ignore the exception when shutting down interpreter process.", e); + LOGGER.warn("ignore the exception when shutting down interpreter process.", e); } // Shutdown connection shutdown(); - logger.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); + LOGGER.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); } } } From 01ec89373747748d2a4c5217bd20217c2e010642 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 4 Jun 2020 23:26:46 +0800 Subject: [PATCH 3/3] address comment --- .../interpreter/remote/RemoteInterpreterManagedProcess.java | 4 ++-- .../interpreter/remote/RemoteInterpreterRunningProcess.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index b7a0b2ef638..84cab149a2e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -141,7 +141,7 @@ public void start(String userName) throws IOException { public void stop() { if (isRunning()) { - LOGGER.info("Kill interpreter process for interpreter group: " + getInterpreterGroupId()); + LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId()); try { callRemoteFunction(new RemoteFunction() { @Override @@ -158,7 +158,7 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { shutdown(); this.interpreterProcessLauncher.stop(); this.interpreterProcessLauncher = null; - LOGGER.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); + LOGGER.info("Remote process of interpreter group: {} is terminated", getInterpreterGroupId()); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index e5f2a04bf32..d78bfcaeff8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -76,7 +76,7 @@ public void stop() { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - LOGGER.info("Kill interpreter process of interpreter group: " + interpreterGroupId); + LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId); try { callRemoteFunction(new RemoteFunction() { @Override @@ -91,7 +91,7 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { // Shutdown connection shutdown(); - LOGGER.info("Remote process of interpreter group: " + getInterpreterGroupId() + " is terminated"); + LOGGER.info("Remote process of interpreter group: {} is terminated.", getInterpreterGroupId()); } } }