Skip to content

Commit

Permalink
Brand new routing rule (#7372)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinliujie authored and chickenlj committed Mar 24, 2021
1 parent 631aedc commit 489f488
Show file tree
Hide file tree
Showing 58 changed files with 5,215 additions and 2 deletions.
4 changes: 2 additions & 2 deletions dubbo-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dubbo</groupId>
Expand All @@ -39,7 +40,6 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.cluster.router.mesh.route;

import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.cluster.router.mesh.rule.VsDestinationGroup;
import org.apache.dubbo.rpc.cluster.router.mesh.rule.destination.DestinationRule;
import org.apache.dubbo.rpc.cluster.router.mesh.rule.virtualservice.VirtualServiceRule;
import org.apache.dubbo.rpc.cluster.router.mesh.util.VsDestinationGroupRuleDispatcher;
import org.yaml.snakeyaml.Yaml;

import java.text.MessageFormat;
import java.util.Map;


public class MeshAppRuleListener implements ConfigurationListener {

public static final Logger logger = LoggerFactory.getLogger(MeshAppRuleListener.class);

private final VsDestinationGroupRuleDispatcher vsDestinationGroupRuleDispatcher = new VsDestinationGroupRuleDispatcher();

private String appName;

private VsDestinationGroup vsDestinationGroupHolder;

public MeshAppRuleListener(String appName) {
this.appName = appName;
}

public void receiveConfigInfo(String configInfo) {
logger.info(MessageFormat.format("[MeshAppRule] Received rule for app [{0}]: {1}.",
appName, configInfo));
try {

VsDestinationGroup vsDestinationGroup = new VsDestinationGroup();
vsDestinationGroup.setAppName(appName);

Yaml yaml = new Yaml();
Yaml yaml2 = new Yaml();
Iterable objectIterable = yaml.loadAll(configInfo);
for (Object result : objectIterable) {

Map resultMap = (Map) result;
if (resultMap.get("kind").equals("DestinationRule")) {
DestinationRule destinationRule = yaml2.loadAs(yaml2.dump(result), DestinationRule.class);
vsDestinationGroup.getDestinationRuleList().add(destinationRule);

} else if (resultMap.get("kind").equals("VirtualService")) {
VirtualServiceRule virtualServiceRule = yaml2.loadAs(yaml2.dump(result), VirtualServiceRule.class);
vsDestinationGroup.getVirtualServiceRuleList().add(virtualServiceRule);
}
}

vsDestinationGroupHolder = vsDestinationGroup;
} catch (Exception e) {
logger.error("[MeshAppRule] parse failed: " + configInfo, e);
}
if (vsDestinationGroupHolder != null) {
vsDestinationGroupRuleDispatcher.post(vsDestinationGroupHolder);
}

}

public void register(MeshRuleRouter subscriber) {
if (vsDestinationGroupHolder != null) {
subscriber.onRuleChange(vsDestinationGroupHolder);
}
vsDestinationGroupRuleDispatcher.register(subscriber);
}

//
public void unregister(MeshRuleRouter sub) {
vsDestinationGroupRuleDispatcher.unregister(sub);
}

@Override
public void process(ConfigChangedEvent event) {
receiveConfigInfo(event.getContent());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.cluster.router.mesh.route;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.rpc.cluster.Directory;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@Activate(order = 670)
public class MeshRuleAddressListenerInterceptor implements AddressListener {

private static final Object mark = new Object();
private static ConcurrentHashMap<String, Object> appMap = new ConcurrentHashMap<String, Object>();

@Override
public List<URL> notify(List<URL> addresses, URL consumerUrl, Directory registryDirectory) {

if (addresses != null && !addresses.isEmpty()) {
for (URL serviceURL : addresses) {

String app = serviceURL.getRemoteApplication();
if (app != null && !app.isEmpty()) {
if (appMap.putIfAbsent(app, mark) == null) {
MeshRuleManager.subscribeAppRule(app);
}
}
}
}

return addresses;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.cluster.router.mesh.route;

import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;


public final class MeshRuleManager {

public static final Logger logger = LoggerFactory.getLogger(MeshRuleManager.class);

private static final String MESH_RULE_DATA_ID_SUFFIX = ".MESHAPPRULE";
private static final String GROUP = "DEFAULT_GROUP";

private static ConcurrentHashMap<String, MeshAppRuleListener> appRuleListeners = new ConcurrentHashMap<>();

public synchronized static void subscribeAppRule(String app) {

MeshAppRuleListener meshAppRuleListener = new MeshAppRuleListener(app);
String appRuleDataId = app + MESH_RULE_DATA_ID_SUFFIX;
DynamicConfiguration configuration = ApplicationModel.getEnvironment().getDynamicConfiguration()
.orElse(null);

if (configuration == null) {
logger.warn("Doesn't support DynamicConfiguration!");
return;
}

try {
String rawConfig = configuration.getConfig(appRuleDataId, GROUP, 5000L);
if (rawConfig != null) {
meshAppRuleListener.receiveConfigInfo(rawConfig);
}
} catch (Throwable throwable) {
logger.error("get MeshRuleManager app rule failed.", throwable);
}

configuration.addListener(appRuleDataId, GROUP, meshAppRuleListener);
appRuleListeners.put(app, meshAppRuleListener);
}

public static void register(String app, MeshRuleRouter subscriber) {
MeshAppRuleListener meshAppRuleListener = appRuleListeners.get(app);
if (meshAppRuleListener == null) {
logger.warn("appRuleListener can't find when Router register");
return;
}
meshAppRuleListener.register(subscriber);
}

public static void unregister(MeshRuleRouter subscriber) {
Collection<MeshAppRuleListener> listeners = appRuleListeners.values();
for (MeshAppRuleListener listener : listeners) {
listener.unregister(subscriber);
}
}

}
Loading

0 comments on commit 489f488

Please sign in to comment.