Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions bin/interpreter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load>"
}

while getopts "hp:d:l:v" o; do
while getopts "hp:d:l:v:u:" o; do
case ${o} in
h)
usage
Expand All @@ -42,6 +42,9 @@ while getopts "hp:d:l:v" o; do
. "${bin}/common.sh"
getZeppelinVersion
;;
u)
ZEPPELIN_SSH_COMMAND="ssh ${OPTARG}@localhost "
;;
esac
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires the login user must exist in the os account and be able to ssh to localhost. I am not sure whether this is a good way, but just feel the approach is a little strange compared to the impersonation implementation in hadoop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zjffdu yes, I agree, its not as implementation in hadoop, would you recommend something else ?

done

Expand Down Expand Up @@ -155,10 +158,11 @@ addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"


if [[ -n "${SPARK_SUBMIT}" ]]; then
${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
${ZEPPELIN_SSH_COMMAND} `${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &`
else
${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
${ZEPPELIN_SSH_COMMAND} ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi

pid=$!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.zeppelin.interpreter.remote;

import java.util.*;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
Expand All @@ -36,8 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.*;
Copy link
Member

@bzz bzz Aug 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a nitpick but Zeppelin's Java code conventions discourages usage of wildcard imports.

Could you please check all the other changed files to follow this convention as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll revert this, and check that my Editor (Intellij Idea) is also configured properly.


/**
* Proxy for Interpreter instance that runs on separate process
Expand All @@ -59,20 +58,24 @@ public class RemoteInterpreter extends Interpreter {
private int maxPoolSize;
private String host;
private int port;
private String userName;
private Boolean isUserImpersonate;

/**
* Remote interpreter and manage interpreter process
*/
public RemoteInterpreter(Properties property,
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
String noteId,
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener,
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.className = className;
Expand All @@ -85,6 +88,8 @@ public RemoteInterpreter(Properties property,
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
}


Expand All @@ -100,7 +105,9 @@ public RemoteInterpreter(
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
String userName,
Boolean isUserImpersonate) {
super(property);
this.noteId = noteId;
this.className = className;
Expand All @@ -111,6 +118,8 @@ public RemoteInterpreter(
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
}


Expand All @@ -125,7 +134,9 @@ public RemoteInterpreter(
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
String userName,
Boolean isUserImpersonate) {
super(property);
this.className = className;
this.noteId = noteId;
Expand All @@ -138,6 +149,8 @@ public RemoteInterpreter(
this.maxPoolSize = 10;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
}

private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
Expand Down Expand Up @@ -205,7 +218,7 @@ public synchronized void init() {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();

final InterpreterGroup interpreterGroup = getInterpreterGroup();
interpreterProcess.reference(interpreterGroup);
interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
interpreterProcess.setMaxPoolSize(
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
String groupId = interpreterGroup.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

/**
Expand Down Expand Up @@ -88,7 +87,7 @@ public int getPort() {
}

@Override
public void start() {
public void start(String userName, Boolean isUserImpersonate) {
// start server process
try {
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
Expand All @@ -101,6 +100,10 @@ public void start() {
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
if (isUserImpersonate && !userName.equals("anonymous") ) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);
}
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
package org.apache.zeppelin.interpreter.remote;

import com.google.gson.Gson;
import org.apache.commons.exec.*;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.Properties;

/**
* Abstract class for interpreter process
Expand Down Expand Up @@ -63,18 +61,20 @@ public RemoteInterpreterProcess(

public abstract String getHost();
public abstract int getPort();
public abstract void start();

public abstract void start(String userName, Boolean isUserImpersonate);
public abstract void stop();
public abstract boolean isRunning();

public int getConnectTimeout() {
return connectTimeout;
}

public int reference(InterpreterGroup interpreterGroup) {
public int reference(InterpreterGroup interpreterGroup, String userName,
Boolean isUserImpersonate) {
synchronized (referenceCount) {
if (!isRunning()) {
start();
start(userName, isUserImpersonate);
}

if (clientPool == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public int getPort() {
}

@Override
public void start() {
public void start(String userName, Boolean isUserImpersonate) {
// assume process is externally managed. nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ public void setUp() throws Exception {
env,
10 * 1000,
null,
null
);
null,
"anonymous",
false);

intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(intp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ private RemoteInterpreter createMockInterpreter() {
env,
10 * 1000,
this,
null);
null,
"anonymous",
false);

intpGroup.get("note").add(intp);
intp.setInterpreterGroup(intpGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Properties;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
Expand All @@ -46,8 +45,8 @@ public void testStartStop() {
10 * 1000, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));
assertEquals(2, rip.reference(intpGroup));
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
assertEquals(2, rip.reference(intpGroup, "anonymous", false));
assertEquals(true, rip.isRunning());
assertEquals(1, rip.dereference());
assertEquals(true, rip.isRunning());
Expand All @@ -61,7 +60,7 @@ public void testClientFactory() throws Exception {
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
rip.reference(intpGroup);
rip.reference(intpGroup, "anonymous", false);
assertEquals(0, rip.getNumActiveClient());
assertEquals(0, rip.getNumIdleClient());

Expand Down Expand Up @@ -106,7 +105,7 @@ public void testStartStopRemoteInterpreter() throws TException, InterruptedExcep
, 10 * 1000);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
assertEquals(true, rip.isRunning());
}
}
Loading