diff --git a/angular/pom.xml b/angular/pom.xml
index df4e83c0356..021139db599 100644
--- a/angular/pom.xml
+++ b/angular/pom.xml
@@ -53,6 +53,23 @@
slf4j-log4j12
+
+ commons-logging
+ commons-logging
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.4
+
+
+
+ commons-io
+ commons-io
+ 2.4
+
+
junit
junit
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c8950185577..4606ed34231 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -79,9 +79,12 @@ fi
# add test classes for unittest
if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
+ if [[ -n "${ZEPPELIN_ZENGINE_TEST}" ]]; then
+ addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
+ fi
fi
-addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
+addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-api/target"
addJarInDirForIntp "${ZEPPELIN_HOME}/lib/interpreter"
addJarInDirForIntp "${INTERPRETER_DIR}"
diff --git a/livy/pom.xml b/livy/pom.xml
index 3fc58c43768..5e648d93725 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -57,13 +57,6 @@
${project.version}
-
- ${project.groupId}
- zeppelin-interpreter
- ${project.version}
- test
-
-
org.apache.livy
livy-integration-test
diff --git a/markdown/pom.xml b/markdown/pom.xml
index d5fe9a3cc78..55535a699cd 100644
--- a/markdown/pom.xml
+++ b/markdown/pom.xml
@@ -80,6 +80,11 @@
2.4
+
+ commons-logging
+ commons-logging
+
+
junit
junit
diff --git a/python/pom.xml b/python/pom.xml
index 5c7059d78ee..8fed0c911b8 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -35,17 +35,11 @@
python
0.10.7
- 1.4.0
+ 1.15.0
python-interpreter-with-py4j
-
- ${project.groupId}
- zeppelin-interpreter
- ${project.version}
-
-
org.apache.commons
commons-exec
@@ -68,6 +62,26 @@
slf4j-log4j12
+
+ commons-logging
+ commons-logging
+
+
+
+ commons-lang
+ commons-lang
+
+
+
+ commons-io
+ commons-io
+
+
+
+ commons-codec
+ commons-codec
+
+
io.grpc
grpc-netty
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 4b6bfdb7bb2..28e6270598f 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -214,7 +214,7 @@ public void testGrpcFrameSize() throws InterpreterException, IOException {
List interpreterResultMessages =
context.out.toInterpreterResultMessage();
assertEquals(1, interpreterResultMessages.size());
- assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds maximum: 3000"));
+ assertTrue(interpreterResultMessages.get(0).getData().contains("exceeds maximum size 3000"));
// next call continue work
result = interpreter.interpret("print(1)", context);
diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties
index 8993ff2854d..2933a6408b8 100644
--- a/python/src/test/resources/log4j.properties
+++ b/python/src/test/resources/log4j.properties
@@ -24,4 +24,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
-log4j.logger.org.apache.zeppelin.python=DEBUG
+log4j.logger.org.apache.zeppelin.python=INFO
diff --git a/shell/pom.xml b/shell/pom.xml
index c702b9f6d31..98b8a830dc5 100644
--- a/shell/pom.xml
+++ b/shell/pom.xml
@@ -58,6 +58,11 @@
slf4j-log4j12
+
+ commons-logging
+ commons-logging
+
+
org.apache.commons
commons-exec
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index a7dbce6d436..daf801fcdd2 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -292,6 +292,11 @@
${commons.exec.version}
+
+ commons-logging
+ commons-logging
+
+
org.scala-lang
scala-library
diff --git a/zeppelin-integration/pom.xml b/zeppelin-integration/pom.xml
index 94ed3a29ecc..25463b3f1bd 100644
--- a/zeppelin-integration/pom.xml
+++ b/zeppelin-integration/pom.xml
@@ -44,7 +44,7 @@
3.8.1
- 3.4
+ 3.7
2.16
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 38259c72d03..22a029e8bf2 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -44,12 +44,65 @@
3.0.3
1.0
2.12.1
+ 3.0.0-rc4
+ 3.1.1
+ 20.0
+ 3.7
2.3
+
+ io.atomix
+ atomix
+ ${atomix.version}
+
+
+ org.apache.commons
+ commons-math3
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ com.google.guava
+ guava
+
+
+
+
+
+ io.atomix
+ atomix-raft
+ ${atomix.version}
+
+
+
+ io.atomix
+ atomix-primary-backup
+ ${atomix.version}
+
+
+
+ org.apache.commons
+ commons-math3
+ ${commons-math3.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons-lang3.version}
+
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
org.apache.thrift
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
new file mode 100644
index 00000000000..34e3b6f350c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.cluster.messaging.BroadcastService;
+
+import java.util.function.Consumer;
+
+/**
+ * Broadcast Service Adapter
+ * Service for broadcast messaging between nodes.
+ * The broadcast service is an unreliable broadcast messaging service backed by multicast.
+ * This service provides no guaranteed regarding reliability or order of messages.
+ */
+public class BroadcastServiceAdapter implements BroadcastService {
+ @Override
+ public void broadcast(String subject, byte[] message) {
+
+ }
+
+ @Override
+ public void addListener(String subject, Consumer listener) {
+
+ }
+
+ @Override
+ public void removeListener(String subject, Consumer listener) {
+
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
new file mode 100644
index 00000000000..683f068f3c8
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.Node;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.primitive.operation.OperationType;
+import io.atomix.primitive.operation.PrimitiveOperation;
+import io.atomix.primitive.operation.impl.DefaultOperationId;
+import io.atomix.primitive.partition.PartitionId;
+import io.atomix.primitive.service.ServiceConfig;
+import io.atomix.primitive.session.SessionClient;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.session.CommunicationStrategy;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Namespace;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
+import org.apache.zeppelin.cluster.meta.ClusterMetaOperation;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
+import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.time.Instant;
+
+import java.util.Date;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static io.atomix.primitive.operation.PrimitiveOperation.operation;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
+
+/**
+ * The base class for cluster management, including the following implementations
+ * 1. RaftClient as the raft client
+ * 2. Threading to provide retry after cluster metadata submission failure
+ * 3. Cluster monitoring
+ */
+public abstract class ClusterManager {
+ private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
+
+ public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+
+ protected Collection clusterNodes = new ArrayList<>();
+
+ // raft
+ protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER";
+ protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT";
+
+ protected int raftServerPort = 0;
+
+ protected RaftClient raftClient = null;
+ protected SessionClient raftSessionClient = null;
+ protected Map raftAddressMap = new ConcurrentHashMap<>();
+ protected LocalRaftProtocolFactory protocolFactory
+ = new LocalRaftProtocolFactory(protocolSerializer);
+ protected List messagingServices = new ArrayList<>();
+ protected List clusterMemberIds = new ArrayList();
+
+ protected AtomicBoolean running = new AtomicBoolean(true);
+
+ // Write data through the queue to prevent failure due to network exceptions
+ private ConcurrentLinkedQueue clusterMetaQueue
+ = new ConcurrentLinkedQueue<>();
+
+ // zeppelin server host & port
+ protected String zeplServerHost = "";
+
+ public ClusterManager() {
+ try {
+ zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+ String clusterAddr = zconf.getClusterAddress();
+ if (!StringUtils.isEmpty(clusterAddr)) {
+ String cluster[] = clusterAddr.split(",");
+
+ for (int i = 0; i < cluster.length; i++) {
+ String[] parts = cluster[i].split(":");
+ String clusterHost = parts[0];
+ int clusterPort = Integer.valueOf(parts[1]);
+ if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
+ raftServerPort = clusterPort;
+ }
+
+ Node node = Node.builder().withId(cluster[i])
+ .withAddress(Address.from(clusterHost, clusterPort)).build();
+ clusterNodes.add(node);
+ raftAddressMap.put(MemberId.from(cluster[i]), Address.from(clusterHost, clusterPort));
+ clusterMemberIds.add(MemberId.from(cluster[i]));
+ }
+ }
+ } catch (UnknownHostException e) {
+ LOGGER.error(e.getMessage());
+ } catch (SocketException e) {
+ LOGGER.error(e.getMessage());
+ }
+
+ }
+
+ // Check if the raft environment is initialized
+ public abstract boolean raftInitialized();
+ // Is it a cluster leader
+ public abstract boolean isClusterLeader();
+
+ public AtomicBoolean getRunning() {
+ return running;
+ }
+
+ private SessionClient createProxy(RaftClient client) {
+ return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME,
+ ClusterPrimitiveType.INSTANCE, new ServiceConfig())
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .build()
+ .connect()
+ .join();
+ }
+
+ public void start() {
+ if (!zconf.isClusterMode()) {
+ return;
+ }
+
+ // RaftClient Thread
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.info("RaftClientThread run() >>>");
+
+ int raftClientPort = 0;
+ try {
+ raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage());
+ }
+
+ MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort);
+ Address address = Address.from(zeplServerHost, raftClientPort);
+ raftAddressMap.put(memberId, address);
+
+ MessagingService messagingManager
+ = NettyMessagingService.builder().withAddress(address).build().start().join();
+ RaftClientProtocol protocol = new RaftClientMessagingProtocol(
+ messagingManager, protocolSerializer, raftAddressMap::get);
+
+ raftClient = RaftClient.builder()
+ .withMemberId(memberId)
+ .withPartitionId(PartitionId.from("partition", 1))
+ .withProtocol(protocol)
+ .build();
+
+ raftClient.connect(clusterMemberIds).join();
+
+ raftSessionClient = createProxy(raftClient);
+
+ LOGGER.info("RaftClientThread run() <<<");
+ }
+ }).start();
+
+ // Cluster Meta Consume Thread
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (getRunning().get()) {
+ ClusterMetaEntity metaEntity = clusterMetaQueue.peek();
+ if (null != metaEntity) {
+ // Determine whether the client is connected
+ int retry = 0;
+ while (!raftInitialized()) {
+ retry++;
+ if (0 == retry % 30) {
+ LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
+ }
+ Thread.sleep(100);
+ }
+ boolean success = false;
+ switch (metaEntity.getOperation()) {
+ case DELETE_OPERATION:
+ success = deleteClusterMeta(metaEntity);
+ break;
+ case PUT_OPERATION:
+ success = putClusterMeta(metaEntity);
+ break;
+ }
+ if (true == success) {
+ // The operation was successfully deleted
+ clusterMetaQueue.remove(metaEntity);
+ } else {
+ LOGGER.error("Cluster Meta Consume faild!");
+ }
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }).start();
+ }
+
+ // cluster shutdown
+ public void shutdown() {
+ if (!zconf.isClusterMode()) {
+ return;
+ }
+
+ running.set(false);
+
+ try {
+ if (null != raftSessionClient) {
+ raftSessionClient.close().get(3, TimeUnit.SECONDS);
+ }
+ if (null != raftClient) {
+ raftClient.close().get(3, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage());
+ } catch (ExecutionException e) {
+ LOGGER.error(e.getMessage());
+ } catch (TimeoutException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+
+ public String getClusterName() {
+ return zeplServerHost + ":" + raftServerPort;
+ }
+
+ // put metadata into cluster metadata
+ private boolean putClusterMeta(ClusterMetaEntity entity) {
+ if (!raftInitialized()) {
+ LOGGER.error("Raft incomplete initialization!");
+ return false;
+ }
+
+ ClusterMetaType metaType = entity.getMetaType();
+ String metaKey = entity.getKey();
+ HashMap newMetaValue = entity.getValues();
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
+ }
+
+ // add cluster name
+ newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+ newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
+
+ raftSessionClient.execute(operation(ClusterStateMachine.PUT,
+ clientSerializer.encode(entity)))
+ .thenApply(clientSerializer::decode);
+ return true;
+ }
+
+ // put metadata into cluster metadata
+ public void putClusterMeta(ClusterMetaType type, String key, HashMap values) {
+ ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
+
+ boolean result = putClusterMeta(metaEntity);
+ if (false == result) {
+ LOGGER.warn("putClusterMeta failure, Cache metadata to queue.");
+ clusterMetaQueue.add(metaEntity);
+ }
+ }
+
+ // delete metadata by cluster metadata
+ private boolean deleteClusterMeta(ClusterMetaEntity entity) {
+ ClusterMetaType metaType = entity.getMetaType();
+ String metaKey = entity.getKey();
+
+ // Need to pay attention to delete metadata operations
+ LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey);
+
+ if (!raftInitialized()) {
+ LOGGER.error("Raft incomplete initialization!");
+ return false;
+ }
+
+ raftSessionClient.execute(operation(
+ ClusterStateMachine.REMOVE,
+ clientSerializer.encode(entity)))
+ .thenApply(clientSerializer::decode)
+ .thenAccept(result -> {
+ LOGGER.info("deleteClusterMeta {}", result);
+ });
+
+ return true;
+ }
+
+ // delete metadata from cluster metadata
+ public void deleteClusterMeta(ClusterMetaType type, String key) {
+ ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
+
+ boolean result = deleteClusterMeta(metaEntity);
+ if (false == result) {
+ LOGGER.warn("deleteClusterMeta faild, Cache data to queue.");
+ clusterMetaQueue.add(metaEntity);
+ }
+ }
+
+ // get metadata by cluster metadata
+ public HashMap> getClusterMeta(
+ ClusterMetaType metaType, String metaKey) {
+ HashMap> clusterMeta = new HashMap<>();
+ if (!raftInitialized()) {
+ LOGGER.error("Raft incomplete initialization!");
+ return clusterMeta;
+ }
+
+ ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null);
+
+ byte[] mateData = null;
+ try {
+ mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
+ clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage());
+ } catch (ExecutionException e) {
+ LOGGER.error(e.getMessage());
+ } catch (TimeoutException e) {
+ LOGGER.error(e.getMessage());
+ }
+
+ if (null != mateData) {
+ clusterMeta = clientSerializer.decode(mateData);
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
+ }
+
+ return clusterMeta;
+ }
+
+ protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder()
+ .register(OpenSessionRequest.class)
+ .register(OpenSessionResponse.class)
+ .register(CloseSessionRequest.class)
+ .register(CloseSessionResponse.class)
+ .register(KeepAliveRequest.class)
+ .register(KeepAliveResponse.class)
+ .register(QueryRequest.class)
+ .register(QueryResponse.class)
+ .register(CommandRequest.class)
+ .register(CommandResponse.class)
+ .register(MetadataRequest.class)
+ .register(MetadataResponse.class)
+ .register(JoinRequest.class)
+ .register(JoinResponse.class)
+ .register(LeaveRequest.class)
+ .register(LeaveResponse.class)
+ .register(ConfigureRequest.class)
+ .register(ConfigureResponse.class)
+ .register(ReconfigureRequest.class)
+ .register(ReconfigureResponse.class)
+ .register(InstallRequest.class)
+ .register(InstallResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(VoteRequest.class)
+ .register(VoteResponse.class)
+ .register(AppendRequest.class)
+ .register(AppendResponse.class)
+ .register(PublishRequest.class)
+ .register(ResetRequest.class)
+ .register(RaftResponse.Status.class)
+ .register(RaftError.class)
+ .register(RaftError.Type.class)
+ .register(PrimitiveOperation.class)
+ .register(ReadConsistency.class)
+ .register(byte[].class)
+ .register(long[].class)
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(PrimitiveOperation.class)
+ .register(DefaultOperationId.class)
+ .register(OperationType.class)
+ .register(ReadConsistency.class)
+ .register(ArrayList.class)
+ .register(HashMap.class)
+ .register(ClusterMetaEntity.class)
+ .register(Date.class)
+ .register(Collections.emptyList().getClass())
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(SessionId.class)
+ .register(RaftMember.Type.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .build());
+
+ protected static final Serializer storageSerializer = Serializer.using(Namespace.builder()
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(PrimitiveOperation.class)
+ .register(DefaultOperationId.class)
+ .register(OperationType.class)
+ .register(ReadConsistency.class)
+ .register(ArrayList.class)
+ .register(ClusterMetaEntity.class)
+ .register(HashMap.class)
+ .register(HashSet.class)
+ .register(Date.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(RaftMember.Type.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(byte[].class)
+ .register(long[].class)
+ .build());
+
+ protected static final Serializer clientSerializer = Serializer.using(Namespace.builder()
+ .register(ReadConsistency.class)
+ .register(ClusterMetaEntity.class)
+ .register(ClusterMetaOperation.class)
+ .register(ClusterMetaType.class)
+ .register(HashMap.class)
+ .register(Date.class)
+ .register(Maps.immutableEntry(new String(), new Object()).getClass())
+ .build());
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
new file mode 100644
index 00000000000..b4802a036dc
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.primitive.PrimitiveBuilder;
+import io.atomix.primitive.PrimitiveManagementService;
+import io.atomix.primitive.PrimitiveType;
+import io.atomix.primitive.config.PrimitiveConfig;
+import io.atomix.primitive.service.PrimitiveService;
+import io.atomix.primitive.service.ServiceConfig;
+
+/**
+ * Cluster primitive type
+ * Creating a custom distributed primitive is defining the primitive type.
+ * To create a new type, implement the PrimitiveType interface
+ */
+public class ClusterPrimitiveType implements PrimitiveType {
+ public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
+
+ public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
+
+ @Override
+ public String name() {
+ return PRIMITIVE_NAME;
+ }
+
+ @Override
+ public PrimitiveConfig newConfig() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PrimitiveBuilder newBuilder(String primitiveName,
+ PrimitiveConfig config,
+ PrimitiveManagementService managementService) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PrimitiveService newService(ServiceConfig config) {
+ return new ClusterStateMachine();
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
new file mode 100644
index 00000000000..460f6ac3f18
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Maps;
+import io.atomix.primitive.operation.OperationId;
+import io.atomix.primitive.service.AbstractPrimitiveService;
+import io.atomix.primitive.service.BackupOutput;
+import io.atomix.primitive.service.BackupInput;
+import io.atomix.primitive.service.Commit;
+import io.atomix.primitive.service.ServiceExecutor;
+import io.atomix.utils.serializer.Serializer;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cluster State Machine for Zeppelin
+ * The cluster state is implemented as a snapshot state machine.
+ * The state machine stores the service and process metadata information of the cluster.
+ * Metadata information can be manipulated by put, get, remove, index, and snapshot.
+ */
+public class ClusterStateMachine extends AbstractPrimitiveService {
+ private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);
+ private ClusterMeta clusterMeta = new ClusterMeta();
+
+ // Command to operation a variable in cluster state machine
+ public static final OperationId PUT = OperationId.command("put");
+ public static final OperationId GET = OperationId.query("get");
+ public static final OperationId REMOVE = OperationId.command("remove");
+ public static final OperationId INDEX = OperationId.command("index");
+
+ public ClusterStateMachine() {
+ super(ClusterPrimitiveType.INSTANCE);
+ }
+
+ @Override
+ public Serializer serializer() {
+ return ClusterManager.clientSerializer;
+ }
+
+ @Override
+ protected void configure(ServiceExecutor executor) {
+ executor.register(PUT, this::put);
+ executor.register(GET, this::get);
+ executor.register(REMOVE, this::remove);
+ executor.register(INDEX, this::index);
+ }
+
+ protected long put(Commit commit) {
+ clusterMeta.put(commit.value().getMetaType(),
+ commit.value().getKey(), commit.value().getValues());
+ return commit.index();
+ }
+
+ protected Map> get(Commit commit) {
+ return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey());
+ }
+
+ protected long remove(Commit commit) {
+ clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
+ return commit.index();
+ }
+
+ protected long index(Commit commit) {
+ return commit.index();
+ }
+
+ @Override
+ public void backup(BackupOutput writer) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClusterStateMachine.backup()");
+ }
+
+ // backup ServerMeta
+ // cluster meta map struct
+ // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+ Map> mapServerMeta
+ = clusterMeta.get(ClusterMetaType.ServerMeta, "");
+ // write all ServerMeta size
+ writer.writeInt(mapServerMeta.size());
+ for (Map.Entry> entry : mapServerMeta.entrySet()) {
+ // write cluster_name
+ writer.writeString(entry.getKey());
+
+ Map kvPairs = entry.getValue();
+ // write cluster mate kv pairs size
+ writer.writeInt(kvPairs.size());
+ for (Map.Entry entryValue : kvPairs.entrySet()) {
+ // write cluster mate kv pairs
+ writer.writeString(entryValue.getKey());
+ writer.writeObject(entryValue.getValue());
+ }
+ }
+
+ // backup IntpProcessMeta
+ // Interpreter meta map struct
+ // IntpGroupId -> {server_tserver_host,server_tserver_port,...}
+ Map> mapIntpProcMeta
+ = clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
+ // write interpreter size
+ writer.writeInt(mapIntpProcMeta.size());
+ for (Map.Entry> entry : mapIntpProcMeta.entrySet()) {
+ // write IntpGroupId
+ writer.writeString(entry.getKey());
+
+ Map kvPairs = entry.getValue();
+ // write interpreter mate kv pairs size
+ writer.writeInt(kvPairs.size());
+ for (Map.Entry entryValue : kvPairs.entrySet()) {
+ // write interpreter mate kv pairs
+ writer.writeString(entryValue.getKey());
+ writer.writeObject(entryValue.getValue());
+ }
+ }
+ }
+
+ @Override
+ public void restore(BackupInput reader) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClusterStateMachine.restore()");
+ }
+
+ clusterMeta = new ClusterMeta();
+ // read all ServerMeta size
+ int nServerMeta = reader.readInt();
+ for (int i = 0; i < nServerMeta; i++) {
+ // read cluster_name
+ String clusterName = reader.readString();
+
+ // read cluster mate kv pairs size
+ int nKVpairs = reader.readInt();
+ for (int j = 0; j < nKVpairs; i++) {
+ // read cluster mate kv pairs
+ String key = reader.readString();
+ Object value = reader.readObject();
+
+ clusterMeta.put(ClusterMetaType.ServerMeta,
+ clusterName, Maps.immutableEntry(key, value));
+ }
+ }
+
+ // read all IntpProcessMeta size
+ int nIntpMeta = reader.readInt();
+ for (int i = 0; i < nIntpMeta; i++) {
+ // read interpreter name
+ String intpName = reader.readString();
+
+ // read interpreter mate kv pairs size
+ int nKVpairs = reader.readInt();
+ for (int j = 0; j < nKVpairs; i++) {
+ // read interpreter mate kv pairs
+ String key = reader.readString();
+ Object value = reader.readObject();
+
+ clusterMeta.put(ClusterMetaType.IntpProcessMeta,
+ intpName, Maps.immutableEntry(key, value));
+ }
+ }
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
new file mode 100644
index 00000000000..6283813fa9f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.listener;
+
+import io.atomix.cluster.ClusterMembershipEvent;
+import io.atomix.cluster.ClusterMembershipEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entity capable of receiving device cluster-related events.
+ * Listen for new zeppelin servers to join or leave the cluster,
+ * Monitor whether the metadata in the cluster server changes
+ */
+public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
+ private static Logger logger
+ = LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
+
+ @Override
+ public void event(ClusterMembershipEvent event) {
+ switch (event.type()) {
+ case MEMBER_ADDED:
+ logger.info(event.subject().id() + " joined the cluster.");
+ break;
+ case MEMBER_REMOVED:
+ logger.info(event.subject().id() + " left the cluster.");
+ break;
+ case METADATA_CHANGED:
+ logger.info(event.subject().id() + " meta data changed.");
+ break;
+ case REACHABILITY_CHANGED:
+ logger.info(event.subject().id() + " reachability changed.");
+ break;
+ }
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
new file mode 100644
index 00000000000..b96e32b56d8
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metadata stores metadata information in a KV key-value pair
+ */
+public class ClusterMeta implements Serializable {
+ private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+
+ // zeppelin-server meta
+ public static String SERVER_HOST = "SERVER_HOST";
+ public static String SERVER_PORT = "SERVER_PORT";
+ public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
+ public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
+ public static String SERVER_START_TIME = "SERVER_START_TIME";
+
+ // interperter-process meta
+ public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
+ public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
+ public static String INTP_START_TIME = "INTP_START_TIME";
+
+ // zeppelin-server resource usage
+ public static String CPU_CAPACITY = "CPU_CAPACITY";
+ public static String CPU_USED = "CPU_USED";
+ public static String MEMORY_CAPACITY = "MEMORY_CAPACITY";
+ public static String MEMORY_USED = "MEMORY_USED";
+
+ public static String HEARTBEAT = "HEARTBEAT";
+
+ // zeppelin-server or interperter-process status
+ public static String STATUS = "STATUS";
+ public static String ONLINE_STATUS = "ONLINE";
+ public static String OFFLINE_STATUS = "OFFLINE";
+
+ // cluster_name = host:port
+ // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
+ private Map> mapServerMeta = new HashMap<>();
+
+ // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
+ private Map> mapInterpreterMeta = new HashMap<>();
+
+ public static Gson gson = new Gson();
+
+ public void put(ClusterMetaType type, String key, Object value) {
+ Map mapValue = (Map) value;
+
+ switch (type) {
+ case ServerMeta:
+ // Because it may be partially updated metadata information
+ if (mapServerMeta.containsKey(key)) {
+ Map values = mapServerMeta.get(key);
+ values.putAll(mapValue);
+ } else {
+ mapServerMeta.put(key, mapValue);
+ }
+ break;
+ case IntpProcessMeta:
+ if (mapInterpreterMeta.containsKey(key)) {
+ Map values = mapInterpreterMeta.get(key);
+ values.putAll(mapValue);
+ } else {
+ mapInterpreterMeta.put(key, mapValue);
+ }
+ break;
+ }
+ }
+
+ public Map> get(ClusterMetaType type, String key) {
+ Map values = null;
+
+ switch (type) {
+ case ServerMeta:
+ if (null == key || StringUtils.isEmpty(key)) {
+ return mapServerMeta;
+ }
+ if (mapServerMeta.containsKey(key)) {
+ values = mapServerMeta.get(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ case IntpProcessMeta:
+ if (null == key || StringUtils.isEmpty(key)) {
+ return mapInterpreterMeta;
+ }
+ if (mapInterpreterMeta.containsKey(key)) {
+ values = mapInterpreterMeta.get(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ }
+
+ Map> result = new HashMap<>();
+ result.put(key, values);
+
+ return result;
+ }
+
+ public Map remove(ClusterMetaType type, String key) {
+ switch (type) {
+ case ServerMeta:
+ if (mapServerMeta.containsKey(key)) {
+ return mapServerMeta.remove(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ case IntpProcessMeta:
+ if (mapInterpreterMeta.containsKey(key)) {
+ return mapInterpreterMeta.remove(key);
+ } else {
+ logger.warn("can not find key : {}", key);
+ }
+ break;
+ }
+
+ return null;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
new file mode 100644
index 00000000000..7a5afb013b8
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * Cluster operations, cluster types, encapsulation objects for keys and values
+ */
+public class ClusterMetaEntity implements Serializable {
+ private ClusterMetaOperation operation;
+ private ClusterMetaType type;
+ private String key;
+ private HashMap values = new HashMap<>();
+
+ public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
+ String key, HashMap values) {
+ this.operation = operation;
+ this.type = type;
+ this.key = key;
+
+ if (null != values) {
+ this.values.putAll(values);
+ }
+ }
+
+ public ClusterMetaOperation getOperation() {
+ return operation;
+ }
+
+ public ClusterMetaType getMetaType() {
+ return type;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public HashMap getValues() {
+ return values;
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
new file mode 100644
index 00000000000..33c99c86ed3
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata operation
+ */
+public enum ClusterMetaOperation {
+ GET_OPERATION,
+ PUT_OPERATION,
+ DELETE_OPERATION
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
new file mode 100644
index 00000000000..c6229bd6347
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.meta;
+
+/**
+ * Type of cluster metadata
+ */
+public enum ClusterMetaType {
+ ServerMeta,
+ IntpProcessMeta
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
new file mode 100644
index 00000000000..eb7a76281ed
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Protocol for intercommunication between Raft clients for each server in the cluster.
+ * Communication protocol for handling sessions, queries, commands, and services within the cluster.
+ */
+public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
+ private Function> heartbeatHandler;
+ private final Map> publishListeners = Maps.newConcurrentMap();
+
+ public LocalRaftClientProtocol(MemberId memberId,
+ Serializer serializer,
+ Map servers,
+ Map clients) {
+ super(serializer, servers, clients);
+ clients.put(memberId, this);
+ }
+
+ private CompletableFuture getServer(MemberId memberId) {
+ LocalRaftServerProtocol server = server(memberId);
+ if (server != null) {
+ return Futures.completedFuture(server);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public CompletableFuture openSession(MemberId memberId,
+ OpenSessionRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.openSession(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture closeSession(MemberId memberId,
+ CloseSessionRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.closeSession(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture keepAlive(MemberId memberId,
+ KeepAliveRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.keepAlive(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture query(MemberId memberId, QueryRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.query(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture command(MemberId memberId,
+ CommandRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.command(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture metadata(MemberId memberId,
+ MetadataRequest request) {
+ return getServer(memberId).thenCompose(protocol ->
+ protocol.metadata(encode(request))).thenApply(this::decode);
+ }
+
+ CompletableFuture heartbeat(byte[] request) {
+ if (heartbeatHandler != null) {
+ return heartbeatHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerHeartbeatHandler(Function> handler) {
+ this.heartbeatHandler = handler;
+ }
+
+ @Override
+ public void unregisterHeartbeatHandler() {
+ this.heartbeatHandler = null;
+ }
+
+ @Override
+ public void reset(Set members, ResetRequest request) {
+ members.forEach(nodeId -> {
+ LocalRaftServerProtocol server = server(nodeId);
+ if (server != null) {
+ server.reset(request.session(), encode(request));
+ }
+ });
+ }
+
+ void publish(long sessionId, byte[] request) {
+ Consumer listener = publishListeners.get(sessionId);
+ if (listener != null) {
+ listener.accept(decode(request));
+ }
+ }
+
+ @Override
+ public void registerPublishListener(SessionId sessionId,
+ Consumer listener, Executor executor) {
+ publishListeners.put(sessionId.id(), request ->
+ executor.execute(() -> listener.accept(request)));
+ }
+
+ @Override
+ public void unregisterPublishListener(SessionId sessionId) {
+ publishListeners.remove(sessionId.id());
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
new file mode 100644
index 00000000000..c28047a0f09
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Base class for Raft protocol.
+ */
+public abstract class LocalRaftProtocol {
+ private final Serializer serializer;
+ private final Map servers;
+ private final Map clients;
+
+ public LocalRaftProtocol(Serializer serializer,
+ Map servers,
+ Map clients) {
+ this.serializer = serializer;
+ this.servers = servers;
+ this.clients = clients;
+ }
+
+ T copy(T value) {
+ return serializer.decode(serializer.encode(value));
+ }
+
+ byte[] encode(Object value) {
+ return serializer.encode(value);
+ }
+
+ T decode(byte[] bytes) {
+ return serializer.decode(bytes);
+ }
+
+ LocalRaftServerProtocol server(MemberId memberId) {
+ return servers.get(memberId);
+ }
+
+ LocalRaftClientProtocol client(MemberId memberId) {
+ return clients.get(memberId);
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
new file mode 100644
index 00000000000..83d25026d48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Map;
+
+/**
+ * Cluster Raft protocol factory.
+ */
+public class LocalRaftProtocolFactory {
+ private final Serializer serializer;
+ private final Map servers = Maps.newConcurrentMap();
+ private final Map clients = Maps.newConcurrentMap();
+
+ public LocalRaftProtocolFactory(Serializer serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * Returns a new test client protocol.
+ *
+ * @param memberId the client member identifier
+ * @return a new test client protocol
+ */
+ public RaftClientProtocol newClientProtocol(MemberId memberId) {
+ return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
+ }
+
+ /**
+ * Returns a new test server protocol.
+ *
+ * @param memberId the server member identifier
+ * @return a new test server protocol
+ */
+ public RaftServerProtocol newServerProtocol(MemberId memberId) {
+ return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
new file mode 100644
index 00000000000..c28ae87fd9f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java
@@ -0,0 +1,527 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import com.google.common.collect.Maps;
+import io.atomix.cluster.MemberId;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Cluster server protocol.
+ */
+public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
+ private Function> openSessionHandler;
+ private Function>
+ closeSessionHandler;
+ private Function> keepAliveHandler;
+ private Function> queryHandler;
+ private Function> commandHandler;
+ private Function> metadataHandler;
+ private Function> joinHandler;
+ private Function> leaveHandler;
+ private Function> configureHandler;
+ private Function> reconfigureHandler;
+ private Function> installHandler;
+ private Function> pollHandler;
+ private Function> voteHandler;
+ private Function> transferHandler;
+ private Function> appendHandler;
+ private final Map> resetListeners = Maps.newConcurrentMap();
+
+ public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
+ Map servers,
+ Map clients) {
+ super(serializer, servers, clients);
+ servers.put(memberId, this);
+ }
+
+ private CompletableFuture getServer(MemberId memberId) {
+ LocalRaftServerProtocol server = server(memberId);
+ if (server != null) {
+ return Futures.completedFuture(server);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ private CompletableFuture getClient(MemberId memberId) {
+ LocalRaftClientProtocol client = client(memberId);
+ if (client != null) {
+ return Futures.completedFuture(client);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public CompletableFuture openSession(MemberId memberId,
+ OpenSessionRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.openSession(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture closeSession(MemberId memberId,
+ CloseSessionRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.closeSession(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture keepAlive(MemberId memberId,
+ KeepAliveRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.keepAlive(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture query(MemberId memberId, QueryRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.query(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture command(MemberId memberId,
+ CommandRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.command(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture metadata(MemberId memberId,
+ MetadataRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.metadata(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture join(MemberId memberId, JoinRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.join(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture leave(MemberId memberId, LeaveRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.leave(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture configure(MemberId memberId,
+ ConfigureRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.configure(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture reconfigure(MemberId memberId,
+ ReconfigureRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.reconfigure(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture install(MemberId memberId, InstallRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.install(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture transfer(MemberId memberId, TransferRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.install(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture poll(MemberId memberId, PollRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.poll(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture vote(MemberId memberId, VoteRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.vote(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public CompletableFuture append(MemberId memberId, AppendRequest request) {
+ return getServer(memberId).thenCompose(listener ->
+ listener.append(encode(request))).thenApply(this::decode);
+ }
+
+ @Override
+ public void publish(MemberId memberId, PublishRequest request) {
+ getClient(memberId).thenAccept(protocol ->
+ protocol.publish(request.session(), encode(request)));
+ }
+
+ @Override
+ public CompletableFuture heartbeat(MemberId memberId,
+ HeartbeatRequest request) {
+ return getClient(memberId).thenCompose(protocol ->
+ protocol.heartbeat(encode(request))).thenApply(this::decode);
+ }
+
+ CompletableFuture openSession(byte[] request) {
+ if (openSessionHandler != null) {
+ return openSessionHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerOpenSessionHandler(Function> handler) {
+ this.openSessionHandler = handler;
+ }
+
+ @Override
+ public void unregisterOpenSessionHandler() {
+ this.openSessionHandler = null;
+ }
+
+ CompletableFuture closeSession(byte[] request) {
+ if (closeSessionHandler != null) {
+ return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerCloseSessionHandler(Function> handler) {
+ this.closeSessionHandler = handler;
+ }
+
+ @Override
+ public void unregisterCloseSessionHandler() {
+ this.closeSessionHandler = null;
+ }
+
+ CompletableFuture keepAlive(byte[] request) {
+ if (keepAliveHandler != null) {
+ return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerKeepAliveHandler(Function> handler) {
+ this.keepAliveHandler = handler;
+ }
+
+ @Override
+ public void unregisterKeepAliveHandler() {
+ this.keepAliveHandler = null;
+ }
+
+ CompletableFuture query(byte[] request) {
+ if (queryHandler != null) {
+ return queryHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerQueryHandler(Function> handler) {
+ this.queryHandler = handler;
+ }
+
+ @Override
+ public void unregisterQueryHandler() {
+ this.queryHandler = null;
+ }
+
+ CompletableFuture command(byte[] request) {
+ if (commandHandler != null) {
+ return commandHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerCommandHandler(Function> handler) {
+ this.commandHandler = handler;
+ }
+
+ @Override
+ public void unregisterCommandHandler() {
+ this.commandHandler = null;
+ }
+
+ CompletableFuture metadata(byte[] request) {
+ if (metadataHandler != null) {
+ return metadataHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerMetadataHandler(Function> handler) {
+ this.metadataHandler = handler;
+ }
+
+ @Override
+ public void unregisterMetadataHandler() {
+ this.metadataHandler = null;
+ }
+
+ CompletableFuture join(byte[] request) {
+ if (joinHandler != null) {
+ return joinHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerJoinHandler(Function> handler) {
+ this.joinHandler = handler;
+ }
+
+ @Override
+ public void unregisterJoinHandler() {
+ this.joinHandler = null;
+ }
+
+ CompletableFuture leave(byte[] request) {
+ if (leaveHandler != null) {
+ return leaveHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerLeaveHandler(Function> handler) {
+ this.leaveHandler = handler;
+ }
+
+ @Override
+ public void unregisterLeaveHandler() {
+ this.leaveHandler = null;
+ }
+
+ CompletableFuture configure(byte[] request) {
+ if (configureHandler != null) {
+ return configureHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerConfigureHandler(Function> handler) {
+ this.configureHandler = handler;
+ }
+
+ @Override
+ public void unregisterConfigureHandler() {
+ this.configureHandler = null;
+ }
+
+ CompletableFuture reconfigure(byte[] request) {
+ if (reconfigureHandler != null) {
+ return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerReconfigureHandler(Function> handler) {
+ this.reconfigureHandler = handler;
+ }
+
+ @Override
+ public void unregisterReconfigureHandler() {
+ this.reconfigureHandler = null;
+ }
+
+ CompletableFuture install(byte[] request) {
+ if (installHandler != null) {
+ return installHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerInstallHandler(Function> handler) {
+ this.installHandler = handler;
+ }
+
+ @Override
+ public void unregisterInstallHandler() {
+ this.installHandler = null;
+ }
+
+ CompletableFuture poll(byte[] request) {
+ if (pollHandler != null) {
+ return pollHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerPollHandler(Function> handler) {
+ this.pollHandler = handler;
+ }
+
+ @Override
+ public void unregisterPollHandler() {
+ this.pollHandler = null;
+ }
+
+ CompletableFuture vote(byte[] request) {
+ if (voteHandler != null) {
+ return voteHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerVoteHandler(Function> handler) {
+ this.voteHandler = handler;
+ }
+
+ @Override
+ public void unregisterVoteHandler() {
+ this.voteHandler = null;
+ }
+
+ @Override
+ public void registerTransferHandler(Function> handler) {
+ this.transferHandler = handler;
+ }
+
+ @Override
+ public void unregisterTransferHandler() {
+ this.transferHandler = null;
+ }
+
+ CompletableFuture transfer(byte[] request) {
+ if (transferHandler != null) {
+ return transferHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ CompletableFuture append(byte[] request) {
+ if (appendHandler != null) {
+ return appendHandler.apply(decode(request)).thenApply(this::encode);
+ } else {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ }
+
+ @Override
+ public void registerAppendHandler(Function> handler) {
+ this.appendHandler = handler;
+ }
+
+ @Override
+ public void unregisterAppendHandler() {
+ this.appendHandler = null;
+ }
+
+ void reset(long sessionId, byte[] request) {
+ Consumer listener = resetListeners.get(sessionId);
+ if (listener != null) {
+ listener.accept(decode(request));
+ }
+ }
+
+ @Override
+ public void registerResetListener(SessionId sessionId,
+ Consumer listener, Executor executor) {
+ resetListeners.put(sessionId.id(), request -> executor.execute(()
+ -> listener.accept(request)));
+ }
+
+ @Override
+ public void unregisterResetListener(SessionId sessionId) {
+ resetListeners.remove(sessionId.id());
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
new file mode 100644
index 00000000000..8d2b42507fd
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft client messaging service protocol.
+ */
+public class RaftClientMessagingProtocol extends RaftMessagingProtocol
+ implements RaftClientProtocol {
+ public RaftClientMessagingProtocol(MessagingService messagingService,
+ Serializer serializer,
+ Function addressProvider) {
+ super(messagingService, serializer, addressProvider);
+ }
+
+ @Override
+ public CompletableFuture openSession(MemberId memberId,
+ OpenSessionRequest request) {
+ return sendAndReceive(memberId, "open-session", request);
+ }
+
+ @Override
+ public CompletableFuture closeSession(MemberId memberId,
+ CloseSessionRequest request) {
+ return sendAndReceive(memberId, "close-session", request);
+ }
+
+ @Override
+ public CompletableFuture keepAlive(MemberId memberId,
+ KeepAliveRequest request) {
+ return sendAndReceive(memberId, "keep-alive", request);
+ }
+
+ @Override
+ public CompletableFuture query(MemberId memberId, QueryRequest request) {
+ return sendAndReceive(memberId, "query", request);
+ }
+
+ @Override
+ public CompletableFuture command(MemberId memberId,
+ CommandRequest request) {
+ return sendAndReceive(memberId, "command", request);
+ }
+
+ @Override
+ public CompletableFuture metadata(MemberId memberId,
+ MetadataRequest request) {
+ return sendAndReceive(memberId, "metadata", request);
+ }
+
+ @Override
+ public void registerHeartbeatHandler(Function> handler) {
+ registerHandler("heartbeat", handler);
+ }
+
+ @Override
+ public void unregisterHeartbeatHandler() {
+ unregisterHandler("heartbeat");
+ }
+
+ @Override
+ public void reset(Set members, ResetRequest request) {
+ for (MemberId memberId : members) {
+ sendAsync(memberId, String.format("reset-%d", request.session()), request);
+ }
+ }
+
+ @Override
+ public void registerPublishListener(SessionId sessionId, Consumer listener,
+ Executor executor) {
+ messagingService.registerHandler(String.format("publish-%d", sessionId.id()), (e, p) -> {
+ listener.accept(serializer.decode(p));
+ }, executor);
+ }
+
+ @Override
+ public void unregisterPublishListener(SessionId sessionId) {
+ messagingService.unregisterHandler(String.format("publish-%d", sessionId.id()));
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
new file mode 100644
index 00000000000..d67dbb88b46
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.utils.concurrent.Futures;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Messaging service based Raft protocol.
+ */
+public abstract class RaftMessagingProtocol {
+ protected final MessagingService messagingService;
+ protected final Serializer serializer;
+ private final Function addressProvider;
+
+ public RaftMessagingProtocol(MessagingService messagingService,
+ Serializer serializer,
+ Function addressProvider) {
+ this.messagingService = messagingService;
+ this.serializer = serializer;
+ this.addressProvider = addressProvider;
+ }
+
+ protected Address address(MemberId memberId) {
+ return addressProvider.apply(memberId);
+ }
+
+ protected CompletableFuture sendAndReceive(MemberId memberId,
+ String type, T request) {
+ Address address = address(memberId);
+ if (address == null) {
+ return Futures.exceptionalFuture(new ConnectException());
+ }
+ return messagingService.sendAndReceive(address, type, serializer.encode(request))
+ .thenApply(serializer::decode);
+ }
+
+ protected CompletableFuture sendAsync(MemberId memberId, String type, Object request) {
+ Address address = address(memberId);
+ if (address != null) {
+ return messagingService.sendAsync(address(memberId), type, serializer.encode(request));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ protected void registerHandler(String type, Function> handler) {
+ messagingService.registerHandler(type, (e, p) -> {
+ CompletableFuture future = new CompletableFuture<>();
+ handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
+ if (error == null) {
+ future.complete(serializer.encode(result));
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ });
+ }
+
+ protected void unregisterHandler(String type) {
+ messagingService.unregisterHandler(type);
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
new file mode 100644
index 00000000000..bae52bfc059
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.protocol;
+
+import io.atomix.cluster.MemberId;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.primitive.session.SessionId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.utils.net.Address;
+import io.atomix.utils.serializer.Serializer;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Raft server messaging protocol between Raft Servers for each server in the cluster.
+ */
+public class RaftServerMessagingProtocol extends RaftMessagingProtocol
+ implements RaftServerProtocol {
+ public RaftServerMessagingProtocol(MessagingService messagingService,
+ Serializer serializer,
+ Function addressProvider) {
+ super(messagingService, serializer, addressProvider);
+ }
+
+ @Override
+ public CompletableFuture openSession(MemberId memberId,
+ OpenSessionRequest request) {
+ return sendAndReceive(memberId, "open-session", request);
+ }
+
+ @Override
+ public CompletableFuture closeSession(MemberId memberId,
+ CloseSessionRequest request) {
+ return sendAndReceive(memberId, "close-session", request);
+ }
+
+ @Override
+ public CompletableFuture keepAlive(MemberId memberId,
+ KeepAliveRequest request) {
+ return sendAndReceive(memberId, "keep-alive", request);
+ }
+
+ @Override
+ public CompletableFuture query(MemberId memberId, QueryRequest request) {
+ return sendAndReceive(memberId, "query", request);
+ }
+
+ @Override
+ public CompletableFuture command(MemberId memberId,
+ CommandRequest request) {
+ return sendAndReceive(memberId, "command", request);
+ }
+
+ @Override
+ public CompletableFuture metadata(MemberId memberId,
+ MetadataRequest request) {
+ return sendAndReceive(memberId, "metadata", request);
+ }
+
+ @Override
+ public CompletableFuture join(MemberId memberId, JoinRequest request) {
+ return sendAndReceive(memberId, "join", request);
+ }
+
+ @Override
+ public CompletableFuture leave(MemberId memberId, LeaveRequest request) {
+ return sendAndReceive(memberId, "leave", request);
+ }
+
+ @Override
+ public CompletableFuture configure(MemberId memberId,
+ ConfigureRequest request) {
+ return sendAndReceive(memberId, "configure", request);
+ }
+
+ @Override
+ public CompletableFuture reconfigure(MemberId memberId,
+ ReconfigureRequest request) {
+ return sendAndReceive(memberId, "reconfigure", request);
+ }
+
+ @Override
+ public CompletableFuture install(MemberId memberId, InstallRequest request) {
+ return sendAndReceive(memberId, "install", request);
+ }
+
+ @Override
+ public CompletableFuture transfer(MemberId memberId,
+ TransferRequest request) {
+ return sendAndReceive(memberId, "transfer", request);
+ }
+
+ @Override
+ public CompletableFuture poll(MemberId memberId, PollRequest request) {
+ return sendAndReceive(memberId, "poll", request);
+ }
+
+ @Override
+ public CompletableFuture vote(MemberId memberId, VoteRequest request) {
+ return sendAndReceive(memberId, "vote", request);
+ }
+
+ @Override
+ public CompletableFuture append(MemberId memberId, AppendRequest request) {
+ return sendAndReceive(memberId, "append", request);
+ }
+
+ @Override
+ public void publish(MemberId memberId, PublishRequest request) {
+ sendAsync(memberId, String.format("publish-%d", request.session()), request);
+ }
+
+ @Override
+ public CompletableFuture heartbeat(MemberId memberId,
+ HeartbeatRequest request) {
+ return sendAndReceive(memberId, "heartbeat", request);
+ }
+
+ @Override
+ public void registerOpenSessionHandler(Function> handler) {
+ registerHandler("open-session", handler);
+ }
+
+ @Override
+ public void unregisterOpenSessionHandler() {
+ unregisterHandler("open-session");
+ }
+
+ @Override
+ public void registerCloseSessionHandler(Function> handler) {
+ registerHandler("close-session", handler);
+ }
+
+ @Override
+ public void unregisterCloseSessionHandler() {
+ unregisterHandler("close-session");
+ }
+
+ @Override
+ public void registerKeepAliveHandler(Function> handler) {
+ registerHandler("keep-alive", handler);
+ }
+
+ @Override
+ public void unregisterKeepAliveHandler() {
+ unregisterHandler("keep-alive");
+ }
+
+ @Override
+ public void registerQueryHandler(Function> handler) {
+ registerHandler("query", handler);
+ }
+
+ @Override
+ public void unregisterQueryHandler() {
+ unregisterHandler("query");
+ }
+
+ @Override
+ public void registerCommandHandler(Function> handler) {
+ registerHandler("command", handler);
+ }
+
+ @Override
+ public void unregisterCommandHandler() {
+ unregisterHandler("command");
+ }
+
+ @Override
+ public void registerMetadataHandler(Function> handler) {
+ registerHandler("metadata", handler);
+ }
+
+ @Override
+ public void unregisterMetadataHandler() {
+ unregisterHandler("metadata");
+ }
+
+ @Override
+ public void registerJoinHandler(Function> handler) {
+ registerHandler("join", handler);
+ }
+
+ @Override
+ public void unregisterJoinHandler() {
+ unregisterHandler("join");
+ }
+
+ @Override
+ public void registerLeaveHandler(Function> handler) {
+ registerHandler("leave", handler);
+ }
+
+ @Override
+ public void unregisterLeaveHandler() {
+ unregisterHandler("leave");
+ }
+
+ @Override
+ public void registerConfigureHandler(Function> handler) {
+ registerHandler("configure", handler);
+ }
+
+ @Override
+ public void unregisterConfigureHandler() {
+ unregisterHandler("configure");
+ }
+
+ @Override
+ public void registerReconfigureHandler(Function> handler) {
+ registerHandler("reconfigure", handler);
+ }
+
+ @Override
+ public void unregisterReconfigureHandler() {
+ unregisterHandler("reconfigure");
+ }
+
+ @Override
+ public void registerInstallHandler(Function> handler) {
+ registerHandler("install", handler);
+ }
+
+ @Override
+ public void unregisterInstallHandler() {
+ unregisterHandler("install");
+ }
+
+ @Override
+ public void registerTransferHandler(Function> handler) {
+ registerHandler("transfer", handler);
+ }
+
+ @Override
+ public void unregisterTransferHandler() {
+ unregisterHandler("transfer");
+ }
+
+ @Override
+ public void registerPollHandler(Function> handler) {
+ registerHandler("poll", handler);
+ }
+
+ @Override
+ public void unregisterPollHandler() {
+ unregisterHandler("poll");
+ }
+
+ @Override
+ public void registerVoteHandler(Function> handler) {
+ registerHandler("vote", handler);
+ }
+
+ @Override
+ public void unregisterVoteHandler() {
+ unregisterHandler("vote");
+ }
+
+ @Override
+ public void registerAppendHandler(Function> handler) {
+ registerHandler("append", handler);
+ }
+
+ @Override
+ public void unregisterAppendHandler() {
+ unregisterHandler("append");
+ }
+
+ @Override
+ public void registerResetListener(SessionId sessionId,
+ Consumer listener, Executor executor) {
+ messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> {
+ listener.accept(serializer.decode(p));
+ }, executor);
+ }
+
+ @Override
+ public void unregisterResetListener(SessionId sessionId) {
+ messagingService.unregisterHandler(String.format("reset-%d", sessionId.id()));
+ }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 2b2f3b6fac6..98810031879 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -640,6 +640,31 @@ public String getZeppelinSearchTempPath() {
return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH);
}
+ public String getClusterAddress() {
+ return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
+ }
+
+ public void setClusterAddress(String clusterAddr) {
+ properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr);
+ }
+
+ public boolean isClusterMode() {
+ String clusterAddr = getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
+ if (StringUtils.isEmpty(clusterAddr)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public int getClusterHeartbeatInterval() {
+ return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL);
+ }
+
+ public int getClusterHeartbeatTimeout() {
+ return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
+ }
+
public Map dumpConfigurations(Predicate predicate) {
Map properties = new HashMap<>();
@@ -782,6 +807,10 @@ public enum ConfVars {
ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
+ ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""),
+ ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
+ ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
+
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 9f0e13bd4d5..8e67dc1ee25 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -37,7 +37,7 @@
2.7.3
- 3.4
+ 3.7
1.5.2
2.2.1
5.3.1
@@ -127,12 +127,6 @@
gson
-
- com.google.guava
- guava
- 20.0
-
-
org.apache.lucene
lucene-core
@@ -258,6 +252,9 @@
${project.build.directory}/tmp
+
+ 1
+
@@ -357,87 +354,103 @@
com.google.guava
guava
-
-
-
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
- tests
- test
-
-
- com.sun.jersey
- jersey-core
-
-
- com.sun.jersey
- jersey-json
-
-
- com.sun.jersey
- jersey-client
-
-
- com.sun.jersey
- jersey-server
-
-
- javax.servlet
- servlet-api
-
-
- org.apache.avro
- avro
-
-
- org.apache.jackrabbit
- jackrabbit-webdav
-
-
- io.netty
- netty
-
- commons-httpclient
- commons-httpclient
-
-
- org.eclipse.jgit
- org.eclipse.jgit
-
-
- com.jcraft
- jsch
+ com.google.code.findbugs
+ jsr305
org.apache.commons
- commons-compress
-
-
- xml-apis
- xml-apis
-
-
- xerces
- xercesImpl
-
-
- org.codehaus.jackson
- jackson-mapper-asl
-
-
- org.codehaus.jackson
- jackson-core-asl
-
-
- com.google.guava
- guava
+ commons-math3
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ tests
+ test
+
+
+ com.sun.jersey
+ jersey-core
+
+
+ com.sun.jersey
+ jersey-json
+
+
+ com.sun.jersey
+ jersey-client
+
+
+ com.sun.jersey
+ jersey-server
+
+
+ javax.servlet
+ servlet-api
+
+
+ org.apache.avro
+ avro
+
+
+ org.apache.jackrabbit
+ jackrabbit-webdav
+
+
+ io.netty
+ netty
+
+
+ commons-httpclient
+ commons-httpclient
+
+
+ org.eclipse.jgit
+ org.eclipse.jgit
+
+
+ com.jcraft
+ jsch
+
+
+ org.apache.commons
+ commons-compress
+
+
+ xml-apis
+ xml-apis
+
+
+ xerces
+ xercesImpl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ com.google.guava
+ guava
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ org.apache.commons
+ commons-math3
+
+
+
+
org.apache.hadoop
hadoop-hdfs
@@ -588,29 +601,10 @@
${project.version}
test
+
- com.google.protobuf
- protobuf-java
-
-
- com.google.protobuf
- protobuf-java-util
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- io.grpc
- grpc-context
-
-
- com.google.api.grpc
- proto-google-common-protos
+ org.apache.zeppelin
+ zeppelin-python
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
index ee9f15cb787..d9000318dd7 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -25,6 +25,8 @@
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
@@ -32,6 +34,8 @@
public class MockInterpreterResourcePool extends Interpreter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MockInterpreterResourcePool.class);
+
AtomicInteger numWatch = new AtomicInteger(0);
public MockInterpreterResourcePool(Properties property) {
@@ -86,11 +90,14 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
ret = resourcePool.getAll();
} else if (cmd.equals("invoke")) {
Resource resource = resourcePool.get(noteId, paragraphId, name);
+ LOGGER.info("Resource: " + resource);
if (stmt.length >=4) {
Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+ LOGGER.info("After invokeMethod: " + resource);
ret = res.get();
} else {
ret = resource.invokeMethod(value, null, null);
+ LOGGER.info("After invokeMethod: " + ret);
}
}
@@ -119,7 +126,7 @@ public int getProgress(InterpreterContext context) {
@Override
public List completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
+ InterpreterContext interpreterContext) {
return null;
}
}
diff --git a/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar
new file mode 100644
index 00000000000..1deef144cb1
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/commons-logging-1.1.1.jar differ
diff --git a/zeppelin-zengine/src/test/resources/gson-2.2.jar b/zeppelin-zengine/src/test/resources/gson-2.2.jar
new file mode 100644
index 00000000000..e0576b4b5cb
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/gson-2.2.jar differ
diff --git a/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar
new file mode 100644
index 00000000000..1d425cf7d7e
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/log4j-1.2.17.jar differ
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties
index 843d41593e5..fd9771cb474 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -47,4 +47,4 @@ log4j.logger.org.apache.zeppelin.plugin=DEBUG
log4j.logger.org.apache.zeppelin.spark=DEBUG
log4j.logger.org.apache.zeppelin.python=DEBUG
-log4j.logger.org.quartz.core=DEBUG
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
diff --git a/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar
new file mode 100644
index 00000000000..744e9ec5b0d
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/slf4j-api-1.7.10.jar differ
diff --git a/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar
new file mode 100644
index 00000000000..957b2b158a8
Binary files /dev/null and b/zeppelin-zengine/src/test/resources/slf4j-log4j12-1.7.10.jar differ