From 9e39e53dbedd6981fd2cc57bb772a69d3c2b7ebe Mon Sep 17 00:00:00 2001 From: onkarshedge Date: Fri, 10 Jun 2016 13:25:59 +0530 Subject: [PATCH 1/3] Added ipfsnotebookrepo with Versioning and import feature backend and frontend added import status as ng-toast Added documentation --- conf/zeppelin-site.xml.template | 15 + docs/storage/storage.md | 50 ++- zeppelin-distribution/src/bin_license/LICENSE | 1 + .../zeppelin/socket/NotebookServer.java | 24 ++ .../noteName-import/note-import-dialog.html | 8 + .../notenameImport.controller.js | 42 ++- .../websocketEvents.factory.js | 2 + .../websocketEvents/websocketMsg.service.js | 9 + zeppelin-zengine/pom.xml | 5 + .../apache/zeppelin/notebook/Notebook.java | 28 ++ .../notebook/repo/AzureNotebookRepo.java | 6 + .../notebook/repo/IPFSNotebookRepo.java | 311 ++++++++++++++++++ .../zeppelin/notebook/repo/NotebookRepo.java | 25 ++ .../notebook/repo/NotebookRepoSync.java | 11 + .../notebook/repo/S3NotebookRepo.java | 6 + .../notebook/repo/VFSNotebookRepo.java | 6 + .../repo/zeppelinhub/ZeppelinHubRepo.java | 6 + .../zeppelin/notebook/socket/Message.java | 4 + .../notebook/repo/IpfsNotebookRepoTest.java | 109 ++++++ .../src/test/resources/ipfsnotehashes.json | 16 + 20 files changed, 676 insertions(+), 8 deletions(-) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java create mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java create mode 100644 zeppelin-zengine/src/test/resources/ipfsnotehashes.json diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 77e0b1f3bcd..acca51027eb 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -158,6 +158,21 @@ --> + + + zeppelin.notebook.storage org.apache.zeppelin.notebook.repo.VFSNotebookRepo diff --git a/docs/storage/storage.md b/docs/storage/storage.md index 76012fff4b4..349774a75d6 100644 --- a/docs/storage/storage.md +++ b/docs/storage/storage.md @@ -238,4 +238,52 @@ export ZEPPELINHUB_API_TOKEN = ZeppelinHub token export ZEPPELINHUB_API_ADDRESS = address of ZeppelinHub service (e.g. https://www.zeppelinhub.com) ``` -You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token). \ No newline at end of file +You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token). + +
+## Storage in IPFS + +Using IpfsNotebookRepo you can use Ipfs to save Note revisions and retrieve a particular revision. + +**Installing ipfs** +* Download and install [ipfs](https://ipfs.io/docs/install/) for your OS. +* Enter the following commands + * `ipfs init` + * `ipfs daemon` + +`ipfs daemon` will start the ipfs. In the output you will see the following. +>API server listening on /ip4/127.0.0.1/tcp/5001
+>Gateway (readonly) server listening on /ip4/127.0.0.1/tcp/8080 + +You can edit the following ports with `ipfs config edit`. +As zeppelin also uses 8080 by default you might want to chage the gateway server or run zeppelin on another port. + +Comment out the following in **zeppelin-site.xml** + +Make sure you enter the correct ipfs apiServer. +``` + + zeppelin.notebook.storage + org.apache.zeppelin.notebook.repo.IPFSNotebookRepo + notebook persistence layer implementation + + + + zeppelin.notebook.ipfs.apiServer + /ip4/127.0.0.1/tcp/5001 + ipfs api Server Multiaddress + +``` +and comment + +``` + + zeppelin.notebook.storage + org.apache.zeppelin.notebook.repo.VFSNotebookRepo + notebook persistence layer implementation + +``` +In the `notebook` directory is a file `ipfsnotehashes.json` which contains the ipfs hash for each + note revision which is commited, it is saved in your local '.ipfs' directory. You can share + these ipfs hash with others. Notebook can also be imported by entering this hash. + diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index b33ad64b108..5696faecd7c 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -151,6 +151,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version (The MIT License) bcprov-jdk15on v1.51 (org.bouncycastle:bcprov-jdk15on:jar:1.51 - http://www.bouncycastle.org/java.html) - http://www.bouncycastle.org/licence.html (The MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs) - https://github.com/bryanbraun/anchorjs/blob/master/README.md#license (The MIT License) moment-duration-format v1.3.0 (https://github.com/jsmreese/moment-duration-format) - https://github.com/jsmreese/moment-duration-format/blob/master/LICENSE + (The MIT License) java-ipfs-api v0.0.1-SNAPSHOT (https://github.com/ipfs/java-ipfs-api) - https://github.com/ipfs/java-ipfs-api/blob/master/LICENSE The following components are provided under the MIT License. diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 694f08109e3..717f0a08603 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -188,6 +188,9 @@ public void onMessage(NotebookSocket conn, String msg) { case IMPORT_NOTE: importNote(conn, userAndRoles, notebook, messagereceived); break; + case IMPORT_NOTE_URL: + importNoteFromUrl(conn, userAndRoles, notebook, messagereceived); + break; case COMMIT_PARAGRAPH: updateParagraph(conn, userAndRoles, notebook, messagereceived); break; @@ -728,6 +731,27 @@ protected Note importNote(NotebookSocket conn, HashSet userAndRoles, return note; } + protected Note importNoteFromUrl(NotebookSocket conn, HashSet userAndRoles, + Notebook notebook, Message fromMessage) throws IOException { + Note note = null; + if (fromMessage != null) { + String urlHash = (String) fromMessage.get("hash"); + AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); + note = notebook.importNote(urlHash, subject); + Message message = new Message(OP.IMPORT_NOTE_STATUS); + if (note != null) { + note.persist(subject); + message.put("importStatus", "success"); + } + else { + message.put("importStatus", "failure"); + } + conn.send(serializeMessage(message)); + broadcastNoteList(subject); + } + return note; + } + private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); diff --git a/zeppelin-web/src/components/noteName-import/note-import-dialog.html b/zeppelin-web/src/components/noteName-import/note-import-dialog.html index 4922cb89058..c6bfb7da79f 100644 --- a/zeppelin-web/src/components/noteName-import/note-import-dialog.html +++ b/zeppelin-web/src/components/noteName-import/note-import-dialog.html @@ -56,6 +56,14 @@ + + diff --git a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js index dea3dd32b22..a1078d1d0f2 100644 --- a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js +++ b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js @@ -14,11 +14,12 @@ 'use strict'; -angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $timeout, websocketMsgSrv) { +angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $timeout, ngToast, websocketMsgSrv) { var vm = this; $scope.note = {}; $scope.note.step1 = true; $scope.note.step2 = false; + $scope.note.urlType = 'normal'; vm.resetFlags = function() { $scope.note = {}; @@ -65,12 +66,16 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ vm.importNote = function() { $scope.note.errorText = ''; if ($scope.note.importUrl) { - jQuery.getJSON($scope.note.importUrl, function(result) { - vm.processImportJson(result); - }).fail(function() { - $scope.note.errorText = 'Unable to Fetch URL'; - $scope.$apply(); - }); + if ($scope.note.urlType === 'normal') { + jQuery.getJSON($scope.note.importUrl, function(result) { + vm.processImportJson(result); + }).fail(function() { + $scope.note.errorText = 'Unable to Fetch URL'; + $scope.$apply(); + }); + } else { + websocketMsgSrv.importNoteFromIpfs($scope.note.importUrl); + } } else { $scope.note.errorText = 'Enter URL'; $scope.$apply(); @@ -110,4 +115,27 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ vm.resetFlags(); angular.element('#noteImportModal').modal('hide'); }); + + $scope.$on('importNoteFail', function(event, data) { + vm.resetFlags(); + angular.element('#noteImportModal').modal('hide'); + var status = data.importStatus; + console.log('Data is %o', data); + if (status === 'success') { + ngToast.success({ + content: 'Successfully imported Note', + verticalPosition: 'bottom', + horizontalPosition: 'center', + timeout: '5000' + }); + } else { + ngToast.danger({ + content: 'Failed to import Note with hash', + verticalPosition: 'bottom', + horizontalPosition: 'center', + timeout: '5000' + }); + } + }); + }); diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index e4f45db0218..05883ce2608 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -60,6 +60,8 @@ angular.module('zeppelinWebApp').factory('websocketEvents', $location.path('/notebook/' + data.note.id); } else if (op === 'NOTES_INFO') { $rootScope.$broadcast('setNoteMenu', data.notes); + } else if (op === 'IMPORT_NOTE_STATUS') { + $rootScope.$broadcast('importNoteFail', data); } else if (op === 'LIST_NOTEBOOK_JOBS') { $rootScope.$broadcast('setNotebookJobs', data.notebookJobs); } else if (op === 'LIST_UPDATE_NOTEBOOK_JOBS') { diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 473299ec4d9..3986b4edc94 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -151,6 +151,15 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, + importNoteFromIpfs: function(hashUrl) { + websocketEvents.sendNewEvent({ + op: 'IMPORT_NOTE_URL', + data: { + hash: hashUrl + } + }); + }, + checkpointNotebook: function(noteId, commitMessage) { websocketEvents.sendNewEvent({ op: 'CHECKPOINT_NOTEBOOK', diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index b7e1938ff60..243097c76cd 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -249,6 +249,11 @@ test + + org.ipfs + api + 0.0.1-SNAPSHOT + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 025accd4284..423a9a3fa55 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -182,6 +182,34 @@ public String exportNote(String noteId) throws IOException, IllegalArgumentExcep return gson.toJson(note); } + /** + * + * @param url - the ipfs url or hash to get note from + * @param subject + * @return note + * @throws IOException + */ + public Note importNote(String url, AuthenticationInfo subject) throws IOException { + Note newNote = null; + Note oldNote = notebookRepo.getNoteFromUrl(url, subject); + if (oldNote != null) { + try { + newNote = createNote(subject); + newNote.setName(oldNote.getName()); + List paragraphs = oldNote.getParagraphs(); + for (Paragraph p : paragraphs) { + newNote.addCloneParagraph(p); + } + newNote.persist(subject); + } catch (IOException e) { + logger.error(e.toString(), e); + throw e; + } + } + + return newNote; + } + /** * import JSON as a new note. * diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java index ca72fc1f7f0..5dea3ccd0e6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/AzureNotebookRepo.java @@ -227,4 +227,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) // Auto-generated method stub return null; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException{ + // Auto-generated method stub + return null; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java new file mode 100644 index 00000000000..97b1e480220 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.vfs2.FileContent; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.NameScope; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.ApplicationState; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NotebookImportDeserializer; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.ipfs.api.Base58; +import org.ipfs.api.IPFS; +import org.ipfs.api.MerkleNode; +import org.ipfs.api.Multihash; +import org.ipfs.api.NamedStreamable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +/** + * Share and get versioned notebooks with IPFS. Make sure IPFS daemon is running. + */ +public class IPFSNotebookRepo extends VFSNotebookRepo implements NotebookRepo { + private static final String API_SERVER_PROPERTY_NAME = "zeppelin.notebook.ipfs.apiServer"; + private static final String DEFAULT_API_SERVER = "/ip4/127.0.0.1/tcp/5001"; + private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class); + private ExecutorService executorService = Executors.newCachedThreadPool(); + + private Gson gson; + private IPFS ipfs; + private FileObject ipfsNoteHashesJson; + private String encoding; + /* + * map of > i.e Revision History + * { 2A94M5J1Z : [ QmZQybbanVowHLynssnMjzPJcZ676yB1dA2CpzLN4ZTY48, QmVc.... ], + * 2BKZQ2FP6 : [ QmNhPUwuUQ1uD1n22h2CEBFLKPCExCiVc7rcgHmMftmzsv ] + * } + */ + private Map> noteHashes; + + public IPFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { + super(conf); + encoding = conf.getString(ConfVars.ZEPPELIN_ENCODING); + gson = new GsonBuilder().setPrettyPrinting().registerTypeAdapter(Date.class, new + NotebookImportDeserializer()).create(); + String ipfsApiServer = conf.getString("IPFS_API_SERVER", + API_SERVER_PROPERTY_NAME, DEFAULT_API_SERVER); + ipfs = new IPFS(ipfsApiServer); + // creates a ipfsnotehashes.json file in notebook dir if not exists + init(); + // initialize noteHashes Map to load noteID and multihash from file + noteHashes = loadFromFile(); + } + + /* + * creates a ipfsnotehashes.json file in notebook directory. + * This file will represent the noteHashes Map in Json format. + */ + private void init() throws IOException { + FileObject file = getRootDir(); + ipfsNoteHashesJson = file.resolveFile("ipfsnotehashes.json", NameScope.CHILD); + if (!ipfsNoteHashesJson.exists()) { + ipfsNoteHashesJson.createFile(); + } + } + + //Reads the ipfsNoteHashesJson file and converts the Json String to Map + private Map> loadFromFile() throws IOException { + FileContent content = ipfsNoteHashesJson.getContent(); + InputStream ins = content.getInputStream(); + String json = IOUtils.toString(ins, encoding); + ins.close(); + if (json.isEmpty() || json == null) + return new HashMap<>(); + Type type = new TypeToken>>() { + }.getType(); + Map> map = gson.fromJson(json, type); + return map; + } + + /** + * Get's the note from peers. + * + * @param url + * @param subject + * @return Note + * @throws IOException + */ + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + //getNote is blocking hence using a timeout + final Multihash multihash = new Multihash(Base58.decode(url)); + Callable task = new Callable() { + @Override + public Note call() throws Exception { + return getNote(multihash); + } + }; + Future noteFuture = executorService.submit(task); + Note note = null; + try { + note = noteFuture.get(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Failed to download Note, Interrupted", e); + } catch (ExecutionException e) { + LOG.error("Failed to get note", e); + Throwable cause = e.getCause(); + if (cause instanceof IOException) + throw (IOException) cause; + } catch (TimeoutException e) { + LOG.error("TimeOut reached", e); + } finally { + noteFuture.cancel(true); + } + return note; + } + + /* + * The call to ipfs.cat() can be blocking. If the file corresponding to hash + * is not present in local ipfs storage it will try to get from peers which + * can be time consuming. + */ + private Note getNote(Multihash noteMultihash) throws IOException { + String noteJson = new String(ipfs.cat(noteMultihash)); + Note note = gson.fromJson(noteJson, Note.class); + + for (Paragraph p : note.getParagraphs()) { + if (p.getStatus() == Job.Status.PENDING || p.getStatus() == Job.Status.RUNNING) { + p.setStatus(Job.Status.ABORT); + } + + List appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + if (app.getStatus() != ApplicationState.Status.ERROR) { + app.setStatus(ApplicationState.Status.UNLOADED); + } + } + } + } + return note; + } + + /* This method writes the revision history - noteHashes to ipfsNoteHashesJson file*/ + private synchronized void saveToFile() throws IOException { + String jsonString = gson.toJson(noteHashes); + OutputStream out = ipfsNoteHashesJson.getContent().getOutputStream(false); + out.write(jsonString.getBytes(encoding)); + out.close(); + } + + /* + * remove method deletes the notebook from directory and also removes all its + * revisions from ipfs local storage. + * It also removes the noteID entry from noteHashesJSon file. + */ + @Override + public synchronized void remove(String noteId, AuthenticationInfo subject) throws IOException { + super.remove(noteId, subject); + List revisions = noteHashes.get(noteId); + if (revisions != null) { + for (Revision rev : revisions) { + Multihash revisionHash = new Multihash(Base58.decode(rev.id)); + Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); + if (allPinnedObjects.containsKey(revisionHash)) { + ipfs.pin.rm(revisionHash, true); + } + } + noteHashes.remove(noteId); + } + saveToFile(); + } + + /* + * This method removes only a corresponding revision for a note + */ + public void removeRevision(String noteID, Revision revision) throws IOException { + List noteRevisions = noteHashes.get(noteID); + if (noteRevisions == null) { + LOG.error("This note " + noteID + "does not have any revisions"); + throw new IOException("This note " + noteID + "does not have any revisions"); + } + if (!noteRevisions.contains(revision)) { + LOG.error("invalid revision " + revision + " for note " + noteID); + throw new IOException("invalid revision " + revision + " for note " + noteID); + } + Multihash revisionHash = new Multihash(Base58.decode(revision.id)); + Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); + if (allPinnedObjects.containsKey(revisionHash)) { + ipfs.pin.rm(revisionHash, true); + } + noteRevisions.remove(revision); + saveToFile(); + } + + /* + * Notes are stored in local directory and are added to ipfs only when + * committed. If there is no change in note same hash will be generated + * and it will not be added to the file. + */ + @Override + public Revision checkpoint(String noteId, String commitMessage, AuthenticationInfo subject) + throws IOException { + Note note = get(noteId, subject); + String json = gson.toJson(note); + NamedStreamable.ByteArrayWrapper noteIpfs = + new NamedStreamable.ByteArrayWrapper("note.json", json + .getBytes(encoding)); + MerkleNode addResult = ipfs.add(noteIpfs); + /* + * The `time` param in Rev is in int(unix timestamp) seconds + * System.currentTimeMillis() returns long milliseconds + */ + int time = (int) (System.currentTimeMillis() / 1000L); + Revision revision = new Revision(addResult.hash.toBase58(), commitMessage, time); + List noteVersions = noteHashes.get(noteId); + if (noteVersions == null) { + noteVersions = new ArrayList<>(); + noteVersions.add(revision); + noteHashes.put(noteId, noteVersions); + } else { + if (!noteVersions.contains(revision)) { + noteVersions.add(revision); + } + /* + * revision already exists before. Should i change time ? + * */ + } + saveToFile(); + LOG.info("Checkpoint for Note " + noteId + " IpfsRevision is " + revision.toString()); + return revision; + } + + + public Map> getNoteHashes() { + return new HashMap<>(noteHashes); + } + + // get a particular revision from ipfs + @Override + public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException { + Note note = null; + String multihashBase58 = revId; + Multihash multihash = new Multihash(Base58.decode(multihashBase58)); + + Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); + if (allPinnedObjects.containsKey(multihash)) { + note = getNote(multihash); + } + if (note == null) { + LOG.info("revision " + multihash + " not present for note " + + noteId + " in local ipfs storage"); + } + return note; + } + + @Override + public List revisionHistory(String noteId, AuthenticationInfo subject) { + List versionHistory = noteHashes.get(noteId); + return versionHistory; + } + + @Override + public void close() { + super.close(); + executorService.shutdown(); + } +} + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java index aff684fc158..49282a36944 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -100,6 +100,15 @@ public interface NotebookRepo { */ @ZeppelinApi public List revisionHistory(String noteId, AuthenticationInfo subject); + /** + * Download a note from a given URL or Hash + * @param url + * @param subject + * @return note in JSON String + */ + @ZeppelinApi public Note getNoteFromUrl(String url, AuthenticationInfo subject) + throws IOException; + /** * Represents the 'Revision' a point in life of the notebook */ @@ -112,6 +121,22 @@ public Revision(String revId, String message, int time) { public String id; public String message; public int time; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Revision)) return false; + + Revision revision = (Revision) o; + + return id.equals(revision.id); + + } + + @Override + public int hashCode() { + return id.hashCode(); + } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index 72087262aca..c0ecd9f0457 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -410,4 +410,15 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) } return revisions; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) { + Note newNote = null; + try { + newNote = getRepo(0).getNoteFromUrl(url, subject); + } catch (IOException e) { + LOG.error("Failed to download note with given url", e); + } + return newNote; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java index 8f4dd31034f..0d659620a31 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/S3NotebookRepo.java @@ -270,4 +270,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) // Auto-generated method stub return null; } + + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + // Auto-generated method stub + return null; + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java index 4a93dec1dd9..ed04900c41a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java @@ -285,4 +285,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) return null; } + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + // Auto-generated method stub + return null; + } + } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java index 960bcde5aeb..a37be626d94 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java @@ -234,4 +234,10 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) return history; } + @Override + public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException{ + // Auto-generated method stub + return null; + } + } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index e91dfbb99e7..ea7602743a7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -51,6 +51,10 @@ public static enum OP { // @param name name fpor the cloned note IMPORT_NOTE, // [c-s] import notebook // @param object notebook + IMPORT_NOTE_URL, // [c-s] import notebook + // @param hash ipfs hash + IMPORT_NOTE_STATUS, //[s-c] notebook import through url failed + // @param status success or failure NOTE_UPDATE, RUN_PARAGRAPH, // [c-s] run paragraph diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java new file mode 100644 index 00000000000..94f5ef72b2e --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + + +import com.google.common.base.Joiner; + +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.mock.MockInterpreter1; +import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import static com.google.common.truth.Truth.assertThat; + +public class IpfsNotebookRepoTest { + private static final String TEST_NOTE_ID = "2A94M5J1Z"; + + private File zeppelinDir; + private File ipfsNoteHashes; + private String notebooksDir; + private ZeppelinConfiguration conf; + private IPFSNotebookRepo notebookRepo; + + @Before + public void setUp() throws Exception { + String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinTest_" + System.currentTimeMillis(); + zeppelinDir = new File(zpath); + zeppelinDir.mkdirs(); + new File(zeppelinDir, "conf").mkdirs(); + + notebooksDir = Joiner.on(File.separator).join(zpath, "notebook"); + File notebookDir = new File(notebooksDir); + notebookDir.mkdirs(); + + String testNoteDir = Joiner.on(File.separator).join(notebooksDir, TEST_NOTE_ID); + FileUtils.copyDirectory(new File(Joiner.on(File.separator).join("src", "test", "resources", TEST_NOTE_ID)), + new File(testNoteDir) + ); + FileUtils.copyFileToDirectory(new File(Joiner.on(File.separator).join("src", "test", "resources", "ipfsnotehashes.json")), + notebookDir + ); + ipfsNoteHashes = new File(Joiner.on(File.separator).join(notebooksDir, "ipfsnotehashes.json")); + + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.IpfsNotebookRepo"); + + MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); + MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + + conf = ZeppelinConfiguration.create(); + } + + @After + public void tearDown() throws Exception { + NotebookRepoSyncTest.delete(zeppelinDir); + } + + @Test + public void initNonemptyRevisionNotebookDir() throws IOException { + //given - ipfsnotehashes.json exits + + //when + notebookRepo = new IPFSNotebookRepo(conf); + + //then + assertThat(ipfsNoteHashes.exists()).isEqualTo(true); + assertThat(notebookRepo.getNoteHashes()).isNotEmpty(); + + assertThat(notebookRepo.list(null)).isNotEmpty(); + + } + + @Test + public void showNotebookHistory() throws IOException { + //given + + notebookRepo = new IPFSNotebookRepo(conf); + + List revisions = notebookRepo.revisionHistory(TEST_NOTE_ID,null); + + //then + assertThat(revisions).isNotEmpty(); + } + +} diff --git a/zeppelin-zengine/src/test/resources/ipfsnotehashes.json b/zeppelin-zengine/src/test/resources/ipfsnotehashes.json new file mode 100644 index 00000000000..522ab1e2bf6 --- /dev/null +++ b/zeppelin-zengine/src/test/resources/ipfsnotehashes.json @@ -0,0 +1,16 @@ +{ + "r": [ + { + "commitMessage": "r notebook no change commit", + "name": "QmcjoEoysvgW3nFSkLfDkLUuZZts4Yop9HvYkTPkrB8hWk", + "time": 1466145258 + } + ], + "2A94M5J1Z": [ + { + "commitMessage": "first para", + "name": "QmNN24LURsvaPqYZs4Y689r9wQWcs3CuFZJkfu5koYVnKS", + "time": 1466168505 + } + ] +} \ No newline at end of file From 1f13f9683d6dd0c34f023d79fe22db17ec9f7f51 Mon Sep 17 00:00:00 2001 From: onkarshedge Date: Mon, 22 Aug 2016 21:18:54 +0530 Subject: [PATCH 2/3] refactored code to remove any dependencies and use httpclient --- conf/zeppelin-site.xml.template | 6 +- docs/storage/storage.md | 4 +- zeppelin-distribution/src/bin_license/LICENSE | 2 +- zeppelin-server/pom.xml | 4 + zeppelin-zengine/pom.xml | 6 +- .../repo/{ => ipfs}/IPFSNotebookRepo.java | 148 +++++++----- .../zeppelin/notebook/repo/ipfs/Ipfs.java | 220 ++++++++++++++++++ .../notebook/repo/IpfsNotebookRepoTest.java | 109 --------- 8 files changed, 324 insertions(+), 175 deletions(-) rename zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/{ => ipfs}/IPFSNotebookRepo.java (70%) create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java delete mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index acca51027eb..27125112433 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -162,14 +162,14 @@ diff --git a/docs/storage/storage.md b/docs/storage/storage.md index 349774a75d6..cb45b81fdfd 100644 --- a/docs/storage/storage.md +++ b/docs/storage/storage.md @@ -264,13 +264,13 @@ Make sure you enter the correct ipfs apiServer. ``` zeppelin.notebook.storage - org.apache.zeppelin.notebook.repo.IPFSNotebookRepo + org.apache.zeppelin.notebook.repo.ipfs.IPFSNotebookRepo notebook persistence layer implementation zeppelin.notebook.ipfs.apiServer - /ip4/127.0.0.1/tcp/5001 + http://localhost:5001/api/v0/ ipfs api Server Multiaddress ``` diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index 5696faecd7c..abf29ab3ee9 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -17,6 +17,7 @@ The following components are provided under Apache License. (Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/) (Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient) (Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient) + (Apache 2.0) Http Components (org.apache.httpcomponents:httpmime:4.3.6 - https://github.com/apache/httpclient) (Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/) (Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/) (Apache 2.0) Apache Commons Math 3 (org.apache.commons:commons-math3:3.4.1 - http://commons.apache.org/proper/commons-math/) @@ -151,7 +152,6 @@ The text of each license is also included at licenses/LICENSE-[project]-[version (The MIT License) bcprov-jdk15on v1.51 (org.bouncycastle:bcprov-jdk15on:jar:1.51 - http://www.bouncycastle.org/java.html) - http://www.bouncycastle.org/licence.html (The MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs) - https://github.com/bryanbraun/anchorjs/blob/master/README.md#license (The MIT License) moment-duration-format v1.3.0 (https://github.com/jsmreese/moment-duration-format) - https://github.com/jsmreese/moment-duration-format/blob/master/LICENSE - (The MIT License) java-ipfs-api v0.0.1-SNAPSHOT (https://github.com/ipfs/java-ipfs-api) - https://github.com/ipfs/java-ipfs-api/blob/master/LICENSE The following components are provided under the MIT License. diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index eac96f00d8f..59ee0904e8c 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -319,6 +319,10 @@ org.eclipse.jetty.websocket websocket-client + + org.apache.httpcomponents + httpmime + diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 243097c76cd..e8f5d1a3e46 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -250,9 +250,9 @@ - org.ipfs - api - 0.0.1-SNAPSHOT + org.apache.httpcomponents + httpmime + 4.3.6 diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java similarity index 70% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java index 97b1e480220..5d797cfd346 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/IPFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.notebook.repo; +package org.apache.zeppelin.notebook.repo.ipfs; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -31,13 +31,10 @@ import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NotebookImportDeserializer; import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.user.AuthenticationInfo; -import org.ipfs.api.Base58; -import org.ipfs.api.IPFS; -import org.ipfs.api.MerkleNode; -import org.ipfs.api.Multihash; -import org.ipfs.api.NamedStreamable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,12 +61,15 @@ */ public class IPFSNotebookRepo extends VFSNotebookRepo implements NotebookRepo { private static final String API_SERVER_PROPERTY_NAME = "zeppelin.notebook.ipfs.apiServer"; - private static final String DEFAULT_API_SERVER = "/ip4/127.0.0.1/tcp/5001"; - private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class); + private static final String DEFAULT_API_SERVER = "http://localhost:5001/api/v0/"; + private static final Logger LOG = LoggerFactory.getLogger(IPFSNotebookRepo.class); + Type simpleType = new TypeToken>() { + }.getType(); private ExecutorService executorService = Executors.newCachedThreadPool(); - private Gson gson; - private IPFS ipfs; + private Type pinType = new TypeToken>() { + }.getType(); + private Ipfs ipfs; private FileObject ipfsNoteHashesJson; private String encoding; /* @@ -87,16 +87,26 @@ public IPFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { NotebookImportDeserializer()).create(); String ipfsApiServer = conf.getString("IPFS_API_SERVER", API_SERVER_PROPERTY_NAME, DEFAULT_API_SERVER); - ipfs = new IPFS(ipfsApiServer); + ipfs = new Ipfs(ipfsApiServer); + String versionJson = ipfs.version(); + if (versionJson == null) { + throw new IOException("Make sure the ipfs daemon is running on given url" + ipfsApiServer); + } + Map data = gson.fromJson(versionJson, simpleType); + String version = data.get("Version"); + if (!version.equals("0.4.2")) { + throw new IOException("Current api is not supported for " + version + " only for 0.4.2"); + } + // creates a ipfsnotehashes.json file in notebook dir if not exists init(); // initialize noteHashes Map to load noteID and multihash from file noteHashes = loadFromFile(); } - /* - * creates a ipfsnotehashes.json file in notebook directory. - * This file will represent the noteHashes Map in Json format. + /** + * creates a ipfsnotehashes.json file in notebook directory. This file will represent the + * noteHashes Map in Json format. */ private void init() throws IOException { FileObject file = getRootDir(); @@ -106,7 +116,9 @@ private void init() throws IOException { } } - //Reads the ipfsNoteHashesJson file and converts the Json String to Map + /** + * Reads the ipfsNoteHashesJson file and converts the Json String to Map + */ private Map> loadFromFile() throws IOException { FileContent content = ipfsNoteHashesJson.getContent(); InputStream ins = content.getInputStream(); @@ -123,19 +135,15 @@ private Map> loadFromFile() throws IOException { /** * Get's the note from peers. * - * @param url - * @param subject * @return Note - * @throws IOException */ @Override - public Note getNoteFromUrl(String url, AuthenticationInfo subject) throws IOException { + public Note getNoteFromUrl(final String hash, AuthenticationInfo subject) throws IOException { //getNote is blocking hence using a timeout - final Multihash multihash = new Multihash(Base58.decode(url)); Callable task = new Callable() { @Override public Note call() throws Exception { - return getNote(multihash); + return getNote(hash); } }; Future noteFuture = executorService.submit(task); @@ -157,13 +165,17 @@ public Note call() throws Exception { return note; } - /* - * The call to ipfs.cat() can be blocking. If the file corresponding to hash - * is not present in local ipfs storage it will try to get from peers which - * can be time consuming. - */ - private Note getNote(Multihash noteMultihash) throws IOException { - String noteJson = new String(ipfs.cat(noteMultihash)); + /** + * The call to ipfs.cat() can be blocking. If the file corresponding to hash is not present in + * local ipfs storage it will try to get from peers which can be time consuming resulting in + * SocketTimeout Exception + */ + private Note getNote(String noteMultihash) throws IOException { + String noteJson = ipfs.cat(noteMultihash); + if (noteJson == null) { + throw new IOException("Failed to retrieve Note with hash " + noteMultihash); + } + Note note = gson.fromJson(noteJson, Note.class); for (Paragraph p : note.getParagraphs()) { @@ -191,21 +203,30 @@ private synchronized void saveToFile() throws IOException { out.close(); } - /* - * remove method deletes the notebook from directory and also removes all its - * revisions from ipfs local storage. - * It also removes the noteID entry from noteHashesJSon file. + /** + * remove method deletes the notebook from directory and also removes all its revisions from ipfs + * local storage. It also removes the noteID entry from noteHashesJSon file. */ @Override public synchronized void remove(String noteId, AuthenticationInfo subject) throws IOException { super.remove(noteId, subject); List revisions = noteHashes.get(noteId); if (revisions != null) { + String pinJsonResponse = ipfs.pinLs(); + if (pinJsonResponse == null) { + throw new IOException("Failed to retrieve pinned hashes"); + } + + Map data = gson.fromJson(pinJsonResponse, pinType); + Map pinnedObjects = (Map) data.get("Keys"); + for (Revision rev : revisions) { - Multihash revisionHash = new Multihash(Base58.decode(rev.id)); - Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); - if (allPinnedObjects.containsKey(revisionHash)) { - ipfs.pin.rm(revisionHash, true); + String revisionHash = rev.id; + if (pinnedObjects.containsKey(revisionHash)) { + boolean success = ipfs.pinRm(revisionHash); + if (!success) { + LOG.warn("Failed to remove " + revisionHash); + } } } noteHashes.remove(noteId); @@ -213,7 +234,7 @@ public synchronized void remove(String noteId, AuthenticationInfo subject) throw saveToFile(); } - /* + /** * This method removes only a corresponding revision for a note */ public void removeRevision(String noteID, Revision revision) throws IOException { @@ -226,12 +247,12 @@ public void removeRevision(String noteID, Revision revision) throws IOException LOG.error("invalid revision " + revision + " for note " + noteID); throw new IOException("invalid revision " + revision + " for note " + noteID); } - Multihash revisionHash = new Multihash(Base58.decode(revision.id)); - Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); - if (allPinnedObjects.containsKey(revisionHash)) { - ipfs.pin.rm(revisionHash, true); + boolean success = ipfs.pinRm(revision.id); + if (success) { + noteRevisions.remove(revision); + } else { + LOG.warn("Failed to remove revision " + revision); } - noteRevisions.remove(revision); saveToFile(); } @@ -245,16 +266,21 @@ public Revision checkpoint(String noteId, String commitMessage, AuthenticationIn throws IOException { Note note = get(noteId, subject); String json = gson.toJson(note); - NamedStreamable.ByteArrayWrapper noteIpfs = - new NamedStreamable.ByteArrayWrapper("note.json", json - .getBytes(encoding)); - MerkleNode addResult = ipfs.add(noteIpfs); + String addResult = ipfs.add(json); + if (addResult == null) { + throw new IOException("Failed to checkpoint note with ID " + noteId); + } + Type type = new TypeToken>() { + }.getType(); + Map data = gson.fromJson(addResult, type); + String hash = data.get("Hash"); + /* * The `time` param in Rev is in int(unix timestamp) seconds * System.currentTimeMillis() returns long milliseconds */ int time = (int) (System.currentTimeMillis() / 1000L); - Revision revision = new Revision(addResult.hash.toBase58(), commitMessage, time); + Revision revision = new Revision(hash, commitMessage, time); List noteVersions = noteHashes.get(noteId); if (noteVersions == null) { noteVersions = new ArrayList<>(); @@ -265,11 +291,12 @@ public Revision checkpoint(String noteId, String commitMessage, AuthenticationIn noteVersions.add(revision); } /* - * revision already exists before. Should i change time ? - * */ + * revision already exists before. Should i change time ? + * + * */ } saveToFile(); - LOG.info("Checkpoint for Note " + noteId + " IpfsRevision is " + revision.toString()); + LOG.info("Checkpoint for Note " + noteId + " IpfsRevision is " + revision.id); return revision; } @@ -278,19 +305,21 @@ public Map> getNoteHashes() { return new HashMap<>(noteHashes); } - // get a particular revision from ipfs + /** + * get a particular revision from ipfs + */ @Override public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException { Note note = null; - String multihashBase58 = revId; - Multihash multihash = new Multihash(Base58.decode(multihashBase58)); + String pinJsonResponse = ipfs.pinLs(); + Map data = gson.fromJson(pinJsonResponse, pinType); + Map allPinnedObjects = (Map) data.get("Keys"); - Map allPinnedObjects = ipfs.pin.ls(IPFS.PinType.recursive); - if (allPinnedObjects.containsKey(multihash)) { - note = getNote(multihash); + if (allPinnedObjects.containsKey(revId)) { + note = getNote(revId); } if (note == null) { - LOG.info("revision " + multihash + " not present for note " + + LOG.warn("revision " + revId + " not present for note " + noteId + " in local ipfs storage"); } return note; @@ -305,6 +334,11 @@ public List revisionHistory(String noteId, AuthenticationInfo subject) @Override public void close() { super.close(); + try { + ipfs.getClient().close(); + } catch (IOException e) { + LOG.info("Couldn't successfully close the ipfsClient"); + } executorService.shutdown(); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java new file mode 100644 index 00000000000..d107986e0bc --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo.ipfs; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; + + +/** + * Ipfs HttpClient + */ +public class Ipfs { + private static final Logger LOG = LoggerFactory.getLogger(Ipfs.class); + private CloseableHttpClient client; + private String baseUrl; + + public Ipfs(String url) { + baseUrl = url; + client = HttpClients.createDefault(); + } + + public String pinLs() { + String pinnedJsonString = null; + HttpGet request = new HttpGet(baseUrl + "pin/ls?type=recursive&quiet=true"); + try { + CloseableHttpResponse response = client.execute(request); + try { + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + pinnedJsonString = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get pinned objects with unexpected statuscode " + statusCode); + } + + } finally { + response.close(); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } + return pinnedJsonString; + } + + public boolean pinRm(String hash) { + boolean removed = false; + HttpGet request = new HttpGet(baseUrl + "pin/rm?recursive=true&arg=" + hash); + try { + CloseableHttpResponse response = client.execute(request); + try { + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + String responseMessage; + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + + Type type = new TypeToken>() { + }.getType(); + Map responseMap = new Gson().fromJson(responseMessage, type); + String errormessage = responseMap.get("Message"); + if (errormessage.equals("note pinned")) { + removed = true; + } + + LOG.error("response Message = " + responseMessage); + /* + * HTTP/1.1 500 Internal Server Error + * Content-Type: application/json + * responseMessage = {"Message":"invalid ipfs ref path","Code":0} + * or not pinned + * HTTP/1.1 500 Internal Server Error + * responseMessage = {"Message":"not pinned","Code":0} + * + **/ + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + removed = true; + LOG.info("response Message = " + responseMessage); + } else { + LOG.error("Failed to remove pinned object with unexpected statuscode " + statusCode); + } + + } finally { + response.close(); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } + return removed; + } + + public String cat(String hash) { + HttpGet request = new HttpGet(baseUrl + "cat/" + hash); + String ipfsContent = null; + RequestConfig config = RequestConfig.custom() + .setSocketTimeout(555000) + .build(); + request.setConfig(config); + try { + CloseableHttpResponse response = client.execute(request); + try { + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + LOG.error("response Message = " + responseMessage); + /* + * HTTP/1.1 500 Internal Server Error + * Content-Type: application/json + * responseMessage = {"Message":"invalid ipfs ref path","Code":0} + * + **/ + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + return responseMessage; + } else { + LOG.error("Failed to get ipfs object with unexpected statuscode " + statusCode); + } + } finally { + response.close(); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } + return ipfsContent; + } + + public String add(String content) { + String filename = "note.json"; + String responseMessage = null; + if (content == null) { + return null; + } + HttpPost request = new HttpPost(baseUrl + "add?stream-channels=true&progress=false"); + HttpEntity entity = MultipartEntityBuilder.create() + .addBinaryBody("file", content.getBytes(), ContentType + .APPLICATION_OCTET_STREAM, filename) + .build(); + request.setEntity(entity); + try { + CloseableHttpResponse response = client.execute(request); + try { + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to add ipfs object with unexpected statuscode " + statusCode); + } + } finally { + response.close(); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } + return responseMessage; + } + + public String version() { + String version = null; + HttpGet request = new HttpGet(baseUrl + "/version"); + try { + CloseableHttpResponse response = client.execute(request); + try { + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + version = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get ipfs version with unexpected statuscode " + statusCode); + } + + } finally { + response.close(); + } + } catch (IOException e) { + LOG.error("Http request failed to send to " + baseUrl, e); + } + return version; + } + + public CloseableHttpClient getClient() { + return client; + } +} + + diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java deleted file mode 100644 index 94f5ef72b2e..00000000000 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/IpfsNotebookRepoTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.notebook.repo; - - -import com.google.common.base.Joiner; - -import org.apache.commons.io.FileUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.mock.MockInterpreter1; -import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -import static com.google.common.truth.Truth.assertThat; - -public class IpfsNotebookRepoTest { - private static final String TEST_NOTE_ID = "2A94M5J1Z"; - - private File zeppelinDir; - private File ipfsNoteHashes; - private String notebooksDir; - private ZeppelinConfiguration conf; - private IPFSNotebookRepo notebookRepo; - - @Before - public void setUp() throws Exception { - String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinTest_" + System.currentTimeMillis(); - zeppelinDir = new File(zpath); - zeppelinDir.mkdirs(); - new File(zeppelinDir, "conf").mkdirs(); - - notebooksDir = Joiner.on(File.separator).join(zpath, "notebook"); - File notebookDir = new File(notebooksDir); - notebookDir.mkdirs(); - - String testNoteDir = Joiner.on(File.separator).join(notebooksDir, TEST_NOTE_ID); - FileUtils.copyDirectory(new File(Joiner.on(File.separator).join("src", "test", "resources", TEST_NOTE_ID)), - new File(testNoteDir) - ); - FileUtils.copyFileToDirectory(new File(Joiner.on(File.separator).join("src", "test", "resources", "ipfsnotehashes.json")), - notebookDir - ); - ipfsNoteHashes = new File(Joiner.on(File.separator).join(notebooksDir, "ipfsnotehashes.json")); - - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinDir.getAbsolutePath()); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.IpfsNotebookRepo"); - - MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); - MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); - - conf = ZeppelinConfiguration.create(); - } - - @After - public void tearDown() throws Exception { - NotebookRepoSyncTest.delete(zeppelinDir); - } - - @Test - public void initNonemptyRevisionNotebookDir() throws IOException { - //given - ipfsnotehashes.json exits - - //when - notebookRepo = new IPFSNotebookRepo(conf); - - //then - assertThat(ipfsNoteHashes.exists()).isEqualTo(true); - assertThat(notebookRepo.getNoteHashes()).isNotEmpty(); - - assertThat(notebookRepo.list(null)).isNotEmpty(); - - } - - @Test - public void showNotebookHistory() throws IOException { - //given - - notebookRepo = new IPFSNotebookRepo(conf); - - List revisions = notebookRepo.revisionHistory(TEST_NOTE_ID,null); - - //then - assertThat(revisions).isNotEmpty(); - } - -} From 1918565e89b1d9f0e906c15a2e5f9136234d7dce Mon Sep 17 00:00:00 2001 From: Onkar Shedge Date: Tue, 30 Aug 2016 17:24:18 +0530 Subject: [PATCH 3/3] Refactored according to codestyles, changed message format,changed radio to checkbox, notename is used --- conf/zeppelin-site.xml.template | 2 +- docs/storage/storage.md | 13 -- .../zeppelin/socket/NotebookServer.java | 30 ++-- .../noteName-import/note-import-dialog.html | 9 +- .../notenameImport.controller.js | 34 ++-- .../websocketEvents.factory.js | 2 +- .../websocketEvents/websocketMsg.service.js | 8 +- .../apache/zeppelin/notebook/Notebook.java | 11 +- .../zeppelin/notebook/repo/NotebookRepo.java | 11 +- .../notebook/repo/ipfs/IPFSNotebookRepo.java | 19 +- .../zeppelin/notebook/repo/ipfs/Ipfs.java | 165 ++++++++++-------- .../zeppelin/notebook/socket/Message.java | 5 +- .../src/test/resources/ipfsnotehashes.json | 16 -- 13 files changed, 159 insertions(+), 166 deletions(-) delete mode 100644 zeppelin-zengine/src/test/resources/ipfsnotehashes.json diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 27125112433..868f8e9c531 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -170,7 +170,7 @@ zeppelin.notebook.ipfs.apiServer http://localhost:5001/api/v0/ ipfs api Server Address -
+ --> diff --git a/docs/storage/storage.md b/docs/storage/storage.md index cb45b81fdfd..22bff047126 100644 --- a/docs/storage/storage.md +++ b/docs/storage/storage.md @@ -245,19 +245,6 @@ You can get more information on generating `token` and using authentication on t Using IpfsNotebookRepo you can use Ipfs to save Note revisions and retrieve a particular revision. -**Installing ipfs** -* Download and install [ipfs](https://ipfs.io/docs/install/) for your OS. -* Enter the following commands - * `ipfs init` - * `ipfs daemon` - -`ipfs daemon` will start the ipfs. In the output you will see the following. ->API server listening on /ip4/127.0.0.1/tcp/5001
->Gateway (readonly) server listening on /ip4/127.0.0.1/tcp/8080 - -You can edit the following ports with `ipfs config edit`. -As zeppelin also uses 8080 by default you might want to chage the gateway server or run zeppelin on another port. - Comment out the following in **zeppelin-site.xml** Make sure you enter the correct ipfs apiServer. diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 717f0a08603..2f0f1f1a715 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -735,19 +735,25 @@ protected Note importNoteFromUrl(NotebookSocket conn, HashSet userAndRol Notebook notebook, Message fromMessage) throws IOException { Note note = null; if (fromMessage != null) { - String urlHash = (String) fromMessage.get("hash"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note = notebook.importNote(urlHash, subject); - Message message = new Message(OP.IMPORT_NOTE_STATUS); - if (note != null) { - note.persist(subject); - message.put("importStatus", "success"); - } - else { - message.put("importStatus", "failure"); + String type = (String) fromMessage.get("type"); + if (type.equals("ipfs")) { + String name = (String) fromMessage.get("name"); + Map options = ((Map) fromMessage.get("options")); + String urlHash = (String) options.get("hash"); + AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); + note = notebook.importNoteFromBackend(urlHash, name, subject); + Message message = new Message(OP.IMPORT_NOTE_STATUS); + message.put("type", "ipfs"); + message.put("options", options); + if (note != null) { + note.persist(subject); + message.put("importStatus", "success"); + } else { + message.put("importStatus", "failure"); + } + conn.send(serializeMessage(message)); + broadcastNoteList(subject); } - conn.send(serializeMessage(message)); - broadcastNoteList(subject); } return note; } diff --git a/zeppelin-web/src/components/noteName-import/note-import-dialog.html b/zeppelin-web/src/components/noteName-import/note-import-dialog.html index c6bfb7da79f..5cf19436660 100644 --- a/zeppelin-web/src/components/noteName-import/note-import-dialog.html +++ b/zeppelin-web/src/components/noteName-import/note-import-dialog.html @@ -56,14 +56,7 @@ - - + diff --git a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js index a1078d1d0f2..a4c13a2ec9e 100644 --- a/zeppelin-web/src/components/noteName-import/notenameImport.controller.js +++ b/zeppelin-web/src/components/noteName-import/notenameImport.controller.js @@ -19,7 +19,7 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ $scope.note = {}; $scope.note.step1 = true; $scope.note.step2 = false; - $scope.note.urlType = 'normal'; + $scope.note.isOtherUrl = false; vm.resetFlags = function() { $scope.note = {}; @@ -66,7 +66,7 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ vm.importNote = function() { $scope.note.errorText = ''; if ($scope.note.importUrl) { - if ($scope.note.urlType === 'normal') { + if (!$scope.note.isOtherUrl) { jQuery.getJSON($scope.note.importUrl, function(result) { vm.processImportJson(result); }).fail(function() { @@ -74,7 +74,7 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ $scope.$apply(); }); } else { - websocketMsgSrv.importNoteFromIpfs($scope.note.importUrl); + websocketMsgSrv.importNoteFromBackend($scope.note.importUrl, $scope.note.noteImportName, 'ipfs'); } } else { $scope.note.errorText = 'Enter URL'; @@ -116,25 +116,21 @@ angular.module('zeppelinWebApp').controller('NoteImportCtrl', function($scope, $ angular.element('#noteImportModal').modal('hide'); }); - $scope.$on('importNoteFail', function(event, data) { - vm.resetFlags(); - angular.element('#noteImportModal').modal('hide'); - var status = data.importStatus; - console.log('Data is %o', data); - if (status === 'success') { - ngToast.success({ - content: 'Successfully imported Note', + $scope.$on('importNoteResult', function(event, data) { + if (data.type === 'ipfs') { + var ngToastConf = { + content: 'Failed to import Note with hash ' + data.options.hash, verticalPosition: 'bottom', horizontalPosition: 'center', timeout: '5000' - }); - } else { - ngToast.danger({ - content: 'Failed to import Note with hash', - verticalPosition: 'bottom', - horizontalPosition: 'center', - timeout: '5000' - }); + }; + + if (data.importStatus === 'success') { + ngToastConf.content = 'Successfully imported Note with hash ' + data.options.hash, + ngToast.success(ngToastConf); + } else { + ngToast.danger(ngToastConf); + } } }); diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 05883ce2608..8bab14a0c3b 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -61,7 +61,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', } else if (op === 'NOTES_INFO') { $rootScope.$broadcast('setNoteMenu', data.notes); } else if (op === 'IMPORT_NOTE_STATUS') { - $rootScope.$broadcast('importNoteFail', data); + $rootScope.$broadcast('importNoteResult', data); } else if (op === 'LIST_NOTEBOOK_JOBS') { $rootScope.$broadcast('setNotebookJobs', data.notebookJobs); } else if (op === 'LIST_UPDATE_NOTEBOOK_JOBS') { diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 3986b4edc94..42db3084ea8 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -151,11 +151,15 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }); }, - importNoteFromIpfs: function(hashUrl) { + importNoteFromBackend: function(hashUrl, name, type) { websocketEvents.sendNewEvent({ op: 'IMPORT_NOTE_URL', data: { - hash: hashUrl + type: type, + name: name, + options: { + hash: hashUrl + } } }); }, diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 423a9a3fa55..1b98f8c12a0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -189,20 +189,25 @@ public String exportNote(String noteId) throws IOException, IllegalArgumentExcep * @return note * @throws IOException */ - public Note importNote(String url, AuthenticationInfo subject) throws IOException { + public Note importNoteFromBackend(String url, String noteName, AuthenticationInfo subject) + throws IOException { Note newNote = null; Note oldNote = notebookRepo.getNoteFromUrl(url, subject); if (oldNote != null) { try { newNote = createNote(subject); - newNote.setName(oldNote.getName()); + if (noteName != null) { + newNote.setName(noteName); + } else { + newNote.setName(oldNote.getName()); + } List paragraphs = oldNote.getParagraphs(); for (Paragraph p : paragraphs) { newNote.addCloneParagraph(p); } newNote.persist(subject); } catch (IOException e) { - logger.error(e.toString(), e); + logger.error("Importing note from " + url + " failed", e); throw e; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java index 49282a36944..dbf1cb8f7a0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -124,13 +124,14 @@ public Revision(String revId, String message, int time) { @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Revision)) return false; - + if (this == o) { + return true; + } + if (!(o instanceof Revision)) { + return false; + } Revision revision = (Revision) o; - return id.equals(revision.id); - } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java index 5d797cfd346..3e348fdc0b1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/IPFSNotebookRepo.java @@ -63,12 +63,8 @@ public class IPFSNotebookRepo extends VFSNotebookRepo implements NotebookRepo { private static final String API_SERVER_PROPERTY_NAME = "zeppelin.notebook.ipfs.apiServer"; private static final String DEFAULT_API_SERVER = "http://localhost:5001/api/v0/"; private static final Logger LOG = LoggerFactory.getLogger(IPFSNotebookRepo.class); - Type simpleType = new TypeToken>() { - }.getType(); private ExecutorService executorService = Executors.newCachedThreadPool(); private Gson gson; - private Type pinType = new TypeToken>() { - }.getType(); private Ipfs ipfs; private FileObject ipfsNoteHashesJson; private String encoding; @@ -92,6 +88,8 @@ public IPFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { if (versionJson == null) { throw new IOException("Make sure the ipfs daemon is running on given url" + ipfsApiServer); } + Type simpleType = new TypeToken>() { + }.getType(); Map data = gson.fromJson(versionJson, simpleType); String version = data.get("Version"); if (!version.equals("0.4.2")) { @@ -124,8 +122,9 @@ private Map> loadFromFile() throws IOException { InputStream ins = content.getInputStream(); String json = IOUtils.toString(ins, encoding); ins.close(); - if (json.isEmpty() || json == null) + if (json.isEmpty() || json == null) { return new HashMap<>(); + } Type type = new TypeToken>>() { }.getType(); Map> map = gson.fromJson(json, type); @@ -155,8 +154,9 @@ public Note call() throws Exception { } catch (ExecutionException e) { LOG.error("Failed to get note", e); Throwable cause = e.getCause(); - if (cause instanceof IOException) + if (cause instanceof IOException) { throw (IOException) cause; + } } catch (TimeoutException e) { LOG.error("TimeOut reached", e); } finally { @@ -216,7 +216,8 @@ public synchronized void remove(String noteId, AuthenticationInfo subject) throw if (pinJsonResponse == null) { throw new IOException("Failed to retrieve pinned hashes"); } - + Type pinType = new TypeToken>() { + }.getType(); Map data = gson.fromJson(pinJsonResponse, pinType); Map pinnedObjects = (Map) data.get("Keys"); @@ -312,6 +313,8 @@ public Map> getNoteHashes() { public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException { Note note = null; String pinJsonResponse = ipfs.pinLs(); + Type pinType = new TypeToken>() { + }.getType(); Map data = gson.fromJson(pinJsonResponse, pinType); Map allPinnedObjects = (Map) data.get("Keys"); @@ -337,7 +340,7 @@ public void close() { try { ipfs.getClient().close(); } catch (IOException e) { - LOG.info("Couldn't successfully close the ipfsClient"); + LOG.info("Couldn't successfully close the ipfsClient", e); } executorService.shutdown(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java index d107986e0bc..a5987e645d0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/ipfs/Ipfs.java @@ -55,22 +55,24 @@ public Ipfs(String url) { public String pinLs() { String pinnedJsonString = null; HttpGet request = new HttpGet(baseUrl + "pin/ls?type=recursive&quiet=true"); + CloseableHttpResponse response = null; try { - CloseableHttpResponse response = client.execute(request); - try { - int statusCode = response.getStatusLine().getStatusCode(); - HttpEntity resEntity = response.getEntity(); - if (statusCode == HttpStatus.SC_OK && resEntity != null) { - pinnedJsonString = EntityUtils.toString(resEntity, "UTF-8"); - } else { - LOG.error("Failed to get pinned objects with unexpected statuscode " + statusCode); - } - - } finally { - response.close(); + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + pinnedJsonString = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get pinned objects with unexpected statuscode " + statusCode); } } catch (IOException e) { LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } } return pinnedJsonString; } @@ -78,24 +80,23 @@ public String pinLs() { public boolean pinRm(String hash) { boolean removed = false; HttpGet request = new HttpGet(baseUrl + "pin/rm?recursive=true&arg=" + hash); + CloseableHttpResponse response = null; try { - CloseableHttpResponse response = client.execute(request); - try { - int statusCode = response.getStatusLine().getStatusCode(); - HttpEntity resEntity = response.getEntity(); - String responseMessage; - if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { - responseMessage = EntityUtils.toString(resEntity, "UTF-8"); - - Type type = new TypeToken>() { - }.getType(); - Map responseMap = new Gson().fromJson(responseMessage, type); - String errormessage = responseMap.get("Message"); - if (errormessage.equals("note pinned")) { - removed = true; - } - - LOG.error("response Message = " + responseMessage); + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + String responseMessage; + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + Type type = new TypeToken>() { + }.getType(); + Map responseMap = new Gson().fromJson(responseMessage, type); + String errormessage = responseMap.get("Message"); + if (errormessage.equals("note pinned")) { + removed = true; + } + + LOG.error("response Message = " + responseMessage); /* * HTTP/1.1 500 Internal Server Error * Content-Type: application/json @@ -105,19 +106,21 @@ public boolean pinRm(String hash) { * responseMessage = {"Message":"not pinned","Code":0} * **/ - } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { - responseMessage = EntityUtils.toString(resEntity, "UTF-8"); - removed = true; - LOG.info("response Message = " + responseMessage); - } else { - LOG.error("Failed to remove pinned object with unexpected statuscode " + statusCode); - } - - } finally { - response.close(); + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + removed = true; + LOG.info("response Message = " + responseMessage); + } else { + LOG.error("Failed to remove pinned object with unexpected statuscode " + statusCode); } } catch (IOException e) { LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } } return removed; } @@ -129,31 +132,34 @@ public String cat(String hash) { .setSocketTimeout(555000) .build(); request.setConfig(config); + CloseableHttpResponse response = null; try { - CloseableHttpResponse response = client.execute(request); - try { - int statusCode = response.getStatusLine().getStatusCode(); - HttpEntity resEntity = response.getEntity(); - if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { - String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); - LOG.error("response Message = " + responseMessage); + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_INTERNAL_SERVER_ERROR) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + LOG.error("response Message = " + responseMessage); /* * HTTP/1.1 500 Internal Server Error * Content-Type: application/json * responseMessage = {"Message":"invalid ipfs ref path","Code":0} * **/ - } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { - String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); - return responseMessage; - } else { - LOG.error("Failed to get ipfs object with unexpected statuscode " + statusCode); - } - } finally { - response.close(); + } else if (statusCode == HttpStatus.SC_OK && resEntity != null) { + String responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + return responseMessage; + } else { + LOG.error("Failed to get ipfs object with unexpected statuscode " + statusCode); } } catch (IOException e) { LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } } return ipfsContent; } @@ -170,21 +176,24 @@ public String add(String content) { .APPLICATION_OCTET_STREAM, filename) .build(); request.setEntity(entity); + CloseableHttpResponse response = null; try { - CloseableHttpResponse response = client.execute(request); - try { - int statusCode = response.getStatusLine().getStatusCode(); - HttpEntity resEntity = response.getEntity(); - if (statusCode == HttpStatus.SC_OK && resEntity != null) { - responseMessage = EntityUtils.toString(resEntity, "UTF-8"); - } else { - LOG.error("Failed to add ipfs object with unexpected statuscode " + statusCode); - } - } finally { - response.close(); + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + responseMessage = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to add ipfs object with unexpected statuscode " + statusCode); } } catch (IOException e) { LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } } return responseMessage; } @@ -192,22 +201,24 @@ public String add(String content) { public String version() { String version = null; HttpGet request = new HttpGet(baseUrl + "/version"); + CloseableHttpResponse response = null; try { - CloseableHttpResponse response = client.execute(request); - try { - int statusCode = response.getStatusLine().getStatusCode(); - HttpEntity resEntity = response.getEntity(); - if (statusCode == HttpStatus.SC_OK && resEntity != null) { - version = EntityUtils.toString(resEntity, "UTF-8"); - } else { - LOG.error("Failed to get ipfs version with unexpected statuscode " + statusCode); - } - - } finally { - response.close(); + response = client.execute(request); + int statusCode = response.getStatusLine().getStatusCode(); + HttpEntity resEntity = response.getEntity(); + if (statusCode == HttpStatus.SC_OK && resEntity != null) { + version = EntityUtils.toString(resEntity, "UTF-8"); + } else { + LOG.error("Failed to get ipfs version with unexpected statuscode " + statusCode); } } catch (IOException e) { LOG.error("Http request failed to send to " + baseUrl, e); + } finally { + try { + response.close(); + } catch (IOException | NullPointerException e) { + LOG.error("Failed to close response", e); + } } return version; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index ea7602743a7..530d33e7ebf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -52,9 +52,12 @@ public static enum OP { IMPORT_NOTE, // [c-s] import notebook // @param object notebook IMPORT_NOTE_URL, // [c-s] import notebook - // @param hash ipfs hash + // @param type ipfs for now + // @param options hash or magnetlink or any future url IMPORT_NOTE_STATUS, //[s-c] notebook import through url failed // @param status success or failure + // @param type ipfs for now + // @param options hash or magnetlink or any future url NOTE_UPDATE, RUN_PARAGRAPH, // [c-s] run paragraph diff --git a/zeppelin-zengine/src/test/resources/ipfsnotehashes.json b/zeppelin-zengine/src/test/resources/ipfsnotehashes.json deleted file mode 100644 index 522ab1e2bf6..00000000000 --- a/zeppelin-zengine/src/test/resources/ipfsnotehashes.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "r": [ - { - "commitMessage": "r notebook no change commit", - "name": "QmcjoEoysvgW3nFSkLfDkLUuZZts4Yop9HvYkTPkrB8hWk", - "time": 1466145258 - } - ], - "2A94M5J1Z": [ - { - "commitMessage": "first para", - "name": "QmNN24LURsvaPqYZs4Y689r9wQWcs3CuFZJkfu5koYVnKS", - "time": 1466168505 - } - ] -} \ No newline at end of file