diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 727211292b2..b83a8891d0b 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -162,6 +162,7 @@ public Response putNotePermissions(@PathParam("noteId") String noteId, String re AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); note.persist(subject); notebookServer.broadcastNote(note); + notebookServer.broadcastNoteList(subject); return new JsonResponse<>(Status.OK).build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index d352c080176..48956c912c9 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -92,7 +92,7 @@ public ZeppelinServer() throws Exception { notebookWsServer, heliumApplicationFactory, depResolver); this.notebookRepo = new NotebookRepoSync(conf); this.notebookIndex = new LuceneSearch(); - this.notebookAuthorization = new NotebookAuthorization(conf); + this.notebookAuthorization = NotebookAuthorization.init(conf); this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookWsServer, diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8b6329a4073..63764c1620e 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -59,6 +59,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -85,6 +86,8 @@ String getKey() { Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); final Map> noteSocketMap = new HashMap<>(); final Queue connectedSockets = new ConcurrentLinkedQueue<>(); + final Map> userConnectedSockets = + new ConcurrentHashMap>(); private Notebook notebook() { return ZeppelinServer.notebook; @@ -160,6 +163,9 @@ public void onMessage(NotebookSocket conn, String msg) { userAndRoles.addAll(roles); } } + if (StringUtils.isEmpty(conn.getUser())) { + addUserConnection(messagereceived.principal, conn); + } AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal); /** Lets be elegant here */ @@ -264,6 +270,26 @@ public void onClose(NotebookSocket conn, int code, String reason) { .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); connectedSockets.remove(conn); + removeUserConnection(conn.getUser(), conn); + } + + private void removeUserConnection(String user, NotebookSocket conn) { + if (userConnectedSockets.containsKey(user)) { + userConnectedSockets.get(user).remove(conn); + } else { + LOG.warn("Closing connection that is absent in user connections"); + } + } + + private void addUserConnection(String user, NotebookSocket conn) { + conn.setUser(user); + if (userConnectedSockets.containsKey(user)) { + userConnectedSockets.get(user).add(conn); + } else { + Queue socketQueue = new ConcurrentLinkedQueue<>(); + socketQueue.add(conn); + userConnectedSockets.put(user, socketQueue); + } } protected Message deserializeMessage(String msg) { @@ -379,8 +405,12 @@ private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { } } - private void broadcastAll(Message m) { - for (NotebookSocket conn : connectedSockets) { + private void multicastToUser(String user, Message m) { + if (!userConnectedSockets.containsKey(user)) { + LOG.warn("Broadcasting to user that is not in connections map"); + return; + } + for (NotebookSocket conn: userConnectedSockets.get(user)) { try { conn.send(serializeMessage(m)); } catch (IOException e) { @@ -472,6 +502,7 @@ public List> generateNotebooksInfo(boolean needsReload, LOG.error("Fail to reload notes from repository", e); } } + List notes = notebook.getAllNotes(subject); List> notesInfo = new LinkedList<>(); for (Note note : notes) { @@ -500,8 +531,20 @@ public void broadcastInterpreterBindings(String noteId, } public void broadcastNoteList(AuthenticationInfo subject) { + if (subject == null) { + subject = new AuthenticationInfo(StringUtils.EMPTY); + } + //send first to requesting user List> notesInfo = generateNotebooksInfo(false, subject); - broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); + multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); + //to others afterwards + for (String user: userConnectedSockets.keySet()) { + if (subject.getUser() == user) { + continue; + } + notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user)); + multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo)); + } } public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) { @@ -510,8 +553,21 @@ public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) { } public void broadcastReloadedNoteList(AuthenticationInfo subject) { + if (subject == null) { + subject = new AuthenticationInfo(StringUtils.EMPTY); + } + //reload and reply first to requesting user List> notesInfo = generateNotebooksInfo(true, subject); - broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); + multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); + //to others afterwards + for (String user: userConnectedSockets.keySet()) { + if (subject.getUser() == user) { + continue; + } + //reloaded already above; parameter - false + notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user)); + multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo)); + } } void permissionError(NotebookSocket conn, String op, diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index 5d68bf5ec2d..2bae36b3b75 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -20,6 +20,7 @@ import javax.servlet.http.HttpServletRequest; +import org.apache.commons.lang.StringUtils; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter { private NotebookSocketListener listener; private HttpServletRequest request; private String protocol; + private String user; public NotebookSocket(HttpServletRequest req, String protocol, NotebookSocketListener listener) { this.listener = listener; this.request = req; this.protocol = protocol; + this.user = StringUtils.EMPTY; } @Override @@ -69,4 +72,11 @@ public void send(String serializeMessage) throws IOException { connection.getRemote().sendString(serializeMessage); } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index 4390d74b495..ad48b50786f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.FixMethodOrder; @@ -341,7 +342,9 @@ public void testListNotebooks() throws IOException { Map resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken>() { }.getType()); List> body = (List>) resp.get("body"); - assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes().size(), body.size()); + //TODO(khalid): anonymous or specific user notes? + AuthenticationInfo subject = new AuthenticationInfo("anonymous"); + assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes(subject).size(), body.size()); get.releaseConnection(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index d961ac033b0..1e65a86e21a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -31,14 +31,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.stream.JsonReader; -import org.apache.commons.codec.binary.StringUtils; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; @@ -484,6 +482,7 @@ public void reloadAllNotes(AuthenticationInfo subject) throws IOException { } List noteInfos = notebookRepo.list(subject); + for (NoteInfo info : noteInfos) { loadNoteFromRepo(info.getId(), subject); } @@ -534,7 +533,7 @@ public int compare(Note note1, Note note2) { return noteList; } } - + public List getAllNotes(AuthenticationInfo subject) { final Set entities = Sets.newHashSet(); if (subject != null) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java index 0633906d110..75dc61b30f6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java @@ -17,9 +17,13 @@ package org.apache.zeppelin.notebook; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,29 +35,44 @@ */ public class NotebookAuthorization { private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class); - + private static NotebookAuthorization instance = null; /* * { "note1": { "owners": ["u1"], "readers": ["u1", "u2"], "writers": ["u1"] }, "note2": ... } } */ - private Map>> authInfo = new HashMap<>(); - private ZeppelinConfiguration conf; - private Gson gson; - private String filePath; - - public NotebookAuthorization(ZeppelinConfiguration conf) { - this.conf = conf; - filePath = conf.getNotebookAuthorizationPath(); - GsonBuilder builder = new GsonBuilder(); - builder.setPrettyPrinting(); - gson = builder.create(); - try { - loadFromFile(); - } catch (IOException e) { - LOG.error("Error loading NotebookAuthorization", e); + private static Map>> authInfo = new HashMap<>(); + private static ZeppelinConfiguration conf; + private static Gson gson; + private static String filePath; + + private NotebookAuthorization() {} + + public static NotebookAuthorization init(ZeppelinConfiguration config) { + if (instance == null) { + instance = new NotebookAuthorization(); + conf = config; + filePath = conf.getNotebookAuthorizationPath(); + GsonBuilder builder = new GsonBuilder(); + builder.setPrettyPrinting(); + gson = builder.create(); + try { + loadFromFile(); + } catch (IOException e) { + LOG.error("Error loading NotebookAuthorization", e); + } } + return instance; } - private void loadFromFile() throws IOException { + public static NotebookAuthorization getInstance() { + if (instance == null) { + LOG.warn("Notebook authorization module was called without initialization," + + " initializing with default configuration"); + init(ZeppelinConfiguration.create()); + } + return instance; + } + + private static void loadFromFile() throws IOException { File settingFile = new File(filePath); LOG.info(settingFile.getAbsolutePath()); if (!settingFile.exists()) { @@ -74,7 +93,7 @@ private void loadFromFile() throws IOException { String json = sb.toString(); NotebookAuthorizationInfoSaving info = gson.fromJson(json, NotebookAuthorizationInfoSaving.class); - this.authInfo = info.authInfo; + authInfo = info.authInfo; } private void saveToFile() { @@ -225,4 +244,16 @@ public void removeNote(String noteId) { saveToFile(); } + public List filterByUser(List notes, AuthenticationInfo subject) { + final Set entities = Sets.newHashSet(); + if (subject != null) { + entities.add(subject.getUser()); + } + return FluentIterable.from(notes).filter(new Predicate() { + @Override + public boolean apply(NoteInfo input) { + return input != null && isReader(input.getId(), entities); + } + }).toList(); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index f67b71f5d6c..21a1d7a25d9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -31,6 +31,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.NotebookAuthorization; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; @@ -178,9 +179,11 @@ public void remove(String noteId, AuthenticationInfo subject) throws IOException */ void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException { LOG.info("Sync started"); + NotebookAuthorization auth = NotebookAuthorization.getInstance(); NotebookRepo srcRepo = getRepo(sourceRepoIndex); NotebookRepo dstRepo = getRepo(destRepoIndex); - List srcNotes = srcRepo.list(subject); + List allSrcNotes = srcRepo.list(subject); + List srcNotes = auth.filterByUser(allSrcNotes, subject); List dstNotes = dstRepo.list(subject); Map> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index b32b3d8feb2..29cdf554de6 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -89,7 +89,7 @@ public void setUp() throws Exception { SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); - NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf); + NotebookAuthorization notebookAuthorization = NotebookAuthorization.init(conf); notebook = new Notebook( conf, notebookRepo, diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 0ec8e7cfdd1..9e395c83220 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -91,7 +91,7 @@ public void setUp() throws Exception { SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); - notebookAuthorization = new NotebookAuthorization(conf); + notebookAuthorization = NotebookAuthorization.init(conf); credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search, @@ -207,6 +207,7 @@ public void testPersist() throws IOException, SchedulerException, RepositoryExce Notebook notebook2 = new Notebook( conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null, depResolver), this, null, null, null); + assertEquals(1, notebook2.getAllNotes().size()); } @@ -588,7 +589,7 @@ public void testPermissions() throws IOException { // create a note and a paragraph Note note = notebook.createNote(null); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); - // empty owners, readers and writers means note is public + // empty owners, readers or writers means note is public assertEquals(notebookAuthorization.isOwner(note.getId(), new HashSet(Arrays.asList("user2"))), true); assertEquals(notebookAuthorization.isReader(note.getId(), @@ -873,6 +874,39 @@ public void testGetAllNotes() throws Exception { assertEquals(1, notebook.getAllNotes(new AuthenticationInfo("user2")).size()); } + + @Test + public void testGetAllNotesWithDifferentPermissions() throws IOException { + AuthenticationInfo user1 = new AuthenticationInfo("user1"); + AuthenticationInfo user2 = new AuthenticationInfo("user2"); + List notes1 = notebook.getAllNotes(user1); + List notes2 = notebook.getAllNotes(user2); + assertEquals(notes1.size(), 0); + assertEquals(notes2.size(), 0); + + //creates note and sets user1 owner + Note note = notebook.createNote(user1); + + // note is public since readers and writers empty + notes1 = notebook.getAllNotes(user1); + notes2 = notebook.getAllNotes(user2); + assertEquals(notes1.size(), 1); + assertEquals(notes2.size(), 1); + + notebook.getNotebookAuthorization().setReaders(note.getId(), Sets.newHashSet("user1")); + //note is public since writers empty + notes1 = notebook.getAllNotes(user1); + notes2 = notebook.getAllNotes(user2); + assertEquals(notes1.size(), 1); + assertEquals(notes2.size(), 1); + + notebook.getNotebookAuthorization().setWriters(note.getId(), Sets.newHashSet("user1")); + notes1 = notebook.getAllNotes(user1); + notes2 = notebook.getAllNotes(user2); + assertEquals(notes1.size(), 1); + assertEquals(notes2.size(), 0); + } + private void delete(File file){ if(file.isFile()) file.delete(); else if(file.isDirectory()){ diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index c768df80435..95b9209dc52 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { search = mock(SearchService.class); notebookRepoSync = new NotebookRepoSync(conf); - notebookAuthorization = new NotebookAuthorization(conf); + notebookAuthorization = NotebookAuthorization.init(conf); credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search, notebookAuthorization, credentials);