Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,10 @@
import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -76,9 +67,15 @@ public class ClusterManagerServer extends ClusterManager {
// Connect to the interpreter process that has been created
public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";

private List<ClusterEventListener> clusterEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterIntpEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterNoteEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterAuthEventListeners = new ArrayList<>();

// zeppelin cluster event
public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
public static String CLUSTER_NB_AUTH_EVENT_TOPIC = "CLUSTER_NB_AUTH_EVENT_TOPIC";

private ClusterManagerServer() {
super();
Expand Down Expand Up @@ -206,8 +203,12 @@ public BroadcastService getBroadcastService() {
raftServer = builder.build();
raftServer.bootstrap(clusterMemberIds);

messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
subscribeClusterEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC,
subscribeClusterIntpEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC,
subscribeClusterNoteEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC,
subscribeClusterAuthEvent, MoreExecutors.directExecutor());

LOGGER.info("RaftServer run() <<<");
}
Expand Down Expand Up @@ -273,12 +274,12 @@ public HashMap<String, Object> getIdleNodeMeta() {
return idleNodeMeta;
}

public void unicastClusterEvent(String host, int port, String msg) {
public void unicastClusterEvent(String host, int port, String topic, String msg) {
LOGGER.info("send unicastClusterEvent message {}", msg);

Address address = Address.from(host, port);
CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
topic, msg.getBytes(), Duration.ofSeconds(2));
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
Expand All @@ -288,7 +289,7 @@ public void unicastClusterEvent(String host, int port, String msg) {
});
}

public void broadcastClusterEvent(String msg) {
public void broadcastClusterEvent(String topic, String msg) {
LOGGER.info("send broadcastClusterEvent message {}", msg);

for (Node node : clusterNodes) {
Expand All @@ -299,7 +300,7 @@ public void broadcastClusterEvent(String msg) {
}

CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
topic, msg.getBytes(), Duration.ofSeconds(2));
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
Expand All @@ -310,18 +311,51 @@ public void broadcastClusterEvent(String msg) {
}
}

private BiFunction<Address, byte[], byte[]> subscribeClusterEvent = (address, data) -> {
private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent = (address, data) -> {
String message = new String(data);
if (LOGGER.isDebugEnabled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is if (LOGGER.isDebugEnabled()) { needed? LOGGER.debug should be enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because need to synchronize the create, rename, delete, set read permissions, write permissions, execute permissions, so need to synchronize the more information, there will be a lot of synchronization information in the log file.
So need add if (LOGGER.isDebugEnabled())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. you are saying logging is useful for debugging, and there could be a lot of information.
isn't debug log will be filtered out if debug log is not enabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the development phase, the debug information is very useful for developers, but after the development is completed, the debug information will be very much. In the log log file, there is a lot of synchronization information, which affects the system administrator to check whether the system is normal.
So, I think these debug messages don't need to be printed to the log file after development. :-)

LOGGER.debug("subscribeClusterIntpEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterIntpEventListeners) {
eventListener.onClusterEvent(message);
}

return null;
};

private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent = (address, data) -> {
String message = new String(data);
LOGGER.info("subscribeClusterEvent() {}", message);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("subscribeClusterNoteEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterNoteEventListeners) {
eventListener.onClusterEvent(message);
}

for (ClusterEventListener eventListener : clusterEventListeners) {
return null;
};

private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent = (address, data) -> {
String message = new String(data);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("subscribeClusterAuthEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterAuthEventListeners) {
eventListener.onClusterEvent(message);
}

return null;
};

public void addClusterEventListeners(ClusterEventListener listener) {
clusterEventListeners.add(listener);
public void addClusterEventListeners(String topic, ClusterEventListener listener) {
if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) {
clusterIntpEventListeners.add(listener);
} else if (StringUtils.equals(topic, CLUSTER_NOTE_EVENT_TOPIC)) {
clusterNoteEventListeners.add(listener);
} else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) {
clusterAuthEventListeners.add(listener);
} else {
LOGGER.error("Unknow cluster event topic : {}", topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,21 @@
* Cluster Event
*/
public enum ClusterEvent {
CREATE_INTP_PROCESS
// CLUSTER_INTP_EVENT_TOPIC
CREATE_INTP_PROCESS,
// CLUSTER_NOTE_EVENT_TOPIC
BROADCAST_NOTE,
BROADCAST_NOTE_LIST,
BROADCAST_PARAGRAPH,
BROADCAST_PARAGRAPHS,
BROADCAST_NEW_PARAGRAPH,
UPDATE_NOTE_PERMISSIONS,
// CLUSTER_AUTH_EVENT_TOPIC
SET_ROLES,
SET_READERS_PERMISSIONS,
SET_RUNNERS_PERMISSIONS,
SET_WRITERS_PERMISSIONS,
SET_OWNERS_PERMISSIONS,
CLEAR_PERMISSION,
SET_NEW_NOTE_PERMISSIONS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.event;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.display.Input;

import java.util.HashMap;
import java.util.Map;

public class ClusterMessage {
public ClusterEvent clusterEvent;
private Map<String, String> data = new HashMap<>();

private static Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
.setPrettyPrinting()
.registerTypeAdapterFactory(Input.TypeAdapterFactory).create();

public ClusterMessage(ClusterEvent event) {
this.clusterEvent = event;
}

public ClusterMessage put(String k, String v) {
data.put(k, v);
return this;
}

public String get(String k) {
return data.get(k);
}

public Map<String, String> getData() {
return data;
}

public static ClusterMessage deserializeMessage(String msg) {
return gson.fromJson(msg, ClusterMessage.class);
}

public static String serializeMessage(ClusterMessage m) {
return gson.toJson(m);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.function.Predicate;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
Expand Down Expand Up @@ -311,6 +312,11 @@ public String getServerAddress() {
return getString(ConfVars.ZEPPELIN_ADDR);
}

@VisibleForTesting
public void setServerPort(int port) {
properties.put(ConfVars.ZEPPELIN_PORT.getVarName(), String.valueOf(port));
}

public int getServerPort() {
return getInt(ConfVars.ZEPPELIN_PORT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
throws IOException {
super(zConf, recoveryStorage);
clusterServer.addClusterEventListeners(this);
clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, this);
}

@Override
Expand Down Expand Up @@ -104,7 +104,8 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
mapEvent.put(CLUSTER_EVENT_MSG, sContext);
String strEvent = gson.toJson(mapEvent);
clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent);
clusterServer.unicastClusterEvent(
srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, strEvent);

HashMap<String, Object> intpMeta = clusterServer
.getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
Expand Down Expand Up @@ -145,9 +146,13 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
}

@Override
public void onClusterEvent(String event) {
public void onClusterEvent(String msg) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(msg);
}

Gson gson = new Gson();
Map<String, Object> mapEvent = gson.fromJson(event,
Map<String, Object> mapEvent = gson.fromJson(msg,
new TypeToken<Map<String, Object>>(){}.getType());
String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
Expand All @@ -157,7 +162,7 @@ public void onClusterEvent(String event) {
onCreateIntpProcess(mapEvent);
break;
default:
LOGGER.error("Unknown Cluster Event : {}", clusterEvent);
LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,12 @@ private static void setupNotebookServer(

private static void setupClusterManagerServer(ServiceLocator serviceLocator) {
if (conf.isClusterMode()) {
ClusterManagerServer.getInstance().start();
ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
NotebookServer notebookServer = serviceLocator.getService(NotebookServer.class);
AuthorizationService authorizationService = serviceLocator.getService(AuthorizationService.class);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
clusterManagerServer.start();
}
}

Expand Down
Loading