Skip to content

Commit

Permalink
[3.0] fix service discovery implementation and introduce ClusterFiIlt…
Browse files Browse the repository at this point in the history
…er (#7388)
  • Loading branch information
chickenlj committed Mar 16, 2021
1 parent c3263cf commit 644e3c2
Show file tree
Hide file tree
Showing 64 changed files with 688 additions and 425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ default boolean isServiceDiscovery() {
void discordAddresses();

RouterChain<T> getRouterChain();

default boolean isNotificationReceived() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.filter;

import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.BaseFilter;

@SPI
public interface ClusterFilter extends BaseFilter {
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
@Activate(order = 0)
public class DefaultFilterChainBuilder implements FilterChainBuilder {

/**
* build consumer/provider filter chain
*/
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
Expand All @@ -43,14 +46,17 @@ public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String
return last;
}

/**
* build consumer cluster filter chain
*/
@Override
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
ClusterInvoker<T> last = originalInvoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -29,16 +30,27 @@

@SPI("default")
public interface FilterChainBuilder {
/**
* build consumer/provider filter chain
*/
<T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group);

/**
* build consumer cluster filter chain
*/
<T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group);

class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{
/**
* Works on provider side
* @param <T>
* @param <TYPE>
*/
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
TYPE originalInvoker;
Invoker<T> nextNode;
Filter filter;
FILTER filter;

public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
this.originalInvoker = originalInvoker;
this.nextNode = nextNode;
this.filter = filter;
Expand Down Expand Up @@ -79,8 +91,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
Expand All @@ -102,8 +114,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
Expand All @@ -124,8 +136,14 @@ public String toString() {
}
}

class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
/**
* Works on consumer side
* @param <T>
* @param <TYPE>
*/
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.List;
import java.util.Objects;

import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY;

/**
* ListenerProtocol
Expand Down Expand Up @@ -68,11 +67,7 @@ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
// if it's peer-to-peer url
if (!Objects.isNull(url.getAttribute(PEER_KEY))) {
return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER);
}
return protocol.refer(type, url);
return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.filter;
package org.apache.dubbo.rpc.cluster.filter.support;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.CollectionUtils;
Expand All @@ -28,6 +28,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;

import java.util.Map;

Expand All @@ -39,11 +40,11 @@
* ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
* for consumer invoker.It does it to make the requires info available to execution thread's RpcContext.
*
* @see org.apache.dubbo.rpc.Filter
* @see Filter
* @see RpcContext
*/
@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Expand Down Expand Up @@ -76,7 +77,24 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
return invoker.invoke(invocation);

try {
RpcContext.removeServerContext();
return invoker.invoke(invocation);
} finally {
RpcContext.removeContext();
}
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.interceptor;
package org.apache.dubbo.rpc.cluster.filter.support;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ZoneDetector;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
Expand All @@ -32,11 +36,11 @@
*
* active only when url has key 'cluster=zone-aware'
*/
@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
public class ZoneAwareFilter implements ClusterFilter {

@Override
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
Expand All @@ -53,10 +57,7 @@ public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocati
if (StringUtils.isNotEmpty(force)) {
invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
}
}

@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {

return invoker.invoke(invocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
/**
* Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
*/
@Deprecated
@SPI
public interface ClusterInterceptor {

Expand Down

This file was deleted.

Loading

0 comments on commit 644e3c2

Please sign in to comment.