Skip to content

Commit

Permalink
fix: more invokers with different path (#2000)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lvnszn authored Aug 2, 2022
1 parent eae6b16 commit 9334da0
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 38 deletions.
13 changes: 9 additions & 4 deletions cluster/cluster/available/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package available

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -51,7 +52,8 @@ func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(availableUrl)
invoker.EXPECT().GetUrl().Return(availableUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

staticDir := static.NewDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
Expand All @@ -66,8 +68,8 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()

result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

Expand All @@ -81,7 +83,10 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

invoker.EXPECT().IsAvailable().Return(false)
invoker.EXPECT().IsAvailable().Return(false).AnyTimes()

res := &protocol.RPCResult{Err: errors.New("no provider available")}
invoker.EXPECT().Invoke(gomock.Any()).Return(res).AnyTimes()

result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})

Expand Down
14 changes: 6 additions & 8 deletions cluster/cluster/broadcast/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", random.NewRandomLoadBalance)

invokers := []protocol.Invoker{}
for i, ivk := range mockInvokers {
for _, ivk := range mockInvokers {
invokers = append(invokers, ivk)
if i == 0 {
ivk.EXPECT().GetUrl().Return(broadcastUrl)
}
ivk.EXPECT().GetUrl().Return(broadcastUrl).AnyTimes()
}
staticDir := static.NewDirectory(invokers)

Expand All @@ -72,7 +70,7 @@ func TestBroadcastInvokeSuccess(t *testing.T) {
for i := 0; i < 3; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}

clusterInvoker := registerBroadcast(invokers...)
Expand All @@ -92,17 +90,17 @@ func TestBroadcastInvokeFailed(t *testing.T) {
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}
{
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).AnyTimes()
}
for i := 0; i < 10; i++ {
invoker := mock.NewMockInvoker(ctrl)
invokers = append(invokers, invoker)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
}

clusterInvoker := registerBroadcast(invokers...)
Expand Down
8 changes: 4 additions & 4 deletions cluster/cluster/failback/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
var invokers []protocol.Invoker
invokers = append(invokers, invoker)

invoker.EXPECT().GetUrl().Return(failbackUrl)
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

staticDir := static.NewDirectory(invokers)
clusterInvoker := failbackCluster.Join(staticDir)
Expand All @@ -73,10 +73,10 @@ func TestFailbackSuceess(t *testing.T) {

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()

result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
wg.Wait()
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())

invoker.EXPECT().Destroy().Return()
invoker.EXPECT().Destroy().Return().AnyTimes()
clusterInvoker.Destroy()

assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster/failfast/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
invokers = append(invokers, invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl)
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()

staticDir := static.NewDirectory(invokers)
clusterInvoker := failfastCluster.Join(staticDir)
Expand All @@ -74,7 +74,7 @@ func TestFailfastInvokeSuccess(t *testing.T) {

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
Expand All @@ -95,7 +95,7 @@ func TestFailfastInvokeFail(t *testing.T) {

mockResult := &protocol.RPCResult{Err: perrors.New("error")}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NotNil(t, result.Error())
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster/failsafe/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failsafeUrl)
invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()

staticDir := static.NewDirectory(invokers)
clusterInvoker := failsafeCluster.Join(staticDir)
Expand All @@ -75,7 +75,7 @@ func TestFailSafeInvokeSuccess(t *testing.T) {

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
Expand All @@ -95,7 +95,7 @@ func TestFailSafeInvokeFail(t *testing.T) {

mockResult := &protocol.RPCResult{Err: perrors.New("error")}

invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult).AnyTimes()
result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})

assert.NoError(t, result.Error())
Expand Down
10 changes: 4 additions & 6 deletions cluster/cluster/forking/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance(constant.LoadBalanceKeyRoundRobin, roundrobin.NewRRLoadBalance)

