diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index f756dca1f8e..caf0900c94a 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -144,7 +144,7 @@
zeppelin.interpreters
- org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter
+ org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter
Comma separated interpreter configurations. First interpreter become a default
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 53728a931fb..8612f12a47c 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -54,6 +54,7 @@
Ignite
JDBC
Lens
+ Livy
Markdown
Postgresql, hawq
R
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
new file mode 100644
index 00000000000..295a5080f40
--- /dev/null
+++ b/docs/interpreter/livy.md
@@ -0,0 +1,107 @@
+---
+layout: page
+title: "Livy Interpreter"
+description: ""
+group: manual
+---
+{% include JB/setup %}
+
+## Livy Interpreter for Apache Zeppelin
+Livy is an open source REST interface for interacting with Spark from anywhere. It supports executing snippets of code or programs in a Spark context that runs locally or on YARN. Its main features are:
+
+* Interactive Scala, Python and R shells
+* Batch submissions in Scala, Java and Python
+* Multiple users can share the same server (impersonation support)
+* Jobs can be submitted from anywhere over REST (see the sketch after this list)
+* No code changes to your programs are required
+
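+Under the hood, the Livy interpreter drives this REST API for you: it creates a session with `POST /sessions`, submits each paragraph as a statement with `POST /sessions/{id}/statements`, and polls the statement until its result is available. The sketch below shows that flow directly against a Livy server. It is only an illustration (error handling omitted) and assumes a Livy server at `http://localhost:8998` and the Python `requests` library.
+
+```
+import time
+import requests
+
+livy_url = "http://localhost:8998"  # zeppelin.livy.url
+
+# 1. Create an interactive session (the same call the interpreter makes).
+session = requests.post(livy_url + "/sessions", json={"kind": "spark"}).json()
+session_url = "%s/sessions/%s" % (livy_url, session["id"])
+
+# 2. Wait until the session is idle.
+while requests.get(session_url).json()["state"] != "idle":
+    time.sleep(1)
+
+# 3. Submit a statement and poll for its output.
+stmt = requests.post(session_url + "/statements", json={"code": "sc.version"}).json()
+stmt_url = "%s/statements/%s" % (session_url, stmt["id"])
+while True:
+    result = requests.get(stmt_url).json()
+    if result["state"] == "available":
+        print(result["output"]["data"]["text/plain"])
+        break
+    time.sleep(1)
+
+# 4. Clean up the session.
+requests.delete(session_url)
+```
+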
+### Requirements
+
+Additional requirements for the Livy interpreter are:
+
+ * Spark 1.3 or above.
+ * A running Livy server.
+
+### Configuration
+
+| Property | Default | Description |
+| -------- | ------- | ----------- |
+| zeppelin.livy.master | local[*] | Spark master URI, e.g. spark://masterhost:7077 |
+| zeppelin.livy.url | http://localhost:8998 | URL where the Livy server is running |
+| zeppelin.livy.spark.maxResult | 1000 | Max number of SparkSQL results to display |
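+
+For example, to point the interpreter at a remote Livy server and run against YARN, the interpreter settings might look like this (illustrative values; the master can only be overridden if the Livy server allows it, see the FAQ below):
+
+```
+zeppelin.livy.url      http://livy-host:8998
+zeppelin.livy.master   yarn-client
+```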
+
+## How to use
+Basically, you can use the interpreter as follows.
+
+**spark**
+
+```
+%livy.spark
+sc.version
+```
+
+
+**pyspark**
+
+```
+%livy.pyspark
+print "1"
+```
+
+**sparkR**
+
+```
+%livy.sparkr
+hello <- function( name ) {
+ sprintf( "Hello, %s", name );
+}
+
+hello("livy")
+```
+
+## Impersonation
+When the Zeppelin server is running with authentication enabled, this interpreter uses Livy's user impersonation feature, i.e. it sends an extra parameter when creating and running a session (`"proxyUser": "${loggedInUser}"`). This is particularly useful when multiple users are sharing a notebook server.
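+
+For example, when a user named `alice` is logged in, the session-creation request sent to Livy carries a body roughly like the following (illustrative; the interpreter builds the exact request internally):
+
+```
+{"kind": "pyspark", "proxyUser": "alice"}
+```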
+
+
+### Apply Zeppelin Dynamic Forms
+You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html); both the `text input` and `select form` parameterization features are supported.
+
+```
+%livy.pyspark
+print "${group_by=product_id,product_id|product_name|customer_id|store_id}"
+```
+
+## FAQ
+
+Livy debugging: if you see any of the following errors in the console:
+
+> Connect to livyhost:8998 [livyhost/127.0.0.1, livyhost/0:0:0:0:0:0:0:1] failed: Connection refused
+
+The Livy server is probably not up yet, or the configured `zeppelin.livy.url` is wrong.
+
+> Exception: Session not found, Livy server would have restarted, or lost session.
+
+The session has probably timed out; you may need to restart the interpreter.
+
+
+> Blacklisted configuration values in session config: spark.master
+
+Edit the `conf/spark-blacklist.conf` file on the Livy server and comment out the `spark.master` line.
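+
+After that change, the relevant part of `conf/spark-blacklist.conf` would look roughly like this:
+
+```
+# spark.master
+```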
diff --git a/livy/pom.xml b/livy/pom.xml
new file mode 100644
index 00000000000..6ad96221a89
--- /dev/null
+++ b/livy/pom.xml
@@ -0,0 +1,170 @@
+
+
+
+
+ 4.0.0
+
+
+ zeppelin
+ org.apache.zeppelin
+ 0.6.0-incubating-SNAPSHOT
+ ..
+
+
+ org.apache.zeppelin
+ zeppelin-livy
+ jar
+ 0.6.0-incubating-SNAPSHOT
+ Zeppelin: Livy interpreter
+ http://zeppelin.incubator.apache.org
+
+
+
+ 4.12
+ 3.2.4-Zeppelin
+ 1.7.0
+ 1.9.5
+
+
+
+
+ ${project.groupId}
+ zeppelin-interpreter
+ ${project.version}
+ provided
+
+
+
+ org.apache.commons
+ commons-exec
+ 1.3
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.3.4
+
+
+
+ com.google.code.gson
+ gson
+
+
+
+ junit
+ junit
+ test
+
+
+
+ org.assertj
+ assertj-core
+ ${assertj.version}
+ test
+
+
+ org.mockito
+ mockito-all
+ ${mockito.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+
+ true
+
+
+
+
+ maven-enforcer-plugin
+ 1.3.1
+
+
+ enforce
+ none
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/../../interpreter/livy
+
+ false
+ false
+ true
+ runtime
+
+
+
+ copy-artifact
+ package
+
+ copy
+
+
+ ${project.build.directory}/../../interpreter/livy
+
+ false
+ false
+ true
+ runtime
+
+
+ ${project.groupId}
+ ${project.artifactId}
+ ${project.version}
+ ${project.packaging}
+
+
+
+
+
+
+
+
+
+
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
new file mode 100644
index 00000000000..27fc422948d
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+/***
+ * Livy helper class
+ */
+public class LivyHelper {
+ Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ HashMap paragraphHttpMap = new HashMap<>();
+ Properties property;
+ Integer MAX_NOS_RETRY = 60;
+
+ LivyHelper(Properties property) {
+ this.property = property;
+ }
+
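+ /**
+ * Creates a Livy session via POST /sessions and waits (up to MAX_NOS_RETRY seconds)
+ * for it to become idle; throws if the session enters the error state or never starts.
+ */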
+ public Integer createSession(InterpreterContext context, String kind) throws Exception {
+ try {
+ String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
+ "POST",
+ "{" +
+ "\"kind\": \"" + kind + "\", " +
+ "\"master\": \"" + property.getProperty("zeppelin.livy.master") + "\", " +
+ "\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" +
+ "}",
+ context.getParagraphId()
+ );
+ if (json.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
+ json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
+ "POST",
+ "{" +
+ "\"kind\": \"" + kind + "\", " +
+ "\"conf\":{\"spark.master\": \""
+ + property.getProperty("zeppelin.livy.master") + "\"}," +
+ "\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" +
+ "}",
+ context.getParagraphId()
+ );
+ }
+ Map jsonMap = (Map) gson.fromJson(json,
+ new TypeToken<Map<Object, Object>>() {
+ }.getType());
+ Integer sessionId = ((Double) jsonMap.get("id")).intValue();
+ if (!jsonMap.get("state").equals("idle")) {
+ Integer nosRetry = MAX_NOS_RETRY;
+
+ while (nosRetry >= 0) {
+ LOGGER.error(String.format("sessionId:%s state is %s",
+ jsonMap.get("id"), jsonMap.get("state")));
+ Thread.sleep(1000);
+ json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
+ "GET", null,
+ context.getParagraphId());
+ jsonMap = (Map) gson.fromJson(json,
+ new TypeToken<Map<Object, Object>>() {
+ }.getType());
+ if (jsonMap.get("state").equals("idle")) {
+ break;
+ } else if (jsonMap.get("state").equals("error")) {
+ json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
+ sessionId + "/log",
+ "GET", null,
+ context.getParagraphId());
+ jsonMap = (Map) gson.fromJson(json,
+ new TypeToken<Map<Object, Object>>() {
+ }.getType());
+ String logs = StringUtils.join((ArrayList) jsonMap.get("log"), '\n');
+ LOGGER.error(String.format("Cannot start %s.\n%s", kind, logs));
+ throw new Exception(String.format("Cannot start %s.\n%s", kind, logs));
+ }
+ nosRetry--;
+ }
+ if (nosRetry <= 0) {
+ LOGGER.error("Error getting session for user within 60Sec.");
+ throw new Exception(String.format("Cannot start %s.", kind));
+ }
+ }
+ return sessionId;
+ } catch (Exception e) {
+ LOGGER.error("Error getting session for user", e);
+ throw e;
+ }
+ }
+
+ protected void initializeSpark(final InterpreterContext context,
+ final Map userSessionMap) throws Exception {
+ interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
+ "import sqlContext.implicits._", context, userSessionMap);
+ }
+
+ public InterpreterResult interpretInput(String stringLines,
+ final InterpreterContext context,
+ final Map userSessionMap,
+ LivyOutputStream out) {
+ try {
+ String[] lines = stringLines.split("\n");
+ String[] linesToRun = new String[lines.length + 1];
+ for (int i = 0; i < lines.length; i++) {
+ linesToRun[i] = lines[i];
+ }
+ linesToRun[lines.length] = "print(\"\")";
+
+ out.setInterpreterOutput(context.out);
+ context.out.clear();
+ Code r = null;
+ String incomplete = "";
+ boolean inComment = false;
+
+ for (int l = 0; l < linesToRun.length; l++) {
+ String s = linesToRun[l];
+ // If the next line starts with "." (but not ".." or "./"), it is a method-chain
+ // continuation, so buffer the current line and send the two lines to Livy together.
+ if (l + 1 < linesToRun.length) {
+ String nextLine = linesToRun[l + 1].trim();
+ boolean continuation = false;
+ if (nextLine.isEmpty()
+ || nextLine.startsWith("//") // skip empty line or comment
+ || nextLine.startsWith("}")
+ || nextLine.startsWith("object")) { // include "} object" for Scala companion object
+ continuation = true;
+ } else if (!inComment && nextLine.startsWith("/*")) {
+ inComment = true;
+ continuation = true;
+ } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
+ inComment = false;
+ continuation = true;
+ } else if (nextLine.length() > 1
+ && nextLine.charAt(0) == '.'
+ && nextLine.charAt(1) != '.' // ".."
+ && nextLine.charAt(1) != '/') { // "./"
+ continuation = true;
+ } else if (inComment) {
+ continuation = true;
+ }
+ if (continuation) {
+ incomplete += s + "\n";
+ continue;
+ }
+ }
+
+ InterpreterResult res;
+ try {
+ res = interpret(incomplete + s, context, userSessionMap);
+ } catch (Exception e) {
+ LOGGER.error("Interpreter exception", e);
+ return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+ }
+
+ r = res.code();
+
+ if (r == Code.ERROR) {
+ out.setInterpreterOutput(null);
+ return res;
+ } else if (r == Code.INCOMPLETE) {
+ incomplete += s + "\n";
+ } else {
+ out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
+ incomplete = "";
+ }
+ }
+
+ if (r == Code.INCOMPLETE) {
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "Incomplete expression");
+ } else {
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(Code.SUCCESS);
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("error in interpretInput", e);
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+ }
+
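+ /**
+ * Escapes the code for embedding in a JSON body, submits it as a Livy statement,
+ * and then polls the statement every second until a result or an error is available.
+ */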
+ public InterpreterResult interpret(String stringLines,
+ final InterpreterContext context,
+ final Map userSessionMap)
+ throws Exception {
+ stringLines = stringLines
+ //for "\n" present in string
+ .replaceAll("\\\\n", "\\\\\\\\n")
+ //for new line present in string
+ .replaceAll("\\n", "\\\\n")
+ // for \" present in string
+ .replaceAll("\\\\\"", "\\\\\\\\\"")
+ // for " present in string
+ .replaceAll("\"", "\\\\\"");
+
+ if (stringLines.trim().equals("")) {
+ return new InterpreterResult(Code.SUCCESS, "");
+ }
+ Map jsonMap = executeCommand(stringLines, context, userSessionMap);
+ Integer id = ((Double) jsonMap.get("id")).intValue();
+ InterpreterResult res = getResultFromMap(jsonMap);
+ if (res != null) {
+ return res;
+ }
+
+ while (true) {
+ Thread.sleep(1000);
+ if (paragraphHttpMap.get(context.getParagraphId()) == null) {
+ return new InterpreterResult(Code.INCOMPLETE, "");
+ }
+ jsonMap = getStatusById(context, userSessionMap, id);
+ InterpreterResult interpreterResult = getResultFromMap(jsonMap);
+ if (interpreterResult != null) {
+ return interpreterResult;
+ }
+ }
+ }
+
+ private InterpreterResult getResultFromMap(Map jsonMap) {
+ if (jsonMap.get("state").equals("available")) {
+ if (((Map) jsonMap.get("output")).get("status").equals("error")) {
+ StringBuilder errorMessage = new StringBuilder((String) ((Map) jsonMap
+ .get("output")).get("evalue"));
+ if (errorMessage.toString().equals("incomplete statement")
+ || errorMessage.toString().contains("EOF")) {
+ return new InterpreterResult(Code.INCOMPLETE, "");
+ }
+ String traceback = gson.toJson(((Map) jsonMap.get("output")).get("traceback"));
+ if (!traceback.equals("[]")) {
+ errorMessage
+ .append("\n")
+ .append("traceback: \n")
+ .append(traceback);
+ }
+
+ return new InterpreterResult(Code.ERROR, errorMessage.toString());
+ }
+ if (((Map) jsonMap.get("output")).get("status").equals("ok")) {
+ String result = (String) ((Map) ((Map) jsonMap.get("output"))
+ .get("data")).get("text/plain");
+ if (result != null) {
+ result = result.trim();
+ if (result.startsWith("<link")
+ || result.startsWith("<script")
+ || result.startsWith("<style")
+ || result.startsWith("<div")) {
+ result = "%html " + result;
+ }
+ }
+ return new InterpreterResult(Code.SUCCESS, result);
+ }
+ }
+ return null;
+ }
+
+ private Map executeCommand(String lines, InterpreterContext context,
+ Map userSessionMap) throws Exception {
+ String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
+ + userSessionMap.get(context.getAuthenticationInfo().getUser())
+ + "/statements",
+ "POST",
+ "{\"code\": \"" + lines + "\" }",
+ context.getParagraphId());
+ if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
+ throw new Exception("Exception: Session not found, Livy server would have restarted, " +
+ "or lost session.");
+ }
+ try {
+ Map jsonMap = gson.fromJson(json,
+ new TypeToken() {
+ }.getType());
+ return jsonMap;
+ } catch (Exception e) {
+ LOGGER.error("Error executeCommand", e);
+ throw e;
+ }
+ }
+
+ private Map getStatusById(InterpreterContext context,
+ Map userSessionMap, Integer id) throws Exception {
+ String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ + userSessionMap.get(context.getAuthenticationInfo().getUser())
+ + "/statements/" + id,
+ "GET", null, context.getParagraphId());
+ try {
+ Map jsonMap = gson.fromJson(json,
+ new TypeToken() {
+ }.getType());
+ return jsonMap;
+ } catch (Exception e) {
+ LOGGER.error("Error getStatusById", e);
+ throw e;
+ }
+ }
+
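+ /**
+ * Issues an HTTP request against the Livy REST API and returns the raw response body.
+ * 200, 201 and 404 responses (and the blacklisted spark.master error) are handed back
+ * to the caller; any other status code results in an exception.
+ */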
+ protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
+ throws Exception {
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpResponse response = null;
+ if (method.equals("POST")) {
+ HttpPost request = new HttpPost(targetURL);
+ request.addHeader("Content-Type", "application/json");
+ StringEntity se = new StringEntity(jsonData);
+ request.setEntity(se);
+ response = client.execute(request);
+ paragraphHttpMap.put(paragraphId, request);
+ } else if (method.equals("GET")) {
+ HttpGet request = new HttpGet(targetURL);
+ request.addHeader("Content-Type", "application/json");
+ response = client.execute(request);
+ paragraphHttpMap.put(paragraphId, request);
+ } else if (method.equals("DELETE")) {
+ HttpDelete request = new HttpDelete(targetURL);
+ request.addHeader("Content-Type", "application/json");
+ response = client.execute(request);
+ }
+
+ if (response == null) {
+ return null;
+ }
+
+ if (response.getStatusLine().getStatusCode() == 200
+ || response.getStatusLine().getStatusCode() == 201
+ || response.getStatusLine().getStatusCode() == 404) {
+ return getResponse(response);
+ } else {
+ String responseString = getResponse(response);
+ if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
+ return responseString;
+ }
+ LOGGER.error(String.format("Error with %s StatusCode: %s",
+ response.getStatusLine().getStatusCode(), responseString));
+ throw new Exception(String.format("Error with %s StatusCode: %s",
+ response.getStatusLine().getStatusCode(), responseString));
+ }
+ }
+
+ private String getResponse(HttpResponse response) throws Exception {
+ BufferedReader rd = new BufferedReader(
+ new InputStreamReader(response.getEntity().getContent()));
+
+ StringBuffer result = new StringBuffer();
+ String line = "";
+ while ((line = rd.readLine()) != null) {
+ result.append(line);
+ }
+ return result.toString();
+ }
+
+ public void cancelHTTP(String paragraphId) {
+ if (paragraphHttpMap.get(paragraphId).getClass().getName().contains("HttpPost")) {
+ ((HttpPost) paragraphHttpMap.get(paragraphId)).abort();
+ } else {
+ ((HttpGet) paragraphHttpMap.get(paragraphId)).abort();
+ }
+ paragraphHttpMap.put(paragraphId, null);
+ }
+
+ public void closeSession(Map userSessionMap) {
+ for (Map.Entry entry : userSessionMap.entrySet()) {
+ try {
+ executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ + entry.getValue(),
+ "DELETE", null, null);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Error closing session for user with session ID: %s",
+ entry.getValue()), e);
+ }
+ }
+ }
+}
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
new file mode 100644
index 00000000000..6a525ebd7fb
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.livy;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * InterpreterOutput can be attached / detached.
+ */
+public class LivyOutputStream extends OutputStream {
+ InterpreterOutput interpreterOutput;
+
+ public LivyOutputStream() {
+ }
+
+ public InterpreterOutput getInterpreterOutput() {
+ return interpreterOutput;
+ }
+
+ public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+ this.interpreterOutput = interpreterOutput;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int offset, int len) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b, offset, len);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.close();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.flush();
+ }
+ }
+}
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
new file mode 100644
index 00000000000..a24e1ae089e
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Livy PySpark interpreter for Zeppelin.
+ */
+public class LivyPySparkInterpreter extends Interpreter {
+
+ Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
+
+ static {
+ Interpreter.register(
+ "pyspark",
+ "livy",
+ LivyPySparkInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .build()
+ );
+ }
+
+ protected Map userSessionMap;
+ protected LivyHelper livyHelper;
+
+ public LivyPySparkInterpreter(Properties property) {
+ super(property);
+ userSessionMap = new HashMap<>();
+ livyHelper = new LivyHelper(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ livyHelper.closeSession(userSessionMap);
+ }
+
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
+ try {
+ if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
+ try {
+ userSessionMap.put(
+ interpreterContext.getAuthenticationInfo().getUser(),
+ livyHelper.createSession(
+ interpreterContext,
+ "pyspark")
+ );
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+ }
+
+ return livyHelper.interpret(line, interpreterContext, userSessionMap);
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ InterpreterUtils.getMostRelevantMessage(e));
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ livyHelper.cancelHTTP(context.getParagraphId());
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ LivyPySparkInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return null;
+ }
+
+}
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
new file mode 100644
index 00000000000..23a6379bebf
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Livy Spark interpreter for Zeppelin.
+ */
+public class LivySparkInterpreter extends Interpreter {
+
+ static String DEFAULT_URL = "http://localhost:8998";
+ static String LOCAL = "local[*]";
+ Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
+ private LivyOutputStream out;
+
+ static {
+ Interpreter.register(
+ "spark",
+ "livy",
+ LivySparkInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
+ .add("zeppelin.livy.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077")
+ .build()
+ );
+ }
+
+ protected static Map userSessionMap;
+ private LivyHelper livyHelper;
+
+ public LivySparkInterpreter(Properties property) {
+ super(property);
+ userSessionMap = new HashMap<>();
+ livyHelper = new LivyHelper(property);
+ out = new LivyOutputStream();
+ }
+
+ protected static Map getUserSessionMap() {
+ return userSessionMap;
+ }
+
+ public void setUserSessionMap(Map userSessionMap) {
+ this.userSessionMap = userSessionMap;
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ livyHelper.closeSession(userSessionMap);
+ }
+
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
+ try {
+ if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
+ try {
+ userSessionMap.put(
+ interpreterContext.getAuthenticationInfo().getUser(),
+ livyHelper.createSession(
+ interpreterContext,
+ "spark")
+ );
+ livyHelper.initializeSpark(interpreterContext, userSessionMap);
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+ }
+
+ return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ InterpreterUtils.getMostRelevantMessage(e));
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ livyHelper.cancelHTTP(context.getParagraphId());
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ LivySparkInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return null;
+ }
+
+}
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
new file mode 100644
index 00000000000..a41f4a70be9
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Livy SparkR interpreter for Zeppelin.
+ */
+public class LivySparkRInterpreter extends Interpreter {
+
+ Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);
+
+ static {
+ Interpreter.register(
+ "sparkr",
+ "livy",
+ LivySparkRInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .build()
+ );
+ }
+
+ protected Map userSessionMap;
+ private LivyHelper livyHelper;
+
+ public LivySparkRInterpreter(Properties property) {
+ super(property);
+ userSessionMap = new HashMap<>();
+ livyHelper = new LivyHelper(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ livyHelper.closeSession(userSessionMap);
+ }
+
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
+ try {
+ if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
+ try {
+ userSessionMap.put(
+ interpreterContext.getAuthenticationInfo().getUser(),
+ livyHelper.createSession(
+ interpreterContext,
+ "sparkr")
+ );
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+ }
+
+ return livyHelper.interpret(line, interpreterContext, userSessionMap);
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ InterpreterUtils.getMostRelevantMessage(e));
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ livyHelper.cancelHTTP(context.getParagraphId());
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ LivySparkRInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return null;
+ }
+
+}
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
new file mode 100644
index 00000000000..3637c64cbb9
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Livy SparkSQL interpreter for Zeppelin.
+ */
+public class LivySparkSQLInterpreter extends Interpreter {
+
+ Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
+ static String DEFAULT_MAX_RESULT = "1000";
+
+ static {
+ Interpreter.register(
+ "sql",
+ "livy",
+ LivySparkSQLInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add("zeppelin.livy.spark.maxResult",
+ DEFAULT_MAX_RESULT,
+ "Max number of SparkSQL result to display.")
+ .build()
+ );
+ }
+
+ protected Map userSessionMap;
+ private LivyHelper livyHelper;
+
+ public LivySparkSQLInterpreter(Properties property) {
+ super(property);
+ livyHelper = new LivyHelper(property);
+ userSessionMap = LivySparkInterpreter.getUserSessionMap();
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ livyHelper.closeSession(userSessionMap);
+ }
+
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
+ try {
+ if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
+ try {
+ userSessionMap.put(
+ interpreterContext.getAuthenticationInfo().getUser(),
+ livyHelper.createSession(
+ interpreterContext,
+ "spark")
+ );
+ livyHelper.initializeSpark(interpreterContext, userSessionMap);
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+ }
+
+ InterpreterResult res = livyHelper.interpret("sqlContext.sql(\"" +
+ line.replaceAll("\"", "\\\\\"")
+ .replaceAll("\\n", " ")
+ + "\").show(" +
+ property.get("zeppelin.livy.spark.maxResult") + ")",
+ interpreterContext, userSessionMap);
+
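+ // Convert the ASCII table printed by DataFrame.show() into Zeppelin's %table format.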
+ if (res.code() == InterpreterResult.Code.SUCCESS) {
+ StringBuilder resMsg = new StringBuilder();
+ resMsg.append("%table ");
+ String[] rows = res.message().split("\n");
+
+ String[] headers = rows[1].split("\\|");
+ for (int head = 1; head < headers.length; head++) {
+ resMsg.append(headers[head].trim()).append("\t");
+ }
+ resMsg.append("\n");
+ if (rows[3].indexOf("+") == 0) {
+
+ } else {
+ for (int cols = 3; cols < rows.length - 1; cols++) {
+ String[] col = rows[cols].split("\\|");
+ for (int data = 1; data < col.length; data++) {
+ resMsg.append(col[data].trim()).append("\t");
+ }
+ resMsg.append("\n");
+ }
+ }
+ if (rows[rows.length - 1].indexOf("only") == 0) {
+ resMsg.append("" + rows[rows.length - 1] + ". ");
+ }
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS,
+ resMsg.toString()
+ );
+ } else {
+ return res;
+ }
+
+
+ } catch (Exception e) {
+ LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ InterpreterUtils.getMostRelevantMessage(e));
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ livyHelper.cancelHTTP(context.getParagraphId());
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ LivySparkInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return null;
+ }
+
+}
diff --git a/livy/src/main/test/org/apache/zeppelin/livy/LivyHelperTest.java b/livy/src/main/test/org/apache/zeppelin/livy/LivyHelperTest.java
new file mode 100644
index 00000000000..49289017338
--- /dev/null
+++ b/livy/src/main/test/org/apache/zeppelin/livy/LivyHelperTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.livy;
+
+import com.google.gson.GsonBuilder;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.mockito.Mockito.doReturn;
+
+/**
+ * Created for org.apache.zeppelin.livy on 22/04/16.
+ */
+
+@RunWith(MockitoJUnitRunner.class)
+public class LivyHelperTest {
+
+ @Rule
+ public ErrorCollector collector = new ErrorCollector();
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private static LivyPySparkInterpreter interpreter;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private InterpreterContext interpreterContext;
+
+ @Mock(answer = Answers.CALLS_REAL_METHODS)
+ private LivyHelper livyHelper;
+
+ @Before
+ public void prepareContext() throws Exception {
+ interpreter.userSessionMap = new HashMap<>();
+ interpreter.userSessionMap.put(null, 1);
+
+ Properties properties = new Properties();
+ properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
+ livyHelper.property = properties;
+ livyHelper.paragraphHttpMap = new HashMap<>();
+ livyHelper.gson = new GsonBuilder().setPrettyPrinting().create();
+
+
+ doReturn("{\"id\":1,\"state\":\"idle\",\"kind\":\"spark\",\"proxyUser\":\"null\",\"log\":[]}")
+ .when(livyHelper)
+ .executeHTTP(
+ livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions",
+ "POST",
+ "{\"kind\": \"spark\", \"proxyUser\": \"null\"}",
+ null
+ );
+
+ doReturn("{\"id\":1,\"state\":\"available\",\"output\":{\"status\":\"ok\"," +
+ "\"execution_count\":1,\"data\":{\"text/plain\":\"1\"}}}")
+ .when(livyHelper)
+ .executeHTTP(
+ livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions/1/statements",
+ "POST",
+ "{\"code\": \"print(1)\" }",
+ null
+ );
+
+ }
+
+
+ @Test
+ public void checkCreateSession() {
+ try {
+ Integer sessionId = livyHelper.createSession(interpreterContext, "spark");
+
+ collector.checkThat("check sessionId", 1, CoreMatchers.equalTo(sessionId));
+
+ } catch (Exception e) {
+ collector.addError(e);
+ }
+ }
+
+ @Test
+ public void checkInterpret() {
+ try {
+ InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, interpreter.userSessionMap);
+
+ collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS, CoreMatchers.equalTo(result.code()));
+
+ } catch (Exception e) {
+ collector.addError(e);
+ }
+ }
+
+}
diff --git a/livy/src/main/test/org/apache/zeppelin/livy/LivyInterpreterTest.java b/livy/src/main/test/org/apache/zeppelin/livy/LivyInterpreterTest.java
new file mode 100644
index 00000000000..b21afa4b2cc
--- /dev/null
+++ b/livy/src/main/test/org/apache/zeppelin/livy/LivyInterpreterTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.livy;
+
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LivyInterpreterTest {
+
+ @Rule
+ public ErrorCollector collector = new ErrorCollector();
+
+ private static LivyPySparkInterpreter interpreter;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private InterpreterContext interpreterContext;
+
+ @AfterClass
+ public static void tearDown() {
+ interpreter.close();
+ }
+
+ @Before
+ public void prepareContext() throws Exception {
+ interpreter = new LivyPySparkInterpreter(new Properties());
+ interpreter.userSessionMap = new HashMap<>();
+ interpreter.userSessionMap.put(null, 0);
+ interpreter.livyHelper = Mockito.mock(LivyHelper.class);
+ interpreter.open();
+
+ doReturn(new InterpreterResult(InterpreterResult.Code.SUCCESS)).when(interpreter.livyHelper)
+ .interpret("print \"x is 1.\"", interpreterContext, interpreter.userSessionMap);
+ }
+
+ @Test
+ public void checkInitVariables() throws Exception {
+ collector.checkThat("Check that, if userSessionMap is made: ",
+ interpreter.userSessionMap, CoreMatchers.notNullValue());
+ }
+
+ @Test
+ public void checkBasicInterpreter() throws Exception {
+
+ String paragraphString = "print \"x is 1.\"";
+
+ final InterpreterResult actual = interpreter.interpret(paragraphString, interpreterContext);
+
+ collector.checkThat("Check that, result is computed: ",
+ actual.code(), CoreMatchers.equalTo(InterpreterResult.Code.SUCCESS));
+ assertThat(actual).isNotNull();
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index c7d93126915..77f36675862 100755
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
     <module>markdown</module>
     <module>angular</module>
     <module>shell</module>
+    <module>livy</module>
     <module>hive</module>
     <module>hbase</module>
     <module>phoenix</module>
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 541aae1397b..9eceeedd6a6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -463,6 +463,10 @@ public static enum ConfVars {
+ "org.apache.zeppelin.markdown.Markdown,"
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ + "org.apache.zeppelin.livy.LivySparkInterpreter,"
+ + "org.apache.zeppelin.livy.LivySparkSQLInterpreter,"
+ + "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
+ + "org.apache.zeppelin.livy.LivySparkRInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
+ "org.apache.zeppelin.file.HDFSFileInterpreter,"