39 changes: 39 additions & 0 deletions docs/storage/storage.md
@@ -32,6 +32,7 @@ There are a few notebook storage systems available for use out of the box:
* use local file system and version it using local Git repository - `GitNotebookRepo`
* storage using Amazon S3 service - `S3NotebookRepo`
* storage using Azure service - `AzureNotebookRepo`
* storage using Apache Hadoop HDFS - `HDFSNotebookRepo`

Multiple storage systems can be used at the same time by providing a comma-separated list of the class-names in the configuration.
By default, only the first two of them will be automatically kept in sync by Zeppelin.
@@ -210,6 +211,44 @@ Optionally, you can specify Azure folder structure name in the file **zeppelin-s
```

</br>

## Notebook Storage in Apache Hadoop HDFS <a name="HDFS"></a>

Notebooks may be stored in HDFS as well as in the local file system.

To use this storage, set the following properties in the file **zeppelin-site.xml**:

```
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.HDFSNotebookRepo</value>
<description>notebook persistence layer implementation</description>
</property>

<property>
<name>hdfs.url</name>
Contributor: I think it is better to add the prefix `zeppelin.notebook`, because `hdfs` is used in many places. Adding a prefix can help users understand that this property is for notebook storage.

Author: HDFSCommand already uses the `hdfs.url` property, so I used the same property name. If adding a prefix helps users understand, I'll add the prefix.

<value>http://localhost:50070/webhdfs/v1/</value>
<description>HDFS url</description>
</property>
<property>
<name>hdfs.user</name>
<value>hdfs</value>
<description>HDFS user</description>
</property>
<property>
<name>hdfs.maxlength</name>
<value>1000</value>
<description>Maximum number of lines of results fetched</description>
</property>
<property>
<name>hdfs.notebook.dir</name>
<value>/tmp</value>
<description>notebook location directory in HDFS</description>
</property>
```
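
As noted at the top of this page, multiple storage systems can be listed in `zeppelin.notebook.storage` as a comma-separated list, and the first two are kept in sync. A minimal illustrative sketch (assuming you want the local Git-backed `GitNotebookRepo` alongside HDFS; adjust the class list to your setup) could look like:

```
<property>
  <name>zeppelin.notebook.storage</name>
  <value>org.apache.zeppelin.notebook.repo.GitNotebookRepo, org.apache.zeppelin.notebook.repo.HDFSNotebookRepo</value>
  <description>notebook persistence layer implementation</description>
</property>
```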

</br>

## Storage in ZeppelinHub <a name="ZeppelinHub"></a>

The ZeppelinHub storage layer allows out-of-the-box connection of a Zeppelin instance with your ZeppelinHub account. First of all, you need to either comment out the following property in **zeppelin-site.xml**:
6 changes: 6 additions & 0 deletions file/pom.xml
@@ -70,6 +70,12 @@
<version>${jersey.common.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
165 changes: 143 additions & 22 deletions file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
@@ -18,12 +18,20 @@

package org.apache.zeppelin.file;

import java.net.URL;
import java.net.HttpURLConnection;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.NameScope;
import org.slf4j.Logger;

import javax.ws.rs.core.UriBuilder;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import javax.ws.rs.core.UriBuilder;
import org.slf4j.Logger;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

/**
* Definition and HTTP invocation methods for all WebHDFS commands
@@ -36,7 +44,8 @@ public class HDFSCommand {
*/
public enum HttpType {
GET,
PUT
PUT,
DELETE
}

/**
@@ -76,6 +85,11 @@ public Arg(String key, String value) {
// Define all the commands available
public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0);
public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0);
public Op openFile = new Op("OPEN", HttpType.GET, 0);
public Op makeDirectory = new Op("MKDIRS", HttpType.PUT, 0);
public Op createWriteFile = new Op("CREATE", HttpType.PUT, 0);
public Op deleteFile = new Op("DELETE", HttpType.DELETE, 0);
public Op renameFile = new Op("RENAME", HttpType.PUT, 0);

public HDFSCommand(String url, String user, Logger logger, int maxLength) {
super();
@@ -102,8 +116,22 @@ public String checkArgs(Op op, String path, Arg[] args) throws Exception {
}


public String runCommand(Op op, String path, Arg[] args) throws Exception {
return runCommand(op, path, null, null, null, args);
}

public String runCommand(Op op, String path, FileObject argFile, Arg[] args) throws Exception {
return runCommand(op, path, null, null, argFile, args);
}

public String runCommand(Op op, String path, FileObject noteDir, String charsetName,
Arg[] args) throws Exception {
return runCommand(op, path, noteDir, charsetName, null, args);
}

// The operator that runs all commands
public String runCommand(Op op, String path, Arg[] args) throws Exception {
public String runCommand(Op op, String path, FileObject noteDir, String charsetName,
FileObject argFile, Arg[] args) throws Exception {

// Check arguments
String error = checkArgs(op, path, args);
@@ -119,38 +147,131 @@ public String runCommand(Op op, String path, Arg[] args) throws Exception {
.queryParam("op", op.op);

if (args != null) {
boolean isUserName = false;
for (Arg a : args) {
builder = builder.queryParam(a.key, a.value);
if ("user.name".equals(a.key)) {
isUserName = true;
}
}
if (!isUserName) {
builder = builder.queryParam("user.name", this.user);
}
}
else {
builder = builder.queryParam("user.name", this.user);
}
java.net.URI uri = builder.build();
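// Illustrative: with the sample configuration above, uri typically looks something like
// http://localhost:50070/webhdfs/v1/tmp?op=LISTSTATUS&user.name=hdfs (assumed values)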

// Connect and get response string
URL hdfsUrl = uri.toURL();
HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection();
FileObject noteJson;
OutputStream out = null;

if (op.cmd == HttpType.GET) {
con.setRequestMethod("GET");
con.setFollowRedirects(true);

if ("OPEN".equals(op.op)) {
noteJson = noteDir.resolveFile("note.json", NameScope.CHILD);
out = noteJson.getContent().getOutputStream(false);
}

String result = getReceivedResponse(con, HttpType.GET, hdfsUrl);

if ("OPEN".equals(op.op)) {
out.write(result.getBytes());
out.close();
}

return result;
} else if (op.cmd == HttpType.PUT) {
con.setRequestMethod("PUT");
con.setFollowRedirects(false);
int responseCode = con.getResponseCode();
logger.info("Sending 'GET' request to URL : " + hdfsUrl);
logger.info("Response Code : " + responseCode);
String result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);

if (responseCode == 307 && ("CREATE".equals(op.op) || "APPEND".equals(op.op))) {
String location = con.getHeaderField("Location");
logger.debug("Redirect Location: " + location);

hdfsUrl = new URL(location);
con = (HttpURLConnection) hdfsUrl.openConnection();

File file = new File(argFile.getURL().toURI());
FileInputStream fi = new FileInputStream(file);

con.setRequestMethod("PUT");
con.setRequestProperty("Content-Type", "application/octet-stream");
con.setRequestProperty("Transfer-Encoding", "chunked");
con.setDoOutput(true);

BufferedReader in = new BufferedReader(
new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();

int i = 0;
while ((inputLine = in.readLine()) != null) {
if (inputLine.length() < maxLength)
response.append(inputLine);
i++;
if (i >= maxLength)
break;
DataOutputStream outputStream = new DataOutputStream(con.getOutputStream());

int bytesAvailable = fi.available();
int maxBufferSize = 1024;
int bufferSize = Math.min(bytesAvailable, maxBufferSize);
byte[] buffer = new byte[bufferSize];

int bytesRead = fi.read(buffer, 0, bufferSize);
while (bytesRead > 0) {
outputStream.write(buffer, 0, bufferSize);
bytesAvailable = fi.available();
bufferSize = Math.min(bytesAvailable, maxBufferSize);
bytesRead = fi.read(buffer, 0, bufferSize);
}

fi.close();
outputStream.flush();

result = getReceivedResponse(con, HttpType.PUT, hdfsUrl);
}
in.close();
return response.toString();

return result;
} else if (op.cmd == HttpType.DELETE) {
con.setRequestMethod("DELETE");
con.setDoInput(true);
con.setInstanceFollowRedirects(false);
return getReceivedResponse(con, HttpType.DELETE, hdfsUrl);
}

return null;
}

private String getReceivedResponse(HttpURLConnection con,
HttpType type, URL url) throws IOException {
int responseCode = con.getResponseCode();

BufferedReader in;
if (responseCode == 200 || responseCode == 201 || responseCode == 307) {
logger.debug("Sending '{}' request to URL : {}", type.toString(), url);
logger.debug("Response Code : " + responseCode);
logger.debug("response message: " + con.getResponseMessage());
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
} else {
logger.info("Sending '{}' request to URL : {}", type.toString(), url);
logger.info("Response Code : " + responseCode);
logger.info("response message: " + con.getResponseMessage());
in = new BufferedReader(new InputStreamReader(con.getErrorStream()));
}
String inputLine;
StringBuffer response = new StringBuffer();
int i = 0;
while ((inputLine = in.readLine()) != null) {
if (inputLine.length() < maxLength) {
response.append(inputLine);
}
i++;
if (i >= maxLength) {
logger.warn("Input stream's length(" + inputLine.length()
+ ") is greater than or equal to hdfs.maxlength(" + maxLength
+ "). Please increase hdfs.maxlength in interpreter setting");
break;
}
}
in.close();

return response.toString();
}
}
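
For reference, a rough usage sketch of `HDFSCommand` based on the constructor and `runCommand(Op, String, Arg[])` signatures shown in this diff. The URL, user name, and max-length values are taken from the sample configuration above and are illustrative only, not part of this change:

```
import org.apache.zeppelin.file.HDFSCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HDFSCommandExample {
  public static void main(String[] args) throws Exception {
    Logger logger = LoggerFactory.getLogger(HDFSCommandExample.class);

    // Assumed values, matching the zeppelin-site.xml sample in docs/storage/storage.md
    HDFSCommand cmd = new HDFSCommand(
        "http://localhost:50070/webhdfs/v1/", "hdfs", logger, 1000);

    // List a directory: issues GET <hdfs.url>/tmp?op=LISTSTATUS&user.name=hdfs
    // and returns the raw JSON FileStatuses response from WebHDFS
    HDFSCommand.Arg[] noArgs = null;
    String listing = cmd.runCommand(cmd.listStatus, "/tmp", noArgs);
    System.out.println(listing);
  }
}
```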
file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java
@@ -18,24 +18,27 @@

package org.apache.zeppelin.file;

import java.text.SimpleDateFormat;
import java.util.*;

import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;

/**
* HDFS implementation of File interpreter for Zeppelin.
*
*/
public class HDFSFileInterpreter extends FileInterpreter {
static final String HDFS_URL = "hdfs.url";
static final String HDFS_USER = "hdfs.user";
static final String HDFS_MAXLENGTH = "hdfs.maxlength";
public static final String HDFS_URL = "hdfs.url";
public static final String HDFS_USER = "hdfs.user";
public static final String HDFS_MAXLENGTH = "hdfs.maxlength";

Exception exceptionOnConnect = null;
HDFSCommand cmd = null;
12 changes: 12 additions & 0 deletions zeppelin-zengine/pom.xml
@@ -57,6 +57,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
Contributor: I don't think it is a good idea to make zengine depend on one specific interpreter.

Author: I agree with you, but to use the HDFSCommand functions I had to add the dependency. Could you give me advice on how to solve this?

Member: Indeed, Zengine should not have such a dependency. @aspen01, if HDFSCommand has to be shared:

  • one option is to move it to zeppelin-interpreter, as both file and zeppelin-zengine already depend on it
  • another option to consider would be to extract a new Maven sub-module out of zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/, move HDFSCommand there, and make the other modules depend on it.

The second option is more involved and deserves a separate issue with discussion. The first one looks more feasible.

<artifactId>zeppelin-file</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>