Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notify connect listeners on start up #3405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.alibaba.nacos.api.config.remote.request;

import java.util.HashMap;
import java.util.Map;

/**
* request to publish a config.
*
Expand All @@ -32,10 +35,35 @@ public class ConfigPublishRequest extends ConfigCommonRequest {

String content;

private Map<String, String> additonMap;

public ConfigPublishRequest() {

}

/**
* get additional param.
*
* @param key key of param.
* @return value of param ,return null if not exist.
*/
public String getAdditionParam(String key) {
return additonMap == null ? null : additonMap.get(key);
}

/**
* put additional param value. will override if exist.
*
* @param key key of param.
* @param value value of param.
*/
public void putAdditonalParam(String key, String value) {
if (additonMap == null) {
additonMap = new HashMap<String, String>();
}
additonMap.put(key, value);
}

public ConfigPublishRequest(String dataId, String group, String tenant, String content) {
this.content = content;
this.dataId = dataId;
Expand Down Expand Up @@ -119,4 +147,4 @@ public String getTenant() {
public void setTenant(String tenant) {
this.tenant = tenant;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -645,20 +645,20 @@ public void run() {
}, 1L, 10L, TimeUnit.MILLISECONDS);

} else {

rpcClientProxy.initAndStart(new ServerListFactory() {
@Override
public String genNextServer() {
ServerListManager serverListManager = agent.getServerListManager();
serverListManager.refreshCurrentServerAddr();
return serverListManager.getCurrentServerAddr();
}

@Override
public String getCurrentServer() {
return agent.getServerListManager().getCurrentServerAddr();
}
});

/*
* Register Listen Change Handler
*/
Expand Down
66 changes: 56 additions & 10 deletions client/src/main/java/com/alibaba/nacos/client/remote/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand All @@ -47,18 +50,53 @@ public abstract class RpcClient implements Closeable {
protected AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<RpcClientStatus>(
RpcClientStatus.WAIT_INIT);

protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.grpc.worker");
t.setDaemon(true);
return t;
}
});

/**
* Notify when client connected.
* Notify when client re connected.
*/
protected void notifyReConnected() {
if (!this.connectionEventListeners.isEmpty()) {
connectionEventListeners.forEach(new Consumer<ConnectionEventListener>() {
@Override
public void accept(ConnectionEventListener connectionEventListener) {
connectionEventListener.onReconnected();
executorService.schedule(new Runnable() {
@Override
public void run() {
if (!connectionEventListeners.isEmpty()) {
connectionEventListeners.forEach(new Consumer<ConnectionEventListener>() {
@Override
public void accept(ConnectionEventListener connectionEventListener) {
connectionEventListener.onReconnected();
}
});
}
});
}
}
}, 0, TimeUnit.MILLISECONDS);

}

/**
* Notify when client new connected.
*/
protected void notifyConnected() {
executorService.schedule(new Runnable() {
@Override
public void run() {
if (!connectionEventListeners.isEmpty()) {
connectionEventListeners.forEach(new Consumer<ConnectionEventListener>() {
@Override
public void accept(ConnectionEventListener connectionEventListener) {
connectionEventListener.onReconnected();
}
});
}
}
}, 0, TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -110,6 +148,15 @@ public boolean isStarting() {
public RpcClient() {
}

/**
* Getter method for property <tt>connectionEventListeners</tt>.
*
* @return property value of connectionEventListeners
*/
protected List<ConnectionEventListener> getConnectionEventListeners() {
return connectionEventListeners;
}

/**
* init server list factory.
*
Expand Down Expand Up @@ -138,7 +185,6 @@ public RpcClient(ServerListFactory serverListFactory) {
/**
* Start this client.
*/
@PostConstruct
public abstract void start() throws NacosException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -66,20 +63,10 @@ public class GrpcClient extends RpcClient {

protected RequestGrpc.RequestBlockingStub grpcServiceStub;

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.config.grpc.worker");
t.setDaemon(true);
return t;
}
});

/**
* Reconnect to current server before switch a new server.
*/
private static final int MAX_RECONNECT_TIMES = 5;
private static final int MAX_RECONNECT_TIMES = 3;

private AtomicInteger reConnectTimesLeft = new AtomicInteger(MAX_RECONNECT_TIMES);

Expand All @@ -102,35 +89,49 @@ private void tryConnectServer() {
getServerListFactory().getCurrentServer());
if (isRunning() || isInitStatus()) {
final RpcClientStatus prevStatus = rpcClientStatus.get();
boolean updateSucess = false;
if (isRunning()) {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.RE_CONNECTING);
} else {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.STARTING);
}
boolean updateSucess = rpcClientStatus.compareAndSet(prevStatus,
isInitStatus() ? RpcClientStatus.STARTING : RpcClientStatus.RE_CONNECTING);

if (updateSucess) {

if (isStarting()) {

buildClientAtFirstTime();
boolean sucess = serverCheck();
if (sucess) {
rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RUNNING);
LOGGER.info("server check success, client start up success. ");
notifyConnected();
return;
} else {
rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RE_CONNECTING);
}
}

