Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support incremental configuration synchronization client #90

Merged
merged 25 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f654cb3
feat: support incremental configuration synchronization client
jackie-coming Nov 26, 2024
44b70fa
Merge remote-tracking branch 'upstream/main' into feautre/Incremental…
jackie-coming Dec 8, 2024
6859fe1
code format
jackie-coming Dec 8, 2024
9842289
add Unknown sync mode
jackie-coming Dec 14, 2024
b454698
code format
jackie-coming Dec 20, 2024
89321ac
code format
jackie-coming Dec 21, 2024
08e2705
code format
jackie-coming Dec 21, 2024
69f4677
code format
jackie-coming Dec 21, 2024
121317b
code format
jackie-coming Dec 21, 2024
d489886
更新 RemoteConfigRepository.java
jackie-coming Dec 22, 2024
4e9e146
更新 ApolloConfig.java
jackie-coming Dec 22, 2024
545c074
更新 ApolloConfig.java
jackie-coming Dec 22, 2024
07b3b13
更新 ConfigurationChange.java
jackie-coming Dec 22, 2024
be92b58
更新 ConfigSyncType.java
jackie-coming Dec 22, 2024
201b782
更新 ConfigSyncType.java
jackie-coming Dec 22, 2024
eb737cf
更新 ConfigSyncType.java
jackie-coming Dec 22, 2024
18e2139
更新 ConfigurationChange.java
jackie-coming Dec 22, 2024
81661cf
更新 ConfigurationChangeType.java
jackie-coming Dec 22, 2024
7d1ffd7
更新 ConfigurationChangeTypeUtils.java
jackie-coming Dec 22, 2024
a5a47e4
更新 ConfigSyncType.java
jackie-coming Dec 22, 2024
66aae85
code format
jackie-coming Dec 22, 2024
8cf6c6d
code format
jackie-coming Dec 22, 2024
0f7d2a3
add UnknownSync action
jackie-coming Dec 22, 2024
ee7e05a
Apply suggestions from code review
nobodyiam Dec 23, 2024
d3abb56
Merge branch 'main' into feautre/IncrementalSync
nobodyiam Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ApolloNotificationMessages;
import com.ctrip.framework.apollo.core.dto.ConfigurationChange;
import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.core.enums.ConfigSyncType;
import com.ctrip.framework.apollo.core.enums.ConfigurationChangeType;
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
import com.ctrip.framework.apollo.core.signature.Signature;
Expand All @@ -49,6 +52,7 @@
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -63,7 +67,9 @@
* @author Jason Song([email protected])
*/
public class RemoteConfigRepository extends AbstractConfigRepository {
private static final Logger logger = DeferredLoggerFactory.getLogger(RemoteConfigRepository.class);

private static final Logger logger = DeferredLoggerFactory.getLogger(
RemoteConfigRepository.class);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
Expand Down Expand Up @@ -92,7 +98,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
/**
* Constructor.
*
* @param appId the appId
* @param appId the appId
* @param namespace the namespace
*/
public RemoteConfigRepository(String appId, String namespace) {
Expand All @@ -107,7 +113,8 @@ public RemoteConfigRepository(String appId, String namespace) {
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(
m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
Expand All @@ -118,7 +125,7 @@ public Properties getConfig() {
if (m_configCache.get() == null) {
long start = System.currentTimeMillis();
this.sync();
Tracer.logEvent(APOLLO_CLIENT_NAMESPACE_FIRST_LOAD_SPEND+":"+m_namespace,
Tracer.logEvent(APOLLO_CLIENT_NAMESPACE_FIRST_LOAD_SPEND + ":" + m_namespace,
String.valueOf(System.currentTimeMillis() - start));
}
return transformApolloConfigToProperties(m_configCache.get());
Expand All @@ -141,7 +148,8 @@ private void schedulePeriodicRefresh() {
new Runnable() {
@Override
public void run() {
Tracer.logEvent(APOLLO_CONFIGSERVICE, String.format("periodicRefresh: %s", m_namespace));
Tracer.logEvent(APOLLO_CONFIGSERVICE,
String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent(APOLLO_CLIENT_VERSION, Apollo.VERSION);
Expand All @@ -166,7 +174,7 @@ protected synchronized void sync() {
}

if (current != null) {
Tracer.logEvent(String.format(APOLLO_CLIENT_CONFIGS+"%s", current.getNamespaceName()),
Tracer.logEvent(String.format(APOLLO_CLIENT_CONFIGS + "%s", current.getNamespaceName()),
current.getReleaseKey());
}

Expand Down Expand Up @@ -217,7 +225,8 @@ private ApolloConfig loadApolloConfig() {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster,
m_namespace);

try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
Expand All @@ -227,7 +236,7 @@ private ApolloConfig loadApolloConfig() {
}

url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
dataCenter, m_remoteMessages.get(), m_configCache.get());

logger.debug("Loading config from {}", url);

Expand Down Expand Up @@ -255,6 +264,23 @@ private ApolloConfig loadApolloConfig() {

ApolloConfig result = response.getBody();

if (result != null) {

ConfigSyncType configSyncType = ConfigSyncType.fromString(result.getConfigSyncType());

if (configSyncType == ConfigSyncType.INCREMENTAL_SYNC) {
nobodyiam marked this conversation as resolved.
Show resolved Hide resolved

ApolloConfig previousConfig = m_configCache.get();

Map<String, String> previousConfigurations =
(previousConfig != null) ? previousConfig.getConfigurations() : null;

result.setConfigurations(
mergeConfigurations(previousConfigurations, result.getConfigurationChanges()));

jackie-coming marked this conversation as resolved.
Show resolved Hide resolved
jackie-coming marked this conversation as resolved.
Show resolved Hide resolved
}
}

logger.debug("Loaded config for {}: {}", m_namespace, result);

return result;
Expand All @@ -268,13 +294,14 @@ private ApolloConfig loadApolloConfig() {
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
Tracer.logEvent(APOLLO_CLIENT_NAMESPACE_NOT_FOUND,m_namespace);
Tracer.logEvent(APOLLO_CLIENT_NAMESPACE_NOT_FOUND, m_namespace);

}
Tracer.logEvent(APOLLO_CONFIG_EXCEPTION, ExceptionUtil.getDetailMessage(statusCodeException));
Tracer.logEvent(APOLLO_CONFIG_EXCEPTION,
ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
if(ex.getStatusCode() == 404) {
if (ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Expand All @@ -298,7 +325,7 @@ private ApolloConfig loadApolloConfig() {
}

String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
String dataCenter, ApolloNotificationMessages remoteMessages, ApolloConfig previousConfig) {
String dataCenter, ApolloNotificationMessages remoteMessages, ApolloConfig previousConfig) {

String path = "configs/%s/%s/%s";
List<String> pathParams =
Expand Down Expand Up @@ -343,7 +370,8 @@ private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_appId, m_namespace, this);
}

public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto,
ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
Expand All @@ -363,4 +391,34 @@ private List<ServiceDTO> getConfigServices() {

return services;
}

Map<String, String> mergeConfigurations(Map<String, String> previousConfigurations,
List<ConfigurationChange> configurationChanges) {
Map<String, String> newConfigurations = new HashMap<>();

if (previousConfigurations != null) {
newConfigurations = Maps.newHashMap(previousConfigurations);
}

if (configurationChanges == null) {
return newConfigurations;
}

for (ConfigurationChange change : configurationChanges) {
switch (ConfigurationChangeType.fromString(change.getConfigurationChangeType())) {
case ADDED:
case MODIFIED:
newConfigurations.put(change.getKey(), change.getNewValue());
break;
case DELETED:
newConfigurations.remove(change.getKey());
break;
default:
//do nothing
break;
}
}

return newConfigurations;
}
}
Loading
Loading