diff --git a/angular/pom.xml b/angular/pom.xml
new file mode 100644
index 00000000000..580b848e01f
--- /dev/null
+++ b/angular/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-angular</artifactId>
+  <packaging>jar</packaging>
+  <version>0.5.0-SNAPSHOT</version>
+  <name>Zeppelin: Angular interpreter</name>
+  <url>http://zeppelin.incubator.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.7</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-artifact</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>${project.version}</version>
+                  <type>${project.packaging}</type>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
new file mode 100644
index 00000000000..c7a406d18d7
--- /dev/null
+++ b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.angular;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+/**
+ * Interpreter that echoes the paragraph text back to the front-end as an
+ * ANGULAR-typed result, so a note paragraph can contain raw Angular
+ * template markup (presumably rendered client-side; confirm against the
+ * front-end's handling of Type.ANGULAR).
+ */
+public class AngularInterpreter extends Interpreter {
+ static {
+ // Self-register this interpreter under the "angular" name at class load.
+ Interpreter.register("angular", AngularInterpreter.class.getName());
+ }
+
+ public AngularInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ // Stateless interpreter: no resources to acquire.
+ }
+
+ @Override
+ public void close() {
+ // Nothing to release.
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ // Return the paragraph body unchanged, tagged with Type.ANGULAR.
+ return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ // interpret() completes immediately, so there is nothing to cancel.
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ // Work is instantaneous; progress reporting is not meaningful.
+ return 0;
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ // No completion candidates are offered for raw template text.
+ return new LinkedList();
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ // FIFO scheduler keyed per instance: paragraphs run one at a time.
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ AngularInterpreter.class.getName() + this.hashCode());
+ }
+}
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index c203179c55a..2e2cad80576 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -48,7 +48,7 @@
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter become a default</description>
diff --git a/pom.xml b/pom.xml
index 20a073e70bd..d2fc77c01b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
     <module>zeppelin-zengine</module>
     <module>spark</module>
     <module>markdown</module>
