diff --git a/docs/storage/storage.md b/docs/storage/storage.md
index 76012fff4b4..1074c13797a 100644
--- a/docs/storage/storage.md
+++ b/docs/storage/storage.md
@@ -32,6 +32,7 @@ There are few notebook storage systems available for a use out of the box:
   * use local file system and version it using local Git repository - `GitNotebookRepo`
   * storage using Amazon S3 service - `S3NotebookRepo`
   * storage using Azure service - `AzureNotebookRepo`
+  * storage using Apache Hadoop HDFS - `HDFSNotebookRepo`
 
 Multiple storage systems can be used at the same time by providing a comma-separated list of the class-names in the configuration. By default, only first two of them will be automatically kept in sync by Zeppelin.
 
@@ -210,6 +211,44 @@ Optionally, you can specify Azure folder structure name in the file **zeppelin-site.xml**
 ```
+
+## Notebook Storage in Apache Hadoop HDFS
+
+Notebooks are kept both in HDFS and on the local file system, and are synchronized between the two.
+
+WebHDFS calls are made as the user given by the `HADOOP_USER_NAME` (or, failing that, `LOGNAME`) environment variable.
+
+To use this storage, set the following properties in the file **zeppelin-site.xml**:
+
+```
+<property>
+  <name>zeppelin.notebook.storage</name>
+  <value>org.apache.zeppelin.notebook.repo.HDFSNotebookRepo</value>
+  <description>notebook persistence layer implementation</description>
+</property>
+
+<property>
+  <name>hdfs.url</name>
+  <value>http://localhost:50070/webhdfs/v1/</value>
+  <description>HDFS url</description>
+</property>
+
+<property>
+  <name>hdfs.user</name>
+  <value>hdfs</value>
+  <description>HDFS user</description>
+</property>
+
+<property>
+  <name>hdfs.maxlength</name>
+  <value>1000</value>
+  <description>Maximum number of lines of results fetched</description>
+</property>
+
+<property>
+  <name>hdfs.notebook.dir</name>
+  <value>/tmp</value>
+  <description>notebook location directory in HDFS</description>
+</property>
+```
+
 
 ## Storage in ZeppelinHub
 
 ZeppelinHub storage layer allows out of the box connection of Zeppelin instance with your ZeppelinHub account. First of all, you need to either comment out the following property in **zeppelin-site.xml**:
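Before pointing Zeppelin at a cluster it is worth confirming that the WebHDFS endpoint answers at all. The sketch below is illustrative only — the endpoint and user mirror the sample values above, and `WebHdfsProbe` is not part of this patch:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Issues the same GETFILESTATUS call that HDFSNotebookRepo uses to probe WebHDFS.
public class WebHdfsProbe {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://localhost:50070/webhdfs/v1/?op=GETFILESTATUS&user.name=hdfs");
    HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("GET");
    System.out.println("Response Code : " + con.getResponseCode());
    try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
      System.out.println(in.readLine()); // single-line JSON FileStatus for "/"
    }
  }
}
```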
diff --git a/file/pom.xml b/file/pom.xml
index 0f7089b31f9..9e3ff945aac 100644
--- a/file/pom.xml
+++ b/file/pom.xml
@@ -70,6 +70,12 @@
       <version>${jersey.common.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-vfs2</artifactId>
+      <version>2.0</version>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
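The commons-vfs2 dependency is what lets the new code treat local notebook directories as `FileObject`s. A minimal sketch of the two VFS calls this patch leans on — the local path here is a made-up example:

```java
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemManager;
import org.apache.commons.vfs2.NameScope;
import org.apache.commons.vfs2.VFS;

import java.io.OutputStream;

public class VfsSketch {
  public static void main(String[] args) throws Exception {
    FileSystemManager fsManager = VFS.getManager();
    // Resolve a note directory, then a child file strictly inside it.
    FileObject noteDir = fsManager.resolveFile("file:///tmp/notebook/2A94M5J1Z");
    noteDir.createFolder(); // creates missing ancestors as well
    FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD);
    try (OutputStream out = noteJson.getContent().getOutputStream(false)) {
      out.write("{}".getBytes()); // false = overwrite, as the download path does
    }
  }
}
```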
diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
index 94508cd0364..ed28668b64b 100644
--- a/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
+++ b/file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
@@ -18,12 +18,20 @@
 package org.apache.zeppelin.file;
 
-import java.net.URL;
-import java.net.HttpURLConnection;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.NameScope;
+import org.slf4j.Logger;
+
+import javax.ws.rs.core.UriBuilder;
 import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
-import javax.ws.rs.core.UriBuilder;
-import org.slf4j.Logger;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
 
 /**
  * Definition and HTTP invocation methods for all WebHDFS commands
@@ -36,7 +44,8 @@ public class HDFSCommand {
    */
   public enum HttpType {
     GET,
-    PUT
+    PUT,
+    DELETE
   }
 
   /**
@@ -76,6 +85,11 @@ public Arg(String key, String value) {
   // Define all the commands available
   public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0);
   public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0);
+  public Op openFile = new Op("OPEN", HttpType.GET, 0);
+  public Op makeDirectory = new Op("MKDIRS", HttpType.PUT, 0);
+  public Op createWriteFile = new Op("CREATE", HttpType.PUT, 0);
+  public Op deleteFile = new Op("DELETE", HttpType.DELETE, 0);
+  public Op renameFile = new Op("RENAME", HttpType.PUT, 0);
 
   public HDFSCommand(String url, String user, Logger logger, int maxLength) {
     super();
@@ -102,8 +116,22 @@ public String checkArgs(Op op, String path, Arg[] args) throws Exception {
   }
 
+  public String runCommand(Op op, String path, Arg[] args) throws Exception {
+    return runCommand(op, path, null, null, null, args);
+  }
+
+  public String runCommand(Op op, String path, FileObject argFile, Arg[] args) throws Exception {
+    return runCommand(op, path, null, null, argFile, args);
+  }
+
+  public String runCommand(Op op, String path, FileObject noteDir, String charsetName,
+      Arg[] args) throws Exception {
+    return runCommand(op, path, noteDir, charsetName, null, args);
+  }
+
   // The operator that runs all commands
-  public String runCommand(Op op, String path, Arg[] args) throws Exception {
+  public String runCommand(Op op, String path, FileObject noteDir, String charsetName,
+      FileObject argFile, Arg[] args) throws Exception {
 
     // Check arguments
     String error = checkArgs(op, path, args);
@@ -119,38 +147,131 @@ public String runCommand(Op op, String path, Arg[] args) throws Exception {
         .queryParam("op", op.op);
 
     if (args != null) {
+      boolean isUserName = false;
       for (Arg a : args) {
         builder = builder.queryParam(a.key, a.value);
+        if ("user.name".equals(a.key)) {
+          isUserName = true;
+        }
+      }
+      // WebHDFS pseudo authentication is carried in the user.name query
+      // parameter, so fall back to the configured user unless the caller
+      // already supplied one.
+      if (!isUserName) {
+        builder = builder.queryParam("user.name", this.user);
       }
+    } else {
+      builder = builder.queryParam("user.name", this.user);
     }
     java.net.URI uri = builder.build();
 
     // Connect and get response string
     URL hdfsUrl = uri.toURL();
     HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection();
+    FileObject noteJson;
+    OutputStream out = null;
 
     if (op.cmd == HttpType.GET) {
       con.setRequestMethod("GET");
+      con.setInstanceFollowRedirects(true);
+
+      if ("OPEN".equals(op.op)) {
+        noteJson = noteDir.resolveFile("note.json", NameScope.CHILD);
+        out = noteJson.getContent().getOutputStream(false);
+      }
+
+      String result = getReceivedResponse(con, HttpType.GET, hdfsUrl);
+
+      if ("OPEN".equals(op.op)) {
+        out.write(result.getBytes());
+        out.close();
+      }
+
+      return result;
+    } else if (op.cmd == HttpType.PUT) {
+      con.setRequestMethod("PUT");
+      con.setInstanceFollowRedirects(false);
       int responseCode = con.getResponseCode();
-      logger.info("Sending 'GET' request to URL : " + hdfsUrl);
-      logger.info("Response Code : " + responseCode);
+      String result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);
 
-      BufferedReader in = new BufferedReader(
-          new InputStreamReader(con.getInputStream()));
-      String inputLine;
-      StringBuffer response = new StringBuffer();
-
-      int i = 0;
-      while ((inputLine = in.readLine()) != null) {
-        if (inputLine.length() < maxLength)
-          response.append(inputLine);
-        i++;
-        if (i >= maxLength)
-          break;
+      // WebHDFS CREATE/APPEND is a two-step protocol: the NameNode answers 307
+      // with a Location header, and the file content is then PUT to that
+      // DataNode URL.
+      if (responseCode == 307 && ("CREATE".equals(op.op) || "APPEND".equals(op.op))) {
+        String location = con.getHeaderField("Location");
+        logger.debug("Redirect Location: " + location);
+
+        hdfsUrl = new URL(location);
+        con = (HttpURLConnection) hdfsUrl.openConnection();
+
+        File file = new File(argFile.getURL().toURI());
+        FileInputStream fi = new FileInputStream(file);
+
+        con.setRequestMethod("PUT");
+        con.setRequestProperty("Content-Type", "application/octet-stream");
+        con.setRequestProperty("Transfer-Encoding", "chunked");
+        con.setDoOutput(true);
+
+        DataOutputStream outputStream = new DataOutputStream(con.getOutputStream());
+
+        // Stream the file up in fixed-size chunks, writing only the bytes
+        // actually read on each pass.
+        int maxBufferSize = 1024;
+        byte[] buffer = new byte[maxBufferSize];
+        int bytesRead = fi.read(buffer, 0, maxBufferSize);
+        while (bytesRead > 0) {
+          outputStream.write(buffer, 0, bytesRead);
+          bytesRead = fi.read(buffer, 0, maxBufferSize);
+        }
+
+        fi.close();
+        outputStream.flush();
+
+        result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);
       }
-      in.close();
 
-      return response.toString();
+      return result;
+    } else if (op.cmd == HttpType.DELETE) {
+      con.setRequestMethod("DELETE");
+      con.setDoInput(true);
+      con.setInstanceFollowRedirects(false);
+      return getReceivedResponse(con, HttpType.DELETE, hdfsUrl);
     }
+
     return null;
   }
+
+  private String getReceivedResponse(HttpURLConnection con,
+      HttpType type, URL url) throws IOException {
+    int responseCode = con.getResponseCode();
+
+    // Success and redirect responses are read from the input stream; anything
+    // else comes from the error stream and is logged at a higher level.
+    BufferedReader in;
+    if (responseCode == 200 || responseCode == 201 || responseCode == 307) {
+      logger.debug("Sending '{}' request to URL : {}", type.toString(), url);
+      logger.debug("Response Code : " + responseCode);
+      logger.debug("response message: " + con.getResponseMessage());
+      in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+    } else {
+      logger.info("Sending '{}' request to URL : {}", type.toString(), url);
+      logger.info("Response Code : " + responseCode);
+      logger.info("response message: " + con.getResponseMessage());
+      in = new BufferedReader(new InputStreamReader(con.getErrorStream()));
+    }
+    String inputLine;
+    StringBuffer response = new StringBuffer();
+    int i = 0;
+    while ((inputLine = in.readLine()) != null) {
+      if (inputLine.length() < maxLength) {
+        response.append(inputLine);
+      }
+      i++;
+      if (i >= maxLength) {
+        logger.warn("The response exceeds " + maxLength + " lines and has been"
+            + " truncated. Please increase hdfs.maxlength in the interpreter setting");
+        break;
+      }
+    }
+    in.close();
+
+    return response.toString();
+  }
 }
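The new operations compose into higher-level file manipulation. A usage sketch under assumed values — the endpoint, user, and paths are placeholders, not part of the patch:

```java
import org.apache.zeppelin.file.HDFSCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsCommandSketch {
  private static final Logger logger = LoggerFactory.getLogger(HdfsCommandSketch.class);

  public static void main(String[] args) throws Exception {
    HDFSCommand cmd =
        new HDFSCommand("http://localhost:50070/webhdfs/v1/", "hdfs", logger, 1000);
    // MKDIRS, then RENAME a staged file into place via the "destination" Arg,
    // mirroring the upload sequence HDFSNotebookRepo performs below.
    cmd.runCommand(cmd.makeDirectory, "/tmp/notebook/demo", null);
    HDFSCommand.Arg dest = cmd.new Arg("destination", "/tmp/notebook/demo/note.json");
    cmd.runCommand(cmd.renameFile, "/tmp/notebook/demo/_note.json",
        new HDFSCommand.Arg[]{dest});
  }
}
```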
diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
index c2caa11d5ae..9346e1f5255 100644
--- a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
+++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
@@ -18,9 +18,6 @@
 package org.apache.zeppelin.file;
 
-import java.text.SimpleDateFormat;
-import java.util.*;
-
 import com.google.gson.Gson;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -28,14 +25,20 @@
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
 /**
  * HDFS implementation of File interpreter for Zeppelin.
  *
 */
 public class HDFSFileInterpreter extends FileInterpreter {
-  static final String HDFS_URL = "hdfs.url";
-  static final String HDFS_USER = "hdfs.user";
-  static final String HDFS_MAXLENGTH = "hdfs.maxlength";
+  public static final String HDFS_URL = "hdfs.url";
+  public static final String HDFS_USER = "hdfs.user";
+  public static final String HDFS_MAXLENGTH = "hdfs.maxlength";
 
   Exception exceptionOnConnect = null;
   HDFSCommand cmd = null;
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 07fa2df42de..40f5c775001 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -57,6 +57,18 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-file</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>javax.ws.rs-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
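Both the interpreter and the new repo deserialize WebHDFS JSON responses with Gson into the `OneFileStatus`/`AllFileStatus` models that `HDFSFileInterpreter` now exposes. A self-contained sketch of that parsing, using a trimmed stand-in class and fabricated JSON:

```java
import com.google.gson.Gson;

public class FileStatusSketch {
  // Stand-in with only the fields this patch actually reads.
  static class OneFileStatus {
    long length;
    long modificationTime;
    String pathSuffix;
    String type;
  }

  public static void main(String[] args) {
    String json = "{\"length\":42,\"modificationTime\":1470000000000,"
        + "\"pathSuffix\":\"2A94M5J1Z\",\"type\":\"DIRECTORY\"}";
    OneFileStatus fs = new Gson().fromJson(json, OneFileStatus.class);
    System.out.println(fs.type + " " + fs.pathSuffix + " (" + fs.length + " bytes)");
  }
}
```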
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepo.java
new file mode 100644
index 00000000000..c0faa20cea7
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepo.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import com.google.gson.Gson;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemManager;
+import org.apache.commons.vfs2.FileType;
+import org.apache.commons.vfs2.NameScope;
+import org.apache.commons.vfs2.VFS;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.file.HDFSCommand;
+import org.apache.zeppelin.file.HDFSCommand.Arg;
+import org.apache.zeppelin.file.HDFSFileInterpreter.AllFileStatus;
+import org.apache.zeppelin.file.HDFSFileInterpreter.OneFileStatus;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.zeppelin.file.HDFSFileInterpreter.HDFS_URL;
+import static org.apache.zeppelin.file.HDFSFileInterpreter.HDFS_MAXLENGTH;
+
+/**
+ * HDFSNotebookRepo : uses HDFS to back up and restore notebooks.
+ */
+public class HDFSNotebookRepo extends VFSNotebookRepo implements NotebookRepo {
+  Logger logger = LoggerFactory.getLogger(HDFSNotebookRepo.class);
+
+  public static final String HDFS_NOTEBOOK_DIR = "hdfs.notebook.dir";
+  static final String NOTE_JSON = "note.json";
+  static final String NOTE_JSON_TEMP = "_note.json";
+
+  ZeppelinConfiguration conf;
+  FileSystemManager fsManager;
+  boolean enableWebHDFS = true;
+  String hdfsUrl = null;
+  String hdfsUser = null;
+  int hdfsMaxLength = 0;
+  String hdfsNotebookDir = null;
+  HDFSCommand hdfsCmd = null;
+
+  public HDFSNotebookRepo(ZeppelinConfiguration conf) throws IOException {
+    super(conf);
+    this.conf = conf;
+    this.fsManager = VFS.getManager();
+    this.hdfsUrl = conf.getString(HDFS_URL, HDFS_URL, "http://localhost:50070/webhdfs/v1/");
+    this.hdfsMaxLength = conf.getInt(HDFS_MAXLENGTH, HDFS_MAXLENGTH, 100000);
+    this.hdfsNotebookDir =
+        removeProtocol(conf.getString(HDFS_NOTEBOOK_DIR, HDFS_NOTEBOOK_DIR, "/tmp"));
+    // WebHDFS calls run as this user; fall back from HADOOP_USER_NAME to LOGNAME.
+    this.hdfsUser = System.getenv("HADOOP_USER_NAME");
+    if (this.hdfsUser == null) {
+      this.hdfsUser = System.getenv("LOGNAME");
+    }
+
+    if (this.hdfsUser == null) {
+      this.enableWebHDFS = false;
+    } else {
+      this.hdfsCmd = new HDFSCommand(hdfsUrl, hdfsUser, logger, this.hdfsMaxLength);
+      this.enableWebHDFS = checkWebHDFS();
+    }
+  }
+
+  public String removeProtocol(String hdfsUrl) {
+    // Strip a trailing slash and, for hdfs:// URLs, drop the scheme and
+    // authority so only the absolute path remains.
+    String newUrl = hdfsUrl.replaceAll("/$", "");
+    if (newUrl.startsWith("hdfs://")) {
+      return "/" + newUrl.replaceAll("^hdfs://", "").split("/", 2)[1];
+    } else {
+      return newUrl;
+    }
+  }
+
+  private boolean checkWebHDFS() {
+    // Probe WebHDFS with a GETFILESTATUS on the root directory; any failure
+    // disables HDFS synchronization rather than breaking the repo.
+    Gson gson = new Gson();
+    try {
+      String rootStatus = this.hdfsCmd.runCommand(this.hdfsCmd.getFileStatus, "/", null);
+      OneFileStatus fileStatus = gson.fromJson(rootStatus, OneFileStatus.class);
+      long modificationTime = fileStatus.modificationTime; // validates the parsed JSON
+      return true;
+    } catch (Exception e) {
+      logger.info("disabled webHDFS. Please check webhdfs configurations", e);
+      return false;
+    }
+  }
+
+  private List<String> getHdfsNotebook() {
+    List<String> hdfsNotebook = new ArrayList<>();
+    Gson gson = new Gson();
+    String hdfsDirStatus = null;
+
+    try {
+      hdfsDirStatus = this.hdfsCmd.runCommand(this.hdfsCmd.listStatus, this.hdfsNotebookDir, null);
+
+      if (hdfsDirStatus != null) {
+        AllFileStatus allFiles = gson.fromJson(hdfsDirStatus, AllFileStatus.class);
+        if (allFiles != null && allFiles.FileStatuses != null
+            && allFiles.FileStatuses.FileStatus != null) {
+          for (OneFileStatus fs : allFiles.FileStatuses.FileStatus) {
+            // Only directories count as notes; names starting with "_" are
+            // in-progress uploads and are skipped.
+            if ("DIRECTORY".equals(fs.type) && !fs.pathSuffix.startsWith("_")) {
+              hdfsNotebook.add(fs.pathSuffix);
+              logger.info("read a notebook from HDFS: " + fs.pathSuffix);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      logger.error("exception occurred during getting notebook from hdfs : ", e);
+    }
+
+    return hdfsNotebook;
+  }
+
+  private void downloadNotebook(String noteId, FileObject rootDir) throws IOException {
+    logger.debug("download notebook from hdfs: " + noteId);
+    OneFileStatus fileStatus;
+    Gson gson = new Gson();
+    String notebook = this.hdfsNotebookDir + "/" + noteId + "/" + NOTE_JSON;
+    try {
+      String notebookStatus = this.hdfsCmd.runCommand(this.hdfsCmd.getFileStatus, notebook, null);
+      fileStatus = gson.fromJson(notebookStatus, OneFileStatus.class);
+    } catch (Exception e) {
+      logger.warn("exception occurred during checking hdfs file status: ", e);
+      return;
+    }
+    // A note.json larger than hdfs.maxlength would be truncated on OPEN,
+    // so widen the command's limit to the actual file length first.
+    long length = fileStatus.length;
+    if (length > this.hdfsMaxLength) {
+      this.hdfsCmd = new HDFSCommand(this.hdfsUrl, this.hdfsUser, logger, (int) length);
+    }
+    FileObject noteDir;
+    try {
+      noteDir = rootDir.resolveFile(noteId, NameScope.CHILD);
+      this.hdfsCmd.runCommand(this.hdfsCmd.openFile, notebook, noteDir,
+          conf.getString(ConfVars.ZEPPELIN_ENCODING), null);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void syncHDFSNoteList() throws IOException {
+    FileObject rootDir = super.getRootDir();
+    FileObject[] children = rootDir.getChildren();
+    List<String> hdfsNotebook = getHdfsNotebook();
+
+    for (FileObject f : children) {
+      String baseName = f.getName().getBaseName();
+      logger.debug("read a notebook from local storage: " + baseName);
+
+      if (f.isHidden() || f.getType() != FileType.FOLDER ||
+          baseName.startsWith(".") || baseName.startsWith("#") || baseName.startsWith("~")) {
+        continue;
+      }
+
+      // Notes present on both sides need no action; local-only notes are
+      // pushed up, and whatever remains in hdfsNotebook is pulled down.
+      if (hdfsNotebook.contains(baseName)) {
+        hdfsNotebook.remove(baseName);
+      } else {
+        uploadNoteToHDFS(baseName);
+      }
+    }
+
+    for (String noteId : hdfsNotebook) {
+      downloadNotebook(noteId, rootDir);
+    }
+  }
+
+  private void uploadNoteToHDFS(Note note) throws IOException {
+    uploadNoteToHDFS(note.getId());
+  }
+
+  private void uploadNoteToHDFS(String noteId) throws IOException {
+    String localNotebook = super.getRootDir() + "/" + noteId + "/" + NOTE_JSON;
+    FileObject localNote = super.getRootDir().resolveFile(noteId + "/" + NOTE_JSON);
+    String noteDir = this.hdfsNotebookDir + "/" + noteId;
+    String notebook = noteDir + "/" + NOTE_JSON;
+    String newNotebook = noteDir + "/" + NOTE_JSON_TEMP;
+    logger.debug("localNotebook: {}\tnotebook: {}", localNotebook, notebook);
+
+    try {
+      // Write to a temporary name first, then swap it in with a RENAME so a
+      // failed upload never clobbers the existing note.json.
+      this.hdfsCmd.runCommand(this.hdfsCmd.makeDirectory, noteDir, null);
+      this.hdfsCmd.runCommand(this.hdfsCmd.createWriteFile, newNotebook, localNote, null);
+      this.hdfsCmd.runCommand(this.hdfsCmd.deleteFile, notebook, null);
+      Arg dest = this.hdfsCmd.new Arg("destination", notebook);
+      Arg[] renameArgs = {dest};
+      this.hdfsCmd.runCommand(this.hdfsCmd.renameFile, newNotebook, renameArgs);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  private void removeHDFSNote(String noteId) throws IOException {
+    String noteDir = this.hdfsNotebookDir + "/" + noteId;
+    logger.debug("remove noteDir: " + noteDir);
+
+    Arg recursive = this.hdfsCmd.new Arg("recursive", "true");
+    Arg[] args = {recursive};
+
+    try {
+      this.hdfsCmd.runCommand(this.hdfsCmd.deleteFile, noteDir, args);
+    } catch (Exception e) {
+      logger.error("Exception: ", e);
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
+    if (this.enableWebHDFS) {
+      syncHDFSNoteList();
+    }
+    return super.list(subject);
+  }
+
+  @Override
+  public synchronized void save(Note note, AuthenticationInfo subject) throws IOException {
+    super.save(note, subject);
+    if (this.enableWebHDFS) {
+      uploadNoteToHDFS(note);
+    }
+  }
+
+  @Override
+  public void remove(String noteId, AuthenticationInfo subject) throws IOException {
+    if (this.enableWebHDFS) {
+      removeHDFSNote(noteId);
+    }
+    super.remove(noteId, subject);
+  }
+}
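HDFS synchronization piggybacks on the ordinary repo calls: `list()` reconciles both sides, `save()` pushes, and `remove()` deletes remotely before deleting locally. A minimal sketch of driving the repo directly — it assumes a reachable WebHDFS endpoint and `HADOOP_USER_NAME` set in the environment:

```java
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.HDFSNotebookRepo;
import org.apache.zeppelin.user.AuthenticationInfo;

public class HdfsRepoSketch {
  public static void main(String[] args) throws Exception {
    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
    HDFSNotebookRepo repo = new HDFSNotebookRepo(conf);
    // list() first syncs local and HDFS note directories, then lists locally.
    for (NoteInfo info : repo.list(new AuthenticationInfo("anonymous"))) {
      System.out.println(info.getId());
    }
  }
}
```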
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepoTest.java
new file mode 100644
index 00000000000..9cb6b5f15f9
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/HDFSNotebookRepoTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class HDFSNotebookRepoTest {
+  @Test
+  public void removeProtocolTest() throws IOException {
+    String hdfsUrlNoProtocol1 = "/user/foo/notebook";
+    String hdfsUrlNoProtocol2 = "/user/foo/notebook/";
+    String hdfsUrlWithProtocol1 = "hdfs://namenode/user/foo/notebook";
+    String hdfsUrlWithProtocol2 = "hdfs://dummyhost:8020/user/foo/notebook";
+
+    ZeppelinConfiguration conf = new ZeppelinConfiguration();
+    // Let construction failures fail the test instead of swallowing them and
+    // hitting a NullPointerException on the assertions below.
+    HDFSNotebookRepo repo = new HDFSNotebookRepo(conf);
+
+    assertEquals("hdfsUrlNoProtocol1", "/user/foo/notebook",
+        repo.removeProtocol(hdfsUrlNoProtocol1));
+    assertEquals("hdfsUrlNoProtocol2", "/user/foo/notebook",
+        repo.removeProtocol(hdfsUrlNoProtocol2));
+    assertEquals("hdfsUrlWithProtocol1", "/user/foo/notebook",
+        repo.removeProtocol(hdfsUrlWithProtocol1));
+    assertEquals("hdfsUrlWithProtocol2", "/user/foo/notebook",
+        repo.removeProtocol(hdfsUrlWithProtocol2));
+  }
+}