diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 0ce7921de4a..6f927de934e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -53,7 +53,8 @@ public class RemoteInterpreter extends Interpreter { private String className; private String noteId; FormType formType; - boolean initialized; + boolean remoteCreated; + boolean remoteOpened; private Map env; private int connectTimeout; private int maxPoolSize; @@ -76,7 +77,8 @@ public RemoteInterpreter(Properties property, super(property); this.noteId = noteId; this.className = className; - initialized = false; + this.remoteCreated = false; + this.remoteOpened = false; this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; this.localRepoPath = localRepoPath; @@ -104,7 +106,8 @@ public RemoteInterpreter( super(property); this.noteId = noteId; this.className = className; - initialized = false; + this.remoteCreated = false; + this.remoteOpened = false; this.host = host; this.port = port; this.connectTimeout = connectTimeout; @@ -197,8 +200,8 @@ public RemoteInterpreterProcess getInterpreterProcess() { } } - public synchronized void init() { - if (initialized == true) { + public synchronized void createRemote() { + if (remoteCreated == true) { return; } @@ -226,7 +229,6 @@ public synchronized void init() { } client.createInterpreter(groupId, noteId, getClassName(), (Map) property); - // Push angular object loaded from JSON file to remote interpreter if (!interpreterGroup.isAngularRegistryPushed()) { pushAngularObjectRegistryToRemote(client); @@ -241,31 +243,76 @@ public synchronized void init() { interpreterProcess.releaseClient(client, broken); } } - initialized = true; + remoteCreated = true; } + public void openRemote() { + if (remoteOpened == true) { + return; + } + + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + + final InterpreterGroup interpreterGroup = getInterpreterGroup(); + interpreterProcess.reference(interpreterGroup); + interpreterProcess.setMaxPoolSize( + Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize())); + String groupId = interpreterGroup.getId(); + synchronized (interpreterProcess) { + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + logger.info("Open remote interpreter {}", getClassName()); + client.open(noteId, getClassName()); + } catch (TException e) { + logger.error("Failed to open interpreter: {}", getClassName()); + throw new InterpreterException(e); + } finally { + // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken + interpreterProcess.releaseClient(client, broken); + } + } + remoteOpened = true; + } @Override public void open() { InterpreterGroup interpreterGroup = getInterpreterGroup(); + // It is a hack in RemoteInterpreter that client should create all the interpreters in the same + // session together then open them. This is because there may be some dependency between + // interpeters of the same session. e.g. PySparkInterpreter depends on SparkInterpeter. synchronized (interpreterGroup) { // initialize all interpreters in this interpreter group List interpreters = interpreterGroup.get(noteId); - for (Interpreter intp : new ArrayList<>(interpreters)) { + for (Interpreter intp : interpreters) { Interpreter p = intp; while (p instanceof WrappedInterpreter) { p = ((WrappedInterpreter) p).getInnerInterpreter(); } try { - ((RemoteInterpreter) p).init(); + ((RemoteInterpreter) p).createRemote(); } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", + logger.error("Failed to create interpreter: {}. Remove it from interpreterGroup", p.getClassName()); interpreters.remove(p); } } + + for (Interpreter intp : interpreters) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + ((RemoteInterpreter) p).openRemote(); + } } } @@ -383,7 +430,7 @@ public void cancel(InterpreterContext context) { @Override public FormType getFormType() { - init(); + createRemote(); if (formType != null) { return formType; @@ -498,7 +545,7 @@ private InterpreterResult convert(RemoteInterpreterResult result) { /** * Push local angular object registry to * remote interpreter. This method should be - * call ONLY inside the init() method + * call ONLY inside the createRemote() method * @param client * @throws TException */ diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index cde6a7ba7ef..38c8e20a04e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -431,15 +431,7 @@ public void onPostExecute(String script) { protected Object jobRun() throws Throwable { try { InterpreterContext.set(context); - - // Open the interpreter instance prior to calling interpret(). - // This is necessary because the earliest we can register a hook - // is from within the open() method. LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter; - if (!lazy.isOpen()) { - lazy.open(); - } - // Add hooks to script from registry. // Global scope first, followed by notebook scope processInterpreterHooks(null);