) tezDAGStatsMapField.get(stat);
+ for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+ LOGGER.debug("Tez JobId:" + dagStats.getJobId());
+ jobIds.add(dagStats.getJobId());
+ }
+ return jobIds;
+ } catch (Exception e) {
+ LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
+ throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
+ }
+ }
+}
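For context, the reflection pattern used above can be shown standalone. The nested Stats class and its field below are hypothetical stand-ins for Pig's TezPigScriptStats and its private tezDAGStatsMap field; only the access pattern is the same.

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of reading a private field via reflection, as done above.
public class PrivateFieldAccessDemo {
  static class Stats {
    private Map<String, String> tezDAGStatsMap = new HashMap<>();
    Stats() {
      tezDAGStatsMap.put("dag_1", "job_0001");
    }
  }

  public static void main(String[] args) throws Exception {
    Stats stat = new Stats();
    Field field = Stats.class.getDeclaredField("tezDAGStatsMap");
    field.setAccessible(true);  // the field is private, so bypass access checks
    @SuppressWarnings("unchecked")
    Map<String, String> map = (Map<String, String>) field.get(stat);
    System.out.println(map.values());  // the real code collects dagStats.getJobId() this way
  }
}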
diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json
new file mode 100644
index 00000000000..27918ede1bf
--- /dev/null
+++ b/pig/src/main/resources/interpreter-setting.json
@@ -0,0 +1,46 @@
+[
+ {
+ "group": "pig",
+ "name": "script",
+ "className": "org.apache.zeppelin.pig.PigInterpreter",
+ "properties": {
+ "zeppelin.pig.execType": {
+ "envName": null,
+ "propertyName": "zeppelin.pig.execType",
+ "defaultValue": "mapreduce",
+ "description": "local | mapreduce | tez"
+ },
+ "zeppelin.pig.includeJobStats": {
+ "envName": null,
+ "propertyName": "zeppelin.pig.includeJobStats",
+ "defaultValue": "false",
+ "description": "flag to include job stats in output"
+ }
+ },
+ "editor": {
+ "language": "pig"
+ }
+ },
+ {
+ "group": "pig",
+ "name": "query",
+ "className": "org.apache.zeppelin.pig.PigQueryInterpreter",
+ "properties": {
+ "zeppelin.pig.execType": {
+ "envName": null,
+ "propertyName": "zeppelin.pig.execType",
+ "defaultValue": "mapreduce",
+ "description": "local | mapreduce | tez"
+ },
+ "zeppelin.pig.maxResult": {
+ "envName": null,
+ "propertyName": "zeppelin.pig.maxResult",
+ "defaultValue": "1000",
+ "description": "max row number for %pig.query"
+ }
+ },
+ "editor": {
+ "language": "pig"
+ }
+ }
+]
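As a quick illustration of how these settings reach the interpreter, a minimal sketch that reads the same two property keys from plain java.util.Properties (the values here are arbitrary and the real PigInterpreter wiring is omitted):

import java.util.Properties;

// Minimal sketch: reading the two %pig settings defined above, with the same defaults.
public class PigSettingsDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zeppelin.pig.execType", "local");        // overrides the "mapreduce" default
    props.put("zeppelin.pig.includeJobStats", "true");

    String execType = props.getProperty("zeppelin.pig.execType", "mapreduce");
    boolean includeJobStats =
        Boolean.parseBoolean(props.getProperty("zeppelin.pig.includeJobStats", "false"));
    System.out.println("execType=" + execType + ", includeJobStats=" + includeJobStats);
  }
}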
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
new file mode 100644
index 00000000000..3d062d61579
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PigInterpreterTest {
+
+ private PigInterpreter pigInterpreter;
+ private InterpreterContext context;
+
+ @Before
+ public void setUp() {
+ Properties properties = new Properties();
+ properties.put("zeppelin.pig.execType", "local");
+ pigInterpreter = new PigInterpreter(properties);
+ pigInterpreter.open();
+ context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
+ null, null);
+ }
+
+ @After
+ public void tearDown() {
+ pigInterpreter.close();
+ }
+
+ @Test
+ public void testBasics() throws IOException {
+ String content = "1\tandy\n"
+ + "2\tpeter\n";
+ File tmpFile = File.createTempFile("zeppelin", "test");
+ FileWriter writer = new FileWriter(tmpFile);
+ IOUtils.write(content, writer);
+ writer.close();
+
+ // simple pig script using dump
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ + "dump a;";
+ InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.SUCCESS, result.code());
+ assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+ // describe
+ pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ + "describe a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.SUCCESS, result.code());
+ assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+ // syntax error (compilation error)
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ + "describe a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.ERROR, result.code());
+ assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+ // execution error
+ pigscript = "a = load 'invalid_path';"
+ + "dump a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.ERROR, result.code());
+ assertTrue(result.message().contains("Input path does not exist"));
+ }
+
+
+ @Test
+ public void testIncludeJobStats() throws IOException {
+ Properties properties = new Properties();
+ properties.put("zeppelin.pig.execType", "local");
+ properties.put("zeppelin.pig.includeJobStats", "true");
+ pigInterpreter = new PigInterpreter(properties);
+ pigInterpreter.open();
+
+ String content = "1\tandy\n"
+ + "2\tpeter\n";
+ File tmpFile = File.createTempFile("zeppelin", "test");
+ FileWriter writer = new FileWriter(tmpFile);
+ IOUtils.write(content, writer);
+ writer.close();
+
+ // simple pig script using dump
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ + "dump a;";
+ InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.SUCCESS, result.code());
+ assertTrue(result.message().contains("Counters:"));
+ assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+ // describe
+ pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ + "describe a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.SUCCESS, result.code());
+ // no job is launched, so no jobStats
+ assertTrue(!result.message().contains("Counters:"));
+ assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+ // syntax error (compilation error)
+ pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ + "describe a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.ERROR, result.code());
+ // no job is launched, so no jobStats
+ assertTrue(!result.message().contains("Counters:"));
+ assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+ // execution error
+ pigscript = "a = load 'invalid_path';"
+ + "dump a;";
+ result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(Type.TEXT, result.type());
+ assertEquals(Code.ERROR, result.code());
+ assertTrue(result.message().contains("Counters:"));
+ assertTrue(result.message().contains("Input path does not exist"));
+ }
+}
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
new file mode 100644
index 00000000000..00ece440542
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link PigQueryInterpreter}.
+ */
+public class PigQueryInterpreterTest {
+
+ private PigInterpreter pigInterpreter;
+ private PigQueryInterpreter pigQueryInterpreter;
+ private InterpreterContext context;
+
+ @Before
+ public void setUp() {
+ Properties properties = new Properties();
+ properties.put("zeppelin.pig.execType", "local");
+ properties.put("zeppelin.pig.maxResult", "20");
+
+ pigInterpreter = new PigInterpreter(properties);
+ pigQueryInterpreter = new PigQueryInterpreter(properties);
+ List<Interpreter> interpreters = new ArrayList<>();
+ interpreters.add(pigInterpreter);
+ interpreters.add(pigQueryInterpreter);
+ InterpreterGroup group = new InterpreterGroup();
+ group.put("note_id", interpreters);
+ pigInterpreter.setInterpreterGroup(group);
+ pigQueryInterpreter.setInterpreterGroup(group);
+ pigInterpreter.open();
+ pigQueryInterpreter.open();
+
+ context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
+ null, null);
+ }
+
+ @After
+ public void tearDown() {
+ pigInterpreter.close();
+ pigQueryInterpreter.close();
+ }
+
+ @Test
+ public void testBasics() throws IOException {
+ String content = "andy\tmale\t10\n"
+ + "peter\tmale\t20\n"
+ + "amy\tfemale\t14\n";
+ File tmpFile = File.createTempFile("zeppelin", "test");
+ FileWriter writer = new FileWriter(tmpFile);
+ IOUtils.write(content, writer);
+ writer.close();
+
+ // run script in PigInterpreter
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n"
+ + "a2 = load 'invalid_path' as (name, gender, age);\n"
+ + "dump a;";
+ InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(InterpreterResult.Type.TEXT, result.type());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
+
+ // run single line query in PigQueryInterpreter
+ String query = "foreach a generate name, age;";
+ result = pigQueryInterpreter.interpret(query, context);
+ assertEquals(InterpreterResult.Type.TABLE, result.type());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message());
+
+ // run multiple line query in PigQueryInterpreter
+ query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;";
+ result = pigQueryInterpreter.interpret(query, context);
+ assertEquals(InterpreterResult.Type.TABLE, result.type());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
+
+ // syntax error in PigQueryInterpreter
+ query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;";
+ result = pigQueryInterpreter.interpret(query, context);
+ assertEquals(InterpreterResult.Type.TEXT, result.type());
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema"));
+
+ // execution error in PigQueryInterpreter
+ query = "foreach a2 generate name, age;";
+ result = pigQueryInterpreter.interpret(query, context);
+ assertEquals(InterpreterResult.Type.TEXT, result.type());
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertTrue(result.message().contains("Input path does not exist"));
+ }
+
+ @Test
+ public void testMaxResult() throws IOException {
+ StringBuilder content = new StringBuilder();
+ for (int i=0;i<30;++i) {
+ content.append(i + "\tname_" + i + "\n");
+ }
+ File tmpFile = File.createTempFile("zeppelin", "test");
+ FileWriter writer = new FileWriter(tmpFile);
+ IOUtils.write(content, writer);
+ writer.close();
+
+ // run script in PigInterpreter
+ String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);";
+ InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+ assertEquals(InterpreterResult.Type.TEXT, result.type());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ // empty output
+ assertTrue(result.message().isEmpty());
+
+ // run single line query in PigQueryInterpreter
+ String query = "foreach a generate id;";
+ result = pigQueryInterpreter.interpret(query, context);
+ assertEquals(InterpreterResult.Type.TABLE, result.type());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertTrue(result.message().contains("id\n0\n1\n2"));
+ assertTrue(result.message().contains("Results are limited by 20"));
+ }
+}
diff --git a/pig/src/test/resources/log4j.properties b/pig/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..8daee59d60d
--- /dev/null
+++ b/pig/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
diff --git a/pom.xml b/pom.xml
index 03b226341e1..558ce0624a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
shell
livy
hbase
+ pig
postgresql
jdbc
file
diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml
index f320680072b..04b6983fb03 100644
--- a/spark-dependencies/pom.xml
+++ b/spark-dependencies/pom.xml
@@ -525,9 +525,9 @@
 <activeByDefault>true</activeByDefault>
- <spark.version>2.0.0</spark.version>
+ <spark.version>2.0.1</spark.version>
 <protobuf.version>2.5.0</protobuf.version>
- <py4j.version>0.10.1</py4j.version>
+ <py4j.version>0.10.3</py4j.version>
 <scala.version>2.11.8</scala.version>
diff --git a/spark/pom.xml b/spark/pom.xml
index 66d93c42ee6..efb7452647c 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -37,7 +37,7 @@
1.8.2
1.10.19
1.6.4
- <spark.version>2.0.0</spark.version>
+ <spark.version>2.0.1</spark.version>
@@ -519,9 +519,9 @@
 <activeByDefault>true</activeByDefault>
- <spark.version>2.0.0</spark.version>
+ <spark.version>2.0.1</spark.version>
 <protobuf.version>2.5.0</protobuf.version>
- <py4j.version>0.10.1</py4j.version>
+ <py4j.version>0.10.3</py4j.version>
 <scala.version>2.11.8</scala.version>
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 878c0106b06..0812c761038 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -49,6 +49,7 @@
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@@ -101,6 +102,7 @@ public class SparkInterpreter extends Interpreter {
private SparkConf conf;
private static SparkContext sc;
private static SQLContext sqlc;
+ private static InterpreterHookRegistry hooks;
private static SparkEnv env;
private static Object sparkSession; // spark 2.x
private static JobProgressListener sparkListener;
@@ -479,7 +481,7 @@ private void setupConfForPySpark(SparkConf conf) {
//Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
- "py4j-0.10.1-src.zip"};
+ "py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip"};
 ArrayList<String> pythonLibUris = new ArrayList<>();
for (String lib : pythonLibs) {
File libFile = new File(pysparkPath, lib);
@@ -813,8 +815,10 @@ public void open() {
sqlc = getSQLContext();
dep = getDependencyResolver();
+
+ hooks = getInterpreterGroup().getInterpreterHookRegistry();
- z = new ZeppelinContext(sc, sqlc, null, dep,
+ z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 7465756e14f..92b50d0a3b8 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -28,11 +28,14 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectWatcher;
@@ -41,6 +44,7 @@
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@@ -53,19 +57,33 @@
* Spark context for zeppelin.
*/
public class ZeppelinContext {
+ // Maps the replName entered in a paragraph to the interpreter class name
+ // used by the hook registry.
+ private static final Map<String, String> interpreterClassMap;
+ static {
+ interpreterClassMap = new HashMap<String, String>();
+ interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
+ interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
+ interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
+ interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
+ }
+
private SparkDependencyResolver dep;
private InterpreterContext interpreterContext;
private int maxResult;
 private List<Class> supportedClasses;
-
+ private InterpreterHookRegistry hooks;
+
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
SparkDependencyResolver dep,
+ InterpreterHookRegistry hooks,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
+ this.hooks = hooks;
this.maxResult = maxResult;
this.supportedClasses = new ArrayList<>();
try {
@@ -697,6 +715,90 @@ private void angularUnbind(String name, String noteId) {
registry.remove(name, noteId, null);
}
+ /**
+ * Get the interpreter class name from the repl name entered in the paragraph.
+ * @param replName the repl name; if it is already a valid class name, it is returned as-is.
+ */
+ public String getClassNameFromReplName(String replName) {
+ for (String name : interpreterClassMap.values()) {
+ if (replName.equals(name)) {
+ return replName;
+ }
+ }
+
+ if (replName.contains("spark.")) {
+ replName = replName.replace("spark.", "");
+ }
+ return interpreterClassMap.get(replName);
+ }
+
+ /**
+ * General function to register hook event
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param cmd The code to be executed by the interpreter on given event
+ * @param replName Name of the interpreter
+ */
+ @Experimental
+ public void registerHook(String event, String cmd, String replName) {
+ String noteId = interpreterContext.getNoteId();
+ String className = getClassNameFromReplName(replName);
+ hooks.register(noteId, className, event, cmd);
+ }
+
+ /**
+ * registerHook() wrapper for current repl
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param cmd The code to be executed by the interpreter on given event
+ */
+ @Experimental
+ public void registerHook(String event, String cmd) {
+ String className = interpreterContext.getClassName();
+ registerHook(event, cmd, className);
+ }
+
+ /**
+ * Get the hook code
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param replName Name of the interpreter
+ */
+ @Experimental
+ public String getHook(String event, String replName) {
+ String noteId = interpreterContext.getNoteId();
+ String className = getClassNameFromReplName(replName);
+ return hooks.get(noteId, className, event);
+ }
+
+ /**
+ * getHook() wrapper for current repl
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public String getHook(String event) {
+ String className = interpreterContext.getClassName();
+ return getHook(event, className);
+ }
+
+ /**
+ * Unbind code from given hook event
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param replName Name of the interpreter
+ */
+ @Experimental
+ public void unregisterHook(String event, String replName) {
+ String noteId = interpreterContext.getNoteId();
+ String className = getClassNameFromReplName(replName);
+ hooks.unregister(noteId, className, event);
+ }
+
+ /**
+ * unregisterHook() wrapper for current repl
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public void unregisterHook(String event) {
+ String className = interpreterContext.getClassName();
+ unregisterHook(event, className);
+ }
/**
* Add object into resource pool
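For clarity, the repl-name resolution that the hook helpers above rely on can be sketched in isolation. The map mirrors interpreterClassMap; the demo class itself is hypothetical.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the replName -> interpreter class resolution used by the hook
// helpers above; "spark.pyspark" and "pyspark" resolve to the same class name.
public class ReplNameResolutionDemo {
  private static final Map<String, String> classMap = new HashMap<>();
  static {
    classMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
    classMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
    classMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
    classMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
  }

  static String resolve(String replName) {
    if (classMap.containsValue(replName)) {
      return replName;  // already a fully qualified interpreter class name
    }
    return classMap.get(replName.replace("spark.", ""));
  }

  public static void main(String[] args) {
    System.out.println(resolve("pyspark"));        // org.apache.zeppelin.spark.PySparkInterpreter
    System.out.println(resolve("spark.pyspark"));  // same result
  }
}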
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 26488337b11..961793db17e 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -141,6 +141,9 @@ public void open() throws IOException {
cmd.addArgument(Integer.toString(port));
cmd.addArgument(libPath);
cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
+
+ // log the full R command so it can be re-run manually, e.g. for fault diagnosis
+ logger.debug(cmd.toString());
executor = new DefaultExecutor();
outputStream = new SparkOutputStream(logger);
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 49e60d4577d..a53f503b83c 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -80,16 +80,16 @@ def put(self, key, value):
def get(self, key):
return self.__getitem__(key)
- def input(self, name, defaultValue = ""):
+ def input(self, name, defaultValue=""):
return self.z.input(name, defaultValue)
- def select(self, name, options, defaultValue = ""):
+ def select(self, name, options, defaultValue=""):
# auto_convert to ArrayList doesn't match the method signature on JVM side
tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
iterables = gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples)
return self.z.select(name, defaultValue, iterables)
- def checkbox(self, name, options, defaultChecked = None):
+ def checkbox(self, name, options, defaultChecked=None):
if defaultChecked is None:
defaultChecked = list(map(lambda items: items[0], options))
optionTuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options))
@@ -99,6 +99,23 @@ def checkbox(self, name, options, defaultChecked = None):
checkedIterables = self.z.checkbox(name, defaultCheckedIterables, optionIterables)
return gateway.jvm.scala.collection.JavaConversions.asJavaCollection(checkedIterables)
+ def registerHook(self, event, cmd, replName=None):
+ if replName is None:
+ self.z.registerHook(event, cmd)
+ else:
+ self.z.registerHook(event, cmd, replName)
+
+ def unregisterHook(self, event, replName=None):
+ if replName is None:
+ self.z.unregisterHook(event)
+ else:
+ self.z.unregisterHook(event, replName)
+
+ def getHook(self, event, replName=None):
+ if replName is None:
+ return self.z.getHook(event)
+ return self.z.getHook(event, replName)
+
def __tupleToScalaTuple2(self, tuple):
if (len(tuple) == 2):
return gateway.jvm.scala.Tuple2(tuple[0], tuple[1])
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 2ee668a130e..99853d77c62 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -116,7 +116,6 @@ The following components are provided under Apache License.
(Apache 2.0) Utility classes for Jetty (org.mortbay.jetty:jetty-util:6.1.26 - http://javadox.com/org.mortbay.jetty/jetty/6.1.26/overview-tree.html)
(Apache 2.0) Servlet API (org.mortbay.jetty:servlet-api:2.5-20081211 - https://en.wikipedia.org/wiki/Jetty_(web_server))
(Apache 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client-jackson2:1.21.0 - https://github.com/google/google-http-java-client/tree/dev/google-http-client-jackson2)
- (Apache 2.0) angular-esri-map (https://github.com/Esri/angular-esri-map)
(Apache 2.0) pegdown (org.pegdown:pegdown:1.6.0 - https://github.com/sirthias/pegdown)
(Apache 2.0) parboiled-java (org.parboiled:parboiled-java:1.1.7 - https://github.com/sirthias/parboiled)
(Apache 2.0) parboiled-core (org.parboiled:parboiled-core:1.1.7 - https://github.com/sirthias/parboiled)
@@ -156,7 +155,15 @@ The following components are provided under Apache License.
(Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/)
(Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/)
(Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
-
+ (Apache 2.0) Apache Pig (org.apache.pig:pig:0.16 - http://pig.apache.org)
+ (Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-runtime-library (org.apache.tez:tez-runtime-library:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org)
+ (Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org)
+
========================================================================
MIT licenses
========================================================================
@@ -252,7 +259,7 @@ The following components are provided under the BSD-style License.
(BSD-like) ASM asm-utils (org.ow2.asm:asm-utils:5.0.3 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
(New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/)
(New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
- (New BSD License) Py4J (net.sf.py4j:py4j:0.10.1 - http://py4j.sourceforge.net/) - https://github.com/bartdag/py4j/blob/0.10.1/LICENSE.txt
+ (New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/) - https://github.com/bartdag/py4j/blob/0.10.3/LICENSE.txt
(New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/)
(BSD 3 Clause) Paranamer (com.thoughtworks.paranamer:paranamer:jar:2.6) - https://github.com/paul-hammant/paranamer/blob/paranamer-parent-2.6/LICENSE.txt
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index 9678b4691df..3e323200f6e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -27,6 +27,7 @@
import com.google.gson.annotations.SerializedName;
import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -203,6 +204,71 @@ public void setClassloaderUrls(URL[] classloaderUrls) {
this.classloaderUrls = classloaderUrls;
}
+ /**
+ * General function to register hook event
+ * @param noteId - Note to bind hook to
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param cmd The code to be executed by the interpreter on given event
+ */
+ @Experimental
+ public void registerHook(String noteId, String event, String cmd) {
+ InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
+ String className = getClassName();
+ hooks.register(noteId, className, event, cmd);
+ }
+
+ /**
+ * registerHook() wrapper for global scope
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ * @param cmd The code to be executed by the interpreter on given event
+ */
+ @Experimental
+ public void registerHook(String event, String cmd) {
+ registerHook(null, event, cmd);
+ }
+
+ /**
+ * Get the hook code
+ * @param noteId - Note to bind hook to
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public String getHook(String noteId, String event) {
+ InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
+ String className = getClassName();
+ return hooks.get(noteId, className, event);
+ }
+
+ /**
+ * getHook() wrapper for global scope
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public String getHook(String event) {
+ return getHook(null, event);
+ }
+
+ /**
+ * Unbind code from given hook event
+ * @param noteId - Note to bind hook to
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public void unregisterHook(String noteId, String event) {
+ InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
+ String className = getClassName();
+ hooks.unregister(noteId, className, event);
+ }
+
+ /**
+ * unregisterHook() wrapper for global scope
+ * @param event The type of event to hook to (pre_exec, post_exec)
+ */
+ @Experimental
+ public void unregisterHook(String event) {
+ unregisterHook(null, event);
+ }
+
@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
synchronized (interpreterGroup) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 21ca2e67b72..e33b9352252 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -57,6 +57,7 @@ public static void remove() {
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
 private List<InterpreterContextRunner> runners;
+ private String className;
public InterpreterContext(String noteId,
String paragraphId,
@@ -124,4 +125,11 @@ public List<InterpreterContextRunner> getRunners() {
return runners;
}
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index bc56784b15b..f3b158c26a6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -33,7 +33,7 @@
* and InterpreterGroup will have reference to these all interpreters.
*
* Remember, list of interpreters are dedicated to a note.
- * (when InterpreterOption.perNoteSession==true)
+ * (when InterpreterOption.session==true)
* So InterpreterGroup internally manages map of [noteId, list of interpreters]
*
* A InterpreterGroup runs on interpreter process.
@@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap intpForNote = this.get(noteId);
destroy(intpForNote);
+
+ if (remoteInterpreterProcess != null) {
+ remoteInterpreterProcess.dereference();
+ if (remoteInterpreterProcess.referenceCount() <= 0) {
+ remoteInterpreterProcess = null;
+ allInterpreterGroups.remove(id);
+ }
+ }
}
@@ -213,6 +230,7 @@ public void destroy() {
while (remoteInterpreterProcess.referenceCount() > 0) {
remoteInterpreterProcess.dereference();
}
+ remoteInterpreterProcess = null;
}
allInterpreterGroups.remove(id);
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookListener.java
new file mode 100644
index 00000000000..c70212c7b7e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * An interface for processing custom callback code into the interpreter.
+ */
+public interface InterpreterHookListener {
+ /**
+ * Prepends pre-execute hook code to the script that will be interpreted
+ */
+ public void onPreExecute(String script);
+
+ /**
+ * Appends post-execute hook code to the script that will be interpreted
+ */
+ public void onPostExecute(String script);
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookRegistry.java
new file mode 100644
index 00000000000..0917775d692
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterHookRegistry.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The InterpreterHookRegistry specifies code to be conditionally executed by an
+ * interpreter. The constants defined in this class denote the currently
+ * supported events. Each instance is bound to a single InterpreterGroup.
+ * Hooks are scoped per note; a null note id denotes the global scope.
+ */
+public class InterpreterHookRegistry {
+ public static final String GLOBAL_KEY = "_GLOBAL_";
+ private String interpreterId;
+ private Map<String, Map<String, Map<String, String>>> registry =
+ new HashMap<String, Map<String, Map<String, String>>>();
+
+ /**
+ * hookRegistry constructor.
+ *
+ * @param interpreterId The Id of the InterpreterGroup instance to bind to
+ */
+ public InterpreterHookRegistry(final String interpreterId) {
+ this.interpreterId = interpreterId;
+ }
+
+ /**
+ * Get the interpreterGroup id this instance is bound to
+ */
+ public String getInterpreterId() {
+ return interpreterId;
+ }
+
+ /**
+ * Adds a note to the registry
+ *
+ * @param noteId The Id of the Note instance to add
+ */
+ public void addNote(String noteId) {
+ synchronized (registry) {
+ if (registry.get(noteId) == null) {
+ registry.put(noteId, new HashMap<String, Map<String, String>>());
+ }
+ }
+ }
+
+ /**
+ * Adds a className to the registry
+ *
+ * @param noteId The note id
+ * @param className The name of the interpreter repl to map the hooks to
+ */
+ public void addRepl(String noteId, String className) {
+ synchronized (registry) {
+ addNote(noteId);
+ if (registry.get(noteId).get(className) == null) {
+ registry.get(noteId).put(className, new HashMap<String, String>());
+ }
+ }
+ }
+
+ /**
+ * Register a hook for a specific event.
+ *
+ * @param noteId Denotes the note this instance belongs to
+ * @param className The name of the interpreter repl to map the hooks to
+ * @param event hook event (see constants defined in this class)
+ * @param cmd Code to be executed by the interpreter
+ */
+ public void register(String noteId, String className,
+ String event, String cmd) throws IllegalArgumentException {
+ synchronized (registry) {
+ if (noteId == null) {
+ noteId = GLOBAL_KEY;
+ }
+ addRepl(noteId, className);
+ if (!event.equals(HookType.POST_EXEC) && !event.equals(HookType.PRE_EXEC) &&
+ !event.equals(HookType.POST_EXEC_DEV) && !event.equals(HookType.PRE_EXEC_DEV)) {
+ throw new IllegalArgumentException("Must be " + HookType.POST_EXEC + ", " +
+ HookType.POST_EXEC_DEV + ", " +
+ HookType.PRE_EXEC + " or " +
+ HookType.PRE_EXEC_DEV);
+ }
+ registry.get(noteId).get(className).put(event, cmd);
+ }
+ }
+
+ /**
+ * Unregister a hook for a specific event.
+ *
+ * @param noteId Denotes the note this instance belongs to
+ * @param className The name of the interpreter repl to map the hooks to
+ * @param event hook event (see constants defined in this class)
+ */
+ public void unregister(String noteId, String className, String event) {
+ synchronized (registry) {
+ if (noteId == null) {
+ noteId = GLOBAL_KEY;
+ }
+ addRepl(noteId, className);
+ registry.get(noteId).get(className).remove(event);
+ }
+ }
+
+ /**
+ * Get a hook for a specific event.
+ *
+ * @param noteId Denotes the note this instance belongs to
+ * @param className The name of the interpreter repl to map the hooks to
+ * @param event hook event (see constants defined in this class)
+ */
+ public String get(String noteId, String className, String event) {
+ synchronized (registry) {
+ if (noteId == null) {
+ noteId = GLOBAL_KEY;
+ }
+ addRepl(noteId, className);
+ return registry.get(noteId).get(className).get(event);
+ }
+ }
+
+ /**
+ * Container for hook event type constants
+ */
+ public static final class HookType {
+ // Execute the hook code PRIOR to main paragraph code execution
+ public static final String PRE_EXEC = "pre_exec";
+
+ // Execute the hook code AFTER main paragraph code execution
+ public static final String POST_EXEC = "post_exec";
+
+ // Same as above but reserved for interpreter developers, in order to allow
+ // notebook users to use the above without overwriting registry settings
+ // that are initialized directly in subclasses of Interpreter.
+ public static final String PRE_EXEC_DEV = "pre_exec_dev";
+ public static final String POST_EXEC_DEV = "post_exec_dev";
+ }
+
+}
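The global scope is worth a small illustration: registering with a null note id files the hook under GLOBAL_KEY, so every note in the group sees it. A minimal sketch, assuming only the class above; the interpreter class name and hook body are placeholders.

import org.apache.zeppelin.interpreter.InterpreterHookRegistry;

// Demonstrates that a null noteId and the explicit GLOBAL_KEY address the same bucket.
public class GlobalHookDemo {
  public static void main(String[] args) {
    InterpreterHookRegistry registry = new InterpreterHookRegistry("intp_group_1");
    String spark = "org.apache.zeppelin.spark.SparkInterpreter";

    // null noteId -> stored under the _GLOBAL_ bucket
    registry.register(null, spark, InterpreterHookRegistry.HookType.PRE_EXEC,
        "println(\"before every paragraph\")");

    // Readable both via the explicit global key and via a null noteId
    System.out.println(registry.get(InterpreterHookRegistry.GLOBAL_KEY, spark,
        InterpreterHookRegistry.HookType.PRE_EXEC));
    System.out.println(registry.get(null, spark,
        InterpreterHookRegistry.HookType.PRE_EXEC));
  }
}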
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
index c62ab05eb1f..425ae20a4f1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -147,4 +147,34 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
public void setClassloaderUrls(URL [] urls) {
intp.setClassloaderUrls(urls);
}
+
+ @Override
+ public void registerHook(String noteId, String event, String cmd) {
+ intp.registerHook(noteId, event, cmd);
+ }
+
+ @Override
+ public void registerHook(String event, String cmd) {
+ intp.registerHook(event, cmd);
+ }
+
+ @Override
+ public String getHook(String noteId, String event) {
+ return intp.getHook(noteId, event);
+ }
+
+ @Override
+ public String getHook(String event) {
+ return intp.getHook(event);
+ }
+
+ @Override
+ public void unregisterHook(String noteId, String event) {
+ intp.unregisterHook(noteId, event);
+ }
+
+ @Override
+ public void unregisterHook(String event) {
+ intp.unregisterHook(event);
+ }
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 073b84bbda2..e0cdaa338b1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -298,6 +298,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
if (logger.isDebugEnabled()) {
logger.debug("st:\n{}", st);
}
+
FormType form = getFormType();
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 8344366e569..0a7b1ed6912 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -33,6 +33,8 @@
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
+import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
import org.apache.zeppelin.interpreter.thrift.*;
@@ -60,6 +62,7 @@ public class RemoteInterpreterServer
InterpreterGroup interpreterGroup;
AngularObjectRegistry angularObjectRegistry;
+ InterpreterHookRegistry hookRegistry;
DistributedResourcePool resourcePool;
private ApplicationLoader appLoader;
@@ -152,7 +155,9 @@ public void createInterpreter(String interpreterGroupId, String noteId, String
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+ hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
+ interpreterGroup.setInterpreterHookRegistry(hookRegistry);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
@@ -290,6 +295,7 @@ public RemoteInterpreterResult interpret(String noteId, String className, String
}
Interpreter intp = getInterpreter(noteId, className);
InterpreterContext context = convert(interpreterContext);
+ context.setClassName(intp.getClassName());
Scheduler scheduler = intp.getScheduler();
InterpretJobListener jobListener = new InterpretJobListener();
@@ -383,10 +389,61 @@ public Map info() {
return infos;
}
+ private void processInterpreterHooks(final String noteId) {
+ InterpreterHookListener hookListener = new InterpreterHookListener() {
+ @Override
+ public void onPreExecute(String script) {
+ String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);
+ String cmdUser = interpreter.getHook(noteId, HookType.PRE_EXEC);
+
+ // User defined hook should be executed before dev hook
+ List<String> cmds = Arrays.asList(cmdDev, cmdUser);
+ for (String cmd : cmds) {
+ if (cmd != null) {
+ script = cmd + '\n' + script;
+ }
+ }
+
+ InterpretJob.this.script = script;
+ }
+
+ @Override
+ public void onPostExecute(String script) {
+ String cmdDev = interpreter.getHook(noteId, HookType.POST_EXEC_DEV);
+ String cmdUser = interpreter.getHook(noteId, HookType.POST_EXEC);
+
+ // User defined hook should be executed after dev hook
+ List<String> cmds = Arrays.asList(cmdDev, cmdUser);
+ for (String cmd : cmds) {
+ if (cmd != null) {
+ script += '\n' + cmd;
+ }
+ }
+
+ InterpretJob.this.script = script;
+ }
+ };
+ hookListener.onPreExecute(script);
+ hookListener.onPostExecute(script);
+ }
+
@Override
protected Object jobRun() throws Throwable {
try {
InterpreterContext.set(context);
+
+ // Open the interpreter instance prior to calling interpret().
+ // This is necessary because the earliest we can register a hook
+ // is from within the open() method.
+ LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
+ if (!lazy.isOpen()) {
+ lazy.open();
+ }
+
+ // Add hooks to script from registry.
+ // Global scope first, followed by notebook scope
+ processInterpreterHooks(null);
+ processInterpreterHooks(context.getNoteId());
InterpreterResult result = interpreter.interpret(script, context);
// data from context.out is prepended to InterpreterResult if both defined
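The wrapping order produced by the two callbacks above is easier to see in a standalone sketch: user hooks end up outermost and dev hooks sit next to the paragraph script. The hook strings below are placeholders.

import java.util.Arrays;

// Standalone sketch of the script wrapping performed by processInterpreterHooks().
public class HookWrappingDemo {
  public static void main(String[] args) {
    String script = "println(1 + 1)";
    String preDev = "// dev pre-exec";
    String preUser = "// user pre-exec";
    String postDev = "// dev post-exec";
    String postUser = "// user post-exec";

    // onPreExecute: prepend dev hook, then user hook -> user hook runs first
    for (String cmd : Arrays.asList(preDev, preUser)) {
      if (cmd != null) {
        script = cmd + '\n' + script;
      }
    }
    // onPostExecute: append dev hook, then user hook -> user hook runs last
    for (String cmd : Arrays.asList(postDev, postUser)) {
      if (cmd != null) {
        script += '\n' + cmd;
      }
    }
    System.out.println(script);
  }
}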
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterHookRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterHookRegistryTest.java
new file mode 100644
index 00000000000..7614e9eb204
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterHookRegistryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class InterpreterHookRegistryTest {
+
+ @Test
+ public void testBasic() {
+ final String PRE_EXEC = InterpreterHookRegistry.HookType.PRE_EXEC;
+ final String POST_EXEC = InterpreterHookRegistry.HookType.POST_EXEC;
+ final String PRE_EXEC_DEV = InterpreterHookRegistry.HookType.PRE_EXEC_DEV;
+ final String POST_EXEC_DEV = InterpreterHookRegistry.HookType.POST_EXEC_DEV;
+ final String GLOBAL_KEY = InterpreterHookRegistry.GLOBAL_KEY;
+ final String noteId = "note";
+ final String className = "class";
+ final String preExecHook = "pre";
+ final String postExecHook = "post";
+ InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
+
+ // Test register()
+ registry.register(noteId, className, PRE_EXEC, preExecHook);
+ registry.register(noteId, className, POST_EXEC, postExecHook);
+ registry.register(noteId, className, PRE_EXEC_DEV, preExecHook);
+ registry.register(noteId, className, POST_EXEC_DEV, postExecHook);
+
+ // Test get()
+ assertEquals(registry.get(noteId, className, PRE_EXEC), preExecHook);
+ assertEquals(registry.get(noteId, className, POST_EXEC), postExecHook);
+ assertEquals(registry.get(noteId, className, PRE_EXEC_DEV), preExecHook);
+ assertEquals(registry.get(noteId, className, POST_EXEC_DEV), postExecHook);
+
+ // Test Unregister
+ registry.unregister(noteId, className, PRE_EXEC);
+ registry.unregister(noteId, className, POST_EXEC);
+ registry.unregister(noteId, className, PRE_EXEC_DEV);
+ registry.unregister(noteId, className, POST_EXEC_DEV);
+ assertNull(registry.get(noteId, className, PRE_EXEC));
+ assertNull(registry.get(noteId, className, POST_EXEC));
+ assertNull(registry.get(noteId, className, PRE_EXEC_DEV));
+ assertNull(registry.get(noteId, className, POST_EXEC_DEV));
+
+ // Test Global Scope
+ registry.register(null, className, PRE_EXEC, preExecHook);
+ assertEquals(registry.get(GLOBAL_KEY, className, PRE_EXEC), preExecHook);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidEventCode() {
+ InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
+
+ // Test that only valid event codes (pre_exec, post_exec and their _dev variants) are accepted
+ registry.register("foo", "bar", "baz", "whatever");
+ }
+
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index 727211292b2..d5814046183 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -39,7 +39,6 @@
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@@ -162,6 +161,7 @@ public Response putNotePermissions(@PathParam("noteId") String noteId, String re
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
note.persist(subject);
notebookServer.broadcastNote(note);
+ notebookServer.broadcastNoteList(subject, userAndRoles);
return new JsonResponse<>(Status.OK).build();
}
@@ -176,7 +176,7 @@ public Response putNotePermissions(@PathParam("noteId") String noteId, String re
public Response bind(@PathParam("noteId") String noteId, String req) throws IOException {
 List<String> settingIdList = gson.fromJson(req, new TypeToken<List<String>>() {
}.getType());
- notebook.bindInterpretersToNote(noteId, settingIdList);
+ notebook.bindInterpretersToNote(SecurityUtils.getPrincipal(), noteId, settingIdList);
return new JsonResponse<>(Status.OK).build();
}
@@ -198,7 +198,8 @@ public Response bind(@PathParam("noteId") String noteId) {
@ZeppelinApi
public Response getNotebookList() throws IOException {
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
- List