var invokers []protocol.Invoker
for i, ivk := range mockInvokers {
for _, ivk := range mockInvokers {
invokers = append(invokers, ivk)
if i == 0 {
ivk.EXPECT().GetUrl().Return(forkingUrl)
}
ivk.EXPECT().GetUrl().Return(forkingUrl).AnyTimes()
}
staticDir := static.NewDirectory(invokers)

Expand Down Expand Up @@ -145,14 +143,14 @@ func TestForkingInvokeHalfTimeout(t *testing.T) {
func(protocol.Invocation) protocol.Result {
wg.Done()
return mockResult
})
}).AnyTimes()
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(protocol.Invocation) protocol.Result {
time.Sleep(2 * time.Second)
wg.Done()
return mockResult
})
}).AnyTimes()
}
}

Expand Down
3 changes: 1 addition & 2 deletions cluster/cluster/zoneaware/cluster_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type interceptor struct {
}
type interceptor struct{}

func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
key := constant.RegistryKey + "." + constant.RegistryZoneForceKey
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster/zoneaware/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return mockResult
})
}).AnyTimes()
} else {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
Expand Down
14 changes: 13 additions & 1 deletion cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ type RouterChain struct {

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
finalInvokers := c.invokers
finalInvokers := make([]protocol.Invoker, 0, len(c.invokers))
// multiple invoker may include different methods, find correct invoker otherwise
// will return the invoker without methods
for _, invoker := range c.invokers {
if invoker.GetURL().ServiceKey() == url.ServiceKey() {
finalInvokers = append(finalInvokers, invoker)
}
}

if len(finalInvokers) == 0 {
finalInvokers = c.invokers
}

for _, r := range c.copyRouters() {
finalInvokers = r.Route(finalInvokers, url, invocation)
}
Expand Down
1 change: 0 additions & 1 deletion cluster/router/tag/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (p *PriorityRouter) Notify(invokers []protocol.Invoker) {
return
}
p.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd})

}

func (p *PriorityRouter) Process(event *config_center.ConfigChangeEvent) {
Expand Down
6 changes: 4 additions & 2 deletions cluster/router/tag/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ tags:
- name: tag1
addresses: [192.168.0.1:20881]
- name: tag2
addresses: [192.168.0.2:20882]`}
addresses: [192.168.0.2:20882]`,
}
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
Expand All @@ -380,7 +381,8 @@ tags:
extension.SetDefaultConfigurator(configurator.NewMockConfigurator)
ccUrl, _ := common.NewURL("mock://127.0.0.1:1111")
mockFactory := &config_center.MockDynamicConfigurationFactory{
Content: `xxxxxx`}
Content: `xxxxxx`,
}
dc, _ := mockFactory.GetDynamicConfiguration(ccUrl)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
p.Notify(invokerList)
Expand Down
11 changes: 8 additions & 3 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func (c *URL) Group() string {
return c.GetParam(constant.GroupKey, "")
}

// Interface get interface
func (c *URL) Interface() string {
return c.GetParam(constant.InterfaceKey, "")
}

// Version get group
func (c *URL) Version() string {
return c.GetParam(constant.VersionKey, "")
Expand Down Expand Up @@ -356,7 +361,7 @@ func (c *URL) Key() string {
return buildString
}

//GetCacheInvokerMapKey get directory cacheInvokerMap key
// GetCacheInvokerMapKey get directory cacheInvokerMap key
func (c *URL) GetCacheInvokerMapKey() string {
urlNew, _ := NewURL(c.PrimitiveURL)

Expand All @@ -369,7 +374,7 @@ func (c *URL) GetCacheInvokerMapKey() string {

// ServiceKey gets a unique key of a service.
func (c *URL) ServiceKey() string {
return ServiceKey(c.GetParam(constant.InterfaceKey, strings.TrimPrefix(c.Path, "/")),
return ServiceKey(c.GetParam(constant.InterfaceKey, strings.TrimPrefix(c.Path, constant.PathSeparator)),
c.GetParam(constant.GroupKey, ""), c.GetParam(constant.VersionKey, ""))
}

Expand Down Expand Up @@ -861,7 +866,7 @@ func GetCompareURLEqualFunc() CompareURLEqualFunc {
return compareURLEqualFunc
}

//GetParamDuration get duration if param is invalid or missing will return 3s
// GetParamDuration get duration if param is invalid or missing will return 3s
func (c *URL) GetParamDuration(s string, d string) time.Duration {
if t, err := time.ParseDuration(c.GetParam(s, d)); err == nil {
return t
Expand Down

0 comments on commit 9334da0

Please sign in to comment.