Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/manual/interpreters.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,49 @@ interpreter.start()
The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter process is listening as shown in the image below:

<img src="../assets/themes/zeppelin/img/screenshots/existing_interpreter.png" width="450px">


## (Experimental) Interpreter Execution Hooks

Zeppelin allows for users to specify additional code to be executed by an interpreter at pre and post-paragraph code execution. This is primarily useful if you need to run the same set of code for all of the paragraphs within your notebook at specific times. Currently, this feature is only available for the spark and pyspark interpreters. To specify your hook code, you may use `z.registerHook()`. For example, enter the following into one paragraph:

```python
%pyspark
z.registerHook("pre_exec", "print 'This code should be executed before the paragraph code!'")
z.registerHook("post_exec", "print 'This code should be executed after the paragraph code!'")
```

These calls will not take effect until the next time you run a paragraph. In another paragraph, enter
```python
%pyspark
print "This code should be entered into the paragraph by the user!"
```

The output should be:
```
This code should be executed before the paragraph code!
This code should be entered into the paragraph by the user!
This code should be executed after the paragraph code!
```

If you ever need to know the hook code, use `z.getHook()`:
```python
%pyspark
print z.getHook("post_exec")
```
```
print 'This code should be executed after the paragraph code!'
```
Any call to `z.registerHook()` will automatically overwrite what was previously registered. To completely unregister a hook event, use `z.unregisterHook(eventCode)`. Currently only `"post_exec"` and `"pre_exec"` are valid event codes for the Zeppelin Hook Registry system.

Finally, the hook registry is internally shared by other interpreters in the same group. This would allow for hook code for one interpreter REPL to be set by another as follows:

```scala
%spark
z.unregisterHook("post_exec", "pyspark")
```
The API is identical for both the spark (scala) and pyspark (python) implementations.

