diff --git a/.gitignore b/.gitignore index 502ab0b69e3..ce81c89eb77 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ spark/derby.log spark/metastore_db spark-1.*-bin-hadoop* +zeppelin-server/derby.log lens/lens-cli-hist.log diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 16e26b76f6f..730de7b632b 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -138,7 +138,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 994e5dba65a..fb09a9e52af 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -46,6 +46,7 @@
  • Flink
  • Geode
  • HBase
  • + HDFS
  • Hive
  • Ignite
  • JDBC
  • diff --git a/docs/interpreter/hdfs.md b/docs/interpreter/hdfs.md new file mode 100644 index 00000000000..f29755f523a --- /dev/null +++ b/docs/interpreter/hdfs.md @@ -0,0 +1,56 @@ +--- +layout: page +title: "HDFS File System Interpreter" +description: "" +group: manual +--- +{% include JB/setup %} + +## HDFS File System Interpreter for Apache Zeppelin + +[Hadoop File System](http://hadoop.apache.org/) is a distributed, fault tolerant file system part of the hadoop project and is often used as storage for distributed processing engines like [Hadoop MapReduce](http://hadoop.apache.org/) and [Apache Spark](http://spark.apache.org/) or underlying file systems like [Alluxio](http://www.alluxio.org/). + +## Configuration + + + + + + + + + + + + + + + + + + + + + +
| Property       | Default                            | Description                                |
|----------------|------------------------------------|--------------------------------------------|
| hdfs.url       | http://localhost:50070/webhdfs/v1/ | The URL for WebHDFS                        |
| hdfs.user      | hdfs                               | The WebHDFS user                           |
| hdfs.maxlength | 1000                               | Maximum number of lines of results fetched |
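With the defaults above, listing a directory boils down to an HTTP GET against `hdfs.url` + path + `?op=LISTSTATUS`. Here is a minimal standalone sketch of that request; the host, port, `/tmp` path, and the `WebHdfsProbe` class name are placeholders, and WebHDFS must be enabled on the NameNode:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Probe the WebHDFS endpoint the interpreter is configured to use.
public class WebHdfsProbe {
  public static void main(String[] args) throws Exception {
    String hdfsUrl = "http://localhost:50070/webhdfs/v1/";   // hdfs.url (placeholder)
    String path = "tmp";                                     // directory to list (placeholder)
    URL url = new URL(hdfsUrl + path + "?op=LISTSTATUS");
    HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("GET");
    System.out.println("HTTP " + con.getResponseCode());
    try (BufferedReader in =
             new BufferedReader(new InputStreamReader(con.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);   // JSON body: {"FileStatuses":{"FileStatus":[...]}}
      }
    }
  }
}
```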
    +This interpreter connects to HDFS using the HTTP WebHDFS interface. +It supports the basic shell file commands applied to HDFS, it currently only supports browsing. + +* You can use ls [PATH] and ls -l [PATH] to list a directory. If the path is missing, then the current directory is listed. ls supports a -h flag for human readable file sizes. +* You can use cd [PATH] to change your current directory by giving a relative or an absolute path. +* You can invoke pwd to see your current directory. + +> **Tip :** Use ( Ctrl + . ) for autocompletion. + +### Create Interpreter + +In a notebook, to enable the **HDFS** interpreter, click the **Gear** icon and select **HDFS**. + + +#### WebHDFS REST API +You can confirm that you're able to access the WebHDFS API by running a curl command against the WebHDFS end point provided to the interpreter. + +Here is an example: +$> curl "http://localhost:50070/webhdfs/v1/?op=LISTSTATUS" \ No newline at end of file diff --git a/file/pom.xml b/file/pom.xml new file mode 100644 index 00000000000..586f41add2b --- /dev/null +++ b/file/pom.xml @@ -0,0 +1,146 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-file + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin File System Interpreters + http://www.apache.org + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + javax.ws.rs + javax.ws.rs-api + 2.0 + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + org.glassfish.jersey.core + jersey-common + 2.22.2 + + + + + junit + junit + test + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18.1 + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/file + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/file + false + false + true + + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + diff --git a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java new file mode 100644 index 00000000000..4d50ce52c34 --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.file; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; + +/** + * File interpreter for Zeppelin. + * + */ +public abstract class FileInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(FileInterpreter.class); + String currentDir = null; + CommandArgs args = null; + + public FileInterpreter(Properties property) { + super(property); + currentDir = new String("/"); + } + + /** + * Handling the arguments of the command + */ + public class CommandArgs { + public String input = null; + public String command = null; + public ArrayList args = null; + public HashSet flags = null; + + public CommandArgs(String cmd) { + input = cmd; + args = new ArrayList(); + flags = new HashSet(); + } + + private void parseArg(String arg) { + if (arg.charAt(0) == '-') { // handle flags + for (int i = 0; i < arg.length(); i++) { + Character c = arg.charAt(i); + flags.add(c); + } + } else { // handle other args + args.add(arg); + } + } + + public void parseArgs() { + if (input == null) + return; + StringTokenizer st = new StringTokenizer(input); + if (st.hasMoreTokens()) { + command = st.nextToken(); + while (st.hasMoreTokens()) + parseArg(st.nextToken()); + } + } + } + + // Functions that each file system implementation must override + + public abstract String listAll(String path); + + public abstract boolean isDirectory(String path); + + // Combine paths, takes care of arguments such as .. + + protected String getNewPath(String argument){ + Path arg = Paths.get(argument); + Path ret = arg.isAbsolute() ? arg : Paths.get(currentDir, argument); + return ret.normalize().toString(); + } + + // Handle the command handling uniformly across all file systems + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Run File command '" + cmd + "'"); + + args = new CommandArgs(cmd); + args.parseArgs(); + + if (args.command == null) { + logger.info("Error: No command"); + return new InterpreterResult(Code.ERROR, Type.TEXT, "No command"); + } + + // Simple parsing of the command + + if (args.command.equals("cd")) { + + String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir; + if (!isDirectory(newPath)) + return new InterpreterResult(Code.ERROR, Type.TEXT, newPath + ": No such directory"); + + currentDir = newPath; + return new InterpreterResult(Code.SUCCESS, Type.TEXT, "OK"); + + } else if (args.command.equals("ls")) { + + String newPath = !args.args.isEmpty() ? 
getNewPath(args.args.get(0)) : currentDir; + try { + String results = listAll(newPath); + return new InterpreterResult(Code.SUCCESS, Type.TEXT, results); + } catch (Exception e) { + logger.error("Error listing files in path " + newPath, e); + return new InterpreterResult(Code.ERROR, Type.TEXT, e.getMessage()); + } + + } else if (args.command.equals("pwd")) { + + return new InterpreterResult(Code.SUCCESS, Type.TEXT, currentDir); + + } else { + + return new InterpreterResult(Code.ERROR, Type.TEXT, "Unknown command"); + + } + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + FileInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } +} diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java new file mode 100644 index 00000000000..94508cd0364 --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.file; + +import java.net.URL; +import java.net.HttpURLConnection; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import javax.ws.rs.core.UriBuilder; +import org.slf4j.Logger; + +/** + * Definition and HTTP invocation methods for all WebHDFS commands + * + */ +public class HDFSCommand { + + /** + * Type of HTTP request + */ + public enum HttpType { + GET, + PUT + } + + /** + * Definition of WebHDFS operator + */ + public class Op { + public String op; + public HttpType cmd; + public int minArgs; + + public Op(String op, HttpType cmd, int minArgs) { + this.op = op; + this.cmd = cmd; + this.minArgs = minArgs; + } + } + + /** + * Definition of argument to an operator + */ + public class Arg { + public String key; + public String value; + + public Arg(String key, String value) { + this.key = key; + this.value = value; + } + } + + // How to connect to WebHDFS + String url = null; + String user = null; + int maxLength = 0; + Logger logger; + + // Define all the commands available + public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0); + public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0); + + public HDFSCommand(String url, String user, Logger logger, int maxLength) { + super(); + this.url = url; + this.user = user; + this.maxLength = maxLength; + this.logger = logger; + } + + public String checkArgs(Op op, String path, Arg[] args) throws Exception { + if (op == null || + path == null || + (op.minArgs > 0 && + (args == null || + args.length != op.minArgs))) + { + String a = ""; + a = (op != null) ? a + op.op + "\n" : a; + a = (path != null) ? a + path + "\n" : a; + a = (args != null) ? a + args + "\n" : a; + return a; + } + return null; + } + + + // The operator that runs all commands + public String runCommand(Op op, String path, Arg[] args) throws Exception { + + // Check arguments + String error = checkArgs(op, path, args); + if (error != null) { + logger.error("Bad arguments to command: " + error); + return "ERROR: BAD ARGS"; + } + + // Build URI + UriBuilder builder = UriBuilder + .fromPath(url) + .path(path) + .queryParam("op", op.op); + + if (args != null) { + for (Arg a : args) { + builder = builder.queryParam(a.key, a.value); + } + } + java.net.URI uri = builder.build(); + + // Connect and get response string + URL hdfsUrl = uri.toURL(); + HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection(); + + if (op.cmd == HttpType.GET) { + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + logger.info("Sending 'GET' request to URL : " + hdfsUrl); + logger.info("Response Code : " + responseCode); + + BufferedReader in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuffer response = new StringBuffer(); + + int i = 0; + while ((inputLine = in.readLine()) != null) { + if (inputLine.length() < maxLength) + response.append(inputLine); + i++; + if (i >= maxLength) + break; + } + in.close(); + return response.toString(); + } + return null; + } +} diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java new file mode 100644 index 00000000000..245093eb84c --- /dev/null +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.file; + +import java.text.SimpleDateFormat; +import java.util.*; + +import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; + +/** + * HDFS implementation of File interpreter for Zeppelin. + * + */ +public class HDFSFileInterpreter extends FileInterpreter { + static final String HDFS_URL = "hdfs.url"; + static final String HDFS_USER = "hdfs.user"; + static final String HDFS_MAXLENGTH = "hdfs.maxlength"; + + static { + Interpreter.register( + "hdfs", + "file", + HDFSFileInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(HDFS_URL, "http://localhost:50070/webhdfs/v1/", "The URL for WebHDFS") + .add(HDFS_USER, "hdfs", "The WebHDFS user") + .add(HDFS_MAXLENGTH, "1000", "Maximum number of lines of results fetched").build()); + } + + Exception exceptionOnConnect = null; + HDFSCommand cmd = null; + Gson gson = null; + + public void prepare() { + String userName = getProperty(HDFS_USER); + String hdfsUrl = getProperty(HDFS_URL); + int i = Integer.parseInt(getProperty(HDFS_MAXLENGTH)); + cmd = new HDFSCommand(hdfsUrl, userName, logger, i); + gson = new Gson(); + } + + public HDFSFileInterpreter(Properties property){ + super(property); + prepare(); + } + + /** + * Status of one file + * + * matches returned JSON + */ + public class OneFileStatus { + public long accessTime; + public int blockSize; + public int childrenNum; + public int fileId; + public String group; + public long length; + public long modificationTime; + public String owner; + public String pathSuffix; + public String permission; + public int replication; + public int storagePolicy; + public String type; + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("\nAccessTime = " + accessTime); + sb.append("\nBlockSize = " + blockSize); + sb.append("\nChildrenNum = " + childrenNum); + sb.append("\nFileId = " + fileId); + sb.append("\nGroup = " + group); + sb.append("\nLength = " + length); + sb.append("\nModificationTime = " + modificationTime); + sb.append("\nOwner = " + owner); + sb.append("\nPathSuffix = " + pathSuffix); + sb.append("\nPermission = " + permission); + sb.append("\nReplication = " + replication); + sb.append("\nStoragePolicy = " + storagePolicy); + sb.append("\nType = " + type); + return sb.toString(); + } + } + + /** + * Status of one file + * + * matches returned JSON + */ + public class SingleFileStatus { + public OneFileStatus FileStatus; + } + + /** + * Status of all files in a directory + * + * matches returned JSON + */ + public class MultiFileStatus { + public OneFileStatus[] FileStatus; + } + + /** + * Status of all files in a directory + * + * 
matches returned JSON + */ + public class AllFileStatus { + public MultiFileStatus FileStatuses; + } + + // tests whether we're able to connect to HDFS + + private void testConnection() { + try { + if (isDirectory("/")) + logger.info("Successfully created WebHDFS connection"); + } catch (Exception e) { + logger.error("testConnection: Cannot open WebHDFS connection. Bad URL: " + "/", e); + exceptionOnConnect = e; + } + } + + @Override + public void open() { + testConnection(); + } + + @Override + public void close() { + } + + private String listDir(String path) throws Exception { + return cmd.runCommand(cmd.listStatus, path, null); + } + + private String listPermission(OneFileStatus fs){ + StringBuilder sb = new StringBuilder(); + sb.append(fs.type.equalsIgnoreCase("Directory") ? 'd' : '-'); + int p = Integer.parseInt(fs.permission, 16); + sb.append(((p & 0x400) == 0) ? '-' : 'r'); + sb.append(((p & 0x200) == 0) ? '-' : 'w'); + sb.append(((p & 0x100) == 0) ? '-' : 'x'); + sb.append(((p & 0x40) == 0) ? '-' : 'r'); + sb.append(((p & 0x20) == 0) ? '-' : 'w'); + sb.append(((p & 0x10) == 0) ? '-' : 'x'); + sb.append(((p & 0x4) == 0) ? '-' : 'r'); + sb.append(((p & 0x2) == 0) ? '-' : 'w'); + sb.append(((p & 0x1) == 0) ? '-' : 'x'); + return sb.toString(); + } + private String listDate(OneFileStatus fs) { + return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(fs.modificationTime)); + } + private String ListOne(String path, OneFileStatus fs) { + if (args.flags.contains(new Character('l'))) { + StringBuilder sb = new StringBuilder(); + sb.append(listPermission(fs) + "\t"); + sb.append(((fs.replication == 0) ? "-" : fs.replication) + "\t "); + sb.append(fs.owner + "\t"); + sb.append(fs.group + "\t"); + if (args.flags.contains(new Character('h'))){ //human readable + sb.append(humanReadableByteCount(fs.length) + "\t\t"); + } else { + sb.append(fs.length + "\t"); + } + sb.append(listDate(fs) + "GMT\t"); + sb.append((path.length() == 1) ? path + fs.pathSuffix : path + '/' + fs.pathSuffix); + return sb.toString(); + } + return fs.pathSuffix; + } + + private String humanReadableByteCount(long bytes) { + int unit = 1024; + if (bytes < unit) return bytes + " B"; + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String pre = "KMGTPE".charAt(exp - 1) + ""; + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } + + public String listFile(String filePath) { + try { + String str = cmd.runCommand(cmd.getFileStatus, filePath, null); + SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class); + if (sfs != null) { + return ListOne(filePath, sfs.FileStatus); + } + } catch (Exception e) { + logger.error("listFile: " + filePath, e); + } + return "No such File or directory"; + } + + public String listAll(String path) { + String all = ""; + if (exceptionOnConnect != null) + return "Error connecting to provided endpoint."; + try { + //see if directory. 
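+      // A directory produces a LISTSTATUS response whose FileStatuses.FileStatus array is
+      // rendered one entry per line; a plain file falls back to a single GETFILESTATUS lookup.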
+ if (isDirectory(path)) { + String sfs = listDir(path); + if (sfs != null) { + AllFileStatus allFiles = gson.fromJson(sfs, AllFileStatus.class); + + if (allFiles != null && + allFiles.FileStatuses != null && + allFiles.FileStatuses.FileStatus != null) + { + for (OneFileStatus fs : allFiles.FileStatuses.FileStatus) + all = all + ListOne(path, fs) + '\n'; + } + } + return all; + } else { + return listFile(path); + } + } catch (Exception e) { + logger.error("listall: listDir " + path, e); + throw new InterpreterException("Could not find file or directory:\t" + path); + } + } + + public boolean isDirectory(String path) { + boolean ret = false; + if (exceptionOnConnect != null) + return ret; + try { + String str = cmd.runCommand(cmd.getFileStatus, path, null); + SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class); + if (sfs != null) + return sfs.FileStatus.type.equals("DIRECTORY"); + } catch (Exception e) { + logger.error("IsDirectory: " + path, e); + return false; + } + return ret; + } + + + @Override + public List completion(String buf, int cursor) { + logger.info("Completion request at position\t" + cursor + " in string " + buf); + final List suggestions = new ArrayList<>(); + if (StringUtils.isEmpty(buf)) { + suggestions.add("ls"); + suggestions.add("cd"); + suggestions.add("pwd"); + return suggestions; + } + + //part of a command == no spaces + if (buf.split(" ").length == 1){ + if ("cd".contains(buf)) suggestions.add("cd"); + if ("ls".contains(buf)) suggestions.add("ls"); + if ("pwd".contains(buf)) suggestions.add("pwd"); + + return suggestions; + } + + + // last word will contain the path we're working with. + String lastToken = buf.substring(buf.lastIndexOf(" ") + 1); + if (lastToken.startsWith("-")) { //flag not path + return null; + } + + String localPath = ""; //all things before the last '/' + String unfinished = lastToken; //unfished filenames or directories + if (lastToken.contains("/")) { + localPath = lastToken.substring(0, lastToken.lastIndexOf('/') + 1); + unfinished = lastToken.substring(lastToken.lastIndexOf('/') + 1); + } + String globalPath = getNewPath(localPath); //adjust for cwd + + if (isDirectory(globalPath)){ + try { + String fileStatusString = listDir(globalPath); + if (fileStatusString != null) { + AllFileStatus allFiles = gson.fromJson(fileStatusString, AllFileStatus.class); + + if (allFiles != null && + allFiles.FileStatuses != null && + allFiles.FileStatuses.FileStatus != null) + { + for (OneFileStatus fs : allFiles.FileStatuses.FileStatus) { + if (fs.pathSuffix.contains(unfinished)) { + + //only suggest the text after the last . + String beforeLastPeriod = unfinished.substring(0, unfinished.lastIndexOf('.') + 1); + //beforeLastPeriod should be the start of fs.pathSuffix, so take the end of it. + String suggestedFinish = fs.pathSuffix.substring(beforeLastPeriod.length()); + suggestions.add(suggestedFinish); + } + } + return suggestions; + } + } + } catch (Exception e) { + logger.error("listall: listDir " + globalPath, e); + return null; + } + } else { + logger.info("path is not a directory. No values suggested."); + } + + //Error in string. 
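+    // Nothing to suggest: the working path is not a directory, or its listing came back empty.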
+ return null; + } +} diff --git a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java new file mode 100644 index 00000000000..3c87fa69cfa --- /dev/null +++ b/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.file; + +import com.google.gson.Gson; +import junit.framework.TestCase; +import static org.junit.Assert.*; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.Test; +import org.slf4j.Logger; +import java.util.HashMap; +import java.util.Properties; +import java.lang.Override; +import java.lang.String; + + +/** + * Tests Interpreter by running pre-determined commands against mock file system + * + */ +public class HDFSFileInterpreterTest extends TestCase { + + @Test + public void test() { + HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties()); + t.open(); + + // We have info for /, /user, /tmp, /mr-history/done + + // Ensure + // 1. ls -l works + // 2. paths (. and ..) are correctly handled + // 3. 
flags and arguments to commands are correctly handled + + InterpreterResult result1 = t.interpret("ls -l /", null); + assertEquals(result1.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result2 = t.interpret("ls -l /./user/..", null); + assertEquals(result2.type(), InterpreterResult.Type.TEXT); + + assertEquals(result1.message(), result2.message()); + + // Ensure you can do cd and after that the ls uses current directory correctly + + InterpreterResult result3 = t.interpret("cd user", null); + assertEquals(result3.type(), InterpreterResult.Type.TEXT); + assertEquals(result3.message(), "OK"); + + InterpreterResult result4 = t.interpret("ls", null); + assertEquals(result4.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result5 = t.interpret("ls /user", null); + assertEquals(result5.type(), InterpreterResult.Type.TEXT); + + assertEquals(result4.message(), result5.message()); + + // Ensure pwd works correctly + + InterpreterResult result6 = t.interpret("pwd", null); + assertEquals(result6.type(), InterpreterResult.Type.TEXT); + assertEquals(result6.message(), "/user"); + + // Move a couple of levels and check we're in the right place + + InterpreterResult result7 = t.interpret("cd ../mr-history/done", null); + assertEquals(result7.type(), InterpreterResult.Type.TEXT); + assertEquals(result7.message(), "OK"); + + InterpreterResult result8 = t.interpret("ls -l ", null); + assertEquals(result8.type(), InterpreterResult.Type.TEXT); + + InterpreterResult result9 = t.interpret("ls -l /mr-history/done", null); + assertEquals(result9.type(), InterpreterResult.Type.TEXT); + + assertEquals(result8.message(), result9.message()); + + InterpreterResult result10 = t.interpret("cd ../..", null); + assertEquals(result10.type(), InterpreterResult.Type.TEXT); + assertEquals(result7.message(), "OK"); + + InterpreterResult result11 = t.interpret("ls -l ", null); + assertEquals(result11.type(), InterpreterResult.Type.TEXT); + + // we should be back to first result after all this navigation + assertEquals(result1.message(), result11.message()); + + t.close(); + } + } + + /** + * Store command results from curl against a real file system + */ + class MockFileSystem { + HashMap mfs = new HashMap(); + void addListStatusData() { + mfs.put("/?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672,\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045,\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336,\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346,\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + 
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089,\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792,\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + "]}}" + ); + mfs.put("/user?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263,\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + " ]}}" + ); + mfs.put("/tmp?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + " {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0,\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645,\"modificationTime\":1441253097517,\"owner\":\"hdfs\",\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\",\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" + + " ]}}" + ); + mfs.put("/mr-history/done?op=LISTSTATUS", + "{\"FileStatuses\":{\"FileStatus\":[\n" + + "{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481,\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" + + "]}}" + ); + } + void addGetFileStatusData() { + mfs.put("/?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/user?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/tmp?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + mfs.put("/mr-history/done?op=GETFILESTATUS", + "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}"); + } + public void addMockData(HDFSCommand.Op op) { + if (op.op.equals("LISTSTATUS")) { + addListStatusData(); + } else if (op.op.equals("GETFILESTATUS")) { + addGetFileStatusData(); + } + // do nothing + } + public String get(String key) { + return 
mfs.get(key); + } + } + + /** + * Run commands against mock file system that simulates webhdfs responses + */ + class MockHDFSCommand extends HDFSCommand { + MockFileSystem fs = null; + + public MockHDFSCommand(String url, String user, Logger logger) { + super(url, user, logger, 1000); + fs = new MockFileSystem(); + fs.addMockData(getFileStatus); + fs.addMockData(listStatus); + } + + @Override + public String runCommand(Op op, String path, Arg[] args) throws Exception { + + String error = checkArgs(op, path, args); + assertNull(error); + + String c = path + "?op=" + op.op; + + if (args != null) { + for (Arg a : args) { + c += "&" + a.key + "=" + a.value; + } + } + return fs.get(c); + } + } + + /** + * Mock Interpreter - uses Mock HDFS command + */ + class MockHDFSFileInterpreter extends HDFSFileInterpreter { + + @Override + public void prepare() { + // Run commands against mock File System instead of WebHDFS + cmd = new MockHDFSCommand("", "", logger); + gson = new Gson(); + } + + public MockHDFSFileInterpreter(Properties property) { + super(property); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index dfb5eb9c35c..974994b5f2b 100755 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ postgresql jdbc tajo + file flink ignite kylin diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index a8dc0ed1e97..84289c03708 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -57,6 +57,7 @@ The following components are provided under Apache License. (Apache 2.0) javax.servlet (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 - http://www.eclipse.org/jetty) (Apache 2.0) Joda-Time (joda-time:joda-time:2.8.1 - http://www.joda.org/joda-time/) (Apache 2.0) Jackson (org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org/) + (Apache 2.0) Javassist (org.javassist:javassist:jar:3.18.1-GA:compile - http://jboss-javassist.github.io/javassist/) (Apache 2.0) JetS3t (net.java.dev.jets3t:jets3t:jar:0.9.3) - http://www.jets3t.org/ (Apache 2.0) Jetty (org.eclipse.jetty:jetty - http://www.eclipse.org/jetty) (Apache 2.0) mx4j (mx4j:mx4j:jar:3.0.2) - http://mx4j.sourceforge.net/ @@ -187,7 +188,10 @@ CDDL license The following components are provided under the CDDL License. 
(CDDL 1.0) javax.activation (javax.activation:activation:jar:1.1.1 - http://java.sun.com/javase/technologies/desktop/javabeans/jaf/index.jsp) + (CDDL 1.0) java.annotation (javax.annotation:javax.annotation-api:jar:1.2:compile - http://jcp.org/en/jsr/detail?id=250) (CDDL 1.1) Jersey (com.sun.jersey:jersey:jar:1.9 - https://jersey.java.net/) + (CDDL 1.1) jersey-core (org.glassfish.jersey.core:jersey-core:2.22.2 - https://jersey.java.net/) + (CDDL 1.1) hk2 (org.glassfish.hk2 - https://hk2.java.net/2.5.0-b03/) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index a9c300b1d8a..d10a0727744 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -446,6 +446,7 @@ public static enum ConfVars { + "org.apache.zeppelin.shell.ShellInterpreter," + "org.apache.zeppelin.hive.HiveInterpreter," + "org.apache.zeppelin.alluxio.AlluxioInterpreter," + + "org.apache.zeppelin.file.HDFSFileInterpreter," + "org.apache.zeppelin.phoenix.PhoenixInterpreter," + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter," + "org.apache.zeppelin.tajo.TajoInterpreter,"