Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue#8163 fix multi instance share the same local snapshot #8202

Merged
merged 1 commit into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ public class PropertyKeyConst {

public static final String ENDPOINT = "endpoint";

public static final String ENDPOINT_QUERY_PARAMS = "endpointQueryParams";

public static final String ENDPOINT_PORT = "endpointPort";

public static final String SERVER_NAME = "serverName";

public static final String NAMESPACE = "namespace";

public static final String USERNAME = "username";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
public class CacheData {

private static final Logger LOGGER = LogUtils.logger(CacheData.class);

static final int CONCURRENCY = 5;

static ThreadFactory internalNotifierFactory = r -> {
Expand All @@ -56,11 +58,16 @@ public class CacheData {
return t;
};

static boolean initSnapshot;

static {
initSnapshot = Boolean.valueOf(System.getProperty("nacos.cache.data.init.snapshot", "true"));
LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot);
}

static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), internalNotifierFactory);

private static final Logger LOGGER = LogUtils.logger(CacheData.class);

private final String name;

private final ConfigFilterChainManager configFilterChainManager;
Expand Down Expand Up @@ -318,14 +325,14 @@ private void safeNotifyListener(final String dataId, final String group, final S
}

listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
dataId, group, md5, listener, (System.currentTimeMillis() - start));
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
Expand Down Expand Up @@ -395,8 +402,10 @@ public CacheData(ConfigFilterChainManager configFilterChainManager, String name,
this.tenant = TenantUtil.getUserTenantForAcm();
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);
}

Expand All @@ -412,8 +421,10 @@ public CacheData(ConfigFilterChainManager configFilterChainManager, String name,
this.tenant = tenant;
listeners = new CopyOnWriteArrayList<>();
this.isInitializing = true;
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
if (initSnapshot) {
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
}

// ==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public void startInternal() {

@Override
public String getName() {
return "config_rpc_client";
return serverListManager.getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,28 @@ public ServerListManager(List<String> fixed, String namespace) {
for (String serverAddr : fixed) {
String[] serverAddrArr = InternetAddressUtil.splitIPPortStr(serverAddr);
if (serverAddrArr.length == 1) {
serverAddrs.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
serverAddrs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change is necessary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the modification on ServerListManager has several purposes:
1.optimize the naming rule of server name when the serverlistmanager is using endpoint , port,contentpath ,cluster name which construct the whole address server.
2.support endpoint query string parameter after the address server url to satisfy more usage scenarios , for example, this query string is essential in alibaba internal scene when using address server.
3.support user defined server name to serverlistmanager and client worker , to make our log print more graceful in multi instances scenarios.

.add(serverAddrArr[0] + InternetAddressUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());
} else {
serverAddrs.add(serverAddr);
}
}
this.serverUrls = new ArrayList<>(serverAddrs);
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()]));
} else {
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(serverAddrs.toArray(new String[serverAddrs.size()])) + "-"
+ namespace;
this.tenant = namespace;
}
this.name = initServerName(null);
}

public ServerListManager(String host, int port) {
this.isFixed = false;
this.isStarted = false;
this.name = CUSTOM_NAME + "-" + host + "-" + port;
this.addressServerUrl = String
.format("http://%s:%d%s/%s", host, port, ContextPathUtil.normalizeContextPath(this.contentPath),
this.serverListName);
this.endpoint = host;
this.endpointPort = port;

this.name = initServerName(null);
initAddressServerUrl(null);
}

public ServerListManager(String endpoint) throws NacosException {
Expand All @@ -165,25 +165,18 @@ public ServerListManager(String endpoint, String namespace) throws NacosExceptio
this.isStarted = false;
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ENDPOINT, endpoint);
endpoint = initEndpoint(properties);
this.endpoint = initEndpoint(properties);

if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.name = endpoint + "-" + namespace;
if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
this.addressServerUrl = String.format("http://%s:%d%s/%s?namespace=%s", endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
}

this.name = initServerName(null);
initAddressServerUrl(properties);
}

public ServerListManager(Properties properties) throws NacosException {
Expand All @@ -192,6 +185,11 @@ public ServerListManager(Properties properties) throws NacosException {
String namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
initParam(properties);

if (StringUtils.isNotBlank(namespace)) {
this.namespace = namespace;
this.tenant = namespace;
}

if (StringUtils.isNotEmpty(serverAddrsStr)) {
this.isFixed = true;
List<String> serverAddrs = new ArrayList<>();
Expand All @@ -211,33 +209,62 @@ public ServerListManager(Properties properties) throws NacosException {
}
}
this.serverUrls = serverAddrs;
if (StringUtils.isBlank(namespace)) {
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = FIXED_NAME + "-" + getFixedNameSuffix(
this.serverUrls.toArray(new String[this.serverUrls.size()])) + "-" + namespace;
}
this.name = initServerName(properties);

} else {
if (StringUtils.isBlank(endpoint)) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");
}
this.isFixed = false;
if (StringUtils.isBlank(namespace)) {
this.name = endpoint;
this.addressServerUrl = String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);
this.name = initServerName(properties);
initAddressServerUrl(properties);
}

}

