Skip to content

Commit

Permalink
Revert "3.0 enhancement, do not merge (#7381)" (#7387)
Browse files Browse the repository at this point in the history
This reverts commit d6c9bf6.
  • Loading branch information
chickenlj authored Mar 16, 2021
1 parent d6c9bf6 commit c3263cf
Show file tree
Hide file tree
Showing 64 changed files with 425 additions and 688 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,4 @@ default boolean isServiceDiscovery() {
void discordAddresses();

RouterChain<T> getRouterChain();

default boolean isNotificationReceived() {
return false;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
@Activate(order = 0)
public class DefaultFilterChainBuilder implements FilterChainBuilder {

/**
* build consumer/provider filter chain
*/
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
Expand All @@ -46,17 +43,14 @@ public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String
return last;
}

/**
* build consumer cluster filter chain
*/
@Override
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
ClusterInvoker<T> last = originalInvoker;
List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final ClusterFilter filter = filters.get(i);
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -30,27 +29,16 @@

@SPI("default")
public interface FilterChainBuilder {
/**
* build consumer/provider filter chain
*/
<T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group);

/**
* build consumer cluster filter chain
*/
<T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group);

/**
* Works on provider side
* @param <T>
* @param <TYPE>
*/
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{
TYPE originalInvoker;
Invoker<T> nextNode;
FILTER filter;
Filter filter;

public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
this.originalInvoker = originalInvoker;
this.nextNode = nextNode;
this.filter = filter;
Expand Down Expand Up @@ -91,8 +79,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
Expand All @@ -114,8 +102,8 @@ public Result invoke(Invocation invocation) throws RpcException {
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
Expand All @@ -136,14 +124,8 @@ public String toString() {
}
}

/**
* Works on consumer side
* @param <T>
* @param <TYPE>
*/
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> {
public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
super(originalInvoker, nextNode, filter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.List;
import java.util.Objects;

import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY;

/**
* ListenerProtocol
Expand Down Expand Up @@ -67,7 +68,11 @@ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
// if it's peer-to-peer url
if (!Objects.isNull(url.getAttribute(PEER_KEY))) {
return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER);
}
return protocol.refer(type, url);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
/**
* Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
*/
@Deprecated
@SPI
public interface ClusterInterceptor {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.interceptor;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
RpcContext context = RpcContext.getContext();
context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
RpcContext.removeServerContext();
}

@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
RpcContext.removeContext(true);
}

@Override
public Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
return clusterInvoker.invoke(invocation);
}

@Override
public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}

@Override
public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.filter.support;
package org.apache.dubbo.rpc.cluster.interceptor;

import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ZoneDetector;
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
Expand All @@ -36,11 +32,11 @@
*
* active only when url has key 'cluster=zone-aware'
*/
@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
public class ZoneAwareFilter implements ClusterFilter {
@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
RpcContext rpcContext = RpcContext.getContext();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
Expand All @@ -57,7 +53,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
if (StringUtils.isNotEmpty(force)) {
invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
}
}

@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {

return invoker.invoke(invocation);
}
}
Loading

0 comments on commit c3263cf

Please sign in to comment.