Skip to content

Commit

Permalink
bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
dyx1234 committed Oct 19, 2024
1 parent 73581e9 commit c52337c
Show file tree
Hide file tree
Showing 13 changed files with 482 additions and 512 deletions.
2 changes: 1 addition & 1 deletion apollo-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>18.0.0</version>
<version>${client-java.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.ctrip.framework.apollo.Kubernetes;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.models.*;
Expand All @@ -25,8 +26,9 @@
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

@Service
public class KubernetesManager {
Expand All @@ -35,20 +37,34 @@ public class KubernetesManager {

private final Logger log = LoggerFactory.getLogger(this.getClass());

@PostConstruct
public void initClient() {
public KubernetesManager() {
try {
client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
coreV1Api = new CoreV1Api(client);

} catch (Exception e) {
String errorMessage = "Failed to initialize Kubernetes client: " + e.getMessage();
log.error(errorMessage, e);
throw new RuntimeException(errorMessage, e);
}
}

public KubernetesManager(CoreV1Api coreV1Api) {
this.coreV1Api = coreV1Api;
}

public V1ConfigMap buildConfigMap(String name, String namespace, Map<String, String> data) {
V1ObjectMeta metadata = new V1ObjectMeta()
.name(name)
.namespace(namespace);

return new V1ConfigMap()
.apiVersion("v1")
.kind("ConfigMap")
.metadata(metadata)
.data(data);
}

/**
* Creates a Kubernetes ConfigMap.
*
Expand All @@ -60,14 +76,12 @@ public void initClient() {
*/
public String createConfigMap(String configMapNamespace, String name, Map<String, String> data) {
if (configMapNamespace == null || configMapNamespace.isEmpty() || name == null || name.isEmpty()) {
log.error("create config map failed due to null or empty parameter: configMapNamespace={}, name={}", configMapNamespace, name);
throw new IllegalArgumentException("ConfigMap namespace and name cannot be null or empty");
log.error("create configmap failed due to null or empty parameter: configMapNamespace={}, name={}", configMapNamespace, name);
}
V1ConfigMap configMap = new V1ConfigMap()
.metadata(new V1ObjectMeta().name(name).namespace(configMapNamespace))
.data(data);
V1ConfigMap configMap = buildConfigMap(name, configMapNamespace, data);
try {
coreV1Api.createNamespacedConfigMap(configMapNamespace, configMap, null, null, null, null);
log.info("ConfigMap created successfully: name: {}, namespace: {}", name, configMapNamespace);
return name;
} catch (Exception e) {
throw new RuntimeException("Failed to create ConfigMap: " + e.getMessage(), e);
Expand All @@ -77,18 +91,16 @@ public String createConfigMap(String configMapNamespace, String name, Map<String
/**
* get value from config map
*
* @param configMapNamespace
* @param configMapNamespace configmap namespace
* @param name config map name (appId)
* @return configMap data(all key-value pairs in config map)
*/
public String loadFromConfigMap(String configMapNamespace, String name) {
if (configMapNamespace == null || configMapNamespace.isEmpty() || name == null || name.isEmpty() ) {
String errorMessage = String.format("Parameters can not be null or empty, configMapNamespace: '%s', name: '%s'", configMapNamespace, name);
log.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
log.error("load configmap failed due to null or empty parameter: configMapNamespace={}, name={}", configMapNamespace, name);
}
try {
log.info("Starting to read ConfigMap: " + name);
log.info("Starting to read ConfigMap: {}", name);
V1ConfigMap configMap = coreV1Api.readNamespacedConfigMap(name, configMapNamespace, null);
if (configMap == null) {
throw new RuntimeException(String.format("ConfigMap does not exist, configMapNamespace: %s, name: %s", configMapNamespace, name));
Expand Down Expand Up @@ -120,10 +132,7 @@ public String getValueFromConfigMap(String configMapNamespace, String name, Stri
}
try {
V1ConfigMap configMap = coreV1Api.readNamespacedConfigMap(name, configMapNamespace, null);
if (configMap == null) {
throw new RuntimeException(String.format("ConfigMap does not exist, configMapNamespace: %s, name: %s", configMapNamespace, name));
}
if (!configMap.getData().containsKey(key)) {
if (!Objects.requireNonNull(configMap.getData()).containsKey(key)) {
throw new RuntimeException(String.format("Specified key not found in ConfigMap: %s, configMapNamespace: %s, name: %s", name, configMapNamespace, name));
}
return configMap.getData().get(key);
Expand All @@ -135,24 +144,45 @@ public String getValueFromConfigMap(String configMapNamespace, String name, Stri
/**
* update config map
*
* @param configMapNamespace
* @param configMapNamespace configmap namespace
* @param name config map name (appId)
* @param data new data
* @return config map name
*/
public String updateConfigMap(String configMapNamespace, String name, Map<String, String> data) {
// TODO 使用client自带的retry机制,设置重试次数,CAS
public boolean updateConfigMap(String configMapNamespace, String name, Map<String, String> data) {
if (configMapNamespace == null || configMapNamespace.isEmpty() || name == null || name.isEmpty() || data == null || data.isEmpty()) {
log.error("Parameters can not be null or empty: configMapNamespace={}, name={}", configMapNamespace, name);
return null;
return false;
}
try {
V1ConfigMap configMap = new V1ConfigMap().metadata(new V1ObjectMeta().name(name).namespace(configMapNamespace)).data(data);
coreV1Api.replaceNamespacedConfigMap(name, configMapNamespace, configMap, null, null, null, "fieldManagerValue");
return name;
} catch (Exception e) {
log.error("update config map failed", e);
return null;

// retry
int maxRetries = 5;
int retryCount = 0;
long waitTime = 100;

while (retryCount < maxRetries) {
try {
V1ConfigMap configmap = coreV1Api.readNamespacedConfigMap(name, configMapNamespace, null);
configmap.setData(data);
coreV1Api.replaceNamespacedConfigMap(name, configMapNamespace, configmap, null, null, null, null);
return true;
} catch (ApiException e) {
if (e.getCode() == 409) {
retryCount++;
log.warn("Conflict occurred, retrying... (" + retryCount + ")");
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
waitTime = Math.min(waitTime * 2, 1000);
} else {
System.err.println("Error updating ConfigMap: " + e.getMessage());
}
}
}
return retryCount < maxRetries;
}

/**
Expand All @@ -168,10 +198,12 @@ public boolean checkConfigMapExist(String configMapNamespace, String configMapNa
return false;
}
try {
log.info("Check whether ConfigMap exists, configMapName: {}", configMapName);
coreV1Api.readNamespacedConfigMap(configMapName, configMapNamespace, null);
return true;
} catch (Exception e) {
// configmap not exist
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import com.ctrip.framework.apollo.util.ConfigUtil;
import com.ctrip.framework.apollo.util.ExceptionUtil;
import com.ctrip.framework.foundation.internals.provider.DefaultServerProvider;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand All @@ -50,22 +49,13 @@ public class K8sConfigMapConfigRepository extends AbstractConfigRepository
private final String namespace;
private String configMapName;
private String configMapKey;
private String configMapNamespace;
private final String configMapNamespace;
private final ConfigUtil configUtil;
private final KubernetesManager kubernetesManager;
private volatile Properties configMapProperties;
private volatile DefaultServerProvider serverProvider;
// 上游数据源
private volatile ConfigRepository upstream;
private volatile ConfigSourceType sourceType = ConfigSourceType.CONFIGMAP;

/**
* configmapNamespace 用户配的,不配用默认default
* configmapName appid
* configmap-key cluster+namespace
* configmap-value 配置文件信息的json串
*/

/**
* Constructor
*
Expand All @@ -79,7 +69,6 @@ public K8sConfigMapConfigRepository(String namespace, ConfigRepository upstream)
this.namespace = namespace;
configUtil = ApolloInjector.getInstance(ConfigUtil.class);
kubernetesManager = ApolloInjector.getInstance(KubernetesManager.class);
// 读取,默认为default
configMapNamespace = configUtil.getConfigMapNamespace();

this.setConfigMapKey(configUtil.getCluster(), namespace);
Expand All @@ -89,7 +78,7 @@ public K8sConfigMapConfigRepository(String namespace, ConfigRepository upstream)

void setConfigMapKey(String cluster, String namespace) {
// TODO 兜底key怎么设计不会冲突(cluster初始化时已经设置了层级)
// cluster 就是用户定义>idc>default,所以已经不需要额外层级设置了
// cluster: 用户定义>idc>default,所以已经不需要额外层级设置了
if (StringUtils.isBlank(cluster)) {
configMapKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join("default", namespace);
return;
Expand Down Expand Up @@ -118,11 +107,10 @@ private void checkConfigMapName(String configMapName) {
if (StringUtils.isBlank(configMapName)) {
throw new IllegalArgumentException("ConfigMap name cannot be null");
}
// 判断configMap是否存在,若存在直接返回,若不存在尝试创建
if (kubernetesManager.checkConfigMapExist(configMapNamespace, configMapName)) {
return;
}
// TODO 初步理解这里只创建就可以,后续update事件再写入新值
// Create an empty configmap, write the new value in the update event
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "createK8sConfigMap");
transaction.addData("configMapName", configMapName);
try {
Expand All @@ -142,7 +130,6 @@ private void checkConfigMapName(String configMapName) {
* 1. 从上游成功恢复(开启文件存储)
* 2. 从上游成功恢复(没开启文件存储,从remote)
* 3. 从k8s成功恢复
* 怎么mock k8s客户端coreapi有点卡住
*/
@Override
public Properties getConfig() {
Expand All @@ -151,6 +138,7 @@ public Properties getConfig() {
}
Properties result = propertiesFactory.getPropertiesInstance();
result.putAll(configMapProperties);
logger.info("configmap值:{}", configMapProperties);
return result;
}

Expand Down Expand Up @@ -183,7 +171,7 @@ public ConfigSourceType getSourceType() {
*/
@Override
protected void sync() {
// 链式恢复,先从上游数据源读取
// Chain recovery, first read from upstream data source
boolean syncFromUpstreamResultSuccess = trySyncFromUpstream();

if (syncFromUpstreamResultSuccess) {
Expand Down Expand Up @@ -212,35 +200,24 @@ protected void sync() {
}
}

// 职责明确: manager层进行序列化和解析,把key传进去
public Properties loadFromK8sConfigMap() throws IOException {
Preconditions.checkNotNull(configMapName, "ConfigMap name cannot be null");

Properties properties = null;
try {
// 从ConfigMap获取整个配置信息的JSON字符串
String jsonConfig = kubernetesManager.getValueFromConfigMap(configMapNamespace, configUtil.getAppId(), configMapKey);
if (jsonConfig == null) {
// TODO 待修改,先重试访问idc再default保底
// TODO 重试访问idc,default
jsonConfig = kubernetesManager.getValueFromConfigMap(configMapNamespace, configMapName, Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(configUtil, namespace));
}

// 确保获取到的配置信息不为空
if (jsonConfig != null) {
// 解码Base64编码的JSON字符串
jsonConfig = new String(Base64.getDecoder().decode(jsonConfig));
}

// 创建Properties实例
// Convert jsonConfig to properties
properties = propertiesFactory.getPropertiesInstance();

// 使用Gson将JSON字符串转换为Map对象
if (jsonConfig != null && !jsonConfig.isEmpty()) {
Gson gson = new Gson();
Type type = new TypeToken<Map<String, String>>() {
}.getType();
Map<String, String> configMap = gson.fromJson(jsonConfig, type);
// 将Map中的键值对填充到Properties对象中
for (Map.Entry<String, String> entry : configMap.entrySet()) {
properties.setProperty(entry.getKey(), entry.getValue());
}
Expand All @@ -258,7 +235,7 @@ private boolean trySyncFromUpstream() {
return false;
}
try {
// 拉新数据,并将新数据更新到configMap
logger.info("Start sync from the upstream data source, upstream.getConfig:{}, upstream.getSourceType():{}", upstream.getConfig(), upstream.getSourceType());
updateConfigMapProperties(upstream.getConfig(), upstream.getSourceType());
return true;
} catch (Throwable ex) {
Expand All @@ -269,7 +246,6 @@ private boolean trySyncFromUpstream() {
return false;
}

// 从上游数据恢复,并更新configmap
private synchronized void updateConfigMapProperties(Properties newProperties, ConfigSourceType sourceType) {
this.sourceType = sourceType;
if (newProperties.equals(configMapProperties)) {
Expand Down Expand Up @@ -297,20 +273,17 @@ public void onRepositoryChange(String namespace, Properties newProperties) {
}

public void persistConfigMap(Properties properties) {
// 将Properties中的值持久化到configmap中,并使用事务管理
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "persistK8sConfigMap");
transaction.addData("configMapName", configUtil.getAppId());
transaction.addData("configMapNamespace", configUtil.getConfigMapNamespace());
try {
// 使用Gson将properties转换为JSON字符串
// Convert properties to a JSON string using Gson
Gson gson = new Gson();
String jsonConfig = gson.toJson(properties);
String encodedJsonConfig = Base64.getEncoder().encodeToString(jsonConfig.getBytes());
// 创建一个新的HashMap, 将编码后的JSON字符串作为值,业务appId作为键,存入data中
Map<String, String> data = new HashMap<>();
data.put(configUtil.getAppId(), encodedJsonConfig);
data.put(configMapKey, jsonConfig);

// 更新ConfigMap
// update configmap
kubernetesManager.updateConfigMap(configUtil.getConfigMapNamespace(), configUtil.getAppId(), data);
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ public ConfigFile createConfigFile(String namespace, ConfigFileFormat configFile
}

ConfigRepository createConfigRepository(String namespace) {
// TODO 本地和configMap允许同时开启
// 已完成,创建configmapRepo时会同时创建一个fileRepo作为上游,相当于同时开启了,路径若未设置用默认
if (m_configUtil.isPropertyKubernetesCacheEnabled()) {
return createConfigMapConfigRepository(namespace);
}else if (m_configUtil.isPropertyFileCacheEnabled()) {
} else if (m_configUtil.isPropertyFileCacheEnabled()) {
return createLocalConfigRepository(namespace);
}
return createRemoteConfigRepository(namespace);
Expand Down
Loading

0 comments on commit c52337c

Please sign in to comment.