Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.zeppelin.python;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand All @@ -29,21 +37,14 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import py4j.GatewayServer;

/**
* Python interpreter for Zeppelin.
*/
public class PythonInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonInterpreter.class);
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);

public static final String BOOTSTRAP_PY = "/bootstrap.py";
public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
Expand All @@ -65,49 +66,49 @@ public PythonInterpreter(Properties property) {

@Override
public void open() {
logger.info("Starting Python interpreter .....");
logger.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
LOG.info("Starting Python interpreter .....");
LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));

maxResult = Integer.valueOf(getProperty(MAX_RESULT));
process = getPythonProcess();

try {
process.open();
} catch (IOException e) {
logger.error("Can't start the python process", e);
LOG.error("Can't start the python process", e);
}

try {
logger.info("python PID : " + process.getPid());
LOG.info("python PID : " + process.getPid());
} catch (Exception e) {
logger.warn("Can't find python pid process", e);
LOG.warn("Can't find python pid process", e);
}

try {
logger.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
bootStrapInterpreter(BOOTSTRAP_PY);
} catch (IOException e) {
logger.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
}

if (py4J = isPy4jInstalled()) {
port = findRandomOpenPortOnAllLocalInterfaces();
logger.info("Py4j gateway port : " + port);
LOG.info("Py4j gateway port : " + port);
try {
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
logger.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
} catch (IOException e) {
logger.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
"initialize Zeppelin inputs in python process", e);
}
}
}

@Override
public void close() {
logger.info("closing Python interpreter .....");
LOG.info("closing Python interpreter .....");
try {
if (process != null) {
process.close();
Expand All @@ -116,7 +117,7 @@ public void close() {
gatewayServer.shutdown();
}
} catch (IOException e) {
logger.error("Can't close the interpreter", e);
LOG.error("Can't close the interpreter", e);
}
}

Expand All @@ -136,7 +137,7 @@ public void cancel(InterpreterContext context) {
try {
process.interrupt();
} catch (IOException e) {
logger.error("Can't interrupt the python interpreter", e);
LOG.error("Can't interrupt the python interpreter", e);
}
}

Expand Down Expand Up @@ -184,11 +185,11 @@ private Job getRunningJob(String paragraphId) {

private String sendCommandToPython(String cmd) {
String output = "";
logger.info("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
LOG.info("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
try {
output = process.sendAndGetResult(cmd);
} catch (IOException e) {
logger.error("Error when sending commands to python process", e);
LOG.error("Error when sending commands to python process", e);
}
//logger.info("Got : \n" + output);
return output;
Expand All @@ -207,7 +208,7 @@ private void bootStrapInterpreter(String file) throws IOException {
if (py4J && port != null && port != -1) {
bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
}
logger.info("Bootstrap python interpreter with code from \n " + file);
LOG.info("Bootstrap python interpreter with code from \n " + file);
sendCommandToPython(bootstrapCode);
}

Expand All @@ -234,7 +235,7 @@ private int findRandomOpenPortOnAllLocalInterfaces() {
port = socket.getLocalPort();
socket.close();
} catch (IOException e) {
logger.error("Can't find an open port", e);
LOG.error("Can't find an open port", e);
}
return port;
}
Expand Down
Loading