Skip to content

Commit

Permalink
fix different client worker share same localsnapshot bug ,add propert…
Browse files Browse the repository at this point in the history
…ies to control whether load snaoshot content on startup in CacheData
  • Loading branch information
shiyiyue1102 committed Apr 20, 2022
1 parent a9e2cbd commit 8294673
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 75 deletions.
4 changes: 4 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ public class PropertyKeyConst {

public static final String ENDPOINT = "endpoint";

public static final String ENDPOINT_QUERY_PARAMS = "endpointQueryParams";

public static final String ENDPOINT_PORT = "endpointPort";

public static final String SERVER_NAME = "serverName";

public static final String NAMESPACE = "namespace";

public static final String USERNAME = "username";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
public class CacheData {

private static final Logger LOGGER = LogUtils.logger(CacheData.class);

static final int CONCURRENCY = 5;

static ThreadFactory internalNotifierFactory = r -> {
Expand All @@ -56,11 +58,16 @@ public class CacheData {
return t;
};

static boolean initSnapshot;

static {
initSnapshot = Boolean.valueOf(System.getProperty("nacos.cache.data.init.snapshot", "true"));
LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot);
}

static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), internalNotifierFactory);

private static final Logger LOGGER = LogUtils.logger(CacheData.class);

private final String name;

private final ConfigFilterChainManager configFilterChainManager;
Expand Down Expand Up @@ -318,14 +325,14 @@ private void safeNotifyListener(final String dataId, final String group, final S
}

listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
dataId, group, md5, listener, (System.currentTimeMillis() - start));
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
Expand Down Expand Up @@ -395,8 +402,10 @@ public CacheData(ConfigFilterChainManager configFilterChainManager, String name,
this.tenant = TenantUtil.getUserTenantForAcm();
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);
}

Expand All @@ -412,8 +421,10 @@ public CacheData(ConfigFilterChainManager configFilterChainManager, String name,
this.tenant = tenant;
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
}

// ==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public void startInternal() {

@Override
public String getName() {
return "config_rpc_client";
return serverListManager.getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,28 @@ public ServerListManager(List<String> fixed, String namespace) {
for (String serverAddr : fixed) {
String[] serverAddrArr = InternetAddressUtil.splitIPPortStr(serverAddr);
if (serverAddrArr.length == 1) {
serverAddrs.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
serverAddrs
.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
} else {
serverAddrs.add(serverAddr);
}
}
this.serverUrls = new ArrayList<>(serverAddrs);
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()]));
} else {
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-"
+ namespace;
this.tenant = namespace;
}
this.name = initServerName(null);
}

public ServerListManager(String host, int port) {
this.isFixed = false;
this.isStarted = false;
this.name = CUSTOM_NAME + "-" + host + "-" + port;
this.addressServerUrl = String
.format("http://%s:%d%s/%s", host, port, ContextPathUtil.normalizeContextPath(this.contentPath),
this.serverListName);
this.endpoint = host;
this.endpointPort = port;

this.name = initServerName(null);
initAddressServerUrl(null);
}

public ServerListManager(String endpoint) throws NacosException {
Expand All @@ -165,25 +165,18 @@ public ServerListManager(String endpoint, String namespace) throws NacosExceptio
this.isStarted = false;
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ENDPOINT, endpoint);
endpoint = initEndpoint(properties);
this.endpoint = initEndpoint(properties);

if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.name = endpoint + "-" + namespace;
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
this.addressServerUrl = String.format("http://%s:%d%s/%s?namespace=%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
}

this.name = initServerName(null);
initAddressServerUrl(properties);
}

public ServerListManager(Properties properties) throws NacosException {
Expand All @@ -192,6 +185,11 @@ public ServerListManager(Properties properties) throws NacosException {
String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
initParam(properties);

if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
}

if (StringUtils.isNotEmpty(serverAddrsStr)) {
this.isFixed = true;
List<String> serverAddrs = new ArrayList<>();
Expand All @@ -211,33 +209,62 @@ public ServerListManager(Properties properties) throws NacosException {
}
}
this.serverUrls = serverAddrs;
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()])) + "-" + namespace;
}
this.name = initServerName(properties);

} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.isFixed = false;
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
this.name = initServerName(properties);
initAddressServerUrl(properties);
}

}

