Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.0' into 3.0-improve-reference-…
Browse files Browse the repository at this point in the history
…bean-register
  • Loading branch information
kylixs committed Apr 22, 2021
2 parents 689180b + e933387 commit 28b0098
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dubbo.common.utils.ReflectUtils;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.lang.ref.SoftReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
Expand Down Expand Up @@ -50,6 +50,8 @@ public Object invoke(Object proxy, Method method, Object[] args) {
private static final AtomicLong PROXY_CLASS_COUNTER = new AtomicLong(0);
private static final String PACKAGE_NAME = Proxy.class.getPackage().getName();
private static final Map<ClassLoader, Map<String, Object>> PROXY_CACHE_MAP = new WeakHashMap<ClassLoader, Map<String, Object>>();
// cache class, avoid PermGen OOM.
private static final Map<ClassLoader, Map<String, Object>> PROXY_CLASS_MAP = new WeakHashMap<ClassLoader, Map<String, Object>>();

private static final Object PENDING_GENERATION_MARKER = new Object();

Expand Down Expand Up @@ -103,8 +105,11 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {

// get cache by class loader.
final Map<String, Object> cache;
// cache class
final Map<String, Object> classCache;
synchronized (PROXY_CACHE_MAP) {
cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
classCache = PROXY_CLASS_MAP.computeIfAbsent(cl, k -> new HashMap<>());
}

Proxy proxy = null;
Expand All @@ -118,14 +123,38 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
}
}

