Skip to content

Commit

Permalink
refactor(cluster): refactor cluster package (#1507)
Browse files Browse the repository at this point in the history
* refactor(cluster): refactor cluster/cluster

* refactor(cluster): refactor directory and loadbalance

* fix(cluster): fix import cycles

* style(cluster): go fmt

* style(cluster): format code by 3-block style

* style(cluster): go fmt

* fix(cluster): fix key bugs

* fix(cluster): fix apache license

* fix(cluster): fix constant name

* style(cluster): go fmt
  • Loading branch information
justxuewei authored Oct 11, 2021
1 parent c22d9c3 commit e213f97
Show file tree
Hide file tree
Showing 58 changed files with 818 additions and 722 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableCluster struct{}

const available = "available"

func init() {
extension.SetCluster(available, NewAvailableCluster)
extension.SetCluster(constant.ClusterKeyAvailable, NewAvailableCluster)
}

type cluster struct{}

// NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
func NewAvailableCluster() clusterpkg.Cluster {
return &cluster{}
}

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(NewClusterInvoker(directory))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -27,29 +27,30 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableClusterInvoker struct {
baseClusterInvoker
type clusterInvoker struct {
base.ClusterInvoker
}

// NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
// NewClusterInvoker returns a cluster invoker instance
func NewClusterInvoker(directory directory.Directory) protocol.Invoker {
return &clusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}

err = invoker.checkWhetherDestroyed()
err = invoker.CheckWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -31,8 +31,9 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory/static"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
Expand All @@ -45,14 +46,14 @@ var availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
extension.SetLoadbalance("random", random.NewLoadBalance)
availableCluster := NewAvailableCluster()

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(availableUrl)

staticDir := directory.NewStaticDirectory(invokers)
staticDir := static.NewDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
return clusterInvoker
}
Expand All @@ -64,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package base

import (
perrors "github.com/pkg/errors"
Expand All @@ -24,69 +24,70 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type baseClusterInvoker struct {
directory cluster.Directory
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
type ClusterInvoker struct {
Directory directory.Directory
AvailableCheck bool
Destroyed *atomic.Bool
StickyInvoker protocol.Invoker
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
func NewClusterInvoker(directory directory.Directory) ClusterInvoker {
return ClusterInvoker{
Directory: directory,
AvailableCheck: true,
Destroyed: atomic.NewBool(false),
}
}

func (invoker *baseClusterInvoker) GetURL() *common.URL {
return invoker.directory.GetURL()
func (invoker *ClusterInvoker) GetURL() *common.URL {
return invoker.Directory.GetURL()
}

func (invoker *baseClusterInvoker) Destroy() {
func (invoker *ClusterInvoker) Destroy() {
// this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
if invoker.Destroyed.CAS(false, true) {
invoker.Directory.Destroy()
}
}

func (invoker *baseClusterInvoker) IsAvailable() bool {
if invoker.stickyInvoker != nil {
return invoker.stickyInvoker.IsAvailable()
func (invoker *ClusterInvoker) IsAvailable() bool {
if invoker.StickyInvoker != nil {
return invoker.StickyInvoker.IsAvailable()
}
return invoker.directory.IsAvailable()
return invoker.Directory.IsAvailable()
}

// check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
// CheckInvokers checks invokers' status if is available or not
func (invoker *ClusterInvoker) CheckInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip := common.GetLocalIp()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetURL().SubURL.Key(), invoker.directory.GetURL().String(), ip, constant.Version)
invocation.MethodName(), invoker.Directory.GetURL().SubURL.Key(), invoker.Directory.GetURL().String(), ip, constant.Version)
}
return nil
}

// check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
// CheckWhetherDestroyed checks if cluster invoker was destroyed or not
func (invoker *ClusterInvoker) CheckWhetherDestroyed() error {
if invoker.Destroyed.Load() {
ip := common.GetLocalIp()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetURL().Service(), ip, constant.Version)
invoker.Directory.GetURL().Service(), ip, constant.Version)
}
return nil
}

func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) DoSelect(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
Expand All @@ -97,24 +98,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
// Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)

if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
invoker.stickyInvoker = nil
if invoker.StickyInvoker != nil && !isInvoked(invoker.StickyInvoker, invokers) {
invoker.StickyInvoker = nil
}

if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
if sticky && invoker.AvailableCheck &&
invoker.StickyInvoker != nil && invoker.StickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.StickyInvoker, invoked)) {
return invoker.StickyInvoker
}

selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
invoker.StickyInvoker = selectedInvoker
}
return selectedInvoker
}

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) doSelectInvoker(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
return nil
}
Expand All @@ -131,7 +132,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
selectedInvoker := lb.Select(invokers, invocation)

// judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
if (!selectedInvoker.IsAvailable() && invoker.AvailableCheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
Expand Down Expand Up @@ -170,7 +171,7 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo
return false
}

func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) loadbalance.LoadBalance {
url := invoker.GetURL()

methodName := invocation.MethodName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package base

import (
"fmt"
Expand All @@ -27,7 +27,8 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand All @@ -39,20 +40,20 @@ const (
)

func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
var invokers []protocol.Invoker
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
base := &ClusterInvoker{}
base.AvailableCheck = true
var invoked []protocol.Invoker

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpRandomBalance := random.NewLoadBalance()
tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
}

Expand All @@ -61,14 +62,14 @@ func TestStickyNormalWhenError(t *testing.T) {
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true
base := &ClusterInvoker{}
base.AvailableCheck = true

invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
var invoked []protocol.Invoker
result := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
result1 := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}
Loading

0 comments on commit e213f97

Please sign in to comment.