private String initServerName(Properties properties) {
String serverName = "";
//1.user define server name.
if (properties != null && properties.containsKey(PropertyKeyConst.SERVER_NAME)) {
serverName = properties.get(PropertyKeyConst.SERVER_NAME).toString();
} else {
// if fix url,use fix url join string.
if (isFixed) {
serverName = FIXED_NAME + "-" + (StringUtils.isNotBlank(namespace) ? (StringUtils.trim(namespace) + "-")
: "") + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = this.endpoint + "-" + namespace;
this.addressServerUrl = String
.format("http://%s:%d%s/%s?namespace=%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
//if use endpoint , use endpoint ,content path ,serverlist name
serverName = CUSTOM_NAME + "-" + String
.join("_", endpoint, String.valueOf(endpointPort), contentPath, serverListName) + (
StringUtils.isNotBlank(namespace) ? ("_" + StringUtils.trim(namespace)) : "");
}
}
serverName.replaceAll("\\/", "_");
serverName.replaceAll("\\:", "_");

return serverName;
}

private void initAddressServerUrl(Properties properties) {
if (isFixed) {
return;
}
StringBuilder addressServerUrlTem = new StringBuilder(
String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName));
boolean hasQueryString = false;
if (StringUtils.isNotBlank(namespace)) {
addressServerUrlTem.append("?namespace=" + namespace);
hasQueryString = false;
}
if (properties != null && properties.containsKey(PropertyKeyConst.ENDPOINT_QUERY_PARAMS)) {
addressServerUrlTem
.append(hasQueryString ? "&" : "?" + properties.get(PropertyKeyConst.ENDPOINT_QUERY_PARAMS));

}

this.addressServerUrl = addressServerUrlTem.toString();
LOGGER.info("serverName = {}, address server url = {}", this.name, this.addressServerUrl);
}

private void initParam(Properties properties) {
Expand Down
4 changes: 4 additions & 0 deletions client/src/main/resources/nacos-logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>

<Logger name="com.alibaba.nacos.shaded.io.grpc" level="${com.alibaba.nacos.log.level:-info}"
additivity="false">
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>

<logger name="com.alibaba.nacos.client.config" level="${com.alibaba.nacos.config.log.level:-info}"
additivity="false">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGetterAndSetter() throws NacosException {
Assert.assertNull(encode);
Assert.assertEquals("namespace1", namespace);
Assert.assertEquals("namespace1", tenant);
Assert.assertEquals("aaa-namespace1", name);
Assert.assertEquals("custom-aaa_8080_nacos_serverlist_namespace1", name);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
agent1.setAccessible(false);

Assert.assertTrue(clientWorker.isHealthServer());
Assert.assertEquals("config_rpc_client", clientWorker.getAgentName());
Assert.assertEquals(null, clientWorker.getAgentName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testStart() throws NacosException {
Assert.fail();
} catch (NacosException e) {
Assert.assertEquals(
"fail to get NACOS-server serverlist! env:custom-localhost-0, not connnect url:http://localhost:0/nacos/serverlist",
"fail to get NACOS-server serverlist! env:custom-localhost_0_nacos_serverlist, not connnect url:http://localhost:0/nacos/serverlist",
e.getErrMsg());
}
mgr.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,31 @@ public static RpcClient createClient(String clientName, ConnectionType connectio
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}

return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
GrpcClient client = new GrpcSdkClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
return client;
try {
GrpcClient client = new GrpcSdkClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
return client;
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}

});
}

Expand All @@ -117,19 +122,18 @@ public static RpcClient createClusterClient(String clientName, ConnectionType co
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}

return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
GrpcClient client = new GrpcClusterClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
Expand Down

0 comments on commit 8294673

Please sign in to comment.