From 721791a41e4cf0fbcff77fd7d5e19194813cd317 Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Thu, 10 Sep 2020 10:56:36 +0800 Subject: [PATCH] =?UTF-8?q?optmize=20rpc=20client=20lifecycle=20=EF=BC=9Br?= =?UTF-8?q?equest=20timeout=20to=20rpc=20client=EF=BC=9B=20some=20bugfix?= =?UTF-8?q?=20(#3797)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * optmize rpc client lifecycle and some bugfix * get server load optimize; add request timeout to rpc client --- .../client/config/impl/ClientWorker.java | 1 + .../com/alibaba/nacos/client/ConfigTest.java | 15 +- .../common/remote/client/Connection.java | 1 + .../nacos/common/remote/client/RpcClient.java | 137 +++++++++++++----- .../remote/client/RpcClientFactory.java | 3 +- .../common/remote/client/RpcClientStatus.java | 12 +- .../common/remote/client/grpc/GrpcClient.java | 130 +++++++---------- .../remote/client/grpc/GrpcConnection.java | 43 +++--- .../client/rsocket/RsocketRpcClient.java | 37 +++-- .../remote/ConfigConnectionEventListener.java | 3 +- .../src/main/resources/application.properties | 5 + .../core/cluster/ServerMemberManager.java | 6 +- .../cluster/remote/ClusterRpcClientProxy.java | 16 +- .../nacos/core/remote/BaseRpcServer.java | 2 +- .../core/ServerLoaderInfoRequestHandler.java | 15 +- .../core/remote/grpc/BaseGrpcServer.java | 2 +- ...eRpcServer.java => GrpcClusterServer.java} | 2 +- .../core/remote/rsocket/RsocketRpcServer.java | 7 +- 18 files changed, 241 insertions(+), 196 deletions(-) rename core/src/main/java/com/alibaba/nacos/core/remote/grpc/{BaseRpcServer.java => GrpcClusterServer.java} (94%) diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index eedf278b009..767fb25e974 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -585,6 +585,7 @@ public void onConnected() { @Override public void onDisConnect() { + System.out.println("clear listen context..."); Collection values = cacheMap.get().values(); for (CacheData cacheData : values) { diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index 4c292ec9643..4be25103ec8 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -55,8 +55,8 @@ public class ConfigTest { public void before() throws Exception { Properties properties = new Properties(); //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848"); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848"); //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848"); configService = NacosFactory.createConfigService(properties); @@ -234,7 +234,7 @@ public void test() throws Exception { Random random = new Random(); final String dataId = "xiaochun.xxc"; final String group = "xiaochun.xxc"; - final String content = "lessspring-" + System.currentTimeMillis(); + // final String content = "lessspring-" + System.currentTimeMillis(); Thread th = new Thread(new Runnable() { @Override @@ -244,10 +244,12 @@ public void run() { int times = 1000; while (times > 0) { try { - configService.publishConfig(dataId, group, "value" + System.currentTimeMillis()); + String content1 = "value" + System.currentTimeMillis(); + System.out.println("publish content:" + content1); + configService.publishConfig(dataId, group, content1); times--; - Thread.sleep(500L); + Thread.sleep(2000L); } catch (Exception e) { e.printStackTrace(); } @@ -264,7 +266,8 @@ public void run() { Listener listener = new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { - System.out.println("receiveConfigInfo1 :" + configInfo); + System.out.println("receiveConfigInfo1 content:" + configInfo + "," + System.currentTimeMillis()); + } }; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java index 740030bfc4c..5e0d4e6a44e 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java @@ -68,6 +68,7 @@ public boolean isAbandon() { /** * Setter method for property abandon. + * connection event will be ignored if connection is abandoned. * * @param abandon value to be assigned to property abandon */ diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 913c2b3ee40..3ad0f89824a 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -61,8 +61,6 @@ public abstract class RpcClient implements Closeable { private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RpcClient.class); - protected static final long ACTIVE_INTERNAL = 3000L; - private ServerListFactory serverListFactory; protected LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue(); @@ -70,8 +68,6 @@ public abstract class RpcClient implements Closeable { protected volatile AtomicReference rpcClientStatus = new AtomicReference( RpcClientStatus.WAIT_INIT); - private long activeTimeStamp = System.currentTimeMillis(); - protected ScheduledExecutorService executorService; protected volatile Connection currentConnetion; @@ -144,14 +140,6 @@ protected void notifyConnected() { } } - protected boolean overActiveTime() { - return System.currentTimeMillis() - this.activeTimeStamp > ACTIVE_INTERNAL; - } - - protected void refereshActiveTimestamp() { - this.activeTimeStamp = System.currentTimeMillis(); - } - /** * check is this client is inited. * @@ -170,6 +158,15 @@ public boolean isRunning() { return this.rpcClientStatus.get() == RpcClientStatus.RUNNING; } + /** + * check is this client is shutdwon. + * + * @return + */ + public boolean isShutdwon() { + return this.rpcClientStatus.get() == RpcClientStatus.SHUTDOWN; + } + /** * init server list factory. * @@ -199,8 +196,8 @@ public void initLabels(Map labels) { /** * Start this client. */ - public void start() throws NacosException { - + public final void start() throws NacosException { + boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING); if (!success) { return; @@ -296,6 +293,25 @@ public Response requestReply(Request request) { } }); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + RpcClient.this.shutdown(); + } catch (NacosException e) { + e.printStackTrace(); + } + + } + }); + + } + + @Override + public void shutdown() throws NacosException { + executorService.shutdown(); + rpcClientStatus.set(RpcClientStatus.SHUTDOWN); + closeConnection(currentConnetion); } private final ReentrantLock switchingLock = new ReentrantLock(); @@ -309,7 +325,7 @@ public void switchServerAsync() { /** * 1.判断当前是否正在重连中 2.如果正在重连中,则直接返回;如果不在重连中,则启动重连 3.重连逻辑:创建一个新的连接,如果连接可用 */ - protected void switchServerAsync(final ServerInfo serverInfoTryOnce) { + protected void switchServerAsync(final ServerInfo recommendServerInfo) { //return if is in switching of other thread. if (switchingFlag.get()) { @@ -321,24 +337,24 @@ public void run() { try { - AtomicReference serverInfoTryOnceInner = new AtomicReference( - serverInfoTryOnce); + AtomicReference recommendServer = new AtomicReference(recommendServerInfo); //only one thread can execute switching meantime. boolean innerLock = switchingLock.tryLock(); if (!innerLock) { return; } - switchingFlag.set(true); + switchingFlag.compareAndSet(false, true); // loop until start client success. boolean switchSuccess = false; - while (!switchSuccess) { - + while (!switchSuccess && !isShutdwon()) { + //1.get a new server ServerInfo serverInfo = null; //2.create a new channel to new server try { - serverInfo = serverInfoTryOnceInner.get() == null ? nextRpcServer() - : serverInfoTryOnceInner.get(); + serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get(); + System.out.println(RpcClient.this.name + "trying to connect server:" + serverInfo); + Connection connectNew = connectToServer(serverInfo); if (connectNew != null) { System.out.println(RpcClient.this.name + "-success to connect server:" + serverInfo); @@ -356,11 +372,15 @@ public void run() { System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo); } + if (isShutdwon()) { + closeConnection(connectNew); + } + } catch (Exception e) { System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo + " ,error message is " + e.getMessage()); } finally { - serverInfoTryOnceInner.set(null); + recommendServer.set(null); } try { @@ -372,7 +392,9 @@ public void run() { } } - System.out.println(RpcClient.this.name + "-success to connect server,return"); + if (isShutdwon()) { + System.out.println(RpcClient.this.name + " is shutdown ,stop reconnect to server :"); + } } catch (Exception e) { e.printStackTrace(); @@ -391,6 +413,10 @@ private void closeConnection(Connection connection) { } } + protected boolean connectionAbandon() { + return !(currentConnetion != null) && currentConnetion.isAbandon(); + } + /** * get connection type of this client. * @@ -428,24 +454,40 @@ public ServerInfo getCurrentServer() { * @return */ public Response request(Request request) throws NacosException { - int retryTimes = 3; + return request(request, 3000L); + } + /** + * send request. + * + * @param request request. + * @return + */ + public Response request(Request request, long timeoutMills) throws NacosException { + int retryTimes = 3; + Response response = null; Exception exceptionToThrow = null; while (retryTimes > 0) { try { - if (this.currentConnetion == null) { + if (this.currentConnetion == null || !isRunning()) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "client not connected."); } - Response response = this.currentConnetion.request(request, buildMeta()); - if (response != null && response instanceof ConnectionUnregisterResponse) { - synchronized (this) { - clearContextOnResetRequest(); - switchServerAsync(); - throw new IllegalStateException("Invalid client status."); + response = this.currentConnetion.request(request, buildMeta()); + + if (response != null) { + if (response instanceof ConnectionUnregisterResponse) { + synchronized (this) { + clearContextOnResetRequest(); + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + throw new IllegalStateException("Invalid client status."); + } + } else { + return response; } } - refereshActiveTimestamp(); - return response; + } catch (Exception e) { LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request,request={},errorMesssage={}", request, e.getMessage()); @@ -454,6 +496,11 @@ public Response request(Request request) throws NacosException { retryTimes--; } + + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + if (exceptionToThrow != null) { throw new NacosException(SERVER_ERROR, exceptionToThrow); } @@ -476,7 +523,6 @@ public void asyncRequest(Request request, RequestCallBack callback) throws Nacos throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "client not connected."); } this.currentConnetion.asyncRequest(request, buildMeta(), callback); - refereshActiveTimestamp(); return; } catch (Exception e) { LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request,request={},errorMesssage={}", request, @@ -522,6 +568,7 @@ public RequestFuture requestFuture(Request request) throws NacosException { * @return response. */ protected Response handleServerRequest(final Request request) { + for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { Response response = serverRequestHandler.requestReply(request); if (response != null) { @@ -532,7 +579,7 @@ protected Response handleServerRequest(final Request request) { } /** - * register connection handler.will be notified wher inner connect changed. + * register connection handler.will be notified when inner connect changed. * * @param connectionEventListener connectionEventListener */ @@ -557,6 +604,24 @@ public void registerServerPushResponseHandler(ServerRequestHandler serverRequest this.serverRequestHandlers.add(serverRequestHandler); } + /** + * Getter method for property name. + * + * @return property value of name + */ + public String getName() { + return name; + } + + /** + * Setter method for property name. + * + * @param name value to be assigned to property name + */ + public void setName(String name) { + this.name = name; + } + /** * Getter method for property serverListFactory. * diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java index 6dc5661c789..2b7872950eb 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java @@ -52,11 +52,10 @@ public static Set> getAllClientEntrys() { * @param clientName client name. */ public static void destroyClient(String clientName) throws NacosException { - RpcClient rpcClient = clientMap.get(clientName); + RpcClient rpcClient = clientMap.remove(clientName); if (rpcClient != null) { rpcClient.shutdown(); } - clientMap.remove(clientName); } public static RpcClient getClient(String clientName) { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientStatus.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientStatus.java index de636770c77..28edbd940a5 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientStatus.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientStatus.java @@ -42,7 +42,17 @@ public enum RpcClientStatus { /** * running. */ - RUNNING(3, "client is running..."); + RUNNING(4, "client is running..."), + + /** + * is in starting. + */ + UNHEALTHY(3, "client unhealthy,may closed by server,in rereconnecting"), + + /** + * running. + */ + SHUTDOWN(5, "client is shutdown..."); int status; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 432272792a1..4815ae3277c 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -16,33 +16,29 @@ package com.alibaba.nacos.common.remote.client.grpc; -import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc; -import com.alibaba.nacos.api.grpc.auto.Metadata; import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.RequestGrpc; -import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; -import com.alibaba.nacos.api.remote.request.HeartBeatRequest; import com.alibaba.nacos.api.remote.request.PushAckRequest; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.ServerCheckRequest; -import com.alibaba.nacos.api.remote.response.ConnectResetResponse; import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.GrpcUtils; import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; -import com.alibaba.nacos.common.utils.VersionUtils; +import com.alibaba.nacos.common.remote.client.RpcClientStatus; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import java.util.Date; /** * gRPC Client. @@ -102,47 +98,6 @@ private void shuntDownChannel(ManagedChannel managedChannel) { } } - /** - * Send Heart Beat Request. - */ - public void sendBeat() { - - int maxRetryTimes = 3; - while (maxRetryTimes > 0) { - - try { - if (!isRunning() && !overActiveTime()) { - return; - } - HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); - Response heartBeatResponse = this.currentConnetion.request(heartBeatRequest, buildMeta()); - if (heartBeatResponse != null && heartBeatResponse instanceof ConnectResetResponse) { - LOGGER.warn(" connection is not register to current server ,trying to switch server "); - switchServerAsync(); - } - return; - } catch (Exception e) { - LOGGER.warn("Send heart beat fail,server is not avaliable now,retry ... "); - maxRetryTimes--; - LOGGER.error("Send heart beat error, ", e); - } - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - //No nothing. - } - } - - LOGGER.warn("max retry times for send heart beat fail reached,trying to switch server... "); - switchServerAsync(); - } - - private Metadata buildGrpcMeta() { - Metadata meta = Metadata.newBuilder().setClientIp(NetUtils.localIP()) - .setClientVersion(VersionUtils.getFullClientVersion()).putAllLabels(labels).build(); - return meta; - } - /** * chenck server if ok. * @@ -164,13 +119,14 @@ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) { } } - private StreamObserver bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub) { + private StreamObserver bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, + final GrpcConnection grpcConn) { final StreamObserver payloadStreamObserver = streamStub.requestBiStream(new StreamObserver() { @Override public void onNext(Payload payload) { - + LOGGER.debug(" stream server reuqust receive ,original info :{}", payload.toString()); try { final Request request = (Request) GrpcUtils.parse(payload).getBody(); @@ -181,6 +137,7 @@ public void onNext(Payload payload) { response.setRequestId(request.getRequestId()); sendResponse(response); } catch (Exception e) { + e.printStackTrace(); sendResponse(request.getRequestId(), false); } } @@ -193,15 +150,41 @@ public void onNext(Payload payload) { @Override public void onError(Throwable throwable) { + if (isRunning() && !grpcConn.isAbandon()) { + System.out.println("onError ,switch server " + this + new Date().toString()); + throwable.printStackTrace(); + if (throwable instanceof StatusRuntimeException) { + Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode(); + if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) { + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + } + } + } else { + System.out.println( + "client is not running status ,ignore error event , " + this + new Date().toString()); + + } + } @Override public void onCompleted() { - System.out.println("onCompleted ,switch server " + this); - switchServerAsync(); + if (isRunning() && !grpcConn.isAbandon()) { + System.out.println("onCompleted ,switch server " + this); + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + } else { + System.out.println( + "client is not running status ,ignore complete event , " + this + new Date().toString()); + + } + } }); - + return payloadStreamObserver; } @@ -222,17 +205,6 @@ private void sendResponse(Response response) { } } - @Override - public void start() throws NacosException { - super.start(); - executorService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - sendBeat(); - } - }, 0, ACTIVE_INTERNAL, TimeUnit.MILLISECONDS); - } - @Override public Connection connectToServer(ServerInfo serverInfo) { try { @@ -243,22 +215,22 @@ public Connection connectToServer(ServerInfo serverInfo) { if (newChannelStubTemp != null) { LOGGER.info("success to create a connection to a server."); - RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc - .newStub(newChannelStubTemp.getChannel()); + BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); - StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub); - GrpcConnection grpcConn = new GrpcConnection(serverInfo, payloadStreamObserver); + GrpcConnection grpcConn = new GrpcConnection(serverInfo); + + //create stream request and bind connection event to this connection. + StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); + + // stream observer to send response to server + grpcConn.setPayloadStreamObserver(payloadStreamObserver); + grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); + grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); + + //send a connection setup request. ConnectionSetupRequest conconSetupRequest = new ConnectionSetupRequest(); grpcConn.sendRequest(conconSetupRequest, buildMeta()); - - //switch current channel and stub - RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc - .newFutureStub(newChannelStubTemp.getChannel()); - grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); - - grpcConn.setGrpcFutureServiceStub(grpcFutureServiceStubTemp); - grpcConn.setGrpcStreamServiceStub(requestStreamStubTemp); return grpcConn; } return null; @@ -268,10 +240,6 @@ public Connection connectToServer(ServerInfo serverInfo) { return null; } - @Override - public void shutdown() throws NacosException { - - } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java index 5a8e473498c..01998288db6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConnection.java @@ -19,7 +19,6 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.RequestGrpc; -import com.alibaba.nacos.api.grpc.auto.RequestStreamGrpc; import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.RequestFuture; import com.alibaba.nacos.api.remote.RpcScheduledExecutor; @@ -54,12 +53,7 @@ public class GrpcConnection extends Connection { * grpc channel. */ protected ManagedChannel channel; - - /** - * stub to send stream request. - */ - protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub; - + /** * stub to send request. */ @@ -67,9 +61,8 @@ public class GrpcConnection extends Connection { protected StreamObserver payloadStreamObserver; - public GrpcConnection(RpcClient.ServerInfo serverInfo, StreamObserver payloadStreamObserver) { + public GrpcConnection(RpcClient.ServerInfo serverInfo) { super(serverInfo); - this.payloadStreamObserver = payloadStreamObserver; } @Override @@ -209,38 +202,38 @@ public void setChannel(ManagedChannel channel) { } /** - * Getter method for property grpcStreamServiceStub. + * Getter method for property grpcFutureServiceStub. * - * @return property value of grpcStreamServiceStub + * @return property value of grpcFutureServiceStub */ - public RequestStreamGrpc.RequestStreamStub getGrpcStreamServiceStub() { - return grpcStreamServiceStub; + public RequestGrpc.RequestFutureStub getGrpcFutureServiceStub() { + return grpcFutureServiceStub; } /** - * Setter method for property grpcStreamServiceStub. + * Setter method for property grpcFutureServiceStub. * - * @param grpcStreamServiceStub value to be assigned to property grpcStreamServiceStub + * @param grpcFutureServiceStub value to be assigned to property grpcFutureServiceStub */ - public void setGrpcStreamServiceStub(RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub) { - this.grpcStreamServiceStub = grpcStreamServiceStub; + public void setGrpcFutureServiceStub(RequestGrpc.RequestFutureStub grpcFutureServiceStub) { + this.grpcFutureServiceStub = grpcFutureServiceStub; } /** - * Getter method for property grpcFutureServiceStub. + * Getter method for property payloadStreamObserver. * - * @return property value of grpcFutureServiceStub + * @return property value of payloadStreamObserver */ - public RequestGrpc.RequestFutureStub getGrpcFutureServiceStub() { - return grpcFutureServiceStub; + public StreamObserver getPayloadStreamObserver() { + return payloadStreamObserver; } /** - * Setter method for property grpcFutureServiceStub. + * Setter method for property payloadStreamObserver. * - * @param grpcFutureServiceStub value to be assigned to property grpcFutureServiceStub + * @param payloadStreamObserver value to be assigned to property payloadStreamObserver */ - public void setGrpcFutureServiceStub(RequestGrpc.RequestFutureStub grpcFutureServiceStub) { - this.grpcFutureServiceStub = grpcFutureServiceStub; + public void setPayloadStreamObserver(StreamObserver payloadStreamObserver) { + this.payloadStreamObserver = payloadStreamObserver; } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java index e7dd3700862..8ced0689022 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/rsocket/RsocketRpcClient.java @@ -16,7 +16,6 @@ package com.alibaba.nacos.common.remote.client.rsocket; -import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.ResponseCode; @@ -25,6 +24,7 @@ import com.alibaba.nacos.common.remote.RsocketUtils; import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.RpcClientStatus; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.util.Date; import java.util.concurrent.atomic.AtomicReference; /** @@ -59,11 +60,6 @@ public RsocketRpcClient(String name) { super(name); } - @Override - public void shutdown() throws NacosException { - shutDownRsocketClient(rSocketClient.get()); - } - @Override public ConnectionType getConnectionType() { return ConnectionType.RSOCKET; @@ -141,8 +137,6 @@ void shutDownRsocketClient(RSocket client) { void fireOnCloseEvent(final RSocket rSocket, final Connection connectionInner) { Subscriber subscriber = new Subscriber() { - - Connection connection = connectionInner; @Override public void onSubscribe(Subscription subscription) { @@ -155,20 +149,35 @@ public void onNext(Void aVoid) { @Override public void onError(Throwable throwable) { - if (connectionInner.isAbandon()) { - return; + if (isRunning() && !connectionInner.isAbandon()) { + System.out.println("onError ,switch server " + this + new Date().toString()); + + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + } else { + System.out.println( + "client is not running status ,ignore error event , " + this + new Date().toString()); + } - switchServerAsync(); } @Override public void onComplete() { - if (connectionInner.isAbandon()) { - return; + + if (isRunning() && !connectionInner.isAbandon()) { + System.out.println("onCompleted ,switch server " + this); + if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { + switchServerAsync(); + } + } else { + System.out.println( + "client is not running status ,ignore complete event , " + this + new Date().toString()); + } - switchServerAsync(); } }; + rSocket.onClose().subscribe(subscriber); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigConnectionEventListener.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigConnectionEventListener.java index 5b55af79c93..7dd468ec960 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigConnectionEventListener.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigConnectionEventListener.java @@ -42,7 +42,8 @@ public void clientConnected(Connection connect) { @Override public void clientDisConnected(Connection connect) { String connectionId = connect.getConnectionId(); - Loggers.RPC.info("client disconnected,clear config listen context, connetionId is {}", connectionId); + Loggers.RPC.info("client disconnected,clear config listen context, connetionId is {},client ip is {}", + connectionId, connect.getMetaInfo().getClientIp()); configChangeListenContext.clearContextForConnectionId(connectionId); } diff --git a/console/src/main/resources/application.properties b/console/src/main/resources/application.properties index 944b955f2d5..f9379b169ab 100644 --- a/console/src/main/resources/application.properties +++ b/console/src/main/resources/application.properties @@ -59,6 +59,11 @@ server.servlet.contextPath=/nacos nacos.naming.empty-service.auto-clean=true nacos.naming.empty-service.clean.initial-delay-ms=50000 nacos.naming.empty-service.clean.period-time-ms=30000 +spring.datasource.platform=mysql +db.num=1 +db.url.0=jdbc:mysql://10.101.167.27:3306/acm?characterEncoding=utf8&connectTimeout=1000&socketTimeout=10000&autoReconnect=true +db.user=root +db.password=root #*************** CMDB Module Related Configurations ***************# diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index 2afb0a0061c..a9d9cdcb06d 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -79,7 +79,8 @@ @Component(value = "serverMemberManager") public class ServerMemberManager implements ApplicationListener { - private final NacosAsyncRestTemplate asyncRestTemplate = HttpClientBeanHolder.getNacosAsyncRestTemplate(Loggers.CORE); + private final NacosAsyncRestTemplate asyncRestTemplate = HttpClientBeanHolder + .getNacosAsyncRestTemplate(Loggers.CORE); /** * Cluster node list. @@ -135,7 +136,8 @@ protected void init() throws NacosException { this.localAddress = InetUtils.getSelfIp() + ":" + port; this.self = MemberUtils.singleParse(this.localAddress); this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version); - this.self.setExtendVal(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE, ConnectionType.RSOCKET.getType()); + this.self.setExtendVal(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE, ApplicationUtils + .getProperty(MemberMetaDataConstants.SUPPORT_REMOTE_C_TYPE, ConnectionType.GRPC.getType())); serverList.put(self.getAddress(), self); // register NodeChangeEvent publisher to NotifyManager diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java index b4b4a139fa8..75e6008c041 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java @@ -97,7 +97,8 @@ private void refresh(List members) throws NacosException { while (iterator.hasNext()) { Map.Entry next1 = iterator.next(); if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) { - next1.getValue().shutdown(); + Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey()); + RpcClientFactory.getClient(next1.getKey()).shutdown(); iterator.remove(); } } @@ -111,16 +112,17 @@ private String memberClientKey(Member member) { private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException { Map labels = new HashMap(2); labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); - - RpcClient client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels); + String memberClientKey = memberClientKey(member); + RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels); if (!client.getConnectionType().equals(type)) { - RpcClientFactory.destroyClient(memberClientKey(member)); - client = RpcClientFactory.createClusterClient(memberClientKey(member), type, labels); + Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member); + RpcClientFactory.destroyClient(memberClientKey); + client = RpcClientFactory.createClusterClient(memberClientKey, type, labels); } if (client.isWaitInited()) { - Loggers.CLUSTER.info("create a new rpc client to member - > : {}", member); - + Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member); + //one fixed server client.init(new ServerListFactory() { @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java index 23bc3209160..34b18a941e2 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/BaseRpcServer.java @@ -93,7 +93,7 @@ public void run() { * Stop Server. * @throws excetpion throw if stop server fail. */ - public void stopServer() throws Exception { + public final void stopServer() throws Exception { shundownServer(); } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerLoaderInfoRequestHandler.java b/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerLoaderInfoRequestHandler.java index e62e2b03700..fb1b8b23e79 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerLoaderInfoRequestHandler.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/core/ServerLoaderInfoRequestHandler.java @@ -23,11 +23,10 @@ import com.alibaba.nacos.api.remote.response.ServerLoaderInfoResponse; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.core.utils.ApplicationUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.lang.management.ManagementFactory; -import java.lang.management.OperatingSystemMXBean; import java.util.HashMap; import java.util.Map; @@ -52,18 +51,10 @@ public ServerLoaderInfoResponse handle(ServerLoaderInfoRequest request, RequestM serverLoaderInfoResponse .putMetricsValue("sdkConCount", String.valueOf(connectionManager.currentClientsCount(filter))); serverLoaderInfoResponse.putMetricsValue("countLimit", String.valueOf(connectionManager.countLimited())); - serverLoaderInfoResponse.putMetricsValue("cpuLoad", String.valueOf(getSystemCpuLoad())); + serverLoaderInfoResponse.putMetricsValue("load", String.valueOf(ApplicationUtils.getLoad())); + serverLoaderInfoResponse.putMetricsValue("cpu", String.valueOf(ApplicationUtils.getCPU())); return serverLoaderInfoResponse; } - public static double getSystemCpuLoad() { - - OperatingSystemMXBean osmxb = (OperatingSystemMXBean) ManagementFactory - - .getOperatingSystemMXBean(); - - return osmxb.getSystemLoadAverage(); - - } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java index 82f91e9ea32..5d51b4da3ee 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseGrpcServer.java @@ -170,7 +170,7 @@ private void addServices(MutableHandlerRegistry handlerRegistry, ServerIntercept @Override public void shundownServer() { if (server != null) { - server.shutdown(); + server.shutdownNow(); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java similarity index 94% rename from core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseRpcServer.java rename to core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java index afd980c7dd7..df5812c2691 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/BaseRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcClusterServer.java @@ -25,7 +25,7 @@ * @version $Id: BaseGrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $ */ @Service -public class BaseRpcServer extends BaseGrpcServer { +public class GrpcClusterServer extends BaseGrpcServer { private static final int PORT_OFFSET = 1001; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java index 17199df52ad..2656113bc15 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java @@ -72,11 +72,6 @@ public int rpcPortOffset() { return PORT_OFFSET; } - @Override - public void shundownServer() { - - } - @Override public void startServer() throws Exception { RSocketServer rSocketServerInner = RSocketServer.create(); @@ -218,7 +213,7 @@ public ConnectionType getConnectionType() { } @Override - public void stopServer() throws Exception { + public void shundownServer() { if (this.closeChannel != null && !closeChannel.isDisposed()) { this.closeChannel.dispose(); }