### Caveats
Calls to `z.registerHook("pre_exec", ...)` should be made with care. If there are errors in your specified hook code, this will cause the interpreter REPL to become unable to execute any code past the pre-execute stage, making it impossible for direct calls to `z.unregisterHook()` to take effect. Current workarounds include calling `z.unregisterHook()` from a different interpreter REPL in the same interpreter group (see above) or manually restarting the interpreter group in the UI.

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class SparkInterpreter extends Interpreter {
private SparkConf conf;
private static SparkContext sc;
private static SQLContext sqlc;
private static InterpreterHookRegistry hooks;
private static SparkEnv env;
private static Object sparkSession; // spark 2.x
private static JobProgressListener sparkListener;
Expand Down Expand Up @@ -778,8 +780,10 @@ public void open() {
sqlc = getSQLContext();

dep = getDependencyResolver();

hooks = getInterpreterGroup().getInterpreterHookRegistry();

z = new ZeppelinContext(sc, sqlc, null, dep,
z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));

interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
Expand Down
104 changes: 103 additions & 1 deletion spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
Expand All @@ -41,6 +44,7 @@
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
Expand All @@ -53,19 +57,33 @@
* Spark context for zeppelin.
*/
public class ZeppelinContext {
// Maps a repl name as given in a paragraph (e.g. "pyspark") to the
// fully-qualified interpreter class name used by the hook registry.
private static final Map<String, String> interpreterClassMap;
static {
interpreterClassMap = new HashMap<String, String>();
interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
}

private SparkDependencyResolver dep;
private InterpreterContext interpreterContext;
private int maxResult;
private List<Class> supportedClasses;

private InterpreterHookRegistry hooks;

public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
SparkDependencyResolver dep,
InterpreterHookRegistry hooks,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.hooks = hooks;
this.maxResult = maxResult;
this.supportedClasses = new ArrayList<>();
try {
Expand Down Expand Up @@ -697,6 +715,90 @@ private void angularUnbind(String name, String noteId) {
registry.remove(name, noteId, null);
}

/**
 * Resolve the fully-qualified interpreter class name from the repl name
 * entered in a paragraph (e.g. "pyspark" or "spark.sql").
 *
 * @param replName repl name from the paragraph; if it is already a valid
 *        interpreter class name it is returned unchanged
 * @return the interpreter class name, or {@code null} if the repl name is
 *         not known to this interpreter group
 */
public String getClassNameFromReplName(String replName) {
  // Already a fully-qualified class name? Return it as-is.
  if (interpreterClassMap.containsValue(replName)) {
    return replName;
  }

  // Strip a "spark." group prefix, e.g. "spark.sql" -> "sql".
  if (replName.contains("spark.")) {
    replName = replName.replace("spark.", "");
  }
  return interpreterClassMap.get(replName);
}

/**
 * Bind hook code to an execution event for a specific interpreter.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @param cmd The code to be executed by the interpreter on given event
 * @param replName Name of the interpreter to bind the hook to
 */
@Experimental
public void registerHook(String event, String cmd, String replName) {
  String targetClass = getClassNameFromReplName(replName);
  hooks.register(interpreterContext.getNoteId(), targetClass, event, cmd);
}

/**
 * registerHook() convenience overload targeting the current repl.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @param cmd The code to be executed by the interpreter on given event
 */
@Experimental
public void registerHook(String event, String cmd) {
  this.registerHook(event, cmd, interpreterContext.getClassName());
}

/**
 * Look up the hook code bound to an event for a specific interpreter.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @param replName Name of the interpreter the hook is bound to
 * @return the registered hook code
 */
@Experimental
public String getHook(String event, String replName) {
  String targetClass = getClassNameFromReplName(replName);
  return hooks.get(interpreterContext.getNoteId(), targetClass, event);
}

/**
 * getHook() convenience overload targeting the current repl.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @return the registered hook code
 */
@Experimental
public String getHook(String event) {
  return this.getHook(event, interpreterContext.getClassName());
}

/**
 * Remove the hook code bound to an event for a specific interpreter.
 * @param event The type of event to unhook from (pre_exec, post_exec)
 * @param replName Name of the interpreter the hook is bound to
 */
@Experimental
public void unregisterHook(String event, String replName) {
  String targetClass = getClassNameFromReplName(replName);
  hooks.unregister(interpreterContext.getNoteId(), targetClass, event);
}

/**
 * unregisterHook() convenience overload targeting the current repl.
 * @param event The type of event to unhook from (pre_exec, post_exec)
 */
@Experimental
public void unregisterHook(String event) {
  this.unregisterHook(event, interpreterContext.getClassName());
}

/**
* Add object into resource pool
Expand Down
23 changes: 20 additions & 3 deletions spark/src/main/resources/python/zeppelin_pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ def put(self, key, value):
def get(self, key):
return self.__getitem__(key)

def input(self, name, defaultValue = ""):
def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)

def select(self, name, options, defaultValue = ""):
def select(self, name, options, defaultValue=""):
# auto_convert to ArrayList doesn't match the method signature on JVM side
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
return self.z.select(name, defaultValue, iterables)

def checkbox(self, name, options, defaultChecked = None):
def checkbox(self, name, options, defaultChecked=None):
if defaultChecked is None:
defaultChecked = list(map(lambda items: items[0], options))
optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
Expand All @@ -99,6 +99,23 @@ def checkbox(self, name, options, defaultChecked = None):
checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables)
return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables)

def registerHook(self, event, cmd, replName=None):
    """Bind hook code to an event, targeting the current repl when
    replName is omitted. Delegates to the JVM-side ZeppelinContext."""
    args = (event, cmd) if replName is None else (event, cmd, replName)
    self.z.registerHook(*args)

def unregisterHook(self, event, replName=None):
    """Remove the hook bound to an event, targeting the current repl when
    replName is omitted. Delegates to the JVM-side ZeppelinContext."""
    args = (event,) if replName is None else (event, replName)
    self.z.unregisterHook(*args)

def getHook(self, event, replName=None):
    """Return the hook code bound to an event, targeting the current repl
    when replName is omitted. Delegates to the JVM-side ZeppelinContext."""
    args = (event,) if replName is None else (event, replName)
    return self.z.getHook(*args)

def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.gson.annotations.SerializedName;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
Expand Down Expand Up @@ -203,6 +204,71 @@ public void setClassloaderUrls(URL[] classloaderUrls) {
this.classloaderUrls = classloaderUrls;
}

/**
* General function to register hook event
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String noteId, String event, String cmd) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InterpreterHookRegistry is directly passed to ZeppelinContext on it's constructor. And ZeppelinContext is exposed to user directly. So currently Interpreter.registerHook(),Interpreter.unregiserHook(),Interpreter.getHook() are accessed from only RemoteInterpreterServer. But RemoteInterpreterServer also can get directly access InterpreterHookRegistry from InterpreterGroup. In this way, i think the feature can be implemented without modification of Interpreter.java. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of see your point, moving the convenience methods from Interpreter.java to InterpreterGroup.java would save the need for defining wrappers in LazyOpenInterpreter.java. However this would also mean that the interpreter className must always be specified. This means for example that this line:

String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);

Would be changed to

String className = interpreter.getClassName();
String cmdDev = interpreter.getInterpreterGroup().getHook(noteId, className, HookType.PRE_EXEC_DEV);

Additionally while these methods are only currently called through RemoteInterpreterServer, I will have use cases for directly calling them in the Python Interpreter subclasses (eg, PySparkInterpreter and PythonInterpreter) for my upcoming matplotlib integration work. Given this, I felt that the current syntax would be clearer.

InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
hooks.register(noteId, className, event, cmd);
}

/**
 * registerHook() wrapper binding the hook in the global (note-independent)
 * scope.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @param cmd The code to be executed by the interpreter on given event
 */
@Experimental
public void registerHook(String event, String cmd) {
  // A null noteId denotes the global scope in the hook registry.
  this.registerHook(null, event, cmd);
}

/**
 * Look up the hook code bound to an event for this interpreter.
 * @param noteId - Note the hook is bound to; null for the global scope
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @return the registered hook code
 */
@Experimental
public String getHook(String noteId, String event) {
  return interpreterGroup.getInterpreterHookRegistry()
      .get(noteId, getClassName(), event);
}

/**
 * getHook() wrapper reading from the global (note-independent) scope.
 * @param event The type of event to hook to (pre_exec, post_exec)
 * @return the registered hook code
 */
@Experimental
public String getHook(String event) {
  return this.getHook(null, event);
}

/**
 * Remove the hook code bound to an event for this interpreter.
 * @param noteId - Note the hook is bound to; null for the global scope
 * @param event The type of event to unhook from (pre_exec, post_exec)
 */
@Experimental
public void unregisterHook(String noteId, String event) {
  interpreterGroup.getInterpreterHookRegistry()
      .unregister(noteId, getClassName(), event);
}

/**
 * unregisterHook() wrapper clearing the global (note-independent) scope.
 * @param event The type of event to unhook from (pre_exec, post_exec)
 */
@Experimental
public void unregisterHook(String event) {
  this.unregisterHook(null, event);
}

@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
synchronized (interpreterGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static void remove() {
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private String className;

public InterpreterContext(String noteId,
String paragraphId,
Expand Down Expand Up @@ -124,4 +125,11 @@ public List<InterpreterContextRunner> getRunners() {
return runners;
}

/** Returns the class name of the interpreter running this paragraph, or null if unset. */
public String getClassName() {
return className;
}

/** Records the class name of the interpreter running this paragraph (used by the hook registry). */
public void setClassName(String className) {
this.className = className;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);

AngularObjectRegistry angularObjectRegistry;
InterpreterHookRegistry hookRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
Expand Down Expand Up @@ -118,10 +119,18 @@ public Properties getProperty() {
/** Returns the angular object registry shared by this interpreter group. */
public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}

/** Sets the angular object registry shared by this interpreter group. */
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
this.angularObjectRegistry = angularObjectRegistry;
}

/** Returns the hook registry shared by all interpreters in this group. */
public InterpreterHookRegistry getInterpreterHookRegistry() {
return hookRegistry;
}

/** Sets the hook registry shared by all interpreters in this group. */
public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) {
this.hookRegistry = hookRegistry;
}

public RemoteInterpreterProcess getRemoteInterpreterProcess() {
return remoteInterpreterProcess;
Expand Down
Loading