Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public Response putNotePermissions(@PathParam("noteId") String noteId, String re
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
note.persist(subject);
notebookServer.broadcastNote(note);
notebookServer.broadcastNoteList(subject);
return new JsonResponse<>(Status.OK).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ZeppelinServer() throws Exception {
notebookWsServer, heliumApplicationFactory, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
this.notebookAuthorization = new NotebookAuthorization(conf);
this.notebookAuthorization = NotebookAuthorization.init(conf);
this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
Expand All @@ -85,6 +86,8 @@ String getKey() {
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets =
new ConcurrentHashMap<String, Queue<NotebookSocket>>();

private Notebook notebook() {
return ZeppelinServer.notebook;
Expand Down Expand Up @@ -160,6 +163,9 @@ public void onMessage(NotebookSocket conn, String msg) {
userAndRoles.addAll(roles);
}
}
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);

/** Lets be elegant here */
Expand Down Expand Up @@ -264,6 +270,26 @@ public void onClose(NotebookSocket conn, int code, String reason) {
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}

private void removeUserConnection(String user, NotebookSocket conn) {
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).remove(conn);
} else {
LOG.warn("Closing connection that is absent in user connections");
}
}

private void addUserConnection(String user, NotebookSocket conn) {
conn.setUser(user);
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).add(conn);
} else {
Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
socketQueue.add(conn);
userConnectedSockets.put(user, socketQueue);
}
}

protected Message deserializeMessage(String msg) {
Expand Down Expand Up @@ -379,8 +405,12 @@ private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
}
}

