Skip to content

Commit

Permalink
#269 Support service group
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Feb 18, 2019
1 parent ce610d4 commit 7af1b48
Show file tree
Hide file tree
Showing 25 changed files with 226 additions and 99 deletions.
20 changes: 10 additions & 10 deletions api/src/main/java/com/alibaba/nacos/api/naming/pojo/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public class Service {
/**
* application name of this service
*/
private String app;
private String appName;

/**
* Service group to classify services into different sets.
*/
private String group;
private String groupName;

private Map<String, String> metadata = new HashMap<String, String>();

Expand All @@ -74,20 +74,20 @@ public void setProtectThreshold(float protectThreshold) {
this.protectThreshold = protectThreshold;
}

public String getApp() {
return app;
public String getAppName() {
return appName;
}

public void setApp(String app) {
this.app = app;
public void setAppName(String appName) {
this.appName = appName;
}

public String getGroup() {
return group;
public String getGroupName() {
return groupName;
}

public void setGroup(String group) {
this.group = group;
public void setGroupName(String groupName) {
this.groupName = groupName;
}

public Map<String, String> getMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface ConsistencyService {
* @param listener callback of data change
* @throws NacosException
*/
void listen(String key, DataListener listener) throws NacosException;
void listen(String key, RecordListener listener) throws NacosException;

/**
* Cancel listening of a data
Expand All @@ -75,7 +75,7 @@ public interface ConsistencyService {
* @param listener callback of data change
* @throws NacosException
*/
void unlisten(String key, DataListener listener) throws NacosException;
void unlisten(String key, RecordListener listener) throws NacosException;

/**
* Is the local server responsible for a data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public Datum get(String key) throws NacosException {
}

@Override
public void listen(String key, DataListener listener) throws NacosException {
public void listen(String key, RecordListener listener) throws NacosException {
mapConsistencyService(key).listen(key, listener);
}

@Override
public void unlisten(String key, DataListener listener) throws NacosException {
public void unlisten(String key, RecordListener listener) throws NacosException {
mapConsistencyService(key).unlisten(key, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*
* @author nacos
*/
public interface DataListener<T extends Record> {
public interface RecordListener<T extends Record> {

/**
* Determine if the listener was registered with this key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public void run() {
initialized = true;
} else {
int desiredInstanceCount = Integer.parseInt(lines.get(0).split("=")[1]);
if (desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) {
if (desiredInstanceCount <= 0 ||
desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) {
initialized = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Autowired
private Serializer serializer;

private volatile Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>();
private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();

@Override
public void put(String key, Record value) throws NacosException {
Expand Down Expand Up @@ -97,7 +97,7 @@ public void onPut(String key, Record value) {
if (!listeners.containsKey(key)) {
return;
}
for (DataListener listener : listeners.get(key)) {
for (RecordListener listener : listeners.get(key)) {
try {
listener.onChange(key, value);
} catch (Exception e) {
Expand All @@ -113,7 +113,7 @@ public void onRemove(String key) {
if (!listeners.containsKey(key)) {
return;
}
for (DataListener listener : listeners.get(key)) {
for (RecordListener listener : listeners.get(key)) {
try {
listener.onDelete(key);
} catch (Exception e) {
Expand Down Expand Up @@ -172,7 +172,7 @@ public void onReceiveTimestamps(Map<String, String> timestamps, String server) {
if (!listeners.containsKey(entry.getKey())) {
return;
}
for (DataListener listener : listeners.get(entry.getKey())) {
for (RecordListener listener : listeners.get(entry.getKey())) {
try {
listener.onChange(entry.getKey(), entry.getValue().value);
} catch (Exception e) {
Expand All @@ -188,20 +188,20 @@ public void onReceiveTimestamps(Map<String, String> timestamps, String server) {
}

@Override
public void listen(String key, DataListener listener) throws NacosException {
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new ArrayList<>());
}
listeners.get(key).add(listener);
}

@Override
public void unlisten(String key, DataListener listener) throws NacosException {
public void unlisten(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
return;
}
for (DataListener dataListener : listeners.get(key)) {
if (dataListener.equals(listener)) {
for (RecordListener recordListener : listeners.get(key)) {
if (recordListener.equals(listener)) {
listeners.get(key).remove(listener);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.alibaba.nacos.naming.consistency.persistent.raft;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.misc.Loggers;
Expand Down Expand Up @@ -62,12 +62,12 @@ public Datum get(String key) throws NacosException {
}

@Override
public void listen(String key, DataListener listener) throws NacosException {
public void listen(String key, RecordListener listener) throws NacosException {
raftCore.listen(key, listener);
}

@Override
public void unlisten(String key, DataListener listener) throws NacosException {
public void unlisten(String key, RecordListener listener) throws NacosException {
raftCore.unlisten(key, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.Instances;
Expand Down Expand Up @@ -90,7 +90,7 @@ public Thread newThread(Runnable r) {

public static final int PUBLISH_TERM_INCREASE_COUNT = 100;

private volatile Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>();
private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();

private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -143,7 +143,7 @@ public void init() throws Exception {
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

public Map<String, List<DataListener>> getListeners() {
public Map<String, List<RecordListener>> getListeners() {
return listeners;
}

Expand Down Expand Up @@ -751,9 +751,9 @@ public Integer onCompleted(Response response) throws Exception {
return local;
}

public void listen(String key, DataListener listener) {
public void listen(String key, RecordListener listener) {

List<DataListener> listenerList = listeners.get(key);
List<RecordListener> listenerList = listeners.get(key);
if (listenerList != null && listenerList.contains(listener)) {
return;
}
Expand Down Expand Up @@ -781,13 +781,13 @@ public void listen(String key, DataListener listener) {
}
}

public void unlisten(String key, DataListener listener) {
public void unlisten(String key, RecordListener listener) {

if (!listeners.containsKey(key)) {
return;
}

for (DataListener dl : listeners.get(key)) {
for (RecordListener dl : listeners.get(key)) {
// TODO maybe use equal:
if (dl == listener) {
listeners.get(key).remove(listener);
Expand Down Expand Up @@ -922,7 +922,7 @@ public void run() {

if (KeyBuilder.matchServiceMetaKey(datum.key) && !KeyBuilder.matchSwitchKey(datum.key)) {

for (DataListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, getDatum(datum.key).value);
Expand All @@ -942,7 +942,7 @@ public void run() {
continue;
}

for (DataListener listener : listeners.get(datum.key)) {
for (RecordListener listener : listeners.get(datum.key)) {

count++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,30 @@ public synchronized Datum readDatum(File file, String namespaceId) throws IOExce
}

if (KeyBuilder.matchServiceMetaKey(file.getName())) {

Datum<Service> serviceDatum;

try {
return JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
serviceDatum = JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
});
} catch (Exception e) {
JSONObject jsonObject = JSON.parseObject(json);

Datum<Service> serviceDatum = new Datum<>();
serviceDatum = new Datum<>();
serviceDatum.timestamp.set(jsonObject.getLongValue("timestamp"));
serviceDatum.key = jsonObject.getString("key");
serviceDatum.value = JSON.parseObject(jsonObject.getString("value"), Service.class);
return serviceDatum;
}

if (StringUtils.isBlank(serviceDatum.value.getGroupName())) {
serviceDatum.value.setGroupName(UtilsAndCommons.DEFAULT_GROUP_NAME);
}
if (!serviceDatum.value.getName().contains(UtilsAndCommons.GROUP_SERVICE_CONNECTOR)) {
serviceDatum.value.setName(UtilsAndCommons.DEFAULT_GROUP_NAME
+ UtilsAndCommons.GROUP_SERVICE_CONNECTOR + serviceDatum.value.getName());
}

return serviceDatum;
}

if (KeyBuilder.matchInstanceListKey(file.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public JSONObject serviceList(HttpServletRequest request) throws Exception {
JSONArray serviceJsonArray = new JSONArray();
for (Service service : services) {
ServiceView serviceView = new ServiceView();
serviceView.setName(service.getName());
serviceView.setName(UtilsAndCommons.getServiceName(service.getName()));
serviceView.setGroupName(UtilsAndCommons.getGroupName(service.getName()));
serviceView.setClusterCount(service.getClusterMap().size());
serviceView.setIpCount(service.allIPs().size());

Expand Down Expand Up @@ -99,7 +100,7 @@ public ServiceDetailView serviceDetail(HttpServletRequest request) throws Except

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
UtilsAndCommons.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, "serviceName");
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.NOT_FOUND, "serivce " + serviceName + " is not found!");
Expand Down Expand Up @@ -133,8 +134,8 @@ public JSONObject instanceList(HttpServletRequest request) throws Exception {

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
UtilsAndCommons.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, "serviceName");
String clusterName = WebUtils.required(request, "clusterName");
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String clusterName = WebUtils.required(request, CommonParams.CLUSTER_NAME);
int page = Integer.parseInt(WebUtils.required(request, "startPg"));
int pageSize = Integer.parseInt(WebUtils.required(request, "pgSize"));

Expand Down Expand Up @@ -184,7 +185,8 @@ public List<ServiceDetailInfo> listDetail(HttpServletRequest request) {
(serviceName, service) -> {

ServiceDetailInfo serviceDetailInfo = new ServiceDetailInfo();
serviceDetailInfo.setServiceName(serviceName);
serviceDetailInfo.setServiceName(UtilsAndCommons.getServiceName(serviceName));
serviceDetailInfo.setGroupName(UtilsAndCommons.getGroupName(serviceName));
serviceDetailInfo.setMetadata(service.getMetadata());

Map<String, ClusterInfo> clusterInfoMap = getStringClusterInfoMap(service);
Expand Down Expand Up @@ -242,11 +244,11 @@ public JSONObject getServicesByIP(HttpServletRequest request) {
for (Instance instance : instances) {
if (ip.contains(":")) {
if (StringUtils.equals(instance.getIp() + ":" + instance.getPort(), ip)) {
serviceNames.add(namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + service.getName());
serviceNames.add(namespaceId + UtilsAndCommons.NAMESPACE_SERVICE_CONNECTOR + service.getName());
}
} else {
if (StringUtils.equals(instance.getIp(), ip)) {
serviceNames.add(namespaceId + UtilsAndCommons.SERVICE_GROUP_CONNECTOR + service.getName());
serviceNames.add(namespaceId + UtilsAndCommons.NAMESPACE_SERVICE_CONNECTOR + service.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public String update(HttpServletRequest request) throws Exception {

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
UtilsAndCommons.DEFAULT_NAMESPACE_ID);
String clusterName = WebUtils.required(request, "clusterName");
String serviceName = WebUtils.required(request, "serviceName");
String clusterName = WebUtils.required(request, CommonParams.CLUSTER_NAME);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String healthChecker = WebUtils.required(request, "healthChecker");
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
String checkPort = WebUtils.required(request, "checkPort");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@ public String update(HttpServletRequest request) throws Exception {

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
UtilsAndCommons.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, "serviceName");
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String ip = WebUtils.required(request, "ip");
int port = Integer.parseInt(WebUtils.required(request, "port"));
boolean valid = Boolean.valueOf(WebUtils.required(request, "valid"));
String clusterName = WebUtils.optional(request, "clusterName", UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME
, UtilsAndCommons.DEFAULT_CLUSTER_NAME);

if (!distroMapper.responsible(serviceName)) {
String server = distroMapper.mapSrv(serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ public JSONObject listWithHealthStatus(HttpServletRequest request) throws NacosE
String serviceName;
String namespaceId;

if (key.contains(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)) {
namespaceId = key.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[0];
serviceName = key.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[1];
if (key.contains(UtilsAndCommons.NAMESPACE_SERVICE_CONNECTOR)) {
namespaceId = key.split(UtilsAndCommons.NAMESPACE_SERVICE_CONNECTOR)[0];
serviceName = key.split(UtilsAndCommons.NAMESPACE_SERVICE_CONNECTOR)[1];
} else {
namespaceId = UtilsAndCommons.DEFAULT_NAMESPACE_ID;
serviceName = key;
Expand Down
Loading

0 comments on commit 7af1b48

Please sign in to comment.