Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
public interface InterpreterClient {

String getInterpreterGroupId();

String getInterpreterSettingName();

void start(String userName) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public InterpreterClient online(HashMap<String, Object> result) {

return new RemoteInterpreterRunningProcess(
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
intpTserverHost,
intpTserverPort);
Expand Down Expand Up @@ -149,6 +150,7 @@ public InterpreterClient online(HashMap<String, Object> result) {

return new RemoteInterpreterRunningProcess(
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
intpTserverHost,
intpTserverPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public DockerInterpreterProcess(
DOCKER_HOST = (dockerHost == null) ? defDockerHost : dockerHost;
}

@Override
public String getInterpreterGroupId() {
return interpreterGroupId;
}

@Override
public String getInterpreterSettingName() {
return interpreterSettingName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ String getPodName() {
return podName;
}

@Override
public String getInterpreterGroupId() {
return interpreterGroupId;
}

@Override
public String getInterpreterSettingName() {
return interpreterSettingName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
if (option.isExistingProcess()) {
return new RemoteInterpreterRunningProcess(
context.getInterpreterSettingName(),
context.getInterpreterGroupId(),
connectTimeout,
option.getHost(),
option.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Map<String, InterpreterClient> restore() throws IOException {
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
// interpreterSettingManager may be null when this class is used when it is used
// stop-interpreter.sh
clients.put(groupId, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void start(String userName) throws IOException {

public void stop() {
if (isRunning()) {
LOGGER.info("Kill interpreter process");
LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId());
try {
callRemoteFunction(new RemoteFunction<Void>() {
@Override
Expand All @@ -157,10 +157,9 @@ public Void call(RemoteInterpreterService.Client client) throws Exception {
// Shutdown connection
shutdown();
this.interpreterProcessLauncher.stop();
this.interpreterProcessLauncher = null;
LOGGER.info("Remote process of interpreter group: {} is terminated", getInterpreterGroupId());
}

interpreterProcessLauncher = null;
LOGGER.info("Remote process terminated");
}

@Override
Expand Down Expand Up @@ -196,6 +195,7 @@ public String getInterpreterSettingName() {
return interpreterSettingName;
}

@Override
public String getInterpreterGroupId() {
return interpreterGroupId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.zeppelin.interpreter.remote;

import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,19 +24,23 @@
* This class connects to existing process
*/
public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);

private final String host;
private final int port;
private final String interpreterSettingName;
private final String interpreterGroupId;

public RemoteInterpreterRunningProcess(
String interpreterSettingName,
String interpreterGroupId,
int connectTimeout,
String host,
int port
) {
super(connectTimeout);
this.interpreterSettingName = interpreterSettingName;
this.interpreterGroupId = interpreterGroupId;
this.host = host;
this.port = port;
}
Expand All @@ -57,6 +60,11 @@ public String getInterpreterSettingName() {
return interpreterSettingName;
}

@Override
public String getInterpreterGroupId() {
return interpreterGroupId;
}

@Override
public void start(String userName) {
// assume process is externally managed. nothing to do
Expand All @@ -68,7 +76,7 @@ public void stop() {
// when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
if (isRunning()) {
logger.info("Kill interpreter process");
LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId);
try {
callRemoteFunction(new RemoteFunction<Void>() {
@Override
Expand All @@ -78,8 +86,12 @@ public Void call(RemoteInterpreterService.Client client) throws Exception {
}
});
} catch (Exception e) {
logger.warn("ignore the exception when shutting down interpreter process.", e);
LOGGER.warn("ignore the exception when shutting down interpreter process.", e);
}

// Shutdown connection
shutdown();
LOGGER.info("Remote process of interpreter group: {} is terminated.", getInterpreterGroupId());
}
}
}
Expand Down