From 644e3c24ca7da483e24db053f78922cd1863e954 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Wed, 17 Mar 2021 00:03:09 +0800 Subject: [PATCH] [3.0] fix service discovery implementation and introduce ClusterFiIlter (#7388) --- .../apache/dubbo/rpc/cluster/Directory.java | 4 + .../rpc/cluster/filter/ClusterFilter.java | 24 +++ .../filter/DefaultFilterChainBuilder.java | 10 +- .../cluster/filter/FilterChainBuilder.java | 36 +++- .../cluster/filter/ProtocolFilterWrapper.java | 9 +- .../support}/ConsumerContextFilter.java | 26 ++- .../support/ZoneAwareFilter.java} | 19 +- .../interceptor/ClusterInterceptor.java | 1 + .../ConsumerContextClusterInterceptor.java | 60 ------ .../support/wrapper/AbstractCluster.java | 190 ++++++++++-------- ...che.dubbo.rpc.cluster.filter.ClusterFilter | 2 + ...rpc.cluster.interceptor.ClusterInterceptor | 2 - dubbo-common/pom.xml | 4 + .../org/apache/dubbo/common/URLStrParser.java | 9 +- .../dubbo/common/config/Environment.java | 20 ++ .../common/constants/CommonConstants.java | 6 + .../common/url/component/URLAddress.java | 2 +- .../common/url/component/URLItemCache.java | 44 ++-- .../dubbo/common/url/component/URLParam.java | 4 +- .../dubbo/common/utils/ConfigUtils.java | 47 +++++ .../apache/dubbo/config/AbstractConfig.java | 3 +- .../org/apache/dubbo/config/Constants.java | 2 + .../config/bootstrap/DubboBootstrap.java | 3 +- .../ServiceInstanceHostPortCustomizer.java | 5 +- .../dubbo/config/AbstractConfigTest.java | 4 +- .../src/main/resources/dubbo-migration.yaml | 3 + dubbo-dependencies-bom/pom.xml | 6 + dubbo-distribution/dubbo-all/pom.xml | 8 + .../apache/dubbo/metadata/MetadataInfo.java | 4 + .../dubbo/monitor/support/MonitorFilter.java | 3 +- .../dubbo/auth/filter/ConsumerSignFilter.java | 2 +- .../apache/dubbo/registry/NotifyListener.java | 4 + .../client/DefaultServiceInstance.java | 58 +++--- .../EventPublishingServiceDiscovery.java | 15 ++ .../client/FileSystemServiceDiscovery.java | 2 +- .../client/SelfHostMetaServiceDiscovery.java | 2 +- .../registry/client/ServiceDiscovery.java | 5 + .../client/ServiceDiscoveryRegistry.java | 2 +- .../ServiceDiscoveryRegistryDirectory.java | 5 +- .../registry/client/ServiceInstance.java | 13 +- .../ServiceInstancesChangedListener.java | 117 ++++++----- .../client/metadata/MetadataUtils.java | 2 +- .../store/RemoteMetadataServiceImpl.java | 2 +- .../DefaultMigrationAddressComparator.java | 4 +- .../client/migration/MigrationInvoker.java | 89 ++++---- .../migration/MigrationRuleHandler.java | 4 +- .../migration/MigrationRuleListener.java | 8 +- .../integration/DynamicDirectory.java | 8 +- .../client/DefaultServiceInstanceTest.java | 3 +- .../FileSystemServiceDiscoveryTest.java | 2 + .../multiple/MultipleServiceDiscovery.java | 38 ++-- .../nacos/util/NacosNamingServiceUtils.java | 4 +- .../zookeeper/ZookeeperServiceDiscovery.java | 34 +++- ...ookeeperServiceDiscoveryChangeWatcher.java | 38 +++- .../zookeeper/util/CuratorFrameworkUtils.java | 2 +- .../ZookeeperServiceDiscoveryTest.java | 3 +- .../java/org/apache/dubbo/rpc/BaseFilter.java | 31 +++ .../java/org/apache/dubbo/rpc/Filter.java | 40 ++-- .../dubbo/rpc/protocol/AbstractInvoker.java | 2 +- .../internal/org.apache.dubbo.rpc.Filter | 2 - .../protocol/dubbo/filter/FutureFilter.java | 4 +- .../internal/org.apache.dubbo.rpc.Filter | 3 +- ...che.dubbo.rpc.cluster.filter.ClusterFilter | 1 + .../rpc/protocol/dubbo/FutureFilterTest.java | 4 +- 64 files changed, 688 insertions(+), 425 deletions(-) create mode 100644 dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java rename {dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter => dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support}/ConsumerContextFilter.java (83%) rename dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/{interceptor/ZoneAwareClusterInterceptor.java => filter/support/ZoneAwareFilter.java} (80%) delete mode 100644 dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java create mode 100644 dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter delete mode 100644 dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor create mode 100644 dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml create mode 100644 dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java create mode 100644 dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java index 9fd3ca2ae0a..5a92d97814c 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java @@ -65,4 +65,8 @@ default boolean isServiceDiscovery() { void discordAddresses(); RouterChain getRouterChain(); + + default boolean isNotificationReceived() { + return false; + } } \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java new file mode 100644 index 00000000000..7d48dc90309 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.filter; + +import org.apache.dubbo.common.extension.SPI; +import org.apache.dubbo.rpc.BaseFilter; + +@SPI +public interface ClusterFilter extends BaseFilter { +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java index 982008ba453..e2e26d2a00d 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java @@ -27,6 +27,9 @@ @Activate(order = 0) public class DefaultFilterChainBuilder implements FilterChainBuilder { + /** + * build consumer/provider filter chain + */ @Override public Invoker buildInvokerChain(final Invoker originalInvoker, String key, String group) { Invoker last = originalInvoker; @@ -43,14 +46,17 @@ public Invoker buildInvokerChain(final Invoker originalInvoker, String return last; } + /** + * build consumer cluster filter chain + */ @Override public ClusterInvoker buildClusterInvokerChain(final ClusterInvoker originalInvoker, String key, String group) { ClusterInvoker last = originalInvoker; - List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group); + List filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { - final Filter filter = filters.get(i); + final ClusterFilter filter = filters.get(i); final Invoker next = last; last = new ClusterFilterChainNode<>(originalInvoker, next, filter); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java index 20275e27af0..949a66c5c48 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java @@ -18,6 +18,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.SPI; +import org.apache.dubbo.rpc.BaseFilter; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -29,16 +30,27 @@ @SPI("default") public interface FilterChainBuilder { + /** + * build consumer/provider filter chain + */ Invoker buildInvokerChain(final Invoker invoker, String key, String group); + /** + * build consumer cluster filter chain + */ ClusterInvoker buildClusterInvokerChain(final ClusterInvoker invoker, String key, String group); - class FilterChainNode> implements Invoker{ + /** + * Works on provider side + * @param + * @param + */ + class FilterChainNode, FILTER extends BaseFilter> implements Invoker{ TYPE originalInvoker; Invoker nextNode; - Filter filter; + FILTER filter; - public FilterChainNode(TYPE originalInvoker, Invoker nextNode, Filter filter) { + public FilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { this.originalInvoker = originalInvoker; this.nextNode = nextNode; this.filter = filter; @@ -79,8 +91,8 @@ public Result invoke(Invocation invocation) throws RpcException { } finally { listenableFilter.removeListener(invocation); } - } else if (filter instanceof Filter.Listener) { - Filter.Listener listener = (Filter.Listener) filter; + } else if (filter instanceof FILTER.Listener) { + FILTER.Listener listener = (FILTER.Listener) filter; listener.onError(e, originalInvoker, invocation); } throw e; @@ -102,8 +114,8 @@ public Result invoke(Invocation invocation) throws RpcException { } finally { listenableFilter.removeListener(invocation); } - } else if (filter instanceof Filter.Listener) { - Filter.Listener listener = (Filter.Listener) filter; + } else if (filter instanceof FILTER.Listener) { + FILTER.Listener listener = (FILTER.Listener) filter; if (t == null) { listener.onResponse(r, originalInvoker, invocation); } else { @@ -124,8 +136,14 @@ public String toString() { } } - class ClusterFilterChainNode> extends FilterChainNode implements ClusterInvoker { - public ClusterFilterChainNode(TYPE originalInvoker, Invoker nextNode, Filter filter) { + /** + * Works on consumer side + * @param + * @param + */ + class ClusterFilterChainNode, FILTER extends BaseFilter> + extends FilterChainNode implements ClusterInvoker { + public ClusterFilterChainNode(TYPE originalInvoker, Invoker nextNode, FILTER filter) { super(originalInvoker, nextNode, filter); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java index 65e8813a4cc..389b173ba68 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java @@ -28,10 +28,9 @@ import org.apache.dubbo.rpc.RpcException; import java.util.List; -import java.util.Objects; +import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY; -import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY; /** * ListenerProtocol @@ -68,11 +67,7 @@ public Invoker refer(Class type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } - // if it's peer-to-peer url - if (!Objects.isNull(url.getAttribute(PEER_KEY))) { - return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER); - } - return protocol.refer(type, url); + return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); } @Override diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java similarity index 83% rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java index 0fead6b37a1..4a1ed3bffca 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc.filter; +package org.apache.dubbo.rpc.cluster.filter.support; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.utils.CollectionUtils; @@ -28,6 +28,7 @@ import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.TimeoutCountDown; +import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import java.util.Map; @@ -39,11 +40,11 @@ * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port * for consumer invoker.It does it to make the requires info available to execution thread's RpcContext. * - * @see org.apache.dubbo.rpc.Filter + * @see Filter * @see RpcContext */ @Activate(group = CONSUMER, order = -10000) -public class ConsumerContextFilter implements Filter { +public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener { @Override public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { @@ -76,7 +77,24 @@ public Result invoke(Invoker invoker, Invocation invocation) throws RpcExcept + invocation.getMethodName() + ", terminate directly."), invocation); } } - return invoker.invoke(invocation); + + try { + RpcContext.removeServerContext(); + return invoker.invoke(invocation); + } finally { + RpcContext.removeContext(); + } + } + + @Override + public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) { + // pass attachments to result + RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments()); + } + + @Override + public void onError(Throwable t, Invoker invoker, Invocation invocation) { + } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java similarity index 80% rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java index 6daec08d10a..cd0a7ab6553 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc.cluster.interceptor; +package org.apache.dubbo.rpc.cluster.filter.support; +import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.ZoneDetector; -import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; +import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE; @@ -32,11 +36,11 @@ * * active only when url has key 'cluster=zone-aware' */ -@Activate(value = "cluster:zone-aware") -public class ZoneAwareClusterInterceptor implements ClusterInterceptor { +@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware") +public class ZoneAwareFilter implements ClusterFilter { @Override - public void before(AbstractClusterInvoker clusterInvoker, Invocation invocation) { + public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { RpcContext rpcContext = RpcContext.getContext(); String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE); String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE); @@ -53,10 +57,7 @@ public void before(AbstractClusterInvoker clusterInvoker, Invocation invocati if (StringUtils.isNotEmpty(force)) { invocation.setAttachment(REGISTRY_ZONE_FORCE, force); } - } - - @Override - public void after(AbstractClusterInvoker clusterInvoker, Invocation invocation) { + return invoker.invoke(invocation); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java index 199361f6add..821dd2e8903 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java @@ -26,6 +26,7 @@ /** * Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked. */ +@Deprecated @SPI public interface ClusterInterceptor { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java deleted file mode 100644 index 053bc87c21f..00000000000 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.cluster.interceptor; - -import org.apache.dubbo.common.extension.Activate; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.RpcInvocation; -import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; - -@Activate -public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener { - - @Override - public void before(AbstractClusterInvoker invoker, Invocation invocation) { - RpcContext context = RpcContext.getContext(); - context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0); - if (invocation instanceof RpcInvocation) { - ((RpcInvocation) invocation).setInvoker(invoker); - } - RpcContext.removeServerContext(); - } - - @Override - public void after(AbstractClusterInvoker clusterInvoker, Invocation invocation) { - RpcContext.removeContext(true); - } - - @Override - public Result intercept(AbstractClusterInvoker clusterInvoker, Invocation invocation) throws RpcException { - return clusterInvoker.invoke(invocation); - } - - @Override - public void onMessage(Result appResponse, AbstractClusterInvoker invoker, Invocation invocation) { - RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments()); - } - - @Override - public void onError(Throwable t, AbstractClusterInvoker invoker, Invocation invocation) { - - } -} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java index 9ce920ddbe0..1c2d047f78b 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java @@ -17,6 +17,7 @@ package org.apache.dubbo.rpc.cluster.support.wrapper; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.CollectionUtils; @@ -36,6 +37,7 @@ import java.util.List; +import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_INTERCEPTOR_COMPATIBLE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INVOCATION_INTERCEPTOR_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_INTERCEPTOR_KEY; @@ -44,15 +46,10 @@ public abstract class AbstractCluster implements Cluster { private Invoker buildClusterInterceptors(AbstractClusterInvoker clusterInvoker, String key) { // AbstractClusterInvoker last = clusterInvoker; - AbstractClusterInvoker last = buildInterceptorInvoker(new FilterInvoker<>(clusterInvoker)); - List interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions(); + AbstractClusterInvoker last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker)); - if (!interceptors.isEmpty()) { - for (int i = interceptors.size() - 1; i >= 0; i--) { - final ClusterInterceptor interceptor = interceptors.get(i); - final AbstractClusterInvoker next = last; - last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next); - } + if (Boolean.parseBoolean(ConfigurationUtils.getProperty(CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) { + return build27xCompatibleClusterInterceptors(clusterInvoker, last); } return last; } @@ -70,90 +67,15 @@ private AbstractClusterInvoker buildInterceptorInvoker(AbstractClusterInv if (CollectionUtils.isEmpty(builders)) { return invoker; } - return new InterceptorInvoker<>(invoker, builders); + return new InvocationInterceptorInvoker<>(invoker, builders); } protected abstract AbstractClusterInvoker doJoin(Directory directory) throws RpcException; - static class InterceptorInvokerNode extends AbstractClusterInvoker { - - private AbstractClusterInvoker clusterInvoker; - private ClusterInterceptor interceptor; - private AbstractClusterInvoker next; - - public InterceptorInvokerNode(AbstractClusterInvoker clusterInvoker, - ClusterInterceptor interceptor, - AbstractClusterInvoker next) { - this.clusterInvoker = clusterInvoker; - this.interceptor = interceptor; - this.next = next; - } - - @Override - public Class getInterface() { - return clusterInvoker.getInterface(); - } - - @Override - public URL getUrl() { - return clusterInvoker.getUrl(); - } - - @Override - public boolean isAvailable() { - return clusterInvoker.isAvailable(); - } - - @Override - public Result invoke(Invocation invocation) throws RpcException { - Result asyncResult; - try { - interceptor.before(next, invocation); - asyncResult = interceptor.intercept(next, invocation); - } catch (Exception e) { - // onError callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - listener.onError(e, clusterInvoker, invocation); - } - throw e; - } finally { - interceptor.after(next, invocation); - } - return asyncResult.whenCompleteWithContext((r, t) -> { - // onResponse callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - if (t == null) { - listener.onMessage(r, clusterInvoker, invocation); - } else { - listener.onError(t, clusterInvoker, invocation); - } - } - }); - } - - @Override - public void destroy() { - clusterInvoker.destroy(); - } - - @Override - public String toString() { - return clusterInvoker.toString(); - } - - @Override - protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { - // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter. - return null; - } - } - - static class FilterInvoker extends AbstractClusterInvoker { + static class ClusterFilterInvoker extends AbstractClusterInvoker { private ClusterInvoker filterInvoker; - public FilterInvoker(AbstractClusterInvoker invoker) { + public ClusterFilterInvoker(AbstractClusterInvoker invoker) { List builders = ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getActivateExtensions(); if (CollectionUtils.isEmpty(builders)) { filterInvoker = invoker; @@ -203,10 +125,10 @@ protected Result doInvoke(Invocation invocation, List> invokers, Load } } - static class InterceptorInvoker extends AbstractClusterInvoker { + static class InvocationInterceptorInvoker extends AbstractClusterInvoker { private ClusterInvoker interceptorInvoker; - public InterceptorInvoker(AbstractClusterInvoker invoker, List builders) { + public InvocationInterceptorInvoker(AbstractClusterInvoker invoker, List builders) { ClusterInvoker tmpInvoker = invoker; for (InvocationInterceptorBuilder builder : builders) { tmpInvoker = builder.buildClusterInterceptorChain(tmpInvoker, INVOCATION_INTERCEPTOR_KEY, CommonConstants.CONSUMER); @@ -248,4 +170,96 @@ protected Result doInvoke(Invocation invocation, List> invokers, Load return null; } } + + @Deprecated + private ClusterInvoker build27xCompatibleClusterInterceptors(AbstractClusterInvoker clusterInvoker, AbstractClusterInvoker last) { + List interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions(); + + if (!interceptors.isEmpty()) { + for (int i = interceptors.size() - 1; i >= 0; i--) { + final ClusterInterceptor interceptor = interceptors.get(i); + final AbstractClusterInvoker next = last; + last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next); + } + } + return last; + } + + @Deprecated + static class InterceptorInvokerNode extends AbstractClusterInvoker { + + private AbstractClusterInvoker clusterInvoker; + private ClusterInterceptor interceptor; + private AbstractClusterInvoker next; + + public InterceptorInvokerNode(AbstractClusterInvoker clusterInvoker, + ClusterInterceptor interceptor, + AbstractClusterInvoker next) { + this.clusterInvoker = clusterInvoker; + this.interceptor = interceptor; + this.next = next; + } + + @Override + public Class getInterface() { + return clusterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return clusterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return clusterInvoker.isAvailable(); + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult; + try { + interceptor.before(next, invocation); + asyncResult = interceptor.intercept(next, invocation); + } catch (Exception e) { + // onError callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + listener.onError(e, clusterInvoker, invocation); + } + throw e; + } finally { + interceptor.after(next, invocation); + } + return asyncResult.whenCompleteWithContext((r, t) -> { + // onResponse callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + if (t == null) { + listener.onMessage(r, clusterInvoker, invocation); + } else { + listener.onError(t, clusterInvoker, invocation); + } + } + }); + } + + @Override + public void destroy() { + clusterInvoker.destroy(); + } + + @Override + public String toString() { + return clusterInvoker.toString(); + } + + @Override + protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { + // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter. + return null; + } + } + + } diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter new file mode 100644 index 00000000000..8f70d31aa0b --- /dev/null +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter @@ -0,0 +1,2 @@ +zone-aware=org.apache.dubbo.rpc.cluster.filter.support.ZoneAwareFilter +consumercontext=org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter \ No newline at end of file diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor deleted file mode 100644 index 3f3f008955f..00000000000 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor +++ /dev/null @@ -1,2 +0,0 @@ -context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor -zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor \ No newline at end of file diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml index 44d9fab807c..b424661be68 100644 --- a/dubbo-common/pom.xml +++ b/dubbo-common/pom.xml @@ -72,6 +72,10 @@ javax.annotation javax.annotation-api + + org.eclipse.collections + eclipse-collections + \ No newline at end of file diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java index 619b1bb0e77..5bd2970847c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java @@ -21,8 +21,9 @@ import org.apache.dubbo.common.url.component.ServiceConfigURL; import org.apache.dubbo.common.url.component.URLItemCache; +import org.eclipse.collections.impl.map.mutable.UnifiedMap; + import java.util.Collections; -import java.util.HashMap; import java.util.Map; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX; @@ -73,7 +74,7 @@ private static Map parseDecodedParams(String str, int from) { } TempBuf tempBuf = DECODE_TEMP_BUF.get(); - Map params = new HashMap<>(); + Map params = new UnifiedMap<>(); int nameStart = from; int valueStart = -1; int i; @@ -169,7 +170,7 @@ private static URL parseURLBody(String fullURLStr, String decodedBody, Map parseEncodedParams(String str, int from) { } TempBuf tempBuf = DECODE_TEMP_BUF.get(); - Map params = new HashMap<>(); + Map params = new UnifiedMap<>(); int nameStart = from; int valueStart = -1; int i; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java index ea4b4e5ad3a..3da6cca05ee 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java @@ -17,11 +17,13 @@ package org.apache.dubbo.common.config; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; +import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.context.FrameworkExt; import org.apache.dubbo.common.context.LifecycleAdapter; import org.apache.dubbo.common.extension.DisableInject; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.config.AbstractConfig; import org.apache.dubbo.config.ConfigCenterConfig; import org.apache.dubbo.config.context.ConfigConfigurationAdapter; @@ -54,6 +56,7 @@ public class Environment extends LifecycleAdapter implements FrameworkExt { private boolean configCenterFirst = true; private DynamicConfiguration dynamicConfiguration; + private String localMigrationRule; public Environment() { this.propertiesConfiguration = new PropertiesConfiguration(); @@ -76,6 +79,19 @@ public void initialize() throws IllegalStateException { this.externalConfiguration.setProperties(externalConfigurationMap); this.appExternalConfiguration.setProperties(appExternalConfigurationMap); + + loadMigrationRule(); + } + + private void loadMigrationRule() { + String path = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY); + if (path == null || path.length() == 0) { + path = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY); + if (path == null || path.length() == 0) { + path = CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE; + } + } + this.localMigrationRule = ConfigUtils.loadMigrationRule(path); } @DisableInject @@ -220,6 +236,10 @@ public InmemoryConfiguration getAppExternalConfiguration() { return appExternalConfiguration; } + public String getLocalMigrationRule() { + return localMigrationRule; + } + // For test public void clearExternalConfigs() { this.externalConfiguration.clear(); diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java index 268c313079f..953b11b0b3a 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java @@ -45,6 +45,10 @@ public interface CommonConstants { String DEFAULT_DUBBO_PROPERTIES = "dubbo.properties"; + String DUBBO_MIGRATION_KEY = "dubbo.migration.file"; + + String DEFAULT_DUBBO_MIGRATION_FILE = "dubbo-migration.yaml"; + String ANY_VALUE = "*"; /** @@ -376,4 +380,6 @@ public interface CommonConstants { String CACHE_CLEAR_TASK_INTERVAL = "dubbo.application.url.cache.task.interval"; String CACHE_CLEAR_WAITING_THRESHOLD = "dubbo.application.url.cache.clear.waiting"; + String CLUSTER_INTERCEPTOR_COMPATIBLE_KEY = "dubbo.application.cluster.interceptor.compatible"; + } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java index 63d140b5a97..a0e89f121f2 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java @@ -252,7 +252,7 @@ private static PathURLAddress createPathURLAddress(String decodeStr, String rawA } // check cache - protocol = URLItemCache.checkProtocol(protocol); + protocol = URLItemCache.intern(protocol); path = URLItemCache.checkPath(path); return new PathURLAddress(protocol, username, password, path, host, port, rawAddress); diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java index 2384493153d..54372c296d9 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java @@ -19,14 +19,13 @@ import org.apache.dubbo.common.utils.LRUCache; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class URLItemCache { // thread safe with limited size, by default 1000 private static final Map PARAM_KEY_CACHE = new LRUCache<>(10000); - private static final Map PARAM_VALUE_CACHE = new LRUCache<>(100000); + private static final Map PARAM_VALUE_CACHE = new LRUCache<>(50000); private static final Map PATH_CACHE = new LRUCache<>(10000); - private static final Map PROTOCOL_CACHE = new ConcurrentHashMap<>(); + private static final Map REVISION_CACHE = new LRUCache<>(10000); public static void putParams(Map params, String key, String value) { String cachedKey = PARAM_KEY_CACHE.get(key); @@ -43,17 +42,6 @@ public static void putParams(Map params, String key, String valu params.put(cachedKey, cachedValue); } - public static String checkProtocol(String _protocol) { - if (_protocol == null) { - return _protocol; - } - String cachedProtocol = PROTOCOL_CACHE.putIfAbsent(_protocol, _protocol); - if (cachedProtocol != null) { - return cachedProtocol; - } - return _protocol; - } - public static String checkPath(String _path) { if (_path == null) { return _path; @@ -64,4 +52,32 @@ public static String checkPath(String _path) { } return _path; } + + public static String checkRevision(String _revision) { + if (_revision == null) { + return _revision; + } + String revision = REVISION_CACHE.putIfAbsent(_revision, _revision); + if (revision != null) { + return revision; + } + return _revision; + } + + public static String intern(String _protocol) { + if (_protocol == null) { + return _protocol; + } + return _protocol.intern(); + } + + public static void putParamsIntern(Map params, String key, String value) { + if (key == null || value == null) { + params.put(key, value); + return; + } + key = key.intern(); + value = value.intern(); + params.put(key, value); + } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java index b2dca0c5dcf..cea92cb3966 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java @@ -21,6 +21,8 @@ import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; +import org.eclipse.collections.impl.map.mutable.UnifiedMap; + import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -263,7 +265,7 @@ public static URLParam parse(String rawParam, boolean encoded, Map parameters = new HashMap<>((int) (parts.length/.75f) + 1); + Map parameters = new UnifiedMap<>((int) (parts.length/.75f) + 1); for (String part : parts) { part = part.trim(); if (part.length() > 0) { diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java index c1f47111392..d0d0693e26c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java @@ -21,12 +21,16 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; +import java.util.Arrays; import java.util.Enumeration; import java.util.List; import java.util.Map; @@ -296,6 +300,49 @@ public static Properties loadProperties(String fileName, boolean allowMultiFile, return properties; } + public static String loadMigrationRule(String fileName) { + String rawRule = ""; + if (checkFileNameExist(fileName)) { + try { + try (FileInputStream input = new FileInputStream(fileName)) { + rawRule = readString(input); + } + } catch (Throwable e) { + logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e); + } + return rawRule; + } + + try { + InputStream is = ClassUtils.getClassLoader().getResourceAsStream(fileName); + if (is != null) { + rawRule = readString(is); + } + } catch (Throwable e) { + logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e); + } + return rawRule; + } + + private static String readString(InputStream is) { + StringBuilder stringBuilder = new StringBuilder(); + char[] buffer = new char[10]; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))){ + int n; + while ((n = reader.read(buffer)) != -1) { + if (n < 10) { + buffer = Arrays.copyOf(buffer, n); + } + stringBuilder.append(String.valueOf(buffer)); + buffer = new char[10]; + } + } catch (IOException e) { + logger.error("Read migration file error.", e); + } + + return stringBuilder.toString(); + } + /** * check if the fileName can be found in filesystem * diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java index 4e7bbf4c6e8..47556c50d8b 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java @@ -36,6 +36,7 @@ import java.io.Serializable; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -336,7 +337,7 @@ protected static Map convert(Map parameters, Str String value = entry.getValue(); result.put(pre + key, value); // For compatibility, key like "registry-type" will has a duplicate key "registry.type" - if (key.contains("-")) { + if (Arrays.binarySearch(Constants.DOT_COMPATIBLE_KEYS, key) != -1) { result.put(pre + key.replace('-', '.'), value); } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java index f8fed849863..894f13819fb 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java @@ -117,4 +117,6 @@ public interface Constants { String REGISTER_KEY = "register"; String MULTI_SERIALIZATION_KEY = "serialize.multiple"; + + String[] DOT_COMPATIBLE_KEYS = new String[]{"qos-enable", "qos-port", "qos-accept-foreign-ip"}; } diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java index a4116f49ae3..cdf66ddd0c4 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java @@ -520,6 +520,7 @@ public void initialize() { } ApplicationModel.initFrameworkExts(); + startConfigCenter(); @@ -1162,7 +1163,7 @@ private void registerServiceInstance() { private void doRegisterServiceInstance(ServiceInstance serviceInstance) { // register instance only when at least one service is exported. - if (serviceInstance.getPort() != null && serviceInstance.getPort() != -1) { + if (serviceInstance.getPort() > 0) { publishMetadataToRemote(serviceInstance); logger.info("Start registering instance address to registry."); getServiceDiscoveries().forEach(serviceDiscovery -> diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java index 383133747a5..8693828bb2f 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java @@ -36,14 +36,14 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi @Override public void customize(ServiceInstance serviceInstance) { - if (serviceInstance.getPort() != null) { + if (serviceInstance.getPort() > 0) { return; } WritableMetadataService writableMetadataService = WritableMetadataService.getDefaultExtension(); String host = null; - Integer port = null; + int port = -1; Set urls = writableMetadataService.getExportedServiceURLs(); if (CollectionUtils.isNotEmpty(urls)) { String preferredProtocol = ApplicationModel.getApplicationConfig().getProtocol(); @@ -64,7 +64,6 @@ public void customize(ServiceInstance serviceInstance) { DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance; instance.setHost(host); instance.setPort(port); - instance.setId(host + ":" + port); } } } diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java index 041cd750737..04d5f7259d7 100644 --- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java +++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java @@ -116,8 +116,6 @@ public void testAppendParameters1() throws Exception { Assertions.assertEquals("ONE,1", parameters.get("prefix.num")); Assertions.assertEquals("hello%2Fworld", parameters.get("prefix.naming")); Assertions.assertEquals("30", parameters.get("prefix.age")); - Assertions.assertTrue(parameters.containsKey("prefix.key-2")); - Assertions.assertTrue(parameters.containsKey("prefix.key.2")); Assertions.assertFalse(parameters.containsKey("prefix.secret")); } @@ -807,7 +805,7 @@ public void setSecret(String secret) { public Map getParameters() { Map map = new HashMap(); map.put("key.1", "one"); - map.put("key-2", "two"); + map.put("key.2", "two"); return map; } } diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml new file mode 100644 index 00000000000..dc2e8f21319 --- /dev/null +++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml @@ -0,0 +1,3 @@ +key: demo-consumer +step: FORCE_APPLICATION +threshold: 0.1 \ No newline at end of file diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index e6b67c1784a..8cc1d43d3d6 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -91,6 +91,7 @@ 4.3.16.RELEASE 3.20.0-GA + 10.4.0 3.2.5.Final 4.1.56.Final 1.1.7 @@ -185,6 +186,11 @@ javassist ${javassist_version} + + org.eclipse.collections + eclipse-collections + ${eclipse_collections_version} + org.jboss.netty netty diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml index 5729680a88a..b464e419a3d 100644 --- a/dubbo-distribution/dubbo-all/pom.xml +++ b/dubbo-distribution/dubbo-all/pom.xml @@ -311,6 +311,10 @@ org.javassist javassist + + org.eclipse.collections + eclipse-collections + io.netty netty-all @@ -511,6 +515,10 @@ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter + + META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter + META-INF/dubbo/internal/org.apache.dubbo.rpc.InvokerListener diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java index 024328e4e2c..4106ad29a90 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java @@ -42,6 +42,8 @@ import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY; public class MetadataInfo implements Serializable { + public static final MetadataInfo EMPTY = new MetadataInfo(); + private String app; private String revision; private Map services; @@ -50,6 +52,8 @@ public class MetadataInfo implements Serializable { private transient Map extendParams; private transient AtomicBoolean reported = new AtomicBoolean(false); + public MetadataInfo() {} + public MetadataInfo(String app) { this(app, null, null); } diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java index 1492e0bcac4..47e4bc74532 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY; @@ -49,7 +48,7 @@ /** * MonitorFilter. (SPI, Singleton, ThreadSafe) */ -@Activate(group = {PROVIDER, CONSUMER}) +@Activate(group = {PROVIDER}) public class MonitorFilter implements Filter, Filter.Listener { private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class); diff --git a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java index cf984a5cbff..96438c53d04 100644 --- a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java +++ b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java @@ -33,7 +33,7 @@ * * @see org.apache.dubbo.rpc.Filter */ -@Activate(group = CommonConstants.CONSUMER, order = -10000) +@Activate(group = CommonConstants.CONSUMER, value = Constants.SERVICE_AUTH, order = -10000) public class ConsumerSignFilter implements Filter { @Override diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java index 89e3e75b38d..4c02cdc81a9 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java @@ -45,4 +45,8 @@ public interface NotifyListener { default void addServiceListener(ServiceInstancesChangedListener instanceListener) { } + default URL getConsumerUrl() { + return null; + } + } \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java index 93ba70e1918..1bcf284d46e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java @@ -19,6 +19,7 @@ import org.apache.dubbo.metadata.MetadataInfo; import com.alibaba.fastjson.JSON; +import org.eclipse.collections.impl.map.mutable.UnifiedMap; import java.util.HashMap; import java.util.List; @@ -39,24 +40,23 @@ public class DefaultServiceInstance implements ServiceInstance { private static final long serialVersionUID = 1149677083747278100L; - private String id; - private String serviceName; private String host; - private Integer port; + private int port; private boolean enabled; private boolean healthy; - private Map metadata = new HashMap<>(); + private Map metadata = new UnifiedMap<>(); private transient String address; private transient MetadataInfo serviceMetadata; // used at runtime - private transient Map extendParams = new HashMap<>(); + private transient String registryCluster; // extendParams can be more flexiable, but one single property uses less space + private transient Map extendParams; private transient List endpoints; public DefaultServiceInstance() { @@ -70,17 +70,16 @@ public DefaultServiceInstance(DefaultServiceInstance other) { this.healthy = other.healthy; this.metadata = other.metadata; this.serviceMetadata = other.serviceMetadata; + this.registryCluster = other.registryCluster; this.extendParams = other.extendParams; this.endpoints = other.endpoints; this.address = null; - this.id = null; } - public DefaultServiceInstance(String id, String serviceName, String host, Integer port) { + public DefaultServiceInstance(String serviceName, String host, Integer port) { if (port != null && port.intValue() < 1) { throw new IllegalArgumentException("The port must be greater than zero!"); } - this.id = id; this.serviceName = serviceName; this.host = host; this.port = port; @@ -88,18 +87,10 @@ public DefaultServiceInstance(String id, String serviceName, String host, Intege this.healthy = true; } - public DefaultServiceInstance(String serviceName, String host, Integer port) { - this(host + ":" + port, serviceName, host, port); - } - public DefaultServiceInstance(String serviceName) { this.serviceName = serviceName; } - public void setId(String id) { - this.id = id; - } - public void setServiceName(String serviceName) { this.serviceName = serviceName; } @@ -108,11 +99,6 @@ public void setHost(String host) { this.host = host; } - @Override - public String getId() { - return id; - } - @Override public String getServiceName() { return serviceName; @@ -123,12 +109,12 @@ public String getHost() { return host; } - public void setPort(Integer port) { + public void setPort(int port) { this.port = port; } @Override - public Integer getPort() { + public int getPort() { return port; } @@ -172,8 +158,20 @@ public SortedMap getSortedMetadata() { return new TreeMap<>(getMetadata()); } + @Override + public String getRegistryCluster() { + return registryCluster; + } + + public void setRegistryCluster(String registryCluster) { + this.registryCluster = registryCluster; + } + @Override public Map getExtendParams() { + if (extendParams == null) { + extendParams = new HashMap<>(); + } return extendParams; } @@ -187,16 +185,19 @@ public List getEndpoints() { public DefaultServiceInstance copy(Endpoint endpoint) { DefaultServiceInstance copyOfInstance = new DefaultServiceInstance(this); copyOfInstance.setPort(endpoint.getPort()); - copyOfInstance.setId(copyOfInstance.getAddress()); return copyOfInstance; } @Override public Map getAllParams() { - Map allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1)); - allParams.putAll(metadata); - allParams.putAll(extendParams); - return allParams; + if (extendParams == null) { + return metadata; + } else { + Map allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1)); + allParams.putAll(metadata); + allParams.putAll(extendParams); + return allParams; + } } public void setMetadata(Map metadata) { @@ -249,7 +250,6 @@ public int hashCode() { @Override public String toString() { return "DefaultServiceInstance{" + - "id='" + id + '\'' + ", serviceName='" + serviceName + '\'' + ", host='" + host + '\'' + ", port=" + port + diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java index ee99000423a..f9c80198ab0 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java @@ -223,6 +223,21 @@ public void addServiceInstancesChangedListener(ServiceInstancesChangedListener l eventDispatcher.addEventListener(listener); } + @Override + public ServiceInstancesChangedListener createListener(Set serviceNames) { + return serviceDiscovery.createListener(serviceNames); + } + + @Override + public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException { + serviceDiscovery.removeServiceInstancesChangedListener(listener); + } + + @Override + public long getDelay() { + return serviceDiscovery.getDelay(); + } + @Override public URL getUrl() { return serviceDiscovery.getUrl(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java index 2a5116870fe..1482a9d0811 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java @@ -110,7 +110,7 @@ private void removeFile(File file) { } private String getServiceInstanceId(ServiceInstance serviceInstance) { - String id = serviceInstance.getId(); + String id = serviceInstance.getAddress(); if (StringUtils.isBlank(id)) { return serviceInstance.getHost() + "." + serviceInstance.getPort(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java index 10344204737..7537090be01 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java @@ -207,7 +207,7 @@ public URL getUrl() { @SuppressWarnings("unchecked") public final void fillServiceInstance(DefaultServiceInstance serviceInstance) { - String hostId = serviceInstance.getId(); + String hostId = serviceInstance.getAddress(); if (metadataMap.containsKey(hostId)) { // Use cached metadata. // Metadata will be updated by provider callback diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java index 90ba196800f..aa318d4bdbb 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java @@ -219,6 +219,7 @@ default void addServiceInstancesChangedListener(ServiceInstancesChangedListener /** * unsubscribe to instances change event. + * * @param listener * @throws IllegalArgumentException */ @@ -226,6 +227,10 @@ default void removeServiceInstancesChangedListener(ServiceInstancesChangedListen throws IllegalArgumentException { } + default ServiceInstancesChangedListener createListener(Set serviceNames) { + return new ServiceInstancesChangedListener(serviceNames, this); + } + /** * Dispatch the {@link ServiceInstancesChangedEvent} * diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index d19c8de64c6..31ae6f967b1 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -291,7 +291,7 @@ protected void subscribeURLs(URL url, NotifyListener listener, Set servi // register ServiceInstancesChangedListener ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey, k -> { - ServiceInstancesChangedListener serviceInstancesChangedListener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery); + ServiceInstancesChangedListener serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames); serviceInstancesChangedListener.setUrl(url); serviceNames.forEach(serviceName -> { List serviceInstances = serviceDiscovery.getInstances(serviceName); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index c41e0281242..6232097552b 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -32,9 +32,10 @@ import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.cluster.RouterChain; +import org.eclipse.collections.impl.map.mutable.UnifiedMap; + import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -152,7 +153,7 @@ private void refreshInvoker(List invokerUrls) { * @return invokers */ private Map> toInvokers(List urls) { - Map> newUrlInvokerMap = new HashMap<>(); + Map> newUrlInvokerMap = new UnifiedMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java index 5bb2bfe4bae..3a55c4e3b3d 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java @@ -28,13 +28,6 @@ */ public interface ServiceInstance extends Serializable { - /** - * The id of the registered service instance. - * - * @return nullable - */ - String getId(); - /** * The name of service that current instance belongs to. * @@ -54,7 +47,7 @@ public interface ServiceInstance extends Serializable { * * @return the positive integer if present */ - Integer getPort(); + int getPort(); String getAddress(); @@ -87,6 +80,10 @@ default boolean isHealthy() { SortedMap getSortedMetadata(); + String getRegistryCluster(); + + void setRegistryCluster(String registryCluster); + Map getExtendParams(); Map getAllParams(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java index 22280bd7bca..dd0a0cff922 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java @@ -53,7 +53,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE; -import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY; import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision; @@ -67,14 +66,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class); - private final Set serviceNames; - private final ServiceDiscovery serviceDiscovery; - private URL url; - private Map listeners; + protected final Set serviceNames; + protected final ServiceDiscovery serviceDiscovery; + protected URL url; + protected Map listeners; private Map> allInstances; - private Map> serviceUrls; + private Map serviceUrls; private Map revisionToMetadata; @@ -112,8 +111,8 @@ public synchronized void onEvent(ServiceInstancesChangedEvent event) { Map> revisionToInstances = new HashMap<>(); Map> localServiceToRevisions = new HashMap<>(); - Map, List>> protocolRevisionsToUrls = new HashMap<>(); - Map> newServiceUrls = new HashMap<>();//TODO + Map, Object>> protocolRevisionsToUrls = new HashMap<>(); + Map newServiceUrls = new HashMap<>();//TODO Map newRevisionToMetadata = new HashMap<>(); for (Map.Entry> entry : allInstances.entrySet()) { @@ -151,27 +150,14 @@ public synchronized void onEvent(ServiceInstancesChangedEvent event) { localServiceToRevisions.forEach((serviceInfo, revisions) -> { String protocol = serviceInfo.getProtocol(); - Map, List> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> { + Map, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> { return new HashMap<>(); }); - List urls = revisionsToUrls.get(revisions); + Object urls = revisionsToUrls.get(revisions); if (urls != null) { newServiceUrls.put(serviceInfo.getMatchKey(), urls); } else { - urls = new ArrayList<>(); - for (String r : revisions) { - for (ServiceInstance i : revisionToInstances.get(r)) { - // different protocols may have ports specified in meta - if (ServiceInstanceMetadataUtils.hasEndpoints(i)) { - DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol); - if (endpoint != null && !endpoint.getPort().equals(i.getPort())) { - urls.add(((DefaultServiceInstance)i).copy(endpoint).toURL()); - break; - } - } - urls.add(i.toURL()); - } - } + urls = getServiceUrlsCache(revisionToInstances, revisions, protocol); revisionsToUrls.put(revisions, urls); newServiceUrls.put(serviceInfo.getMatchKey(), urls); } @@ -183,7 +169,7 @@ public synchronized void onEvent(ServiceInstancesChangedEvent event) { public synchronized void addListenerAndNotify(String serviceKey, NotifyListener listener) { this.listeners.put(serviceKey, listener); - List urls = this.serviceUrls.get(serviceKey); + List urls = getAddresses(serviceKey); if (CollectionUtils.isNotEmpty(urls)) { listener.notify(urls); } @@ -197,7 +183,7 @@ public void removeListener(String serviceKey) { } public List getUrls(String serviceKey) { - return toUrlsWithEmpty(serviceUrls.get(serviceKey)); + return toUrlsWithEmpty(getAddresses(serviceKey)); } /** @@ -241,7 +227,7 @@ public final boolean accept(ServiceInstancesChangedEvent event) { return serviceNames.contains(event.getServiceName()); } - private boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { + protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { String appName = event.getServiceName(); List appInstances = event.getServiceInstances(); @@ -261,13 +247,13 @@ private boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { return false; } - private boolean hasEmptyMetadata(Map revisionToMetadata) { + protected boolean hasEmptyMetadata(Map revisionToMetadata) { if (revisionToMetadata == null) { return false; } boolean result = false; for (Map.Entry entry : revisionToMetadata.entrySet()) { - if (entry.getValue() == null) { + if (entry.getValue() == MetadataInfo.EMPTY) { result = true; break; } @@ -275,31 +261,31 @@ private boolean hasEmptyMetadata(Map revisionToMetadata) { return result; } - private MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map> localServiceToRevisions, List subInstances) { + protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map> localServiceToRevisions, List subInstances) { MetadataInfo metadata = revisionToMetadata.get(revision); - if (metadata == null) { - if (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)) { - metadata = getMetadataInfo(instance); - if (metadata != null) { - logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata); - failureCounter.set(0); - revisionToMetadata.putIfAbsent(revision, metadata); - parseMetadata(revision, metadata, localServiceToRevisions); - } else { - logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision - + ", wait for retry."); - lastFailureTime = System.currentTimeMillis(); - failureCounter.incrementAndGet(); - } + if (metadata == null + || (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) { + metadata = getMetadataInfo(instance); + + if (metadata != MetadataInfo.EMPTY) { + logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata); + failureCounter.set(0); + revisionToMetadata.putIfAbsent(revision, metadata); + parseMetadata(revision, metadata, localServiceToRevisions); + } else { + logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + + ", wait for retry."); + lastFailureTime = System.currentTimeMillis(); + failureCounter.incrementAndGet(); } - } else if (subInstances.size() == 1) { + } else if (metadata != MetadataInfo.EMPTY && subInstances.size() == 1) { // "subInstances.size() >= 2" means metadata of this revision has been parsed, ignore parseMetadata(revision, metadata, localServiceToRevisions); } return metadata; } - private Map> parseMetadata(String revision, MetadataInfo metadata, Map> localServiceToRevisions) { + protected Map> parseMetadata(String revision, MetadataInfo metadata, Map> localServiceToRevisions) { Map serviceInfos = metadata.getServices(); for (Map.Entry entry : serviceInfos.entrySet()) { Set set = localServiceToRevisions.computeIfAbsent(entry.getValue(), k -> new TreeSet<>()); @@ -309,10 +295,12 @@ private Map> parseMetadata(String revision, MetadataInf return localServiceToRevisions; } - private MetadataInfo getMetadataInfo(ServiceInstance instance) { + protected MetadataInfo getMetadataInfo(ServiceInstance instance) { String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance); // FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry implementation. - instance.getExtendParams().putIfAbsent(REGISTRY_CLUSTER_KEY, RegistryClusterIdentifier.getExtension(url).consumerKey(url)); + if (instance.getRegistryCluster() == null) { + instance.setRegistryCluster(RegistryClusterIdentifier.getExtension(url).consumerKey(url)); + } MetadataInfo metadataInfo; try { if (logger.isDebugEnabled()) { @@ -332,19 +320,46 @@ private MetadataInfo getMetadataInfo(ServiceInstance instance) { logger.error("Failed to load service metadata, meta type is " + metadataType, e); metadataInfo = null; } + + if (metadataInfo == null) { + metadataInfo = MetadataInfo.EMPTY; + } return metadataInfo; } - private void notifyAddressChanged() { + protected Object getServiceUrlsCache(Map> revisionToInstances, Set revisions, String protocol) { + List urls; + urls = new ArrayList<>(); + for (String r : revisions) { + for (ServiceInstance i : revisionToInstances.get(r)) { + // different protocols may have ports specified in meta + if (ServiceInstanceMetadataUtils.hasEndpoints(i)) { + DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol); + if (endpoint != null && !endpoint.getPort().equals(i.getPort())) { + urls.add(((DefaultServiceInstance) i).copy(endpoint).toURL()); + break; + } + } + urls.add(i.toURL()); + } + } + return urls; + } + + protected List getAddresses(String serviceProtocolKey) { + return (List) serviceUrls.get(serviceProtocolKey); + } + + protected void notifyAddressChanged() { listeners.forEach((key, notifyListener) -> { //FIXME, group wildcard match - List urls = toUrlsWithEmpty(serviceUrls.get(key)); + List urls = toUrlsWithEmpty(getAddresses(key)); logger.info("Notify service " + key + " with urls " + urls.size()); notifyListener.notify(urls); }); } - private List toUrlsWithEmpty(List urls) { + protected List toUrlsWithEmpty(List urls) { if (urls == null) { urls = Collections.emptyList(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java index 27f66f5d9a7..42382f8f59a 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java @@ -86,7 +86,7 @@ public static void publishServiceDefinition(URL url) { } public static String computeKey(ServiceInstance serviceInstance) { - return serviceInstance.getServiceName() + "##" + serviceInstance.getId() + "##" + + return serviceInstance.getServiceName() + "##" + serviceInstance.getAddress() + "##" + ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java index bca273bdcd5..d217e2d5704 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java @@ -85,7 +85,7 @@ public MetadataInfo getMetadata(ServiceInstance instance) { SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), ServiceInstanceMetadataUtils.getExportedServicesRevision(instance)); - String registryCluster = instance.getExtendParams().get(REGISTRY_CLUSTER_KEY); + String registryCluster = instance.getRegistryCluster(); checkRemoteConfigured(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java index 30f1ea1291e..a9c7b8ce318 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java @@ -39,11 +39,11 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar @Override public boolean shouldMigrate(ClusterInvoker serviceDiscoveryInvoker, ClusterInvoker invoker, MigrationRule rule) { if (!serviceDiscoveryInvoker.hasProxyInvokers()) { - logger.info("No instance address available, will not migrate."); + logger.info("No instance address available, stop compare."); return false; } if (!invoker.hasProxyInvokers()) { - logger.info("No interface address available, will migrate."); + logger.info("No interface address available, stop compare."); return true; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java index 15e243422e3..2e94d3f9c54 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java @@ -27,7 +27,6 @@ import org.apache.dubbo.registry.integration.DynamicDirectory; import org.apache.dubbo.registry.integration.RegistryProtocol; import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Cluster; @@ -36,7 +35,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.ConsumerModel; -import java.util.List; import java.util.Set; import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY; @@ -288,6 +286,9 @@ public boolean invokersChanged() { private volatile boolean invokersChanged; + /** + * Need to know which invoker change triggered this compare. + */ private synchronized void compareAddresses(ClusterInvoker serviceDiscoveryInvoker, ClusterInvoker invoker) { this.invokersChanged = true; if (logger.isDebugEnabled()) { @@ -297,10 +298,10 @@ private synchronized void compareAddresses(ClusterInvoker serviceDiscoveryInv Set detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances(); if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) { logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to APP Level address"); - discardInterfaceInvokerAddress(invoker); + destroyInterfaceInvoker(invoker); } else { logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to Service Level address"); - discardServiceDiscoveryInvokerAddress(serviceDiscoveryInvoker); + destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker); } } @@ -310,26 +311,28 @@ protected synchronized void destroyServiceDiscoveryInvoker(ClusterInvoker ser updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); } if (serviceDiscoveryInvoker != null) { - if (logger.isDebugEnabled()) { - logger.debug("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName()); + if (serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) { + if (logger.isInfoEnabled()) { + logger.info("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName()); + } + serviceDiscoveryInvoker.destroy(); } - serviceDiscoveryInvoker.destroy(); } } - protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker serviceDiscoveryInvoker) { - if (this.invoker != null) { - this.currentAvailableInvoker = this.invoker; - updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); - } - if (serviceDiscoveryInvoker != null) { - if (logger.isDebugEnabled()) { - List> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers(); - logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size())); - } -// serviceDiscoveryInvoker.getDirectory().discordAddresses(); - } - } +// protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker serviceDiscoveryInvoker) { +// if (this.invoker != null) { +// this.currentAvailableInvoker = this.invoker; +// updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); +// } +// if (serviceDiscoveryInvoker != null) { +// if (logger.isDebugEnabled()) { +// List> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers(); +// logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size())); +// } +//// serviceDiscoveryInvoker.getDirectory().discordAddresses(); +// } +// } protected void refreshServiceDiscoveryInvoker() { clearListener(serviceDiscoveryInvoker); @@ -338,8 +341,6 @@ protected void refreshServiceDiscoveryInvoker() { logger.debug("Re-subscribing instance addresses, current interface " + type.getName()); } serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url); - } else { - ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).markInvokersChanged(); } } @@ -352,8 +353,6 @@ protected void refreshInterfaceInvoker() { } invoker = registryProtocol.getInvoker(cluster, registry, type, url); - } else { - ((DynamicDirectory) invoker.getDirectory()).markInvokersChanged(); } } @@ -363,26 +362,28 @@ protected synchronized void destroyInterfaceInvoker(ClusterInvoker invoker) { updateConsumerModel(currentAvailableInvoker, invoker); } if (invoker != null) { - if (logger.isDebugEnabled()) { - logger.debug("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName()); - } - invoker.destroy(); - } - } - - protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker invoker) { - if (this.serviceDiscoveryInvoker != null) { - this.currentAvailableInvoker = this.serviceDiscoveryInvoker; - updateConsumerModel(currentAvailableInvoker, invoker); - } - if (invoker != null) { - if (logger.isDebugEnabled()) { - List> invokers = invoker.getDirectory().getAllInvokers(); - logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size())); + if (invoker.getDirectory().isNotificationReceived()) { + if (logger.isInfoEnabled()) { + logger.info("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName()); + } + invoker.destroy(); } - //invoker.getDirectory().discordAddresses(); } } +// +// protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker invoker) { +// if (this.serviceDiscoveryInvoker != null) { +// this.currentAvailableInvoker = this.serviceDiscoveryInvoker; +// updateConsumerModel(currentAvailableInvoker, invoker); +// } +// if (invoker != null) { +// if (logger.isDebugEnabled()) { +// List> invokers = invoker.getDirectory().getAllInvokers(); +// logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size())); +// } +// //invoker.getDirectory().discordAddresses(); +// } +// } private void clearListener(ClusterInvoker invoker) { if (invoker == null) return; @@ -410,9 +411,9 @@ private void updateConsumerModel(ClusterInvoker workingInvoker, ClusterInvoke if (workingInvoker != null) { consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker", workingInvoker); } - if (backInvoker != null) { - consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker); - } +// if (backInvoker != null) { +// consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker); +// } } } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java index 5972e065f2c..b9718f44b28 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java @@ -30,8 +30,8 @@ @Activate public class MigrationRuleHandler { + public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration"; private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class); - private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration"; private MigrationClusterInvoker migrationInvoker; private MigrationStep currentStep; @@ -119,8 +119,10 @@ private void refreshInvoker(MigrationStep step, Float threshold) { } if (step == MigrationStep.APPLICATION_FIRST) { + setCurrentStepAndThreshold(step, threshold); migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(false); } else if (step == MigrationStep.FORCE_APPLICATION) { + setCurrentStepAndThreshold(step, threshold); migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java index 399a260f8dc..85c7349043f 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java @@ -67,20 +67,24 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur public MigrationRuleListener() { this.configuration = ApplicationModel.getEnvironment().getDynamicConfiguration().orElse(null); + + String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule(); + String defaultRawRule = StringUtils.isEmpty(localRawRule) ? INIT : localRawRule; + if (this.configuration != null) { logger.info("Listening for migration rules on dataId " + RULE_KEY + ", group " + DUBBO_SERVICEDISCOVERY_MIGRATION); configuration.addListener(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, this); String rawRule = configuration.getConfig(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION); if (StringUtils.isEmpty(rawRule)) { - rawRule = INIT; + rawRule = defaultRawRule; } this.rawRule = rawRule; } else { if (logger.isWarnEnabled()) { logger.warn("Using default configuration rule because config center is not configured!"); } - rawRule = INIT; + rawRule = defaultRawRule; } // process(new ConfigChangedEvent(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, rawRule)); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java index 8b29483aaae..c7b0daaa63c 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java @@ -251,20 +251,18 @@ public synchronized void setInvokersChangedListener(InvokersChangedListener list this.invokersChangedListener = listener; if (invokersChangedListener != null && invokersChanged) { invokersChangedListener.onChange(); - invokersChanged = false; } } protected synchronized void invokersChanged() { invokersChanged = true; - if (invokersChangedListener != null && invokersChanged) { + if (invokersChangedListener != null) { invokersChangedListener.onChange(); - invokersChanged = false; } } - public synchronized void markInvokersChanged() { - this.invokersChanged = true; + public boolean isNotificationReceived() { + return invokersChanged; } protected abstract void destroyAllInvokers(); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java index 85b63de95dc..e2909ce14fa 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static java.lang.String.valueOf; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -37,7 +36,7 @@ public class DefaultServiceInstanceTest { public DefaultServiceInstance instance; public static DefaultServiceInstance createInstance() { - DefaultServiceInstance instance = new DefaultServiceInstance(valueOf(System.nanoTime()), "A", "127.0.0.1", 8080); + DefaultServiceInstance instance = new DefaultServiceInstance("A", "127.0.0.1", 8080); instance.getMetadata().put("dubbo.metadata-service.urls", "[ \"dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs&pid=17134&qos.enable=false®ister=true&release=2.7.3&revision=1.0.0&side=provider×tamp=1564826098503&version=1.0.0\" ]"); instance.getMetadata().put("dubbo.metadata-service.url-params", "{\"dubbo\":{\"application\":\"dubbo-provider-demo\",\"deprecated\":\"false\",\"group\":\"dubbo-provider-demo\",\"version\":\"1.0.0\",\"timestamp\":\"1564845042651\",\"dubbo\":\"2.0.2\",\"provider.host\":\"192.168.0.102\",\"provider.port\":\"20880\"}}"); return instance; diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java index 8677381f8e5..65527df8a24 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.createInstance; @@ -29,6 +30,7 @@ * * @since 2.7.5 */ +@Disabled("FileSystemServiceDiscovery implementation is not stable enough at present") public class FileSystemServiceDiscoveryTest { private FileSystemServiceDiscovery serviceDiscovery; diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java index 084376b9adf..15bf2d1c63f 100644 --- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.utils.DefaultPage; import org.apache.dubbo.common.utils.Page; -import org.apache.dubbo.event.ConditionalEventListener; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceDiscoveryFactory; import org.apache.dubbo.registry.client.ServiceInstance; @@ -33,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; public class MultipleServiceDiscovery implements ServiceDiscovery { public static final String REGISTRY_PREFIX_KEY = "child."; @@ -88,15 +88,22 @@ public void unregister(ServiceInstance serviceInstance) throws RuntimeException @Override public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - MultiServiceInstancesChangedListener multiListener = new MultiServiceInstancesChangedListener(listener); + MultiServiceInstancesChangedListener multiListener = (MultiServiceInstancesChangedListener) listener; for (String registryKey : serviceDiscoveries.keySet()) { - SingleServiceInstancesChangedListener singleListener = new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscoveries.get(registryKey), multiListener); - multiListener.putSingleListener(registryKey, singleListener); - serviceDiscoveries.get(registryKey).addServiceInstancesChangedListener(singleListener); + ServiceDiscovery serviceDiscovery = serviceDiscoveries.get(registryKey); + SingleServiceInstancesChangedListener singleListener = multiListener.getAndComputeIfAbsent(registryKey, k -> { + return new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscovery, multiListener); + }); + serviceDiscovery.addServiceInstancesChangedListener(singleListener); } } + @Override + public ServiceInstancesChangedListener createListener(Set serviceNames) { + return new MultiServiceInstancesChangedListener(serviceNames, this); + } + @Override public Page getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws NullPointerException, IllegalArgumentException, UnsupportedOperationException { @@ -123,17 +130,12 @@ public ServiceInstance getLocalInstance() { return serviceInstance; } - protected static class MultiServiceInstancesChangedListener implements ConditionalEventListener { - private final ServiceInstancesChangedListener sourceListener; - private final Map singleListenerMap = new ConcurrentHashMap<>(); - - public MultiServiceInstancesChangedListener(ServiceInstancesChangedListener sourceListener) { - this.sourceListener = sourceListener; - } + protected static class MultiServiceInstancesChangedListener extends ServiceInstancesChangedListener { + private final Map singleListenerMap; - @Override - public boolean accept(ServiceInstancesChangedEvent event) { - return sourceListener.getServiceNames().contains(event.getServiceName()); + public MultiServiceInstancesChangedListener(Set serviceNames, ServiceDiscovery serviceDiscovery) { + super(serviceNames, serviceDiscovery); + this.singleListenerMap = new ConcurrentHashMap<>(); } @Override @@ -149,12 +151,16 @@ public void onEvent(ServiceInstancesChangedEvent event) { } } - sourceListener.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances)); + super.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances)); } public void putSingleListener(String registryKey, SingleServiceInstancesChangedListener singleListener) { singleListenerMap.put(registryKey, singleListener); } + + public SingleServiceInstancesChangedListener getAndComputeIfAbsent(String registryKey, Function func) { + return singleListenerMap.computeIfAbsent(registryKey, func); + } } protected static class SingleServiceInstancesChangedListener extends ServiceInstancesChangedListener { diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java index 7542b5e0bac..abc7467347d 100644 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java @@ -57,7 +57,6 @@ public class NacosNamingServiceUtils { */ public static Instance toInstance(ServiceInstance serviceInstance) { Instance instance = new Instance(); - instance.setInstanceId(serviceInstance.getId()); instance.setServiceName(serviceInstance.getServiceName()); instance.setIp(serviceInstance.getHost()); instance.setPort(serviceInstance.getPort()); @@ -75,8 +74,7 @@ public static Instance toInstance(ServiceInstance serviceInstance) { * @since 2.7.5 */ public static ServiceInstance toServiceInstance(Instance instance) { - DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getInstanceId(), - NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort()); + DefaultServiceInstance serviceInstance = new DefaultServiceInstance(NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort()); serviceInstance.setMetadata(instance.getMetadata()); serviceInstance.setEnabled(instance.isEnabled()); serviceInstance.setHealthy(instance.isHealthy()); diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java index dce7d20cdea..549b8034c7d 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java @@ -26,6 +26,7 @@ import org.apache.dubbo.registry.client.AbstractServiceDiscovery; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; +import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.dubbo.rpc.RpcException; @@ -40,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import static org.apache.dubbo.common.function.ThrowableFunction.execute; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated; @@ -199,18 +201,28 @@ protected void registerServiceWatcher(String serviceName, ServiceInstancesChange throw new IllegalStateException("registerServiceWatcher create path=" + path + " fail.", e); } - CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key -> - new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener)); - try { - curatorFramework.getChildren().usingWatcher(watcher).forPath(path); - } catch (KeeperException.NoNodeException e) { - // ignored - if (logger.isErrorEnabled()) { - logger.error(e.getMessage()); + CountDownLatch latch = new CountDownLatch(1); + ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(path, key -> { + ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch); + try { + curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path); + } catch (KeeperException.NoNodeException e) { + // ignored + if (logger.isErrorEnabled()) { + logger.error(e.getMessage()); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); } - } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } + return tmpWatcher; + }); + watcher.addListener(listener); + listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName))); + latch.countDown(); + } + + public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher watcher) throws Exception { + curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath()); } private String buildServicePath(String serviceName) { diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java index e34d3e47144..8cffc3eca24 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java @@ -16,8 +16,10 @@ */ package org.apache.dubbo.registry.zookeeper; +import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.registry.RegistryNotifier; import org.apache.dubbo.registry.client.ServiceDiscovery; +import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; @@ -25,6 +27,10 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + import static org.apache.dubbo.rpc.model.ApplicationModel.getExecutorRepository; import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged; import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; @@ -37,7 +43,7 @@ * @since 2.7.5 */ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher { - private ServiceInstancesChangedListener listener; + private Set listeners = new ConcurrentHashSet<>(); private final ZookeeperServiceDiscovery zookeeperServiceDiscovery; @@ -47,34 +53,52 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher { private final String serviceName; + private final String path; + + private CountDownLatch latch; + public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery, String serviceName, - ServiceInstancesChangedListener listener) { + String path, + CountDownLatch latch) { this.zookeeperServiceDiscovery = zookeeperServiceDiscovery; this.serviceName = serviceName; - this.listener = listener; + this.path = path; + this.latch = latch; this.notifier = new RegistryNotifier(zookeeperServiceDiscovery.getDelay(), getExecutorRepository().getServiceDiscoveryAddressNotificationExecutor()) { @Override protected void doNotify(Object rawAddresses) { - listener.onEvent((ServiceInstancesChangedEvent)rawAddresses); + listeners.forEach(listener -> listener.onEvent((ServiceInstancesChangedEvent)rawAddresses)); } }; } @Override public void process(WatchedEvent event) throws Exception { + try { + latch.await(); + } catch (InterruptedException e) { + } Watcher.Event.EventType eventType = event.getType(); if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) { if (shouldKeepWatching()) { - notifier.notify(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName))); - zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener); - zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName); + zookeeperServiceDiscovery.reRegisterWatcher(this); + List instanceList = zookeeperServiceDiscovery.getInstances(serviceName); + notifier.notify(new ServiceInstancesChangedEvent(serviceName, instanceList)); } } } + public String getPath() { + return path; + } + + public void addListener(ServiceInstancesChangedListener listener) { + listeners.add(listener); + } + public boolean shouldKeepWatching() { return keepWatching; } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java index 278aae7cd40..ed9be546df6 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java @@ -85,7 +85,7 @@ public static ServiceInstance build(org.apache.curator.x.discovery.ServiceInstan String host = instance.getAddress(); int port = instance.getPort(); ZookeeperInstance zookeeperInstance = instance.getPayload(); - DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getId(), name, host, port); + DefaultServiceInstance serviceInstance = new DefaultServiceInstance(name, host, port); serviceInstance.setMetadata(zookeeperInstance.getMetadata()); return serviceInstance; } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java index 5f3b68085c2..359d62ae0d5 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java @@ -34,7 +34,6 @@ import static java.util.Arrays.asList; import static org.apache.dubbo.common.utils.NetUtils.getAvailablePort; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.INSTANCE_REVISION_UPDATED_KEY; -import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.generateId; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -104,7 +103,7 @@ public void testRegistration() throws InterruptedException { } private DefaultServiceInstance createServiceInstance(String serviceName, String host, int port) { - return new DefaultServiceInstance(generateId(host, port), serviceName, host, port); + return new DefaultServiceInstance(serviceName, host, port); } // @Test diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java new file mode 100644 index 00000000000..d0b78484fd8 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc; + +public interface BaseFilter { + /** + * Make sure call invoker.invoke() in your implementation. + */ + Result invoke(Invoker invoker, Invocation invocation) throws RpcException; + + interface Listener { + + void onResponse(Result appResponse, Invoker invoker, Invocation invocation); + + void onError(Throwable t, Invoker invoker, Invocation invocation); + } +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java index 58e821566f0..74acb51d4b5 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java @@ -33,6 +33,32 @@ * Caching is implemented in dubbo using filter approach. If cache is configured for invocation then before * remote call configured caching type's (e.g. Thread Local, JCache etc) implementation invoke method gets called. * + * + * Start from 3.0, the semantics of the Filter component at the consumer side has changed. + * Instead of intercepting a specific instance of invoker, Filter in 3.0 now intercepts ClusterInvoker. A new SPI named + * InstanceFilter is introduced to work as the same semantic as Filter in 2.x. + * + * The difference of Filter is as follows: + * + * 3.x Filter + * + * -> InstanceFilter -> Invoker + * + * Proxy -> Filter -> Filter -> ClusterInvoker -> InstanceFilter -> Invoker + * + * -> InstanceFilter -> Invoker + * + * + * 2.x Filter + * + * Filter -> Filter -> Invoker + * + * Proxy -> ClusterInvoker -> Filter -> Filter -> Invoker + * + * Filter -> Filter -> Invoker + * + * If you want to a Filter + * * Filter. (SPI, Singleton, ThreadSafe) * * @see org.apache.dubbo.rpc.filter.GenericFilter @@ -41,17 +67,5 @@ * @see org.apache.dubbo.rpc.filter.TpsLimitFilter */ @SPI -public interface Filter { - /** - * Make sure call invoker.invoke() in your implementation. - */ - Result invoke(Invoker invoker, Invocation invocation) throws RpcException; - - interface Listener { - - void onResponse(Result appResponse, Invoker invoker, Invocation invocation); - - void onError(Throwable t, Invoker invoker, Invocation invocation); - } - +public interface Filter extends BaseFilter { } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java index 1b1b0af9ad9..c49aa6c64d3 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java @@ -52,7 +52,7 @@ */ public abstract class AbstractInvoker implements Invoker { - protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class); private final Class type; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 5255f559093..7c526c25aec 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -3,10 +3,8 @@ generic=org.apache.dubbo.rpc.filter.GenericFilter genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter token=org.apache.dubbo.rpc.filter.TokenFilter accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter -activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter context=org.apache.dubbo.rpc.filter.ContextFilter -consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java index ee922afed32..ad95481bd96 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java @@ -20,11 +20,11 @@ import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.AsyncMethodInfo; import org.apache.dubbo.rpc.model.ConsumerModel; @@ -39,7 +39,7 @@ * EventFilter */ @Activate(group = CommonConstants.CONSUMER) -public class FutureFilter implements Filter, Filter.Listener { +public class FutureFilter implements ClusterFilter, ClusterFilter.Listener { protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 79a7a380dfa..ee41594ca83 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -1,2 +1 @@ -trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter -future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter \ No newline at end of file +trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter new file mode 100644 index 00000000000..4783214a47e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter @@ -0,0 +1 @@ +future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java index 1a1b0fd4f11..d34f49a6977 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java @@ -18,11 +18,11 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.AppResponse; -import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService; @@ -40,7 +40,7 @@ */ public class FutureFilterTest { private static RpcInvocation invocation; - private Filter eventFilter = new FutureFilter(); + private ClusterFilter eventFilter = new FutureFilter(); @BeforeAll public static void setUp() {