diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 77e0b1f3bcd..868f8e9c531 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -158,6 +158,21 @@ --> + + + zeppelin.notebook.storage org.apache.zeppelin.notebook.repo.VFSNotebookRepo diff --git a/docs/storage/storage.md b/docs/storage/storage.md index 76012fff4b4..22bff047126 100644 --- a/docs/storage/storage.md +++ b/docs/storage/storage.md @@ -238,4 +238,39 @@ export ZEPPELINHUB_API_TOKEN = ZeppelinHub token export ZEPPELINHUB_API_ADDRESS = address of ZeppelinHub service (e.g. https://www.zeppelinhub.com) ``` -You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token). \ No newline at end of file +You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token). + +
+## Storage in IPFS + +Using IpfsNotebookRepo you can use Ipfs to save Note revisions and retrieve a particular revision. + +Comment out the following in **zeppelin-site.xml** + +Make sure you enter the correct ipfs apiServer. +``` + + zeppelin.notebook.storage + org.apache.zeppelin.notebook.repo.ipfs.IPFSNotebookRepo + notebook persistence layer implementation + + + + zeppelin.notebook.ipfs.apiServer + http://localhost:5001/api/v0/ + ipfs api Server Multiaddress + +``` +and comment + +``` + + zeppelin.notebook.storage + org.apache.zeppelin.notebook.repo.VFSNotebookRepo + notebook persistence layer implementation + +``` +In the `notebook` directory is a file `ipfsnotehashes.json` which contains the ipfs hash for each + note revision which is commited, it is saved in your local '.ipfs' directory. You can share + these ipfs hash with others. Notebook can also be imported by entering this hash. + diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index b33ad64b108..abf29ab3ee9 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -17,6 +17,7 @@ The following components are provided under Apache License. (Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/) (Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient) (Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient) + (Apache 2.0) Http Components (org.apache.httpcomponents:httpmime:4.3.6 - https://github.com/apache/httpclient) (Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/) (Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/) (Apache 2.0) Apache Commons Math 3 (org.apache.commons:commons-math3:3.4.1 - http://commons.apache.org/proper/commons-math/) diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index eac96f00d8f..59ee0904e8c 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -319,6 +319,10 @@ org.eclipse.jetty.websocket websocket-client + + org.apache.httpcomponents + httpmime + diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 694f08109e3..2f0f1f1a715 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -188,6 +188,9 @@ public void onMessage(NotebookSocket conn, String msg) { case IMPORT_NOTE: importNote(conn, userAndRoles, notebook, messagereceived); break; + case IMPORT_NOTE_URL: + importNoteFromUrl(conn, userAndRoles, notebook, messagereceived); + break; case COMMIT_PARAGRAPH: updateParagraph(conn, userAndRoles, notebook, messagereceived); break; @@ -728,6 +731,33 @@ protected Note importNote(NotebookSocket conn, HashSet userAndRoles, return note; } + protected Note importNoteFromUrl(NotebookSocket conn, HashSet userAndRoles, + Notebook notebook, Message fromMessage) throws IOException { + Note note = null; + if (fromMessage != null) { + String type = (String) fromMessage.get("type"); + if (type.equals("ipfs")) { + String name = (String) fromMessage.get("name"); + Map options = ((Map) fromMessage.get("options")); + String urlHash = (String) options.get("hash"); + AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); + note = notebook.importNoteFromBackend(urlHash, name, subject); + Message message = new Message(OP.IMPORT_NOTE_STATUS); + message.put("type", "ipfs"); + message.put("options", options); + if (note != null) { + note.persist(subject); + message.put("importStatus", "success"); + } else { + message.put("importStatus", "failure"); + } + conn.send(serializeMessage(message)); + broadcastNoteList(subject); + } + } + return note; + } + private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); diff --git a/zeppelin-web/src/components/noteName-import/note-import-dialog.html b/zeppelin-web/src/components/noteName-import/note-import-dialog.html index 4922cb89058..5cf19436660 100644 --- a/zeppelin-web/src/components/noteName-import/note-import-dialog.html +++ b/zeppelin-web/src/components/noteName-import/note-import-dialog.html @@ -56,6 +56,7 @@ + diff --git a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js index dea3dd32b22..a4c13a2ec9e 100644 --- a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js +++ b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js @@ -14,11 +14,12 @@ 'use strict'; -angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $timeout, websocketMsgSrv) { +angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $timeout, ngToast, websocketMsgSrv) { var vm = this; $scope.note = {}; $scope.note.step1 = true; $scope.note.step2 = false; + $scope.note.isOtherUrl = false; vm.resetFlags = function() { $scope.note = {}; @@ -65,12 +66,16 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ vm.importNote = function() { $scope.note.errorText = ''; if ($scope.note.importUrl) { - jQuery.getJSON($scope.note.importUrl, function(result) { - vm.processImportJson(result); - }).fail(function() { - $scope.note.errorText = 'Unable to Fetch URL'; - $scope.$apply(); - }); + if (!$scope.note.isOtherUrl) { + jQuery.getJSON($scope.note.importUrl, function(result) { + vm.processImportJson(result); + }).fail(function() { + $scope.note.errorText = 'Unable to Fetch URL'; + $scope.$apply(); + }); + } else { + websocketMsgSrv.importNoteFromBackend($scope.note.importUrl, $scope.note.noteImportName, 'ipfs'); + } } else { $scope.note.errorText = 'Enter URL'; $scope.$apply(); @@ -110,4 +115,23 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ vm.resetFlags(); angular.element('#noteImportModal').modal('hide'); }); + + $scope.$on('importNoteResult', function(event, data) { + if (data.type === 'ipfs') { + var ngToastConf = { + content: 'Failed to import Note with hash ' + data.options.hash, + verticalPosition: 'bottom', + horizontalPosition: 'center', + timeout: '5000' + }; + + if (data.importStatus === 'success') { + ngToastConf.content = 'Successfully imported Note with hash ' + data.options.hash, + ngToast.success(ngToastConf); + } else { + ngToast.danger(ngToastConf); + } + } + }); + }); diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index e4f45db0218..8bab14a0c3b 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -60,6 +60,8 @@ angular.module('zeppelinWebApp').factory('websocketEvents', $location.path('/notebook/' + data.note.id); } else if (op === 'NOTES_INFO') { $rootScope.$broadcast('setNoteMenu', data.notes); + } else if (op === 'IMPORT_NOTE_STATUS') { + $rootScope.$broadcast('importNoteResult', data); } else if (op === 'LIST_NOTEBOOK_JOBS') { $rootScope.$broadcast('setNotebookJobs', data.notebookJobs); } else if (op === 'LIST_UPDATE_NOTEBOOK_JOBS') { diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 473299ec4d9..42db3084ea8 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -151,6 +151,19 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, + importNoteFromBackend: function(hashUrl, name, type) { + websocketEvents.sendNewEvent({ + op: 'IMPORT_NOTE_URL', + data: { + type: type, + name: name, + options: { + hash: hashUrl + } + } + }); + }, + checkpointNotebook: function(noteId, commitMessage) { websocketEvents.sendNewEvent({ op: 'CHECKPOINT_NOTEBOOK', diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index b7e1938ff60..e8f5d1a3e46 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -249,6 +249,11 @@ test + + org.apache.httpcomponents + httpmime + 4.3.6 + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 025accd4284..1b98f8c12a0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -182,6 +182,39 @@ public String exportNote(String noteId) throws IOException, IllegalArgumentExcep return gson.toJson(note); } + /** + * + * @param url - the ipfs url or hash to get note from + * @param subject + * @return note + * @throws IOException + */ + public Note importNoteFromBackend(String url, String noteName, AuthenticationInfo subject) + throws IOException { + Note newNote = null; + Note oldNote = notebookRepo.getNoteFromUrl(url, subject); + if (oldNote != null) { + try { + newNote = createNote(subject); + if (noteName != null) { + newNote.setName(noteName); + } else { + newNote.setName(oldNote.getName()); + } + List paragraphs = oldNote.getParagraphs(); + for (Paragraph p : paragraphs) { + newNote.addCloneParagraph(p); + } + newNote.persist(subject); + } catch (IOException e) { + logger.error("Importing note from " + url + " failed", e); + throw e; + } + } + + return newNote; + } + /** * import JSON as a new note. * diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java index ca72fc1f7f0..5dea3ccd0e6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java @@ -227,4 +227,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) // Auto-generated method stub return null; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException{ + // Auto-generated method stub + return null; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java index aff684fc158..dbf1cb8f7a0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -100,6 +100,15 @@ public interface NotebookRepo { */ @ZeppelinApi public List revisionHistory(String noteId, AuthenticationInfo subject); + /** + * Download a note from a given URL or Hash + * @param url + * @param subject + * @return note in JSON String + */ + @ZeppelinApi public Note getNoteFromUrl(String url, AuthenticationInfo subject) + throws IOException; + /** * Represents the 'Revision' a point in life of the notebook */ @@ -112,6 +121,23 @@ public Revision(String revId, String message, int time) { public String id; public String message; public int time; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Revision)) { + return false; + } + Revision revision = (Revision) o; + return id.equals(revision.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index 72087262aca..c0ecd9f0457 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -410,4 +410,15 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) } return revisions; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) { + Note newNote = null; + try { + newNote = getRepo(0).getNoteFromUrl(url, subject); + } catch (IOException e) { + LOG.error("Failed to download note with given url", e); + } + return newNote; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java index 8f4dd31034f..0d659620a31 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java @@ -270,4 +270,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) // Auto-generated method stub return null; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + // Auto-generated method stub + return null; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java index 4a93dec1dd9..ed04900c41a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java @@ -285,4 +285,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) return null; } + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + // Auto-generated method stub + return null; + } + } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java new file mode 100644 index 00000000000..3e348fdc0b1 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo.ipfs; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.vfs2.FileContent; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.NameScope; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.ApplicationState; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NotebookImportDeserializer; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +/** + * Share and get versioned notebooks with IPFS. Make sure IPFS daemon is running. + */ +public class IPFSNotebookRepo extends VFSNotebookRepo implements NotebookRepo { + private static final String API_SERVER_PROPERTY_NAME = "zeppelin.notebook.ipfs.apiServer"; + private static final String DEFAULT_API_SERVER = "http://localhost:5001/api/v0/"; + private static final Logger LOG = LoggerFactory.getLogger(IPFSNotebookRepo.class); + private ExecutorService executorService = Executors.newCachedThreadPool(); + private Gson gson; + private Ipfs ipfs; + private FileObject ipfsNoteHashesJson; + private String encoding; + /* + * map of > i.e Revision History + * { 2A94M5J1Z : [ QmZQybbanVowHLynssnMjzPJcZ676yB1dA2CpzLN4ZTY48, QmVc.... ], + * 2BKZQ2FP6 : [ QmNhPUwuUQ1uD1n22h2CEBFLKPCExCiVc7rcgHmMftmzsv ] + * } + */ + private Map> noteHashes; + + public IPFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { + super(conf); + encoding = conf.getString(ConfVars.ZEPPELIN_ENCODING); + gson = new GsonBuilder().setPrettyPrinting().registerTypeAdapter(Date.class, new + NotebookImportDeserializer()).create(); + String ipfsApiServer = conf.getString("IPFS_API_SERVER", + API_SERVER_PROPERTY_NAME, DEFAULT_API_SERVER); + ipfs = new Ipfs(ipfsApiServer); + String versionJson = ipfs.version(); + if (versionJson == null) { + throw new IOException("Make sure the ipfs daemon is running on given url" + ipfsApiServer); + } + Type simpleType = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(versionJson, simpleType); + String version = data.get("Version"); + if (!version.equals("0.4.2")) { + throw new IOException("Current api is not supported for " + version + " only for 0.4.2"); + } + + // creates a ipfsnotehashes.json file in notebook dir if not exists + init(); + // initialize noteHashes Map to load noteID and multihash from file + noteHashes = loadFromFile(); + } + + /** + * creates a ipfsnotehashes.json file in notebook directory. This file will represent the + * noteHashes Map in Json format. + */ + private void init() throws IOException { + FileObject file = getRootDir(); + ipfsNoteHashesJson = file.resolveFile("ipfsnotehashes.json", NameScope.CHILD); + if (!ipfsNoteHashesJson.exists()) { + ipfsNoteHashesJson.createFile(); + } + } + + /** + * Reads the ipfsNoteHashesJson file and converts the Json String to Map + */ + private Map> loadFromFile() throws IOException { + FileContent content = ipfsNoteHashesJson.getContent(); + InputStream ins = content.getInputStream(); + String json = IOUtils.toString(ins, encoding); + ins.close(); + if (json.isEmpty() || json == null) { + return new HashMap<>(); + } + Type type = new TypeToken>>() { + }.getType(); + Map> map = gson.fromJson(json, type); + return map; + } + + /** + * Get's the note from peers. + * + * @return Note + */ + @Override + public Note getNoteFromUrl(final String hash, AuthenticationInfo subject) throws IOException { + //getNote is blocking hence using a timeout + Callable task = new Callable() { + @Override + public Note call() throws Exception { + return getNote(hash); + } + }; + Future noteFuture = executorService.submit(task); + Note note = null; + try { + note = noteFuture.get(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Failed to download Note, Interrupted", e); + } catch (ExecutionException e) { + LOG.error("Failed to get note", e); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + } catch (TimeoutException e) { + LOG.error("TimeOut reached", e); + } finally { + noteFuture.cancel(true); + } + return note; + } + + /** + * The call to ipfs.cat() can be blocking. If the file corresponding to hash is not present in + * local ipfs storage it will try to get from peers which can be time consuming resulting in + * SocketTimeout Exception + */ + private Note getNote(String noteMultihash) throws IOException { + String noteJson = ipfs.cat(noteMultihash); + if (noteJson == null) { + throw new IOException("Failed to retrieve Note with hash " + noteMultihash); + } + + Note note = gson.fromJson(noteJson, Note.class); + + for (Paragraph p : note.getParagraphs()) { + if (p.getStatus() == Job.Status.PENDING || p.getStatus() == Job.Status.RUNNING) { + p.setStatus(Job.Status.ABORT); + } + + List appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + if (app.getStatus() != ApplicationState.Status.ERROR) { + app.setStatus(ApplicationState.Status.UNLOADED); + } + } + } + } + return note; + } + + /* This method writes the revision history - noteHashes to ipfsNoteHashesJson file*/ + private synchronized void saveToFile() throws IOException { + String jsonString = gson.toJson(noteHashes); + OutputStream out = ipfsNoteHashesJson.getContent().getOutputStream(false); + out.write(jsonString.getBytes(encoding)); + out.close(); + } + + /** + * remove method deletes the notebook from directory and also removes all its revisions from ipfs + * local storage. It also removes the noteID entry from noteHashesJSon file. + */ + @Override + public synchronized void remove(String noteId, AuthenticationInfo subject) throws IOException { + super.remove(noteId, subject); + List revisions = noteHashes.get(noteId); + if (revisions != null) { + String pinJsonResponse = ipfs.pinLs(); + if (pinJsonResponse == null) { + throw new IOException("Failed to retrieve pinned hashes"); + } + Type pinType = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(pinJsonResponse, pinType); + Map pinnedObjects = (Map) data.get("Keys"); + + for (Revision rev : revisions) { + String revisionHash = rev.id; + if (pinnedObjects.containsKey(revisionHash)) { + boolean success = ipfs.pinRm(revisionHash); + if (!success) { + LOG.warn("Failed to remove " + revisionHash); + } + } + } + noteHashes.remove(noteId); + } + saveToFile(); + } + + /** + * This method removes only a corresponding revision for a note + */ + public void removeRevision(String noteID, Revision revision) throws IOException { + List noteRevisions = noteHashes.get(noteID); + if (noteRevisions == null) { + LOG.error("This note " + noteID + "does not have any revisions"); + throw new IOException("This note " + noteID + "does not have any revisions"); + } + if (!noteRevisions.contains(revision)) { + LOG.error("invalid revision " + revision + " for note " + noteID); + throw new IOException("invalid revision " + revision + " for note " + noteID); + } + boolean success = ipfs.pinRm(revision.id); + if (success) { + noteRevisions.remove(revision); + } else { + LOG.warn("Failed to remove revision " + revision); + } + saveToFile(); + } + + /* + * Notes are stored in local directory and are added to ipfs only when + * committed. If there is no change in note same hash will be generated + * and it will not be added to the file. + */ + @Override + public Revision checkpoint(String noteId, String commitMessage, AuthenticationInfo subject) + throws IOException { + Note note = get(noteId, subject); + String json = gson.toJson(note); + String addResult = ipfs.add(json); + if (addResult == null) { + throw new IOException("Failed to checkpoint note with ID " + noteId); + } + Type type = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(addResult, type); + String hash = data.get("Hash"); + + /* + * The `time` param in Rev is in int(unix timestamp) seconds + * System.currentTimeMillis() returns long milliseconds + */ + int time = (int) (System.currentTimeMillis() / 1000L); + Revision revision = new Revision(hash, commitMessage, time); + List noteVersions = noteHashes.get(noteId); + if (noteVersions == null) { + noteVersions = new ArrayList<>(); + noteVersions.add(revision); + noteHashes.put(noteId, noteVersions); + } else { + if (!noteVersions.contains(revision)) { + noteVersions.add(revision); + } + /* + * revision already exists before. Should i change time ? + * + * */ + } + saveToFile(); + LOG.info("Checkpoint for Note " + noteId + " IpfsRevision is " + revision.id); + return revision; + } + + + public Map> getNoteHashes() { + return new HashMap<>(noteHashes); + } + + /** + * get a particular revision from ipfs + */ + @Override + public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException { + Note note = null; + String pinJsonResponse = ipfs.pinLs(); + Type pinType = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(pinJsonResponse, pinType); + Map allPinnedObjects = (Map) data.get("Keys"); + + if (allPinnedObjects.containsKey(revId)) { + note = getNote(revId); + } + if (note == null) { + LOG.warn("revision " + revId + " not present for note " + + noteId + " in local ipfs storage"); + } + return note; + } + + @Override + public List revisionHistory(String noteId, AuthenticationInfo subject) { + List versionHistory = noteHashes.get(noteId); + return versionHistory; + } + + @Override + public void close() { + super.close(); + try { + ipfs.getClient().close(); + } catch (IOException e) { + LOG.info("Couldn't successfully close the ipfsClient", e); + } + executorService.shutdown(); + } +} + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java new file mode 100644 index 00000000000..a5987e645d0 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo.ipfs; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; + + +/** + * Ipfs HttpClient + */ +public class Ipfs { + private static final Logger LOG = LoggerFactory.getLogger(Ipfs.class); + private CloseableHttpClient client; + private String baseUrl; + + public Ipfs(String url) { + baseUrl = url; + client = HttpClients.createDefault(); + } + + public String pinLs() { + String pinnedJsonString = null; + HttpGet request = new HttpGet(baseUrl + "pin/ls?type=recursive&quiet=true"); + CloseableHttpResponse response = null; + try { + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + pinnedJsonString = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get pinned objects with unexpected statuscode " + statusCode); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } + } + return pinnedJsonString; + } + + public boolean pinRm(String hash) { + boolean removed = false; + HttpGet request = new HttpGet(baseUrl + "pin/rm?recursive=true&arg=" + hash); + CloseableHttpResponse response = null; + try { + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + String responseMessage; + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + Type type = new TypeToken>() { + }.getType(); + Map responseMap = new Gson().fromJson(responseMessage, type); + String errormessage = responseMap.get("Message"); + if (errormessage.equals("note pinned")) { + removed = true; + } + + LOG.error("response Message = " + responseMessage); + /* + * HTTP/1.1 500 Internal Server Error + * Content-Type: application/json + * responseMessage = {"Message":"invalid ipfs ref path","Code":0} + * or not pinned + * HTTP/1.1 500 Internal Server Error + * responseMessage = {"Message":"not pinned","Code":0} + * + **/ + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + removed = true; + LOG.info("response Message = " + responseMessage); + } else { + LOG.error("Failed to remove pinned object with unexpected statuscode " + statusCode); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } + } + return removed; + } + + public String cat(String hash) { + HttpGet request = new HttpGet(baseUrl + "cat/" + hash); + String ipfsContent = null; + RequestConfig config = RequestConfig.custom() + .setSocketTimeout(555000) + .build(); + request.setConfig(config); + CloseableHttpResponse response = null; + try { + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + LOG.error("response Message = " + responseMessage); + /* + * HTTP/1.1 500 Internal Server Error + * Content-Type: application/json + * responseMessage = {"Message":"invalid ipfs ref path","Code":0} + * + **/ + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + return responseMessage; + } else { + LOG.error("Failed to get ipfs object with unexpected statuscode " + statusCode); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } + } + return ipfsContent; + } + + public String add(String content) { + String filename = "note.json"; + String responseMessage = null; + if (content == null) { + return null; + } + HttpPost request = new HttpPost(baseUrl + "add?stream-channels=true&progress=false"); + HttpEntity entity = MultipartEntityBuilder.create() + .addBinaryBody("file", content.getBytes(), ContentType + .APPLICATION_OCTET_STREAM, filename) + .build(); + request.setEntity(entity); + CloseableHttpResponse response = null; + try { + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to add ipfs object with unexpected statuscode " + statusCode); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } + } + return responseMessage; + } + + public String version() { + String version = null; + HttpGet request = new HttpGet(baseUrl + "/version"); + CloseableHttpResponse response = null; + try { + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + version = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get ipfs version with unexpected statuscode " + statusCode); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } + } + return version; + } + + public CloseableHttpClient getClient() { + return client; + } +} + + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java index 960bcde5aeb..a37be626d94 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java @@ -234,4 +234,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) return history; } + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException{ + // Auto-generated method stub + return null; + } + } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index e91dfbb99e7..530d33e7ebf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -51,6 +51,13 @@ public static enum OP { // @param name name fpor the cloned note IMPORT_NOTE, // [c-s] import notebook // @param object notebook + IMPORT_NOTE_URL, // [c-s] import notebook + // @param type ipfs for now + // @param options hash or magnetlink or any future url + IMPORT_NOTE_STATUS, //[s-c] notebook import through url failed + // @param status success or failure + // @param type ipfs for now + // @param options hash or magnetlink or any future url NOTE_UPDATE, RUN_PARAGRAPH, // [c-s] run paragraph