diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 29d02218d80..33f2738def0 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -23,7 +23,7 @@ function usage() { echo "usage) $0 -p -d -l -g " } -while getopts "hp:d:l:v:u:g:" o; do +while getopts "hc:p:d:l:v:u:g:" o; do case ${o} in h) usage @@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do d) INTERPRETER_DIR=${OPTARG} ;; + c) + CALLBACK_HOST=${OPTARG} # This will be used callback host + ;; p) - PORT=${OPTARG} + PORT=${OPTARG} # This will be used callback port ;; l) LOCAL_INTERPRETER_REPO=${OPTARG} @@ -202,12 +205,12 @@ fi if [[ -n "${SPARK_SUBMIT}" ]]; then if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` else - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}` fi else - INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} ` + INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ` fi if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java index 21ce283abba..3a15e8be0dd 100644 --- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java +++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java @@ -38,8 +38,8 @@ public class ZeppelinDevServer extends private DevInterpreter interpreter = null; private InterpreterOutput out; - public ZeppelinDevServer(int port) throws TException { - super(port); + public ZeppelinDevServer(int port) throws TException, IOException { + super(null, port); } @Override diff --git a/pom.xml b/pom.xml index 9232b9e94ea..709486b878a 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,7 @@ 3.2.1 1.1.1 1.2.3 + 4.1.12.Final 4.12 @@ -132,6 +133,8 @@ 1.3.0 2.8.2 + zeppelin-package + 64m 512m @@ -254,6 +257,11 @@ shiro-config-core ${shiro.version} + + io.netty + netty-all + ${netty.version} + diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 109099cfc7b..cf0b255fff9 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -219,6 +219,11 @@ guava + + io.netty + netty-all + + junit junit @@ -231,4 +236,63 @@ test + + + + + org.apache.maven.plugins + maven-shade-plugin + + false + + + io.netty:netty-all + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + io.netty + ${zeppelin.shade.package}.io.netty + + io.netty.** + + + + + + + package + + shade + + + + + + maven-dependency-plugin + + + copy-dependencies + process-test-resources + + copy-dependencies + + + io.netty + + + + + + diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 719d2dd71c0..b103bbeae25 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -17,38 +17,77 @@ package org.apache.zeppelin.interpreter.remote; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.nio.ByteBuffer; -import java.util.*; - +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.display.*; -import org.apache.zeppelin.helium.*; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.helium.Application; +import org.apache.zeppelin.helium.ApplicationContext; +import org.apache.zeppelin.helium.ApplicationException; +import org.apache.zeppelin.helium.ApplicationLoader; +import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry; +import org.apache.zeppelin.helium.HeliumPackage; +import org.apache.zeppelin.interpreter.Constants; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterHookListener; +import org.apache.zeppelin.interpreter.InterpreterHookRegistry; +import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.*; -import org.apache.zeppelin.resource.*; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner; +import org.apache.zeppelin.resource.DistributedResourcePool; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.resource.WellKnownResourceName; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.JobProgressPoller; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.user.AuthenticationInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; /** * Entry point for Interpreter process. @@ -68,6 +107,9 @@ public class RemoteInterpreterServer Gson gson = new Gson(); RemoteInterpreterService.Processor processor; + private String callbackHost; + private int callbackPort; + private String host; private int port; private TThreadPoolServer server; @@ -82,11 +124,26 @@ public class RemoteInterpreterServer private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000; - public RemoteInterpreterServer(int port) throws TTransportException { - this.port = port; + public RemoteInterpreterServer(String callbackHost, int port) + throws TTransportException, IOException { + if (null != callbackHost) { + this.callbackHost = callbackHost; + this.callbackPort = port; + } else { + // DevInterpreter + this.port = port; + } processor = new RemoteInterpreterService.Processor<>(this); - TServerSocket serverTransport = new TServerSocket(port); + TServerSocket serverTransport; + if (null == callbackHost) { + // Dev Interpreter + serverTransport = new TServerSocket(port); + } else { + this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + this.host = RemoteInterpreterUtils.findAvailableHostname(); + serverTransport = new TServerSocket(this.port); + } server = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); remoteWorksResponsePool = Collections.synchronizedMap(new HashMap()); @@ -95,6 +152,26 @@ public RemoteInterpreterServer(int port) throws TTransportException { @Override public void run() { + if (null != callbackHost) { + new Thread(new Runnable() { + boolean interrupted = false; + @Override + public void run() { + while (!interrupted && !server.isServing()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (!interrupted) { + RemoteInterpreterUtils + .registerInterpreter(callbackHost, callbackPort, host + ":" + port); + } + } + }).start(); + } logger.info("Starting remote interpreter server on port {}", port); server.serve(); } @@ -141,13 +218,15 @@ public boolean isRunning() { public static void main(String[] args) - throws TTransportException, InterruptedException { - + throws TTransportException, InterruptedException, IOException { + String callbackHost = null; int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT; if (args.length > 0) { - port = Integer.parseInt(args[0]); + callbackHost = args[0]; + port = Integer.parseInt(args[1]); } - RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port); + RemoteInterpreterServer remoteInterpreterServer = + new RemoteInterpreterServer(callbackHost, port); remoteInterpreterServer.start(); remoteInterpreterServer.join(); System.exit(0); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 4ee6690f7c4..dc3810dc8d9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,14 +17,32 @@ package org.apache.zeppelin.interpreter.remote; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; import java.io.IOException; import java.net.ConnectException; +import java.net.Inet4Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -40,6 +58,24 @@ public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOExcepti return port; } + public static String findAvailableHostname() throws UnknownHostException, SocketException { + InetAddress address = InetAddress.getLocalHost(); + if (address.isLoopbackAddress()) { + for (NetworkInterface networkInterface : Collections + .list(NetworkInterface.getNetworkInterfaces())) { + if (!networkInterface.isLoopback()) { + for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) { + InetAddress a = interfaceAddress.getAddress(); + if (a instanceof Inet4Address) { + return a.getHostAddress(); + } + } + } + } + } + return address.getHostName(); + } + public static boolean checkIfRemoteEndpointAccessible(String host, int port) { try { Socket discover = new Socket(); @@ -80,4 +116,42 @@ public static boolean isEnvString(String key) { return key.matches("^[A-Z_0-9]*"); } + + public static void registerInterpreter(String callbackHost, int callbackPort, final String msg) { + LOGGER.info("callbackHost: {}, callbackPort: {}, msg: {}", callbackHost, callbackPort, msg); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + LOGGER.info("Send message {}", msg); + ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + }); + } + }); + + ChannelFuture f = b.connect(callbackHost, callbackPort).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + // + } finally { + workerGroup.shutdownGracefully(); + } + } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index a4b3a2573ba..6efe4560777 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -42,7 +42,7 @@ public void tearDown() throws Exception { @Test public void testStartStop() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer( + RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); assertEquals(false, server.isRunning()); @@ -90,7 +90,7 @@ public void run() { @Test public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer( + RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); assertEquals(false, server.isRunning()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 1fb9b90771c..084aa803c69 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -17,17 +17,38 @@ package org.apache.zeppelin.interpreter.remote; -import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.CharsetUtil; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.ExecuteException; +import org.apache.commons.exec.ExecuteResultHandler; +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.LogOutputStream; +import org.apache.commons.exec.PumpStreamHandler; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; /** * This class manages start / stop of remote interpreter process @@ -37,10 +58,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess private static final Logger logger = LoggerFactory.getLogger( RemoteInterpreterManagedProcess.class); private final String interpreterRunner; + private final CountDownLatch hostPortLatch; private DefaultExecutor executor; private ExecuteWatchdog watchdog; boolean running = false; + private String host = null; private int port = -1; private final String interpreterDir; private final String localRepoDir; @@ -48,6 +71,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess private Map env; + public RemoteInterpreterManagedProcess( String intpRunner, String intpDir, @@ -64,6 +88,7 @@ public RemoteInterpreterManagedProcess( this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; this.interpreterGroupName = interpreterGroupName; + this.hostPortLatch = new CountDownLatch(1); } RemoteInterpreterManagedProcess(String intpRunner, @@ -80,6 +105,7 @@ public RemoteInterpreterManagedProcess( this.interpreterDir = intpDir; this.localRepoDir = localRepoDir; this.interpreterGroupName = interpreterGroupName; + this.hostPortLatch = new CountDownLatch(1); } @Override @@ -95,8 +121,11 @@ public int getPort() { @Override public void start(String userName, Boolean isUserImpersonate) { // start server process + final String callbackHost; + final int callbackPort; try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + callbackHost = RemoteInterpreterUtils.findAvailableHostname(); + callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); } catch (IOException e1) { throw new InterpreterException(e1); } @@ -104,8 +133,10 @@ public void start(String userName, Boolean isUserImpersonate) { CommandLine cmdLine = CommandLine.parse(interpreterRunner); cmdLine.addArgument("-d", false); cmdLine.addArgument(interpreterDir, false); + cmdLine.addArgument("-c", false); + cmdLine.addArgument(callbackHost, false); cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); + cmdLine.addArgument(Integer.toString(callbackPort), false); if (isUserImpersonate && !userName.equals("anonymous")) { cmdLine.addArgument("-u", false); cmdLine.addArgument(userName, false); @@ -137,34 +168,68 @@ public void start(String userName, Boolean isUserImpersonate) { throw new InterpreterException(e); } - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (!running) { + // Start netty server to receive hostPort information from RemoteInterpreterServer; + new Thread(new Runnable() { + @Override + public void run() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); try { - cmdOut.flush(); - } catch (IOException e) { - // nothing to do + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + logger.info("{}", ctx); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { + logger.info("msg: {}", msg.toString(CharsetUtil.UTF_8)); + String[] hostPort = msg.toString(CharsetUtil.UTF_8).split(":"); + host = hostPort[0]; + port = Integer.parseInt(hostPort[1]); + hostPortLatch.countDown(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("Netty error", cause); + ctx.close(); + } + }); + } + }); + + logger.info("Netty server starts"); + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(callbackPort).sync(); + + // Wait until the server socket is closed. + // In this example, this does not happen, but you can do that to gracefully + // shut down your server. + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + logger.error("Netty server error while binding", e); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); } - throw new InterpreterException(new String(cmdOut.toByteArray())); } + }).start(); - try { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + - "Thread.sleep", e); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Remote interpreter not yet accessible at localhost:" + port); - } + try { + hostPortLatch.await(getConnectTimeout(), TimeUnit.MILLISECONDS); + // Check if not running + if (null == host || -1 == port) { + throw new InterpreterException("CAnnot run interpreter"); } + } catch (InterruptedException e) { + logger.error("Remote interpreter is not accessible"); } processOutput.setOutputStream(null); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index b85d7ef2fb0..ffbeac89191 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -21,11 +21,11 @@ import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.HashMap; import java.util.Properties; import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.interpreter.Constants; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -78,8 +78,8 @@ public void testClientFactory() throws Exception { } @Test - public void testStartStopRemoteInterpreter() throws TException, InterruptedException { - RemoteInterpreterServer server = new RemoteInterpreterServer(3678); + public void testStartStopRemoteInterpreter() throws TException, InterruptedException, IOException { + RemoteInterpreterServer server = new RemoteInterpreterServer(null, 3678); server.start(); boolean running = false; long startTime = System.currentTimeMillis();