diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java index eb64393387d..e8434fbe081 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java @@ -35,19 +35,10 @@ import org.apache.zeppelin.cluster.event.ClusterEventListener; import org.apache.zeppelin.cluster.meta.ClusterMeta; import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -76,9 +67,15 @@ public class ClusterManagerServer extends ClusterManager { // Connect to the interpreter process that has been created public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS"; - private List clusterEventListeners = new ArrayList<>(); + private List clusterIntpEventListeners = new ArrayList<>(); + private List clusterNoteEventListeners = new ArrayList<>(); + private List clusterAuthEventListeners = new ArrayList<>(); + // zeppelin cluster event - public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC"; + public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC"; + public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC"; + public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC"; + public static String CLUSTER_NB_AUTH_EVENT_TOPIC = "CLUSTER_NB_AUTH_EVENT_TOPIC"; private ClusterManagerServer() { super(); @@ -206,8 +203,12 @@ public BroadcastService getBroadcastService() { raftServer = builder.build(); raftServer.bootstrap(clusterMemberIds); - messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC, - subscribeClusterEvent, MoreExecutors.directExecutor()); + messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC, + subscribeClusterIntpEvent, MoreExecutors.directExecutor()); + messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC, + subscribeClusterNoteEvent, MoreExecutors.directExecutor()); + messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC, + subscribeClusterAuthEvent, MoreExecutors.directExecutor()); LOGGER.info("RaftServer run() <<<"); } @@ -273,12 +274,12 @@ public HashMap getIdleNodeMeta() { return idleNodeMeta; } - public void unicastClusterEvent(String host, int port, String msg) { + public void unicastClusterEvent(String host, int port, String topic, String msg) { LOGGER.info("send unicastClusterEvent message {}", msg); Address address = Address.from(host, port); CompletableFuture response = messagingService.sendAndReceive(address, - ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2)); + topic, msg.getBytes(), Duration.ofSeconds(2)); response.whenComplete((r, e) -> { if (null == e) { LOGGER.error(e.getMessage(), e); @@ -288,7 +289,7 @@ public void unicastClusterEvent(String host, int port, String msg) { }); } - public void broadcastClusterEvent(String msg) { + public void broadcastClusterEvent(String topic, String msg) { LOGGER.info("send broadcastClusterEvent message {}", msg); for (Node node : clusterNodes) { @@ -299,7 +300,7 @@ public void broadcastClusterEvent(String msg) { } CompletableFuture response = messagingService.sendAndReceive(node.address(), - ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2)); + topic, msg.getBytes(), Duration.ofSeconds(2)); response.whenComplete((r, e) -> { if (null == e) { LOGGER.error(e.getMessage(), e); @@ -310,18 +311,51 @@ public void broadcastClusterEvent(String msg) { } } - private BiFunction subscribeClusterEvent = (address, data) -> { + private BiFunction subscribeClusterIntpEvent = (address, data) -> { + String message = new String(data); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("subscribeClusterIntpEvent() {}", message); + } + for (ClusterEventListener eventListener : clusterIntpEventListeners) { + eventListener.onClusterEvent(message); + } + + return null; + }; + + private BiFunction subscribeClusterNoteEvent = (address, data) -> { String message = new String(data); - LOGGER.info("subscribeClusterEvent() {}", message); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("subscribeClusterNoteEvent() {}", message); + } + for (ClusterEventListener eventListener : clusterNoteEventListeners) { + eventListener.onClusterEvent(message); + } - for (ClusterEventListener eventListener : clusterEventListeners) { + return null; + }; + + private BiFunction subscribeClusterAuthEvent = (address, data) -> { + String message = new String(data); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("subscribeClusterAuthEvent() {}", message); + } + for (ClusterEventListener eventListener : clusterAuthEventListeners) { eventListener.onClusterEvent(message); } return null; }; - public void addClusterEventListeners(ClusterEventListener listener) { - clusterEventListeners.add(listener); + public void addClusterEventListeners(String topic, ClusterEventListener listener) { + if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) { + clusterIntpEventListeners.add(listener); + } else if (StringUtils.equals(topic, CLUSTER_NOTE_EVENT_TOPIC)) { + clusterNoteEventListeners.add(listener); + } else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) { + clusterAuthEventListeners.add(listener); + } else { + LOGGER.error("Unknow cluster event topic : {}", topic); + } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java index 0e1120ce66b..4fb61da5343 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java @@ -20,5 +20,21 @@ * Cluster Event */ public enum ClusterEvent { - CREATE_INTP_PROCESS + // CLUSTER_INTP_EVENT_TOPIC + CREATE_INTP_PROCESS, + // CLUSTER_NOTE_EVENT_TOPIC + BROADCAST_NOTE, + BROADCAST_NOTE_LIST, + BROADCAST_PARAGRAPH, + BROADCAST_PARAGRAPHS, + BROADCAST_NEW_PARAGRAPH, + UPDATE_NOTE_PERMISSIONS, + // CLUSTER_AUTH_EVENT_TOPIC + SET_ROLES, + SET_READERS_PERMISSIONS, + SET_RUNNERS_PERMISSIONS, + SET_WRITERS_PERMISSIONS, + SET_OWNERS_PERMISSIONS, + CLEAR_PERMISSION, + SET_NEW_NOTE_PERMISSIONS } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java new file mode 100644 index 00000000000..1fa6938f7df --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster.event; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.zeppelin.display.Input; + +import java.util.HashMap; +import java.util.Map; + +public class ClusterMessage { + public ClusterEvent clusterEvent; + private Map data = new HashMap<>(); + + private static Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") + .setPrettyPrinting() + .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); + + public ClusterMessage(ClusterEvent event) { + this.clusterEvent = event; + } + + public ClusterMessage put(String k, String v) { + data.put(k, v); + return this; + } + + public String get(String k) { + return data.get(k); + } + + public Map getData() { + return data; + } + + public static ClusterMessage deserializeMessage(String msg) { + return gson.fromJson(msg, ClusterMessage.class); + } + + public static String serializeMessage(ClusterMessage m) { + return gson.toJson(m); + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index d4824dc0693..1a0e3ad48a0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.function.Predicate; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; @@ -311,6 +312,11 @@ public String getServerAddress() { return getString(ConfVars.ZEPPELIN_ADDR); } + @VisibleForTesting + public void setServerPort(int port) { + properties.put(ConfVars.ZEPPELIN_PORT.getVarName(), String.valueOf(port)); + } + public int getServerPort() { return getInt(ConfVars.ZEPPELIN_PORT); } diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index 7d8ff1e3b7a..1fe77a03a79 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -56,7 +56,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException { super(zConf, recoveryStorage); - clusterServer.addClusterEventListeners(this); + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, this); } @Override @@ -104,7 +104,8 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS); mapEvent.put(CLUSTER_EVENT_MSG, sContext); String strEvent = gson.toJson(mapEvent); - clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent); + clusterServer.unicastClusterEvent( + srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, strEvent); HashMap intpMeta = clusterServer .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); @@ -145,9 +146,13 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep } @Override - public void onClusterEvent(String event) { + public void onClusterEvent(String msg) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(msg); + } + Gson gson = new Gson(); - Map mapEvent = gson.fromJson(event, + Map mapEvent = gson.fromJson(msg, new TypeToken>(){}.getType()); String sEvent = (String) mapEvent.get(CLUSTER_EVENT); ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent); @@ -157,7 +162,7 @@ public void onClusterEvent(String event) { onCreateIntpProcess(mapEvent); break; default: - LOGGER.error("Unknown Cluster Event : {}", clusterEvent); + LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg); break; } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 1e180d8049f..10ee1804adb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -353,7 +353,12 @@ private static void setupNotebookServer( private static void setupClusterManagerServer(ServiceLocator serviceLocator) { if (conf.isClusterMode()) { - ClusterManagerServer.getInstance().start(); + ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance(); + NotebookServer notebookServer = serviceLocator.getService(NotebookServer.class); + AuthorizationService authorizationService = serviceLocator.getService(AuthorizationService.class); + clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer); + clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService); + clusterManagerServer.start(); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 2130b9851a5..20527b9a0a9 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -42,6 +42,10 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.thrift.TException; +import org.apache.zeppelin.cluster.ClusterManagerServer; +import org.apache.zeppelin.cluster.event.ClusterEvent; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -104,7 +108,8 @@ public class NotebookServer extends WebSocketServlet RemoteInterpreterProcessListener, ApplicationEventListener, ParagraphJobListener, - NoteEventListener { + NoteEventListener, + ClusterEventListener { /** * Job manager service type. @@ -525,50 +530,177 @@ public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) thr } public void broadcastNote(Note note) { - connectionManager.broadcast(note.getId(), new Message(OP.NOTE).put("note", note)); + inlineBroadcastNote(note); + broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, note); } - public void broadcastParagraph(Note note, Paragraph p) { + private void inlineBroadcastNote(Note note) { + Message message = new Message(OP.NOTE).put("note", note); + connectionManager.broadcast(note.getId(), message); + } + + private void inlineBroadcastParagraph(Note note, Paragraph p) { broadcastNoteForms(note); if (note.isPersonalizedMode()) { broadcastParagraphs(p.getUserParagraphMap(), p); } else { - connectionManager.broadcast(note.getId(), - new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p))); + Message message = new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p)); + connectionManager.broadcast(note.getId(), message); } } - public void broadcastParagraphs(Map userParagraphMap, - Paragraph defaultParagraph) { + public void broadcastParagraph(Note note, Paragraph p) { + inlineBroadcastParagraph(note, p); + broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, note, p); + } + + private void inlineBroadcastParagraphs(Map userParagraphMap, + Paragraph defaultParagraph) { if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { - connectionManager.multicastToUser(user, - new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); + Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)); + connectionManager.multicastToUser(user, message); } } } - private void broadcastNewParagraph(Note note, Paragraph para) { + private void broadcastParagraphs(Map userParagraphMap, + Paragraph defaultParagraph) { + inlineBroadcastParagraphs(userParagraphMap, defaultParagraph); + broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, userParagraphMap, defaultParagraph); + } + + private void inlineBroadcastNewParagraph(Note note, Paragraph para) { LOG.info("Broadcasting paragraph on run call instead of note."); int paraIndex = note.getParagraphs().indexOf(para); - connectionManager.broadcast(note.getId(), - new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); + + Message message = new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex); + connectionManager.broadcast(note.getId(), message); } - public void broadcastNoteList(AuthenticationInfo subject, Set userAndRoles) { + private void broadcastNewParagraph(Note note, Paragraph para) { + inlineBroadcastNewParagraph(note, para); + broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, note, para); + } + + public void inlineBroadcastNoteList(AuthenticationInfo subject, Set userAndRoles) { if (subject == null) { subject = new AuthenticationInfo(StringUtils.EMPTY); } //send first to requesting user List notesInfo = getNotebook().getNotesInfo( - noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles)); - connectionManager.multicastToUser(subject.getUser(), - new Message(OP.NOTES_INFO).put("notes", notesInfo)); + noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles)); + Message message = new Message(OP.NOTES_INFO).put("notes", notesInfo); + connectionManager.multicastToUser(subject.getUser(), message); //to others afterwards connectionManager.broadcastNoteListExcept(notesInfo, subject); } + public void broadcastNoteList(AuthenticationInfo subject, Set userAndRoles) { + inlineBroadcastNoteList(subject, userAndRoles); + broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, subject, userAndRoles); + } + + // broadcast ClusterEvent + private void broadcastClusterEvent(ClusterEvent event, Object... objects) { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + if (!conf.isClusterMode()) { + return; + } + + ClusterMessage clusterMessage = new ClusterMessage(event); + + for(Object object : objects) { + String json = ""; + if (object instanceof AuthenticationInfo) { + json = ((AuthenticationInfo) object).toJson(); + clusterMessage.put("AuthenticationInfo", json); + } else if (object instanceof Note) { + json = ((Note) object).toJson(); + clusterMessage.put("Note", json); + } else if (object instanceof Paragraph) { + json = ((Paragraph) object).toJson(); + clusterMessage.put("Paragraph", json); + } else if (object instanceof Set) { + Gson gson = new Gson(); + json = gson.toJson(object); + clusterMessage.put("Set", json); + } else if (object instanceof Map) { + Gson gson = new Gson(); + json = gson.toJson(object); + clusterMessage.put("Map", json); + } else { + LOG.error("Unknown object type!"); + } + } + + String msg = ClusterMessage.serializeMessage(clusterMessage); + ClusterManagerServer.getInstance().broadcastClusterEvent( + ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, msg); + } + + @Override + public void onClusterEvent(String msg) { + if (LOG.isDebugEnabled()) { + LOG.debug("onClusterEvent : {}", msg); + } + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + + Note note = null; + Paragraph paragraph = null; + Set userAndRoles = null; + Map userParagraphMap = null; + AuthenticationInfo authenticationInfo = null; + for (Map.Entry entry : message.getData().entrySet()) { + String key = entry.getKey(); + String json = entry.getValue(); + if (StringUtils.equals(key, "AuthenticationInfo")) { + authenticationInfo = AuthenticationInfo.fromJson(json); + } else if (StringUtils.equals(key, "Note")) { + note = Note.fromJson(json); + } else if (StringUtils.equals(key, "Paragraph")) { + paragraph = Paragraph.fromJson(json); + } else if (StringUtils.equals(key, "Set")) { + Gson gson = new Gson(); + userAndRoles = gson.fromJson(json, new TypeToken>() { + }.getType()); + } else if (StringUtils.equals(key, "Map")) { + Gson gson = new Gson(); + userParagraphMap = gson.fromJson(json, new TypeToken>() { + }.getType()); + } else { + LOG.error("Unknown key:{}, json:{}!" + key, json); + } + } + + switch (message.clusterEvent) { + case BROADCAST_NOTE: + inlineBroadcastNote(note); + break; + case BROADCAST_NOTE_LIST: + try { + getNotebook().reloadAllNotes(authenticationInfo); + inlineBroadcastNoteList(authenticationInfo, userAndRoles); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + break; + case BROADCAST_PARAGRAPH: + inlineBroadcastParagraph(note, paragraph); + break; + case BROADCAST_PARAGRAPHS: + inlineBroadcastParagraphs(userParagraphMap, paragraph); + break; + case BROADCAST_NEW_PARAGRAPH: + inlineBroadcastNewParagraph(note, paragraph); + break; + default: + LOG.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg); + break; + } + } + public void listNotesInfo(NotebookSocket conn, Message message) throws IOException { getNotebookService().listNotesInfo(false, getServiceContext(message), new WebSocketServiceCallback>(conn) { @@ -583,7 +715,7 @@ public void onSuccess(List notesInfo, public void broadcastReloadedNoteList(NotebookSocket conn, ServiceContext context) throws IOException { - getNotebookService().listNotesInfo(false, context, + getNotebookService().listNotesInfo(true, context, new WebSocketServiceCallback>(conn) { @Override public void onSuccess(List notesInfo, diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java new file mode 100644 index 00000000000..15bc23e1369 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class ClusterAuthEventListenerTest implements ClusterEventListener { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterAuthEventListenerTest.class); + + public String receiveMsg = null; + + @Override + public void onClusterEvent(String msg) { + receiveMsg = msg; + LOGGER.info("onClusterEvent : {}", msg); + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + + String noteId = message.get("noteId"); + String user = message.get("user"); + String jsonSet = message.get("set"); + Gson gson = new Gson(); + Set set = gson.fromJson(jsonSet, new TypeToken>() {}.getType()); + + assertNotNull(set); + switch (message.clusterEvent) { + case SET_READERS_PERMISSIONS: + case SET_WRITERS_PERMISSIONS: + case SET_OWNERS_PERMISSIONS: + case SET_RUNNERS_PERMISSIONS: + case CLEAR_PERMISSION: + assertNotNull(noteId); + break; + case SET_ROLES: + assertNotNull(user); + break; + default: + receiveMsg = null; + fail("Unknown clusterEvent : " + message.clusterEvent); + break; + } + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java new file mode 100644 index 00000000000..7e1898724de --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.thrift.TException; +import org.apache.zeppelin.cluster.meta.ClusterMetaType; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.apache.zeppelin.interpreter.thrift.ParagraphInfo; +import org.apache.zeppelin.interpreter.thrift.ServiceException; +import org.apache.zeppelin.notebook.AuthorizationService; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; +import org.apache.zeppelin.rest.message.NewParagraphRequest; +import org.apache.zeppelin.service.ConfigurationService; +import org.apache.zeppelin.service.NotebookService; +import org.apache.zeppelin.socket.NotebookServer; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.utils.TestUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class ClusterEventTest extends ZeppelinServerMock { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterEventTest.class); + + private static List clusterAuthEventListenerTests = new ArrayList<>(); + private static List clusterNoteEventListenerTests = new ArrayList<>(); + private static List clusterNoteAuthEventListenerTests = new ArrayList<>(); + + private static List clusterServers = new ArrayList<>(); + private static ClusterManagerClient clusterClient = null; + static final String metaKey = "ClusterMultiNodeTestKey"; + + private static Notebook notebook; + private static NotebookServer notebookServer; + private static SchedulerService schedulerService; + private static NotebookService notebookService; + private static AuthorizationService authorizationService; + private HttpServletRequest mockRequest; + private AuthenticationInfo anonymous; + + Gson gson = new Gson(); + + @BeforeClass + public static void init() throws Exception { + ZeppelinConfiguration zconf = genZeppelinConf(); + + ZeppelinServerMock.startUp(ClusterEventTest.class.getSimpleName(), zconf); + notebook = TestUtils.getInstance(Notebook.class); + authorizationService = new AuthorizationService(notebook, notebook.getConf()); + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + schedulerService = new QuartzSchedulerService(conf, notebook); + notebookServer = spy(NotebookServer.getInstance()); + notebookService = + new NotebookService(notebook, authorizationService, conf, schedulerService); + + ConfigurationService configurationService = new ConfigurationService(notebook.getConf()); + when(notebookServer.getNotebookService()).thenReturn(notebookService); + when(notebookServer.getConfigurationService()).thenReturn(configurationService); + + startOtherZeppelinClusterNode(zconf); + } + + @AfterClass + public static void destroy() throws Exception { + ZeppelinServerMock.shutDown(); + + if (null != clusterClient) { + clusterClient.shutdown(); + } + for (ClusterManagerServer clusterServer : clusterServers) { + clusterServer.shutdown(); + } + LOGGER.info("stopCluster <<<"); + } + + @Before + public void setUp() { + mockRequest = mock(HttpServletRequest.class); + anonymous = new AuthenticationInfo("anonymous"); + } + + private static ZeppelinConfiguration genZeppelinConf() + throws IOException, InterruptedException { + String clusterAddrList = ""; + String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); + for (int i = 0; i < 3; i ++) { + // Set the cluster IP and port + int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + clusterAddrList += zServerHost + ":" + zServerPort; + if (i != 2) { + clusterAddrList += ","; + } + } + ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + zconf.setClusterAddress(clusterAddrList); + LOGGER.info("clusterAddrList = {}", clusterAddrList); + + return zconf; + } + + public static ClusterManagerServer startClusterSingleNode(String clusterAddrList, String clusterHost, int clusterPort) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + Class clazz = ClusterManagerServer.class; + Constructor constructor = clazz.getDeclaredConstructor(); + constructor.setAccessible(true); + ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance(); + clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort); + + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer); + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService); + return clusterServer; + } + + // + public static void startOtherZeppelinClusterNode(ZeppelinConfiguration zconf) + throws IOException, InterruptedException { + LOGGER.info("startCluster >>>"); + String clusterAddrList = zconf.getClusterAddress(); + + // mock cluster manager server + String cluster[] = clusterAddrList.split(","); + try { + // NOTE: cluster[2] is ZeppelinServerMock + for (int i = 0; i < 2; i ++) { + String[] parts = cluster[i].split(":"); + String clusterHost = parts[0]; + int clusterPort = Integer.valueOf(parts[1]); + + // ClusterSingleNodeMock clusterSingleNodeMock = new ClusterSingleNodeMock(); + ClusterManagerServer clusterServer + = startClusterSingleNode(clusterAddrList, clusterHost, clusterPort); + clusterServers.add(clusterServer); + // clusterSingleNodeMockList.add(clusterSingleNodeMock); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + + for (ClusterManagerServer clusterServer : clusterServers) { + ClusterAuthEventListenerTest clusterAuthEventListenerTest = new ClusterAuthEventListenerTest(); + clusterAuthEventListenerTests.add(clusterAuthEventListenerTest); + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, clusterAuthEventListenerTest); + + ClusterNoteEventListenerTest clusterNoteEventListenerTest = new ClusterNoteEventListenerTest(); + clusterNoteEventListenerTests.add(clusterNoteEventListenerTest); + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, clusterNoteEventListenerTest); + + ClusterNoteAuthEventListenerTest clusterNoteAuthEventListenerTest = new ClusterNoteAuthEventListenerTest(); + clusterNoteAuthEventListenerTests.add(clusterNoteAuthEventListenerTest); + clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC, clusterNoteAuthEventListenerTest); + + clusterServer.start(); + } + + // mock cluster manager client + clusterClient = ClusterManagerClient.getInstance(); + clusterClient.start(metaKey); + + // Waiting for cluster startup + int wait = 0; + while(wait++ < 100) { + if (clusterIsStartup() && clusterClient.raftInitialized()) { + LOGGER.info("wait {}(ms) found cluster leader", wait*3000); + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + + Thread.sleep(3000); + assertEquals(true, clusterIsStartup()); + + getClusterServerMeta(); + LOGGER.info("startCluster <<<"); + } + + private void checkClusterNoteEventListener() { + for (ClusterNoteEventListenerTest clusterNoteEventListenerTest : clusterNoteEventListenerTests) { + assertNotNull(clusterNoteEventListenerTest.receiveMsg); + } + } + + private void checkClusterAuthEventListener() { + for (ClusterAuthEventListenerTest clusterAuthEventListenerTest : clusterAuthEventListenerTests) { + assertNotNull(clusterAuthEventListenerTest.receiveMsg); + } + } + + private void checkClusterNoteAuthEventListener() { + for (ClusterNoteAuthEventListenerTest clusterNoteAuthEventListenerTest : clusterNoteAuthEventListenerTests) { + assertNotNull(clusterNoteAuthEventListenerTest.receiveMsg); + } + } + + static boolean clusterIsStartup() { + boolean foundLeader = false; + for (ClusterManagerServer clusterServer : clusterServers) { + if (!clusterServer.raftInitialized()) { + LOGGER.warn("clusterServer not Initialized!"); + return false; + } + } + + return true; + } + + public static void getClusterServerMeta() { + LOGGER.info("getClusterServerMeta >>>"); + // Get metadata for all services + Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); + LOGGER.info(srvMeta.toString()); + + Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); + LOGGER.info(intpMeta.toString()); + + assertNotNull(srvMeta); + assertEquals(true, (srvMeta instanceof HashMap)); + HashMap hashMap = (HashMap) srvMeta; + + assertEquals(hashMap.size(), 3); + LOGGER.info("getClusterServerMeta <<<"); + } + + @Test + public void testRenameNoteEvent() throws IOException { + Note note = null; + try { + String oldName = "old_name"; + note = TestUtils.getInstance(Notebook.class).createNote(oldName, anonymous); + assertEquals(note.getName(), oldName); + String noteId = note.getId(); + + final String newName = "testName"; + String jsonRequest = "{\"name\": " + newName + "}"; + + PutMethod put = httpPut("/notebook/" + noteId + "/rename/", jsonRequest); + assertThat("test testRenameNote:", put, isAllowed()); + put.releaseConnection(); + + assertEquals(note.getName(), newName); + + // wait cluster sync event + Thread.sleep(1000); + checkClusterNoteEventListener(); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } finally { + // cleanup + if (null != note) { + TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + } + } + } + @Test + public void testCloneNoteEvent() throws IOException { + Note note1 = null; + String clonedNoteId = null; + try { + note1 = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); + PostMethod post = httpPost("/notebook/" + note1.getId(), ""); + LOG.info("testCloneNote response\n" + post.getResponseBodyAsString()); + assertThat(post, isAllowed()); + Map resp = gson.fromJson(post.getResponseBodyAsString(), + new TypeToken>() {}.getType()); + clonedNoteId = (String) resp.get("body"); + post.releaseConnection(); + + GetMethod get = httpGet("/notebook/" + clonedNoteId); + assertThat(get, isAllowed()); + Map resp2 = gson.fromJson(get.getResponseBodyAsString(), + new TypeToken>() {}.getType()); + Map resp2Body = (Map) resp2.get("body"); + + get.releaseConnection(); + + // wait cluster sync event + Thread.sleep(1000); + checkClusterNoteEventListener(); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } finally { + // cleanup + if (null != note1) { + TestUtils.getInstance(Notebook.class).removeNote(note1.getId(), anonymous); + } + if (null != clonedNoteId) { + TestUtils.getInstance(Notebook.class).removeNote(clonedNoteId, anonymous); + } + } + } + + @Test + public void insertParagraphEvent() throws IOException { + Note note = null; + try { + // Create note and set result explicitly + note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous); + Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, "result"); + p1.setResult(result); + + // insert new paragraph + NewParagraphRequest newParagraphRequest = new NewParagraphRequest(); + + PostMethod post = httpPost("/notebook/" + note.getId() + "/paragraph", newParagraphRequest.toJson()); + LOG.info("test clear paragraph output response\n" + post.getResponseBodyAsString()); + assertThat(post, isAllowed()); + post.releaseConnection(); + + // wait cluster sync event + Thread.sleep(1000); + checkClusterNoteEventListener(); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } finally { + // cleanup + if (null != note) { + TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous); + } + } + } + + @Test + public void testClusterAuthEvent() throws IOException { + Note note = null; + + try { + note = notebook.createNote("note1", anonymous); + Paragraph p1 = note.addNewParagraph(anonymous); + p1.setText("%md start remote interpreter process"); + p1.setAuthenticationInfo(anonymous); + notebookServer.getNotebook().saveNote(note, anonymous); + + String noteId = note.getId(); + String user1Id = "user1", user2Id = "user2"; + + // test user1 can get anonymous's note + List paragraphList0 = null; + try { + paragraphList0 = notebookServer.getParagraphList(user1Id, noteId); + } catch (ServiceException e) { + LOGGER.error(e.getMessage(), e); + } catch (TException e) { + LOGGER.error(e.getMessage(), e); + } + assertNotNull(user1Id + " can get anonymous's note", paragraphList0); + + // test user1 cannot get user2's note + authorizationService.setOwners(noteId, new HashSet<>(Arrays.asList(user2Id))); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + + authorizationService.setReaders(noteId, new HashSet<>(Arrays.asList(user2Id))); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + + authorizationService.setRunners(noteId, new HashSet<>(Arrays.asList(user2Id))); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + + authorizationService.setWriters(noteId, new HashSet<>(Arrays.asList(user2Id))); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + + Set roles = Sets.newHashSet("admin"); + // set admin roles for both user1 and user2 + authorizationService.setRoles(user2Id, roles); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + + authorizationService.clearPermission(noteId); + // wait cluster sync event + Thread.sleep(1000); + checkClusterAuthEventListener(); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } finally { + if (null != note) { + notebook.removeNote(note.getId(), anonymous); + } + } + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java new file mode 100644 index 00000000000..f2ac6b2d175 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class ClusterNoteAuthEventListenerTest implements ClusterEventListener { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterNoteAuthEventListenerTest.class); + + public String receiveMsg = null; + + @Override + public void onClusterEvent(String msg) { + receiveMsg = msg; + LOGGER.info("onClusterEvent : {}", msg); + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + String noteId = message.get("noteId"); + String json = message.get("subject"); + AuthenticationInfo subject = AuthenticationInfo.fromJson(json); + + assertNotNull(noteId); + assertNotNull(json); + assertNotNull(subject); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java new file mode 100644 index 00000000000..a8d9444eaa0 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class ClusterNoteEventListenerTest implements ClusterEventListener { + private static Logger LOGGER = LoggerFactory.getLogger(ClusterNoteEventListenerTest.class); + + public String receiveMsg = null; + + @Override + public void onClusterEvent(String msg) { + receiveMsg = msg; + LOGGER.info("onClusterEvent : {}", msg); + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + + Note note = null; + Paragraph paragraph = null; + Set userAndRoles = null; + Map userParagraphMap = null; + AuthenticationInfo authenticationInfo = null; + for (Map.Entry entry : message.getData().entrySet()) { + String key = entry.getKey(); + String json = entry.getValue(); + if (key.equals("AuthenticationInfo")) { + authenticationInfo = AuthenticationInfo.fromJson(json); + LOGGER.info(authenticationInfo.toJson()); + } else if (key.equals("Note")) { + note = Note.fromJson(json); + LOGGER.info(note.toJson()); + } else if (key.equals("Paragraph")) { + paragraph = Paragraph.fromJson(json); + LOGGER.info(paragraph.toJson()); + } else if (key.equals("Set")) { + Gson gson = new Gson(); + userAndRoles = gson.fromJson(json, new TypeToken>() { + }.getType()); + LOGGER.info(userAndRoles.toString()); + } else if (key.equals("Map")) { + Gson gson = new Gson(); + userParagraphMap = gson.fromJson(json, new TypeToken>() { + }.getType()); + LOGGER.info(userParagraphMap.toString()); + } else { + receiveMsg = null; + fail("Unknown clusterEvent : " + message.clusterEvent); + } + } + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java new file mode 100644 index 00000000000..a78a5827598 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cluster; + +import org.apache.commons.httpclient.Header; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpMethodBase; +import org.apache.commons.httpclient.cookie.CookiePolicy; +import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.commons.httpclient.methods.RequestEntity; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.plugin.PluginManager; +import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.utils.TestUtils; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.regex.Pattern; + +public class ZeppelinServerMock { + protected static final Logger LOG = LoggerFactory.getLogger(ZeppelinServerMock.class); + + static final String REST_API_URL = "/api"; + static final String URL = getUrlToTest(); + protected static final boolean WAS_RUNNING = checkIfServerIsRunning(); + + protected static File zeppelinHome; + protected static File confDir; + protected static File notebookDir; + + private String getUrl(String path) { + String url; + if (System.getProperty("url") != null) { + url = System.getProperty("url"); + } else { + url = "http://localhost:8080"; + } + url += REST_API_URL; + if (path != null) { + url += path; + } + + return url; + } + + protected static String getUrlToTest() { + String url = "http://localhost:8080" + REST_API_URL; + if (System.getProperty("url") != null) { + url = System.getProperty("url"); + } + return url; + } + + static ExecutorService executor; + protected static final Runnable SERVER = new Runnable() { + @Override + public void run() { + try { + TestUtils.clearInstances(); + ZeppelinServer.main(new String[]{""}); + } catch (Exception e) { + LOG.error("Exception in WebDriverManager while getWebDriver ", e); + throw new RuntimeException(e); + } + } + }; + + private static void start(String testClassName, boolean cleanData, ZeppelinConfiguration zconf) + throws Exception { + LOG.info("Starting ZeppelinServer testClassName: {}", testClassName); + + if (!WAS_RUNNING) { + // copy the resources files to a temp folder + zeppelinHome = new File(".."); + LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath()); + confDir = new File(zeppelinHome, "conf_" + testClassName); + confDir.mkdirs(); + zconf.save(confDir + "/zeppelin-site.xml"); + + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), + zeppelinHome.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), + new File("../zeppelin-web/dist").getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), + confDir.getAbsolutePath()); + System.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT.getVarName(), + "spark"); + notebookDir = new File(zeppelinHome.getAbsolutePath() + "/notebook_" + testClassName); + if (cleanData) { + FileUtils.deleteDirectory(notebookDir); + } + System.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), + notebookDir.getPath() + ); + + // some test profile does not build zeppelin-web. + // to prevent zeppelin starting up fail, create zeppelin-web/dist directory + new File("../zeppelin-web/dist").mkdirs(); + + LOG.info("Staring test Zeppelin up..."); + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + + executor = Executors.newSingleThreadExecutor(); + executor.submit(SERVER); + long s = System.currentTimeMillis(); + boolean started = false; + while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes + Thread.sleep(2000); + started = checkIfServerIsRunning(); + if (started == true) { + break; + } + } + if (started == false) { + throw new RuntimeException("Can not start Zeppelin server"); + } + //ZeppelinServer.notebook.setParagraphJobListener(NotebookServer.getInstance()); + LOG.info("Test Zeppelin stared."); + } + } + + protected static void startUp(String testClassName, ZeppelinConfiguration zconf) throws Exception { + start(testClassName, true, zconf); + } + + protected static void shutDown() throws Exception { + shutDown(true); + } + + protected static void shutDown(final boolean deleteConfDir) throws Exception { + if (!WAS_RUNNING && TestUtils.getInstance(Notebook.class) != null) { + // restart interpreter to stop all interpreter processes + List settingList = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager() + .get(); + if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) { + for (InterpreterSetting setting : settingList) { + TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(setting.getId()); + } + } + LOG.info("Terminating test Zeppelin..."); + ZeppelinServer.jettyWebServer.stop(); + executor.shutdown(); + PluginManager.reset(); + + long s = System.currentTimeMillis(); + boolean started = true; + while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes + Thread.sleep(2000); + started = checkIfServerIsRunning(); + if (started == false) { + break; + } + } + if (started == true) { + throw new RuntimeException("Can not stop Zeppelin server"); + } + + LOG.info("Test Zeppelin terminated."); + + if (deleteConfDir && !TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) { + // don't delete interpreter.json when recovery is enabled. otherwise the interpreter setting + // id will change after zeppelin restart, then we can not recover interpreter process + // properly + FileUtils.deleteDirectory(confDir); + } + } + + } + + protected static boolean checkIfServerIsRunning() { + GetMethod request = null; + boolean isRunning; + try { + request = httpGet("/version"); + isRunning = request.getStatusCode() == 200; + } catch (IOException e) { + LOG.error("AbstractTestRestApi.checkIfServerIsRunning() fails .. ZeppelinServer is not " + + "running"); + isRunning = false; + } finally { + if (request != null) { + request.releaseConnection(); + } + } + return isRunning; + } + + protected static GetMethod httpGet(String path) throws IOException { + return httpGet(path, StringUtils.EMPTY, StringUtils.EMPTY); + } + + protected static GetMethod httpGet(String path, String user, String pwd) throws IOException { + return httpGet(path, user, pwd, StringUtils.EMPTY); + } + + protected static GetMethod httpGet(String path, String user, String pwd, String cookies) + throws IOException { + LOG.info("Connecting to {}", URL + path); + HttpClient httpClient = new HttpClient(); + GetMethod getMethod = new GetMethod(URL + path); + getMethod.addRequestHeader("Origin", URL); + if (userAndPasswordAreNotBlank(user, pwd)) { + getMethod.setRequestHeader("Cookie", "JSESSIONID=" + getCookie(user, pwd)); + } + if (!StringUtils.isBlank(cookies)) { + getMethod.setRequestHeader("Cookie", getMethod.getResponseHeader("Cookie") + ";" + cookies); + } + httpClient.executeMethod(getMethod); + LOG.info("{} - {}", getMethod.getStatusCode(), getMethod.getStatusText()); + return getMethod; + } + + protected static PutMethod httpPut(String path, String body) throws IOException { + return httpPut(path, body, StringUtils.EMPTY, StringUtils.EMPTY); + } + + protected static PutMethod httpPut(String path, String body, String user, String pwd) + throws IOException { + LOG.info("Connecting to {}", URL + path); + HttpClient httpClient = new HttpClient(); + PutMethod putMethod = new PutMethod(URL + path); + putMethod.addRequestHeader("Origin", URL); + RequestEntity entity = new ByteArrayRequestEntity(body.getBytes("UTF-8")); + putMethod.setRequestEntity(entity); + if (userAndPasswordAreNotBlank(user, pwd)) { + putMethod.setRequestHeader("Cookie", "JSESSIONID=" + getCookie(user, pwd)); + } + httpClient.executeMethod(putMethod); + LOG.info("{} - {}", putMethod.getStatusCode(), putMethod.getStatusText()); + return putMethod; + } + + protected static PostMethod httpPost(String path, String body) throws IOException { + return httpPost(path, body, StringUtils.EMPTY, StringUtils.EMPTY); + } + + protected static PostMethod httpPost(String path, String request, String user, String pwd) + throws IOException { + LOG.info("Connecting to {}", URL + path); + HttpClient httpClient = new HttpClient(); + PostMethod postMethod = new PostMethod(URL + path); + postMethod.setRequestBody(request); + postMethod.getParams().setCookiePolicy(CookiePolicy.IGNORE_COOKIES); + if (userAndPasswordAreNotBlank(user, pwd)) { + postMethod.setRequestHeader("Cookie", "JSESSIONID=" + getCookie(user, pwd)); + } + httpClient.executeMethod(postMethod); + LOG.info("{} - {}", postMethod.getStatusCode(), postMethod.getStatusText()); + return postMethod; + } + + private static String getCookie(String user, String password) throws IOException { + HttpClient httpClient = new HttpClient(); + PostMethod postMethod = new PostMethod(URL + "/login"); + postMethod.addRequestHeader("Origin", URL); + postMethod.setParameter("password", password); + postMethod.setParameter("userName", user); + httpClient.executeMethod(postMethod); + LOG.info("{} - {}", postMethod.getStatusCode(), postMethod.getStatusText()); + Pattern pattern = Pattern.compile("JSESSIONID=([a-zA-Z0-9-]*)"); + Header[] setCookieHeaders = postMethod.getResponseHeaders("Set-Cookie"); + String jsessionId = null; + for (Header setCookie : setCookieHeaders) { + java.util.regex.Matcher matcher = pattern.matcher(setCookie.toString()); + if (matcher.find()) { + jsessionId = matcher.group(1); + } + } + + if (jsessionId != null) { + return jsessionId; + } else { + return StringUtils.EMPTY; + } + } + + protected static boolean userAndPasswordAreNotBlank(String user, String pwd) { + if (StringUtils.isBlank(user) && StringUtils.isBlank(pwd)) { + return false; + } + return true; + } + + protected Matcher responsesWith(final int expectedStatusCode) { + return new TypeSafeMatcher() { + WeakReference method; + + @Override + public boolean matchesSafely(HttpMethodBase httpMethodBase) { + method = (method == null) ? new WeakReference<>(httpMethodBase) : method; + return httpMethodBase.getStatusCode() == expectedStatusCode; + } + + @Override + public void describeTo(Description description) { + description.appendText("HTTP response ").appendValue(expectedStatusCode) + .appendText(" from ").appendText(method.get().getPath()); + } + + @Override + protected void describeMismatchSafely(HttpMethodBase item, Description description) { + description.appendText("got ").appendValue(item.getStatusCode()).appendText(" ") + .appendText(item.getStatusText()); + } + }; + } + + /** + * Status code matcher. + */ + protected Matcher isForbidden() { + return responsesWith(403); + } + + protected Matcher isAllowed() { + return responsesWith(200); + } + + protected Matcher isCreated() { + return responsesWith(201); + } + + protected Matcher isBadRequest() { + return responsesWith(400); + } + + protected Matcher isNotFound() { + return responsesWith(404); + } + + protected Matcher isNotAllowed() { + return responsesWith(405); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java index 7a2cde7d64c..5f0ea36f56f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java @@ -20,7 +20,13 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.cluster.ClusterManagerServer; +import org.apache.zeppelin.cluster.event.ClusterEvent; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; @@ -37,7 +43,7 @@ * This class is responsible for maintain notes authorization info. And provide api for * setting and querying note authorization info. */ -public class AuthorizationService { +public class AuthorizationService implements ClusterEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizationService.class); private static final Set EMPTY_SET = new HashSet<>(); @@ -66,21 +72,41 @@ private Set validateUser(Set users) { } public void setOwners(String noteId, Set entities) { + inlineSetOwners(noteId, entities); + broadcastClusterEvent(ClusterEvent.SET_OWNERS_PERMISSIONS, noteId, null, entities); + } + + private void inlineSetOwners(String noteId, Set entities) { entities = validateUser(entities); notebook.getNote(noteId).setOwners(entities); } public void setReaders(String noteId, Set entities) { + inlineSetReaders(noteId, entities); + broadcastClusterEvent(ClusterEvent.SET_READERS_PERMISSIONS, noteId, null, entities); + } + + private void inlineSetReaders(String noteId, Set entities) { entities = validateUser(entities); notebook.getNote(noteId).setReaders(entities); } public void setRunners(String noteId, Set entities) { + inlineSetRunners(noteId, entities); + broadcastClusterEvent(ClusterEvent.SET_RUNNERS_PERMISSIONS, noteId, null, entities); + } + + private void inlineSetRunners(String noteId, Set entities) { entities = validateUser(entities); notebook.getNote(noteId).setRunners(entities); } public void setWriters(String noteId, Set entities) { + inlineSetWriters(noteId, entities); + broadcastClusterEvent(ClusterEvent.SET_WRITERS_PERMISSIONS, noteId, null, entities); + } + + private void inlineSetWriters(String noteId, Set entities) { entities = validateUser(entities); notebook.getNote(noteId).setWriters(entities); } @@ -211,6 +237,11 @@ public boolean isPublic() { } public void setRoles(String user, Set roles) { + inlineSetRoles(user, roles); + broadcastClusterEvent(ClusterEvent.SET_ROLES, null, user, roles); + } + + private void inlineSetRoles(String user, Set roles) { if (StringUtils.isBlank(user)) { LOGGER.warn("Setting roles for empty user"); return; @@ -241,9 +272,74 @@ public boolean apply(NoteInfo input) { } public void clearPermission(String noteId) { + inlineClearPermission(noteId); + broadcastClusterEvent(ClusterEvent.CLEAR_PERMISSION, noteId, null, null); + } + + public void inlineClearPermission(String noteId) { notebook.getNote(noteId).setReaders(Sets.newHashSet()); notebook.getNote(noteId).setRunners(Sets.newHashSet()); notebook.getNote(noteId).setWriters(Sets.newHashSet()); notebook.getNote(noteId).setOwners(Sets.newHashSet()); } + + @Override + public void onClusterEvent(String msg) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("onClusterEvent : {}", msg); + } + + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + + String noteId = message.get("noteId"); + String user = message.get("user"); + String jsonSet = message.get("set"); + Gson gson = new Gson(); + Set set = gson.fromJson(jsonSet, new TypeToken>() { + }.getType()); + + switch (message.clusterEvent) { + case SET_READERS_PERMISSIONS: + inlineSetReaders(noteId, set); + break; + case SET_WRITERS_PERMISSIONS: + inlineSetWriters(noteId, set); + break; + case SET_OWNERS_PERMISSIONS: + inlineSetOwners(noteId, set); + break; + case SET_RUNNERS_PERMISSIONS: + inlineSetRunners(noteId, set); + break; + case SET_ROLES: + inlineSetRoles(user, set); + break; + case CLEAR_PERMISSION: + inlineClearPermission(noteId); + break; + default: + LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg); + break; + } + } + + // broadcast cluster event + private void broadcastClusterEvent(ClusterEvent event, String noteId, + String user, Set set) { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + if (!conf.isClusterMode()) { + return; + } + ClusterMessage message = new ClusterMessage(event); + message.put("noteId", noteId); + message.put("user", user); + + Gson gson = new Gson(); + String json = gson.toJson(set, new TypeToken>() { + }.getType()); + message.put("set", json); + String msg = ClusterMessage.serializeMessage(message); + ClusterManagerServer.getInstance().broadcastClusterEvent( + ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, msg); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java index 0443891f0cb..8429e0ca8a7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.notebook; import java.io.IOException; +import java.lang.reflect.GenericSignatureFormatError; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -26,7 +27,12 @@ import java.util.Map; import java.util.Set; +import com.google.gson.Gson; import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.cluster.ClusterManagerServer; +import org.apache.zeppelin.cluster.event.ClusterEvent; +import org.apache.zeppelin.cluster.event.ClusterEventListener; +import org.apache.zeppelin.cluster.event.ClusterMessage; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.scheduler.Job; @@ -42,7 +48,7 @@ /** * Contains authorization information for notes */ -public class NotebookAuthorization implements NoteEventListener { +public class NotebookAuthorization implements NoteEventListener, ClusterEventListener { private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class); private static NotebookAuthorization instance = null; /* @@ -64,6 +70,8 @@ public static NotebookAuthorization init(ZeppelinConfiguration config) { if (instance == null) { instance = new NotebookAuthorization(); conf = config; + ClusterManagerServer.getInstance().addClusterEventListeners( + ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC, instance); try { configStorage = ConfigStorage.getInstance(config); loadFromFile(); @@ -378,8 +386,13 @@ public boolean apply(NoteInfo input) { } }).toList(); } - + public void setNewNotePermissions(String noteId, AuthenticationInfo subject) { + inlineSetNewNotePermissions(noteId, subject); + broadcastClusterEvent(ClusterEvent.SET_NEW_NOTE_PERMISSIONS, noteId, subject); + } + + public void inlineSetNewNotePermissions(String noteId, AuthenticationInfo subject) { if (!AuthenticationInfo.isAnonymous(subject)) { if (isPublic()) { // add current user to owners - can be public @@ -387,16 +400,20 @@ public void setNewNotePermissions(String noteId, AuthenticationInfo subject) { owners.add(subject.getUser()); setOwners(noteId, owners); } else { + Map> mapEntities = new HashMap<>(); // add current user to owners, readers, runners, writers - private note Set entities = getOwners(noteId); entities.add(subject.getUser()); setOwners(noteId, entities); + entities = getReaders(noteId); entities.add(subject.getUser()); setReaders(noteId, entities); + entities = getRunners(noteId); entities.add(subject.getUser()); setRunners(noteId, entities); + entities = getWriters(noteId); entities.add(subject.getUser()); setWriters(noteId, entities); @@ -438,4 +455,31 @@ public void onParagraphUpdate(Paragraph p) throws IOException { public void onParagraphStatusChange(Paragraph p, Job.Status status) { } + + @Override + public void onClusterEvent(String msg) { + if (LOG.isDebugEnabled()) { + LOG.info("onClusterEvent : {}", msg); + } + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + String noteId = message.get("noteId"); + String json = message.get("subject"); + AuthenticationInfo subject = AuthenticationInfo.fromJson(json); + + inlineSetNewNotePermissions(noteId, subject); + } + + // broadcast cluster event + private void broadcastClusterEvent(ClusterEvent event, String noteId, AuthenticationInfo subject) { + if (!conf.isClusterMode()) { + return; + } + + ClusterMessage message = new ClusterMessage(event); + message.put("noteId", noteId); + message.put("subject", subject.toJson()); + String msg = ClusterMessage.serializeMessage(message); + ClusterManagerServer.getInstance().broadcastClusterEvent( + ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, msg); + } }