From fac4879e9202ec54c66da9e10dece9b938feac2a Mon Sep 17 00:00:00 2001 From: "nov.lzf" Date: Fri, 11 Sep 2020 18:02:28 +0800 Subject: [PATCH] =?UTF-8?q?optimize=20dumpService=20warning=20on=20startin?= =?UTF-8?q?g=20up=20=EF=BC=9B=20async=20request=20suppport=20in=20cluster?= =?UTF-8?q?=20rpc=20client=20proxy.=20(#3808)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * optimize dumpService warning on starting up . * async request suppport in cluster rpc client proxy. * async request suppport in cluster rpc client proxy. --- .../nacos/api/remote/RemoteConstants.java | 2 +- .../alibaba/nacos/api/remote/Requester.java | 86 +++++++++++++++++++ .../com/alibaba/nacos/client/ConfigTest.java | 48 ++++++----- .../common/constant/HttpHeaderConsts.java | 2 +- .../common/remote/client/Connection.java | 75 ++++------------ .../nacos/common/remote/client/RpcClient.java | 24 +++--- .../remote/ConfigPublishRequestHandler.java | 4 + .../remote/RpcConfigChangeNotifier.java | 14 +-- .../service/dump/ExternalDumpService.java | 2 + .../cluster/remote/ClusterRpcClientProxy.java | 34 +++++++- .../alibaba/nacos/core/remote/Connection.java | 55 ++---------- .../nacos/core/remote/ConnectionManager.java | 20 +++-- .../nacos/core/remote/RpcPushService.java | 56 +++++++----- .../grpc/GrpcBiStreamRequestAcceptor.java | 21 ++++- .../core/remote/grpc/GrpcConnection.java | 71 ++++++++------- .../remote/rsocket/RsocketConnection.java | 63 +++++++------- .../core/remote/rsocket/RsocketRpcServer.java | 15 +++- .../cluster/remote/ClusterConnection.java | 21 +++-- pom.xml | 2 +- 19 files changed, 365 insertions(+), 250 deletions(-) create mode 100644 api/src/main/java/com/alibaba/nacos/api/remote/Requester.java diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java b/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java index a6e21e0a604..9016e86c72b 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/RemoteConstants.java @@ -31,7 +31,7 @@ public class RemoteConstants { public static final String LABEL_SOURCE_SDK = "sdk"; - public static final String LABEL_SOURCE_NODE = "node"; + public static final String LABEL_SOURCE_CLUSTER = "cluster"; public static final String LABEL_MODULE = "module"; diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java b/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java new file mode 100644 index 00000000000..dce473fdd7d --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/Requester.java @@ -0,0 +1,86 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.response.Response; + +import java.util.Map; + +/** + * connection interface,define basic operation. + * + * @author liuzunfei + * @version $Id: Requester.java, v 0.1 2020年09月11日 4:05 PM liuzunfei Exp $ + */ +public interface Requester { + + /** + * send request. default time out 3 seconds. + * + * @param request request. + * @param requestMeta requestMeta. + * @return response. + * @throws NacosException exception throw. + */ + public Response request(Request request, RequestMeta requestMeta) throws NacosException; + + /** + * send request. + * + * @param request request. + * @param requestMeta requestMeta. + * @param timeoutMills mills of timeouts. + * @return response response returned. + * @throws NacosException exception throw. + */ + public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException; + + /** + * send request. + * + * @param request request. + * @param requestMeta meta of request. + * @return request future. + * @throws NacosException exception throw. + */ + public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException; + + /** + * send aync request. = * @param request request. + * + * @param requestMeta meta of request. + * @param requestCallBack callback of request. + * @throws NacosException exception throw. + */ + public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) + throws NacosException; + + /** + * get connection labels. + * + * @return labels. + */ + public Map getLabels(); + + /** + * close connection. + */ + public void close(); +} diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index 4be25103ec8..c3f9edca4c0 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -54,9 +54,9 @@ public class ConfigTest { @Before public void before() throws Exception { Properties properties = new Properties(); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848"); - //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848"); + //properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848"); //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848"); //"11.239.114.187:8848"); configService = NacosFactory.createConfigService(properties); @@ -66,8 +66,8 @@ public void before() throws Exception { @Test public void test222() throws Exception { Map labels = new HashMap(); - labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); - + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER); + RpcClient client = RpcClientFactory.createClient("1234", ConnectionType.RSOCKET, labels); client.init(new ServerListFactory() { @Override @@ -181,21 +181,27 @@ public void cleanup() throws Exception { @Test public void test2() throws Exception { + final String dataId = "xiaochun.xxc"; + final String group = "xiaochun.xxc"; Properties properties = new Properties(); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848"); //" List configServiceList = new ArrayList(); for (int i = 0; i < 300; i++) { ConfigService configService = NacosFactory.createConfigService(properties); - configService.addListener("test", "test", new AbstractListener() { + Listener listener = new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { - System.out.println("listener2:" + configInfo); + System.out.println( + "receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo))); + } - }); - configServiceList.add(configService); + }; + + configService.addListener(dataId, group, listener); + System.out.println(configServiceList.size()); } System.out.println("2"); @@ -208,13 +214,10 @@ public void run() { int times = 10000; while (times > 0) { try { - System.out.println("3"); - - boolean result = configService - .publishConfig("test", "test", "value" + System.currentTimeMillis()); - + boolean result = configService.publishConfig(dataId, group, "" + System.currentTimeMillis()); + times--; - Thread.sleep(3000L); + Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); @@ -223,7 +226,7 @@ public void run() { } }); - //th.start(); + th.start(); Thread.sleep(1000000L); } @@ -244,12 +247,12 @@ public void run() { int times = 1000; while (times > 0) { try { - String content1 = "value" + System.currentTimeMillis(); - System.out.println("publish content:" + content1); + String content1 = System.currentTimeMillis() + ""; + //System.out.println("publish content:" + content1); configService.publishConfig(dataId, group, content1); times--; - Thread.sleep(2000L); + Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); } @@ -266,8 +269,9 @@ public void run() { Listener listener = new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { - System.out.println("receiveConfigInfo1 content:" + configInfo + "," + System.currentTimeMillis()); - + System.out.println( + "receiveConfigInfo1 content:" + (System.currentTimeMillis() - Long.valueOf(configInfo))); + } }; diff --git a/common/src/main/java/com/alibaba/nacos/common/constant/HttpHeaderConsts.java b/common/src/main/java/com/alibaba/nacos/common/constant/HttpHeaderConsts.java index 8f8f5bb06e8..80246ec2258 100644 --- a/common/src/main/java/com/alibaba/nacos/common/constant/HttpHeaderConsts.java +++ b/common/src/main/java/com/alibaba/nacos/common/constant/HttpHeaderConsts.java @@ -31,7 +31,7 @@ public interface HttpHeaderConsts { String ACCEPT_CHARSET = "Accept-Charset"; String ACCEPT_ENCODING = "Accept-Encoding"; String CONTENT_ENCODING = "Content-Encoding"; - String CONNECTION = "Connection"; + String CONNECTION = "Requester"; String REQUEST_ID = "RequestId"; String REQUEST_MODULE = "Request-Module"; diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java index 5e0d4e6a44e..d5ded39038e 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java @@ -16,12 +16,7 @@ package com.alibaba.nacos.common.remote.client; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.remote.RequestCallBack; -import com.alibaba.nacos.api.remote.RequestFuture; -import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.request.RequestMeta; -import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.Requester; import java.util.HashMap; import java.util.Map; @@ -33,7 +28,7 @@ * @version $Id: Connection.java, v 0.1 2020年08月09日 1:32 PM liuzunfei Exp $ */ @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class Connection { +public abstract class Connection implements Requester { private boolean abandon = false; @@ -45,18 +40,6 @@ public Connection(RpcClient.ServerInfo serverInfo) { this.serverInfo = serverInfo; } - public String getLabel(String labelKey) { - return labels.get(labelKey); - } - - public void putLabel(String labelKey, String labelValue) { - labels.put(labelKey, labelValue); - } - - public void putLabels(Map labels) { - labels.putAll(labels); - } - /** * Getter method for property abandon. * @@ -67,8 +50,7 @@ public boolean isAbandon() { } /** - * Setter method for property abandon. - * connection event will be ignored if connection is abandoned. + * Setter method for property abandon. connection event will be ignored if connection is abandoned. * * @param abandon value to be assigned to property abandon */ @@ -77,49 +59,28 @@ public void setAbandon(boolean abandon) { } /** - * send request. - * default time out 3 seconds. - * @param request request. - * @param requestMeta requestMeta. - * @return response. - * @throws NacosException exception throw. - */ - public abstract Response request(Request request, RequestMeta requestMeta) throws NacosException; - - /** - * send request. + * Getter method for property labels. * - * @param request request. - * @param requestMeta requestMeta. - * @param timeoutMills mills of timeouts. - * @return response response returned. - * @throws NacosException exception throw. + * @return property value of labels */ - public abstract Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException; + @Override + public Map getLabels() { + return labels; + } /** - * send request. + * Setter method for property labels. * - * @param request request. - * @param requestMeta meta of request. - * @return request future. - * @throws NacosException exception throw. - */ - public abstract RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException; - - /** - * send aync request. - = * @param request request. - * @param requestMeta meta of request. - * @param requestCallBack callback of request. - * @throws NacosException exception throw. + * @param labels value to be assigned to property labels */ - public abstract void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) - throws NacosException; + public void putLabels(Map labels) { + this.labels = labels; + } /** - * close connection. + * Setter method for property labels. */ - public abstract void close(); - + public void putLabel(String labelName, String labelValue) { + this.labels.put(labelName, labelValue); + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 3ad0f89824a..26770ab63de 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -346,6 +346,9 @@ public void run() { switchingFlag.compareAndSet(false, true); // loop until start client success. boolean switchSuccess = false; + + int reConnectTimes = 0; + Exception lastException = null; while (!switchSuccess && !isShutdwon()) { //1.get a new server @@ -353,7 +356,6 @@ public void run() { //2.create a new channel to new server try { serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get(); - System.out.println(RpcClient.this.name + "trying to connect server:" + serverInfo); Connection connectNew = connectToServer(serverInfo); if (connectNew != null) { @@ -368,20 +370,27 @@ public void run() { boolean s = eventLinkedBlockingQueue .add(new ConnectionEvent(ConnectionEvent.CONNECTED)); return; - } else { - System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo); } if (isShutdwon()) { closeConnection(connectNew); } + + lastException = null; } catch (Exception e) { - System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo - + " ,error message is " + e.getMessage()); + lastException = e; } finally { recommendServer.set(null); } + + reConnectTimes++; + + if (reConnectTimes % 30 == 0) { + System.out.println( + RpcClient.this.name + "-fail to connect server,after trying " + reConnectTimes + + " times, last tryed server is " + serverInfo); + } try { //sleep 100 millsecond to switch next server. @@ -431,10 +440,6 @@ protected boolean connectionAbandon() { */ public abstract int rpcPortOffset(); - protected void clearContextOnResetRequest() { - // Default do nothing. - } - /** * get current server. * @@ -477,7 +482,6 @@ public Response request(Request request, long timeoutMills) throws NacosExceptio if (response != null) { if (response instanceof ConnectionUnregisterResponse) { synchronized (this) { - clearContextOnResetRequest(); if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) { switchServerAsync(); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java index 4629e32997c..fdfff7a1708 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigPublishRequestHandler.java @@ -20,7 +20,10 @@ import com.alibaba.nacos.api.config.remote.response.ConfigPubishResponse; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.auth.annotation.Secured; +import com.alibaba.nacos.auth.common.ActionTypes; import com.alibaba.nacos.common.utils.MapUtils; +import com.alibaba.nacos.config.server.auth.ConfigResourceParser; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.service.AggrWhitelist; @@ -53,6 +56,7 @@ public class ConfigPublishRequestHandler extends RequestHandler { private static final ScheduledExecutorService ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR = ExecutorFactory.Managed - .newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class), 100, + .newScheduledExecutorService(ClassUtils.getCanonicalName(Config.class), + Runtime.getRuntime().availableProcessors() * 2, new NameThreadFactory("com.alibaba.nacos.config.server.remote.ConfigChangeNotifier")); public RpcConfigChangeNotifier() { @@ -101,7 +103,8 @@ public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest n } } - Loggers.RPC.info("push {} clients ,groupKey={}", clients == null ? 0 : notifyCount, groupKey); + Loggers.RPC.info("push {} clients ,groupKey={},queue size={}", clients == null ? 0 : notifyCount, groupKey, + ((ScheduledThreadPoolExecutor) ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR).getQueue().size()); } @Override @@ -151,7 +154,7 @@ public boolean isOverTimes() { @Override public void run() { - rpcPushService.pushWithCallback(clientId, notifyRequet, new AbstractPushCallBack(500L) { + rpcPushService.pushWithCallback(clientId, notifyRequet, new AbstractPushCallBack(3000L) { int retryTimes = tryTimes; @Override @@ -163,14 +166,13 @@ public void onSuccess() { @Override public void onFail(Throwable e) { - Loggers.CORE.error("On failt ", e); Loggers.CORE.warn("push fail.dataId={},group={},tenant={},clientId={},tryTimes={}", notifyRequet.getDataId(), notifyRequet.getGroup(), notifyRequet.getTenant(), clientId, retryTimes); push(RpcPushTask.this); } - + }, ASYNC_CONFIG_CHANGE_NOTIFY_EXECUTOR); tryTimes++; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/ExternalDumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/ExternalDumpService.java index cf5abf63405..64b84982fc2 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/ExternalDumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/ExternalDumpService.java @@ -20,6 +20,7 @@ import com.alibaba.nacos.config.server.service.repository.PersistService; import com.alibaba.nacos.core.cluster.ServerMemberManager; import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -31,6 +32,7 @@ */ @Conditional(ConditionOnExternalStorage.class) @Component +@DependsOn({"rpcConfigChangeNotifier"}) public class ExternalDumpService extends DumpService { /** diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java index 75e6008c041..83f4c8cf5cf 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/remote/ClusterRpcClientProxy.java @@ -18,6 +18,7 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; +import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.common.notify.NotifyCenter; @@ -111,7 +112,7 @@ private String memberClientKey(Member member) { private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException { Map labels = new HashMap(2); - labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_NODE); + labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER); String memberClientKey = memberClientKey(member); RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels); if (!client.getConnectionType().equals(type)) { @@ -154,15 +155,44 @@ public List getServerList() { * @throws NacosException exception may throws. */ public Response sendRequest(Member member, Request request) throws NacosException { + return sendRequest(member, request, 3000L); + } + + /** + * send request to member. + * + * @param member member of server. + * @param request request. + * @return Response response. + * @throws NacosException exception may throws. + */ + public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException { RpcClient client = RpcClientFactory.getClient(memberClientKey(member)); if (client != null) { - Response response = client.request(request); + Response response = client.request(request, timeoutMills); return response; } else { throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member); } } + /** + * aync send request to member with callback. + * + * @param member member of server. + * @param request request. + * @param callBack RequestCallBack. + * @throws NacosException exception may throws. + */ + public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException { + RpcClient client = RpcClientFactory.getClient(memberClientKey(member)); + if (client != null) { + client.asyncRequest(request, callBack); + } else { + throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member); + } + } + /** * send request to member. * diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java index 743d97e63bb..464d9451f38 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java @@ -16,12 +16,8 @@ package com.alibaba.nacos.core.remote; -import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; -import com.alibaba.nacos.api.remote.RequestCallBack; -import com.alibaba.nacos.api.remote.RequestFuture; -import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.remote.Requester; import org.apache.commons.lang3.builder.ToStringBuilder; import java.util.Map; @@ -33,7 +29,7 @@ * @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $ */ @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class Connection { +public abstract class Connection implements Requester { private final ConnectionMetaInfo metaInfo; @@ -41,48 +37,6 @@ public Connection(ConnectionMetaInfo metaInfo) { this.metaInfo = metaInfo; } - /** - * Send response to this client that associated to this connection. - * - * @param request request. - * @param timeoutMills timeoutMills. - * @return Response resonse. - * @throws NacosException exception may throw. - */ - public abstract Response sendRequest(Request request, long timeoutMills) throws NacosException; - - /** - * Send response to this client that associated to this connection. - * - * @param request request. - * @throws NacosException exception may throw. - */ - public abstract void sendRequestNoAck(Request request) throws NacosException; - - /** - * Send response to this client that associated to this connection. - * - * @param request request. - * @return future of request. - * @throws NacosException exception may throw. - */ - public abstract RequestFuture sendRequestWithFuture(Request request) throws NacosException; - - /** - * Send response to this client that associated to this connection. - * - * @param request request. - * @param callBack call back. - * @throws NacosException exception may throw. - */ - public abstract void sendRequestWithCallBack(Request request, RequestCallBack callBack) - throws NacosException; - - /** - * Close this connection, if this connection is not active yet. - */ - public abstract void closeGrapcefully(); - /** * Update last Active Time to now. */ @@ -99,6 +53,11 @@ public long getLastActiveTimestamp() { return metaInfo.lastActiveTime; } + /** + * get connection Id. + * + * @return + */ public String getConnectionId() { return metaInfo.connectionId; } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index c4ca558150c..ea53d5e905e 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -19,8 +19,11 @@ import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.remote.RpcScheduledExecutor; import com.alibaba.nacos.api.remote.request.ConnectResetRequest; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException; import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.monitor.MetricsMonitor; import com.alibaba.nacos.core.utils.Loggers; import org.springframework.beans.factory.annotation.Autowired; @@ -55,8 +58,6 @@ public class ConnectionManager { String redirectAddress = null; - private static final long EXPIRE_MILLSECOND = 10000L; - @Autowired private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry; @@ -94,7 +95,7 @@ public void register(String connectionId, Connection connection) { public void unregister(String connectionId) { Connection remove = this.connetions.remove(connectionId); if (remove != null) { - remove.closeGrapcefully(); + remove.close(); Loggers.RPC.info(" connection unregistered successfully,connectionid = {} ", connectionId); clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } @@ -187,7 +188,7 @@ public void run() { connectResetRequest.setServerIp(split[0]); connectResetRequest.setServerPort(split[1]); } - connection.sendRequestNoAck(connectResetRequest); + connection.request(connectResetRequest, buildMeta()); Loggers.RPC .info("expel connection ,send switch server response connectionid = {},connectResetRequest={} ", expeledClientId, connectResetRequest); @@ -214,6 +215,13 @@ public void run() { } + private RequestMeta buildMeta() { + RequestMeta meta = new RequestMeta(); + meta.setClientVersion(VersionUtils.getFullClientVersion()); + meta.setClientIp(NetUtils.localIP()); + return meta; + } + public void coordinateMaxClientsSmoth(int maxClient) { this.maxClient = maxClient; } @@ -240,7 +248,7 @@ public void loadSingle(String connectionId, String redirectAddress) { connectResetRequest.setServerPort(split[1]); } try { - connection.sendRequestNoAck(connectResetRequest); + connection.request(connectResetRequest, buildMeta()); } catch (ConnectionAlreadyClosedException e) { unregister(connectionId); } catch (Exception e) { @@ -297,7 +305,7 @@ public void expelAll() { for (Map.Entry entry : connetions.entrySet()) { Connection client = entry.getValue(); try { - client.sendRequestNoAck(new ConnectResetRequest()); + client.request(new ConnectResetRequest(), buildMeta()); } catch (Exception e) { //Do Nothing. } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java index 87918a6d1f5..029a8ba7b7c 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java @@ -18,10 +18,13 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.AbstractRequestCallBack; +import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.ServerPushRequest; import com.alibaba.nacos.api.remote.response.PushCallBack; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.utils.Loggers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -52,27 +55,29 @@ public void pushWithCallback(String connectionId, ServerPushRequest request, Pus Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { - connection.sendRequestWithCallBack(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) { - - @Override - public Executor getExcutor() { - return executor; - } - - @Override - public void onResponse(Response response) { - if (response.isSuccess()) { - requestCallBack.onSuccess(); - } else { - requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage())); - } - } - - @Override - public void onException(Throwable e) { - requestCallBack.onFail(e); - } - }); + connection + .asyncRequest(request, buildMeta(), new AbstractRequestCallBack(requestCallBack.getTimeout()) { + + @Override + public Executor getExcutor() { + return executor; + } + + @Override + public void onResponse(Response response) { + if (response.isSuccess()) { + requestCallBack.onSuccess(); + } else { + requestCallBack + .onFail(new NacosException(response.getErrorCode(), response.getMessage())); + } + } + + @Override + public void onException(Throwable e) { + requestCallBack.onFail(e); + } + }); } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); requestCallBack.onSuccess(); @@ -96,7 +101,7 @@ public void pushWithoutAck(String connectionId, ServerPushRequest request) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { - connection.sendRequestNoAck(request); + connection.request(request, buildMeta()); } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); } catch (Exception e) { @@ -107,4 +112,11 @@ public void pushWithoutAck(String connectionId, ServerPushRequest request) { } } + private RequestMeta buildMeta() { + RequestMeta meta = new RequestMeta(); + meta.setClientVersion(VersionUtils.getFullClientVersion()); + meta.setClientIp(NetUtils.localIP()); + return meta; + } + } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java index cc2680b5850..4c05a7a3666 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java @@ -21,13 +21,17 @@ import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; +import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.GrpcUtils; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; import com.alibaba.nacos.core.remote.RpcAckCallbackSynchronizer; +import com.alibaba.nacos.core.utils.Loggers; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.springframework.beans.factory.annotation.Autowired; @@ -38,6 +42,7 @@ /** * grpc bi stream request . + * * @author liuzunfei * @version $Id: GrpcBiStreamRequest.java, v 0.1 2020年09月01日 10:41 PM liuzunfei Exp $ */ @@ -64,13 +69,13 @@ public void onNext(Payload payload) { String version = metadata.getClientVersion(); ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp, ConnectionType.GRPC.getType(), version, metadata.getLabelsMap()); - + Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); if (connectionManager.isOverLimit()) { //Not register to the connection manager if current server is over limit. try { - connection.sendRequestNoAck(new ConnectResetRequest()); - connection.closeGrapcefully(); + connection.request(new ConnectResetRequest(), buildMeta()); + connection.close(); } catch (Exception e) { //Do nothing. } @@ -78,7 +83,9 @@ public void onNext(Payload payload) { connectionManager.register(connectionId, connection); } } else if (plainRequest.getBody() instanceof Response) { + Response response = (Response) plainRequest.getBody(); + Loggers.RPC_DIGEST.debug(String.format("[%s] response receive :%s ", "grpc", response.toString())); RpcAckCallbackSynchronizer.ackNotify(connectionId, response); } @@ -97,4 +104,12 @@ public void onCompleted() { return streamObserver; } + + private RequestMeta buildMeta() { + RequestMeta meta = new RequestMeta(); + meta.setClientVersion(VersionUtils.getFullClientVersion()); + meta.setClientIp(NetUtils.localIP()); + return meta; + } + } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java index 6f1454104a3..4f5bf415616 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java @@ -22,6 +22,7 @@ import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.RequestFuture; import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.GrpcUtils; @@ -35,6 +36,8 @@ import io.grpc.netty.shaded.io.netty.channel.Channel; import io.grpc.stub.StreamObserver; +import java.util.Map; + /** * grpc connection. * @@ -53,22 +56,9 @@ public GrpcConnection(ConnectionMetaInfo metaInfo, StreamObserver streamObserver this.channel = channel; } - @Override - public Response sendRequest(Request request, long timeoutMills) throws NacosException { - DefaultRequestFuture pushFuture = (DefaultRequestFuture) sendRequestWithFuture(request); + private void sendRequestNoAck(Request request, RequestMeta meta) throws NacosException { try { - return pushFuture.get(timeoutMills); - } catch (Exception e) { - throw new NacosException(NacosException.SERVER_ERROR, e); - } finally { - RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId()); - } - } - - @Override - public void sendRequestNoAck(Request request) throws NacosException { - try { - streamObserver.onNext(GrpcUtils.convert(request, buildMeta(request.getClass().getName()))); + streamObserver.onNext(GrpcUtils.convert(request, meta)); } catch (Exception e) { if (e instanceof StatusRuntimeException) { throw new ConnectionAlreadyClosedException(e); @@ -83,21 +73,12 @@ Metadata buildMeta(String type) { return meta; } - @Override - public RequestFuture sendRequestWithFuture(Request request) throws NacosException { - return sendRequestInner(request, null); - } - - @Override - public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException { - sendRequestInner(request, callBack); - } - - private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException { + private DefaultRequestFuture sendRequestInner(Request request, RequestMeta meta, RequestCallBack callBack) + throws NacosException { Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "grpc", request.toString())); String requestId = String.valueOf(PushAckIdGenerator.getNextId()); request.setRequestId(requestId); - sendRequestNoAck(request); + sendRequestNoAck(request, meta); DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(this.getConnectionId(), requestId, callBack, new DefaultRequestFuture.TimeoutInnerTrigger() { @Override @@ -110,12 +91,44 @@ public void triggerOnTimeout() { } @Override - public void closeGrapcefully() { + public Response request(Request request, RequestMeta requestMeta) throws NacosException { + return request(request, requestMeta, 3000L); + } + + @Override + public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException { + DefaultRequestFuture pushFuture = (DefaultRequestFuture) sendRequestInner(request, requestMeta, null); + try { + return pushFuture.get(timeoutMills); + } catch (Exception e) { + throw new NacosException(NacosException.SERVER_ERROR, e); + } finally { + RpcAckCallbackSynchronizer.clearFuture(getConnectionId(), pushFuture.getRequestId()); + } + } + + @Override + public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException { + return sendRequestInner(request, requestMeta, null); + } + + @Override + public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) + throws NacosException { + sendRequestInner(request, requestMeta, requestCallBack); + } + + @Override + public Map getLabels() { + return null; + } + + @Override + public void close() { try { streamObserver.onCompleted(); } catch (Exception e) { Loggers.RPC.debug(String.format("[%s] connection close exception : %s", "grpc", e.getMessage())); } } - } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java index 13a18b5993d..37053529a6e 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketConnection.java @@ -23,9 +23,7 @@ import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; -import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.RsocketUtils; -import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; import com.alibaba.nacos.core.utils.Loggers; @@ -34,6 +32,7 @@ import reactor.core.publisher.Mono; import java.time.Duration; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -58,14 +57,30 @@ public RsocketConnection(ConnectionMetaInfo metaInfo, RSocket clientSocket) { this.clientSocket = clientSocket; } + private static CompletableFuture failAfter(final long timeouts) { + final CompletableFuture promise = new CompletableFuture(); + RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { + @Override + public Object call() throws Exception { + final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); + return promise.completeExceptionally(ex); + } + }, timeouts, MILLISECONDS); + return promise; + } + @Override - public Response sendRequest(Request request, long timeoutMills) throws NacosException { + public Response request(Request request, RequestMeta requestMeta) throws NacosException { + return request(request, requestMeta, 3000L); + } + @Override + public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException { Loggers.RPC_DIGEST.debug(String.format("[%s] send request : %s", "rsocket", request)); try { Mono payloadMono = clientSocket - .requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta())); + .requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)); Payload block = payloadMono.block(Duration.ofMillis(timeoutMills)); return RsocketUtils.parseResponseFromPayload(block); } catch (Exception e) { @@ -74,23 +89,10 @@ public Response sendRequest(Request request, long timeoutMills) throws NacosExce } @Override - public void sendRequestNoAck(Request request) throws NacosException { - Loggers.RPC_DIGEST.debug(String.format("[%s] send no ack request : %s", "rsocket", request)); - clientSocket.fireAndForget(RsocketUtils.convertRequestToPayload(request, buildMeta())).block(); - } - - private RequestMeta buildMeta() { - RequestMeta meta = new RequestMeta(); - meta.setClientVersion(VersionUtils.getFullClientVersion()); - meta.setClientIp(NetUtils.localIP()); - return meta; - } - - @Override - public RequestFuture sendRequestWithFuture(Request request) throws NacosException { + public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException { Loggers.RPC_DIGEST.debug(String.format("[%s] send future request : %s", "rsocket", request)); final Mono payloadMono = clientSocket - .requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta())); + .requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)); final CompletableFuture payloadCompletableFuture = payloadMono.toFuture(); RequestFuture defaultPushFuture = new RequestFuture() { @@ -116,14 +118,14 @@ public Response get(long timeoutMills) throws TimeoutException, InterruptedExcep } @Override - public void sendRequestWithCallBack(Request request, RequestCallBack requestCallBack) throws NacosException { - + public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) + throws NacosException { Loggers.RPC_DIGEST.debug(String.format("[%s] send callback request : %s", "rsocket", request)); try { Mono response = clientSocket - .requestResponse(RsocketUtils.convertRequestToPayload(request, buildMeta())); - + .requestResponse(RsocketUtils.convertRequestToPayload(request, requestMeta)); + response.toFuture().acceptEither(failAfter(requestCallBack.getTimeout()), new Consumer() { @Override public void accept(Payload payload) { @@ -139,20 +141,13 @@ public void accept(Payload payload) { } } - private static CompletableFuture failAfter(final long timeouts) { - final CompletableFuture promise = new CompletableFuture(); - RpcScheduledExecutor.TIMEOUT_SHEDULER.schedule(new Callable() { - @Override - public Object call() throws Exception { - final TimeoutException ex = new TimeoutException("Timeout after " + timeouts); - return promise.completeExceptionally(ex); - } - }, timeouts, MILLISECONDS); - return promise; + @Override + public Map getLabels() { + return null; } @Override - public void closeGrapcefully() { + public void close() { if (clientSocket != null && !clientSocket.isDisposed()) { clientSocket.dispose(); } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java index 2656113bc15..737b01e0db9 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/rsocket/RsocketRpcServer.java @@ -22,8 +22,10 @@ import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.PlainBodyResponse; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.api.utils.NetUtils; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.RsocketUtils; +import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.core.remote.BaseRpcServer; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionManager; @@ -106,8 +108,8 @@ public void startServer() throws Exception { if (connectionManager.isOverLimit()) { //Not register to the connection manager if current server is over limit. try { - connection.sendRequestNoAck(new ConnectResetRequest()); - connection.closeGrapcefully(); + connection.request(new ConnectResetRequest(), buildMeta()); + connection.close(); } catch (Exception e) { //Do nothing. } @@ -146,7 +148,7 @@ public void onComplete() { connectionManager.unregister(connectionId); } }); - + RSocketProxy rSocketProxy = new NacosRsocket(sendingSocket, connectionid); return Mono.just(rSocketProxy); @@ -218,4 +220,11 @@ public void shundownServer() { this.closeChannel.dispose(); } } + + private RequestMeta buildMeta() { + RequestMeta meta = new RequestMeta(); + meta.setClientVersion(VersionUtils.getFullClientVersion()); + meta.setClientIp(NetUtils.localIP()); + return meta; + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java index 06a22d3b705..92d0ffe4f02 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/cluster/remote/ClusterConnection.java @@ -20,10 +20,13 @@ import com.alibaba.nacos.api.remote.RequestCallBack; import com.alibaba.nacos.api.remote.RequestFuture; import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.ConnectionMetaInfo; +import java.util.Map; + /** * Cluster connection. * @@ -36,25 +39,33 @@ public ClusterConnection(ConnectionMetaInfo metaInfo) { } @Override - public Response sendRequest(Request request, long timeoutMills) throws NacosException { + public Response request(Request request, RequestMeta requestMeta) throws NacosException { return null; } @Override - public void sendRequestNoAck(Request request) throws NacosException { + public Response request(Request request, RequestMeta requestMeta, long timeoutMills) throws NacosException { + return null; } @Override - public RequestFuture sendRequestWithFuture(Request request) throws NacosException { + public RequestFuture requestFuture(Request request, RequestMeta requestMeta) throws NacosException { return null; } @Override - public void sendRequestWithCallBack(Request request, RequestCallBack callBack) throws NacosException { + public void asyncRequest(Request request, RequestMeta requestMeta, RequestCallBack requestCallBack) + throws NacosException { + + } + @Override + public Map getLabels() { + return null; } @Override - public void closeGrapcefully() { + public void close() { + } } diff --git a/pom.xml b/pom.xml index b0b3df32539..49ec9a5da90 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ 2.10.4 1.9.13 0.11.2 - 4.1.42.Final + 4.1.51.Final 2.0.0-RC1 24.1.1-jre