diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index 08bb944721c..06e0c99205e 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -17,6 +17,14 @@ package org.apache.zeppelin.python; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -29,21 +37,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import py4j.GatewayServer; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.ServerSocket; -import java.util.Collection; -import java.util.List; -import java.util.Properties; +import py4j.GatewayServer; /** * Python interpreter for Zeppelin. */ public class PythonInterpreter extends Interpreter { - Logger logger = LoggerFactory.getLogger(PythonInterpreter.class); + private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class); public static final String BOOTSTRAP_PY = "/bootstrap.py"; public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py"; @@ -65,8 +66,8 @@ public PythonInterpreter(Properties property) { @Override public void open() { - logger.info("Starting Python interpreter ....."); - logger.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON)); + LOG.info("Starting Python interpreter ....."); + LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON)); maxResult = Integer.valueOf(getProperty(MAX_RESULT)); process = getPythonProcess(); @@ -74,32 +75,32 @@ public void open() { try { process.open(); } catch (IOException e) { - logger.error("Can't start the python process", e); + LOG.error("Can't start the python process", e); } try { - logger.info("python PID : " + process.getPid()); + LOG.info("python PID : " + process.getPid()); } catch (Exception e) { - logger.warn("Can't find python pid process", e); + LOG.warn("Can't find python pid process", e); } try { - logger.info("Bootstrap interpreter with " + BOOTSTRAP_PY); + LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY); bootStrapInterpreter(BOOTSTRAP_PY); } catch (IOException e) { - logger.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e); + LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e); } if (py4J = isPy4jInstalled()) { port = findRandomOpenPortOnAllLocalInterfaces(); - logger.info("Py4j gateway port : " + port); + LOG.info("Py4j gateway port : " + port); try { gatewayServer = new GatewayServer(this, port); gatewayServer.start(); - logger.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY); + LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY); bootStrapInterpreter(BOOTSTRAP_INPUT_PY); } catch (IOException e) { - logger.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " + + LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " + "initialize Zeppelin inputs in python process", e); } } @@ -107,7 +108,7 @@ public void open() { @Override public void close() { - logger.info("closing Python interpreter ....."); + LOG.info("closing Python interpreter ....."); try { if (process != null) { process.close(); @@ -116,7 +117,7 @@ public void close() { gatewayServer.shutdown(); } } catch (IOException e) { - logger.error("Can't close the interpreter", e); + LOG.error("Can't close the interpreter", e); } } @@ -136,7 +137,7 @@ public void cancel(InterpreterContext context) { try { process.interrupt(); } catch (IOException e) { - logger.error("Can't interrupt the python interpreter", e); + LOG.error("Can't interrupt the python interpreter", e); } } @@ -184,11 +185,11 @@ private Job getRunningJob(String paragraphId) { private String sendCommandToPython(String cmd) { String output = ""; - logger.info("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd)); + LOG.info("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd)); try { output = process.sendAndGetResult(cmd); } catch (IOException e) { - logger.error("Error when sending commands to python process", e); + LOG.error("Error when sending commands to python process", e); } //logger.info("Got : \n" + output); return output; @@ -207,7 +208,7 @@ private void bootStrapInterpreter(String file) throws IOException { if (py4J && port != null && port != -1) { bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString()); } - logger.info("Bootstrap python interpreter with code from \n " + file); + LOG.info("Bootstrap python interpreter with code from \n " + file); sendCommandToPython(bootstrapCode); } @@ -234,7 +235,7 @@ private int findRandomOpenPortOnAllLocalInterfaces() { port = socket.getLocalPort(); socket.close(); } catch (IOException e) { - logger.error("Can't find an open port", e); + LOG.error("Can't find an open port", e); } return port; } diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java index 2e8060deec1..92fde2d459c 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java @@ -18,22 +18,26 @@ package org.apache.zeppelin.python; import static org.apache.zeppelin.python.PythonInterpreter.*; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.times; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.when; -import org.apache.zeppelin.interpreter.InterpreterResult; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.Properties; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -41,18 +45,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.util.Properties; - /** * Python interpreter unit test */ public class PythonInterpreterTest { - - Logger logger = LoggerFactory.getLogger(PythonProcess.class); + private static final Logger LOG = LoggerFactory.getLogger(PythonProcess.class); PythonInterpreter pythonInterpreter = null; PythonProcess mockPythonProcess; @@ -66,75 +63,54 @@ public static Properties getPythonTestProperties() { } @Before - public void beforeTest() { + public void beforeTest() throws IOException { cmdHistory = ""; /*Mock python process*/ mockPythonProcess = mock(PythonProcess.class); - when(mockPythonProcess.getPid()).thenReturn((long) 1); - try { - when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer( - new Answer() { - @Override - public String answer(InvocationOnMock invocationOnMock) throws Throwable { - return answerFromPythonMock(invocationOnMock); - } - }); - } catch (IOException e) { - logger.error("Can't initiate python process", e); - } + when(mockPythonProcess.getPid()).thenReturn(1L); + when(mockPythonProcess.sendAndGetResult(anyString())).thenAnswer(new Answer() { + @Override public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return answerFromPythonMock(invocationOnMock); + } + }); pythonInterpreter = spy(new PythonInterpreter(getPythonTestProperties())); when(pythonInterpreter.getPythonProcess()).thenReturn(mockPythonProcess); - - try { - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError"); - } catch (IOException e) { - e.printStackTrace(); - } + when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn("ImportError"); } @Test public void testOpenInterpreter() { pythonInterpreter.open(); assertEquals(pythonInterpreter.getPythonProcess().getPid(), 1); - } - @Test - public void testPy4jIsNotInstalled() { - - /* - If Py4J is not installed, bootstrap_input.py - is not sent to Python process and - py4j JavaGateway is not running - */ + /** + * If Py4J is not installed, bootstrap_input.py + * is not sent to Python process and py4j JavaGateway is not running + */ + @Test public void testPy4jIsNotInstalled() { pythonInterpreter.open(); assertNull(pythonInterpreter.getPy4jPort()); - assertTrue(cmdHistory.contains("def help()")); assertTrue(cmdHistory.contains("class PyZeppelinContext(object):")); assertTrue(cmdHistory.contains("z = PyZeppelinContext")); assertTrue(cmdHistory.contains("def show")); assertFalse(cmdHistory.contains("GatewayClient")); - } - @Test - public void testPy4jInstalled() { - - /* - If Py4J installed, bootstrap_input.py - is sent to interpreter and JavaGateway is - running - */ + /** + * If Py4J installed, bootstrap_input.py + * is sent to interpreter and JavaGateway is + * running + * + * @throws IOException + */ + @Test public void testPy4jInstalled() throws IOException { + when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>"); - try { - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>"); - } catch (IOException e) { - e.printStackTrace(); - } pythonInterpreter.open(); Integer py4jPort = pythonInterpreter.getPy4jPort(); assertNotNull(py4jPort); @@ -146,70 +122,71 @@ public void testPy4jInstalled() { assertTrue(cmdHistory.contains("GatewayClient(port=" + py4jPort + ")")); assertTrue(cmdHistory.contains("org.apache.zeppelin.display.Input")); - assertTrue(checkSocketAddress(py4jPort)); - + assertTrue(serverIsListeningOn(py4jPort)); } - @Test - public void testClose() { + public void testClose() throws IOException, InterruptedException { + //given: py4j is installed + when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>"); - try { - when(mockPythonProcess.sendAndGetResult(eq("\n\nimport py4j\n"))).thenReturn(">>>"); - } catch (IOException e) { - e.printStackTrace(); - } pythonInterpreter.open(); Integer py4jPort = pythonInterpreter.getPy4jPort(); - assertNotNull(py4jPort); + + //when pythonInterpreter.close(); - assertFalse(checkSocketAddress(py4jPort)); - try { - verify(mockPythonProcess, times(1)).close(); - } catch (IOException e) { - e.printStackTrace(); - } + //then + assertFalse(serverIsListeningOn(py4jPort)); + verify(mockPythonProcess, times(1)).close(); } - @Test public void testInterpret() { - pythonInterpreter.open(); cmdHistory = ""; InterpreterResult result = pythonInterpreter.interpret("print a", null); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("%text print a", result.toString()); - } + /** + * Checks if given port is open on 'localhost' + * @param port + */ + private boolean serverIsListeningOn(Integer port) { + boolean serverIsListening = false; + Socket s = new Socket(); + boolean connected = tryToConnect(s, port); + if (connected) { + serverIsListening = true; + tryToClose(s); + } + return serverIsListening; + } - private boolean checkSocketAddress(Integer py4jPort) { - Socket s = new Socket(); - SocketAddress sa = new InetSocketAddress("localhost", py4jPort); - Boolean working = null; + private boolean tryToConnect(Socket s, Integer port) { + boolean connected = false; + SocketAddress sa = new InetSocketAddress("localhost", port); try { s.connect(sa, 10000); + connected = true; } catch (IOException e) { - working = false; + LOG.error("Can't open connection to " + sa, e); } + return connected; + } - if (working == null) { - working = s.isConnected(); - try { - s.close(); - } catch (IOException e) { - logger.error("Can't close connection to localhost:" + py4jPort, e); - } + private void tryToClose(Socket s) { + try { + s.close(); + } catch (IOException e) { + LOG.error("Can't close connection to " + s.getInetAddress(), e); } - return working; } - - private String answerFromPythonMock(InvocationOnMock invocationOnMock) { Object[] inputs = invocationOnMock.getArguments(); String cmdToExecute = (String) inputs[0];