if (value == PENDING_GENERATION_MARKER) {
try {
cache.wait();
} catch (InterruptedException e) {
// get Class by key.
Object clazzObj = classCache.get(key);
if (null == clazzObj || clazzObj instanceof Reference<?>) {
Class<?> clazz = null;
if (clazzObj instanceof Reference<?>) {
clazz = (Class<?>) ((Reference<?>) clazzObj).get();
}

if (null == clazz) {
if (value == PENDING_GENERATION_MARKER) {
try {
cache.wait();
} catch (InterruptedException e) {
}
} else {
cache.put(key, PENDING_GENERATION_MARKER);
break;
}
} else {
try {
proxy = (Proxy) clazz.newInstance();
return proxy;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
if (null == proxy) {
cache.remove(key);
} else {
cache.put(key, new SoftReference<>(proxy));
}
}
}
} else {
cache.put(key, PENDING_GENERATION_MARKER);
break;
}
}
while (true);
Expand Down Expand Up @@ -190,7 +219,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[] {InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
Expand All @@ -204,6 +233,10 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();

synchronized (classCache) {
classCache.put(key, new SoftReference<Class<?>>(pc));
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
Expand All @@ -220,7 +253,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (proxy == null) {
cache.remove(key);
} else {
cache.put(key, new WeakReference<Proxy>(proxy));
cache.put(key, new SoftReference<Proxy>(proxy));
}
cache.notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,19 @@ public interface RegistryConstants {

String REGISTRY_PUBLISH_INSTANCE_KEY = "publish-instance";

String REGISTER_MODE_KEY = "register-mode";

String DUBBO_REGISTER_MODE_DEFAULT_KEY = "dubbo.application.register-mode";

String DUBBO_PUBLISH_INTERFACE_DEFAULT_KEY = "dubbo.application.publish-interface";

String DUBBO_PUBLISH_INSTANCE_DEFAULT_KEY = "dubbo.application.publish-instance";

String DEFAULT_REGISTER_MODE_INTERFACE = "interface";

String DEFAULT_REGISTER_MODE_INSTANCE = "instance";

String DEFAULT_REGISTER_MODE_ALL = "all";
/**
* The parameter key of Dubbo Registry type
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private String createServiceKey(String serviceName, String serviceVersion, int p
}

buf.append(serviceName);
if (StringUtils.isNotEmpty(serviceVersion) && !"0.0.0".equals(serviceVersion)) {
if (StringUtils.isNotEmpty(serviceVersion) && !"0.0.0".equals(serviceVersion) && !"*".equals(serviceVersion)) {
buf.append(':').append(serviceVersion);
}
buf.append(':').append(port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.USERNAME_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DUBBO_PUBLISH_INSTANCE_DEFAULT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DUBBO_PUBLISH_INTERFACE_DEFAULT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_REGISTER_MODE_ALL;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_REGISTER_MODE_INSTANCE;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_REGISTER_MODE_INTERFACE;
import static org.apache.dubbo.common.constants.RegistryConstants.DUBBO_REGISTER_MODE_DEFAULT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTER_MODE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PUBLISH_INSTANCE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PUBLISH_INTERFACE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
Expand Down Expand Up @@ -214,28 +215,36 @@ private static List<URL> genCompatibleRegistries(List<URL> registryList, boolean
List<URL> result = new ArrayList<>(registryList.size());
registryList.forEach(registryURL -> {
if (provider) {
boolean publishInterface = registryURL.getParameter(REGISTRY_PUBLISH_INTERFACE_KEY, ConfigurationUtils.getDynamicGlobalConfiguration().getBoolean(DUBBO_PUBLISH_INTERFACE_DEFAULT_KEY, true));
// for registries enabled service discovery, automatically register interface compatible addresses.
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
String registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getDynamicGlobalConfiguration().getString(DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INSTANCE));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INSTANCE;
}
result.add(registryURL);
if (publishInterface && registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
if (DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)
&& registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
.setProtocol(REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(interfaceCompatibleRegistryURL);
}
} else {
boolean publishInstance = registryURL.getParameter(REGISTRY_PUBLISH_INSTANCE_KEY, ConfigurationUtils.getDynamicGlobalConfiguration().getBoolean(DUBBO_PUBLISH_INSTANCE_DEFAULT_KEY, true));
if (registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)
&& publishInstance) {
String registerMode = registryURL.getParameter(REGISTER_MODE_KEY, ConfigurationUtils.getDynamicGlobalConfiguration().getString(DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INTERFACE));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
&& registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
.setProtocol(SERVICE_REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(serviceDiscoveryRegistryURL);
}
if (publishInterface) {

if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode) || DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
result.add(registryURL);
}
}
Expand All @@ -246,6 +255,14 @@ private static List<URL> genCompatibleRegistries(List<URL> registryList, boolean
return result;
}

private static boolean isValidRegisterMode(String mode) {
return StringUtils.isNotEmpty(mode)
&& (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(mode)
|| DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(mode)
|| DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(mode)
);
}

private static boolean registryNotExists(URL registryURL, List<URL> registryList, String registryType) {
return registryList.stream().noneMatch(
url -> registryType.equals(url.getProtocol()) && registryURL.getBackupAddress().equals(url.getBackupAddress())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
service-mapping=org.apache.dubbo.config.event.listener.ServiceNameMappingListener
config-logging=org.apache.dubbo.config.event.listener.LoggingEventListener
2 changes: 1 addition & 1 deletion dubbo-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
<eureka.version>1.9.12</eureka.version>

<!-- Fabric8 for Kubernetes -->
<fabric8_kubernetes_version>4.10.3</fabric8_kubernetes_version>
<fabric8_kubernetes_version>5.3.0</fabric8_kubernetes_version>

<!-- Alibaba -->
<alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,9 @@ protected boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata)
protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) {
MetadataInfo metadata = revisionToMetadata.get(revision);

if (metadata != null && metadata != MetadataInfo.EMPTY) {
logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + "&cluster=" + instance.getRegistryCluster() + ", " + metadata);
}

if (metadata == null
|| (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
metadata = getMetadataInfo(instance);

if (metadata != MetadataInfo.EMPTY) {
failureCounter.set(0);
revisionToMetadata.putIfAbsent(revision, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -123,11 +124,12 @@ public void register(ServiceInstance serviceInstance) throws RuntimeException {
.pods()
.inNamespace(namespace)
.withName(currentHostname)
.edit()
.editOrNewMetadata()
.addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
.endMetadata()
.done();
.edit(pod->
new PodBuilder(pod)
.editOrNewMetadata()
.addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
.endMetadata()
.build());
if (logger.isInfoEnabled()) {
logger.info("Write Current Service Instance Metadata to Kubernetes pod. " +
"Current pod name: " + currentHostname);
Expand All @@ -149,11 +151,12 @@ public void unregister(ServiceInstance serviceInstance) throws RuntimeException
.pods()
.inNamespace(namespace)
.withName(currentHostname)
.edit()
.editOrNewMetadata()
.removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
.endMetadata()
.done();
.edit(pod ->
new PodBuilder(pod)
.editOrNewMetadata()
.removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
.endMetadata()
.build());
if (logger.isInfoEnabled()) {
logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + currentHostname);
}
Expand Down Expand Up @@ -222,7 +225,7 @@ public void eventReceived(Action action, Endpoints resource) {
}

@Override
public void onClose(KubernetesClientException cause) {
public void onClose(WatcherException cause) {
// ignore
}
});
Expand Down Expand Up @@ -253,7 +256,7 @@ public void eventReceived(Action action, Pod resource) {
}

@Override
public void onClose(KubernetesClientException cause) {
public void onClose(WatcherException cause) {
// ignore
}
});
Expand Down Expand Up @@ -284,7 +287,7 @@ public void eventReceived(Action action, Service resource) {
}

@Override
public void onClose(KubernetesClientException cause) {
public void onClose(WatcherException cause) {
// ignore
}
});
Expand Down
Loading

0 comments on commit 28b0098

Please sign in to comment.