diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 3fa3d8ddb7a..08769c83db6 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -129,6 +129,7 @@ public void onMessage(NotebookSocket conn, String msg) { userAndRoles.addAll(roles); } } + conn.setUserAndRoles(userAndRoles); /** Lets be elegant here */ switch (messagereceived.op) { @@ -374,7 +375,23 @@ public List> generateNotebooksInfo(boolean needsReload) { } public void broadcastNote(Note note) { - broadcast(note.id(), new Message(OP.NOTE).put("note", note)); + String noteId = note.id(); + synchronized (noteSocketMap) { + List socketLists = noteSocketMap.get(noteId); + if (socketLists == null || socketLists.size() == 0) { + return; + } + for (NotebookSocket conn : socketLists) { + try { + NoteSubset ns = note.getNoteSubset(conn.getUserAndRoles()); + Message m = new Message(OP.NOTE).put("note", ns); + LOG.debug("SEND >> " + m.op); + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } + } + } } public void broadcastNoteList() { @@ -419,7 +436,8 @@ private void sendNote(NotebookSocket conn, HashSet userAndRoles, Noteboo return; } addConnectionToNote(note.id(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + NoteSubset ns = note.getNoteSubset(userAndRoles); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", ns))); sendAllAngularObjects(note, conn); } } @@ -441,7 +459,8 @@ private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, return; } addConnectionToNote(note.id(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + NoteSubset ns = note.getNoteSubset(userAndRoles); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", ns))); sendAllAngularObjects(note, conn); } else { removeConnectionFromAllNote(conn); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index 5d68bf5ec2d..ca92a972186 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.socket; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import javax.servlet.http.HttpServletRequest; @@ -32,6 +34,7 @@ public class NotebookSocket extends WebSocketAdapter { private NotebookSocketListener listener; private HttpServletRequest request; private String protocol; + private HashSet userAndRoles; public NotebookSocket(HttpServletRequest req, String protocol, NotebookSocketListener listener) { @@ -40,6 +43,14 @@ public NotebookSocket(HttpServletRequest req, String protocol, this.protocol = protocol; } + public Set getUserAndRoles() { + return userAndRoles; + } + + public void setUserAndRoles(Set userAndRoles) { + this.userAndRoles = new HashSet<>(userAndRoles); + } + @Override public void onWebSocketClose(int closeCode, String message) { listener.onClose(this, closeCode, message); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index f2bbe11c18d..08cbf422a68 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -96,6 +96,19 @@ public Note(NotebookRepo repo, NoteInterpreterLoader replLoader, generateId(); } + public NoteSubset getNoteSubset(Set userAndRoles) { + NoteSubset ns = new NoteSubset(); + ns.paragraphs = new LinkedList<>(); + for (Paragraph p : this.paragraphs) { + ns.paragraphs.add(p.getParagraphSubset(userAndRoles)); + } + ns.angularObjects = this.angularObjects; + ns.name = this.name; + ns.id = this.id; + ns.config = this.config; + return ns; + } + private void generateId() { id = IdHashes.encode(System.currentTimeMillis() + new Random().nextInt()); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteSubset.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteSubset.java new file mode 100644 index 00000000000..ee94fffb7aa --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteSubset.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.display.AngularObject; +import java.util.*; + +/** + * Subset of the note that filters data by user + */ +public class NoteSubset { + List paragraphs = new LinkedList<>(); + String name = ""; + String id; + + @SuppressWarnings("rawtypes") + Map> angularObjects = new HashMap<>(); + + Map config = new HashMap<>(); + Map info = new HashMap<>(); + + public NoteSubset() { + + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 6f2592b0364..174225b04af 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook; +import com.google.gson.internal.StringMap; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.user.AuthenticationInfo; @@ -43,6 +44,7 @@ * */ public class Paragraph extends Job implements Serializable, Cloneable { + static Logger logger = LoggerFactory.getLogger(Paragraph.class); private static final long serialVersionUID = -6328572073497992016L; private transient NoteInterpreterLoader replLoader; @@ -86,6 +88,84 @@ public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoad settings = new GUI(); config = new HashMap(); } + public ParagraphSubset getParagraphSubset(Set userAndRoles) { + ParagraphSubset ps = new ParagraphSubset(); + ps.title = this.title; + ps.text = this.text; + ps.authenticationInfo = this.authenticationInfo; + ps.dateUpdated = this.dateUpdated; + ps.config = this.config; + ps.settings = this.settings; + + ps.jobName = this.getJobName(); + ps.id = this.getId(); + logger.info("{} {} {} {} {}", this.getId(), this.getTitle(), this.getText(), this.config, + this.getReturn()); + Object result = this.getReturn(); + if (result == null) { + ps.result = result; + } else if (result.getClass() == com.google.gson.internal.StringMap.class) { + logger.info(result.getClass().toString()); + StringMap sm = (StringMap) result; + if (sm.get("type").equals("TABLE")) { + String msg = (String) sm.get("msg"); + String newMsg = msgSubset(msg, userAndRoles); + sm.put("msg", newMsg); + } + ps.result = sm; + } else if (result.getClass() == org.apache.zeppelin.interpreter.InterpreterResult.class) { + logger.info(result.getClass().toString()); + InterpreterResult ir = (InterpreterResult) result; + if (ir.type() == InterpreterResult.Type.TABLE) { + String msg = ir.message(); + String newMsg = msgSubset(msg, userAndRoles); + InterpreterResult irNew = new InterpreterResult(ir.code(), ir.type(), newMsg); + ps.result = irNew; + } else { + ps.result = ir; + } + } else { + logger.info(result.getClass().toString()); + ps.result = result; + } + ps.dateCreated = this.getDateCreated(); + ps.dateStarted = this.getDateStarted(); + ps.dateFinished = this.getDateFinished(); + ps.status = this.getStatus(); + return ps; + } + + private String msgSubset(String msg, Set userAndRoles) { + logger.info("Before {} {}", msg, userAndRoles); + if (userAndRoles == null || userAndRoles.contains("anonymous")) { + return msg; + } + int eolIndex = 0; + String[] fields; + eolIndex = msg.indexOf('\n'); + if (eolIndex == -1) { + return msg; + } + String firstLine = msg.substring(0, eolIndex); + fields = firstLine.split("\t"); + int zeppelinUserIndex = Arrays.asList(fields).indexOf("zeppelin_user"); + if (zeppelinUserIndex == -1) { + return msg; + } + + String[] arr = msg.substring(eolIndex + 1).split("\n"); + List newList = new ArrayList<>(); + newList.add(firstLine); + for (String a : arr) { + fields = a.split("\t"); + if (userAndRoles.contains(fields[zeppelinUserIndex])) { + newList.add(a); + } + } + String result = String.join("\n", newList); + logger.info("After {} {}", result, userAndRoles); + return result; + } private static String generateId() { return "paragraph_" + System.currentTimeMillis() + "_" diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSubset.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSubset.java new file mode 100644 index 00000000000..69702e54b6f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphSubset.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.user.AuthenticationInfo; + +import java.util.Date; +import java.util.Map; + +/** + * Subset of the paragraph that filters data by user + */ +public class ParagraphSubset { + String title; + String text; + AuthenticationInfo authenticationInfo; + Date dateUpdated; + Map config; // paragraph configs like isOpen, colWidth, etc + GUI settings; + + String jobName; + String id; + Object result; + Date dateCreated; + Date dateStarted; + Date dateFinished; + Job.Status status; + + ParagraphSubset() {} + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ParagraphSubset{"); + sb.append("jobName=").append(jobName); + sb.append("\nid=").append(id); + sb.append("\nresult=").append(result); + sb.append("\nresult class=").append(result != null ? result.getClass() : "null"); + sb.append("\n}"); + return sb.toString(); + } + +}