Skip to content

Commit

Permalink
optmize rpc client lifecycle ;request timeout to rpc client; some bug…
Browse files Browse the repository at this point in the history
…fix (#3797)

* optmize rpc client lifecycle and some bugfix

* get server load optimize; add request timeout to rpc client
  • Loading branch information
shiyiyue1102 authored Sep 10, 2020
1 parent 50b0427 commit 721791a
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ public void onConnected() {

@Override
public void onDisConnect() {
System.out.println("clear listen context...");
Collection<CacheData> values = cacheMap.get().values();

for (CacheData cacheData : values) {
Expand Down
15 changes: 9 additions & 6 deletions client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class ConfigTest {
public void before() throws Exception {
Properties properties = new Properties();
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160..148:8848,127.0.0.1:8848,127.0.0.1:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848,11.160.144.149:8848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.149:8848,11.160.144.148:8848,127.0.0.1:8848");
//"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848");
configService = NacosFactory.createConfigService(properties);
Expand Down Expand Up @@ -234,7 +234,7 @@ public void test() throws Exception {
Random random = new Random();
final String dataId = "xiaochun.xxc";
final String group = "xiaochun.xxc";
final String content = "lessspring-" + System.currentTimeMillis();
// final String content = "lessspring-" + System.currentTimeMillis();

Thread th = new Thread(new Runnable() {
@Override
Expand All @@ -244,10 +244,12 @@ public void run() {
int times = 1000;
while (times > 0) {
try {
configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
String content1 = "value" + System.currentTimeMillis();
System.out.println("publish content:" + content1);
configService.publishConfig(dataId, group, content1);

times--;
Thread.sleep(500L);
Thread.sleep(2000L);
} catch (Exception e) {
e.printStackTrace();
}
Expand All @@ -264,7 +266,8 @@ public void run() {
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receiveConfigInfo1 :" + configInfo);
System.out.println("receiveConfigInfo1 content:" + configInfo + "," + System.currentTimeMillis());

}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public boolean isAbandon() {

/**
* Setter method for property <tt>abandon</tt>.
* connection event will be ignored if connection is abandoned.
*
* @param abandon value to be assigned to property abandon
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,13 @@ public abstract class RpcClient implements Closeable {

private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RpcClient.class);

protected static final long ACTIVE_INTERNAL = 3000L;

private ServerListFactory serverListFactory;

protected LinkedBlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<ConnectionEvent>();

protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);

private long activeTimeStamp = System.currentTimeMillis();

protected ScheduledExecutorService executorService;

protected volatile Connection currentConnetion;
Expand Down Expand Up @@ -144,14 +140,6 @@ protected void notifyConnected() {
}
}

protected boolean overActiveTime() {
return System.currentTimeMillis() - this.activeTimeStamp > ACTIVE_INTERNAL;
}

protected void refereshActiveTimestamp() {
this.activeTimeStamp = System.currentTimeMillis();
}

/**
* check is this client is inited.
*
Expand All @@ -170,6 +158,15 @@ public boolean isRunning() {
return this.rpcClientStatus.get() == RpcClientStatus.RUNNING;
}

/**
* check is this client is shutdwon.
*
* @return
*/
public boolean isShutdwon() {
return this.rpcClientStatus.get() == RpcClientStatus.SHUTDOWN;
}

/**
* init server list factory.
*
Expand Down Expand Up @@ -199,8 +196,8 @@ public void initLabels(Map<String, String> labels) {
/**
* Start this client.
*/
public void start() throws NacosException {

public final void start() throws NacosException {
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING);
if (!success) {
return;
Expand Down Expand Up @@ -296,6 +293,25 @@ public Response requestReply(Request request) {
}

});
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
RpcClient.this.shutdown();
} catch (NacosException e) {
e.printStackTrace();
}

}
});

}

@Override
public void shutdown() throws NacosException {
executorService.shutdown();
rpcClientStatus.set(RpcClientStatus.SHUTDOWN);
closeConnection(currentConnetion);
}

private final ReentrantLock switchingLock = new ReentrantLock();
Expand All @@ -309,7 +325,7 @@ public void switchServerAsync() {
/**
* 1.判断当前是否正在重连中 2.如果正在重连中,则直接返回;如果不在重连中,则启动重连 3.重连逻辑:创建一个新的连接,如果连接可用
*/
protected void switchServerAsync(final ServerInfo serverInfoTryOnce) {
protected void switchServerAsync(final ServerInfo recommendServerInfo) {

//return if is in switching of other thread.
if (switchingFlag.get()) {
Expand All @@ -321,24 +337,24 @@ public void run() {

try {

AtomicReference<ServerInfo> serverInfoTryOnceInner = new AtomicReference<ServerInfo>(
serverInfoTryOnce);
AtomicReference<ServerInfo> recommendServer = new AtomicReference<ServerInfo>(recommendServerInfo);
//only one thread can execute switching meantime.
boolean innerLock = switchingLock.tryLock();
if (!innerLock) {
return;
}
switchingFlag.set(true);
switchingFlag.compareAndSet(false, true);
// loop until start client success.
boolean switchSuccess = false;
while (!switchSuccess) {

while (!switchSuccess && !isShutdwon()) {
//1.get a new server
ServerInfo serverInfo = null;
//2.create a new channel to new server
try {
serverInfo = serverInfoTryOnceInner.get() == null ? nextRpcServer()
: serverInfoTryOnceInner.get();
serverInfo = recommendServer.get() == null ? nextRpcServer() : recommendServer.get();
System.out.println(RpcClient.this.name + "trying to connect server:" + serverInfo);

Connection connectNew = connectToServer(serverInfo);
if (connectNew != null) {
System.out.println(RpcClient.this.name + "-success to connect server:" + serverInfo);
Expand All @@ -356,11 +372,15 @@ public void run() {
System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo);
}

if (isShutdwon()) {
closeConnection(connectNew);
}

} catch (Exception e) {
System.out.println(RpcClient.this.name + "-fail to connect server:" + serverInfo
+ " ,error message is " + e.getMessage());
} finally {
serverInfoTryOnceInner.set(null);
recommendServer.set(null);
}

try {
Expand All @@ -372,7 +392,9 @@ public void run() {
}
}

System.out.println(RpcClient.this.name + "-success to connect server,return");
if (isShutdwon()) {
System.out.println(RpcClient.this.name + " is shutdown ,stop reconnect to server :");
}

} catch (Exception e) {
e.printStackTrace();
Expand All @@ -391,6 +413,10 @@ private void closeConnection(Connection connection) {
}
}

protected boolean connectionAbandon() {
return !(currentConnetion != null) && currentConnetion.isAbandon();
}

/**
* get connection type of this client.
*
Expand Down Expand Up @@ -428,24 +454,40 @@ public ServerInfo getCurrentServer() {
* @return
*/
public Response request(Request request) throws NacosException {
int retryTimes = 3;
return request(request, 3000L);
}

/**
* send request.
*
* @param request request.
* @return
*/
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 3;
Response response = null;
Exception exceptionToThrow = null;
while (retryTimes > 0) {
try {
if (this.currentConnetion == null) {
if (this.currentConnetion == null || !isRunning()) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "client not connected.");
}
Response response = this.currentConnetion.request(request, buildMeta());
if (response != null && response instanceof ConnectionUnregisterResponse) {
synchronized (this) {
clearContextOnResetRequest();
switchServerAsync();
throw new IllegalStateException("Invalid client status.");
response = this.currentConnetion.request(request, buildMeta());

if (response != null) {
if (response instanceof ConnectionUnregisterResponse) {
synchronized (this) {
clearContextOnResetRequest();
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
throw new IllegalStateException("Invalid client status.");
}
} else {
return response;
}
}
refereshActiveTimestamp();
return response;

} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request,request={},errorMesssage={}", request,
e.getMessage());
Expand All @@ -454,6 +496,11 @@ public Response request(Request request) throws NacosException {
retryTimes--;

}

if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}

if (exceptionToThrow != null) {
throw new NacosException(SERVER_ERROR, exceptionToThrow);
}
Expand All @@ -476,7 +523,6 @@ public void asyncRequest(Request request, RequestCallBack callback) throws Nacos
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "client not connected.");
}
this.currentConnetion.asyncRequest(request, buildMeta(), callback);
refereshActiveTimestamp();
return;
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "Fail to send request,request={},errorMesssage={}", request,
Expand Down Expand Up @@ -522,6 +568,7 @@ public RequestFuture requestFuture(Request request) throws NacosException {
* @return response.
*/
protected Response handleServerRequest(final Request request) {

for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
Response response = serverRequestHandler.requestReply(request);
if (response != null) {
Expand All @@ -532,7 +579,7 @@ protected Response handleServerRequest(final Request request) {
}

/**
* register connection handler.will be notified wher inner connect changed.
* register connection handler.will be notified when inner connect changed.
*
* @param connectionEventListener connectionEventListener
*/
Expand All @@ -557,6 +604,24 @@ public void registerServerPushResponseHandler(ServerRequestHandler serverRequest
this.serverRequestHandlers.add(serverRequestHandler);
}

/**
* Getter method for property <tt>name</tt>.
*
* @return property value of name
*/
public String getName() {
return name;
}

/**
* Setter method for property <tt>name</tt>.
*
* @param name value to be assigned to property name
*/
public void setName(String name) {
this.name = name;
}

/**
* Getter method for property <tt>serverListFactory</tt>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@ public static Set<Map.Entry<String, RpcClient>> getAllClientEntrys() {
* @param clientName client name.
*/
public static void destroyClient(String clientName) throws NacosException {
RpcClient rpcClient = clientMap.get(clientName);
RpcClient rpcClient = clientMap.remove(clientName);
if (rpcClient != null) {
rpcClient.shutdown();
}
clientMap.remove(clientName);
}

public static RpcClient getClient(String clientName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ public enum RpcClientStatus {
/**
* running.
*/
RUNNING(3, "client is running...");
RUNNING(4, "client is running..."),

/**
* is in starting.
*/
UNHEALTHY(3, "client unhealthy,may closed by server,in rereconnecting"),

/**
* running.
*/
SHUTDOWN(5, "client is shutdown...");

int status;

Expand Down
Loading

0 comments on commit 721791a

Please sign in to comment.