From 145a9366884320dacf6b47e96dd81987f91c4a28 Mon Sep 17 00:00:00 2001 From: Pranav Agarwal Date: Sat, 13 Jun 2015 01:24:44 +0530 Subject: [PATCH 1/4] Lens Interpreter --- lens/pom.xml | 303 ++++++++++++ .../apache/zeppelin/lens/ExecutionDetail.java | 44 ++ .../apache/zeppelin/lens/LensBootstrap.java | 41 ++ .../apache/zeppelin/lens/LensInterpreter.java | 451 ++++++++++++++++++ .../lens/LensJLineShellComponent.java | 244 ++++++++++ .../lens/LensSimpleExecutionStrategy.java | 86 ++++ .../zeppelin/lens/LensInterpreterTest.java | 71 +++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 1 + 9 files changed, 1242 insertions(+) create mode 100644 lens/pom.xml create mode 100644 lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java create mode 100644 lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java create mode 100644 lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java create mode 100644 lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java create mode 100644 lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java create mode 100644 lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java diff --git a/lens/pom.xml b/lens/pom.xml new file mode 100644 index 00000000000..a78be2cbecc --- /dev/null +++ b/lens/pom.xml @@ -0,0 +1,303 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.5.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-lens + jar + 0.5.0-incubating-SNAPSHOT + Zeppelin: Lens interpreter + http://www.apache.org + + + 2.2.0-beta-incubating-SNAPSHOT + 1.1.0.RELEASE + 2.4.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + org.checkerframework + jdk7 + 1.9.1 + + + + org.apache.lens + lens-cli + ${lens.version} + + + + org.apache.lens + lens-client + ${lens.version} + + + + org.codehaus.jackson + jackson-core-asl + 1.9.13 + + + + org.codehaus.jackson + jackson-mapper-asl + 1.9.13 + + + + org.codehaus.jackson + jackson-xc + 1.9.11 + + + + org.codehaus.jackson + jackson-jaxrs + 1.9.11 + + + + org.glassfish.jersey.core + jersey-server + 2.3.1 + + + + org.glassfish.jersey.core + jersey-client + 2.3.1 + + + + org.springframework.shell + spring-shell + ${spring-shell.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop-common.version} + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + + + + junit + junit + test + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/lens + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/lens + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + org.apache.maven.plugins + maven-clean-plugin + 2.4.1 + + + + ${basedir}/../interpreter/lens + false + + + + + + + + + + inmobi.repo + https://github.com/InMobi/mvn-repo/raw/master/releases + + false + + + + inmobi.snapshots + https://github.com/InMobi/mvn-repo/raw/master/snapshots + + false + + + true + + + + central + http://repo1.maven.org/maven2 + + false + + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos + + true + never + + + false + never + + + + Codehaus repository + http://repository.codehaus.org/ + + false + + + + apache.snapshots.repo + https://repository.apache.org/content/groups/snapshots + Apache Snapshots Repository + + false + + + true + + + + default + https://repository.apache.org/content/groups/public/ + + + projectlombok.org + http://projectlombok.org/mavenrepo + + false + + + + + ext-release-local + http://repo.springsource.org/simple/ext-release-local/ + + false + + + + + diff --git a/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java new file mode 100644 index 00000000000..3f8b9abbef6 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/ExecutionDetail.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import org.apache.lens.client.LensClient; +import org.springframework.shell.core.JLineShell; +/** + * Pojo tracking query execution details + * Used to cancel the query + */ +public class ExecutionDetail { + private String queryHandle; + private LensClient lensClient; + private JLineShell shell; + ExecutionDetail(String qh, LensClient lensClient, JLineShell shell) { + this.queryHandle = qh; + this.lensClient = lensClient; + this.shell = shell; + } + public JLineShell getShell() { + return shell; + } + public String getQueryHandle() { + return queryHandle; + } + public LensClient getLensClient() { + return lensClient; + } +} diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java new file mode 100644 index 00000000000..2ce9c57db7a --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensBootstrap.java @@ -0,0 +1,41 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.lens; + +import java.util.Properties; + +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.shell.core.Shell; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensBootstrap extends org.springframework.shell.Bootstrap { + public LensBootstrap() { + super(); + } + public LensJLineShellComponent getLensJLineShellComponent() { + GenericApplicationContext ctx = (GenericApplicationContext) getApplicationContext(); + RootBeanDefinition rbd = new RootBeanDefinition(); + rbd.setBeanClass(LensJLineShellComponent.class); + DefaultListableBeanFactory bf = (DefaultListableBeanFactory) ctx.getBeanFactory(); + bf.registerBeanDefinition(LensJLineShellComponent.class.getSimpleName(), rbd); + return ctx.getBean(LensJLineShellComponent.class); + } +} diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java new file mode 100644 index 00000000000..26327754f47 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensInterpreter.java @@ -0,0 +1,451 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.lens; + +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.io.ByteArrayOutputStream; + +import org.apache.lens.client.LensClient; +import org.apache.lens.client.LensClientConfig; +import org.apache.lens.client.LensClientSingletonWrapper; +import org.apache.lens.cli.commands.BaseLensCommand; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.CommandResult; +import org.springframework.shell.core.JLineShell; +import org.springframework.shell.core.JLineShellComponent; +import org.springframework.shell.support.logging.HandlerUtils; + + +/** + * Lens interpreter for Zeppelin. + */ +public class LensInterpreter extends Interpreter { + + static final Logger s_logger = LoggerFactory.getLogger(LensInterpreter.class); + static final String LENS_CLIENT_DBNAME = "lens.client.dbname"; + static final String LENS_SERVER_URL = "lens.server.base.url"; + static final String LENS_SESSION_CLUSTER_USER = "lens.session.cluster.user"; + static final String LENS_PERSIST_RESULTSET = "lens.query.enable.persistent.resultset"; + static final String ZEPPELIN_LENS_RUN_CONCURRENT_SESSION = "zeppelin.lens.run.concurrent"; + static final String ZEPPELIN_LENS_CONCURRENT_SESSIONS = "zeppelin.lens.maxThreads"; + static final String ZEPPELIN_MAX_ROWS = "zeppelin.lens.maxResults"; + static final Map LENS_TABLE_FORMAT_REGEX = new LinkedHashMap() { + { + put("cubes", Pattern.compile(".*show\\s+cube.*")); + put("nativetables", Pattern.compile(".*show\\s+nativetable.*")); + put("storages", Pattern.compile(".*show\\s+storage.*")); + put("facts", Pattern.compile(".*show\\s+fact.*")); + put("dimensions", Pattern.compile(".*show\\s+dimension.*")); + put("params", Pattern.compile(".*show\\s+param.*")); + put("databases", Pattern.compile(".*show\\s+database.*")); + put("query results", Pattern.compile(".*query\\s+results.*")); + } + }; + + private static Pattern s_queryExecutePattern = Pattern.compile(".*query\\s+execute\\s+(.*)"); + private static Map s_paraToQH = + new ConcurrentHashMap (); //tracks paragraphID -> Lens QueryHandle + private static Map s_clientMap = + new ConcurrentHashMap(); + + private int m_maxResults; + private int m_maxThreads; + private JLineShell m_shell; + private LensClientConfig m_lensConf; + private Bootstrap m_bs; + private LensClient m_lensClient; + + + static { + Interpreter.register( + "lens", + "lens", + LensInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true", "Run concurrent Lens Sessions") + .add(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10", + "If concurrency is true then how many threads?") + .add(ZEPPELIN_MAX_ROWS, "1000", "max number of rows to display") + .add(LENS_SERVER_URL, "http://:/lensapi", "The URL for Lens Server") + .add(LENS_CLIENT_DBNAME, "default", "The database schema name") + .add(LENS_PERSIST_RESULTSET, "false", "Apache Lens to persist result in HDFS?") + .add(LENS_SESSION_CLUSTER_USER, "default", "Hadoop cluster username").build()); + } + + public LensInterpreter(Properties property) { + super(property); + try { + m_lensConf = new LensClientConfig(); + m_lensConf.set(LENS_SERVER_URL, property.get(LENS_SERVER_URL).toString()); + m_lensConf.set(LENS_CLIENT_DBNAME, property.get(LENS_CLIENT_DBNAME).toString()); + m_lensConf.set(LENS_SESSION_CLUSTER_USER, property.get(LENS_SESSION_CLUSTER_USER).toString()); + m_lensConf.set(LENS_PERSIST_RESULTSET, property.get(LENS_PERSIST_RESULTSET).toString()); + try { + m_maxResults = Integer.parseInt(property.get(ZEPPELIN_MAX_ROWS).toString()); + } catch (NumberFormatException|NullPointerException e) { + m_maxResults = 1000; + s_logger.error("unable to parse " + ZEPPELIN_MAX_ROWS + " :" + + property.get(ZEPPELIN_MAX_ROWS), e); + } + try { + m_maxThreads = Integer.parseInt(property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS).toString()); + } catch (NumberFormatException|NullPointerException e) { + m_maxThreads = 10; + s_logger.error("unable to parse " + ZEPPELIN_LENS_CONCURRENT_SESSIONS + " :" + + property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS), e); + } + s_logger.info("LensInterpreter created"); + } + catch (Exception e) { + e.printStackTrace(); + s_logger.error("unable to create lens interpreter", e); + } + } + + private Bootstrap createBootstrap() { + return new LensBootstrap(); + } + + private JLineShell getJLineShell(Bootstrap bs) { + if (bs instanceof LensBootstrap) { + return ((LensBootstrap) bs).getLensJLineShellComponent(); + } else { + return bs.getJLineShellComponent(); + } + } + + protected void init() { + try { + m_bs = createBootstrap(); + m_shell = getJLineShell(m_bs); + } catch (Exception ex) { + s_logger.error("could not initialize commandLine", ex); + } + } + + @Override + public void open() { + s_logger.info("LensInterpreter opening"); + m_lensClient = new LensClient(m_lensConf); + LensClientSingletonWrapper.instance().setClient(m_lensClient); + init(); + s_logger.info("LensInterpreter opened"); + } + + @Override + public void close() { + closeConnections(); + s_logger.info("LensInterpreter closed"); + } + + private static void closeConnections() { + for (LensClient cl : s_clientMap.keySet()) { + if (cl.isConnectionOpen()) { + closeLensClient(cl); + } + } + } + + private static void closeLensClient(LensClient lensClient) { + try { + lensClient.closeConnection(); + } catch (Exception e) { + s_logger.error("unable to close lensClient", e); + } + } + + private LensClient createAndSetLensClient(Bootstrap bs) { + LensClient lensClient = null; + try { + lensClient = new LensClient(m_lensConf); + + for (String beanName : bs.getApplicationContext().getBeanDefinitionNames()) { + if (bs.getApplicationContext().getBean(beanName) instanceof BaseLensCommand) { + ((BaseLensCommand) bs.getApplicationContext().getBean(beanName)) + .setClient(lensClient); + } + } + } catch (Exception e) { + s_logger.error("unable to create lens client", e); + throw e; + } + return lensClient; + } + + private InterpreterResult HandleHelp(JLineShell shell, String st) { + java.util.logging.StreamHandler sh = null; + java.util.logging.Logger springLogger = null; + java.util.logging.Formatter formatter = new java.util.logging.Formatter() { + public String format(java.util.logging.LogRecord record) { + return record.getMessage(); + } + }; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + sh = new java.util.logging.StreamHandler(baos, formatter); + springLogger = HandlerUtils.getLogger(org.springframework.shell.core.SimpleParser.class); + springLogger.addHandler(sh); + shell.executeCommand(st); + } catch (Exception e) { + s_logger.error(e.getMessage(), e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + finally { + sh.flush(); + springLogger.removeHandler(sh); + sh.close(); + } + return new InterpreterResult(Code.SUCCESS, baos.toString()); + } + + private String modifyQueryStatement(String st) { + Matcher matcher = s_queryExecutePattern.matcher(st.toLowerCase()); + if (!matcher.find()) { + return st; + } + StringBuilder sb = new StringBuilder("query execute "); + if (!st.toLowerCase().matches(".*--async\\s+true")) { + sb.append("--async true "); + } + sb.append(matcher.group(1)); + if (!st.toLowerCase().matches(".*limit\\s+\\d+.*")) { + sb.append(" limit "); + sb.append(m_maxResults); + } + return sb.toString(); + } + + @Override + public InterpreterResult interpret(String input, InterpreterContext context) { + if (input == null || input.length() == 0) { + return new InterpreterResult(Code.ERROR, "no command submitted"); + } + String st = input.replaceAll("\\n", " "); + s_logger.info("LensInterpreter command: " + st); + + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + CommandResult res = null; + LensClient lensClient = null; + String qh = null; + + if (st.trim().startsWith("help")) { + return HandleHelp(shell, st); + } + + try { + lensClient = createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + + String lensCommand = modifyQueryStatement(st); + + s_logger.info("executing command : " + lensCommand); + res = shell.executeCommand(lensCommand); + + if (!lensCommand.equals(st) && res != null + && res.getResult() != null + && res.getResult().toString().trim().matches("[a-z0-9-]+")) { + // setup query progress tracking + qh = res.getResult().toString(); + s_paraToQH.put(context.getParagraphId(), + new ExecutionDetail(qh, lensClient, shell)); + String getResultsCmd = "query results --async false " + qh; + s_logger.info("executing query results command : " + context.getParagraphId() + + " : " + getResultsCmd); + res = shell.executeCommand(getResultsCmd); + s_paraToQH.remove(context.getParagraphId()); + } + } catch (Exception ex) { + s_logger.error("error in interpret", ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } + finally { + if (shell != null) { + closeShell(shell); + } + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + if (qh != null) { + s_paraToQH.remove(context.getParagraphId()); + } + } + return new InterpreterResult(Code.SUCCESS, formatResult(st, res)); + } + + private void closeShell(JLineShell shell) { + if (shell instanceof LensJLineShellComponent) { + ((LensJLineShellComponent) shell).stop(); + } else { + ((JLineShellComponent) shell).stop(); + } + } + + private String formatResult(String st, CommandResult result) { + if (result == null) { + return "error in interpret, no result object returned"; + } + if (!result.isSuccess() || result.getResult() == null) { + if (result.getException() != null) { + return result.getException().getMessage(); + //try describe cube (without cube name)- error is written as a warning, + //but not returned to result object + } else { + return "error in interpret, unable to execute command"; + } + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : LENS_TABLE_FORMAT_REGEX.entrySet()) { + if (entry.getValue().matcher(st.toLowerCase()).find()) { + sb.append("%table " + entry.getKey() + " \n"); + break; + } + } + if (s_queryExecutePattern.matcher(st.toLowerCase()).find() && + result.getResult().toString().contains(" rows process in (")) { + sb.append("%table "); + } + if (sb.length() > 0) { + return sb.append(result.getResult().toString()).toString(); + } + return result.getResult().toString(); + //Lens sends error messages without setting result.isSuccess() = false. + } + + @Override + public void cancel(InterpreterContext context) { + if (!s_paraToQH.containsKey(context.getParagraphId())) { + s_logger.error("ignoring cancel from " + context.getParagraphId()); + return; + } + String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle(); + s_logger.info("preparing to cancel : (" + context.getParagraphId() + ") :" + qh); + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + LensClient lensClient = null; + try { + lensClient = createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + s_logger.info("invoke query kill (" + context.getParagraphId() + ") " + qh); + CommandResult res = shell.executeCommand("query kill " + qh); + s_logger.info("query kill returned (" + context.getParagraphId() + ") " + qh + + " with: " + res.getResult()); + } catch (Exception e) { + s_logger.error("unable to kill query (" + + context.getParagraphId() + ") " + qh, e); + } finally { + try { + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + closeLensClient(s_paraToQH.get(context.getParagraphId()).getLensClient()); + closeShell(s_paraToQH.get(context.getParagraphId()).getShell()); + } catch (Exception e) { + // ignore + } + s_paraToQH.remove(context.getParagraphId()); + closeShell(shell); + } + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + if (s_paraToQH.containsKey(context.getParagraphId())) { + s_logger.info("number of items for which progress can be reported :" + s_paraToQH.size()); + s_logger.info("number of open lensclient :" + s_clientMap.size()); + Bootstrap bs = createBootstrap(); + JLineShell shell = getJLineShell(bs); + LensClient lensClient = null; + String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle(); + try { + s_logger.info("fetch query status for : (" + context.getParagraphId() + ") :" + qh); + lensClient = createAndSetLensClient(bs); + s_clientMap.put(lensClient, true); + CommandResult res = shell.executeCommand("query status " + qh); + s_logger.info(context.getParagraphId() + " --> " + res.getResult().toString()); + //change to debug + Pattern pattern = Pattern.compile(".*(Progress : (\\d\\.\\d)).*"); + Matcher matcher = pattern.matcher(res.getResult().toString().replaceAll("\\n", " ")); + if (matcher.find(2)) { + Double d = Double.parseDouble(matcher.group(2)) * 100; + if (d.intValue() == 100) { + s_paraToQH.remove(context.getParagraphId()); + } + return d.intValue(); + } else { + return 1; + } + } + catch (Exception e) { + s_logger.error("unable to get progress for (" + context.getParagraphId() + ") :" + qh, e); + s_paraToQH.remove(context.getParagraphId()); + return 0; + } finally { + if (lensClient != null) { + closeLensClient(lensClient); + s_clientMap.remove(lensClient); + } + if (shell != null) { + closeShell(shell); + } + } + } + return 0; + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + + public boolean concurrentRequests() { + return Boolean.parseBoolean(getProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION)); + } + @Override + public Scheduler getScheduler() { + if (concurrentRequests()) { + return SchedulerFactory.singleton().createOrGetParallelScheduler( + LensInterpreter.class.getName() + this.hashCode(), m_maxThreads); + } else { + return super.getScheduler(); + } + } +} diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java new file mode 100644 index 00000000000..f025d5353a3 --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensJLineShellComponent.java @@ -0,0 +1,244 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactoryUtils; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.SmartLifecycle; +import org.springframework.shell.CommandLine; +import org.springframework.shell.plugin.BannerProvider; +import org.springframework.shell.plugin.HistoryFileNameProvider; +import org.springframework.shell.plugin.PluginUtils; +import org.springframework.shell.plugin.PromptProvider; +import org.springframework.shell.core.*; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensJLineShellComponent extends JLineShell + implements SmartLifecycle, ApplicationContextAware, InitializingBean { + + @Autowired + private CommandLine commandLine; + + private volatile boolean running = false; + private Thread shellThread; + + private ApplicationContext applicationContext; + private boolean printBanner = true; + + private String historyFileName; + private String promptText; + private String productName; + private String banner; + private String version; + private String welcomeMessage; + + private ExecutionStrategy executionStrategy = new LensSimpleExecutionStrategy(); + private SimpleParser parser = new SimpleParser(); + + public SimpleParser getSimpleParser() { + return parser; + } + + public boolean isAutoStartup() { + return false; + } + + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + public int getPhase() { + return 1; + } + + public void start() { + //customizePlug must run before start thread to take plugin's configuration into effect + customizePlugin(); + shellThread = new Thread(this, "Spring Shell"); + shellThread.start(); + running = true; + } + + + public void stop() { + if (running) { + closeShell(); + running = false; + } + } + + public boolean isRunning() { + return running; + } + + @SuppressWarnings("rawtypes") + public void afterPropertiesSet() { + + Map commands = + BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, + CommandMarker.class); + for (CommandMarker command : commands.values()) { + getSimpleParser().add(command); + } + + Map converters = BeanFactoryUtils + .beansOfTypeIncludingAncestors(applicationContext, Converter.class); + for (Converter converter : converters.values()) { + getSimpleParser().add(converter); + } + + setHistorySize(commandLine.getHistorySize()); + if (commandLine.getShellCommandsToExecute() != null) { + setPrintBanner(false); + } + } + + /** + * wait the shell command to complete by typing "quit" or "exit" + * + */ + public void waitForComplete() { + try { + shellThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + protected ExecutionStrategy getExecutionStrategy() { + return executionStrategy; + } + + @Override + protected Parser getParser() { + return parser; + } + + @Override + public String getStartupNotifications() { + return null; + } + + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public void customizePlugin() { + this.historyFileName = getHistoryFileName(); + this.promptText = getPromptText(); + String[] banner = getBannerText(); + this.banner = banner[0]; + this.welcomeMessage = banner[1]; + this.version = banner[2]; + this.productName = banner[3]; + } + + /** + * get history file name from provider. The provider has highest order + * org.springframework.core.Ordered.getOder will win. + * + * @return history file name + */ + protected String getHistoryFileName() { + HistoryFileNameProvider historyFileNameProvider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, HistoryFileNameProvider.class); + String providerHistoryFileName = historyFileNameProvider.getHistoryFileName(); + if (providerHistoryFileName != null) { + return providerHistoryFileName; + } else { + return historyFileName; + } + } + + /** + * get prompt text from provider. The provider has highest order + * org.springframework.core.Ordered.getOder will win. + * + * @return prompt text + */ + protected String getPromptText() { + PromptProvider promptProvider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, PromptProvider.class); + String providerPromptText = promptProvider.getPrompt(); + if (providerPromptText != null) { + return providerPromptText; + } else { + return promptText; + } + } + + /** + * Get Banner and Welcome Message from provider. The provider has highest order + * org.springframework.core.Ordered.getOder will win. + * @return BannerText[0]: Banner + * BannerText[1]: Welcome Message + * BannerText[2]: Version + * BannerText[3]: Product Name + */ + private String[] getBannerText() { + String[] bannerText = new String[4]; + BannerProvider provider = PluginUtils + .getHighestPriorityProvider(this.applicationContext, BannerProvider.class); + bannerText[0] = provider.getBanner(); + bannerText[1] = provider.getWelcomeMessage(); + bannerText[2] = provider.getVersion(); + bannerText[3] = provider.getProviderName(); + return bannerText; + } + + public void printBannerAndWelcome() { + if (printBanner) { + logger.info(this.banner); + logger.info(getWelcomeMessage()); + } + } + + + /** + * get the welcome message at start. + * + * @return welcome message + */ + public String getWelcomeMessage() { + return this.welcomeMessage; + } + + + /** + * @param printBanner the printBanner to set + */ + public void setPrintBanner(boolean printBanner) { + this.printBanner = printBanner; + } + + protected String getProductName() { + return productName; + } + + protected String getVersion() { + return version; + } +} diff --git a/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java new file mode 100644 index 00000000000..e3294adb7dd --- /dev/null +++ b/lens/src/main/java/org/apache/zeppelin/lens/LensSimpleExecutionStrategy.java @@ -0,0 +1,86 @@ +/* + * Copyright 2011-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import org.springframework.shell.core.*; + +import java.util.logging.Logger; + +import org.springframework.shell.event.ParseResult; +import org.springframework.shell.support.logging.HandlerUtils; +import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; + +/** + * workaround for https://github.com/spring-projects/spring-shell/issues/73 + */ +public class LensSimpleExecutionStrategy implements ExecutionStrategy { + + private static final Logger logger = HandlerUtils.getLogger(LensSimpleExecutionStrategy.class); + + public Object execute(ParseResult parseResult) throws RuntimeException { + Assert.notNull(parseResult, "Parse result required"); + logger.info("LensSimpleExecutionStrategy execute method invoked"); + synchronized (this) { + Assert.isTrue(isReadyForCommands(), "SimpleExecutionStrategy not yet ready for commands"); + Object target = parseResult.getInstance(); + if (target instanceof ExecutionProcessor) { + ExecutionProcessor processor = ((ExecutionProcessor) target); + parseResult = processor.beforeInvocation(parseResult); + try { + Object result = invoke(parseResult); + processor.afterReturningInvocation(parseResult, result); + return result; + } catch (Throwable th) { + processor.afterThrowingInvocation(parseResult, th); + return handleThrowable(th); + } + } + else { + return invoke(parseResult); + } + } + } + + private Object invoke(ParseResult parseResult) { + try { + return ReflectionUtils.invokeMethod(parseResult.getMethod(), + parseResult.getInstance(), parseResult.getArguments()); + } catch (Throwable th) { + logger.severe("Command failed " + th); + return handleThrowable(th); + } + } + + private Object handleThrowable(Throwable th) { + if (th instanceof Error) { + throw ((Error) th); + } + if (th instanceof RuntimeException) { + throw ((RuntimeException) th); + } + throw new RuntimeException(th); + } + + public boolean isReadyForCommands() { + return true; + } + + public void terminate() { + // do nothing + } + +} diff --git a/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java new file mode 100644 index 00000000000..5af8deb8448 --- /dev/null +++ b/lens/src/test/java/org/apache/zeppelin/lens/LensInterpreterTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.lens; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.apache.zeppelin.lens.LensInterpreter.*; + +/** + * Lens interpreter unit tests + */ +public class LensInterpreterTest { + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void test() { + Properties prop = new Properties(); + prop.setProperty(LENS_SERVER_URL, "http://127.0.0.1:9999/lensapi"); + prop.setProperty(LENS_CLIENT_DBNAME, "default"); + prop.setProperty(LENS_PERSIST_RESULTSET, "false"); + prop.setProperty(LENS_SESSION_CLUSTER_USER, "default"); + prop.setProperty(ZEPPELIN_MAX_ROWS, "1000"); + prop.setProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true"); + prop.setProperty(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10"); + LensInterpreter t = new MockLensInterpreter(prop); + t.open(); + //simple help test + InterpreterResult result = t.interpret("help", null); + assertEquals(result.type(), InterpreterResult.Type.TEXT); + //assertEquals("unable to find 'query execute' in help message", + // result.message().contains("query execute"), result.message()); + t.close(); + } + + class MockLensInterpreter extends LensInterpreter { + public MockLensInterpreter(Properties property) { + super(property); + } + @Override + public void open() { + super.init(); + } + } +} diff --git a/pom.xml b/pom.xml index dd3c2a3684d..c88e8f9ea30 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ hive tajo flink + lens zeppelin-web zeppelin-server zeppelin-distribution diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 78a463cc196..fc6296b4621 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -390,6 +390,7 @@ public static enum ConfVars { + "org.apache.zeppelin.shell.ShellInterpreter," + "org.apache.zeppelin.hive.HiveInterpreter," + "org.apache.zeppelin.tajo.TajoInterpreter," + + "org.apache.zeppelin.lens.LensInterpreter," + "org.apache.zeppelin.flink.FlinkInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), From 24f0e45e56993eeadd8a955dddda79aa43cb00e4 Mon Sep 17 00:00:00 2001 From: Pranav Agarwal Date: Sun, 21 Jun 2015 23:29:22 +0530 Subject: [PATCH 2/4] added lens entry to conf/zeppelin-site.xml.template --- conf/zeppelin-site.xml.template | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index b467a825b70..8d022ad5cca 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,7 +66,11 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter +<<<<<<< Updated upstream + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter +======= + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.flink.FlinkInterpreter +>>>>>>> Stashed changes Comma separated interpreter configurations. First interpreter become a default From 583dd6a4ccbae0e496b2fe5d9c90b56cbaa6dd63 Mon Sep 17 00:00:00 2001 From: Pranav Agarwal Date: Sun, 21 Jun 2015 23:44:22 +0530 Subject: [PATCH 3/4] file compile error --- .../java/org/apache/zeppelin/conf/ZeppelinConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 96b2c43ee1f..cf853a3483c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -398,7 +398,7 @@ public static enum ConfVars { + "org.apache.zeppelin.hive.HiveInterpreter," + "org.apache.zeppelin.tajo.TajoInterpreter," + "org.apache.zeppelin.lens.LensInterpreter," - + "org.apache.zeppelin.flink.FlinkInterpreter", + + "org.apache.zeppelin.flink.FlinkInterpreter" + "org.apache.zeppelin.ignite.IgniteInterpreter," + "org.apache.zeppelin.ignite.IgniteSqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), From d8694c234ca97949cf3e6fbbf23e792804f69cc6 Mon Sep 17 00:00:00 2001 From: Pranav Agarwal Date: Sun, 21 Jun 2015 23:47:27 +0530 Subject: [PATCH 4/4] fixed merge conflicts for conf/zeppelin-site.xml.template --- conf/zeppelin-site.xml.template | 4 ---- 1 file changed, 4 deletions(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 8d022ad5cca..842cab4e6a4 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,11 +66,7 @@ zeppelin.interpreters -<<<<<<< Updated upstream org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter -======= - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.flink.FlinkInterpreter ->>>>>>> Stashed changes Comma separated interpreter configurations. First interpreter become a default