Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0] fix service discovery implementation and introduce ClusterFiIlter #7388

Merged
merged 32 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
051bcdc
mem optimization
chickenlj Feb 26, 2021
8905c33
internal repo
chickenlj Feb 26, 2021
753eeff
intern protocol
chickenlj Feb 26, 2021
918a84f
Merge branch '3.0' into 3.0-metadatainfo-opt
chickenlj Feb 26, 2021
d0a710e
refactor service discovery listener to support extension
chickenlj Feb 28, 2021
1794b9d
replace HashMap with UnifiedMap
chickenlj Mar 1, 2021
b3b5625
list compatible keys
chickenlj Mar 2, 2021
9b14e44
remove unused field in DefaultServiceInstance
chickenlj Mar 3, 2021
3a1f002
Merge branch '3.0' of https://github.com/apache/dubbo into 3.0
chickenlj Mar 3, 2021
d8e3aba
fix ut compilation
chickenlj Mar 4, 2021
d74edfb
use concurrent hashmap (or disable isAvailable in the future)
chickenlj Mar 4, 2021
f629a53
delegate method
chickenlj Mar 4, 2021
7028e50
support local migration file
chickenlj Mar 9, 2021
4c9c5a5
Refactor filter chain, introduce ClusterFilter
chickenlj Mar 9, 2021
64d695a
Merge branch '3.0-listener-enhancement' into 3.0-metadatainfo-opt
chickenlj Mar 9, 2021
12700f3
add ClusterFilter config to shade plugin
chickenlj Mar 10, 2021
2c85bc7
destroy another registry listener when switching to one
chickenlj Mar 10, 2021
7737b3f
delete distribution repo config
chickenlj Mar 10, 2021
8deb4c4
Fix metadata retry
chickenlj Mar 10, 2021
b335151
Fix loading of local migration rule
chickenlj Mar 10, 2021
bfe8856
fix refer instance filter
chickenlj Mar 10, 2021
7835e08
fix zookeeper service discovery watcher
chickenlj Mar 10, 2021
bd91572
delete err output
chickenlj Mar 11, 2021
065f752
public constant
chickenlj Mar 11, 2021
d4ecb16
Merge branch '3.0-metadatainfo-opt-internal' into 3.0-metadatainfo-opt
chickenlj Mar 11, 2021
d94be30
remove synchronization lock
chickenlj Mar 12, 2021
f2f32b3
fix invoker destroy
chickenlj Mar 12, 2021
cfc7410
Merge branch '3.0-metadatainfo-opt-internal' into 3.0-metadatainfo-opt
chickenlj Mar 15, 2021
7ab938b
Merge branch '3.0-preview' into 3.0-metadatainfo-opt
chickenlj Mar 16, 2021
2752593
fix ut
chickenlj Mar 16, 2021
584aa81
use 3.0.0-SNAPSHOT
chickenlj Mar 16, 2021
e218147
disable ut
chickenlj Mar 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ default boolean isServiceDiscovery() {
void discordAddresses();

RouterChain<T> getRouterChain();

default boolean isNotificationReceived() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.filter;

import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.BaseFilter;

@SPI
public interface ClusterFilter extends BaseFilter {
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
@Activate(order = 0)
public class DefaultFilterChainBuilder implements FilterChainBuilder {

/**
* build consumer/provider filter chain
*/
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
Expand All @@ -43,14 +46,17 @@ public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String
return last;
}

/**
* build consumer cluster filter chain
*/
@Override
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
ClusterInvoker<T> last = originalInvoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -29,16 +30,27 @@

@SPI("default")
public interface FilterChainBuilder {
/**
* build consumer/provider filter chain
*/
<T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group);

/**
* build consumer cluster filter chain
*/
<T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group);

class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{
/**
* Works on provider side
* @param <T>
* @param <TYPE>
*/
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
TYPE originalInvoker;
Invoker<T> nextNode;
Filter filter;
FILTER filter;

public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
this.originalInvoker = originalInvoker;
this.nextNode = nextNode;
this.filter = filter;
Expand Down Expand Up @@ -79,8 +91,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
Expand All @@ -102,8 +114,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
Expand All @@ -124,8 +136,14 @@ public String toString() {
}
}

class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
/**
* Works on consumer side
* @param <T>
* @param <TYPE>
*/
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.List;
import java.util.Objects;

import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY;

/**
* ListenerProtocol
Expand Down Expand Up @@ -68,11 +67,7 @@ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
// if it's peer-to-peer url
if (!Objects.isNull(url.getAttribute(PEER_KEY))) {
return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER);
}
return protocol.refer(type, url);
return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.filter;
package org.apache.dubbo.rpc.cluster.filter.support;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.CollectionUtils;
Expand All @@ -28,6 +28,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;

import java.util.Map;

Expand All @@ -39,11 +40,11 @@
* ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
* for consumer invoker.It does it to make the requires info available to execution thread's RpcContext.
*
* @see org.apache.dubbo.rpc.Filter
* @see Filter
* @see RpcContext
*/
@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Expand Down Expand Up @@ -76,7 +77,24 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
return invoker.invoke(invocation);

try {
RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
RpcContext.removeContext();
}
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.interceptor;
package org.apache.dubbo.rpc.cluster.filter.support;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ZoneDetector;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
Expand All @@ -32,11 +36,11 @@
*
* active only when url has key 'cluster=zone-aware'
*/
@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
public class ZoneAwareFilter implements ClusterFilter {

@Override
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
Expand All @@ -53,10 +57,7 @@ public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocati
if (StringUtils.isNotEmpty(force)) {
invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
}
}

@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {

return invoker.invoke(invocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
/**
* Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
*/
@Deprecated
@SPI
public interface ClusterInterceptor {

Expand Down

This file was deleted.

Loading