private String initServerName(Properties properties) {
String serverName = "";
//1.user define server name.
if (properties != null && properties.containsKey(PropertyKeyConst.SERVER_NAME)) {
serverName = properties.get(PropertyKeyConst.SERVER_NAME).toString();
} else {
// if fix url,use fix url join string.
if (isFixed) {
serverName = FIXED_NAME + "-" + (StringUtils.isNotBlank(namespace) ? (StringUtils.trim(namespace) + "-")
: "") + getFixedNameSuffix(serverUrls.toArray(new String[serverUrls.size()]));
} else {
this.namespace = namespace;
this.tenant = namespace;
this.name = this.endpoint + "-" + namespace;
this.addressServerUrl = String
.format("http://%s:%d%s/%s?namespace=%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);
//if use endpoint , use endpoint ,content path ,serverlist name
serverName = CUSTOM_NAME + "-" + String
.join("_", endpoint, String.valueOf(endpointPort), contentPath, serverListName) + (
StringUtils.isNotBlank(namespace) ? ("_" + StringUtils.trim(namespace)) : "");
}
}
serverName.replaceAll("\\/", "_");
serverName.replaceAll("\\:", "_");

return serverName;
}

private void initAddressServerUrl(Properties properties) {
if (isFixed) {
return;
}
StringBuilder addressServerUrlTem = new StringBuilder(
String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,
ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName));
boolean hasQueryString = false;
if (StringUtils.isNotBlank(namespace)) {
addressServerUrlTem.append("?namespace=" + namespace);
hasQueryString = false;
}
if (properties != null && properties.containsKey(PropertyKeyConst.ENDPOINT_QUERY_PARAMS)) {
addressServerUrlTem
.append(hasQueryString ? "&" : "?" + properties.get(PropertyKeyConst.ENDPOINT_QUERY_PARAMS));

}

this.addressServerUrl = addressServerUrlTem.toString();
LOGGER.info("serverName = {}, address server url = {}", this.name, this.addressServerUrl);
}

private void initParam(Properties properties) {
Expand Down
4 changes: 4 additions & 0 deletions client/src/main/resources/nacos-logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>

<Logger name="com.alibaba.nacos.shaded.io.grpc" level="${com.alibaba.nacos.log.level:-info}"
additivity="false">
<appender-ref ref="REMOTE_LOG_FILE"/>
</Logger>

<logger name="com.alibaba.nacos.client.config" level="${com.alibaba.nacos.config.log.level:-info}"
additivity="false">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGetterAndSetter() throws NacosException {
Assert.assertNull(encode);
Assert.assertEquals("namespace1", namespace);
Assert.assertEquals("namespace1", tenant);
Assert.assertEquals("aaa-namespace1", name);
Assert.assertEquals("custom-aaa_8080_nacos_serverlist_namespace1", name);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testShutdown() throws NacosException, NoSuchFieldException, IllegalA
agent1.setAccessible(false);

Assert.assertTrue(clientWorker.isHealthServer());
Assert.assertEquals("config_rpc_client", clientWorker.getAgentName());
Assert.assertEquals(null, clientWorker.getAgentName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testStart() throws NacosException {
Assert.fail();
} catch (NacosException e) {
Assert.assertEquals(
"fail to get NACOS-server serverlist! env:custom-localhost-0, not connnect url:http://localhost:0/nacos/serverlist",
"fail to get NACOS-server serverlist! env:custom-localhost_0_nacos_serverlist, not connnect url:http://localhost:0/nacos/serverlist",
e.getErrMsg());
}
mgr.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,31 @@ public static RpcClient createClient(String clientName, ConnectionType connectio
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
*/
public static RpcClient createClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}

return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
GrpcClient client = new GrpcSdkClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
return client;
try {
GrpcClient client = new GrpcSdkClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
client.setThreadPoolMaxSize(threadPoolMaxSize);
client.labels(labels);
return client;
} catch (Throwable throwable) {
LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
throw throwable;
}

});
}

Expand All @@ -117,19 +122,18 @@ public static RpcClient createClusterClient(String clientName, ConnectionType co
/**
* create a rpc client.
*
* @param clientName client name.
* @param connectionType client type.
* @param clientName client name.
* @param connectionType client type.
* @param threadPoolCoreSize grpc thread pool core size
* @param threadPoolMaxSize grpc thread pool max size
* @param threadPoolMaxSize grpc thread pool max size
* @return rpc client.
*/
public static RpcClient createClusterClient(String clientName, ConnectionType connectionType,
Integer threadPoolCoreSize, Integer threadPoolMaxSize,
Map<String, String> labels) {
Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
if (!ConnectionType.GRPC.equals(connectionType)) {
throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
}

return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
GrpcClient client = new GrpcClusterClient(clientNameInner);
client.setThreadPoolCoreSize(threadPoolCoreSize);
Expand Down