+    <module>angular</module>
     <module>shell</module>
     <module>hive</module>
     <module>zeppelin-web</module>
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 71c5ab5ef74..1b0b8201e0a 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter {
+ "we should set this value")
.add("zeppelin.spark.useHiveContext", "true",
"Use HiveContext instead of SQLContext if it is true.")
+ .add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.")
.add("args", "", "spark commandline args").build());
}
@@ -400,7 +401,8 @@ public void open() {
dep = getDependencyResolver();
- z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
+ z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
try {
if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
@@ -512,7 +514,7 @@ public Object getValue(String name) {
}
String getJobGroup(InterpreterContext context){
- return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+ return "zeppelin-" + context.getParagraphId();
}
/**
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2555988288f..618579d0eaa 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -29,18 +29,15 @@
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SQLContext.QueryExecution;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter {
}
private String getJobGroup(InterpreterContext context){
- return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+ return "zeppelin-" + context.getParagraphId();
}
private int maxResult;
@@ -126,82 +123,13 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
sc.setLocalProperty("spark.scheduler.pool", null);
}
- sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-
- // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
- Object rdd;
- Object[] rows = null;
try {
- rdd = sqlc.sql(st);
-
- Method take = rdd.getClass().getMethod("take", int.class);
- rows = (Object[]) take.invoke(rdd, maxResult + 1);
+ Object rdd = sqlc.sql(st);
+ String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
+ return new InterpreterResult(Code.SUCCESS, msg);
} catch (Exception e) {
- logger.error("Error", e);
- sc.clearJobGroup();
- return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
- }
-
- String msg = null;
-
- // get field names
- Method queryExecution;
- QueryExecution qe;
- try {
- queryExecution = rdd.getClass().getMethod("queryExecution");
- qe = (QueryExecution) queryExecution.invoke(rdd);
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new InterpreterException(e);
- }
-
- List columns =
- scala.collection.JavaConverters.asJavaListConverter(
- qe.analyzed().output()).asJava();
-
- for (Attribute col : columns) {
- if (msg == null) {
- msg = col.name();
- } else {
- msg += "\t" + col.name();
- }
- }
-
- msg += "\n";
-
- // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
- // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
- // NullType, NumericType, ShortType, StringType, StructType
-
- try {
- for (int r = 0; r < maxResult && r < rows.length; r++) {
- Object row = rows[r];
- Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
- Method apply = row.getClass().getMethod("apply", int.class);
-
- for (int i = 0; i < columns.size(); i++) {
- if (!(Boolean) isNullAt.invoke(row, i)) {
- msg += apply.invoke(row, i).toString();
- } else {
- msg += "null";
- }
- if (i != columns.size() - 1) {
- msg += "\t";
- }
- }
- msg += "\n";
- }
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new InterpreterException(e);
- }
-
- if (rows.length > maxResult) {
- msg += "\nResults are limited by " + maxResult + ".";
+ return new InterpreterResult(Code.ERROR, e.getMessage());
}
- InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg);
- sc.clearJobGroup();
- return rett;
}
@Override
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 87cd18884da..2c03f1cd91b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -22,19 +22,31 @@
import static scala.collection.JavaConversions.collectionAsScalaIterable;
import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SQLContext.QueryExecution;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.hive.HiveContext;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import scala.Tuple2;
+import scala.Unit;
import scala.collection.Iterable;
/**
@@ -47,15 +59,18 @@ public class ZeppelinContext extends HashMap {
private DependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
+ private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
- DependencyResolver dep, PrintStream printStream) {
+ DependencyResolver dep, PrintStream printStream,
+ int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.out = printStream;
+ this.maxResult = maxResult;
}
public SparkContext sc;
@@ -63,12 +78,6 @@ public ZeppelinContext(SparkContext sc, SQLContext sql,
public HiveContext hiveContext;
private GUI gui;
- /* spark-1.3
- public SchemaRDD sql(String sql) {
- return sqlContext.sql(sql);
- }
- */
-
/**
* Load dependency for interpreter and runtime (driver).
* And distribute them to spark cluster (sc.add())
@@ -221,25 +230,6 @@ public void setGui(GUI o) {
this.gui = o;
}
- public void run(String lines) {
- /*
- String intpName = Paragraph.getRequiredReplName(lines);
- String scriptBody = Paragraph.getScriptBody(lines);
- Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
- InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
- if (ret.code() == InterpreterResult.Code.SUCCESS) {
- out.println("%" + ret.type().toString().toLowerCase() + " " + ret.message());
- } else if (ret.code() == InterpreterResult.Code.ERROR) {
- out.println("Error: " + ret.message());
- } else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
- out.println("Incomplete");
- } else {
- out.println("Unknown error");
- }
- */
- throw new RuntimeException("Missing implementation");
- }
-
private void restartInterpreter() {
}
@@ -251,4 +241,310 @@ public void setInterpreterContext(InterpreterContext interpreterContext) {
this.interpreterContext = interpreterContext;
}
+ /**
+ * Override the default row limit used by {@link #show(Object)}.
+ *
+ * @param maxResult new maximum number of rows to display
+ */
+ public void setMaxResult(int maxResult) {
+ this.maxResult = maxResult;
+ }
+
+ /**
+ * Show a DataFrame or SchemaRDD as a Zeppelin table, limited to the
+ * configured default number of rows (see {@link #setMaxResult(int)}).
+ *
+ * @param o DataFrame or SchemaRDD object
+ */
+ public void show(Object o) {
+ show(o, maxResult);
+ }
+
+ /**
+ * Show a DataFrame or SchemaRDD as a Zeppelin table. Any other object is
+ * printed via its toString().
+ *
+ * @param o DataFrame or SchemaRDD object
+ * @param maxResult maximum number of rows to display
+ * @throws InterpreterException if neither the DataFrame nor the SchemaRDD
+ * class can be loaded at runtime
+ */
+ public void show(Object o, int maxResult) {
+ Class cls = null;
+ try {
+ // DataFrame exists in Spark 1.3; fall back to SchemaRDD (1.1/1.2) below.
+ cls = Class.forName("org.apache.spark.sql.DataFrame");
+ } catch (ClassNotFoundException e) {
+ // ignored - try SchemaRDD next
+ }
+
+ if (cls == null) {
+ try {
+ cls = Class.forName("org.apache.spark.sql.SchemaRDD");
+ } catch (ClassNotFoundException e) {
+ // ignored - handled by the null check below
+ }
+ }
+
+ if (cls == null) {
+ throw new InterpreterException("Can not load DataFrame/SchemaRDD class");
+ }
+
+ if (cls.isInstance(o)) {
+ out.print(showRDD(sc, interpreterContext, o, maxResult));
+ } else {
+ out.print(o.toString());
+ }
+ }
+
+ /**
+ * Convert a DataFrame/SchemaRDD into Zeppelin's "%table" display format
+ * (tab-separated columns, newline-separated rows, header row first).
+ *
+ * Rows and schema are accessed reflectively ("take", "queryExecution",
+ * "isNullAt", "apply") so the same compiled code works whether the object
+ * is a SchemaRDD or a DataFrame, without a compile-time dependency.
+ *
+ * @param sc active SparkContext; a paragraph-scoped job group is set while
+ * collecting rows and cleared before returning
+ * @param interpreterContext context of the calling paragraph
+ * @param rdd DataFrame or SchemaRDD to display
+ * @param maxResult maximum number of rows to include in the output
+ * @return table string prefixed with "%table"
+ * @throws InterpreterException if any reflective access fails
+ */
+ public static String showRDD(SparkContext sc,
+ InterpreterContext interpreterContext,
+ Object rdd, int maxResult) {
+ Object[] rows = null;
+ Method take;
+ String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
+ sc.setJobGroup(jobGroup, "Zeppelin", false);
+
+ try {
+ // Take one row more than the limit so truncation can be detected below.
+ take = rdd.getClass().getMethod("take", int.class);
+ rows = (Object[]) take.invoke(rdd, maxResult + 1);
+
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ sc.clearJobGroup();
+ throw new InterpreterException(e);
+ }
+
+ String msg = null;
+
+ // get field names
+ Method queryExecution;
+ QueryExecution qe;
+ try {
+ queryExecution = rdd.getClass().getMethod("queryExecution");
+ qe = (QueryExecution) queryExecution.invoke(rdd);
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ throw new InterpreterException(e);
+ }
+
+ // Header row: column names from the analyzed logical plan.
+ List columns =
+ scala.collection.JavaConverters.asJavaListConverter(
+ qe.analyzed().output()).asJava();
+
+ for (Attribute col : columns) {
+ if (msg == null) {
+ msg = col.name();
+ } else {
+ msg += "\t" + col.name();
+ }
+ }
+
+ msg += "\n";
+
+ // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
+ // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
+ // NullType, NumericType, ShortType, StringType, StructType
+
+ try {
+ // Body rows: cells are read reflectively for the same cross-version
+ // reason as "take" above; nulls are rendered literally as "null".
+ for (int r = 0; r < maxResult && r < rows.length; r++) {
+ Object row = rows[r];
+ Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+ Method apply = row.getClass().getMethod("apply", int.class);
+
+ for (int i = 0; i < columns.size(); i++) {
+ if (!(Boolean) isNullAt.invoke(row, i)) {
+ msg += apply.invoke(row, i).toString();
+ } else {
+ msg += "null";
+ }
+ if (i != columns.size() - 1) {
+ msg += "\t";
+ }
+ }
+ msg += "\n";
+ }
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ throw new InterpreterException(e);
+ }
+
+ // rows holds maxResult + 1 entries when the query had more results.
+ if (rows.length > maxResult) {
+ msg += "\nResults are limited by " + maxResult + ".";
+ }
+ sc.clearJobGroup();
+ return "%table " + msg;
+ }
+
+ /**
+ * Run another paragraph of this note by its paragraph id, using the
+ * current interpreter context.
+ *
+ * @param id target paragraph id (must not be the current paragraph)
+ */
+ public void run(String id) {
+ run(id, interpreterContext);
+ }
+
+ /**
+ * Run the paragraph with the given id via the runners supplied by the
+ * given context.
+ *
+ * @param id target paragraph id
+ * @param context interpreter context providing the paragraph runners
+ * @throws InterpreterException if id names the current paragraph (would
+ * recurse) or no runner with that id exists
+ */
+ public void run(String id, InterpreterContext context) {
+ if (id.equals(context.getParagraphId())) {
+ throw new InterpreterException("Can not run current Paragraph");
+ }
+
+ for (InterpreterContextRunner r : context.getRunners()) {
+ if (id.equals(r.getParagraphId())) {
+ r.run();
+ return;
+ }
+ }
+
+ throw new InterpreterException("Paragraph " + id + " not found");
+ }
+
+ /**
+ * Run the paragraph at the given index, using the current interpreter
+ * context.
+ *
+ * @param idx index starting from 0
+ */
+ public void run(int idx) {
+ run(idx, interpreterContext);
+ }
+
+ /**
+ * Run paragraph at index
+ * @param idx index starting from 0
+ * @param context interpreter context
+ * @throws InterpreterException if idx is out of range or refers to the
+ * current paragraph
+ */
+ public void run(int idx, InterpreterContext context) {
+ // Reject negative indices too, so callers get an InterpreterException
+ // instead of a raw IndexOutOfBoundsException from the list access.
+ if (idx < 0 || idx >= context.getRunners().size()) {
+ throw new InterpreterException("Index out of bound");
+ }
+
+ InterpreterContextRunner runner = context.getRunners().get(idx);
+ if (runner.getParagraphId().equals(context.getParagraphId())) {
+ throw new InterpreterException("Can not run current Paragraph");
+ }
+
+ runner.run();
+ }
+
+ public void run(List