Skip to content

Commit

Permalink
test(loadbalance): add unittests for p2c (#1686)
Browse files Browse the repository at this point in the history
* test(loadbalance): add unittests for p2c

* fix(loadbalance): fix fixed rand seed problem

* style: format  imports

* style: add apache license
  • Loading branch information
justxuewei authored Dec 25, 2021
1 parent 0e195d1 commit 6710138
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 32 deletions.
6 changes: 3 additions & 3 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func newAdaptiveServiceClusterInvoker(directory directory.Directory) protocol.In
func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := ivk.Directory.List(invocation)
if err := ivk.CheckInvokers(invokers, invocation); err != nil {
return protocol.NewRPCResult(nil, err)
return &protocol.RPCResult{Err: err}
}

// get loadBalance
lbKey := invokers[0].GetURL().GetParam(constant.LoadbalanceKey, constant.LoadBalanceKeyP2C)
if lbKey != constant.LoadBalanceKeyP2C {
return protocol.NewRPCResult(nil, perrors.Errorf("adaptive service not supports %s load balance", lbKey))
return &protocol.RPCResult{Err: perrors.Errorf("adaptive service not supports %s load balance", lbKey)}
}
lb := extension.GetLoadbalance(lbKey)

Expand All @@ -78,7 +78,7 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
invocation.MethodName(), metrics.HillClimbing, uint64(remaining))
if err != nil {
logger.Warnf("adaptive service metrics update is failed, err: %v", err)
return protocol.NewRPCResult(nil, err)
return &protocol.RPCResult{Err: err}
}

return result
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster/available/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation p
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return protocol.NewRPCResult(nil, err)
return &protocol.RPCResult{Err: err}
}

err = invoker.CheckWhetherDestroyed()
if err != nil {
return protocol.NewRPCResult(nil, err)
return &protocol.RPCResult{Err: err}
}

for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
}
return protocol.NewRPCResult(nil, errors.New(fmt.Sprintf("no provider available in %v", invokers)))
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
}
2 changes: 1 addition & 1 deletion cluster/cluster/available/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := protocol.NewRPCResult(clusterpkg.Rest{Tried: 0, Success: true}, nil)
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
26 changes: 15 additions & 11 deletions cluster/loadbalance/p2c/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
randSeed = func() int64 {
return time.Now().Unix()
}
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newP2CLoadBalance)
}
Expand Down Expand Up @@ -69,25 +75,23 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, invocation protocol
if len(invokers) == 2 {
i, j = 0, 1
} else {
rand.Seed(time.Now().Unix())
rand.Seed(randSeed())
i = rand.Intn(len(invokers))
j = i
for i == j {
j = rand.Intn(len(invokers))
}
}
logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
i, j, invokers[i], invokers[j])
logger.Debugf("[P2C select] Two invokers were selected, invoker[%d]: %s, invoker[%d]: %s.",
i, invokers[i], j, invokers[j])

// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
methodName := invocation.ActualMethodName()
// remainingIIface, remainingJIface means remaining capacity of node i and node j.
// If one of the metrics is empty, invoke the invocation to that node directly.
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
logger.Debugf("[P2C select] The invoker[%d] was selected, because it hasn't been selected before.", i)
return invokers[i]
}
logger.Warnf("get method metrics err: %v", err)
Expand All @@ -97,7 +101,7 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, invocation protocol
remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
logger.Debugf("[P2C select] The invoker[%d] was selected, because it hasn't been selected before.", j)
return invokers[j]
}
logger.Warnf("get method metrics err: %v", err)
Expand All @@ -116,14 +120,14 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, invocation protocol
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}

logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)
logger.Debugf("[P2C select] The invoker[%d] remaining is %d, and the invoker[%d] is %d.", i, remainingI, j, remainingJ)

// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
logger.Debugf("[P2C select] The invoker[i] was selected.")
logger.Debugf("[P2C select] The invoker[%d] was selected.", i)
return invokers[i]
}

