Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ protected void refresh(InterpreterContext context) throws Exception {
String output = buildResult();
context.out.write(output);
context.out.flush();
// should checkpoint the html output, otherwise frontend won't display the output
// after recovering.
context.getIntpEventClient().checkpointOutput(context.getNoteId(), context.getParagraphId());
isFirstRefresh = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
}

def close(): Unit = {
LOGGER.info("Closing FlinkScalaInterpreter")
if (properties.getProperty("flink.interpreter.close.shutdown_cluster", "true").toBoolean) {
if (cluster != null) {
cluster match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Interpreter context
*/
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
private static final ConcurrentHashMap<Thread, InterpreterContext> allContexts = new ConcurrentHashMap();

public InterpreterOutput out;

Expand All @@ -40,10 +42,16 @@ public static InterpreterContext get() {

public static void set(InterpreterContext ic) {
threadIC.set(ic);
allContexts.put(Thread.currentThread(), ic);
}

public static void remove() {
threadIC.remove();
allContexts.remove(Thread.currentThread());
}

public static ConcurrentHashMap<Thread, InterpreterContext> getAllContexts() {
return allContexts;
}

private String noteId;
Expand Down Expand Up @@ -241,10 +249,18 @@ public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}

public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
this.angularObjectRegistry = angularObjectRegistry;
}

public ResourcePool getResourcePool() {
return resourcePool;
}

public void setResourcePool(ResourcePool resourcePool) {
this.resourcePool = resourcePool;
}

public String getInterpreterClassName() {
return interpreterClassName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,13 @@ public class InterpreterOutput extends OutputStream {
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
changeListener = null;
clear();
}

public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener)
throws IOException {
this.flushListener = flushListener;
this.changeListener = listener;
clear();
}

public void setType(InterpreterResult.Type type) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,59 @@

/**
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
* that is used to for the communication from zeppelin-server process to zeppelin interpreter
* process.
* that is used for the communication from zeppelin-server process to zeppelin interpreter
* process and also manage the lifecycle of interpreter process.
*/
public interface InterpreterClient {

/**
* InterpreterGroupId that is associated with this interpreter process.
*
* @return
*/
String getInterpreterGroupId();

/**
* InterpreterSetting name of this interpreter process.
*
* @return
*/
String getInterpreterSettingName();

/**
* Start interpreter process.
*
* @param userName
* @throws IOException
*/
void start(String userName) throws IOException;

/**
* Stop interpreter process.
*
*/
void stop();

/**
* Host name of interpreter process thrift server
*
* @return
*/
String getHost();

/**
* Port of interpreter process thrift server
*
* @return
*/
int getPort();

boolean isRunning();

/**
* Return true if recovering successfully, otherwise return false.
*
* @return
*/
boolean recover();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class InterpreterLaunchContext {
private String interpreterSettingId;
private String interpreterSettingGroup;
private String interpreterSettingName;
private int zeppelinServerRPCPort;
private String zeppelinServerHost;
private int intpEventServerPort;
private String intpEventServerHost;

public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
Expand All @@ -46,8 +46,8 @@ public InterpreterLaunchContext(Properties properties,
String interpreterSettingId,
String interpreterSettingGroup,
String interpreterSettingName,
int zeppelinServerRPCPort,
String zeppelinServerHost) {
int intpEventServerPort,
String intpEventServerHost) {
this.properties = properties;
this.option = option;
this.runner = runner;
Expand All @@ -56,8 +56,8 @@ public InterpreterLaunchContext(Properties properties,
this.interpreterSettingId = interpreterSettingId;
this.interpreterSettingGroup = interpreterSettingGroup;
this.interpreterSettingName = interpreterSettingName;
this.zeppelinServerRPCPort = zeppelinServerRPCPort;
this.zeppelinServerHost = zeppelinServerHost;
this.intpEventServerPort = intpEventServerPort;
this.intpEventServerHost = intpEventServerHost;
}

public Properties getProperties() {
Expand Down Expand Up @@ -92,11 +92,11 @@ public String getUserName() {
return userName;
}

public int getZeppelinServerRPCPort() {
return zeppelinServerRPCPort;
public int getIntpEventServerPort() {
return intpEventServerPort;
}

public String getZeppelinServerHost() {
return zeppelinServerHost;
public String getIntpEventServerHost() {
return intpEventServerHost;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.zeppelin.interpreter.launcher;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;
Expand All @@ -28,6 +32,7 @@
*/
public abstract class InterpreterLauncher {

private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";

protected ZeppelinConfiguration zConf;
Expand All @@ -43,6 +48,11 @@ public void setProperties(Properties props) {
this.properties = props;
}

/**
* The timeout setting in interpreter setting take precedence over
* that in zeppelin-site.xml
* @return
*/
protected int getConnectTimeout() {
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
Expand All @@ -65,5 +75,43 @@ public static String escapeSpecialCharacter(String command) {
return builder.toString();
}

public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
/**
* Try to recover interpreter process first, then call launchDirectly via sub class implementation.
*
* @param context
* @return
* @throws IOException
*/
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
// try to recover it first
if (zConf.isRecoveryEnabled()) {
InterpreterClient recoveredClient =
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
if (recoveredClient != null) {
if (recoveredClient.isRunning()) {
LOGGER.info("Recover interpreter process running at {} of interpreter group: {}",
recoveredClient.getHost() + ":" + recoveredClient.getPort(),
recoveredClient.getInterpreterGroupId());
return recoveredClient;
} else {
recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId());
LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":"
+ recoveredClient.getPort() + ", as it is already terminated.");
}
}
}

// launch it via sub class implementation without recovering.
return launchDirectly(context);
}

/**
* launch interpreter process directly without recovering.
*
* @param context
* @return
* @throws IOException
*/
public abstract InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,37 @@
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


/**
* Interface for storing interpreter process recovery metadata.
* Just store mapping between interpreterGroupId to interpreter process host:ip
*
*/
public abstract class RecoveryStorage {

protected ZeppelinConfiguration zConf;
protected Map<String, InterpreterClient> restoredClients;

public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
// TODO(zjffdu) The constructor is inconsistent between base class and its implementation.
// The implementation actually use InterpreterSettingManager, the interface should also use it.
public RecoveryStorage(ZeppelinConfiguration zConf) {
this.zConf = zConf;
}

/**
* Update RecoveryStorage when new InterpreterClient is started
*
* @param client
* @throws IOException
*/
public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;

/**
* Update RecoveryStorage when InterpreterClient is stopped
*
* @param client
* @throws IOException
*/
Expand All @@ -55,7 +61,7 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
*
* It is only called when Zeppelin Server is started.
*
* @return
* @return Map between interpreterGroupId to InterpreterClient
* @throws IOException
*/
public abstract Map<String, InterpreterClient> restore() throws IOException;
Expand All @@ -67,14 +73,33 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
* @throws IOException
*/
public void init() throws IOException {
this.restoredClients = restore();
Map<String, InterpreterClient> restoredClientsInStorage= restore();
this.restoredClients = new HashMap<String, InterpreterClient>();
for (Map.Entry<String, InterpreterClient> entry : restoredClientsInStorage.entrySet()) {
if (entry.getValue().recover()) {
this.restoredClients.put(entry.getKey(), entry.getValue());
} else {
onInterpreterClientStop(entry.getValue());
}
}
}

/**
* Get InterpreterClient that is associated with this interpreterGroupId, return null when there's
* no such InterpreterClient.
*
* @param interpreterGroupId
* @return InterpreterClient
*/
public InterpreterClient getInterpreterClient(String interpreterGroupId) {
if (restoredClients.containsKey(interpreterGroupId)) {
return restoredClients.get(interpreterGroupId);
} else {
return null;
}
}

public void removeInterpreterClient(String interpreterGroupId) {
this.restoredClients.remove(interpreterGroupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
import org.apache.zeppelin.interpreter.thrift.RegisterInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
import org.apache.zeppelin.interpreter.thrift.ServiceException;
Expand Down Expand Up @@ -62,9 +63,9 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient;
private String intpGroupId;

public RemoteInterpreterEventClient(String host, int port) {
public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) {
this.remoteClient = new PooledRemoteClient<>(() -> {
TSocket transport = new TSocket(host, port);
TSocket transport = new TSocket(intpEventHost, intpEventPort);
try {
transport.open();
} catch (TTransportException e) {
Expand All @@ -83,6 +84,13 @@ public void setIntpGroupId(String intpGroupId) {
this.intpGroupId = intpGroupId;
}

public void registerInterpreterProcess(RegisterInfo registerInfo) {
callRemoteFunction(client -> {
client.registerInterpreterProcess(registerInfo);
return null;
});
}

/**
* Get all resources except for specific resourcePool
*
Expand Down
Loading