Skip to content

Commit 6615ae8

Browse files
committed
[alibaba#359] Use SPI to make health scalable
1 parent fceff72 commit 6615ae8

File tree

6 files changed

+163
-56
lines changed

6 files changed

+163
-56
lines changed

api/src/main/java/com/alibaba/nacos/api/naming/pojo/AbstractHealthChecker.java

+32
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.alibaba.nacos.api.naming.pojo;
1717

1818
import com.alibaba.fastjson.annotation.JSONField;
19+
import com.alibaba.fastjson.serializer.SerializeWriter;
1920
import com.alibaba.nacos.api.common.Constants;
2021
import org.apache.commons.lang3.StringUtils;
2122

@@ -45,8 +46,16 @@ public void setType(String type) {
4546
* @return Another instance with exactly the same fields.
4647
* @throws CloneNotSupportedException
4748
*/
49+
@Override
4850
public abstract AbstractHealthChecker clone() throws CloneNotSupportedException;
4951

52+
/**
53+
* used to JsonAdapter
54+
*/
55+
public void jsonAdapterCallback(SerializeWriter writer){
56+
// do nothing
57+
}
58+
5059
public static class None extends AbstractHealthChecker {
5160

5261
public static final String TYPE = "NONE";
@@ -116,6 +125,17 @@ public Map<String, String> getCustomHeaders() {
116125
return headerMap;
117126
}
118127

128+
/**
129+
* used to JsonAdapter
130+
*
131+
* @param writer
132+
*/
133+
@Override
134+
public void jsonAdapterCallback(SerializeWriter writer) {
135+
writer.writeFieldValue(',', "path", getPath());
136+
writer.writeFieldValue(',', "headers", getHeaders());
137+
}
138+
119139
@Override
120140
public int hashCode() {
121141
return Objects.hash(path, headers, expectedResponseCode);
@@ -215,6 +235,18 @@ public void setPwd(String pwd) {
215235
this.pwd = pwd;
216236
}
217237

238+
/**
239+
* used to JsonAdapter
240+
*
241+
* @param writer
242+
*/
243+
@Override
244+
public void jsonAdapterCallback(SerializeWriter writer) {
245+
writer.writeFieldValue(',', "user", getUser());
246+
writer.writeFieldValue(',', "pwd", getPwd());
247+
writer.writeFieldValue(',', "cmd", getCmd());
248+
}
249+
218250
@Override
219251
public int hashCode() {
220252
return Objects.hash(user, pwd, cmd);
Binary file not shown.

naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckProcessorDelegate.java

+20-23
Original file line numberDiff line numberDiff line change
@@ -15,48 +15,45 @@
1515
*/
1616
package com.alibaba.nacos.naming.healthcheck;
1717

18+
import com.alibaba.nacos.naming.healthcheck.extend.HealthCheckExtendProvider;
1819
import org.springframework.beans.factory.annotation.Autowired;
1920
import org.springframework.stereotype.Component;
2021

22+
import java.util.Collection;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.stream.Collectors;
26+
2127
/**
2228
* @author nacos
2329
*/
2430
@Component("healthCheckDelegate")
2531
public class HealthCheckProcessorDelegate implements HealthCheckProcessor {
2632

27-
@Autowired
28-
private HttpHealthCheckProcessor httpProcessor;
29-
30-
@Autowired
31-
private TcpSuperSenseProcessor tcpProcessor;
33+
private Map<String, HealthCheckProcessor> healthCheckProcessorMap
34+
= new HashMap<>();
3235

33-
@Autowired
34-
private MysqlHealthCheckProcessor mysqlProcessor;
36+
public HealthCheckProcessorDelegate(HealthCheckExtendProvider provider) {
37+
provider.init();
38+
}
3539

3640
@Autowired
37-
private NoneHealthCheckProcessor noneProcessor;
41+
public void addProcessor(Collection<HealthCheckProcessor> processors){
42+
healthCheckProcessorMap.putAll(processors.stream()
43+
.filter(processor -> processor.getType() != null)
44+
.collect(Collectors.toMap(HealthCheckProcessor::getType, processor -> processor)));
45+
}
3846

3947
@Override
4048
public void process(HealthCheckTask task) {
4149

4250
String type = task.getCluster().getHealthChecker().getType();
43-
44-
if (type.equals(httpProcessor.getType())) {
45-
httpProcessor.process(task);
46-
return;
47-
}
48-
49-
if (type.equals(tcpProcessor.getType())) {
50-
tcpProcessor.process(task);
51-
return;
52-
}
53-
54-
if (type.equals(mysqlProcessor.getType())) {
55-
mysqlProcessor.process(task);
56-
return;
51+
HealthCheckProcessor processor = healthCheckProcessorMap.get(type);
52+
if(processor == null){
53+
processor = healthCheckProcessorMap.get("none");
5754
}
5855

59-
noneProcessor.process(task);
56+
processor.process(task);
6057
}
6158

6259
@Override

naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckType.java

+29-4
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,49 @@
1515
*/
1616
package com.alibaba.nacos.naming.healthcheck;
1717

18+
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
19+
20+
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
1823
/**
1924
* @author nkorange
2025
*/
2126
public enum HealthCheckType {
2227
/**
2328
* TCP type
2429
*/
25-
TCP,
30+
TCP("tcp", AbstractHealthChecker.Tcp.class),
2631
/**
2732
* HTTP type
2833
*/
29-
HTTP,
34+
HTTP("http", AbstractHealthChecker.Http.class),
3035
/**
3136
* MySQL type
3237
*/
33-
MYSQL,
38+
MYSQL("mysql", AbstractHealthChecker.Mysql.class),
3439
/**
3540
* No check
3641
*/
37-
NONE
42+
NONE("none", AbstractHealthChecker.None.class);
43+
44+
private String name;
45+
46+
private Class healthCheckerClass;
47+
48+
private static Map<String, Class> EXTEND =
49+
new ConcurrentHashMap<>();
50+
51+
HealthCheckType(String name, Class healthCheckerClass) {
52+
this.name = name;
53+
this.healthCheckerClass = healthCheckerClass;
54+
}
55+
56+
public static void registerHealthChecker(String type, Class healthCheckerClass){
57+
EXTEND.putIfAbsent(type, healthCheckerClass);
58+
}
59+
60+
public static Class ofHealthCheckerClass(String type){
61+
return valueOf(type) == null ? EXTEND.get(type) : valueOf(type).healthCheckerClass;
62+
}
3863
}

naming/src/main/java/com/alibaba/nacos/naming/healthcheck/JsonAdapter.java

+5-29
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,16 @@ public static JsonAdapter getInstance() {
4242
return INSTANCE;
4343
}
4444

45+
@SuppressWarnings("unchecked")
4546
@Override
4647
public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
4748
JSONObject jsonObj = (JSONObject) parser.parse();
4849
String checkType = jsonObj.getString("type");
4950

50-
if (StringUtils.equals(checkType, AbstractHealthChecker.Http.TYPE)) {
51-
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Http.class);
52-
}
53-
54-
if (StringUtils.equals(checkType, AbstractHealthChecker.Tcp.TYPE)) {
55-
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Tcp.class);
56-
}
57-
58-
if (StringUtils.equals(checkType, AbstractHealthChecker.None.TYPE)) {
59-
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.None.class);
60-
}
51+
Class target = HealthCheckType.ofHealthCheckerClass(checkType);
6152

62-
if (StringUtils.equals(checkType, AbstractHealthChecker.Mysql.TYPE)) {
63-
return (T) JSON.parseObject(jsonObj.toJSONString(), AbstractHealthChecker.Mysql.class);
53+
if(target != null){
54+
return (T) JSON.parseObject(jsonObj.toJSONString(), target);
6455
}
6556

6657
return null;
@@ -83,21 +74,6 @@ public void write(JSONSerializer jsonSerializer, Object o, Object o1, Type type,
8374

8475
writer.writeFieldValue(',', "type", config.getType());
8576

86-
if (StringUtils.equals(config.getType(), HealthCheckType.HTTP.name())) {
87-
AbstractHealthChecker.Http httpCheckConfig = (AbstractHealthChecker.Http) config;
88-
writer.writeFieldValue(',', "path", httpCheckConfig.getPath());
89-
writer.writeFieldValue(',', "headers", httpCheckConfig.getHeaders());
90-
}
91-
92-
if (StringUtils.equals(config.getType(), HealthCheckType.TCP.name())) {
93-
// nothing sepcial to handle
94-
}
95-
96-
if (StringUtils.equals(config.getType(), HealthCheckType.MYSQL.name())) {
97-
AbstractHealthChecker.Mysql mysqlCheckConfig = (AbstractHealthChecker.Mysql) config;
98-
writer.writeFieldValue(',', "user", mysqlCheckConfig.getUser());
99-
writer.writeFieldValue(',', "pwd", mysqlCheckConfig.getPwd());
100-
writer.writeFieldValue(',', "cmd", mysqlCheckConfig.getCmd());
101-
}
77+
config.jsonAdapterCallback(writer);
10278
}
10379
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.alibaba.nacos.naming.healthcheck.extend;
2+
3+
import com.alibaba.nacos.api.naming.pojo.AbstractHealthChecker;
4+
import com.alibaba.nacos.naming.healthcheck.HealthCheckProcessor;
5+
import com.alibaba.nacos.naming.healthcheck.HealthCheckType;
6+
import org.springframework.beans.BeansException;
7+
import org.springframework.beans.factory.BeanFactory;
8+
import org.springframework.beans.factory.BeanFactoryAware;
9+
import org.springframework.beans.factory.config.SingletonBeanRegistry;
10+
import org.springframework.stereotype.Component;
11+
12+
import java.util.HashSet;
13+
import java.util.Iterator;
14+
import java.util.ServiceLoader;
15+
import java.util.Set;
16+
17+
/**
18+
* @author XCXCXCXCX
19+
*/
20+
@Component
21+
public class HealthCheckExtendProvider implements BeanFactoryAware{
22+
23+
private ServiceLoader<HealthCheckProcessor> processorLoader
24+
= ServiceLoader.load(HealthCheckProcessor.class);
25+
26+
private ServiceLoader<AbstractHealthChecker> checkerLoader
27+
= ServiceLoader.load(AbstractHealthChecker.class);
28+
29+
private SingletonBeanRegistry registry;
30+
31+
private static final char LOWER_A = 'A';
32+
private static final char LOWER_Z = 'Z';
33+
34+
public void init(){
35+
loadExtend();
36+
}
37+
38+
private void loadExtend() {
39+
Iterator<HealthCheckProcessor> processorIt = processorLoader.iterator();
40+
Iterator<AbstractHealthChecker> healthCheckerIt = checkerLoader.iterator();
41+
42+
Set<String> processorType = new HashSet<>();
43+
Set<String> healthCheckerType = new HashSet<>();
44+
while(processorIt.hasNext()){
45+
HealthCheckProcessor processor = processorIt.next();
46+
processorType.add(processor.getType());
47+
registry.registerSingleton(lowerFirstChar(processor.getClass().getSimpleName()), processor);
48+
}
49+
50+
while(healthCheckerIt.hasNext()){
51+
AbstractHealthChecker checker = healthCheckerIt.next();
52+
healthCheckerType.add(checker.getType());
53+
HealthCheckType.registerHealthChecker(checker.getType(), checker.getClass());
54+
}
55+
if(!processorType.equals(healthCheckerType)){
56+
throw new RuntimeException("An unmatched processor and healthChecker are detected in the extension package.");
57+
}
58+
}
59+
60+
private String lowerFirstChar(String simpleName) {
61+
if(simpleName == null || "".equals(simpleName)){
62+
throw new IllegalArgumentException("can't find extend processor class name");
63+
}
64+
char[] chars = simpleName.toCharArray();
65+
if(chars[0] >= LOWER_A && chars[0] <= LOWER_Z){
66+
chars[0] = (char)(chars[0] + 32);
67+
}
68+
return String.valueOf(chars);
69+
}
70+
71+
@Override
72+
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
73+
if(beanFactory instanceof SingletonBeanRegistry){
74+
this.registry = (SingletonBeanRegistry) beanFactory;
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)