executorService.schedule(new Runnable() {
@Override
public void run() {

// loop until start client success.
while (!isRunning()) {

buildClientAtFirstTime();
boolean sucess = serverCheck();
if (sucess) {
if (rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING) {
notifyReConnected();
}
LOGGER.info("Server check success, Current Server is {}" + getServerListFactory()
.getCurrentServer());
notifyReConnected();
LOGGER.info("server check success, reconnected success, Current Server is {}"
+ getServerListFactory().getCurrentServer());
rpcClientStatus.compareAndSet(rpcClientStatus.get(), RpcClientStatus.RUNNING);
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
return;

} else {
int leftRetryTimes = reConnectTimesLeft.decrementAndGet();
if (leftRetryTimes <= 0) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
//do nothing
}
getServerListFactory().genNextServer();
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
try {
Expand Down Expand Up @@ -177,7 +178,7 @@ public void start() throws NacosException {
public void run() {
sendBeat();
}
}, 0, 2000, TimeUnit.MILLISECONDS);
}, 0, 3000, TimeUnit.MILLISECONDS);

super.registerServerPushResponseHandler(new ServerPushResponseHandler() {
@Override
Expand All @@ -192,7 +193,6 @@ public void responseReply(Response response) {
}
}
});

}

/**
Expand All @@ -206,27 +206,32 @@ public void sendBeat() {
return;
}

GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(heartBeatRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
} catch (Exception e) {
LOGGER.error("Send heart beat error,will tring to reconnet to server ", e);
tryConnectServer();
}
}

private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(ClientCommonUtils.VERSION).build();
return meta;
}

private boolean serverCheck() {
try {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();

HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(heartBeatRequest.getType()).setBody(
Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
return response != null;
} catch (Exception e) {
Expand Down Expand Up @@ -268,16 +273,14 @@ private void buildClient() throws NacosException {
}

LOGGER.info("GrpcClient start to connect to rpc server, serverIp={},port={}", serverIp, serverPort);
this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext(true).build();

this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort).usePlaintext().build();

grpcStreamServiceStub = RequestStreamGrpc.newStub(channel);

grpcServiceStub = RequestGrpc.newBlockingStub(channel);

GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.setVersion(ClientCommonUtils.VERSION).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).build();

GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build();

LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);
grpcStreamServiceStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
Expand Down Expand Up @@ -325,15 +328,17 @@ public void onCompleted() {
@Override
public Response request(Request request) throws NacosException {

if (!this.isRunning()) {
throw new IllegalStateException("Client is not connected to any server now,please retry later");
}
try {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(meta).setType(request.getType())

GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build();
GrpcResponse response = grpcServiceStub.request(grpcrequest);
String type = response.getType();
String bodyString = response.getBody().getValue().toStringUtf8();

// transfrom grpcResponse to response model
Class classByType = ResponseRegistry.getClassByType(type);
if (classByType != null) {
Expand Down
Loading