logger.Debugf("[P2C select] The invoker[j] was selected.")
logger.Debugf("[P2C select] The invoker[%d] was selected.", j)
return invokers[j]
}
178 changes: 178 additions & 0 deletions cluster/loadbalance/p2c/loadbalance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package p2c

import (
"testing"
)

import (
"github.com/golang/mock/gomock"

"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/protocol"
protoinvoc "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func TestLoadBalance(t *testing.T) {
lb := newP2CLoadBalance()
invocation := protoinvoc.NewRPCInvocation("TestMethod", []interface{}{}, nil)
randSeed = func() int64 {
return 0
}

t.Run("no invokers", func(t *testing.T) {
ivk := lb.Select([]protocol.Invoker{}, invocation)
assert.Nil(t, ivk)
})

t.Run("one invoker", func(t *testing.T) {
url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")

ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
}
ivk := lb.Select(ivkArr, invocation)
assert.Equal(t, ivkArr[0].GetURL().String(), ivk.GetURL().String())
})

t.Run("two invokers", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

m := metrics.NewMockMetrics(ctrl)
metrics.LocalMetrics = m

url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")

m.EXPECT().
GetMethodMetrics(gomock.Eq(url0), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(10), nil)
m.EXPECT().
GetMethodMetrics(gomock.Eq(url1), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(5), nil)

ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
protocol.NewBaseInvoker(url1),
}

ivk := lb.Select(ivkArr, invocation)

assert.Equal(t, ivkArr[0].GetURL().String(), ivk.GetURL().String())
})

t.Run("multiple invokers", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

m := metrics.NewMockMetrics(ctrl)
metrics.LocalMetrics = m

url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ := common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")

m.EXPECT().
GetMethodMetrics(gomock.Eq(url0), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(10), nil)
m.EXPECT().
GetMethodMetrics(gomock.Eq(url1), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(5), nil)

ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
protocol.NewBaseInvoker(url1),
protocol.NewBaseInvoker(url2),
}

ivk := lb.Select(ivkArr, invocation)

assert.Equal(t, ivkArr[0].GetURL().String(), ivk.GetURL().String())
})

t.Run("metrics i not found", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

m := metrics.NewMockMetrics(ctrl)
metrics.LocalMetrics = m

url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ := common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")

m.EXPECT().
GetMethodMetrics(gomock.Eq(url0), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(0, metrics.ErrMetricsNotFound)

ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
protocol.NewBaseInvoker(url1),
protocol.NewBaseInvoker(url2),
}

ivk := lb.Select(ivkArr, invocation)

assert.Equal(t, ivkArr[0].GetURL().String(), ivk.GetURL().String())
})

t.Run("metrics j not found", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

m := metrics.NewMockMetrics(ctrl)
metrics.LocalMetrics = m

url0, _ := common.NewURL("dubbo://192.168.1.0:20000/com.ikurento.user.UserProvider")
url1, _ := common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
url2, _ := common.NewURL("dubbo://192.168.1.2:20000/com.ikurento.user.UserProvider")

m.EXPECT().
GetMethodMetrics(gomock.Eq(url0), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(0), nil)

m.EXPECT().
GetMethodMetrics(gomock.Eq(url1), gomock.Eq(invocation.MethodName()), gomock.Eq(metrics.HillClimbing)).
Times(1).
Return(uint64(0), metrics.ErrMetricsNotFound)

ivkArr := []protocol.Invoker{
protocol.NewBaseInvoker(url0),
protocol.NewBaseInvoker(url1),
protocol.NewBaseInvoker(url2),
}

ivk := lb.Select(ivkArr, invocation)

assert.Equal(t, ivkArr[1].GetURL().String(), ivk.GetURL().String())
})

}
2 changes: 2 additions & 0 deletions cluster/metrics/local_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func init() {
LocalMetrics = newLocalMetrics()
}

var _ Metrics = (*localMetrics)(nil)

type localMetrics struct {
// protect metrics
lock *sync.RWMutex
Expand Down
Loading

0 comments on commit 6710138

Please sign in to comment.