diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index b467a825b70..842cab4e6a4 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -66,7 +66,7 @@
zeppelin.interpreters
- org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter
+ org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter
Comma separated interpreter configurations. First interpreter become a default
diff --git a/lens/pom.xml b/lens/pom.xml
new file mode 100644
index 00000000000..a78be2cbecc
--- /dev/null
+++ b/lens/pom.xml
@@ -0,0 +1,303 @@
+
+
+
+
+ 4.0.0
+
+
+ zeppelin
+ org.apache.zeppelin
+ 0.5.0-incubating-SNAPSHOT
+
+
+ org.apache.zeppelin
+ zeppelin-lens
+ jar
+ 0.5.0-incubating-SNAPSHOT
+ Zeppelin: Lens interpreter
+ http://www.apache.org
+
+
+ 2.2.0-beta-incubating-SNAPSHOT
+ 1.1.0.RELEASE
+ 2.4.0
+
+
+
+
+ org.apache.zeppelin
+ zeppelin-interpreter
+ ${project.version}
+ provided
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+ org.checkerframework
+ jdk7
+ 1.9.1
+
+
+
+ org.apache.lens
+ lens-cli
+ ${lens.version}
+
+
+
+ org.apache.lens
+ lens-client
+ ${lens.version}
+
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+ 1.9.13
+
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+ 1.9.13
+
+
+
+ org.codehaus.jackson
+ jackson-xc
+ 1.9.11
+
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+ 1.9.11
+
+
+
+ org.glassfish.jersey.core
+ jersey-server
+ 2.3.1
+
+
+
+ org.glassfish.jersey.core
+ jersey-client
+ 2.3.1
+
+
+
+ org.springframework.shell
+ spring-shell
+ ${spring-shell.version}
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop-common.version}
+
+
+ com.sun.jersey
+ jersey-core
+
+
+ com.sun.jersey
+ jersey-json
+
+
+ com.sun.jersey
+ jersey-server
+
+
+
+
+
+ junit
+ junit
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+
+ true
+
+
+
+
+ maven-enforcer-plugin
+ 1.3.1
+
+
+ enforce
+ none
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/../../interpreter/lens
+ false
+ false
+ true
+ runtime
+
+
+
+ copy-artifact
+ package
+
+ copy
+
+
+ ${project.build.directory}/../../interpreter/lens
+ false
+ false
+ true
+ runtime
+
+
+ ${project.groupId}
+ ${project.artifactId}
+ ${project.version}
+ ${project.packaging}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-clean-plugin
+ 2.4.1
+
+
+
+ ${basedir}/../interpreter/lens
+ false
+
+
+
+
+
+
+
+
+
+ inmobi.repo
+ https://github.com/InMobi/mvn-repo/raw/master/releases
+
+ false
+
+
+
+ inmobi.snapshots
+ https://github.com/InMobi/mvn-repo/raw/master/snapshots
+
+ false
+
+
+ true
+
+
+
+ central
+ http://repo1.maven.org/maven2
+
+ false
+
+
+
+ cloudera
+ https://repository.cloudera.com/artifactory/cloudera-repos
+
+ true
+ never
+
+
+ false
+ never
+
+
+
+ Codehaus repository
+ http://repository.codehaus.org/
+
+ false
+
+
+
+ apache.snapshots.repo
+ https://repository.apache.org/content/groups/snapshots
+ Apache Snapshots Repository
+
+ false
+
+
+ true
+
+
+
+ default
+ https://repository.apache.org/content/groups/public/
+
+
+ projectlombok.org
+ http://projectlombok.org/mavenrepo
+
+ false
+
+
+
+
+ ext-release-local
+ http://repo.springsource.org/simple/ext-release-local/
+
+ false
+
+
+
+
+
diff --git a/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java
new file mode 100644
index 00000000000..3f8b9abbef6
--- /dev/null
+++ b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.lens;
+
+import org.apache.lens.client.LensClient;
+import org.springframework.shell.core.JLineShell;
+/**
+ * Pojo tracking query execution details
+ * Used to cancel the query
+ */
+public class ExecutionDetail {
+ private String queryHandle;
+ private LensClient lensClient;
+ private JLineShell shell;
+ ExecutionDetail(String qh, LensClient lensClient, JLineShell shell) {
+ this.queryHandle = qh;
+ this.lensClient = lensClient;
+ this.shell = shell;
+ }
+ public JLineShell getShell() {
+ return shell;
+ }
+ public String getQueryHandle() {
+ return queryHandle;
+ }
+ public LensClient getLensClient() {
+ return lensClient;
+ }
+}
diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java
new file mode 100644
index 00000000000..2ce9c57db7a
--- /dev/null
+++ b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2011-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.lens;
+
+import java.util.Properties;
+
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
+import org.springframework.beans.factory.support.RootBeanDefinition;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.shell.core.Shell;
+
+/**
+ * workaround for https://github.com/spring-projects/spring-shell/issues/73
+ */
+public class LensBootstrap extends org.springframework.shell.Bootstrap {
+ public LensBootstrap() {
+ super();
+ }
+ public LensJLineShellComponent getLensJLineShellComponent() {
+ GenericApplicationContext ctx = (GenericApplicationContext) getApplicationContext();
+ RootBeanDefinition rbd = new RootBeanDefinition();
+ rbd.setBeanClass(LensJLineShellComponent.class);
+ DefaultListableBeanFactory bf = (DefaultListableBeanFactory) ctx.getBeanFactory();
+ bf.registerBeanDefinition(LensJLineShellComponent.class.getSimpleName(), rbd);
+ return ctx.getBean(LensJLineShellComponent.class);
+ }
+}
diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java
new file mode 100644
index 00000000000..26327754f47
--- /dev/null
+++ b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.lens;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.lens.client.LensClient;
+import org.apache.lens.client.LensClientConfig;
+import org.apache.lens.client.LensClientSingletonWrapper;
+import org.apache.lens.cli.commands.BaseLensCommand;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.Bootstrap;
+import org.springframework.shell.core.CommandResult;
+import org.springframework.shell.core.JLineShell;
+import org.springframework.shell.core.JLineShellComponent;
+import org.springframework.shell.support.logging.HandlerUtils;
+
+
+/**
+ * Lens interpreter for Zeppelin.
+ */
+public class LensInterpreter extends Interpreter {
+
+ static final Logger s_logger = LoggerFactory.getLogger(LensInterpreter.class);
+ static final String LENS_CLIENT_DBNAME = "lens.client.dbname";
+ static final String LENS_SERVER_URL = "lens.server.base.url";
+ static final String LENS_SESSION_CLUSTER_USER = "lens.session.cluster.user";
+ static final String LENS_PERSIST_RESULTSET = "lens.query.enable.persistent.resultset";
+ static final String ZEPPELIN_LENS_RUN_CONCURRENT_SESSION = "zeppelin.lens.run.concurrent";
+ static final String ZEPPELIN_LENS_CONCURRENT_SESSIONS = "zeppelin.lens.maxThreads";
+ static final String ZEPPELIN_MAX_ROWS = "zeppelin.lens.maxResults";
+ static final Map LENS_TABLE_FORMAT_REGEX = new LinkedHashMap() {
+ {
+ put("cubes", Pattern.compile(".*show\\s+cube.*"));
+ put("nativetables", Pattern.compile(".*show\\s+nativetable.*"));
+ put("storages", Pattern.compile(".*show\\s+storage.*"));
+ put("facts", Pattern.compile(".*show\\s+fact.*"));
+ put("dimensions", Pattern.compile(".*show\\s+dimension.*"));
+ put("params", Pattern.compile(".*show\\s+param.*"));
+ put("databases", Pattern.compile(".*show\\s+database.*"));
+ put("query results", Pattern.compile(".*query\\s+results.*"));
+ }
+ };
+
+ private static Pattern s_queryExecutePattern = Pattern.compile(".*query\\s+execute\\s+(.*)");
+ private static Map s_paraToQH =
+ new ConcurrentHashMap (); //tracks paragraphID -> Lens QueryHandle
+ private static Map s_clientMap =
+ new ConcurrentHashMap();
+
+ private int m_maxResults;
+ private int m_maxThreads;
+ private JLineShell m_shell;
+ private LensClientConfig m_lensConf;
+ private Bootstrap m_bs;
+ private LensClient m_lensClient;
+
+
+ static {
+ Interpreter.register(
+ "lens",
+ "lens",
+ LensInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true", "Run concurrent Lens Sessions")
+ .add(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10",
+ "If concurrency is true then how many threads?")
+ .add(ZEPPELIN_MAX_ROWS, "1000", "max number of rows to display")
+ .add(LENS_SERVER_URL, "http://:/lensapi", "The URL for Lens Server")
+ .add(LENS_CLIENT_DBNAME, "default", "The database schema name")
+ .add(LENS_PERSIST_RESULTSET, "false", "Apache Lens to persist result in HDFS?")
+ .add(LENS_SESSION_CLUSTER_USER, "default", "Hadoop cluster username").build());
+ }
+
+ public LensInterpreter(Properties property) {
+ super(property);
+ try {
+ m_lensConf = new LensClientConfig();
+ m_lensConf.set(LENS_SERVER_URL, property.get(LENS_SERVER_URL).toString());
+ m_lensConf.set(LENS_CLIENT_DBNAME, property.get(LENS_CLIENT_DBNAME).toString());
+ m_lensConf.set(LENS_SESSION_CLUSTER_USER, property.get(LENS_SESSION_CLUSTER_USER).toString());
+ m_lensConf.set(LENS_PERSIST_RESULTSET, property.get(LENS_PERSIST_RESULTSET).toString());
+ try {
+ m_maxResults = Integer.parseInt(property.get(ZEPPELIN_MAX_ROWS).toString());
+ } catch (NumberFormatException|NullPointerException e) {
+ m_maxResults = 1000;
+ s_logger.error("unable to parse " + ZEPPELIN_MAX_ROWS + " :"
+ + property.get(ZEPPELIN_MAX_ROWS), e);
+ }
+ try {
+ m_maxThreads = Integer.parseInt(property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS).toString());
+ } catch (NumberFormatException|NullPointerException e) {
+ m_maxThreads = 10;
+ s_logger.error("unable to parse " + ZEPPELIN_LENS_CONCURRENT_SESSIONS + " :"
+ + property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS), e);
+ }
+ s_logger.info("LensInterpreter created");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ s_logger.error("unable to create lens interpreter", e);
+ }
+ }
+
+ private Bootstrap createBootstrap() {
+ return new LensBootstrap();
+ }
+
+ private JLineShell getJLineShell(Bootstrap bs) {
+ if (bs instanceof LensBootstrap) {
+ return ((LensBootstrap) bs).getLensJLineShellComponent();
+ } else {
+ return bs.getJLineShellComponent();
+ }
+ }
+
+ protected void init() {
+ try {
+ m_bs = createBootstrap();
+ m_shell = getJLineShell(m_bs);
+ } catch (Exception ex) {
+ s_logger.error("could not initialize commandLine", ex);
+ }
+ }
+
+ @Override
+ public void open() {
+ s_logger.info("LensInterpreter opening");
+ m_lensClient = new LensClient(m_lensConf);
+ LensClientSingletonWrapper.instance().setClient(m_lensClient);
+ init();
+ s_logger.info("LensInterpreter opened");
+ }
+
+ @Override
+ public void close() {
+ closeConnections();
+ s_logger.info("LensInterpreter closed");
+ }
+
+ private static void closeConnections() {
+ for (LensClient cl : s_clientMap.keySet()) {
+ if (cl.isConnectionOpen()) {
+ closeLensClient(cl);
+ }
+ }
+ }
+
+ private static void closeLensClient(LensClient lensClient) {
+ try {
+ lensClient.closeConnection();
+ } catch (Exception e) {
+ s_logger.error("unable to close lensClient", e);
+ }
+ }
+
+ private LensClient createAndSetLensClient(Bootstrap bs) {
+ LensClient lensClient = null;
+ try {
+ lensClient = new LensClient(m_lensConf);
+
+ for (String beanName : bs.getApplicationContext().getBeanDefinitionNames()) {
+ if (bs.getApplicationContext().getBean(beanName) instanceof BaseLensCommand) {
+ ((BaseLensCommand) bs.getApplicationContext().getBean(beanName))
+ .setClient(lensClient);
+ }
+ }
+ } catch (Exception e) {
+ s_logger.error("unable to create lens client", e);
+ throw e;
+ }
+ return lensClient;
+ }
+
+ private InterpreterResult HandleHelp(JLineShell shell, String st) {
+ java.util.logging.StreamHandler sh = null;
+ java.util.logging.Logger springLogger = null;
+ java.util.logging.Formatter formatter = new java.util.logging.Formatter() {
+ public String format(java.util.logging.LogRecord record) {
+ return record.getMessage();
+ }
+ };
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ sh = new java.util.logging.StreamHandler(baos, formatter);
+ springLogger = HandlerUtils.getLogger(org.springframework.shell.core.SimpleParser.class);
+ springLogger.addHandler(sh);
+ shell.executeCommand(st);
+ } catch (Exception e) {
+ s_logger.error(e.getMessage(), e);
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+ finally {
+ sh.flush();
+ springLogger.removeHandler(sh);
+ sh.close();
+ }
+ return new InterpreterResult(Code.SUCCESS, baos.toString());
+ }
+
+ private String modifyQueryStatement(String st) {
+ Matcher matcher = s_queryExecutePattern.matcher(st.toLowerCase());
+ if (!matcher.find()) {
+ return st;
+ }
+ StringBuilder sb = new StringBuilder("query execute ");
+ if (!st.toLowerCase().matches(".*--async\\s+true")) {
+ sb.append("--async true ");
+ }
+ sb.append(matcher.group(1));
+ if (!st.toLowerCase().matches(".*limit\\s+\\d+.*")) {
+ sb.append(" limit ");
+ sb.append(m_maxResults);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public InterpreterResult interpret(String input, InterpreterContext context) {
+ if (input == null || input.length() == 0) {
+ return new InterpreterResult(Code.ERROR, "no command submitted");
+ }
+ String st = input.replaceAll("\\n", " ");
+ s_logger.info("LensInterpreter command: " + st);
+
+ Bootstrap bs = createBootstrap();
+ JLineShell shell = getJLineShell(bs);
+ CommandResult res = null;
+ LensClient lensClient = null;
+ String qh = null;
+
+ if (st.trim().startsWith("help")) {
+ return HandleHelp(shell, st);
+ }
+
+ try {
+ lensClient = createAndSetLensClient(bs);
+ s_clientMap.put(lensClient, true);
+
+ String lensCommand = modifyQueryStatement(st);
+
+ s_logger.info("executing command : " + lensCommand);
+ res = shell.executeCommand(lensCommand);
+
+ if (!lensCommand.equals(st) && res != null
+ && res.getResult() != null
+ && res.getResult().toString().trim().matches("[a-z0-9-]+")) {
+ // setup query progress tracking
+ qh = res.getResult().toString();
+ s_paraToQH.put(context.getParagraphId(),
+ new ExecutionDetail(qh, lensClient, shell));
+ String getResultsCmd = "query results --async false " + qh;
+ s_logger.info("executing query results command : " + context.getParagraphId()
+ + " : " + getResultsCmd);
+ res = shell.executeCommand(getResultsCmd);
+ s_paraToQH.remove(context.getParagraphId());
+ }
+ } catch (Exception ex) {
+ s_logger.error("error in interpret", ex);
+ return new InterpreterResult(Code.ERROR, ex.getMessage());
+ }
+ finally {
+ if (shell != null) {
+ closeShell(shell);
+ }
+ if (lensClient != null) {
+ closeLensClient(lensClient);
+ s_clientMap.remove(lensClient);
+ }
+ if (qh != null) {
+ s_paraToQH.remove(context.getParagraphId());
+ }
+ }
+ return new InterpreterResult(Code.SUCCESS, formatResult(st, res));
+ }
+
+ private void closeShell(JLineShell shell) {
+ if (shell instanceof LensJLineShellComponent) {
+ ((LensJLineShellComponent) shell).stop();
+ } else {
+ ((JLineShellComponent) shell).stop();
+ }
+ }
+
+ private String formatResult(String st, CommandResult result) {
+ if (result == null) {
+ return "error in interpret, no result object returned";
+ }
+ if (!result.isSuccess() || result.getResult() == null) {
+ if (result.getException() != null) {
+ return result.getException().getMessage();
+ //try describe cube (without cube name)- error is written as a warning,
+ //but not returned to result object
+ } else {
+ return "error in interpret, unable to execute command";
+ }
+ }
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry entry : LENS_TABLE_FORMAT_REGEX.entrySet()) {
+ if (entry.getValue().matcher(st.toLowerCase()).find()) {
+ sb.append("%table " + entry.getKey() + " \n");
+ break;
+ }
+ }
+ if (s_queryExecutePattern.matcher(st.toLowerCase()).find() &&
+ result.getResult().toString().contains(" rows process in (")) {
+ sb.append("%table ");
+ }
+ if (sb.length() > 0) {
+ return sb.append(result.getResult().toString()).toString();
+ }
+ return result.getResult().toString();
+ //Lens sends error messages without setting result.isSuccess() = false.
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ if (!s_paraToQH.containsKey(context.getParagraphId())) {
+ s_logger.error("ignoring cancel from " + context.getParagraphId());
+ return;
+ }
+ String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle();
+ s_logger.info("preparing to cancel : (" + context.getParagraphId() + ") :" + qh);
+ Bootstrap bs = createBootstrap();
+ JLineShell shell = getJLineShell(bs);
+ LensClient lensClient = null;
+ try {
+ lensClient = createAndSetLensClient(bs);
+ s_clientMap.put(lensClient, true);
+ s_logger.info("invoke query kill (" + context.getParagraphId() + ") " + qh);
+ CommandResult res = shell.executeCommand("query kill " + qh);
+ s_logger.info("query kill returned (" + context.getParagraphId() + ") " + qh
+ + " with: " + res.getResult());
+ } catch (Exception e) {
+ s_logger.error("unable to kill query ("
+ + context.getParagraphId() + ") " + qh, e);
+ } finally {
+ try {
+ if (lensClient != null) {
+ closeLensClient(lensClient);
+ s_clientMap.remove(lensClient);
+ }
+ closeLensClient(s_paraToQH.get(context.getParagraphId()).getLensClient());
+ closeShell(s_paraToQH.get(context.getParagraphId()).getShell());
+ } catch (Exception e) {
+ // ignore
+ }
+ s_paraToQH.remove(context.getParagraphId());
+ closeShell(shell);
+ }
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ if (s_paraToQH.containsKey(context.getParagraphId())) {
+ s_logger.info("number of items for which progress can be reported :" + s_paraToQH.size());
+ s_logger.info("number of open lensclient :" + s_clientMap.size());
+ Bootstrap bs = createBootstrap();
+ JLineShell shell = getJLineShell(bs);
+ LensClient lensClient = null;
+ String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle();
+ try {
+ s_logger.info("fetch query status for : (" + context.getParagraphId() + ") :" + qh);
+ lensClient = createAndSetLensClient(bs);
+ s_clientMap.put(lensClient, true);
+ CommandResult res = shell.executeCommand("query status " + qh);
+ s_logger.info(context.getParagraphId() + " --> " + res.getResult().toString());
+ //change to debug
+ Pattern pattern = Pattern.compile(".*(Progress : (\\d\\.\\d)).*");
+ Matcher matcher = pattern.matcher(res.getResult().toString().replaceAll("\\n", " "));
+ if (matcher.find(2)) {
+ Double d = Double.parseDouble(matcher.group(2)) * 100;
+ if (d.intValue() == 100) {
+ s_paraToQH.remove(context.getParagraphId());
+ }
+ return d.intValue();
+ } else {
+ return 1;
+ }
+ }
+ catch (Exception e) {
+ s_logger.error("unable to get progress for (" + context.getParagraphId() + ") :" + qh, e);
+ s_paraToQH.remove(context.getParagraphId());
+ return 0;
+ } finally {
+ if (lensClient != null) {
+ closeLensClient(lensClient);
+ s_clientMap.remove(lensClient);
+ }
+ if (shell != null) {
+ closeShell(shell);
+ }
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return null;
+ }
+
+ public boolean concurrentRequests() {
+ return Boolean.parseBoolean(getProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION));
+ }
+ @Override
+ public Scheduler getScheduler() {
+ if (concurrentRequests()) {
+ return SchedulerFactory.singleton().createOrGetParallelScheduler(
+ LensInterpreter.class.getName() + this.hashCode(), m_maxThreads);
+ } else {
+ return super.getScheduler();
+ }
+ }
+}
diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java
new file mode 100644
index 00000000000..f025d5353a3
--- /dev/null
+++ b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2011-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.lens;
+
+import java.util.Map;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactoryUtils;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.shell.CommandLine;
+import org.springframework.shell.plugin.BannerProvider;
+import org.springframework.shell.plugin.HistoryFileNameProvider;
+import org.springframework.shell.plugin.PluginUtils;
+import org.springframework.shell.plugin.PromptProvider;
+import org.springframework.shell.core.*;
+
+/**
+ * workaround for https://github.com/spring-projects/spring-shell/issues/73
+ */
+public class LensJLineShellComponent extends JLineShell
+ implements SmartLifecycle, ApplicationContextAware, InitializingBean {
+
+ @Autowired
+ private CommandLine commandLine;
+
+ private volatile boolean running = false;
+ private Thread shellThread;
+
+ private ApplicationContext applicationContext;
+ private boolean printBanner = true;
+
+ private String historyFileName;
+ private String promptText;
+ private String productName;
+ private String banner;
+ private String version;
+ private String welcomeMessage;
+
+ private ExecutionStrategy executionStrategy = new LensSimpleExecutionStrategy();
+ private SimpleParser parser = new SimpleParser();
+
+ public SimpleParser getSimpleParser() {
+ return parser;
+ }
+
+ public boolean isAutoStartup() {
+ return false;
+ }
+
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ public int getPhase() {
+ return 1;
+ }
+
+ public void start() {
+ //customizePlug must run before start thread to take plugin's configuration into effect
+ customizePlugin();
+ shellThread = new Thread(this, "Spring Shell");
+ shellThread.start();
+ running = true;
+ }
+
+
+ public void stop() {
+ if (running) {
+ closeShell();
+ running = false;
+ }
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void afterPropertiesSet() {
+
+ Map commands =
+ BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
+ CommandMarker.class);
+ for (CommandMarker command : commands.values()) {
+ getSimpleParser().add(command);
+ }
+
+ Map converters = BeanFactoryUtils
+ .beansOfTypeIncludingAncestors(applicationContext, Converter.class);
+ for (Converter> converter : converters.values()) {
+ getSimpleParser().add(converter);
+ }
+
+ setHistorySize(commandLine.getHistorySize());
+ if (commandLine.getShellCommandsToExecute() != null) {
+ setPrintBanner(false);
+ }
+ }
+
+ /**
+ * wait the shell command to complete by typing "quit" or "exit"
+ *
+ */
+ public void waitForComplete() {
+ try {
+ shellThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected ExecutionStrategy getExecutionStrategy() {
+ return executionStrategy;
+ }
+
+ @Override
+ protected Parser getParser() {
+ return parser;
+ }
+
+ @Override
+ public String getStartupNotifications() {
+ return null;
+ }
+
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ public void customizePlugin() {
+ this.historyFileName = getHistoryFileName();
+ this.promptText = getPromptText();
+ String[] banner = getBannerText();
+ this.banner = banner[0];
+ this.welcomeMessage = banner[1];
+ this.version = banner[2];
+ this.productName = banner[3];
+ }
+
+ /**
+ * get history file name from provider. The provider has highest order
+ * org.springframework.core.Ordered.getOder will win.
+ *
+ * @return history file name
+ */
+ protected String getHistoryFileName() {
+ HistoryFileNameProvider historyFileNameProvider = PluginUtils
+ .getHighestPriorityProvider(this.applicationContext, HistoryFileNameProvider.class);
+ String providerHistoryFileName = historyFileNameProvider.getHistoryFileName();
+ if (providerHistoryFileName != null) {
+ return providerHistoryFileName;
+ } else {
+ return historyFileName;
+ }
+ }
+
+ /**
+ * get prompt text from provider. The provider has highest order
+ * org.springframework.core.Ordered.getOder will win.
+ *
+ * @return prompt text
+ */
+ protected String getPromptText() {
+ PromptProvider promptProvider = PluginUtils
+ .getHighestPriorityProvider(this.applicationContext, PromptProvider.class);
+ String providerPromptText = promptProvider.getPrompt();
+ if (providerPromptText != null) {
+ return providerPromptText;
+ } else {
+ return promptText;
+ }
+ }
+
+ /**
+ * Get Banner and Welcome Message from provider. The provider has highest order
+ * org.springframework.core.Ordered.getOder will win.
+ * @return BannerText[0]: Banner
+ * BannerText[1]: Welcome Message
+ * BannerText[2]: Version
+ * BannerText[3]: Product Name
+ */
+ private String[] getBannerText() {
+ String[] bannerText = new String[4];
+ BannerProvider provider = PluginUtils
+ .getHighestPriorityProvider(this.applicationContext, BannerProvider.class);
+ bannerText[0] = provider.getBanner();
+ bannerText[1] = provider.getWelcomeMessage();
+ bannerText[2] = provider.getVersion();
+ bannerText[3] = provider.getProviderName();
+ return bannerText;
+ }
+
+ public void printBannerAndWelcome() {
+ if (printBanner) {
+ logger.info(this.banner);
+ logger.info(getWelcomeMessage());
+ }
+ }
+
+
+ /**
+ * get the welcome message at start.
+ *
+ * @return welcome message
+ */
+ public String getWelcomeMessage() {
+ return this.welcomeMessage;
+ }
+
+
+ /**
+ * @param printBanner the printBanner to set
+ */
+ public void setPrintBanner(boolean printBanner) {
+ this.printBanner = printBanner;
+ }
+
+ protected String getProductName() {
+ return productName;
+ }
+
+ protected String getVersion() {
+ return version;
+ }
+}
diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java
new file mode 100644
index 00000000000..e3294adb7dd
--- /dev/null
+++ b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2011-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.lens;
+
+import org.springframework.shell.core.*;
+
+import java.util.logging.Logger;
+
+import org.springframework.shell.event.ParseResult;
+import org.springframework.shell.support.logging.HandlerUtils;
+import org.springframework.util.Assert;
+import org.springframework.util.ReflectionUtils;
+
+/**
+ * workaround for https://github.com/spring-projects/spring-shell/issues/73
+ */
+public class LensSimpleExecutionStrategy implements ExecutionStrategy {
+
+ private static final Logger logger = HandlerUtils.getLogger(LensSimpleExecutionStrategy.class);
+
+ public Object execute(ParseResult parseResult) throws RuntimeException {
+ Assert.notNull(parseResult, "Parse result required");
+ logger.info("LensSimpleExecutionStrategy execute method invoked");
+ synchronized (this) {
+ Assert.isTrue(isReadyForCommands(), "SimpleExecutionStrategy not yet ready for commands");
+ Object target = parseResult.getInstance();
+ if (target instanceof ExecutionProcessor) {
+ ExecutionProcessor processor = ((ExecutionProcessor) target);
+ parseResult = processor.beforeInvocation(parseResult);
+ try {
+ Object result = invoke(parseResult);
+ processor.afterReturningInvocation(parseResult, result);
+ return result;
+ } catch (Throwable th) {
+ processor.afterThrowingInvocation(parseResult, th);
+ return handleThrowable(th);
+ }
+ }
+ else {
+ return invoke(parseResult);
+ }
+ }
+ }
+
+ private Object invoke(ParseResult parseResult) {
+ try {
+ return ReflectionUtils.invokeMethod(parseResult.getMethod(),
+ parseResult.getInstance(), parseResult.getArguments());
+ } catch (Throwable th) {
+ logger.severe("Command failed " + th);
+ return handleThrowable(th);
+ }
+ }
+
+ private Object handleThrowable(Throwable th) {
+ if (th instanceof Error) {
+ throw ((Error) th);
+ }
+ if (th instanceof RuntimeException) {
+ throw ((RuntimeException) th);
+ }
+ throw new RuntimeException(th);
+ }
+
+ public boolean isReadyForCommands() {
+ return true;
+ }
+
+ public void terminate() {
+ // do nothing
+ }
+
+}
diff --git a/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java
new file mode 100644
index 00000000000..5af8deb8448
--- /dev/null
+++ b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.lens;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.apache.zeppelin.lens.LensInterpreter.*;
+
+/**
+ * Lens interpreter unit tests
+ */
+public class LensInterpreterTest {
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void test() {
+ Properties prop = new Properties();
+ prop.setProperty(LENS_SERVER_URL, "http://127.0.0.1:9999/lensapi");
+ prop.setProperty(LENS_CLIENT_DBNAME, "default");
+ prop.setProperty(LENS_PERSIST_RESULTSET, "false");
+ prop.setProperty(LENS_SESSION_CLUSTER_USER, "default");
+ prop.setProperty(ZEPPELIN_MAX_ROWS, "1000");
+ prop.setProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true");
+ prop.setProperty(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10");
+ LensInterpreter t = new MockLensInterpreter(prop);
+ t.open();
+ //simple help test
+ InterpreterResult result = t.interpret("help", null);
+ assertEquals(result.type(), InterpreterResult.Type.TEXT);
+ //assertEquals("unable to find 'query execute' in help message",
+ // result.message().contains("query execute"), result.message());
+ t.close();
+ }
+
+ class MockLensInterpreter extends LensInterpreter {
+ public MockLensInterpreter(Properties property) {
+ super(property);
+ }
+ @Override
+ public void open() {
+ super.init();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index ecd819f9976..9f844374364 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
hive
tajo
flink
+ lens
ignite
zeppelin-web
zeppelin-server
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 6bc8a6cea1e..cf853a3483c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -397,7 +397,8 @@ public static enum ConfVars {
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
- + "org.apache.zeppelin.flink.FlinkInterpreter,"
+ + "org.apache.zeppelin.lens.LensInterpreter,"
+ + "org.apache.zeppelin.flink.FlinkInterpreter"
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteSqlInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),