Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class RemoteInterpreter extends Interpreter {
private String className;
private String noteId;
FormType formType;
boolean initialized;
boolean remoteCreated;
boolean remoteOpened;
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
Expand All @@ -76,7 +77,8 @@ public RemoteInterpreter(Properties property,
super(property);
this.noteId = noteId;
this.className = className;
initialized = false;
this.remoteCreated = false;
this.remoteOpened = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
Expand Down Expand Up @@ -104,7 +106,8 @@ public RemoteInterpreter(
super(property);
this.noteId = noteId;
this.className = className;
initialized = false;
this.remoteCreated = false;
this.remoteOpened = false;
this.host = host;
this.port = port;
this.connectTimeout = connectTimeout;
Expand Down Expand Up @@ -197,8 +200,8 @@ public RemoteInterpreterProcess getInterpreterProcess() {
}
}

public synchronized void init() {
if (initialized == true) {
public synchronized void createRemote() {
if (remoteCreated == true) {
return;
}

Expand Down Expand Up @@ -226,7 +229,6 @@ public synchronized void init() {
}
client.createInterpreter(groupId, noteId,
getClassName(), (Map) property);

// Push angular object loaded from JSON file to remote interpreter
if (!interpreterGroup.isAngularRegistryPushed()) {
pushAngularObjectRegistryToRemote(client);
Expand All @@ -241,31 +243,76 @@ public synchronized void init() {
interpreterProcess.releaseClient(client, broken);
}
}
initialized = true;
remoteCreated = true;
}

public void openRemote() {
if (remoteOpened == true) {
return;
}

RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();

final InterpreterGroup interpreterGroup = getInterpreterGroup();
interpreterProcess.reference(interpreterGroup);
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
String groupId = interpreterGroup.getId();

synchronized (interpreterProcess) {
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
}

boolean broken = false;
try {
logger.info("Open remote interpreter {}", getClassName());
client.open(noteId, getClassName());
} catch (TException e) {
logger.error("Failed to open interpreter: {}", getClassName());
throw new InterpreterException(e);
} finally {
// TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
interpreterProcess.releaseClient(client, broken);
}
}
remoteOpened = true;
}

@Override
public void open() {
InterpreterGroup interpreterGroup = getInterpreterGroup();

// It is a hack in RemoteInterpreter that client should create all the interpreters in the same
// session together then open them. This is because there may be some dependency between
// interpeters of the same session. e.g. PySparkInterpreter depends on SparkInterpeter.
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(noteId);
for (Interpreter intp : new ArrayList<>(interpreters)) {
for (Interpreter intp : interpreters) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
try {
((RemoteInterpreter) p).init();
((RemoteInterpreter) p).createRemote();
} catch (InterpreterException e) {
logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
logger.error("Failed to create interpreter: {}. Remove it from interpreterGroup",
p.getClassName());
interpreters.remove(p);
}
}

for (Interpreter intp : interpreters) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
((RemoteInterpreter) p).openRemote();
}
}
}

Expand Down Expand Up @@ -383,7 +430,7 @@ public void cancel(InterpreterContext context) {

@Override
public FormType getFormType() {
init();
createRemote();

if (formType != null) {
return formType;
Expand Down Expand Up @@ -498,7 +545,7 @@ private InterpreterResult convert(RemoteInterpreterResult result) {
/**
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY inside the init() method
* call ONLY inside the createRemote() method
* @param client
* @throws TException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,7 @@ public void onPostExecute(String script) {
protected Object jobRun() throws Throwable {
try {
InterpreterContext.set(context);

// Open the interpreter instance prior to calling interpret().
// This is necessary because the earliest we can register a hook
// is from within the open() method.
LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
if (!lazy.isOpen()) {
lazy.open();
}

// Add hooks to script from registry.
// Global scope first, followed by notebook scope
processInterpreterHooks(null);
Expand Down