diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 3c3125f35b3..567b9404ae6 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -83,6 +83,9 @@ protected void refresh(InterpreterContext context) throws Exception { String output = buildResult(); context.out.write(output); context.out.flush(); + // should checkpoint the html output, otherwise frontend won't display the output + // after recovering. + context.getIntpEventClient().checkpointOutput(context.getNoteId(), context.getParagraphId()); isFirstRefresh = false; } diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 625a8edfad9..7922c991448 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -653,6 +653,7 @@ class FlinkScalaInterpreter(val properties: Properties) { } def close(): Unit = { + LOGGER.info("Closing FlinkScalaInterpreter") if (properties.getProperty("flink.interpreter.close.shutdown_cluster", "true").toBoolean) { if (cluster != null) { cluster match { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 2b56971a5f2..7a215efca9d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -25,12 +25,14 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Interpreter context */ public class InterpreterContext { private static final ThreadLocal threadIC = new ThreadLocal<>(); + private static final ConcurrentHashMap allContexts = new ConcurrentHashMap(); public InterpreterOutput out; @@ -40,10 +42,16 @@ public static InterpreterContext get() { public static void set(InterpreterContext ic) { threadIC.set(ic); + allContexts.put(Thread.currentThread(), ic); } public static void remove() { threadIC.remove(); + allContexts.remove(Thread.currentThread()); + } + + public static ConcurrentHashMap getAllContexts() { + return allContexts; } private String noteId; @@ -241,10 +249,18 @@ public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } + public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { + this.angularObjectRegistry = angularObjectRegistry; + } + public ResourcePool getResourcePool() { return resourcePool; } + public void setResourcePool(ResourcePool resourcePool) { + this.resourcePool = resourcePool; + } + public String getInterpreterClassName() { return interpreterClassName; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java index ef1aafb9998..aaf6eda0f92 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -59,7 +59,6 @@ public class InterpreterOutput extends OutputStream { public InterpreterOutput(InterpreterOutputListener flushListener) { this.flushListener = flushListener; changeListener = null; - clear(); } public InterpreterOutput(InterpreterOutputListener flushListener, @@ -67,7 +66,6 @@ public InterpreterOutput(InterpreterOutputListener flushListener, throws IOException { this.flushListener = flushListener; this.changeListener = listener; - clear(); } public void setType(InterpreterResult.Type type) throws IOException { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index 73c8ef0d242..65243c74907 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -21,22 +21,59 @@ /** * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component - * that is used to for the communication from zeppelin-server process to zeppelin interpreter - * process. + * that is used for the communication from zeppelin-server process to zeppelin interpreter + * process and also manage the lifecycle of interpreter process. */ public interface InterpreterClient { + /** + * InterpreterGroupId that is associated with this interpreter process. + * + * @return + */ String getInterpreterGroupId(); + /** + * InterpreterSetting name of this interpreter process. + * + * @return + */ String getInterpreterSettingName(); + /** + * Start interpreter process. + * + * @param userName + * @throws IOException + */ void start(String userName) throws IOException; + /** + * Stop interpreter process. + * + */ void stop(); + /** + * Host name of interpreter process thrift server + * + * @return + */ String getHost(); + /** + * Port of interpreter process thrift server + * + * @return + */ int getPort(); boolean isRunning(); + + /** + * Return true if recovering successfully, otherwise return false. + * + * @return + */ + boolean recover(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java index 136d866fa3b..cc90a63ee0c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java @@ -35,8 +35,8 @@ public class InterpreterLaunchContext { private String interpreterSettingId; private String interpreterSettingGroup; private String interpreterSettingName; - private int zeppelinServerRPCPort; - private String zeppelinServerHost; + private int intpEventServerPort; + private String intpEventServerHost; public InterpreterLaunchContext(Properties properties, InterpreterOption option, @@ -46,8 +46,8 @@ public InterpreterLaunchContext(Properties properties, String interpreterSettingId, String interpreterSettingGroup, String interpreterSettingName, - int zeppelinServerRPCPort, - String zeppelinServerHost) { + int intpEventServerPort, + String intpEventServerHost) { this.properties = properties; this.option = option; this.runner = runner; @@ -56,8 +56,8 @@ public InterpreterLaunchContext(Properties properties, this.interpreterSettingId = interpreterSettingId; this.interpreterSettingGroup = interpreterSettingGroup; this.interpreterSettingName = interpreterSettingName; - this.zeppelinServerRPCPort = zeppelinServerRPCPort; - this.zeppelinServerHost = zeppelinServerHost; + this.intpEventServerPort = intpEventServerPort; + this.intpEventServerHost = intpEventServerHost; } public Properties getProperties() { @@ -92,11 +92,11 @@ public String getUserName() { return userName; } - public int getZeppelinServerRPCPort() { - return zeppelinServerRPCPort; + public int getIntpEventServerPort() { + return intpEventServerPort; } - public String getZeppelinServerHost() { - return zeppelinServerHost; + public String getIntpEventServerHost() { + return intpEventServerHost; } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 8e8bf534d3e..1fb2ea92085 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -18,7 +18,11 @@ package org.apache.zeppelin.interpreter.launcher; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Properties; @@ -28,6 +32,7 @@ */ public abstract class InterpreterLauncher { + private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class); private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`"; protected ZeppelinConfiguration zConf; @@ -43,6 +48,11 @@ public void setProperties(Properties props) { this.properties = props; } + /** + * The timeout setting in interpreter setting take precedence over + * that in zeppelin-site.xml + * @return + */ protected int getConnectTimeout() { int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); @@ -65,5 +75,43 @@ public static String escapeSpecialCharacter(String command) { return builder.toString(); } - public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; + /** + * Try to recover interpreter process first, then call launchDirectly via sub class implementation. + * + * @param context + * @return + * @throws IOException + */ + public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + // try to recover it first + if (zConf.isRecoveryEnabled()) { + InterpreterClient recoveredClient = + recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); + if (recoveredClient != null) { + if (recoveredClient.isRunning()) { + LOGGER.info("Recover interpreter process running at {} of interpreter group: {}", + recoveredClient.getHost() + ":" + recoveredClient.getPort(), + recoveredClient.getInterpreterGroupId()); + return recoveredClient; + } else { + recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId()); + LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":" + + recoveredClient.getPort() + ", as it is already terminated."); + } + } + } + + // launch it via sub class implementation without recovering. + return launchDirectly(context); + } + + /** + * launch interpreter process directly without recovering. + * + * @param context + * @return + * @throws IOException + */ + public abstract InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException; + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java index 8bbe8302fcf..b7c87c07533 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java @@ -21,11 +21,13 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import java.io.IOException; +import java.util.HashMap; import java.util.Map; /** * Interface for storing interpreter process recovery metadata. + * Just store mapping between interpreterGroupId to interpreter process host:ip * */ public abstract class RecoveryStorage { @@ -33,12 +35,15 @@ public abstract class RecoveryStorage { protected ZeppelinConfiguration zConf; protected Map restoredClients; - public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { + // TODO(zjffdu) The constructor is inconsistent between base class and its implementation. + // The implementation actually use InterpreterSettingManager, the interface should also use it. + public RecoveryStorage(ZeppelinConfiguration zConf) { this.zConf = zConf; } /** * Update RecoveryStorage when new InterpreterClient is started + * * @param client * @throws IOException */ @@ -46,6 +51,7 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { /** * Update RecoveryStorage when InterpreterClient is stopped + * * @param client * @throws IOException */ @@ -55,7 +61,7 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { * * It is only called when Zeppelin Server is started. * - * @return + * @return Map between interpreterGroupId to InterpreterClient * @throws IOException */ public abstract Map restore() throws IOException; @@ -67,9 +73,24 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { * @throws IOException */ public void init() throws IOException { - this.restoredClients = restore(); + Map restoredClientsInStorage= restore(); + this.restoredClients = new HashMap(); + for (Map.Entry entry : restoredClientsInStorage.entrySet()) { + if (entry.getValue().recover()) { + this.restoredClients.put(entry.getKey(), entry.getValue()); + } else { + onInterpreterClientStop(entry.getValue()); + } + } } + /** + * Get InterpreterClient that is associated with this interpreterGroupId, return null when there's + * no such InterpreterClient. + * + * @param interpreterGroupId + * @return InterpreterClient + */ public InterpreterClient getInterpreterClient(String interpreterGroupId) { if (restoredClients.containsKey(interpreterGroupId)) { return restoredClients.get(interpreterGroupId); @@ -77,4 +98,8 @@ public InterpreterClient getInterpreterClient(String interpreterGroupId) { return null; } } + + public void removeInterpreterClient(String interpreterGroupId) { + this.restoredClients.remove(interpreterGroupId); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 3de4e10233f..eadbf242c18 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -33,6 +33,7 @@ import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent; import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent; import org.apache.zeppelin.interpreter.thrift.ParagraphInfo; +import org.apache.zeppelin.interpreter.thrift.RegisterInfo; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService; import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent; import org.apache.zeppelin.interpreter.thrift.ServiceException; @@ -62,9 +63,9 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, private PooledRemoteClient remoteClient; private String intpGroupId; - public RemoteInterpreterEventClient(String host, int port) { + public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) { this.remoteClient = new PooledRemoteClient<>(() -> { - TSocket transport = new TSocket(host, port); + TSocket transport = new TSocket(intpEventHost, intpEventPort); try { transport.open(); } catch (TTransportException e) { @@ -83,6 +84,13 @@ public void setIntpGroupId(String intpGroupId) { this.intpGroupId = intpGroupId; } + public void registerInterpreterProcess(RegisterInfo registerInfo) { + callRemoteFunction(client -> { + client.registerInterpreterProcess(registerInfo); + return null; + }); + } + /** * Get all resources except for specific resourcePool * diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 9c48f010d1e..d711d3a9bac 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -90,7 +90,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -99,6 +98,9 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; @@ -109,7 +111,7 @@ public class RemoteInterpreterServer extends Thread implements RemoteInterpreterService.Iface { - private static Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); + private static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterServer.class); private String interpreterGroupId; private InterpreterGroup interpreterGroup; @@ -120,11 +122,10 @@ public class RemoteInterpreterServer extends Thread private Gson gson = new Gson(); private String intpEventServerHost; + private int intpEventServerPort; private String host; private int port; private TThreadPoolServer server; - RemoteInterpreterEventService.Client intpEventServiceClient; - RemoteInterpreterEventClient intpEventClient; private DependencyResolver depLoader; @@ -138,11 +139,18 @@ public class RemoteInterpreterServer extends Thread // Hold information for manual progress update private ConcurrentMap progressMap = new ConcurrentHashMap<>(); + // keep track of the running jobs for job recovery. + private ConcurrentMap runningJobs = new ConcurrentHashMap<>(); + // cache result threshold, result cache is for purpose of recover paragraph even after + // paragraph is finished + private int resultCacheInSeconds; + private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor(); + private boolean isTest; // cluster manager client - ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); - ClusterManagerClient clusterManagerClient; + private ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + private ClusterManagerClient clusterManagerClient; public RemoteInterpreterServer(String intpEventServerHost, int intpEventServerPort, @@ -158,15 +166,12 @@ public RemoteInterpreterServer(String intpEventServerHost, String interpreterGroupId, boolean isTest) throws TTransportException, IOException { - logger.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, + LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, intpEventServerHost, intpEventServerPort); if (null != intpEventServerHost) { this.intpEventServerHost = intpEventServerHost; + this.intpEventServerPort = intpEventServerPort; if (!isTest) { - TTransport transport = new TSocket(intpEventServerHost, intpEventServerPort); - transport.open(); - TProtocol protocol = new TBinaryProtocol(transport); - intpEventServiceClient = new RemoteInterpreterEventService.Client(protocol); intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort); } } else { @@ -185,7 +190,7 @@ public RemoteInterpreterServer(String intpEventServerHost, serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange); this.port = serverTransport.getServerSocket().getLocalPort(); this.host = RemoteInterpreterUtils.findAvailableHostAddress(); - logger.info("Launching ThriftServer at " + this.host + ":" + this.port); + LOGGER.info("Launching ThriftServer at " + this.host + ":" + this.port); } server = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); @@ -223,13 +228,15 @@ public void run() { if (!interrupted) { RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); try { - intpEventServiceClient.registerInterpreterProcess(registerInfo); - } catch (TException e) { - logger.error("Error while registering interpreter: {}", registerInfo, e); + LOGGER.info("Registering interpreter process"); + intpEventClient.registerInterpreterProcess(registerInfo); + LOGGER.info("Registered interpreter process"); + } catch (Exception e) { + LOGGER.error("Error while registering interpreter: {}", registerInfo, e); try { shutdown(); } catch (TException e1) { - logger.warn("Exception occurs while shutting down", e1); + LOGGER.warn("Exception occurs while shutting down", e1); } } } @@ -243,7 +250,7 @@ public void run() { @Override public void shutdown() throws TException { Thread shutDownThread = new Thread(() -> { - logger.info("Shutting down..."); + LOGGER.info("Shutting down..."); // delete interpreter cluster meta deleteClusterMeta(); @@ -254,7 +261,7 @@ public void shutdown() throws TException { try { interpreter.close(); } catch (InterpreterException e) { - logger.warn("Fail to close interpreter", e); + LOGGER.warn("Fail to close interpreter", e); } } } @@ -276,16 +283,16 @@ public void shutdown() throws TException { try { Thread.sleep(300); } catch (InterruptedException e) { - logger.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e); + LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e); } } if (server.isServing()) { - logger.info("Force shutting down"); + LOGGER.info("Force shutting down"); System.exit(0); } - logger.info("Shutting down"); + LOGGER.info("Shutting down"); }, "Shutdown-Thread"); shutDownThread.start(); @@ -329,7 +336,7 @@ public void handle(Signal signal) { try { remoteInterpreterServer.shutdown(); } catch (TException e) { - logger.error("Error on shutdown RemoteInterpreterServer", e); + LOGGER.error("Error on shutdown RemoteInterpreterServer", e); } } }); @@ -368,53 +375,60 @@ private void deleteClusterMeta() { clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId); Thread.sleep(300); } catch (InterruptedException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } @Override public void createInterpreter(String interpreterGroupId, String sessionId, String className, Map properties, String userName) throws TException { - if (interpreterGroup == null) { - interpreterGroup = new InterpreterGroup(interpreterGroupId); - angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); - hookRegistry = new InterpreterHookRegistry(); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient); - interpreterGroup.setInterpreterHookRegistry(hookRegistry); - interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); - intpEventClient.setIntpGroupId(interpreterGroupId); + try { + if (interpreterGroup == null) { + interpreterGroup = new InterpreterGroup(interpreterGroupId); + angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); + hookRegistry = new InterpreterHookRegistry(); + resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient); + interpreterGroup.setInterpreterHookRegistry(hookRegistry); + interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); + interpreterGroup.setResourcePool(resourcePool); + intpEventClient.setIntpGroupId(interpreterGroupId); + + String localRepoPath = properties.get("zeppelin.interpreter.localRepo"); + if (properties.containsKey("zeppelin.interpreter.output.limit")) { + InterpreterOutput.limit = Integer.parseInt( + properties.get("zeppelin.interpreter.output.limit")); + } - String localRepoPath = properties.get("zeppelin.interpreter.localRepo"); - if (properties.containsKey("zeppelin.interpreter.output.limit")) { - InterpreterOutput.limit = Integer.parseInt( - properties.get("zeppelin.interpreter.output.limit")); - } + depLoader = new DependencyResolver(localRepoPath); + appLoader = new ApplicationLoader(resourcePool, depLoader); - depLoader = new DependencyResolver(localRepoPath); - appLoader = new ApplicationLoader(resourcePool, depLoader); - } + resultCacheInSeconds = + Integer.parseInt(properties.getOrDefault("zeppelin.interpreter.result.cache", "0")); + } - try { - Class replClass = (Class) Object.class.forName(className); - Properties p = new Properties(); - p.putAll(properties); - setSystemProperty(p); - - Constructor constructor = - replClass.getConstructor(new Class[]{Properties.class}); - Interpreter repl = constructor.newInstance(p); - repl.setClassloaderUrls(new URL[]{}); - logger.info("Instantiate interpreter {}", className); - repl.setInterpreterGroup(interpreterGroup); - repl.setUserName(userName); - - interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId); - } catch (ClassNotFoundException | NoSuchMethodException | SecurityException - | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - logger.error(e.toString(), e); - throw new TException(e); + try { + Class replClass = (Class) Object.class.forName(className); + Properties p = new Properties(); + p.putAll(properties); + setSystemProperty(p); + + Constructor constructor = + replClass.getConstructor(new Class[]{Properties.class}); + Interpreter repl = constructor.newInstance(p); + repl.setClassloaderUrls(new URL[]{}); + LOGGER.info("Instantiate interpreter {}", className); + repl.setInterpreterGroup(interpreterGroup); + repl.setUserName(userName); + + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId); + } catch (ClassNotFoundException | NoSuchMethodException | SecurityException + | InstantiationException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + LOGGER.error(e.toString(), e); + throw new TException(e); + } + } catch (Exception e) { + throw new TException(e.getMessage(), e); } } @@ -464,7 +478,7 @@ protected Interpreter getInterpreter(String sessionId, String className) throws @Override public void open(String sessionId, String className) throws TException { - logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId)); + LOGGER.info(String.format("Open Interpreter %s for session %s ", className, sessionId)); Interpreter intp = getInterpreter(sessionId, className); try { intp.open(); @@ -482,12 +496,12 @@ public void close(String sessionId, String className) throws TException { // see NoteInterpreterLoader.SHARED_SESSION if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) { try { - logger.info("Unload App {} ", appInfo.pkg.getName()); + LOGGER.info("Unload App {} ", appInfo.pkg.getName()); appInfo.app.unload(); // see ApplicationState.Status.UNLOADED intpEventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED"); } catch (ApplicationException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } } @@ -504,7 +518,7 @@ public void close(String sessionId, String className) throws TException { try { inp.close(); } catch (InterpreterException e) { - logger.warn("Fail to close interpreter", e); + LOGGER.warn("Fail to close interpreter", e); } it.remove(); break; @@ -516,48 +530,94 @@ public void close(String sessionId, String className) throws TException { } @Override - public RemoteInterpreterResult interpret(String sessionId, String className, String st, + public void reconnect(String host, int port) throws TException { + try { + LOGGER.info("Reconnect to this interpreter process from {}:{}", host, port); + this.intpEventServerHost = host; + this.intpEventServerPort = port; + intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort); + intpEventClient.setIntpGroupId(interpreterGroupId); + + this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); + this.resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient); + + // reset all the available InterpreterContext's components that use intpEventClient. + for (InterpreterContext context : InterpreterContext.getAllContexts().values()) { + context.setIntpEventClient(intpEventClient); + context.setAngularObjectRegistry(angularObjectRegistry); + context.setResourcePool(resourcePool); + } + } catch (Exception e) { + throw new TException("Fail to reconnect", e); + } + } + + @Override + public RemoteInterpreterResult interpret(String sessionId, + String className, + String st, RemoteInterpreterContext interpreterContext) throws TException { - if (logger.isDebugEnabled()) { - logger.debug("st:\n{}", st); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("st:\n{}", st); } Interpreter intp = getInterpreter(sessionId, className); InterpreterContext context = convert(interpreterContext); context.setInterpreterClassName(intp.getClassName()); - Scheduler scheduler = intp.getScheduler(); - InterpretJobListener jobListener = new InterpretJobListener(); - InterpretJob job = new InterpretJob( - interpreterContext.getParagraphId(), - "RemoteInterpretJob_" + System.currentTimeMillis(), - jobListener, - intp, - st, - context); - scheduler.submit(job); - - while (!job.isTerminated()) { + InterpretJob interpretJob = null; + boolean isRecover = Boolean.parseBoolean( + context.getLocalProperties().getOrDefault("isRecover", "false")); + if (isRecover) { + LOGGER.info("Recovering paragraph: " + context.getParagraphId() + " of note: " + + context.getNoteId()); + interpretJob = runningJobs.get(context.getParagraphId()); + if (interpretJob == null) { + InterpreterResult result = new InterpreterResult(Code.ERROR, "Job is finished, unable to recover it"); + return convert(result, + context.getConfig(), + context.getGui(), + context.getNoteGui()); + } + } else { + Scheduler scheduler = intp.getScheduler(); + InterpretJobListener jobListener = new InterpretJobListener(); + interpretJob = new InterpretJob( + context.getParagraphId(), + "RemoteInterpretJob_" + System.currentTimeMillis(), + jobListener, + intp, + st, + context); + runningJobs.put(context.getParagraphId(), interpretJob); + scheduler.submit(interpretJob); + } + + while (!interpretJob.isTerminated()) { + JobListener jobListener = interpretJob.getListener(); synchronized (jobListener) { try { jobListener.wait(1000); } catch (InterruptedException e) { - logger.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e); + LOGGER.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e); } } } - progressMap.remove(interpreterContext.getParagraphId()); + progressMap.remove(context.getParagraphId()); + resultCleanService.schedule(()-> { + runningJobs.remove(context.getParagraphId()); + }, resultCacheInSeconds, TimeUnit.SECONDS); - InterpreterResult result = (InterpreterResult) job.getReturn(); + InterpreterResult result = interpretJob.getReturn(); // in case of job abort in PENDING status, result can be null if (result == null) { result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT); } return convert(result, - context.getConfig(), - context.getGui(), - context.getNoteGui()); + context.getConfig(), + context.getGui(), + context.getNoteGui()); } class InterpretJobListener implements JobListener { @@ -576,7 +636,6 @@ public void onStatusChange(Job job, Status before, Status after) { public static class InterpretJob extends Job { - private Interpreter interpreter; private String script; private InterpreterContext context; @@ -657,6 +716,8 @@ public InterpreterResult jobRun() throws Throwable { ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader(); try { InterpreterContext.set(context); + // clear the result of last run in frontend before running this paragraph. + context.out.clear(); InterpreterResult result = null; @@ -680,7 +741,7 @@ public InterpreterResult jobRun() throws Throwable { // global_post_hook processInterpreterHooks(context.getNoteId()); processInterpreterHooks(null); - logger.debug("Script after hooks: " + script); + LOGGER.debug("Script after hooks: " + script); result = interpreter.interpret(script, context); } @@ -698,21 +759,21 @@ public InterpreterResult jobRun() throws Throwable { List stringResult = new ArrayList<>(); for (InterpreterResultMessage msg : resultMessages) { if (msg.getType() == InterpreterResult.Type.IMG) { - logger.debug("InterpreterResultMessage: IMAGE_DATA"); + LOGGER.debug("InterpreterResultMessage: IMAGE_DATA"); } else { - logger.debug("InterpreterResultMessage: " + msg.toString()); + LOGGER.debug("InterpreterResultMessage: " + msg.toString()); } stringResult.add(msg.getData()); } // put result into resource pool if (context.getLocalProperties().containsKey("saveAs")) { if (stringResult.size() == 1) { - logger.info("Saving result into ResourcePool as single string: " + + LOGGER.info("Saving result into ResourcePool as single string: " + context.getLocalProperties().get("saveAs")); context.getResourcePool().put( context.getLocalProperties().get("saveAs"), stringResult.get(0)); } else { - logger.info("Saving result into ResourcePool as string list: " + + LOGGER.info("Saving result into ResourcePool as string list: " + context.getLocalProperties().get("saveAs")); context.getResourcePool().put( context.getLocalProperties().get("saveAs"), stringResult); @@ -743,7 +804,7 @@ public void setResult(InterpreterResult result) { public void cancel(String sessionId, String className, RemoteInterpreterContext interpreterContext) throws TException { - logger.info("cancel {} {}", className, interpreterContext.getParagraphId()); + LOGGER.info("cancel {} {}", className, interpreterContext.getParagraphId()); Interpreter intp = getInterpreter(sessionId, className); String jobId = interpreterContext.getParagraphId(); Job job = intp.getScheduler().getJob(jobId); @@ -755,7 +816,7 @@ public void cancel(String sessionId, try { intp.cancel(convert(interpreterContext, null)); } catch (InterpreterException e) { - logger.error("Fail to cancel paragraph: " + interpreterContext.getParagraphId()); + LOGGER.error("Fail to cancel paragraph: " + interpreterContext.getParagraphId()); } }); thread.start(); @@ -845,14 +906,14 @@ public void onUpdateAll(InterpreterOutput out) { intpEventClient.onInterpreterOutputUpdateAll( noteId, paragraphId, out.toInterpreterResultMessage()); } catch (IOException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } @Override public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) { String output = new String(line); - logger.debug("Output Append: {}", output); + LOGGER.debug("Output Append: {}", output); intpEventClient.onInterpreterOutputAppend( noteId, paragraphId, index, output); } @@ -862,11 +923,11 @@ public void onUpdate(int index, InterpreterResultMessageOutput out) { String output; try { output = new String(out.toByteArray()); - logger.debug("Output Update for index {}: {}", index, output); + LOGGER.debug("Output Update for index {}: {}", index, output); intpEventClient.onInterpreterOutputUpdate( noteId, paragraphId, index, out.getType(), output); } catch (IOException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } }); @@ -932,7 +993,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, // first try local objects AngularObject ao = registry.get(name, noteId, paragraphId); if (ao == null) { - logger.debug("Angular object {} not exists", name); + LOGGER.debug("Angular object {} not exists", name); return; } @@ -950,7 +1011,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, return; } catch (Exception e) { // it's not a previous object's type. proceed to treat as a generic type - logger.debug(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); } } @@ -962,7 +1023,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type - logger.debug(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); } } @@ -997,7 +1058,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string - logger.debug(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); } // try string object type at last @@ -1017,7 +1078,7 @@ public void angularObjectRemove(String name, String noteId, String paragraphId) @Override public List resourcePoolGetAll() throws TException { - logger.debug("Request resourcePoolGetAll from ZeppelinServer"); + LOGGER.debug("Request resourcePoolGetAll from ZeppelinServer"); List result = new LinkedList<>(); if (resourcePool == null) { @@ -1041,7 +1102,7 @@ public boolean resourceRemove(String noteId, String paragraphId, String resource @Override public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName) throws TException { - logger.debug("Request resourceGet {} from ZeppelinServer", resourceName); + LOGGER.debug("Request resourceGet {} from ZeppelinServer", resourceName); Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false); if (resource == null || resource.get() == null || !resource.isSerializable()) { @@ -1050,7 +1111,7 @@ public ByteBuffer resourceGet(String noteId, String paragraphId, String resource try { return Resource.serializeObject(resource.get()); } catch (IOException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); return ByteBuffer.allocate(0); } } @@ -1099,7 +1160,7 @@ public ByteBuffer resourceInvokeMethod( } } } catch (Exception e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); return ByteBuffer.allocate(0); } } @@ -1114,7 +1175,7 @@ public void angularRegistryPush(String registryAsString) throws TException { }.getType()); interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry); } catch (Exception e) { - logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e); + LOGGER.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e); } } @@ -1138,7 +1199,7 @@ public void onUpdate(int index, InterpreterResultMessageOutput out) { intpEventClient.onAppOutputUpdate(noteId, paragraphId, index, appId, out.getType(), new String(out.toByteArray())); } catch (IOException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } }); @@ -1161,7 +1222,7 @@ public RemoteApplicationResult loadApplication( String applicationInstanceId, String packageInfo, String noteId, String paragraphId) throws TException { if (runningApplications.containsKey(applicationInstanceId)) { - logger.warn("Application instance {} is already running"); + LOGGER.warn("Application instance {} is already running"); return new RemoteApplicationResult(true, ""); } HeliumPackage pkgInfo = HeliumPackage.fromJson(packageInfo); @@ -1169,7 +1230,7 @@ public RemoteApplicationResult loadApplication( pkgInfo, noteId, paragraphId, applicationInstanceId); try { Application app = null; - logger.info( + LOGGER.info( "Loading application {}({}), artifact={}, className={} into note={}, paragraph={}", pkgInfo.getName(), applicationInstanceId, @@ -1183,7 +1244,7 @@ public RemoteApplicationResult loadApplication( new RunningApplication(pkgInfo, app, noteId, paragraphId)); return new RemoteApplicationResult(true, ""); } catch (Exception e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); return new RemoteApplicationResult(false, e.getMessage()); } } @@ -1194,10 +1255,10 @@ public RemoteApplicationResult unloadApplication(String applicationInstanceId) RunningApplication runningApplication = runningApplications.remove(applicationInstanceId); if (runningApplication != null) { try { - logger.info("Unloading application {}", applicationInstanceId); + LOGGER.info("Unloading application {}", applicationInstanceId); runningApplication.app.unload(); } catch (ApplicationException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); return new RemoteApplicationResult(false, e.getMessage()); } } @@ -1207,11 +1268,11 @@ public RemoteApplicationResult unloadApplication(String applicationInstanceId) @Override public RemoteApplicationResult runApplication(String applicationInstanceId) throws TException { - logger.info("run application {}", applicationInstanceId); + LOGGER.info("run application {}", applicationInstanceId); RunningApplication runningApp = runningApplications.get(applicationInstanceId); if (runningApp == null) { - logger.error("Application instance {} not exists", applicationInstanceId); + LOGGER.error("Application instance {} not exists", applicationInstanceId); return new RemoteApplicationResult(false, "Application instance does not exists"); } else { ApplicationContext context = runningApp.app.context(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java index 4b053d626b6..7ae3e8ca55c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class AngularObjectId implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java index 2511ab94b26..42e062e787c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class AppOutputAppendEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java index 8f4c9ca70d4..87ae2dd729a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class AppOutputUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java index 550efebd399..1ffb9b8c8e9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class AppStatusUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index ddb5512ab39..e138d8ce568 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java index c0757fe1d33..7e03eb4977b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class OutputAppendEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java index 944241e2c61..0de49fd80da 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class OutputUpdateAllEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java index cea1774df53..3b46850963a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class OutputUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java index 465b8bf31f1..98f3653cf08 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class ParagraphInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ParagraphInfo"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java index 8744e2b7718..804c859fde9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RegisterInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index d869dffa324..974bce8b8e1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index ce9dff3eba8..31aa514f003 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 2a992086f6f..7d3031c300b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java index e1eb9e8eab3..8acc1136d58 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterEventService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 06be47a1ed1..7761c24bc2a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { NO_OP(1), ANGULAR_OBJECT_ADD(2), diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 2817903c9a8..5eb6ee3ab19 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java index e0ec93678d3..fb2642a6eb6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index 68dc727703d..a472004fe35 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RemoteInterpreterService { public interface Iface { @@ -35,6 +35,8 @@ public interface Iface { public void close(java.lang.String sessionId, java.lang.String className) throws org.apache.thrift.TException; + public void reconnect(java.lang.String host, int port) throws org.apache.thrift.TException; + public RemoteInterpreterResult interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException; public void cancel(java.lang.String sessionId, java.lang.String className, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException; @@ -81,6 +83,8 @@ public interface AsyncIface { public void close(java.lang.String sessionId, java.lang.String className, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void reconnect(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; public void cancel(java.lang.String sessionId, java.lang.String className, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; @@ -205,6 +209,27 @@ public void recv_close() throws org.apache.thrift.TException return; } + public void reconnect(java.lang.String host, int port) throws org.apache.thrift.TException + { + send_reconnect(host, port); + recv_reconnect(); + } + + public void send_reconnect(java.lang.String host, int port) throws org.apache.thrift.TException + { + reconnect_args args = new reconnect_args(); + args.setHost(host); + args.setPort(port); + sendBase("reconnect", args); + } + + public void recv_reconnect() throws org.apache.thrift.TException + { + reconnect_result result = new reconnect_result(); + receiveBase(result, "reconnect"); + return; + } + public RemoteInterpreterResult interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext) throws org.apache.thrift.TException { send_interpret(sessionId, className, st, interpreterContext); @@ -762,6 +787,41 @@ public Void getResult() throws org.apache.thrift.TException { } } + public void reconnect(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + reconnect_call method_call = new reconnect_call(host, port, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class reconnect_call extends org.apache.thrift.async.TAsyncMethodCall { + private java.lang.String host; + private int port; + public reconnect_call(java.lang.String host, int port, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.host = host; + this.port = port; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("reconnect", org.apache.thrift.protocol.TMessageType.CALL, 0)); + reconnect_args args = new reconnect_args(); + args.setHost(host); + args.setPort(port); + args.write(prot); + prot.writeMessageEnd(); + } + + public Void getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return null; + } + } + public void interpret(java.lang.String sessionId, java.lang.String className, java.lang.String st, RemoteInterpreterContext interpreterContext, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); interpret_call method_call = new interpret_call(sessionId, className, st, interpreterContext, resultHandler, this, ___protocolFactory, ___transport); @@ -1441,6 +1501,7 @@ protected Processor(I iface, java.util.Map extends org.apache.thrift.ProcessFunction { + public reconnect() { + super("reconnect"); + } + + public reconnect_args getEmptyArgsInstance() { + return new reconnect_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean rethrowUnhandledExceptions() { + return false; + } + + public reconnect_result getResult(I iface, reconnect_args args) throws org.apache.thrift.TException { + reconnect_result result = new reconnect_result(); + iface.reconnect(args.host, args.port); + return result; + } + } + public static class interpret extends org.apache.thrift.ProcessFunction { public interpret() { super("interpret"); @@ -2005,6 +2091,7 @@ protected AsyncProcessor(I iface, java.util.Map extends org.apache.thrift.AsyncProcessFunction { + public reconnect() { + super("reconnect"); + } + + public reconnect_args getEmptyArgsInstance() { + return new reconnect_args(); + } + + public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new org.apache.thrift.async.AsyncMethodCallback() { + public void onComplete(Void o) { + reconnect_result result = new reconnect_result(); + try { + fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + } catch (org.apache.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TSerializable msg; + reconnect_result result = new reconnect_result(); + if (e instanceof org.apache.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof org.apache.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, reconnect_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + iface.reconnect(args.host, args.port,resultHandler); + } + } + public static class interpret extends org.apache.thrift.AsyncProcessFunction { public interpret() { super("interpret"); @@ -5847,6 +5994,727 @@ private static S scheme(org.apache. } } + public static class reconnect_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("reconnect_args"); + + private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new reconnect_argsStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new reconnect_argsTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable java.lang.String host; // required + public int port; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + HOST((short)1, "host"), + PORT((short)2, "port"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // HOST + return HOST; + case 2: // PORT + return PORT; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __PORT_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reconnect_args.class, metaDataMap); + } + + public reconnect_args() { + } + + public reconnect_args( + java.lang.String host, + int port) + { + this(); + this.host = host; + this.port = port; + setPortIsSet(true); + } + + /** + * Performs a deep copy on other. + */ + public reconnect_args(reconnect_args other) { + __isset_bitfield = other.__isset_bitfield; + if (other.isSetHost()) { + this.host = other.host; + } + this.port = other.port; + } + + public reconnect_args deepCopy() { + return new reconnect_args(this); + } + + @Override + public void clear() { + this.host = null; + setPortIsSet(false); + this.port = 0; + } + + @org.apache.thrift.annotation.Nullable + public java.lang.String getHost() { + return this.host; + } + + public reconnect_args setHost(@org.apache.thrift.annotation.Nullable java.lang.String host) { + this.host = host; + return this; + } + + public void unsetHost() { + this.host = null; + } + + /** Returns true if field host is set (has been assigned a value) and false otherwise */ + public boolean isSetHost() { + return this.host != null; + } + + public void setHostIsSet(boolean value) { + if (!value) { + this.host = null; + } + } + + public int getPort() { + return this.port; + } + + public reconnect_args setPort(int port) { + this.port = port; + setPortIsSet(true); + return this; + } + + public void unsetPort() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID); + } + + /** Returns true if field port is set (has been assigned a value) and false otherwise */ + public boolean isSetPort() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID); + } + + public void setPortIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + case HOST: + if (value == null) { + unsetHost(); + } else { + setHost((java.lang.String)value); + } + break; + + case PORT: + if (value == null) { + unsetPort(); + } else { + setPort((java.lang.Integer)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case HOST: + return getHost(); + + case PORT: + return getPort(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case HOST: + return isSetHost(); + case PORT: + return isSetPort(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof reconnect_args) + return this.equals((reconnect_args)that); + return false; + } + + public boolean equals(reconnect_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_host = true && this.isSetHost(); + boolean that_present_host = true && that.isSetHost(); + if (this_present_host || that_present_host) { + if (!(this_present_host && that_present_host)) + return false; + if (!this.host.equals(that.host)) + return false; + } + + boolean this_present_port = true; + boolean that_present_port = true; + if (this_present_port || that_present_port) { + if (!(this_present_port && that_present_port)) + return false; + if (this.port != that.port) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetHost()) ? 131071 : 524287); + if (isSetHost()) + hashCode = hashCode * 8191 + host.hashCode(); + + hashCode = hashCode * 8191 + port; + + return hashCode; + } + + @Override + public int compareTo(reconnect_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetHost()).compareTo(other.isSetHost()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetHost()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetPort()).compareTo(other.isSetPort()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetPort()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("reconnect_args("); + boolean first = true; + + sb.append("host:"); + if (this.host == null) { + sb.append("null"); + } else { + sb.append(this.host); + } + first = false; + if (!first) sb.append(", "); + sb.append("port:"); + sb.append(this.port); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class reconnect_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public reconnect_argsStandardScheme getScheme() { + return new reconnect_argsStandardScheme(); + } + } + + private static class reconnect_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, reconnect_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // HOST + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.host = iprot.readString(); + struct.setHostIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // PORT + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.port = iprot.readI32(); + struct.setPortIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, reconnect_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.host != null) { + oprot.writeFieldBegin(HOST_FIELD_DESC); + oprot.writeString(struct.host); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(PORT_FIELD_DESC); + oprot.writeI32(struct.port); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class reconnect_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public reconnect_argsTupleScheme getScheme() { + return new reconnect_argsTupleScheme(); + } + } + + private static class reconnect_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, reconnect_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetHost()) { + optionals.set(0); + } + if (struct.isSetPort()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetHost()) { + oprot.writeString(struct.host); + } + if (struct.isSetPort()) { + oprot.writeI32(struct.port); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, reconnect_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.host = iprot.readString(); + struct.setHostIsSet(true); + } + if (incoming.get(1)) { + struct.port = iprot.readI32(); + struct.setPortIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class reconnect_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("reconnect_result"); + + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new reconnect_resultStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new reconnect_resultTupleSchemeFactory(); + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reconnect_result.class, metaDataMap); + } + + public reconnect_result() { + } + + /** + * Performs a deep copy on other. + */ + public reconnect_result(reconnect_result other) { + } + + public reconnect_result deepCopy() { + return new reconnect_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof reconnect_result) + return this.equals((reconnect_result)that); + return false; + } + + public boolean equals(reconnect_result that) { + if (that == null) + return false; + if (this == that) + return true; + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + return hashCode; + } + + @Override + public int compareTo(reconnect_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("reconnect_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class reconnect_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public reconnect_resultStandardScheme getScheme() { + return new reconnect_resultStandardScheme(); + } + } + + private static class reconnect_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, reconnect_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, reconnect_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class reconnect_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public reconnect_resultTupleScheme getScheme() { + return new reconnect_resultTupleScheme(); + } + } + + private static class reconnect_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, reconnect_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, reconnect_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + public static class interpret_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("interpret_args"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java index 72cae21620b..4a39c826ddf 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class RunParagraphsEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java index 48861827f27..772dd05bc2c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-06-09") public class ServiceException extends org.apache.thrift.TException implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ServiceException"); diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 2dd9b017504..baaac22a0e5 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -93,6 +93,7 @@ service RemoteInterpreterService { void createInterpreter(1: string intpGroupId, 2: string sessionId, 3: string className, 4: map properties, 5: string userName); void open(1: string sessionId, 2: string className); void close(1: string sessionId, 2: string className); + void reconnect(1: string host, 2: i32 port); RemoteInterpreterResult interpret(1: string sessionId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext); void cancel(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext); i32 getProgress(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext); diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index ff6d69a8fa8..f392b3d1a93 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -60,7 +60,7 @@ public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage r } @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); this.context = context; @@ -80,8 +80,11 @@ public InterpreterClient online(HashMap result) { context.getInterpreterSettingName(), context.getInterpreterGroupId(), connectTimeout, + context.getIntpEventServerHost(), + context.getIntpEventServerPort(), intpTserverHost, - intpTserverPort); + intpTserverPort, + false); } @Override @@ -152,8 +155,11 @@ public InterpreterClient online(HashMap result) { context.getInterpreterSettingName(), context.getInterpreterGroupId(), connectTimeout, + context.getIntpEventServerHost(), + context.getIntpEventServerPort(), intpTserverHost, - intpTserverPort); + intpTserverPort, + false); } @Override @@ -244,8 +250,8 @@ private RemoteInterpreterProcess createClusterIntpProcess() { clusterIntpProcess = new ClusterInterpreterProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - context.getZeppelinServerRPCPort(), - context.getZeppelinServerHost(), + context.getIntpEventServerPort(), + context.getIntpEventServerHost(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + intpSetGroupName, localRepoPath, diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java index 744e880e450..e7960adb10a 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java @@ -10,8 +10,8 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess { public ClusterInterpreterProcess( String intpRunner, - int zeppelinServerRPCPort, - String zeppelinServerRPCHost, + int intpEventServerPort, + String intpEventServerHost, String interpreterPortRange, String intpDir, String localRepoDir, @@ -22,8 +22,8 @@ public ClusterInterpreterProcess( boolean isUserImpersonated) { super(intpRunner, - zeppelinServerRPCPort, - zeppelinServerRPCHost, + intpEventServerPort, + intpEventServerHost, interpreterPortRange, intpDir, localRepoDir, diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java index e6c9ae1ecf4..ded68855e43 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java @@ -38,7 +38,7 @@ public DockerInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage re } @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); this.context = context; this.properties = context.getProperties(); @@ -71,8 +71,8 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep context.getInterpreterSettingName(), properties, env, - context.getZeppelinServerHost(), - Integer.toString(context.getZeppelinServerRPCPort()), + context.getIntpEventServerHost(), + context.getIntpEventServerPort(), connectTimeout); } diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index e096f4c031b..7e1bbb54fcc 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -77,8 +77,6 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { private final String containerImage; private final Properties properties; private final Map envs; - private final String zeppelinServiceHost; - private final String zeppelinServiceRpcPort; private AtomicBoolean dockerStarted = new AtomicBoolean(false); @@ -117,11 +115,11 @@ public DockerInterpreterProcess( String interpreterSettingName, Properties properties, Map envs, - String zeppelinServiceHost, - String zeppelinServiceRpcPort, + String intpEventServerHost, + int intpEventServerPort, int connectTimeout ) { - super(connectTimeout); + super(connectTimeout, intpEventServerHost, intpEventServerPort); this.containerImage = containerImage; this.interpreterGroupId = interpreterGroupId; @@ -129,8 +127,6 @@ public DockerInterpreterProcess( this.interpreterSettingName = interpreterSettingName; this.properties = properties; this.envs = new HashMap(envs); - this.zeppelinServiceHost = zeppelinServiceHost; - this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; this.zconf = zconf; this.containerName = interpreterGroupId.toLowerCase(); @@ -212,7 +208,7 @@ public void start(String userName) throws IOException { // Create container with exposed ports final ContainerConfig containerConfig = ContainerConfig.builder() .hostConfig(hostConfig) - .hostname(this.zeppelinServiceHost) + .hostname(this.intpEventServerHost) .image(containerImage) .workingDir("/") .env(listEnv) @@ -305,8 +301,8 @@ Properties getTemplateBindings() throws IOException { dockerProperties.put("zeppelin.interpreter.localRepo", "/tmp/local-repo"); dockerProperties.put("zeppelin.interpreter.rpc.portRange", dockerIntpServicePort + ":" + dockerIntpServicePort); - dockerProperties.put("zeppelin.server.rpc.host", zeppelinServiceHost); - dockerProperties.put("zeppelin.server.rpc.portRange", zeppelinServiceRpcPort); + dockerProperties.put("zeppelin.server.rpc.host", intpEventServerHost); + dockerProperties.put("zeppelin.server.rpc.portRange", intpEventServerPort); // interpreter properties overrides the values dockerProperties.putAll(Maps.fromProperties(properties)); diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java index a7003333b94..a6863c86a9b 100644 --- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java @@ -90,7 +90,7 @@ public void testEnv() throws IOException { properties, envs, "zeppelin.server.hostname", - "12320", + 12320, 5000); assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home"); @@ -116,7 +116,7 @@ public void testTemplateBindings() throws IOException { properties, envs, "zeppelin.server.hostname", - "12320", + 12320, 5000); Properties dockerProperties = intp.getTemplateBindings(); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 120500b4213..f07389ea5f2 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -30,8 +30,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private final String containerImage; private final Properties properties; private final Map envs; - private final String zeppelinService; - private final String zeppelinServiceRpcPort; private final Gson gson = new Gson(); private final String podName; @@ -60,14 +58,14 @@ public K8sRemoteInterpreterProcess( String interpreterSettingName, Properties properties, Map envs, - String zeppelinService, - String zeppelinServiceRpcPort, + String intpEventServerHost, + int intpEventServerPort, boolean portForward, String sparkImage, int connectTimeout, boolean isUserImpersonatedForSpark ) { - super(connectTimeout); + super(connectTimeout, intpEventServerHost, intpEventServerPort); this.kubectl = kubectl; this.specTempaltes = specTemplates; this.containerImage = containerImage; @@ -76,8 +74,6 @@ public K8sRemoteInterpreterProcess( this.interpreterSettingName = interpreterSettingName; this.properties = properties; this.envs = new HashMap<>(envs); - this.zeppelinService = zeppelinService; - this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; this.portForward = portForward; this.sparkImage = sparkImage; this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6); @@ -276,8 +272,8 @@ Properties getTemplateBindings() { k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName); k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo"); k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort())); - k8sProperties.put("zeppelin.k8s.server.rpc.service", zeppelinService); - k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort); + k8sProperties.put("zeppelin.k8s.server.rpc.service", intpEventServerHost); + k8sProperties.put("zeppelin.k8s.server.rpc.portRange", intpEventServerPort); if (ownerUID() != null && ownerName() != null) { k8sProperties.put("zeppelin.k8s.server.uid", ownerUID()); k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName()); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index d4b03da5570..d1df0f3c7cf 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -108,7 +108,7 @@ private String getZeppelinService() throws IOException { zConf.getK8sServiceName(), getNamespace()); } else { - return context.getZeppelinServerHost(); + return context.getIntpEventServerHost(); } } @@ -116,13 +116,13 @@ private String getZeppelinService() throws IOException { * get Zeppelin server rpc port * Read env variable "_SERVICE_PORT_RPC" */ - private String getZeppelinServiceRpcPort() { + private int getZeppelinServiceRpcPort() { String envServicePort = System.getenv( String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase())); if (envServicePort != null) { - return envServicePort; + return Integer.parseInt(envServicePort); } else { - return Integer.toString(context.getZeppelinServerRPCPort()); + return context.getIntpEventServerPort(); } } @@ -139,7 +139,7 @@ private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext co } @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; this.properties = context.getProperties(); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 7e1e6685ff0..3718fa03b4a 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -51,7 +51,7 @@ public void testGetHostPort() { properties, envs, "zeppelin.server.hostname", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -81,7 +81,7 @@ public void testPredefinedPortNumbers() { properties, envs, "zeppelin.server.hostname", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -116,7 +116,7 @@ public void testGetTemplateBindings() throws IOException { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -136,7 +136,7 @@ public void testGetTemplateBindings() throws IOException { assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo")); assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange")); assertEquals("zeppelin.server.service" , p.get("zeppelin.k8s.server.rpc.service")); - assertEquals("12320" , p.get("zeppelin.k8s.server.rpc.portRange")); + assertEquals(12320 , p.get("zeppelin.k8s.server.rpc.portRange")); assertEquals("v1", p.get("my.key1")); assertEquals("V1", envs.get("MY_ENV1")); @@ -169,7 +169,7 @@ public void testGetTemplateBindingsForSpark() throws IOException { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -222,7 +222,7 @@ public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -274,7 +274,7 @@ public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOExc properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -315,7 +315,7 @@ public void testSparkUiWebUrlTemplate() { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -360,7 +360,7 @@ public void testSparkPodResources() { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, @@ -397,7 +397,7 @@ public void testSparkPodResourcesMemoryOverhead() { properties, envs, "zeppelin.server.service", - "12320", + 12320, false, "spark-container:1.0", 10, diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index e04ebf21fc3..2637e97ef1a 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1894,12 +1894,14 @@ public void onStatusChange(Paragraph p, Status before, Status after) { } } - if (p.isTerminated()) { + if (p.isTerminated() || after == Status.RUNNING) { if (p.getStatus() == Status.FINISHED) { LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus()); - } else { + } else if (p.isTerminated()) { LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(), p.getStatus(), p.getException(), p.getReturn()); + } else { + LOG.info("Job {} starts to RUNNING", p.getId()); } try { @@ -1928,6 +1930,7 @@ public void checkpointOutput(String noteId, String paragraphId) { try { Note note = getNotebook().getNote(noteId); note.getParagraph(paragraphId).checkpointOutput(); + getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS); } catch (IOException e) { LOG.warn("Fail to save note: " + noteId , e); } @@ -2076,6 +2079,7 @@ public void onParaInfosReceived(String noteId, String paragraphId, paragraph .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); + getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS); getConnectionManager().broadcast( note.getId(), new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java index e51eac31c06..8d9a686c31a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java @@ -16,17 +16,14 @@ */ package org.apache.zeppelin.recovery; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - import com.google.common.io.Files; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.File; -import java.util.Map; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage; import org.apache.zeppelin.interpreter.recovery.StopInterpreter; @@ -38,12 +35,19 @@ import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.TestUtils; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + public class RecoveryTest extends AbstractTestRestApi { + private Gson gson = new Gson(); private static File recoveryDir = null; @@ -51,31 +55,29 @@ public class RecoveryTest extends AbstractTestRestApi { private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous"); - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(), - FileSystemRecoveryStorage.class.getName()); + FileSystemRecoveryStorage.class.getName()); recoveryDir = Files.createTempDir(); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath()); startUp(RecoveryTest.class.getSimpleName()); + + notebook = ZeppelinServer.sharedServiceLocator.getService(Notebook.class); } - @AfterClass - public static void destroy() throws Exception { - shutDown(); + @After + public void destroy() throws Exception { + shutDown(true, true); FileUtils.deleteDirectory(recoveryDir); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(), ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getStringValue()); } - @Before - public void setUp() { - notebook = ZeppelinServer.sharedServiceLocator.getService(Notebook.class); - } - @Test public void testRecovery() throws Exception { + LOG.info("Test testRecovery"); Note note1 = null; try { note1 = notebook.createNote("note1", anonymous); @@ -105,6 +107,9 @@ public void testRecovery() throws Exception { post.releaseConnection(); assertEquals(Job.Status.FINISHED, p1.getStatus()); assertEquals("abc\n", p1.getReturn().message().get(0).getData()); + } catch (Exception e) { + LOG.error(e.toString(), e); + throw e; } finally { if (null != note1) { TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); @@ -114,6 +119,7 @@ public void testRecovery() throws Exception { @Test public void testRecovery_2() throws Exception { + LOG.info("Test testRecovery_2"); Note note1 = null; try { note1 = notebook.createNote("note2", AuthenticationInfo.ANONYMOUS); @@ -148,6 +154,9 @@ public void testRecovery_2() throws Exception { assertEquals(resp.get("status"), "OK"); post.releaseConnection(); assertEquals(Job.Status.ERROR, p1.getStatus()); + } catch (Exception e) { + LOG.error(e.toString(), e); + throw e; } finally { if (null != note1) { TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); @@ -157,6 +166,7 @@ public void testRecovery_2() throws Exception { @Test public void testRecovery_3() throws Exception { + LOG.info("Test testRecovery_3"); Note note1 = null; try { note1 = TestUtils.getInstance(Notebook.class).createNote("note3", AuthenticationInfo.ANONYMOUS); @@ -188,6 +198,113 @@ public void testRecovery_3() throws Exception { assertEquals(resp.get("status"), "OK"); post.releaseConnection(); assertEquals(Job.Status.ERROR, p1.getStatus()); + } catch (Exception e ) { + LOG.error(e.toString(), e); + throw e; + } finally { + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + } + } + + @Test + public void testRecovery_Running_Paragraph_sh() throws Exception { + LOG.info("Test testRecovery_Running_Paragraph_sh"); + Note note1 = null; + try { + note1 = TestUtils.getInstance(Notebook.class).createNote("note4", AuthenticationInfo.ANONYMOUS); + + // run sh paragraph async, print 'hello' after 10 seconds + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%sh sleep 10\necho 'hello'"); + PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p1.getId(), ""); + assertThat(post, isAllowed()); + post.releaseConnection(); + long start = System.currentTimeMillis(); + // wait until paragraph is RUNNING + while((System.currentTimeMillis() - start) < 10 * 1000) { + if (p1.getStatus() == Job.Status.RUNNING) { + break; + } + Thread.sleep(1000); + } + if (p1.getStatus() != Job.Status.RUNNING) { + fail("Fail to run paragraph: " + p1.getReturn()); + } + + // shutdown zeppelin and restart it + shutDown(); + startUp(RecoveryTest.class.getSimpleName(), false); + + // wait until paragraph is finished + start = System.currentTimeMillis(); + while((System.currentTimeMillis() - start) < 10 * 1000) { + if (p1.isTerminated()) { + break; + } + Thread.sleep(1000); + } + + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals("hello\n", p1.getReturn().message().get(0).getData()); + } catch (Exception e ) { + LOG.error(e.toString(), e); + throw e; + } finally { + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + } + } + + @Test + public void testRecovery_Finished_Paragraph_python() throws Exception { + LOG.info("Test testRecovery_Finished_Paragraph_python"); + Note note1 = null; + try { + InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class); + InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("python"); + interpreterSetting.setProperty("zeppelin.python.useIPython", "false"); + interpreterSetting.setProperty("zeppelin.interpreter.result.cache", "100"); + + note1 = TestUtils.getInstance(Notebook.class).createNote("note4", AuthenticationInfo.ANONYMOUS); + + // run sh paragraph async, print 'hello' after 10 seconds + Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setText("%python import time\n" + + "for i in range(1, 10):\n" + + " time.sleep(1)\n" + + " print(i)"); + PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p1.getId(), ""); + assertThat(post, isAllowed()); + post.releaseConnection(); + + // wait until paragraph is running + while(p1.getStatus() != Job.Status.RUNNING) { + Thread.sleep(1000); + } + + // shutdown zeppelin and restart it + shutDown(); + // sleep 15 seconds to make sure the paragraph is finished + Thread.sleep(10 * 1500); + + startUp(RecoveryTest.class.getSimpleName(), false); + + assertEquals(Job.Status.FINISHED, p1.getStatus()); + assertEquals("1\n" + + "2\n" + + "3\n" + + "4\n" + + "5\n" + + "6\n" + + "7\n" + + "8\n" + + "9\n", p1.getReturn().message().get(0).getData()); + } catch (Exception e ) { + LOG.error(e.toString(), e); + throw e; } finally { if (null != note1) { TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 2be4d54f4eb..6552d0fcbe8 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -289,12 +289,17 @@ protected static void shutDown() throws Exception { } protected static void shutDown(final boolean deleteConfDir) throws Exception { + shutDown(deleteConfDir, false); + } + + protected static void shutDown(final boolean deleteConfDir, + boolean forceShutdownInterpreter) throws Exception { if (!WAS_RUNNING && TestUtils.getInstance(Notebook.class) != null) { // restart interpreter to stop all interpreter processes List settingList = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager() .get(); - if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) { + if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled() || forceShutdownInterpreter) { for (InterpreterSetting setting : settingList) { TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(setting.getId()); } @@ -336,7 +341,6 @@ protected static void shutDown(final boolean deleteConfDir) throws Exception { TestUtils.clearInstances(); ZeppelinServer.reset(); } - } protected static boolean checkIfServerIsRunning() { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 9626181f41d..5338bfce886 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -177,12 +177,15 @@ public InterpreterSettingManager(ZeppelinConfiguration conf, this.angularObjectRegistryListener = angularObjectRegistryListener; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; this.appEventListener = appEventListener; + + this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this); + this.interpreterEventServer.start(); + this.recoveryStorage = ReflectionUtils.createClazzInstance( conf.getRecoveryStorageClass(), new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, new Object[] {conf, this}); - this.recoveryStorage.init(); LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName()); this.lifecycleManager = ReflectionUtils.createClazzInstance( @@ -192,11 +195,13 @@ public InterpreterSettingManager(ZeppelinConfiguration conf, LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName()); this.configStorage = configStorage; - this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this); - this.interpreterEventServer.start(); init(); } + public RemoteInterpreterEventServer getInterpreterEventServer() { + return interpreterEventServer; + } + public void refreshInterpreterTemplates() { Set installedInterpreters = Sets.newHashSet(interpreterSettingTemplates.keySet()); @@ -322,6 +327,9 @@ private void init() throws IOException { loadInterpreterSettingFromDefaultDir(true); loadFromFile(); saveToFile(); + + // must init Recovery after init of InterpreterSettingManagaer + recoveryStorage.init(); } private void loadJupyterKernelLanguageMap() throws IOException { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index b387475e798..a27677fa82c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -40,6 +40,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup { private InterpreterSetting interpreterSetting; private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process + private Object interpreterProcessCreationLock = new Object(); /** * Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer @@ -55,19 +56,21 @@ public InterpreterSetting getInterpreterSetting() { return interpreterSetting; } - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName, - Properties properties) + public RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName, + Properties properties) throws IOException { - if (remoteInterpreterProcess == null) { - LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId()); - remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName, - properties); - remoteInterpreterProcess.start(userName); - interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this); - getInterpreterSetting().getRecoveryStorage() - .onInterpreterClientStart(remoteInterpreterProcess); + synchronized (interpreterProcessCreationLock) { + if (remoteInterpreterProcess == null) { + LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId()); + remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName, + properties); + remoteInterpreterProcess.start(userName); + interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this); + getInterpreterSetting().getRecoveryStorage() + .onInterpreterClientStart(remoteInterpreterProcess); + } + return remoteInterpreterProcess; } - return remoteInterpreterProcess; } public RemoteInterpreterProcess getInterpreterProcess() { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 11ba6f91a2e..877d045712a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -166,7 +166,8 @@ public void registerInterpreterProcess(RegisterInfo registerInfo) throws TExcept LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " + registerInfo.getInterpreterGroupId()); } - + LOGGER.info("Register interpreter process: {}:{}, {}", + registerInfo.getHost(), registerInfo.getPort(), registerInfo.getInterpreterGroupId()); interpreterProcess.processStarted(registerInfo.port, registerInfo.host); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 617587af82c..51776bab4cb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.util.HashMap; import java.util.Map; import java.util.Properties; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index ff60b39a0dc..bc5034a5fa7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; import java.util.Map; /** @@ -46,8 +45,8 @@ public StandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage } @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { - LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { + LOGGER.info("Launching new interpreter process of " + context.getInterpreterSettingGroup()); this.properties = context.getProperties(); InterpreterOption option = context.getOption(); InterpreterRunner runner = context.getRunner(); @@ -60,31 +59,18 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep context.getInterpreterSettingName(), context.getInterpreterGroupId(), connectTimeout, + context.getIntpEventServerHost(), + context.getIntpEventServerPort(), option.getHost(), - option.getPort()); + option.getPort(), + false); } else { - // try to recover it first - if (zConf.isRecoveryEnabled()) { - InterpreterClient recoveredClient = - recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); - if (recoveredClient != null) { - if (recoveredClient.isRunning()) { - LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" + - recoveredClient.getPort()); - return recoveredClient; - } else { - LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":" - + recoveredClient.getPort() + ", as it is already terminated."); - } - } - } - // create new remote process String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + context.getInterpreterSettingId(); return new RemoteInterpreterManagedProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(), + context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + groupName, localRepoPath, buildEnvFromProperties(context), connectTimeout, name, context.getInterpreterGroupId(), option.isUserImpersonate()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index 1b660ac24aa..46ac23ab6ae 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -39,6 +39,8 @@ /** * Hadoop compatible FileSystem based RecoveryStorage implementation. + * All the running interpreter process info will be save into files on hdfs. + * Each interpreter setting will have one file. * * Save InterpreterProcess in the format of: * InterpreterGroupId host:port @@ -47,16 +49,15 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class); - private InterpreterSettingManager interpreterSettingManager; private FileSystemStorage fs; private Path recoveryDir; + private InterpreterSettingManager interpreterSettingManager; public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { super(zConf); this.interpreterSettingManager = interpreterSettingManager; - this.zConf = zConf; this.fs = new FileSystemStorage(zConf, zConf.getRecoveryDir()); LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() + " for Zeppelin Recovery."); @@ -79,17 +80,19 @@ private void save(String interpreterSettingName) throws IOException { InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName); List recoveryContent = new ArrayList<>(); - for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { - RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess(); - if (interpreterProcess != null) { - recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" + - interpreterProcess.getPort()); + if (interpreterSetting != null) { + for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { + RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess(); + if (interpreterProcess != null && interpreterProcess.isRunning()) { + recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" + + interpreterProcess.getPort()); + } } } - LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName); - LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator())); + String recoveryContentStr = StringUtils.join(recoveryContent, System.lineSeparator()); + LOGGER.debug("Updating recovery data of {}: {}", interpreterSettingName, recoveryContentStr); Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery"); - fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true); + fs.writeFile(recoveryContentStr, recoveryFile, true); } @Override @@ -105,16 +108,18 @@ public Map restore() throws IOException { if (!StringUtils.isBlank(recoveryContent)) { for (String line : recoveryContent.split(System.lineSeparator())) { String[] tokens = line.split("\t"); - String groupId = tokens[0]; + String interpreterGroupId = tokens[0]; String[] hostPort = tokens[1].split(":"); int connectTimeout = - zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( - interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); - // interpreterSettingManager may be null when this class is used when it is used - // stop-interpreter.sh - clients.put(groupId, client); - LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]); + interpreterSettingName, interpreterGroupId, connectTimeout, + interpreterSettingManager.getInterpreterEventServer().getHost(), + interpreterSettingManager.getInterpreterEventServer().getPort(), + hostPort[0], Integer.parseInt(hostPort[1]), true); + clients.put(interpreterGroupId, client); + LOGGER.info("Recovering Interpreter Process: " + interpreterGroupId + ", " + + hostPort[0] + ":" + hostPort[1]); } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java index 3a7d12c70f3..ec6a3b058df 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import java.io.IOException; +import java.util.HashMap; import java.util.Map; @@ -49,6 +50,6 @@ public void onInterpreterClientStop(InterpreterClient client) throws IOException @Override public Map restore() throws IOException { - return null; + return new HashMap<>(); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java index d74b1621e7e..2eb0b04b50a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java @@ -22,11 +22,12 @@ public class StopInterpreter { public static void main(String[] args) throws IOException { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); - RecoveryStorage recoveryStorage = null; + InterpreterSettingManager interpreterSettingManager = + new InterpreterSettingManager(zConf, null, null, null); - recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(), + RecoveryStorage recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(), new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, - new Object[] {zConf, null}); + new Object[] {zConf, interpreterSettingManager}); LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName()); Map restoredClients = recoveryStorage.restore(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index c3678c06bfa..79130a4a27d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -42,8 +42,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { Pattern.compile("Submitted application (\\w+)"); private final String interpreterRunner; - private final int zeppelinServerRPCPort; - private final String zeppelinServerRPCHost; private final String interpreterPortRange; private InterpreterProcessLauncher interpreterProcessLauncher; private String host = null; @@ -59,8 +57,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { public RemoteInterpreterManagedProcess( String intpRunner, - int zeppelinServerRPCPort, - String zeppelinServerRPCHost, + int intpEventServerPort, + String intpEventServerHost, String interpreterPortRange, String intpDir, String localRepoDir, @@ -69,10 +67,8 @@ public RemoteInterpreterManagedProcess( String interpreterSettingName, String interpreterGroupId, boolean isUserImpersonated) { - super(connectTimeout); + super(connectTimeout, intpEventServerHost, intpEventServerPort); this.interpreterRunner = intpRunner; - this.zeppelinServerRPCPort = zeppelinServerRPCPort; - this.zeppelinServerRPCHost = zeppelinServerRPCHost; this.interpreterPortRange = interpreterPortRange; this.env = env; this.interpreterDir = intpDir; @@ -99,9 +95,9 @@ public void start(String userName) throws IOException { cmdLine.addArgument("-d", false); cmdLine.addArgument(interpreterDir, false); cmdLine.addArgument("-c", false); - cmdLine.addArgument(zeppelinServerRPCHost, false); + cmdLine.addArgument(intpEventServerHost, false); cmdLine.addArgument("-p", false); - cmdLine.addArgument(String.valueOf(zeppelinServerRPCPort), false); + cmdLine.addArgument(String.valueOf(intpEventServerPort), false); cmdLine.addArgument("-r", false); cmdLine.addArgument(interpreterPortRange, false); cmdLine.addArgument("-i", false); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 5d10df15d3c..e3a81acea3d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -23,6 +23,8 @@ import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -30,13 +32,20 @@ * Abstract class for interpreter process */ public abstract class RemoteInterpreterProcess implements InterpreterClient { + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private static final Gson GSON = new Gson(); private int connectTimeout; + protected String intpEventServerHost; + protected int intpEventServerPort; private PooledRemoteClient remoteClient; - public RemoteInterpreterProcess(int connectTimeout) { + public RemoteInterpreterProcess(int connectTimeout, + String intpEventServerHost, + int intpEventServerPort) { this.connectTimeout = connectTimeout; + this.intpEventServerHost = intpEventServerHost; + this.intpEventServerPort = intpEventServerPort; this.remoteClient = new PooledRemoteClient(() -> { TSocket transport = new TSocket(getHost(), getPort()); try { @@ -54,7 +63,6 @@ public int getConnectTimeout() { } public void shutdown() { - // Close client socket connection if (remoteClient != null) { remoteClient.shutdown(); } @@ -66,7 +74,10 @@ public void shutdown() { * @param name * @param o */ - public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) { + public void updateRemoteAngularObject(String name, + String noteId, + String paragraphId, + Object o) { remoteClient.callRemoteFunction((PooledRemoteClient.RemoteFunction) client -> { client.angularObjectUpdate(name, noteId, paragraphId, GSON.toJson(o)); return null; @@ -77,6 +88,21 @@ public R callRemoteFunction(PooledRemoteClient.RemoteFunction fun return remoteClient.callRemoteFunction(func); } + @Override + public boolean recover() { + try { + remoteClient.callRemoteFunction(client -> { + client.reconnect(intpEventServerHost, intpEventServerPort); + return null; + }); + return true; + } catch (Exception e) { + LOGGER.error("Fail to recover remote interpreter process: {}" , e.getMessage()); + return false; + } + } + + /** * called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started */ diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index a33520aa328..053a72ce423 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -30,19 +30,23 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { private final int port; private final String interpreterSettingName; private final String interpreterGroupId; + private final boolean isRecovery; public RemoteInterpreterRunningProcess( String interpreterSettingName, String interpreterGroupId, int connectTimeout, + String intpEventServerHost, + int intpEventServerPort, String host, - int port - ) { - super(connectTimeout); + int port, + boolean isRecovery) { + super(connectTimeout, intpEventServerHost, intpEventServerPort); this.interpreterSettingName = interpreterSettingName; this.interpreterGroupId = interpreterGroupId; this.host = host; this.port = port; + this.isRecovery = isRecovery; } @Override @@ -74,7 +78,7 @@ public void start(String userName) { public void stop() { // assume process is externally managed. nothing to do. But will kill it // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. - if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { + if (System.getenv("ZEPPELIN_FORCE_STOP") != null || isRecovery) { if (isRunning()) { LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId); try { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 55cb4f429a4..9fb8ba8502c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -73,7 +73,7 @@ public class Note implements JsonSerializable { private static final ExclusionStrategy strategy = new ExclusionStrategy() { @Override public boolean shouldSkipField(FieldAttributes f) { - return f.getName().equals("runtimeInfos") || f.getName().equals("path"); + return f.getName().equals("path"); } @Override @@ -1065,11 +1065,11 @@ public static Note fromJson(String json) throws IOException { public void postProcessParagraphs() { for (Paragraph p : paragraphs) { - p.cleanRuntimeInfos(); - p.cleanOutputBuffer(); p.parseText(); + p.setNote(this); + p.setAuthenticationInfo(AuthenticationInfo.ANONYMOUS); - if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { + if (p.getStatus() == Status.PENDING) { p.setStatus(Status.ABORT); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 387a56e09a7..be3a5bbd37c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -48,6 +48,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -109,6 +110,21 @@ public Notebook( } } + private void recoverRunningParagraphs() { + + Thread thread = new Thread(() -> { + for (Note note : getAllNotes()) { + for (Paragraph paragraph : note.getParagraphs()) { + if (paragraph.getStatus() == Job.Status.RUNNING) { + paragraph.recover(); + } + } + } + }); + thread.setName("Recovering-Thread"); + thread.start(); + } + @Inject public Notebook( ZeppelinConfiguration conf, @@ -134,6 +150,8 @@ public Notebook( this.noteEventListeners.add(noteEventListener); } this.paragraphJobListener = (ParagraphJobListener) noteEventListener; + + recoverRunningParagraphs(); } public NoteManager getNoteManager() { @@ -545,6 +563,17 @@ public List getNotesInfo() { public List getAllNotes() { List noteList = noteManager.getAllNotes(); + for (Note note : noteList) { + note.setInterpreterFactory(replFactory); + note.setInterpreterSettingManager(interpreterSettingManager); + note.setParagraphJobListener(paragraphJobListener); + note.setNoteEventListeners(noteEventListeners); + note.setCredentials(credentials); + for (Paragraph p : note.getParagraphs()) { + p.setNote(note); + p.setListener(paragraphJobListener); + } + } Collections.sort(noteList, Comparator.comparing(Note::getPath)); return noteList; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 0086a47b335..3d70eb6352b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -54,6 +54,7 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.scheduler.Job; @@ -101,12 +102,11 @@ public class Paragraph extends JobWithProgressPoller implemen // personalized private transient Map userParagraphMap = new HashMap<>(); private transient Map localProperties = new HashMap<>(); - // serialize runtimeInfos to frontend but not to note file (via gson's ExclusionStrategy) + private Map runtimeInfos = new HashMap<>(); private transient List outputBuffer = new ArrayList<>(); - @VisibleForTesting Paragraph() { super(generateId(), null); @@ -405,7 +405,9 @@ public boolean execute(boolean blocking) { @Override protected InterpreterResult jobRun() throws Throwable { try { - this.runtimeInfos.clear(); + if (localProperties.getOrDefault("isRecover", "false").equals("false")) { + this.runtimeInfos.clear(); + } this.interpreter = getBindedInterpreter(); if (this.interpreter == null) { LOGGER.error("Can not find interpreter name " + intpText); @@ -505,6 +507,8 @@ protected InterpreterResult jobRun() throws Throwable { } } catch (Exception e) { return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + localProperties.remove("isRecover"); } } @@ -794,4 +798,51 @@ public void updateOutputBuffer(int index, InterpreterResult.Type type, String ou outputBuffer.size() + " output in outputBuffer"); } } + + public void recover() { + try { + LOGGER.info("Recovering paragraph: " + getId()); + + this.interpreter = getBindedInterpreter(); + InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup) + interpreter.getInterpreterGroup()).getInterpreterSetting(); + Map config + = interpreterSetting.getConfig(interpreter.getClassName()); + mergeConfig(config); + + if (shouldSkipRunParagraph()) { + LOGGER.info("Skip to run blank paragraph. {}", getId()); + setStatus(Job.Status.FINISHED); + return ; + } + setStatus(Status.READY); + localProperties.put("isRecover", "true"); + for (List sessions : this.interpreter.getInterpreterGroup().values()) { + for (Interpreter intp : sessions) { + // exclude ConfInterpreter + if (intp instanceof RemoteInterpreter) { + ((RemoteInterpreter) intp).setOpened(true); + } + } + } + + if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) { + setAuthenticationInfo(getAuthenticationInfo()); + interpreter.getScheduler().submit(this); + } + + } catch (InterpreterNotFoundException e) { + InterpreterResult intpResult = + new InterpreterResult(InterpreterResult.Code.ERROR, + String.format("Interpreter %s not found", this.intpText)); + setReturn(intpResult, e); + setStatus(Job.Status.ERROR); + } catch (Throwable e) { + InterpreterResult intpResult = + new InterpreterResult(InterpreterResult.Code.ERROR, + "Unexpected exception: " + ExceptionUtils.getStackTrace(e)); + setReturn(intpResult, e); + setStatus(Job.Status.ERROR); + } + } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index d232c259be8..e73f79ef7b1 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -23,7 +23,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.thrift.ParagraphInfo; -import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index c768e543131..84e7fbed5ab 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -27,7 +27,6 @@ import org.apache.zeppelin.interpreter.thrift.ParagraphInfo; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.scheduler.Job.Status; -import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; import org.junit.Before; import org.junit.Test;