diff --git a/before_ut.sh b/before_ut.sh index 210e9e723b..b55e424ef7 100755 --- a/before_ut.sh +++ b/before_ut.sh @@ -36,5 +36,8 @@ cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar +mkdir -p cluster/router/tag/zookeeper-4unittest/contrib/fatjar +cp ${zkJar} cluster/router/tag/zookeeper-4unittest/contrib/fatjar + mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar \ No newline at end of file diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index 8b38f2dd61..ea18604964 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -24,7 +24,6 @@ import ( ) import ( - _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/stretchr/testify/assert" ) @@ -34,6 +33,7 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/zookeeper" ) diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 4ccc19e955..7f4f14a8e4 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -85,7 +85,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { return l, nil } -// Process Process config change event , generate routers and set them to the listenableRouter instance +// Process Process config change event, generate routers and set them to the listenableRouter instance func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) if remoting.EventTypeDel == event.ConfigType { diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index 8144c83203..433abcb72e 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -42,7 +42,7 @@ type FileTagRouter struct { force bool } -// NewFileTagRouter Create file tag router instance with content ( from config file) +// NewFileTagRouter Create file tag router instance with content (from config file) func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) diff --git a/cluster/router/tag/router_rule.go b/cluster/router/tag/router_rule.go index 926446dcb2..5fb7ab151c 100644 --- a/cluster/router/tag/router_rule.go +++ b/cluster/router/tag/router_rule.go @@ -22,9 +22,27 @@ import ( "github.com/apache/dubbo-go/common/yaml" ) +/** + * %YAML1.2 + * --- + * force: true + * runtime: false + * enabled: true + * priority: 1 + * key: demo-provider + * tags: + * - name: tag1 + * addresses: [ip1, ip2] + * - name: tag2 + * addresses: [ip3, ip4] + * ... + */ // RouterRule RouterRule config read from config file or config center type RouterRule struct { router.BaseRouterRule `yaml:",inline""` + Tags []Tag + addressToTagNames map[string][]string + tagNameToAddresses map[string][]string } func getRule(rawRule string) (*RouterRule, error) { @@ -34,5 +52,58 @@ func getRule(rawRule string) (*RouterRule, error) { return r, err } r.RawRule = rawRule + r.init() return r, nil } + +func (t *RouterRule) init() { + t.addressToTagNames = make(map[string][]string, 8) + t.tagNameToAddresses = make(map[string][]string, 8) + for _, tag := range t.Tags { + for _, address := range tag.Addresses { + t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name) + } + t.tagNameToAddresses[tag.Name] = tag.Addresses + } +} + +func (t *RouterRule) getAddresses() []string { + var result = make([]string, 0, 8*len(t.Tags)) + for _, tag := range t.Tags { + result = append(result, tag.Addresses...) + } + return result +} + +func (t *RouterRule) getTagNames() []string { + var result = make([]string, 0, len(t.Tags)) + for _, tag := range t.Tags { + result = append(result, tag.Name) + } + return result +} + +func (t *RouterRule) hasTag(tag string) bool { + for _, t := range t.Tags { + if tag == t.Name { + return true + } + } + return false +} + +func (t *RouterRule) getAddressToTagNames() map[string][]string { + return t.addressToTagNames +} + +func (t *RouterRule) getTagNameToAddresses() map[string][]string { + return t.tagNameToAddresses +} + +func (t *RouterRule) getTags() []Tag { + return t.Tags +} + +func (t *RouterRule) setTags(tags []Tag) { + t.Tags = tags +} diff --git a/cluster/router/tag/router_rule_test.go b/cluster/router/tag/router_rule_test.go index 2df65193f9..4e0f5b729e 100644 --- a/cluster/router/tag/router_rule_test.go +++ b/cluster/router/tag/router_rule_test.go @@ -22,19 +22,56 @@ import ( ) import ( - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func TestGetRule(t *testing.T) { +type RuleTestSuite struct { + suite.Suite + rule *RouterRule +} + +func (suite *RuleTestSuite) SetupTest() { + var err error yml := ` scope: application -runtime: true force: true +runtime: false +enabled: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: [ip1, ip2] + - name: tag2 + addresses: [ip3, ip4] ` - rule, e := getRule(yml) - assert.Nil(t, e) - assert.NotNil(t, rule) - assert.Equal(t, true, rule.Force) - assert.Equal(t, true, rule.Runtime) - assert.Equal(t, "application", rule.Scope) + suite.rule, err = getRule(yml) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetRule() { + var err error + suite.Equal(true, suite.rule.Force) + suite.Equal(false, suite.rule.Runtime) + suite.Equal("application", suite.rule.Scope) + suite.Equal(1, suite.rule.Priority) + suite.Equal("demo-provider", suite.rule.Key) + suite.Nil(err) +} + +func (suite *RuleTestSuite) TestGetTagNames() { + suite.Equal([]string{"tag1", "tag2"}, suite.rule.getTagNames()) +} + +func (suite *RuleTestSuite) TestGetAddresses() { + suite.Equal([]string{"ip1", "ip2", "ip3", "ip4"}, suite.rule.getAddresses()) +} + +func (suite *RuleTestSuite) TestHasTag() { + suite.Equal(true, suite.rule.hasTag("tag1")) + suite.Equal(false, suite.rule.hasTag("tag404")) +} + +func TestRuleTestSuite(t *testing.T) { + suite.Run(t, new(RuleTestSuite)) } diff --git a/cluster/router/tag/tag.go b/cluster/router/tag/tag.go new file mode 100644 index 0000000000..73d10b5db4 --- /dev/null +++ b/cluster/router/tag/tag.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tag + +type Tag struct { + Name string + Addresses []string +} + +func (t *Tag) getName() string { + return t.Name +} + +func (t *Tag) setName(name string) { + t.Name = name +} + +func (t *Tag) getAddresses() []string { + return t.Addresses +} + +func (t *Tag) setAddresses(addresses []string) { + t.Addresses = addresses +} diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index e1376fd96a..ece950ebc0 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -18,7 +18,10 @@ package tag import ( + "errors" + "net" "strconv" + "strings" ) import ( @@ -27,15 +30,21 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" ) // tagRouter defines url, enable and the priority type tagRouter struct { - url *common.URL - enabled bool - priority int64 + url *common.URL + tagRouterRule *RouterRule + enabled bool + priority int64 + application string } // NewTagRouter returns a tagRouter instance if url is not nil @@ -55,6 +64,15 @@ func (c *tagRouter) isEnabled() bool { return c.enabled } +func (c *tagRouter) SetApplication(app string) { + c.application = app +} + +func (c *tagRouter) tagRouterRuleCopy() RouterRule { + routerRule := *c.tagRouterRule + return routerRule +} + // Route gets a list of invoker func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if !c.isEnabled() { @@ -63,7 +81,152 @@ func (c *tagRouter) Route(invokers []protocol.Invoker, url *common.URL, invocati if len(invokers) == 0 { return invokers } - return filterUsingStaticTag(invokers, url, invocation) + if c.tagRouterRule == nil || !c.tagRouterRule.Valid || !c.tagRouterRule.Enabled { + return filterUsingStaticTag(invokers, url, invocation) + } + // since the rule can be changed by config center, we should copy one to use. + tagRouterRuleCopy := c.tagRouterRuleCopy() + tag, ok := invocation.Attachments()[constant.Tagkey] + if !ok { + tag = url.GetParam(constant.Tagkey, "") + } + var ( + result []protocol.Invoker + addresses []string + ) + // if we are requesting for a Provider with a specific tag + if len(tag) > 0 { + addresses, _ = tagRouterRuleCopy.getTagNameToAddresses()[tag] + // filter by dynamic tag group first + if len(addresses) > 0 { + filterAddressMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) > 0 && checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressMatches) + if len(result) > 0 || tagRouterRuleCopy.Force { + return result + } + } else { + // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by + // dynamic tag group but force=false. check static tag + filter := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == tag { + return true + } + return false + } + result = filterInvoker(invokers, filter) + } + // If there's no tagged providers that can match the current tagged request. force.tag is set by default + // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. + if len(result) > 0 || isForceUseTag(url, invocation) { + return result + } else { + // FAILOVER: return all Providers without any tags. + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(tagRouterRuleCopy.getAddresses(), url.Ip, url.Port) { + return true + } + return false + } + filterTagIsEmpty := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "" { + return true + } + return false + } + return filterInvoker(invokers, filterAddressNotMatches, filterTagIsEmpty) + } + } else { + // return all addresses in dynamic tag group. + addresses = tagRouterRuleCopy.getAddresses() + if len(addresses) > 0 { + filterAddressNotMatches := func(invoker protocol.Invoker) bool { + url := invoker.GetUrl() + if len(addresses) == 0 || !checkAddressMatch(addresses, url.Ip, url.Port) { + return true + } + return false + } + result = filterInvoker(invokers, filterAddressNotMatches) + // 1. all addresses are in dynamic tag group, return empty list. + if len(result) == 0 { + return result + } + } + // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the + // static tag group. + filter := func(invoker protocol.Invoker) bool { + localTag := invoker.GetUrl().GetParam(constant.Tagkey, "") + return localTag == "" || !(tagRouterRuleCopy.hasTag(localTag)) + } + return filterInvoker(result, filter) + } +} + +func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of tag rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + c.tagRouterRule = nil + return + } else { + content, ok := event.Value.(string) + if !ok { + logger.Errorf("Convert event content fail,raw content:[%s] ", event.Value) + return + } + + routerRule, err := getRule(content) + if err != nil { + logger.Errorf("Parse dynamic tag router rule fail,error:[%s] ", err) + return + } + c.tagRouterRule = routerRule + return + } +} + +func (c *tagRouter) Notify(invokers []protocol.Invoker) { + if len(invokers) == 0 { + return + } + invoker := invokers[0] + url := invoker.GetUrl() + providerApplication := url.GetParam(constant.RemoteApplicationKey, "") + if providerApplication == "" { + logger.Error("TagRouter must getConfig from or subscribe to a specific application, but the application " + + "in this TagRouter is not specified.") + return + } + dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + logger.Error("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please") + return + } + + if providerApplication != c.application { + dynamicConfiguration.RemoveListener(c.application+constant.TagRouterRuleSuffix, c) + } + + routerKey := providerApplication + constant.TagRouterRuleSuffix + dynamicConfiguration.AddListener(routerKey, c) + //get rule + rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) + if len(rule) == 0 || err != nil { + logger.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) + return + } + if rule != "" { + c.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + } } // URL gets the url of tagRouter @@ -76,7 +239,7 @@ func (c *tagRouter) Priority() int64 { return c.priority } -// filterUsingStaticTag gets a list of invoker using static tag +// filterUsingStaticTag gets a list of invoker using static tag, If there's no dynamic tag rule being set, use static tag in URL func filterUsingStaticTag(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if tag, ok := invocation.Attachments()[constant.Tagkey]; ok { result := make([]protocol.Invoker, 0, 8) @@ -100,3 +263,163 @@ func isForceUseTag(url *common.URL, invocation protocol.Invocation) bool { } return false } + +type filter func(protocol.Invoker) bool + +func filterInvoker(invokers []protocol.Invoker, filters ...filter) []protocol.Invoker { + var res []protocol.Invoker +OUTER: + for _, invoker := range invokers { + for _, filter := range filters { + if !filter(invoker) { + continue OUTER + } + } + res = append(res, invoker) + } + return res +} + +// TODO: need move to dubbogo/gost +func checkAddressMatch(addresses []string, host, port string) bool { + for _, address := range addresses { + if matchIp(address, host, port) { + return true + } + if address == net.JoinHostPort(constant.ANYHOST_VALUE, port) { + return true + } + } + return false +} + +func matchIp(pattern, host, port string) bool { + // if the pattern is subnet format, it will not be allowed to config port param in pattern. + if strings.Contains(pattern, "/") { + _, subnet, _ := net.ParseCIDR(pattern) + if subnet != nil && subnet.Contains(net.ParseIP(host)) { + return true + } + return false + } + return matchIpRange(pattern, host, port) +} + +func matchIpRange(pattern, host, port string) bool { + if pattern == "" || host == "" { + logger.Error("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host) + return false + } + + pattern = strings.TrimSpace(pattern) + if "*.*.*.*" == pattern || "*" == pattern { + return true + } + + isIpv4 := true + ip4 := net.ParseIP(host).To4() + + if ip4 == nil { + isIpv4 = false + } + + hostAndPort := getPatternHostAndPort(pattern, isIpv4) + if hostAndPort[1] != "" && hostAndPort[1] != port { + return false + } + + pattern = hostAndPort[0] + // TODO 常量化 + splitCharacter := "." + if !isIpv4 { + splitCharacter = ":" + } + + mask := strings.Split(pattern, splitCharacter) + // check format of pattern + if err := checkHostPattern(pattern, mask, isIpv4); err != nil { + logger.Error(err) + return false + } + + if pattern == host { + return true + } + + // short name condition + if !ipPatternContains(pattern) { + return pattern == host + } + + ipAddress := strings.Split(host, splitCharacter) + for i := 0; i < len(mask); i++ { + if "*" == mask[i] || mask[i] == ipAddress[i] { + continue + } else if strings.Contains(mask[i], "-") { + rangeNumStrs := strings.Split(mask[i], "-") + if len(rangeNumStrs) != 2 { + logger.Error("There is wrong format of ip Address: " + mask[i]) + return false + } + min := getNumOfIpSegment(rangeNumStrs[0], isIpv4) + max := getNumOfIpSegment(rangeNumStrs[1], isIpv4) + ip := getNumOfIpSegment(ipAddress[i], isIpv4) + if ip < min || ip > max { + return false + } + } else if "0" == ipAddress[i] && "0" == mask[i] || "00" == mask[i] || "000" == mask[i] || "0000" == mask[i] { + continue + } else if mask[i] != ipAddress[i] { + return false + } + } + return true +} + +func ipPatternContains(pattern string) bool { + return strings.Contains(pattern, "*") || strings.Contains(pattern, "-") +} + +func checkHostPattern(pattern string, mask []string, isIpv4 bool) error { + if !isIpv4 { + if len(mask) != 8 && ipPatternContains(pattern) { + return errors.New("If you config ip expression that contains '*' or '-', please fill qualified ip pattern like 234e:0:4567:0:0:0:3d:*. ") + } + if len(mask) != 8 && !strings.Contains(pattern, "::") { + return errors.New("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern) + } + } else { + if len(mask) != 4 { + return errors.New("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern) + } + } + return nil +} + +func getPatternHostAndPort(pattern string, isIpv4 bool) []string { + result := make([]string, 2) + if strings.HasPrefix(pattern, "[") && strings.Contains(pattern, "]:") { + end := strings.Index(pattern, "]:") + result[0] = pattern[1:end] + result[1] = pattern[end+2:] + } else if strings.HasPrefix(pattern, "[") && strings.HasSuffix(pattern, "]") { + result[0] = pattern[1 : len(pattern)-1] + result[1] = "" + } else if isIpv4 && strings.Contains(pattern, ":") { + end := strings.Index(pattern, ":") + result[0] = pattern[:end] + result[1] = pattern[end+1:] + } else { + result[0] = pattern + } + return result +} + +func getNumOfIpSegment(ipSegment string, isIpv4 bool) int { + if isIpv4 { + ipSeg, _ := strconv.Atoi(ipSegment) + return ipSeg + } + ipSeg, _ := strconv.ParseInt(ipSegment, 0, 16) + return int(ipSeg) +} diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 000b3ec672..e5ddc2890c 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -19,25 +19,43 @@ package tag import ( "context" + "fmt" + "github.com/stretchr/testify/suite" "testing" + "time" ) import ( + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + _ "github.com/apache/dubbo-go/config_center/zookeeper" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/zookeeper" ) const ( - tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou" - tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai" - tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing" - tagRouterTestUserConsumer = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true" - tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true" + tagRouterTestHangZhouUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=hangzhou&remote.application=test-tag" + tagRouterTestShangHaiUrl = "dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=shanghai&remote.application=test-tag" + tagRouterTestBeijingUrl = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=true&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestEnabledBeijingUrl = "dubbo://127.0.0.1:20004/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0&enabled=false&dubbo.tag=beijing&remote.application=test-tag" + tagRouterTestUserConsumer = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestUserConsumerTag = "dubbo://127.0.0.1:20000/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&dubbo.force.tag=true&remote.application=test-tag" + + tagRouterTestDynamicIpv4Provider1 = "dubbo://127.0.0.1:20001/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider2 = "dubbo://127.0.0.1:20002/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider3 = "dubbo://127.0.0.1:20003/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag" + tagRouterTestDynamicIpv4Provider4 = "dubbo://127.0.0.1:20004/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag4" + tagRouterTestDynamicIpv4Provider5 = "dubbo://127.0.0.1:20005/com.ikurento.user.UserConsumer?interface=com.ikurento.user.UserConsumer&group=&version=2.6.0&enabled=true&remote.application=test-tag&dubbo.tag=tag5" tagRouterTestDubboTag = "dubbo.tag" tagRouterTestDubboForceTag = "dubbo.force.tag" @@ -45,6 +63,15 @@ const ( tagRouterTestGuangZhou = "guangzhou" tagRouterTestFalse = "false" tagRouterTestTrue = "true" + + routerPath = "/dubbo/config/dubbo/test-tag.tag-router" + routerLocalIP = "127.0.0.1" + routerZk = "zookeeper" +) + +var ( + zkFormat = "zookeeper://%s:%d" + conditionFormat = "condition://%s/com.foo.BarService" ) // MockInvoker is only mock the Invoker to support test tagRouter @@ -160,3 +187,211 @@ func TestTagRouterRouteNoForce(t *testing.T) { invRst2 := tagRouter.Route(invokers, &u1, inv) assert.Equal(t, 3, len(invRst2)) } + +func TestFilterInvoker(t *testing.T) { + u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) + u3, e3 := common.NewURL(tagRouterTestShangHaiUrl) + u4, e4 := common.NewURL(tagRouterTestBeijingUrl) + u5, e5 := common.NewURL(tagRouterTestEnabledBeijingUrl) + assert.Nil(t, e2) + assert.Nil(t, e3) + assert.Nil(t, e4) + assert.Nil(t, e5) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) + var invokers []protocol.Invoker + invokers = append(invokers, inv2, inv3, inv4, inv5) + filterTag := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParam(constant.Tagkey, "") == "beijing" { + return true + } + return false + } + res := filterInvoker(invokers, filterTag) + assert.Equal(t, []protocol.Invoker{inv4, inv5}, res) + flag := true + filterEnabled := func(invoker protocol.Invoker) bool { + if invoker.GetUrl().GetParamBool(constant.RouterEnabled, false) == flag { + return true + } + return false + } + res2 := filterInvoker(invokers, filterTag, filterEnabled) + assert.Equal(t, []protocol.Invoker{inv4}, res2) +} + +type DynamicTagRouter struct { + suite.Suite + rule *RouterRule + + route *tagRouter + zkClient *zookeeper.ZookeeperClient + testCluster *zk.TestCluster + invokers []protocol.Invoker + url *common.URL +} + +func TestDynamicTagRouter(t *testing.T) { + dtg := &DynamicTagRouter{} + u1, _ := common.NewURL(tagRouterTestDynamicIpv4Provider1) + u2, _ := common.NewURL(tagRouterTestDynamicIpv4Provider2) + u3, _ := common.NewURL(tagRouterTestDynamicIpv4Provider3) + u4, _ := common.NewURL(tagRouterTestDynamicIpv4Provider4) + u5, _ := common.NewURL(tagRouterTestDynamicIpv4Provider5) + inv1 := NewMockInvoker(u1) + inv2 := NewMockInvoker(u2) + inv3 := NewMockInvoker(u3) + inv4 := NewMockInvoker(u4) + inv5 := NewMockInvoker(u5) + dtg.invokers = append(dtg.invokers, inv1, inv2, inv3, inv4, inv5) + suite.Run(t, dtg) +} + +func (suite *DynamicTagRouter) SetupTest() { + var err error + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + suite.NoError(err) + err = z.Create(routerPath) + suite.NoError(err) + + suite.zkClient = z + suite.testCluster = ts + + _, err = z.Conn.Set(routerPath, []byte(testYML), 0) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + suite.Nil(err) + suite.NotNil(configuration) + + url, e1 := common.NewURL(tagRouterTestUserConsumerTag) + suite.Nil(e1) + + tagRouter, err := NewTagRouter(&url) + suite.Nil(err) + suite.NotNil(tagRouter) + suite.route = tagRouter + suite.url = &url +} + +func (suite *DynamicTagRouter) TearDownTest() { + suite.zkClient.Close() + suite.testCluster.Stop() +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag1") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[0]) + + consumer.SetAttachments(tagRouterTestDubboTag, "tag3") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(2, len(targetInvokers)) + suite.Equal(targetInvokers, []protocol.Invoker{suite.invokers[2], suite.invokers[3]}) +} + +func (suite *DynamicTagRouter) TestDynamicTagRouterStaticTag() { + invokers := suite.invokers + consumer := &invocation.RPCInvocation{} + consumer.SetAttachments(tagRouterTestDubboTag, "tag4") + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[3]) +} + +// Teas no tag and return a address are not in dynamic tag group +func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { + invokers := suite.invokers + suite.route.Notify(invokers) + suite.NotNil(suite.route.tagRouterRule) + consumer := &invocation.RPCInvocation{} + targetInvokers := suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) + // test if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group. + consumer.SetAttachments(tagRouterTestDubboTag, "tag5") + targetInvokers = suite.route.Route(invokers, suite.url, consumer) + suite.Equal(1, len(targetInvokers)) + suite.Equal(targetInvokers[0], suite.invokers[4]) +} + +func (suite *DynamicTagRouter) TestTODO() { + testYML := `enabled: true +scope: application +force: true +runtime: false +valid: true +priority: 1 +key: demo-provider +tags: + - name: tag1 + addresses: ["127.0.0.1:20001"] + - name: tag2 + addresses: ["127.0.0.1:20002"] + - name: tag3 + addresses: ["127.0.0.1:20003", "127.0.0.1:20004"] +` + _, err := suite.zkClient.Conn.Set(routerPath, []byte(testYML), 1) + suite.NoError(err) + + zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, routerLocalIP, suite.testCluster.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory(routerZk).GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) +} + +func TestProcess(t *testing.T) { + u1, err := common.NewURL(tagRouterTestUserConsumerTag) + assert.Nil(t, err) + tagRouter, e := NewTagRouter(&u1) + assert.Nil(t, e) + assert.NotNil(t, tagRouter) + + testYML := ` +scope: application +force: true +runtime: false +enabled: true +valid: true +priority: 1 +key: demo-provider +tags: + - name: beijing + addresses: [192.168.1.1, 192.168.1.2] + - name: hangzhou + addresses: [192.168.1.3, 192.168.1.4] +` + tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd}) + assert.NotNil(t, tagRouter.tagRouterRule) + assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames()) + assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses()) + assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"]) + assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"]) + tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) + assert.Nil(t, tagRouter.tagRouterRule) +} diff --git a/common/constant/key.go b/common/constant/key.go index cd23dd0f1a..a86e797ae5 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -187,6 +187,9 @@ const ( HealthCheckRouterName = "health_check" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" + // TagRouterRuleSuffix Specify tag router suffix + TagRouterRuleSuffix = ".tag-router" + RemoteApplicationKey = "remote.application" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router"