1
+ package org .dhorse .infrastructure .strategy .cluster .k8s ;
2
+
3
+ import java .util .Collections ;
4
+ import java .util .HashSet ;
5
+ import java .util .Set ;
6
+
7
+ import org .apache .commons .lang3 .StringUtils ;
8
+ import org .dhorse .api .enums .MessageCodeEnum ;
9
+ import org .dhorse .infrastructure .component .ComponentConstants ;
10
+ import org .dhorse .infrastructure .component .SpringBeanContext ;
11
+ import org .dhorse .infrastructure .repository .po .ClusterPO ;
12
+ import org .dhorse .infrastructure .utils .Constants ;
13
+ import org .dhorse .infrastructure .utils .HttpUtils ;
14
+ import org .dhorse .infrastructure .utils .K8sUtils ;
15
+ import org .dhorse .infrastructure .utils .LogUtils ;
16
+ import org .slf4j .Logger ;
17
+ import org .slf4j .LoggerFactory ;
18
+ import org .springframework .util .CollectionUtils ;
19
+
20
+ import io .fabric8 .kubernetes .api .model .ConfigMap ;
21
+ import io .fabric8 .kubernetes .api .model .Namespace ;
22
+ import io .fabric8 .kubernetes .api .model .NamespaceList ;
23
+ import io .fabric8 .kubernetes .api .model .ObjectMeta ;
24
+ import io .fabric8 .kubernetes .client .KubernetesClient ;
25
+ import io .fabric8 .kubernetes .client .dsl .Resource ;
26
+
27
+ public class DHorseConfigHelper {
28
+
29
+ private static final Logger logger = LoggerFactory .getLogger (DHorseConfigHelper .class );
30
+
31
+ /**
32
+ * 通过ConfigMap向k8s集群写入dhorse服务器的地址,地址格式为:ip1:8100,ip2:8100
33
+ */
34
+ public static void writeServerIp (ClusterPO clusterPO , KubernetesClient client ) {
35
+ NamespaceList namespaceList = client .namespaces ().list ();
36
+ if (CollectionUtils .isEmpty (namespaceList .getItems ())) {
37
+ return ;
38
+ }
39
+
40
+ ComponentConstants componentConstants = SpringBeanContext .getBean (ComponentConstants .class );
41
+ for (Namespace n : namespaceList .getItems ()) {
42
+ String namespace = n .getMetadata ().getName ();
43
+ if (!K8sUtils .DHORSE_NAMESPACE .equals (namespace )
44
+ && K8sUtils .getSystemNamspaces ().contains (namespace )) {
45
+ continue ;
46
+ }
47
+ if (!"Active" .equals (n .getStatus ().getPhase ())){
48
+ continue ;
49
+ }
50
+
51
+ ConfigMap configMap = dhorseConfigMap ();
52
+ Resource <ConfigMap > resource = client .configMaps ().inNamespace (namespace )
53
+ .resource (configMap );
54
+ ConfigMap existedCP = resource .get ();
55
+ if (existedCP == null ) {
56
+ String ipPortUri = Constants .hostIp () + ":" + componentConstants .getServerPort ();
57
+ if (ipPortUri .startsWith (Constants .LOCALHOST_IP )) {
58
+ LogUtils .throwException (logger , "Your dhorse server mast have a valid ip, not 127.0.0.1" , MessageCodeEnum .DHORSE_SERVER_URL_FAILURE );
59
+ }
60
+ configMap .setData (Collections .singletonMap (K8sUtils .DHORSE_SERVER_URL_KEY , ipPortUri ));
61
+ resource .create ();
62
+ }else {
63
+ Set <String > newIp = new HashSet <>();
64
+ newIp .add (Constants .hostIp () + ":" + componentConstants .getServerPort ());
65
+ String ipStr = existedCP .getData ().get (K8sUtils .DHORSE_SERVER_URL_KEY );
66
+ if (!StringUtils .isBlank (ipStr )) {
67
+ String [] ips = ipStr .split ("," );
68
+ for (String ip : ips ) {
69
+ if (ip .startsWith (Constants .LOCALHOST_IP )) {
70
+ continue ;
71
+ }
72
+ if (!HttpUtils .pingDHorseServer (ip )) {
73
+ continue ;
74
+ }
75
+ newIp .add (ip );
76
+ }
77
+ }
78
+ configMap .setData (Collections .singletonMap (K8sUtils .DHORSE_SERVER_URL_KEY , String .join ("," , newIp )));
79
+ resource .update ();
80
+ }
81
+ }
82
+ }
83
+
84
+ /**
85
+ * 通过ConfigMap向k8s集群删除dhorse服务器的地址,地址格式为:ip1:8100,ip2:8100
86
+ */
87
+ public static void deleteServerIp (ClusterPO clusterPO , KubernetesClient client ) {
88
+ NamespaceList namespaceList = client .namespaces ().list ();
89
+ if (CollectionUtils .isEmpty (namespaceList .getItems ())) {
90
+ return ;
91
+ }
92
+ for (Namespace n : namespaceList .getItems ()) {
93
+ String namespace = n .getMetadata ().getName ();
94
+ if (!K8sUtils .DHORSE_NAMESPACE .equals (namespace )
95
+ && K8sUtils .getSystemNamspaces ().contains (namespace )) {
96
+ continue ;
97
+ }
98
+ if (!"Active" .equals (n .getStatus ().getPhase ())){
99
+ continue ;
100
+ }
101
+ ConfigMap configMap = client .configMaps ().inNamespace (namespace )
102
+ .withName (K8sUtils .DHORSE_CONFIGMAP_NAME ).get ();
103
+ if (configMap == null ) {
104
+ continue ;
105
+ }
106
+ Set <String > newIp = new HashSet <>();
107
+ String ipStr = configMap .getData ().get (K8sUtils .DHORSE_SERVER_URL_KEY );
108
+ if (!StringUtils .isBlank (ipStr )) {
109
+ String [] ips = ipStr .split ("," );
110
+ //ip格式为:127.0.0.1:8100
111
+ for (String ip : ips ) {
112
+ if (Constants .hostIp ().equals (ip .split (":" )[0 ])) {
113
+ continue ;
114
+ }
115
+ if (!HttpUtils .pingDHorseServer (ip )) {
116
+ continue ;
117
+ }
118
+ newIp .add (ip );
119
+ }
120
+ }
121
+ configMap .setData (Collections .singletonMap (K8sUtils .DHORSE_SERVER_URL_KEY , String .join ("," , newIp )));
122
+ Resource <ConfigMap > resource = client .configMaps ().inNamespace (namespace ).resource (configMap );
123
+ if (newIp .size () == 0 ) {
124
+ resource .delete ();
125
+ }else {
126
+ resource .update ();
127
+ }
128
+ }
129
+ }
130
+
131
+ private static ConfigMap dhorseConfigMap () {
132
+ ConfigMap configMap = new ConfigMap ();
133
+ ObjectMeta meta = new ObjectMeta ();
134
+ meta .setName (K8sUtils .DHORSE_CONFIGMAP_NAME );
135
+ meta .setLabels (Collections .singletonMap (K8sUtils .DHORSE_LABEL_KEY , K8sUtils .DHORSE_CONFIGMAP_NAME ));
136
+ configMap .setMetadata (meta );
137
+ return configMap ;
138
+ }
139
+ }
0 commit comments