private void broadcastAll(Message m) {
for (NotebookSocket conn : connectedSockets) {
private void multicastToUser(String user, Message m) {
if (!userConnectedSockets.containsKey(user)) {
LOG.warn("Broadcasting to user that is not in connections map");
return;
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
Expand Down Expand Up @@ -472,6 +502,7 @@ public List<Map<String, String>> generateNotebooksInfo(boolean needsReload,
LOG.error("Fail to reload notes from repository", e);
}
}

List<Note> notes = notebook.getAllNotes(subject);
List<Map<String, String>> notesInfo = new LinkedList<>();
for (Note note : notes) {
Expand Down Expand Up @@ -500,8 +531,20 @@ public void broadcastInterpreterBindings(String noteId,
}

public void broadcastNoteList(AuthenticationInfo subject) {
if (subject == null) {
subject = new AuthenticationInfo(StringUtils.EMPTY);
}
//send first to requesting user
List<Map<String, String>> notesInfo = generateNotebooksInfo(false, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}

public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
Expand All @@ -510,8 +553,21 @@ public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
}

public void broadcastReloadedNoteList(AuthenticationInfo subject) {
if (subject == null) {
subject = new AuthenticationInfo(StringUtils.EMPTY);
}
//reload and reply first to requesting user
List<Map<String, String>> notesInfo = generateNotebooksInfo(true, subject);
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
for (String user: userConnectedSockets.keySet()) {
if (subject.getUser() == user) {
continue;
}
//reloaded already above; parameter - false
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
}

void permissionError(NotebookSocket conn, String op,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;

Expand All @@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter {
private NotebookSocketListener listener;
private HttpServletRequest request;
private String protocol;
private String user;

public NotebookSocket(HttpServletRequest req, String protocol,
NotebookSocketListener listener) {
this.listener = listener;
this.request = req;
this.protocol = protocol;
this.user = StringUtils.EMPTY;
}

@Override
Expand Down Expand Up @@ -69,4 +72,11 @@ public void send(String serializeMessage) throws IOException {
connection.getRemote().sendString(serializeMessage);
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
Expand Down Expand Up @@ -341,7 +342,9 @@ public void testListNotebooks() throws IOException {
Map<String, Object> resp = gson.fromJson(get.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
List<Map<String, String>> body = (List<Map<String, String>>) resp.get("body");
assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes().size(), body.size());
//TODO(khalid): anonymous or specific user notes?
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
assertEquals("List notebooks are equal", ZeppelinServer.notebook.getAllNotes(subject).size(), body.size());
get.releaseConnection();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.commons.codec.binary.StringUtils;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
Expand Down Expand Up @@ -484,6 +482,7 @@ public void reloadAllNotes(AuthenticationInfo subject) throws IOException {
}

List<NoteInfo> noteInfos = notebookRepo.list(subject);

for (NoteInfo info : noteInfos) {
loadNoteFromRepo(info.getId(), subject);
}
Expand Down Expand Up @@ -534,7 +533,7 @@ public int compare(Note note1, Note note2) {
return noteList;
}
}

public List<Note> getAllNotes(AuthenticationInfo subject) {
final Set<String> entities = Sets.newHashSet();
if (subject != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.zeppelin.notebook;

import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,29 +35,44 @@
*/
public class NotebookAuthorization {
private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class);

private static NotebookAuthorization instance = null;
/*
* { "note1": { "owners": ["u1"], "readers": ["u1", "u2"], "writers": ["u1"] }, "note2": ... } }
*/
private Map<String, Map<String, Set<String>>> authInfo = new HashMap<>();
private ZeppelinConfiguration conf;
private Gson gson;
private String filePath;

public NotebookAuthorization(ZeppelinConfiguration conf) {
this.conf = conf;
filePath = conf.getNotebookAuthorizationPath();
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
gson = builder.create();
try {
loadFromFile();
} catch (IOException e) {
LOG.error("Error loading NotebookAuthorization", e);
private static Map<String, Map<String, Set<String>>> authInfo = new HashMap<>();
private static ZeppelinConfiguration conf;
private static Gson gson;
private static String filePath;

private NotebookAuthorization() {}

public static NotebookAuthorization init(ZeppelinConfiguration config) {
Copy link
Member

@echarles echarles Sep 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why moving from constructor to init method (factory method pattern)?
Especially, the conf object is static, so calling multiple time init will override the conf field. The state of the NotebookAuthorization will not be consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@echarles good point. actually that's more of a singleton pattern that gets instantiated once on init and can be retrieved by getInstance further on. the purpose of it was to enable other modules (e.g. storage in zengine) not having direct reference to notebookAuthorization, to be able to use its services without changing api. I believe for longer term authorization and storage modules can be refactored in a way that we don't need that pattern.

regarding overriding the conf field, you're right. I addressed it in 9cf1d88 so that it gets instantiated only once.

if (instance == null) {
instance = new NotebookAuthorization();
conf = config;
filePath = conf.getNotebookAuthorizationPath();
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
gson = builder.create();
try {
loadFromFile();
} catch (IOException e) {
LOG.error("Error loading NotebookAuthorization", e);
}
}
return instance;
}

private void loadFromFile() throws IOException {
public static NotebookAuthorization getInstance() {
if (instance == null) {
LOG.warn("Notebook authorization module was called without initialization,"
+ " initializing with default configuration");
init(ZeppelinConfiguration.create());
}
return instance;
}

private static void loadFromFile() throws IOException {
File settingFile = new File(filePath);
LOG.info(settingFile.getAbsolutePath());
if (!settingFile.exists()) {
Expand All @@ -74,7 +93,7 @@ private void loadFromFile() throws IOException {
String json = sb.toString();
NotebookAuthorizationInfoSaving info = gson.fromJson(json,
NotebookAuthorizationInfoSaving.class);
this.authInfo = info.authInfo;
authInfo = info.authInfo;
}

private void saveToFile() {
Expand Down Expand Up @@ -225,4 +244,16 @@ public void removeNote(String noteId) {
saveToFile();
}

public List<NoteInfo> filterByUser(List<NoteInfo> notes, AuthenticationInfo subject) {
final Set<String> entities = Sets.newHashSet();
if (subject != null) {
entities.add(subject.getUser());
}
return FluentIterable.from(notes).filter(new Predicate<NoteInfo>() {
@Override
public boolean apply(NoteInfo input) {
return input != null && isReader(input.getId(), entities);
}
}).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -178,9 +179,11 @@ public void remove(String noteId, AuthenticationInfo subject) throws IOException
*/
void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException {
LOG.info("Sync started");
NotebookAuthorization auth = NotebookAuthorization.getInstance();
NotebookRepo srcRepo = getRepo(sourceRepoIndex);
NotebookRepo dstRepo = getRepo(destRepoIndex);
List <NoteInfo> srcNotes = srcRepo.list(subject);
List <NoteInfo> allSrcNotes = srcRepo.list(subject);
List <NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject);
List <NoteInfo> dstNotes = dstRepo.list(subject);

Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void setUp() throws Exception {

SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
NotebookAuthorization notebookAuthorization = NotebookAuthorization.init(conf);
notebook = new Notebook(
conf,
notebookRepo,
Expand Down
Loading