diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index cbae4e59002..fdd458c9358 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -271,7 +271,7 @@
 <property>
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.python.PythonDockerInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivyPySpark3Interpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter,org.apache.zeppelin.groovy.GroovyInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.WebHDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.python.PythonDockerInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivyPySpark3Interpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter,org.apache.zeppelin.groovy.GroovyInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter becomes the default.</description>
 </property>
diff --git a/docs/setup/storage/storage.md b/docs/setup/storage/storage.md
index 269bc4624e1..265456cf2dd 100644
--- a/docs/setup/storage/storage.md
+++ b/docs/setup/storage/storage.md
@@ -33,6 +33,7 @@ There are few notebook storage systems available for a use out of the box:
 * storage using Amazon S3 service - `S3NotebookRepo`
 * storage using Azure service - `AzureNotebookRepo`
 * storage using MongoDB - `MongoNotebookRepo`
+* storage using WebHDFS - `WebHdfsNotebookRepo`
 
 Multiple storage systems can be used at the same time by providing a comma-separated list of the class-names in the configuration. By default, only first two of them will be automatically kept in sync by Zeppelin.
 
@@ -246,7 +247,51 @@ Optionally, you can specify Azure folder structure name in the file **zeppelin-s
 ```
+
+## Notebook Storage in WebHDFS
+
+To store your notebooks on HDFS, uncomment the following properties in `zeppelin-site.xml` so that the `WebHdfsNotebookRepo` class is used:
+
+```
+<property>
+  <name>zeppelin.notebook.storage</name>
+  <value>org.apache.zeppelin.notebook.repo.WebHdfsNotebookRepo</value>
+  <description>notebook persistence layer implementation</description>
+</property>
+
+<property>
+  <name>hdfs.url</name>
+  <value>http://localhost:50070/webhdfs/v1/</value>
+  <description>HDFS url</description>
+</property>
+
+<property>
+  <name>hdfs.user</name>
+  <value>hdfs</value>
+  <description>HDFS user</description>
+</property>
+
+<property>
+  <name>hdfs.maxlength</name>
+  <value>1000</value>
+  <description>Maximum number of lines of results fetched</description>
+</property>
+```
+
+and replace the notebook directory property below with an absolute HDFS location, as follows:
+
+```
+<property>
+  <name>zeppelin.notebook.dir</name>
+  <value>/tmp/notebook</value>
+  <description>path or URI for notebook persistence</description>
+</property>
+```
+
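Before pointing Zeppelin at WebHDFS, it can help to confirm the endpoint and user are actually reachable. Below is a minimal, hypothetical Java probe (not part of this patch); the URL and user mirror the `hdfs.url` and `hdfs.user` defaults above, so adjust both for your cluster:

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

/** Minimal probe: confirm the WebHDFS REST endpoint answers before enabling WebHdfsNotebookRepo. */
public class WebHdfsProbe {
  public static void main(String[] args) throws Exception {
    // Same defaults as hdfs.url / hdfs.user above; adjust for your cluster
    URL url = new URL("http://localhost:50070/webhdfs/v1/tmp?op=GETFILESTATUS&user.name=hdfs");
    HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("GET");
    System.out.println("HTTP " + con.getResponseCode());
    try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // JSON FileStatus object on success
      }
    }
  }
}
```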
+
 ## Notebook Storage in ZeppelinHub
 
 ZeppelinHub storage layer allows out of the box connection of Zeppelin instance with your ZeppelinHub account. First of all, you need to either comment out the following property in **zeppelin-site.xml**:
diff --git a/file/pom.xml b/file/pom.xml
index 2493c1fae4c..3939401d502 100644
--- a/file/pom.xml
+++ b/file/pom.xml
@@ -33,7 +33,6 @@
   <properties>
-    <ws.rsapi.version>2.0</ws.rsapi.version>
     <jersey.common.version>2.22.2</jersey.common.version>
   </properties>
@@ -49,12 +48,12 @@
     <dependency>
-      <groupId>javax.ws.rs</groupId>
-      <artifactId>javax.ws.rs-api</artifactId>
-      <version>${ws.rsapi.version}</version>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-common</artifactId>
+      <version>${jersey.common.version}</version>
     </dependency>
-
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -64,12 +63,6 @@
       <artifactId>slf4j-log4j12</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.glassfish.jersey.core</groupId>
-      <artifactId>jersey-common</artifactId>
-      <version>${jersey.common.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/WebHDFSFileInterpreter.java
similarity index 98%
rename from file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
rename to file/src/main/java/org/apache/zeppelin/file/WebHDFSFileInterpreter.java
index 244101c9bda..cc61985ca0e 100644
--- a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
+++ b/file/src/main/java/org/apache/zeppelin/file/WebHDFSFileInterpreter.java
@@ -32,24 +32,24 @@
  * HDFS implementation of File interpreter for Zeppelin.
  *
  */
-public class HDFSFileInterpreter extends FileInterpreter {
+public class WebHDFSFileInterpreter extends FileInterpreter {
   static final String HDFS_URL = "hdfs.url";
   static final String HDFS_USER = "hdfs.user";
   static final String HDFS_MAXLENGTH = "hdfs.maxlength";
 
   Exception exceptionOnConnect = null;
-  HDFSCommand cmd = null;
+  WebHDFSCommand cmd = null;
   Gson gson = null;
 
   public void prepare() {
     String userName = getProperty(HDFS_USER);
     String hdfsUrl = getProperty(HDFS_URL);
     int i = Integer.parseInt(getProperty(HDFS_MAXLENGTH));
-    cmd = new HDFSCommand(hdfsUrl, userName, logger, i);
+    cmd = new WebHDFSCommand(hdfsUrl, userName, logger, i);
     gson = new Gson();
   }
 
-  public HDFSFileInterpreter(Properties property) {
+  public WebHDFSFileInterpreter(Properties property) {
     super(property);
     prepare();
   }
diff --git a/file/src/main/resources/interpreter-setting.json b/file/src/main/resources/interpreter-setting.json
index ebe5cf6ee3d..a835b374ef2 100644
--- a/file/src/main/resources/interpreter-setting.json
+++ b/file/src/main/resources/interpreter-setting.json
@@ -2,7 +2,7 @@
   {
     "group": "file",
     "name": "hdfs",
-    "className": "org.apache.zeppelin.file.HDFSFileInterpreter",
+    "className": "org.apache.zeppelin.file.WebHDFSFileInterpreter",
     "properties": {
       "hdfs.url": {
         "envName": null,
diff --git a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java b/file/src/test/java/org/apache/zeppelin/file/WebHDFSFileInterpreterTest.java
similarity index 96%
rename from file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java
rename to file/src/test/java/org/apache/zeppelin/file/WebHDFSFileInterpreterTest.java
index adc9bd6b55f..619523a427d 100644
--- a/file/src/test/java/org/apache/zeppelin/file/HDFSFileInterpreterTest.java
+++ b/file/src/test/java/org/apache/zeppelin/file/WebHDFSFileInterpreterTest.java
@@ -40,12 +40,12 @@
  * Tests Interpreter by running pre-determined commands against mock file system
  *
  */
-public class HDFSFileInterpreterTest extends TestCase {
+public class WebHDFSFileInterpreterTest extends TestCase {
 
   @Test
   public void testMaxLength() {
-    HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
+    WebHDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
     t.open();
     InterpreterResult result = t.interpret("ls -l /", null);
     String lineSeparator = "\n";
@@ -56,7 +56,7 @@ public void testMaxLength() {
     Properties properties = new Properties();
     final int maxLength = fileStatusLength - 2;
     properties.setProperty("hdfs.maxlength", String.valueOf(maxLength));
-    HDFSFileInterpreter t1 = new MockHDFSFileInterpreter(properties);
+    WebHDFSFileInterpreter t1 = new MockHDFSFileInterpreter(properties);
     t1.open();
     InterpreterResult result1 = t1.interpret("ls -l /", null);
     assertEquals(result1.message().get(0).getData().split(lineSeparator).length, maxLength);
@@ -65,7 +65,7 @@ public void testMaxLength() {
 
   @Test
   public void test() {
-    HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
+    WebHDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
     t.open();
 
     // We have info for /, /user, /tmp, /mr-history/done
@@ -186,7 +186,7 @@ void addGetFileStatusData() {
     mfs.put("/mr-history/done?op=GETFILESTATUS",
         "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
   }
 
-  public void addMockData(HDFSCommand.Op op) {
+  public void addMockData(WebHDFSCommand.Op op) {
     if (op.op.equals("LISTSTATUS")) {
       addListStatusData();
     } else if (op.op.equals("GETFILESTATUS")) {
@@ -202,7 +202,7 @@ public String get(String key) {
   /**
    * Run commands against mock file system that simulates webhdfs responses
    */
-  class MockHDFSCommand extends HDFSCommand {
+  class MockHDFSCommand extends WebHDFSCommand {
     MockFileSystem fs = null;
 
     public MockHDFSCommand(String url, String user, Logger logger, int maxLength) {
@@ -236,7 +236,7 @@ public String runCommand(Op op, String path, Arg[] args) throws Exception {
   /**
    * Mock Interpreter - uses Mock HDFS command
    */
-  class MockHDFSFileInterpreter extends HDFSFileInterpreter {
+  class MockHDFSFileInterpreter extends WebHDFSFileInterpreter {
 
     @Override
     public void prepare() {
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 109099cfc7b..fc6a3d53513 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -231,4 +231,4 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/HDFSStatus.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/HDFSStatus.java
new file mode 100644
index 00000000000..2674d8bf222
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/HDFSStatus.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.file;
+
+/**
+ * Class that hosts HDFS file statuses
+ */
+public class HDFSStatus {
+  /**
+   * Status of one file
+   *
+   * matches returned JSON
+   */
+  public class OneFileStatus {
+    public long accessTime;
+    public int blockSize;
+    public int childrenNum;
+    public int fileId;
+    public String group;
+    public long length;
+    public long modificationTime;
+    public String owner;
+    public String pathSuffix;
+    public String permission;
+    public int replication;
+    public int storagePolicy;
+    public String type;
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("\nAccessTime = ").append(accessTime);
+      sb.append("\nBlockSize = ").append(blockSize);
+      sb.append("\nChildrenNum = ").append(childrenNum);
+      sb.append("\nFileId = ").append(fileId);
+      sb.append("\nGroup = ").append(group);
+      sb.append("\nLength = ").append(length);
+      sb.append("\nModificationTime = ").append(modificationTime);
+      sb.append("\nOwner = ").append(owner);
+      sb.append("\nPathSuffix = ").append(pathSuffix);
+      sb.append("\nPermission = ").append(permission);
+      sb.append("\nReplication = ").append(replication);
+      sb.append("\nStoragePolicy = ").append(storagePolicy);
+      sb.append("\nType = ").append(type);
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Status of one file
+   *
+   * matches returned JSON
+   */
+  public class SingleFileStatus {
+    public OneFileStatus FileStatus;
+  }
+
+  /**
+   * Status of all files in a directory
+   *
+   * matches returned JSON
+   */
+  public class MultiFileStatus {
+    public OneFileStatus[] FileStatus;
+  }
+
+  /**
+   * Status of all files in a directory
+   *
+   * matches returned JSON
+   */
+  public class AllFileStatus {
+    public MultiFileStatus FileStatuses;
+  }
+}
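Since these inner classes mirror the WebHDFS JSON verbatim (including the capitalized `FileStatus` field names), Gson can bind a response to them directly, which is exactly how `WebHdfsSite` consumes them. A small illustrative sketch (not part of the patch), reusing the sample `GETFILESTATUS` payload from the test mock data above:

```
import com.google.gson.Gson;
import org.apache.zeppelin.file.HDFSStatus;

public class HDFSStatusDemo {
  public static void main(String[] args) {
    // Sample GETFILESTATUS response, taken from the mock data in WebHDFSFileInterpreterTest
    String json = "{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,"
        + "\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,"
        + "\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\","
        + "\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,"
        + "\"type\":\"DIRECTORY\"}}";

    // Gson instantiates these non-static inner classes via its internal allocator
    HDFSStatus.SingleFileStatus status =
        new Gson().fromJson(json, HDFSStatus.SingleFileStatus.class);
    System.out.println(status.FileStatus.type);  // DIRECTORY
    System.out.println(status.FileStatus.owner); // mapred
  }
}
```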
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/WebHDFSCommand.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/WebHDFSCommand.java
new file mode 100644
index 00000000000..42f0377c986
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/file/WebHDFSCommand.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.file;
+
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Definition and HTTP invocation methods for all WebHDFS commands.
+ */
+public class WebHDFSCommand {
+  /**
+   * Type of HTTP request
+   */
+  public enum HttpType {
+    GET,
+    PUT,
+    DELETE
+  }
+
+  /**
+   * Definition of a WebHDFS operator
+   */
+  public class Op {
+    public String op;
+    public HttpType cmd;
+    public int minArgs;
+
+    public Op(String op, HttpType cmd, int minArgs) {
+      this.op = op;
+      this.cmd = cmd;
+      this.minArgs = minArgs;
+    }
+  }
+
+  /**
+   * Definition of an argument to an operator
+   */
+  public class Arg {
+    public String key;
+    public String value;
+
+    public Arg(String key, String value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  // How to connect to WebHDFS
+  String url = null;
+  String user = null;
+  int maxLength = 0;
+  Logger logger;
+
+  // Define all the commands available
+  public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0);
+  public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0);
+  public Op openFile = new Op("OPEN", HttpType.GET, 0);
+  public Op makeDirectory = new Op("MKDIRS", HttpType.PUT, 0);
+  public Op createWriteFile = new Op("CREATE", HttpType.PUT, 0);
+  public Op deleteFile = new Op("DELETE", HttpType.DELETE, 0);
+  public Op renameFile = new Op("RENAME", HttpType.PUT, 0);
+
+  public WebHDFSCommand(String url, String user, Logger logger, int maxLength) {
+    super();
+    this.url = url;
+    this.user = user;
+    this.maxLength = maxLength;
+    this.logger = logger;
+  }
+
+  public String checkArgs(Op op, String path, Arg[] args) throws Exception {
+    if (op == null
+        || path == null
+        || (op.minArgs > 0 && (args == null || args.length != op.minArgs))) {
+      String a = "";
+      a = (op != null) ? a + op.op + "\n" : a;
+      a = (path != null) ? a + path + "\n" : a;
+      a = (args != null) ? a + Arrays.toString(args) + "\n" : a;
+      return a;
+    }
+    return null;
+  }
+
+  public String runCommand(Op op, String path, Arg[] args) throws Exception {
+    return runCommand(op, path, null, args);
+  }
+
+  public String runCommand(Op op, String path, byte[] argFile, Arg[] args) throws Exception {
+    // Check arguments
+    String error = checkArgs(op, path, args);
+    if (error != null) {
+      logger.error("Bad arguments to command: " + error);
+      return "ERROR: BAD ARGS";
+    }
+
+    // Build URI
+    String finalUrl = url;
+    if (url.endsWith("/") && path.startsWith("/")) {
+      finalUrl += path.substring(1);
+    } else {
+      finalUrl += path;
+    }
+
+    URIBuilder uriBuilder = new URIBuilder(finalUrl)
+        .addParameter("op", op.op)
+        .addParameter("user", this.user);
+
+    boolean hasUserName = false;
+    if (args != null) {
+      for (Arg a : args) {
+        uriBuilder.addParameter(a.key, a.value);
+        if ("user.name".equals(a.key)) {
+          hasUserName = true;
+        }
+      }
+    }
+    if (!hasUserName) {
+      uriBuilder.addParameter("user.name", this.user);
+    }
+    java.net.URI uri = uriBuilder.build();
+
+    // Connect and get response string
+    URL hdfsUrl = uri.toURL();
+    HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection();
+
+    if (op.cmd == HttpType.GET) {
+      con.setRequestMethod("GET");
+      con.setInstanceFollowRedirects(true);
+      return getReceivedResponse(con, HttpType.GET, hdfsUrl);
+    } else if (op.cmd == HttpType.PUT) {
+      con.setRequestMethod("PUT");
+      con.setInstanceFollowRedirects(false);
+      int responseCode = con.getResponseCode();
+      String result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);
+
+      if (responseCode == 307 && ("CREATE".equals(op.op) || "APPEND".equals(op.op))) {
+        // WebHDFS answers CREATE/APPEND with a redirect to a datanode;
+        // the payload must be re-sent to the redirect location.
+        String location = con.getHeaderField("Location");
+        logger.debug("Redirect Location: " + location);
+
+        hdfsUrl = new URL(location);
+        con = (HttpURLConnection) hdfsUrl.openConnection();
+        con.setRequestMethod("PUT");
+        con.setRequestProperty("Content-Type", "application/octet-stream");
+        con.setRequestProperty("Transfer-Encoding", "chunked");
+        con.setDoOutput(true);
+
+        DataOutputStream outputStream = new DataOutputStream(con.getOutputStream());
+        outputStream.write(argFile);
+        outputStream.flush();
+
+        result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);
+      }
+      return result;
+    } else if (op.cmd == HttpType.DELETE) {
+      con.setRequestMethod("DELETE");
+      con.setDoInput(true);
+      con.setInstanceFollowRedirects(false);
+      return getReceivedResponse(con, HttpType.DELETE, hdfsUrl);
+    }
+    return null;
+  }
+
+  private String getReceivedResponse(HttpURLConnection con, HttpType type, URL url)
+      throws IOException {
+    int responseCode = con.getResponseCode();
+
+    BufferedReader in;
+    if (responseCode == 200 || responseCode == 201 || responseCode == 307) {
+      logger.debug("Sending '{}' request to URL : {}", type.toString(), url);
+      logger.debug("Response Code : " + responseCode);
+      logger.debug("Response message: " + con.getResponseMessage());
+      in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+    } else {
+      logger.info("Sending '{}' request to URL : {}", type.toString(), url);
+      logger.info("Response Code : " + responseCode);
+      logger.info("Response message: " + con.getResponseMessage());
+      in = new BufferedReader(new InputStreamReader(con.getErrorStream()));
+    }
+
+    String inputLine;
+    StringBuffer response = new StringBuffer();
+    int i = 0;
+    while ((inputLine = in.readLine()) != null) {
+      if (inputLine.length() < maxLength) {
+        response.append(inputLine);
+      }
+      i++;
+      if (i >= maxLength) {
+        logger.warn("Stopped reading the response after " + maxLength + " lines. "
+            + "Please increase hdfs.maxlength in the interpreter setting to fetch more.");
+        break;
+      }
+    }
+    in.close();
+
+    return response.toString();
+  }
+}
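For orientation, here is a hypothetical snippet showing how a caller drives this class; the URL, user, and limit mirror the `hdfs.url`, `hdfs.user`, and `hdfs.maxlength` defaults used elsewhere in this patch:

```
import org.apache.zeppelin.file.WebHDFSCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebHDFSCommandDemo {
  private static final Logger logger = LoggerFactory.getLogger(WebHDFSCommandDemo.class);

  public static void main(String[] args) throws Exception {
    // Same defaults as hdfs.url / hdfs.user / hdfs.maxlength in this patch
    WebHDFSCommand cmd =
        new WebHDFSCommand("http://localhost:50070/webhdfs/v1/", "hdfs", logger, 1000);

    // Issues GET http://localhost:50070/webhdfs/v1/tmp?op=LISTSTATUS&user.name=hdfs
    String listing = cmd.runCommand(cmd.listStatus, "/tmp", null);
    System.out.println(listing); // JSON array of FileStatus objects
  }
}
```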
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 07eaab0c2f9..2b0d35ab1bc 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -486,7 +486,7 @@
-        <directory>../bin</directory>
+        <directory>../bin</directory>
@@ -498,10 +498,10 @@
-        <directory>../zeppelin-distribution/target/zeppelin-${project.version}/zeppelin-${project.version}/bin</directory>
+        <directory>../zeppelin-distribution/target/zeppelin-${project.version}/zeppelin-${project.version}/bin</directory>
-</project>
+</project>
\ No newline at end of file
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 73acdd57e27..5985d406474 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -591,7 +591,7 @@ public static enum ConfVars {
         + "org.apache.zeppelin.livy.LivyPySpark3Interpreter,"
         + "org.apache.zeppelin.livy.LivySparkRInterpreter,"
         + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
-        + "org.apache.zeppelin.file.HDFSFileInterpreter,"
+        + "org.apache.zeppelin.file.WebHDFSFileInterpreter,"
         + "org.apache.zeppelin.pig.PigInterpreter,"
         + "org.apache.zeppelin.pig.PigQueryInterpreter,"
         + "org.apache.zeppelin.flink.FlinkInterpreter,"
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/WebHdfsNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/WebHdfsNotebookRepo.java
new file mode 100644
index 00000000000..5885e4c0c6a
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/WebHdfsNotebookRepo.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.NotebookImportDeserializer;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.WebHdfsSite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * NotebookRepo implementation that stores notes on HDFS through WebHDFS.
+ */
+public class WebHdfsNotebookRepo implements NotebookRepo {
+  private static Logger logger = LoggerFactory.getLogger(WebHdfsNotebookRepo.class);
+  private WebHdfsSite hdfsSite;
+  private ZeppelinConfiguration conf;
+  private String rootDir;
+
+  public WebHdfsNotebookRepo(ZeppelinConfiguration conf) throws IOException {
+    this.conf = conf;
+    try {
+      rootDir = removeProtocol(conf.getNotebookDir());
+      hdfsSite = new WebHdfsSite(conf);
+      hdfsSite.mkdirs(rootDir);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Reduces an hdfs:// URI to the plain path that WebHDFS expects.
+   */
+  public String removeProtocol(String hdfsUrl) {
+    String newUrl = hdfsUrl.replaceAll("/$", "");
+    if (newUrl.startsWith("hdfs://")) {
+      return "/" + newUrl.replaceAll("^hdfs://", "").split("/", 2)[1];
+    } else {
+      return newUrl;
+    }
+  }
+
+  @Override
+  public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
+    String[] children = hdfsSite.listFiles(rootDir);
+    List<NoteInfo> infos = new LinkedList<>();
+    for (String fileName : children) {
+      if (fileName.startsWith(".")
+          || fileName.startsWith("#")
+          || fileName.startsWith("~")) {
+        // skip hidden and temporary files
+        continue;
+      }
+
+      if (!hdfsSite.isDirectory(rootDir + "/" + fileName)) {
+        // currently a single note is saved as [NOTE_ID]/note.json,
+        // so every note entry must be a directory
+        continue;
+      }
+
+      NoteInfo info = null;
+      try {
+        info = getNoteInfo(rootDir + "/" + fileName);
+        if (info != null) {
+          infos.add(info);
+        }
+      } catch (Exception e) {
+        logger.error("Can't read note " + fileName, e);
+      }
+    }
+    return infos;
+  }
+
+  private Note getNote(String noteDir) throws IOException {
+    if (!hdfsSite.isDirectory(noteDir)) {
+      throw new IOException(noteDir + " is not a directory");
+    }
+
+    String noteJson = noteDir + "/" + "note.json";
+    if (!hdfsSite.exists(noteJson)) {
+      throw new IOException(noteJson + " not found");
+    }
+
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.setPrettyPrinting();
+    Gson gson = gsonBuilder.registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+        .create();
+
+    byte[] content = hdfsSite.readFile(noteJson);
+    String json = new String(content, conf.getString(ConfVars.ZEPPELIN_ENCODING));
+
+    Note note = gson.fromJson(json, Note.class);
+
+    for (Paragraph p : note.getParagraphs()) {
+      if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
+        p.setStatus(Status.ABORT);
+      }
+    }
+    return note;
+  }
+
+  private NoteInfo getNoteInfo(String noteDir) throws IOException {
+    Note note = getNote(noteDir);
+    return new NoteInfo(note);
+  }
+
+  @Override
+  public Note get(String noteId, AuthenticationInfo subject) throws IOException {
+    String path = rootDir + "/" + noteId;
+    return getNote(path);
+  }
+
+  protected String getRootDir() throws IOException {
+    if (!hdfsSite.exists(rootDir)) {
+      throw new IOException("Root path does not exist");
+    }
+
+    if (!hdfsSite.isDirectory(rootDir)) {
+      throw new IOException("Root path is not a directory");
+    }
+    return rootDir;
+  }
+
+  @Override
+  public synchronized void save(Note note, AuthenticationInfo subject) throws IOException {
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.setPrettyPrinting();
+    Gson gson = gsonBuilder.create();
+    String json = gson.toJson(note);
+
+    String noteDir = rootDir + "/" + note.getId();
+
+    if (!hdfsSite.exists(noteDir)) {
+      hdfsSite.mkdirs(noteDir);
+    }
+    if (!hdfsSite.isDirectory(noteDir)) {
+      throw new IOException(noteDir + " is not a directory");
+    }
+
+    String noteJson = noteDir + "/" + "note.json";
+    hdfsSite.writeFile(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)), noteJson);
+  }
+
+  @Override
+  public void remove(String noteId, AuthenticationInfo subject) throws IOException {
+    String noteDir = rootDir + "/" + noteId;
+
+    if (!hdfsSite.exists(noteDir)) {
+      throw new IOException("Cannot remove " + noteDir);
+    }
+    hdfsSite.delete(noteDir);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
+    return null;
+  }
+
+  @Override
+  public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
+    return null;
+  }
+
+  @Override
+  public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) {
+    return null;
+  }
+
+  @Override
+  public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
+  }
+}
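The `removeProtocol` helper above reduces a fully qualified `hdfs://` URI to the bare path WebHDFS expects. A hypothetical check of its behavior (constructing the repo contacts WebHDFS via `mkdirs`, so this assumes a reachable namenode and is shown for illustration only):

```
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.repo.WebHdfsNotebookRepo;

public class RemoveProtocolDemo {
  public static void main(String[] args) throws Exception {
    WebHdfsNotebookRepo repo = new WebHdfsNotebookRepo(ZeppelinConfiguration.create());
    // Scheme and authority are stripped, trailing slash removed
    System.out.println(repo.removeProtocol("hdfs://namenode:9000/user/zeppelin/notebook"));
    // -> /user/zeppelin/notebook
    System.out.println(repo.removeProtocol("/tmp/notebook/"));
    // -> /tmp/notebook
  }
}
```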
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WebHdfsSite.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WebHdfsSite.java
new file mode 100644
index 00000000000..8b9250bfa9a
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WebHdfsSite.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.file.HDFSStatus;
+import org.apache.zeppelin.file.WebHDFSCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class to create / move / delete / write to / read from the HDFS filesystem via WebHDFS.
+ */
+public class WebHdfsSite {
+  public static final String HDFS_URL = "hdfs.url";
+  public static final String HDFS_USER = "hdfs.user";
+  public static final String HDFS_MAXLENGTH = "hdfs.maxlength";
+  public static final String HDFS_NOTEBOOK_DIR = "hdfs.notebook.dir";
+
+  private static Logger logger = LoggerFactory.getLogger(WebHdfsSite.class);
+
+  static final String NOTE_JSON = "note.json";
+  static final String NOTE_JSON_TEMP = "_note.json";
+
+  ZeppelinConfiguration conf;
+  boolean enableWebHDFS = true;
+  String hdfsUrl = null;
+  String hdfsUser = null;
+  int hdfsMaxLength = 0;
+  WebHDFSCommand hdfsCmd = null;
+
+  public WebHdfsSite(ZeppelinConfiguration conf) throws URISyntaxException {
+    this.conf = conf;
+    this.hdfsUrl = conf.getString(HDFS_URL, HDFS_URL, "http://localhost:50070/webhdfs/v1/");
+    this.hdfsMaxLength = conf.getInt(HDFS_MAXLENGTH, HDFS_MAXLENGTH, 100000);
+    this.hdfsUser = System.getenv("HADOOP_USER_NAME");
+    if (this.hdfsUser == null) {
+      this.hdfsUser = System.getenv("LOGNAME");
+    }
+    if (this.hdfsUser == null) {
+      this.enableWebHDFS = false;
+    } else {
+      this.hdfsCmd = new WebHDFSCommand(hdfsUrl, hdfsUser, logger, this.hdfsMaxLength);
+      this.enableWebHDFS = exists("/");
+    }
+  }
+
+  public boolean exists(String path) {
+    Gson gson = new Gson();
+    try {
+      String status = this.hdfsCmd.runCommand(this.hdfsCmd.getFileStatus, path, null);
+      HDFSStatus.SingleFileStatus fileStatus =
+          gson.fromJson(status, HDFSStatus.SingleFileStatus.class);
+      return fileStatus.FileStatus.modificationTime > 0;
+    } catch (Exception e) {
+      logger.info("Cannot stat " + path + "; please check the WebHDFS configuration");
+      return false;
+    }
+  }
+
+  public String[] listFiles(String directory) throws IOException {
+    List<String> hdfsNotebook = new ArrayList<>();
+    Gson gson = new Gson();
+    String hdfsDirStatus;
+
+    try {
+      hdfsDirStatus = this.hdfsCmd.runCommand(this.hdfsCmd.listStatus, directory, null);
+      if (hdfsDirStatus != null) {
+        HDFSStatus.AllFileStatus allFiles = gson.fromJson(hdfsDirStatus,
+            HDFSStatus.AllFileStatus.class);
+        if (allFiles != null && allFiles.FileStatuses != null
+            && allFiles.FileStatuses.FileStatus != null) {
+          for (HDFSStatus.OneFileStatus fs : allFiles.FileStatuses.FileStatus) {
+            if ("DIRECTORY".equals(fs.type) && !fs.pathSuffix.startsWith("_")) {
+              hdfsNotebook.add(fs.pathSuffix);
+              logger.info("read a notebook from HDFS: " + fs.pathSuffix);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      logger.error("exception occurred during getting notebook from hdfs : ", e);
+    }
+    return hdfsNotebook.toArray(new String[0]);
+  }
+
+  public void delete(String path) throws IOException {
+    logger.debug("remove : " + path);
+
+    WebHDFSCommand.Arg recursive = this.hdfsCmd.new Arg("recursive", "true");
+    WebHDFSCommand.Arg[] args = {recursive};
+
+    try {
+      this.hdfsCmd.runCommand(this.hdfsCmd.deleteFile, path, args);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  public void mkdirs(String path) throws IOException {
+    try {
+      this.hdfsCmd.runCommand(this.hdfsCmd.makeDirectory, path, null);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  public void writeFile(byte[] content, String path) throws IOException {
+    try {
+      WebHDFSCommand.Arg overwrite = this.hdfsCmd.new Arg("overwrite", "true");
+      WebHDFSCommand.Arg[] createArgs = {overwrite};
+      this.hdfsCmd.runCommand(this.hdfsCmd.createWriteFile, path, content, createArgs);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  public void rename(String oldPath, String newPath) throws IOException {
+    try {
+      WebHDFSCommand.Arg dest = this.hdfsCmd.new Arg("destination", newPath);
+      WebHDFSCommand.Arg[] renameArgs = {dest};
+      this.hdfsCmd.runCommand(this.hdfsCmd.renameFile, oldPath, renameArgs);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  public boolean isDirectory(String path) throws IOException {
+    Gson gson = new Gson();
+    try {
+      String status = this.hdfsCmd.runCommand(this.hdfsCmd.getFileStatus, path, null);
+      HDFSStatus.SingleFileStatus fileStatus =
+          gson.fromJson(status, HDFSStatus.SingleFileStatus.class);
+      return fileStatus.FileStatus.type.equals("DIRECTORY");
+    } catch (Exception e) {
+      logger.info("Cannot stat " + path + "; please check the WebHDFS configuration");
+      return false;
+    }
+  }
+
+  public byte[] readFile(String path) throws IOException {
+    try {
+      return this.hdfsCmd.runCommand(this.hdfsCmd.openFile, path, null, null).getBytes();
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+}