From e213f97fda576a1193622ed8e41b7caca615bba2 Mon Sep 17 00:00:00 2001 From: Xuewei Niu Date: Mon, 11 Oct 2021 12:53:21 +0800 Subject: [PATCH] refactor(cluster): refactor cluster package (#1507) * refactor(cluster): refactor cluster/cluster * refactor(cluster): refactor directory and loadbalance * fix(cluster): fix import cycles * style(cluster): go fmt * style(cluster): format code by 3-block style * style(cluster): go fmt * fix(cluster): fix key bugs * fix(cluster): fix apache license * fix(cluster): fix constant name * style(cluster): go fmt --- .../available/cluster.go} | 22 ++-- .../available/cluster_invoker.go} | 25 ++-- .../available/cluster_invoker_test.go} | 13 ++- .../base/cluster_invoker.go} | 79 ++++++------- .../base/cluster_invoker_test.go} | 33 +++--- .../broadcast/cluster.go} | 24 ++-- .../broadcast/cluster_invoker.go} | 23 ++-- .../broadcast/cluster_invoker_test.go} | 17 +-- cluster/{ => cluster}/cluster.go | 3 +- cluster/{ => cluster}/cluster_interceptor.go | 0 .../failback/cluster.go} | 24 ++-- .../failback/cluster_invoker.go} | 40 +++---- .../failback/cluster_test.go} | 29 ++--- .../failfast/cluster.go} | 24 ++-- .../failfast/cluster_invoker.go} | 27 ++--- .../failfast/cluster_test.go} | 23 ++-- .../failover/cluster.go} | 21 ++-- .../failover/cluster_invoker.go} | 34 +++--- .../failover/cluster_test.go} | 107 ++++-------------- .../failsafe/cluster.go} | 24 ++-- .../failsafe/cluster_invoker.go} | 24 ++-- .../failsafe/cluster_test.go} | 23 ++-- .../forking/cluster.go} | 24 ++-- .../forking/cluster_invoker.go} | 28 ++--- .../forking/cluster_test.go} | 21 ++-- .../interceptor_invoker.go | 45 +++++++- cluster/cluster/mock.go | 106 +++++++++++++++++ .../zoneaware/cluster.go} | 19 ++-- .../zoneaware/cluster_interceptor.go} | 12 +- .../zoneaware/cluster_invoker.go} | 26 ++--- .../zoneaware/cluster_invoker_test.go} | 33 +++--- .../{mock_cluster.go => import.go} | 29 ++--- .../{base_directory.go => base/directory.go} | 26 ++--- .../directory_test.go} | 16 +-- cluster/{ => directory}/directory.go | 2 +- .../directory.go} | 29 ++--- .../directory_test.go} | 6 +- .../consistenthashing/loadbalance.go | 80 +++++++++++++ .../loadbalance_test.go} | 12 +- .../selector.go} | 80 ++----------- .../loadbalance.go} | 25 ++-- .../loadbalance_test.go} | 6 +- cluster/{ => loadbalance}/loadbalance.go | 2 +- .../{random.go => random/loadbalance.go} | 25 ++-- .../loadbalance_test.go} | 28 ++--- .../loadbalance.go} | 34 +++--- .../loadbalance_test.go} | 6 +- common/constant/cluster.go | 11 +- common/constant/loadbalance.go | 25 ++++ common/extension/cluster.go | 2 +- common/extension/cluster_interceptor.go | 60 ---------- common/extension/loadbalance.go | 8 +- common/extension/registry_directory.go | 6 +- config/reference_config.go | 16 +-- filter/filter_impl/import.go | 3 +- imports/imports.go | 29 ++++- registry/directory/directory.go | 19 ++-- registry/protocol/protocol_test.go | 2 +- 58 files changed, 818 insertions(+), 722 deletions(-) rename cluster/{cluster_impl/available_cluster.go => cluster/available/cluster.go} (68%) rename cluster/{cluster_impl/available_cluster_invoker.go => cluster/available/cluster_invoker.go} (66%) rename cluster/{cluster_impl/available_cluster_invoker_test.go => cluster/available/cluster_invoker_test.go} (87%) rename cluster/{cluster_impl/base_cluster_invoker.go => cluster/base/cluster_invoker.go} (69%) rename cluster/{cluster_impl/base_cluster_invoker_test.go => cluster/base/cluster_invoker_test.go} (65%) rename cluster/{cluster_impl/broadcast_cluster.go => cluster/broadcast/cluster.go} (69%) rename cluster/{cluster_impl/broadcast_cluster_invoker.go => cluster/broadcast/cluster_invoker.go} (70%) rename cluster/{cluster_impl/broadcast_cluster_invoker_test.go => cluster/broadcast/cluster_invoker_test.go} (86%) rename cluster/{ => cluster}/cluster.go (90%) rename cluster/{ => cluster}/cluster_interceptor.go (100%) rename cluster/{cluster_impl/failback_cluster.go => cluster/failback/cluster.go} (68%) rename cluster/{cluster_impl/failback_cluster_invoker.go => cluster/failback/cluster_invoker.go} (82%) rename cluster/{cluster_impl/failback_cluster_test.go => cluster/failback/cluster_test.go} (89%) rename cluster/{cluster_impl/failfast_cluster.go => cluster/failfast/cluster.go} (68%) rename cluster/{cluster_impl/failfast_cluster_invoker.go => cluster/failfast/cluster_invoker.go} (60%) rename cluster/{cluster_impl/failfast_cluster_test.go => cluster/failfast/cluster_test.go} (83%) rename cluster/{cluster_impl/failover_cluster.go => cluster/failover/cluster.go} (71%) rename cluster/{cluster_impl/failover_cluster_invoker.go => cluster/failover/cluster_invoker.go} (79%) rename cluster/{cluster_impl/failover_cluster_test.go => cluster/failover/cluster_test.go} (59%) rename cluster/{cluster_impl/failsafe_cluster.go => cluster/failsafe/cluster.go} (67%) rename cluster/{cluster_impl/failsafe_cluster_invoker.go => cluster/failsafe/cluster_invoker.go} (76%) rename cluster/{cluster_impl/failsafe_cluster_test.go => cluster/failsafe/cluster_test.go} (83%) rename cluster/{cluster_impl/forking_cluster.go => cluster/forking/cluster.go} (69%) rename cluster/{cluster_impl/forking_cluster_invoker.go => cluster/forking/cluster_invoker.go} (76%) rename cluster/{cluster_impl/forking_cluster_test.go => cluster/forking/cluster_test.go} (86%) rename cluster/{cluster_impl => cluster}/interceptor_invoker.go (63%) create mode 100644 cluster/cluster/mock.go rename cluster/{cluster_impl/zone_aware_cluster.go => cluster/zoneaware/cluster.go} (73%) rename cluster/{cluster_impl/zone_aware_cluster_interceptor.go => cluster/zoneaware/cluster_interceptor.go} (80%) rename cluster/{cluster_impl/zone_aware_cluster_invoker.go => cluster/zoneaware/cluster_invoker.go} (82%) rename cluster/{cluster_impl/zone_aware_cluster_invoker_test.go => cluster/zoneaware/cluster_invoker_test.go} (88%) rename cluster/cluster_impl/{mock_cluster.go => import.go} (56%) rename cluster/directory/{base_directory.go => base/directory.go} (76%) rename cluster/directory/{base_directory_test.go => base/directory_test.go} (92%) rename cluster/{ => directory}/directory.go (98%) rename cluster/directory/{static_directory.go => static/directory.go} (76%) rename cluster/directory/{static_directory_test.go => static/directory_test.go} (94%) create mode 100644 cluster/loadbalance/consistenthashing/loadbalance.go rename cluster/loadbalance/{consistent_hash_test.go => consistenthashing/loadbalance_test.go} (94%) rename cluster/loadbalance/{consistent_hash.go => consistenthashing/selector.go} (52%) rename cluster/loadbalance/{least_active.go => leastactive/loadbalance.go} (81%) rename cluster/loadbalance/{least_active_test.go => leastactive/loadbalance_test.go} (95%) rename cluster/{ => loadbalance}/loadbalance.go (98%) rename cluster/loadbalance/{random.go => random/loadbalance.go} (75%) rename cluster/loadbalance/{random_test.go => random/loadbalance_test.go} (82%) rename cluster/loadbalance/{round_robin.go => roundrobin/loadbalance.go} (81%) rename cluster/loadbalance/{round_robin_test.go => roundrobin/loadbalance_test.go} (95%) create mode 100644 common/constant/loadbalance.go delete mode 100644 common/extension/cluster_interceptor.go diff --git a/cluster/cluster_impl/available_cluster.go b/cluster/cluster/available/cluster.go similarity index 68% rename from cluster/cluster_impl/available_cluster.go rename to cluster/cluster/available/cluster.go index 23a535eaa4..4320c18d7c 100644 --- a/cluster/cluster_impl/available_cluster.go +++ b/cluster/cluster/available/cluster.go @@ -15,30 +15,30 @@ * limitations under the License. */ -package cluster_impl +package available import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type availableCluster struct{} - -const available = "available" - func init() { - extension.SetCluster(available, NewAvailableCluster) + extension.SetCluster(constant.ClusterKeyAvailable, NewAvailableCluster) } +type cluster struct{} + // NewAvailableCluster returns a cluster instance // // Obtain available service providers -func NewAvailableCluster() cluster.Cluster { - return &availableCluster{} +func NewAvailableCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(NewAvailableClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(NewClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster/available/cluster_invoker.go similarity index 66% rename from cluster/cluster_impl/available_cluster_invoker.go rename to cluster/cluster/available/cluster_invoker.go index a9c75ec75c..fc6dea3170 100644 --- a/cluster/cluster_impl/available_cluster_invoker.go +++ b/cluster/cluster/available/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package available import ( "context" @@ -27,29 +27,30 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type availableClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -// NewAvailableClusterInvoker returns a cluster invoker instance -func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &availableClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +// NewClusterInvoker returns a cluster invoker instance +func NewClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } -func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) + err := invoker.CheckInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} } - err = invoker.checkWhetherDestroyed() + err = invoker.CheckWhetherDestroyed() if err != nil { return &protocol.RPCResult{Err: err} } diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster/available/cluster_invoker_test.go similarity index 87% rename from cluster/cluster_impl/available_cluster_invoker_test.go rename to cluster/cluster/available/cluster_invoker_test.go index c97f4f2652..22d6981136 100644 --- a/cluster/cluster_impl/available_cluster_invoker_test.go +++ b/cluster/cluster/available/cluster_invoker_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package available import ( "context" @@ -31,8 +31,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -45,14 +46,14 @@ var availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + extension.SetLoadbalance("random", random.NewLoadBalance) availableCluster := NewAvailableCluster() invokers := []protocol.Invoker{} invokers = append(invokers, invoker) invoker.EXPECT().GetUrl().Return(availableUrl) - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := availableCluster.Join(staticDir) return clusterInvoker } @@ -64,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerAvailable(invoker) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} invoker.EXPECT().IsAvailable().Return(true) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster/base/cluster_invoker.go similarity index 69% rename from cluster/cluster_impl/base_cluster_invoker.go rename to cluster/cluster/base/cluster_invoker.go index 1d1f86636c..5b4958d753 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster/base/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package base import ( perrors "github.com/pkg/errors" @@ -24,7 +24,8 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -32,61 +33,61 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) -type baseClusterInvoker struct { - directory cluster.Directory - availablecheck bool - destroyed *atomic.Bool - stickyInvoker protocol.Invoker +type ClusterInvoker struct { + Directory directory.Directory + AvailableCheck bool + Destroyed *atomic.Bool + StickyInvoker protocol.Invoker } -func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { - return baseClusterInvoker{ - directory: directory, - availablecheck: true, - destroyed: atomic.NewBool(false), +func NewClusterInvoker(directory directory.Directory) ClusterInvoker { + return ClusterInvoker{ + Directory: directory, + AvailableCheck: true, + Destroyed: atomic.NewBool(false), } } -func (invoker *baseClusterInvoker) GetURL() *common.URL { - return invoker.directory.GetURL() +func (invoker *ClusterInvoker) GetURL() *common.URL { + return invoker.Directory.GetURL() } -func (invoker *baseClusterInvoker) Destroy() { +func (invoker *ClusterInvoker) Destroy() { // this is must atom operation - if invoker.destroyed.CAS(false, true) { - invoker.directory.Destroy() + if invoker.Destroyed.CAS(false, true) { + invoker.Directory.Destroy() } } -func (invoker *baseClusterInvoker) IsAvailable() bool { - if invoker.stickyInvoker != nil { - return invoker.stickyInvoker.IsAvailable() +func (invoker *ClusterInvoker) IsAvailable() bool { + if invoker.StickyInvoker != nil { + return invoker.StickyInvoker.IsAvailable() } - return invoker.directory.IsAvailable() + return invoker.Directory.IsAvailable() } -// check invokers availables -func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error { +// CheckInvokers checks invokers' status if is available or not +func (invoker *ClusterInvoker) CheckInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error { if len(invokers) == 0 { ip := common.GetLocalIp() return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+ "registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.", - invocation.MethodName(), invoker.directory.GetURL().SubURL.Key(), invoker.directory.GetURL().String(), ip, constant.Version) + invocation.MethodName(), invoker.Directory.GetURL().SubURL.Key(), invoker.Directory.GetURL().String(), ip, constant.Version) } return nil } -// check cluster invoker is destroyed or not -func (invoker *baseClusterInvoker) checkWhetherDestroyed() error { - if invoker.destroyed.Load() { +// CheckWhetherDestroyed checks if cluster invoker was destroyed or not +func (invoker *ClusterInvoker) CheckWhetherDestroyed() error { + if invoker.Destroyed.Load() { ip := common.GetLocalIp() return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ", - invoker.directory.GetURL().Service(), ip, constant.Version) + invoker.Directory.GetURL().Service(), ip, constant.Version) } return nil } -func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { +func (invoker *ClusterInvoker) DoSelect(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { var selectedInvoker protocol.Invoker if len(invokers) <= 0 { return selectedInvoker @@ -97,24 +98,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p // Get the service method sticky config if have sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky) - if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) { - invoker.stickyInvoker = nil + if invoker.StickyInvoker != nil && !isInvoked(invoker.StickyInvoker, invokers) { + invoker.StickyInvoker = nil } - if sticky && invoker.availablecheck && - invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() && - (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) { - return invoker.stickyInvoker + if sticky && invoker.AvailableCheck && + invoker.StickyInvoker != nil && invoker.StickyInvoker.IsAvailable() && + (invoked == nil || !isInvoked(invoker.StickyInvoker, invoked)) { + return invoker.StickyInvoker } selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked) if sticky { - invoker.stickyInvoker = selectedInvoker + invoker.StickyInvoker = selectedInvoker } return selectedInvoker } -func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { +func (invoker *ClusterInvoker) doSelectInvoker(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { if len(invokers) == 0 { return nil } @@ -131,7 +132,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc selectedInvoker := lb.Select(invokers, invocation) // judge if the selected Invoker is invoked and available - if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) { + if (!selectedInvoker.IsAvailable() && invoker.AvailableCheck) || isInvoked(selectedInvoker, invoked) { protocol.SetInvokerUnhealthyStatus(selectedInvoker) otherInvokers := getOtherInvokers(invokers, selectedInvoker) // do reselect @@ -170,7 +171,7 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo return false } -func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance { +func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) loadbalance.LoadBalance { url := invoker.GetURL() methodName := invocation.MethodName() diff --git a/cluster/cluster_impl/base_cluster_invoker_test.go b/cluster/cluster/base/cluster_invoker_test.go similarity index 65% rename from cluster/cluster_impl/base_cluster_invoker_test.go rename to cluster/cluster/base/cluster_invoker_test.go index 81ce4905bf..fa24940e9e 100644 --- a/cluster/cluster_impl/base_cluster_invoker_test.go +++ b/cluster/cluster/base/cluster_invoker_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package base import ( "fmt" @@ -27,7 +27,8 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" @@ -39,20 +40,20 @@ const ( ) func TestStickyNormal(t *testing.T) { - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i)) url.SetParam("sticky", "true") - invokers = append(invokers, NewMockInvoker(url, 1)) + invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1)) } - base := &baseClusterInvoker{} - base.availablecheck = true - invoked := []protocol.Invoker{} + base := &ClusterInvoker{} + base.AvailableCheck = true + var invoked []protocol.Invoker - tmpRandomBalance := loadbalance.NewRandomLoadBalance() + tmpRandomBalance := random.NewLoadBalance() tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil) - result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) - result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) + result := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) + result1 := base.DoSelect(tmpRandomBalance, tmpInvocation, invokers, invoked) assert.Equal(t, result, result1) } @@ -61,14 +62,14 @@ func TestStickyNormalWhenError(t *testing.T) { for i := 0; i < 10; i++ { url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i)) url.SetParam("sticky", "true") - invokers = append(invokers, NewMockInvoker(url, 1)) + invokers = append(invokers, clusterpkg.NewMockInvoker(url, 1)) } - base := &baseClusterInvoker{} - base.availablecheck = true + base := &ClusterInvoker{} + base.AvailableCheck = true - invoked := []protocol.Invoker{} - result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) + var invoked []protocol.Invoker + result := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) invoked = append(invoked, result) - result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) + result1 := base.DoSelect(random.NewLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked) assert.NotEqual(t, result, result1) } diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster/broadcast/cluster.go similarity index 69% rename from cluster/cluster_impl/broadcast_cluster.go rename to cluster/cluster/broadcast/cluster.go index c2e8955b44..bc80550c17 100644 --- a/cluster/cluster_impl/broadcast_cluster.go +++ b/cluster/cluster/broadcast/cluster.go @@ -15,31 +15,31 @@ * limitations under the License. */ -package cluster_impl +package broadcast import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type broadcastCluster struct{} - -const broadcast = "broadcast" - func init() { - extension.SetCluster(broadcast, NewBroadcastCluster) + extension.SetCluster(constant.ClusterKeyBroadcast, NewCluster) } -// NewBroadcastCluster returns a broadcast cluster instance. +type cluster struct{} + +// NewCluster returns a broadcast cluster instance. // // Calling all providers' broadcast one by one. All errors will be reported. // It is usually used to notify all providers to update local resource information such as caches or logs. -func NewBroadcastCluster() cluster.Cluster { - return &broadcastCluster{} +func NewCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newBroadcastClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster/broadcast/cluster_invoker.go similarity index 70% rename from cluster/cluster_impl/broadcast_cluster_invoker.go rename to cluster/cluster/broadcast/cluster_invoker.go index 93a1b13637..50286068f8 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker.go +++ b/cluster/cluster/broadcast/cluster_invoker.go @@ -15,36 +15,37 @@ * limitations under the License. */ -package cluster_impl +package broadcast import ( "context" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type broadcastClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &broadcastClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } // nolint -func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) + err := invoker.CheckInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} } - err = invoker.checkWhetherDestroyed() + err = invoker.CheckWhetherDestroyed() if err != nil { return &protocol.RPCResult{Err: err} } diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster/broadcast/cluster_invoker_test.go similarity index 86% rename from cluster/cluster_impl/broadcast_cluster_invoker_test.go rename to cluster/cluster/broadcast/cluster_invoker_test.go index 678598e9a4..356151d78f 100644 --- a/cluster/cluster_impl/broadcast_cluster_invoker_test.go +++ b/cluster/cluster/broadcast/cluster_invoker_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package broadcast import ( "context" @@ -31,8 +31,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -45,7 +46,7 @@ var broadcastUrl, _ = common.NewURL( fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + extension.SetLoadbalance("random", random.NewLoadBalance) invokers := []protocol.Invoker{} for i, ivk := range mockInvokers { @@ -54,9 +55,9 @@ func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker { ivk.EXPECT().GetUrl().Return(broadcastUrl) } } - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) - broadcastCluster := NewBroadcastCluster() + broadcastCluster := NewCluster() clusterInvoker := broadcastCluster.Join(staticDir) return clusterInvoker } @@ -67,7 +68,7 @@ func TestBroadcastInvokeSuccess(t *testing.T) { invokers := make([]*mock.MockInvoker, 0) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} for i := 0; i < 3; i++ { invoker := mock.NewMockInvoker(ctrl) invokers = append(invokers, invoker) @@ -86,7 +87,7 @@ func TestBroadcastInvokeFailed(t *testing.T) { invokers := make([]*mock.MockInvoker, 0) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")} for i := 0; i < 10; i++ { invoker := mock.NewMockInvoker(ctrl) diff --git a/cluster/cluster.go b/cluster/cluster/cluster.go similarity index 90% rename from cluster/cluster.go rename to cluster/cluster/cluster.go index 1a528da41c..8f810c9cfa 100644 --- a/cluster/cluster.go +++ b/cluster/cluster/cluster.go @@ -18,11 +18,12 @@ package cluster import ( + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/protocol" ) // Cluster // Extension - Cluster type Cluster interface { - Join(Directory) protocol.Invoker + Join(directory.Directory) protocol.Invoker } diff --git a/cluster/cluster_interceptor.go b/cluster/cluster/cluster_interceptor.go similarity index 100% rename from cluster/cluster_interceptor.go rename to cluster/cluster/cluster_interceptor.go diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster/failback/cluster.go similarity index 68% rename from cluster/cluster_impl/failback_cluster.go rename to cluster/cluster/failback/cluster.go index 766657079f..f3a942da96 100644 --- a/cluster/cluster_impl/failback_cluster.go +++ b/cluster/cluster/failback/cluster.go @@ -15,31 +15,31 @@ * limitations under the License. */ -package cluster_impl +package failback import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failbackCluster struct{} - -const failback = "failback" - func init() { - extension.SetCluster(failback, NewFailbackCluster) + extension.SetCluster(constant.ClusterKeyFailback, NewCluster) } -// NewFailbackCluster returns a failback cluster instance +type cluster struct{} + +// NewCluster returns a failback cluster instance // // Failure automatically restored, failed to record the background request, // regular retransmission. Usually used for message notification operations. -func NewFailbackCluster() cluster.Cluster { - return &failbackCluster{} +func NewCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newFailbackClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster/failback/cluster_invoker.go similarity index 82% rename from cluster/cluster_impl/failback_cluster_invoker.go rename to cluster/cluster/failback/cluster_invoker.go index 38bbd7a94c..4e677fa303 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster/failback/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failback import ( "context" @@ -29,7 +29,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/common/logger" @@ -42,8 +44,8 @@ import ( * * Failback */ -type failbackClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker once sync.Once ticker *time.Ticker @@ -52,9 +54,9 @@ type failbackClusterInvoker struct { taskList *queue.Queue } -func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { - invoker := &failbackClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + invoker := &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } retriesConfig := invoker.GetURL().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) retries, err := strconv.Atoi(retriesConfig) @@ -72,11 +74,11 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { return invoker } -func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) { +func (invoker *clusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) { invoked := make([]protocol.Invoker, 0) invoked = append(invoked, retryTask.lastInvoker) - retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + retryInvoker := invoker.DoSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) result := retryInvoker.Invoke(ctx, retryTask.invocation) if result.Error() != nil { retryTask.lastInvoker = retryInvoker @@ -84,7 +86,7 @@ func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, ret } } -func (invoker *failbackClusterInvoker) process(ctx context.Context) { +func (invoker *clusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) for range invoker.ticker.C { // check each timeout task and re-run @@ -112,7 +114,7 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) { } } -func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { +func (invoker *clusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", retryTask.invocation.MethodName(), invoker.GetURL().Service(), err.Error()) retryTask.retries++ @@ -129,9 +131,9 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err } // nolint -func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - if err := invoker.checkInvokers(invokers, invocation); err != nil { +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) + if err := invoker.CheckInvokers(invokers, invocation); err != nil { logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", invocation.MethodName(), invoker.GetURL().Service(), err) return &protocol.RPCResult{} @@ -148,7 +150,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr loadBalance := extension.GetLoadbalance(lb) invoked := make([]protocol.Invoker, 0, len(invokers)) - ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) + ivk := invoker.DoSelect(loadBalance, invocation, invokers, invoked) // DO INVOKE result := ivk.Invoke(ctx, invocation) if result.Error() != nil { @@ -174,8 +176,8 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr return result } -func (invoker *failbackClusterInvoker) Destroy() { - invoker.baseClusterInvoker.Destroy() +func (invoker *clusterInvoker) Destroy() { + invoker.ClusterInvoker.Destroy() // stop ticker if invoker.ticker != nil { @@ -186,7 +188,7 @@ func (invoker *failbackClusterInvoker) Destroy() { } type retryTimerTask struct { - loadbalance cluster.LoadBalance + loadbalance loadbalance.LoadBalance invocation protocol.Invocation invokers []protocol.Invoker lastInvoker protocol.Invoker @@ -194,7 +196,7 @@ type retryTimerTask struct { lastT time.Time } -func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, +func newRetryTimerTask(loadbalance loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, lastInvoker protocol.Invoker) *retryTimerTask { return &retryTimerTask{ loadbalance: loadbalance, diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster/failback/cluster_test.go similarity index 89% rename from cluster/cluster_impl/failback_cluster_test.go rename to cluster/cluster/failback/cluster_test.go index f730c9d745..b43d196e90 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster/failback/cluster_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failback import ( "context" @@ -34,8 +34,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -49,15 +50,15 @@ var failbackUrl, _ = common.NewURL( // registerFailback register failbackCluster to cluster extension. func registerFailback(invoker *mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) - failbackCluster := NewFailbackCluster() + extension.SetLoadbalance("random", random.NewLoadBalance) + failbackCluster := NewCluster() - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker invokers = append(invokers, invoker) invoker.EXPECT().GetUrl().Return(failbackUrl) - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := failbackCluster.Join(staticDir) return clusterInvoker } @@ -68,13 +69,13 @@ func TestFailbackSuceess(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*clusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().IsAvailable().Return(true) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) @@ -87,7 +88,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*clusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().IsAvailable().Return(true) @@ -100,7 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) { var wg sync.WaitGroup wg.Add(1) now := time.Now() - mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockSuccResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} invoker.EXPECT().IsAvailable().Return(true) invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) @@ -132,7 +133,7 @@ func TestFailbackRetryFailed(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*clusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() invoker.EXPECT().IsAvailable().Return(true).AnyTimes() @@ -179,7 +180,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*clusterInvoker) clusterInvoker.maxRetries = 10 invoker.EXPECT().IsAvailable().Return(true).AnyTimes() @@ -222,7 +223,7 @@ func TestFailbackOutOfLimit(t *testing.T) { defer ctrl.Finish() invoker := mock.NewMockInvoker(ctrl) - clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) + clusterInvoker := registerFailback(invoker).(*clusterInvoker) clusterInvoker.failbackTasks = 1 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() diff --git a/cluster/cluster_impl/failfast_cluster.go b/cluster/cluster/failfast/cluster.go similarity index 68% rename from cluster/cluster_impl/failfast_cluster.go rename to cluster/cluster/failfast/cluster.go index a8022d5f23..b7aa346d96 100644 --- a/cluster/cluster_impl/failfast_cluster.go +++ b/cluster/cluster/failfast/cluster.go @@ -15,31 +15,31 @@ * limitations under the License. */ -package cluster_impl +package failfast import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failfastCluster struct{} - -const failfast = "failfast" - func init() { - extension.SetCluster(failfast, NewFailFastCluster) + extension.SetCluster(constant.ClusterKeyFailfast, NewCluster) } -// NewFailFastCluster returns a failfast cluster instance. +type cluster struct{} + +// NewCluster returns a failfast cluster instance. // // Fast failure, only made a call, failure immediately error. Usually used for non-idempotent write operations, // such as adding records. -func NewFailFastCluster() cluster.Cluster { - return &failfastCluster{} +func NewCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newFailFastClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster/failfast/cluster_invoker.go similarity index 60% rename from cluster/cluster_impl/failfast_cluster_invoker.go rename to cluster/cluster/failfast/cluster_invoker.go index 1f62c7581c..306236255c 100644 --- a/cluster/cluster_impl/failfast_cluster_invoker.go +++ b/cluster/cluster/failfast/cluster_invoker.go @@ -15,42 +15,43 @@ * limitations under the License. */ -package cluster_impl +package failfast import ( "context" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failfastClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &failfastClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } // nolint -func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) + err := invoker.CheckInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} } - loadbalance := getLoadBalance(invokers[0], invocation) + loadbalance := base.GetLoadBalance(invokers[0], invocation) - err = invoker.checkWhetherDestroyed() + err = invoker.CheckWhetherDestroyed() if err != nil { return &protocol.RPCResult{Err: err} } - ivk := invoker.doSelect(loadbalance, invocation, invokers, nil) + ivk := invoker.DoSelect(loadbalance, invocation, invokers, nil) return ivk.Invoke(ctx, invocation) } diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster/failfast/cluster_test.go similarity index 83% rename from cluster/cluster_impl/failfast_cluster_test.go rename to cluster/cluster/failfast/cluster_test.go index 74d570730a..a739dac78b 100644 --- a/cluster/cluster_impl/failfast_cluster_test.go +++ b/cluster/cluster/failfast/cluster_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failfast import ( "context" @@ -32,8 +32,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -47,16 +48,16 @@ var failfastUrl, _ = common.NewURL( // registerFailfast register failfastCluster to cluster extension. func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) - failfastCluster := NewFailFastCluster() + extension.SetLoadbalance("random", random.NewLoadBalance) + failfastCluster := NewCluster() - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl) - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := failfastCluster.Join(staticDir) return clusterInvoker } @@ -71,15 +72,15 @@ func TestFailfastInvokeSuccess(t *testing.T) { invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) - res := result.Result().(rest) - assert.True(t, res.success) - assert.Equal(t, 0, res.tried) + res := result.Result().(clusterpkg.Rest) + assert.True(t, res.Success) + assert.Equal(t, 0, res.Tried) } func TestFailfastInvokeFail(t *testing.T) { diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster/failover/cluster.go similarity index 71% rename from cluster/cluster_impl/failover_cluster.go rename to cluster/cluster/failover/cluster.go index 84dee3a3ae..56bc757cd6 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster/failover/cluster.go @@ -15,31 +15,32 @@ * limitations under the License. */ -package cluster_impl +package failover import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failoverCluster struct{} - func init() { - extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster) + extension.SetCluster(constant.ClusterKeyFailover, newCluster) } -// NewFailoverCluster returns a failover cluster instance +type cluster struct{} + +// newCluster returns a failover cluster instance // // Failure automatically switch, when there is a failure, // retry the other server (default). Usually used for read operations, // but retries can result in longer delays. -func NewFailoverCluster() cluster.Cluster { - return &failoverCluster{} +func newCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newFailoverClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster/failover/cluster_invoker.go similarity index 79% rename from cluster/cluster_impl/failover_cluster_invoker.go rename to cluster/cluster/failover/cluster_invoker.go index 44a938c5ea..6440627cd6 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster/failover/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failover import ( "context" @@ -28,25 +28,25 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failoverClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &failoverClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } -// nolint -func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.Result invoked []protocol.Invoker @@ -54,29 +54,29 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr ivk protocol.Invoker ) - invokers := invoker.directory.List(invocation) - if err := invoker.checkInvokers(invokers, invocation); err != nil { + invokers := invoker.Directory.List(invocation) + if err := invoker.CheckInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } methodName := invocation.MethodName() retries := getRetries(invokers, methodName) - loadBalance := getLoadBalance(invokers[0], invocation) + loadBalance := base.GetLoadBalance(invokers[0], invocation) for i := 0; i <= retries; i++ { // Reselect before retry to avoid a change of candidate `invokers`. // NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { - if err := invoker.checkWhetherDestroyed(); err != nil { + if err := invoker.CheckWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } - invokers = invoker.directory.List(invocation) - if err := invoker.checkInvokers(invokers, invocation); err != nil { + invokers = invoker.Directory.List(invocation) + if err := invoker.CheckInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } - ivk = invoker.doSelect(loadBalance, invocation, invokers, invoked) + ivk = invoker.DoSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } @@ -91,7 +91,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr } ip := common.GetLocalIp() invokerSvc := invoker.GetURL().Service() - invokerUrl := invoker.directory.GetURL() + invokerUrl := invoker.Directory.GetURL() if ivk == nil { logger.Errorf("Failed to invoke the method %s of the service %s .No provider is available.", methodName, invokerSvc) return &protocol.RPCResult{ diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster/failover/cluster_test.go similarity index 59% rename from cluster/cluster_impl/failover_cluster_test.go rename to cluster/cluster/failover/cluster_test.go index eb29270f73..ad0aae36d8 100644 --- a/cluster/cluster_impl/failover_cluster_test.go +++ b/cluster/cluster/failover/cluster_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failover import ( "context" @@ -25,101 +25,32 @@ import ( ) import ( - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) -// nolint -type MockInvoker struct { - url *common.URL - available bool - destroyed bool - - successCount int -} - -// nolint -func NewMockInvoker(url *common.URL, successCount int) *MockInvoker { - return &MockInvoker{ - url: url, - available: true, - destroyed: false, - successCount: successCount, - } -} - -// nolint -func (bi *MockInvoker) GetURL() *common.URL { - return bi.url -} - -// nolint -func (bi *MockInvoker) IsAvailable() bool { - return bi.available -} - -// nolint -func (bi *MockInvoker) IsDestroyed() bool { - return bi.destroyed -} - -// nolint -type rest struct { - tried int - success bool -} - -// nolint -func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result { - count++ - var ( - success bool - err error - ) - if count >= bi.successCount { - success = true - } else { - err = perrors.New("error") - } - result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} - - return result -} - -// nolint -func (bi *MockInvoker) Destroy() { - logger.Infof("Destroy invoker: %v", bi.GetURL().String()) - bi.destroyed = true - bi.available = false -} - -// nolint -var count int - // nolint func normalInvoke(successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) - failoverCluster := NewFailoverCluster() + extension.SetLoadbalance("random", random.NewLoadBalance) + failoverCluster := newCluster() - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker for i := 0; i < 10; i++ { newUrl, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam)) - invokers = append(invokers, NewMockInvoker(newUrl, successCount)) + invokers = append(invokers, clusterpkg.NewMockInvoker(newUrl, successCount)) } - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := failoverCluster.Join(staticDir) if len(invocations) > 0 { return clusterInvoker.Invoke(context.Background(), invocations[0]) @@ -132,7 +63,7 @@ func TestFailoverInvokeSuccess(t *testing.T) { urlParams := url.Values{} result := normalInvoke(3, urlParams) assert.NoError(t, result.Error()) - count = 0 + clusterpkg.Count = 0 } // nolint @@ -140,7 +71,7 @@ func TestFailoverInvokeFail(t *testing.T) { urlParams := url.Values{} result := normalInvoke(4, urlParams) assert.Errorf(t, result.Error(), "error") - count = 0 + clusterpkg.Count = 0 } // nolint @@ -149,7 +80,7 @@ func TestFailoverInvoke1(t *testing.T) { urlParams.Set(constant.RETRIES_KEY, "3") result := normalInvoke(4, urlParams) assert.NoError(t, result.Error()) - count = 0 + clusterpkg.Count = 0 } // nolint @@ -161,26 +92,26 @@ func TestFailoverInvoke2(t *testing.T) { ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test")) result := normalInvoke(4, urlParams, ivc) assert.NoError(t, result.Error()) - count = 0 + clusterpkg.Count = 0 } // nolint func TestFailoverDestroy(t *testing.T) { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) - failoverCluster := NewFailoverCluster() + extension.SetLoadbalance("random", random.NewLoadBalance) + failoverCluster := newCluster() invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) + u, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, clusterpkg.NewMockInvoker(u, 1)) } - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := failoverCluster.Join(staticDir) assert.Equal(t, true, clusterInvoker.IsAvailable()) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) - count = 0 + clusterpkg.Count = 0 clusterInvoker.Destroy() assert.Equal(t, false, clusterInvoker.IsAvailable()) } diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster/failsafe/cluster.go similarity index 67% rename from cluster/cluster_impl/failsafe_cluster.go rename to cluster/cluster/failsafe/cluster.go index ac55e44b1d..048781b54d 100644 --- a/cluster/cluster_impl/failsafe_cluster.go +++ b/cluster/cluster/failsafe/cluster.go @@ -15,31 +15,31 @@ * limitations under the License. */ -package cluster_impl +package failsafe import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type failsafeCluster struct{} - -const failsafe = "failsafe" - func init() { - extension.SetCluster(failsafe, NewFailsafeCluster) + extension.SetCluster(constant.ClusterKeyFailsafe, newCluster) } -// NewFailsafeCluster returns a failsafe cluster instance. +type cluster struct{} + +// newCluster returns a failsafe cluster instance. // // Failure of security, anomalies, directly ignored. Usually it is // used to write audit logs and other operations. -func NewFailsafeCluster() cluster.Cluster { - return &failsafeCluster{} +func newCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newFailsafeClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster/failsafe/cluster_invoker.go similarity index 76% rename from cluster/cluster_impl/failsafe_cluster_invoker.go rename to cluster/cluster/failsafe/cluster_invoker.go index aea0858898..b9ba62f8b5 100644 --- a/cluster/cluster_impl/failsafe_cluster_invoker.go +++ b/cluster/cluster/failsafe/cluster_invoker.go @@ -15,14 +15,15 @@ * limitations under the License. */ -package cluster_impl +package failsafe import ( "context" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/common/logger" @@ -36,21 +37,20 @@ import ( * Fail-safe * */ -type failsafeClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &failsafeClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } -// nolint -func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) + err := invoker.CheckInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{} } @@ -68,7 +68,7 @@ func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation pr invoked := make([]protocol.Invoker, 0) var result protocol.Result - ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + ivk := invoker.DoSelect(loadbalance, invocation, invokers, invoked) // DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster/failsafe/cluster_test.go similarity index 83% rename from cluster/cluster_impl/failsafe_cluster_test.go rename to cluster/cluster/failsafe/cluster_test.go index de67fddd49..275995a886 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster/failsafe/cluster_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package failsafe import ( "context" @@ -32,8 +32,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -47,16 +48,16 @@ var failsafeUrl, _ = common.NewURL( // registerFailsafe register failsafeCluster to cluster extension. func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) - failsafeCluster := NewFailsafeCluster() + extension.SetLoadbalance("random", random.NewLoadBalance) + failsafeCluster := newCluster() - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker invokers = append(invokers, invoker) invoker.EXPECT().IsAvailable().Return(true).AnyTimes() - invoker.EXPECT().GetUrl().Return(failbackUrl) + invoker.EXPECT().GetUrl().Return(failsafeUrl) - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) clusterInvoker := failsafeCluster.Join(staticDir) return clusterInvoker } @@ -72,14 +73,14 @@ func TestFailSafeInvokeSuccess(t *testing.T) { invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) assert.NoError(t, result.Error()) - res := result.Result().(rest) - assert.True(t, res.success) + res := result.Result().(clusterpkg.Rest) + assert.True(t, res.Success) } func TestFailSafeInvokeFail(t *testing.T) { diff --git a/cluster/cluster_impl/forking_cluster.go b/cluster/cluster/forking/cluster.go similarity index 69% rename from cluster/cluster_impl/forking_cluster.go rename to cluster/cluster/forking/cluster.go index 7221d2df25..466fec2c0d 100644 --- a/cluster/cluster_impl/forking_cluster.go +++ b/cluster/cluster/forking/cluster.go @@ -15,31 +15,31 @@ * limitations under the License. */ -package cluster_impl +package forking import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type forkingCluster struct{} - -const forking = "forking" - func init() { - extension.SetCluster(forking, NewForkingCluster) + extension.SetCluster(constant.ClusterKeyForking, newCluster) } -// NewForkingCluster returns a forking cluster instance. +type cluster struct{} + +// newCluster returns a forking cluster instance. // // Multiple servers are invoked in parallel, returning as soon as one succeeds. // Usually it is used for real-time demanding read operations while wasting more service resources. -func NewForkingCluster() cluster.Cluster { - return &forkingCluster{} +func newCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a baseClusterInvoker instance -func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newForkingClusterInvoker(directory)) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory)) } diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster/forking/cluster_invoker.go similarity index 76% rename from cluster/cluster_impl/forking_cluster_invoker.go rename to cluster/cluster/forking/cluster_invoker.go index 5844f065d3..0b76ffb93c 100644 --- a/cluster/cluster_impl/forking_cluster_invoker.go +++ b/cluster/cluster/forking/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package forking import ( "context" @@ -28,30 +28,30 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type forkingClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { - return &forkingClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + return &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } } -// nolint -func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - if err := invoker.checkWhetherDestroyed(); err != nil { +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + if err := invoker.CheckWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } - invokers := invoker.directory.List(invocation) - if err := invoker.checkInvokers(invokers, invocation); err != nil { + invokers := invoker.Directory.List(invocation) + if err := invoker.CheckInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } @@ -61,9 +61,9 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro if forks < 0 || forks > len(invokers) { selected = invokers } else { - loadBalance := getLoadBalance(invokers[0], invocation) + loadBalance := base.GetLoadBalance(invokers[0], invocation) for i := 0; i < forks; i++ { - if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil { + if ivk := invoker.DoSelect(loadBalance, invocation, invokers, selected); ivk != nil { selected = append(selected, ivk) } } diff --git a/cluster/cluster_impl/forking_cluster_test.go b/cluster/cluster/forking/cluster_test.go similarity index 86% rename from cluster/cluster_impl/forking_cluster_test.go rename to cluster/cluster/forking/cluster_test.go index bbb12bb9b4..c92bade090 100644 --- a/cluster/cluster_impl/forking_cluster_test.go +++ b/cluster/cluster/forking/cluster_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package forking import ( "context" @@ -33,8 +33,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" - "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -47,18 +48,18 @@ var forkingUrl, _ = common.NewURL( fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT)) func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker { - extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance) + extension.SetLoadbalance(constant.LoadBalanceKeyRoundRobin, roundrobin.NewLoadBalance) - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker for i, ivk := range mockInvokers { invokers = append(invokers, ivk) if i == 0 { ivk.EXPECT().GetUrl().Return(forkingUrl) } } - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) - forkingCluster := NewForkingCluster() + forkingCluster := newCluster() clusterInvoker := forkingCluster.Join(staticDir) return clusterInvoker } @@ -69,7 +70,7 @@ func TestForkingInvokeSuccess(t *testing.T) { invokers := make([]*mock.MockInvoker, 0) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) // forkingUrl.AddParam(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_TIMEOUT)) @@ -99,7 +100,7 @@ func TestForkingInvokeTimeout(t *testing.T) { invokers := make([]*mock.MockInvoker, 0) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) var wg sync.WaitGroup @@ -130,7 +131,7 @@ func TestForkingInvokeHalfTimeout(t *testing.T) { invokers := make([]*mock.MockInvoker, 0) - mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) var wg sync.WaitGroup diff --git a/cluster/cluster_impl/interceptor_invoker.go b/cluster/cluster/interceptor_invoker.go similarity index 63% rename from cluster/cluster_impl/interceptor_invoker.go rename to cluster/cluster/interceptor_invoker.go index a22695d74c..0cbe262dab 100644 --- a/cluster/cluster_impl/interceptor_invoker.go +++ b/cluster/cluster/interceptor_invoker.go @@ -15,23 +15,56 @@ * limitations under the License. */ -package cluster_impl +package cluster import ( "context" + "sync" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) +var ( + lock sync.RWMutex + interceptors = make(map[string]func() Interceptor) +) + +// SetClusterInterceptor sets cluster interceptor so that user has chance to inject extra logics before and after +// cluster invoker +func SetClusterInterceptor(name string, fun func() Interceptor) { + lock.Lock() + defer lock.Unlock() + interceptors[name] = fun +} + +// GetClusterInterceptor returns the cluster interceptor instance with the given name +func GetClusterInterceptor(name string) Interceptor { + lock.RLock() + defer lock.RUnlock() + if interceptors[name] == nil { + panic("cluster_interceptor for " + name + " doesn't exist, make sure the corresponding package is imported") + } + return interceptors[name]() +} + +// GetClusterInterceptors returns all instances of registered cluster interceptors +func GetClusterInterceptors() []Interceptor { + lock.RLock() + defer lock.RUnlock() + ret := make([]Interceptor, 0, len(interceptors)) + for _, f := range interceptors { + ret = append(ret, f()) + } + return ret +} + // InterceptorInvoker mocks cluster interceptor as an invoker type InterceptorInvoker struct { next protocol.Invoker - interceptor cluster.Interceptor + interceptor Interceptor } // GetURL is used to get url from InterceptorInvoker @@ -54,10 +87,10 @@ func (i *InterceptorInvoker) Destroy() { i.next.Destroy() } -func buildInterceptorChain(invoker protocol.Invoker, builtins ...cluster.Interceptor) protocol.Invoker { +func BuildInterceptorChain(invoker protocol.Invoker, builtins ...Interceptor) protocol.Invoker { // The order of interceptors is from left to right, so loading from right to left next := invoker - interceptors := extension.GetClusterInterceptors() + interceptors := GetClusterInterceptors() if len(interceptors) != 0 { for i := len(interceptors) - 1; i >= 0; i-- { v := &InterceptorInvoker{next: next, interceptor: interceptors[i]} diff --git a/cluster/cluster/mock.go b/cluster/cluster/mock.go new file mode 100644 index 0000000000..77d3829d52 --- /dev/null +++ b/cluster/cluster/mock.go @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster + +import ( + "context" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +var Count int + +type Rest struct { + Tried int + Success bool +} + +type mockCluster struct{} + +// NewMockCluster returns a mock cluster instance. +// +// Mock cluster is usually used for service degradation, such as an authentication service. +// When the service provider is completely hung up, the client does not throw an exception, +// return an authorization failure through the Mock data instead. +func NewMockCluster() Cluster { + return &mockCluster{} +} + +func (cluster *mockCluster) Join(directory directory.Directory) protocol.Invoker { + return BuildInterceptorChain(protocol.NewBaseInvoker(directory.GetURL())) +} + +type MockInvoker struct { + url *common.URL + available bool + destroyed bool + + successCount int +} + +func NewMockInvoker(url *common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + available: true, + destroyed: false, + successCount: successCount, + } +} + +func (bi *MockInvoker) GetURL() *common.URL { + return bi.url +} + +func (bi *MockInvoker) IsAvailable() bool { + return bi.available +} + +func (bi *MockInvoker) IsDestroyed() bool { + return bi.destroyed +} + +func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result { + Count++ + var ( + success bool + err error + ) + if Count >= bi.successCount { + success = true + } else { + err = perrors.New("error") + } + result := &protocol.RPCResult{Err: err, Rest: Rest{Tried: Count, Success: success}} + + return result +} + +func (bi *MockInvoker) Destroy() { + logger.Infof("Destroy invoker: %v", bi.GetURL().String()) + bi.destroyed = true + bi.available = false +} diff --git a/cluster/cluster_impl/zone_aware_cluster.go b/cluster/cluster/zoneaware/cluster.go similarity index 73% rename from cluster/cluster_impl/zone_aware_cluster.go rename to cluster/cluster/zoneaware/cluster.go index a280ba9fb7..134583bc2d 100644 --- a/cluster/cluster_impl/zone_aware_cluster.go +++ b/cluster/cluster/zoneaware/cluster.go @@ -15,30 +15,31 @@ * limitations under the License. */ -package cluster_impl +package zoneaware import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type zoneAwareCluster struct{} - func init() { - extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster) + extension.SetCluster(constant.ClusterKeyZoneAware, newCluster) } +type cluster struct{} + // NewZoneAwareCluster returns a zoneaware cluster instance. // // More than one registry for subscription. // Usually it is used for choose between registries. -func NewZoneAwareCluster() cluster.Cluster { - return &zoneAwareCluster{} +func newCluster() clusterpkg.Cluster { + return &cluster{} } // Join returns a zoneAwareClusterInvoker instance -func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(newZoneAwareClusterInvoker(directory), getZoneAwareInterceptor()) +func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newClusterInvoker(directory), newInterceptor()) } diff --git a/cluster/cluster_impl/zone_aware_cluster_interceptor.go b/cluster/cluster/zoneaware/cluster_interceptor.go similarity index 80% rename from cluster/cluster_impl/zone_aware_cluster_interceptor.go rename to cluster/cluster/zoneaware/cluster_interceptor.go index ea26c2ea2c..eabe4e5cb3 100644 --- a/cluster/cluster_impl/zone_aware_cluster_interceptor.go +++ b/cluster/cluster/zoneaware/cluster_interceptor.go @@ -15,22 +15,22 @@ * limitations under the License. */ -package cluster_impl +package zoneaware import ( "context" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type zoneAwareInterceptor struct { +type interceptor struct { } -func (z *zoneAwareInterceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { +func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY force := ctx.Value(key) @@ -52,6 +52,6 @@ func (z *zoneAwareInterceptor) Invoke(ctx context.Context, invoker protocol.Invo return invoker.Invoke(ctx, invocation) } -func getZoneAwareInterceptor() cluster.Interceptor { - return &zoneAwareInterceptor{} +func newInterceptor() clusterpkg.Interceptor { + return &interceptor{} } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster/zoneaware/cluster_invoker.go similarity index 82% rename from cluster/cluster_impl/zone_aware_cluster_invoker.go rename to cluster/cluster/zoneaware/cluster_invoker.go index 1e04da555b..d6a72d14f0 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker.go +++ b/cluster/cluster/zoneaware/cluster_invoker.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package zoneaware import ( "context" @@ -23,7 +23,8 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -35,22 +36,21 @@ import ( // 2. check the zone the current request belongs, pick the registry that has the same zone first. // 3. Evenly balance traffic between all registries based on each registry's weight. // 4. Pick anyone that's available. -type zoneAwareClusterInvoker struct { - baseClusterInvoker +type clusterInvoker struct { + base.ClusterInvoker } -func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { - invoker := &zoneAwareClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), +func newClusterInvoker(directory directory.Directory) protocol.Invoker { + invoker := &clusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), } return invoker } -// nolint -func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) +func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.Directory.List(invocation) - err := invoker.checkInvokers(invokers, invocation) + err := invoker.CheckInvokers(invokers, invocation) if err != nil { return &protocol.RPCResult{Err: err} } @@ -84,8 +84,8 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p } // load balance among all registries, with registry weight count in. - loadBalance := getLoadBalance(invokers[0], invocation) - ivk := invoker.doSelect(loadBalance, invocation, invokers, nil) + loadBalance := base.GetLoadBalance(invokers[0], invocation) + ivk := invoker.DoSelect(loadBalance, invocation, invokers, nil) if ivk != nil && ivk.IsAvailable() { return ivk.Invoke(ctx, invocation) } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster/zoneaware/cluster_invoker_test.go similarity index 88% rename from cluster/cluster_impl/zone_aware_cluster_invoker_test.go rename to cluster/cluster/zoneaware/cluster_invoker_test.go index 62433f3e01..084035badd 100644 --- a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go +++ b/cluster/cluster/zoneaware/cluster_invoker_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster_impl +package zoneaware import ( "context" @@ -30,9 +30,12 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" "dubbo.apache.org/dubbo-go/v3/protocol/mock" @@ -46,7 +49,7 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { mockResult := &protocol.RPCResult{ Attrs: map[string]interface{}{constant.PREFERRED_KEY: "true"}, - Rest: rest{tried: 0, success: true}, + Rest: clusterpkg.Rest{Tried: 0, Success: true}, } var invokers []protocol.Invoker @@ -71,8 +74,8 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { invokers = append(invokers, invoker) } - zoneAwareCluster := NewZoneAwareCluster() - staticDir := directory.NewStaticDirectory(invokers) + zoneAwareCluster := newCluster() + staticDir := static.NewDirectory(invokers) clusterInvoker := zoneAwareCluster.Join(staticDir) result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) @@ -81,6 +84,8 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { } func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { + extension.SetLoadbalance(constant.LoadBalanceKeyRandom, random.NewLoadBalance) + ctrl := gomock.NewController(t) // In Go versions 1.14+, if you pass a *testing.T // into gomock.NewController(t) you no longer need to call ctrl.Finish(). @@ -102,7 +107,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Attrs: map[string]interface{}{constant.WEIGHT_KEY: w1}, - Rest: rest{tried: 0, success: true}, + Rest: clusterpkg.Rest{Tried: 0, Success: true}, } }).MaxTimes(100) } else { @@ -111,15 +116,15 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Attrs: map[string]interface{}{constant.WEIGHT_KEY: w2}, - Rest: rest{tried: 0, success: true}, + Rest: clusterpkg.Rest{Tried: 0, Success: true}, } }).MaxTimes(100) } invokers = append(invokers, invoker) } - zoneAwareCluster := NewZoneAwareCluster() - staticDir := directory.NewStaticDirectory(invokers) + zoneAwareCluster := newCluster() + staticDir := static.NewDirectory(invokers) clusterInvoker := zoneAwareCluster.Join(staticDir) var w2Count, w1Count int @@ -159,14 +164,14 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { func(invocation protocol.Invocation) protocol.Result { return &protocol.RPCResult{ Attrs: map[string]interface{}{constant.ZONE_KEY: zoneValue}, - Rest: rest{tried: 0, success: true}, + Rest: clusterpkg.Rest{Tried: 0, Success: true}, } }) invokers = append(invokers, invoker) } - zoneAwareCluster := NewZoneAwareCluster() - staticDir := directory.NewStaticDirectory(invokers) + zoneAwareCluster := newCluster() + staticDir := static.NewDirectory(invokers) clusterInvoker := zoneAwareCluster.Join(staticDir) inv := &invocation.RPCInvocation{} @@ -195,8 +200,8 @@ func TestZoneWareInvokerWithZoneForceFail(t *testing.T) { invokers = append(invokers, invoker) } - zoneAwareCluster := NewZoneAwareCluster() - staticDir := directory.NewStaticDirectory(invokers) + zoneAwareCluster := newCluster() + staticDir := static.NewDirectory(invokers) clusterInvoker := zoneAwareCluster.Join(staticDir) inv := &invocation.RPCInvocation{} diff --git a/cluster/cluster_impl/mock_cluster.go b/cluster/cluster_impl/import.go similarity index 56% rename from cluster/cluster_impl/mock_cluster.go rename to cluster/cluster_impl/import.go index 40d1c2bf5a..a7b16b4d40 100644 --- a/cluster/cluster_impl/mock_cluster.go +++ b/cluster/cluster_impl/import.go @@ -17,23 +17,18 @@ package cluster_impl +// This package is for being compatible with older dubbo-go, please use `imports` package. +// This package may be DEPRECATED OR REMOVED in the future. + import ( - "dubbo.apache.org/dubbo-go/v3/cluster" - "dubbo.apache.org/dubbo-go/v3/protocol" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failfast" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failover" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failsafe" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/forking" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/zoneaware" ) -type mockCluster struct{} - -// NewMockCluster returns a mock cluster instance. -// -// Mock cluster is usually used for service degradation, such as an authentication service. -// When the service provider is completely hung up, the client does not throw an exception, -// return an authorization failure through the Mock data instead. -func NewMockCluster() cluster.Cluster { - return &mockCluster{} -} - -// nolint -func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker { - return buildInterceptorChain(protocol.NewBaseInvoker(directory.GetURL())) -} +func init() {} diff --git a/cluster/directory/base_directory.go b/cluster/directory/base/directory.go similarity index 76% rename from cluster/directory/base_directory.go rename to cluster/directory/base/directory.go index 44ab2d92e3..52445fcb03 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base/directory.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package directory +package base import ( "sync" @@ -32,8 +32,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" ) -// BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers -type BaseDirectory struct { +// Directory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers +type Directory struct { url *common.URL destroyed *atomic.Bool // this mutex for change the properties in BaseDirectory, like routerChain , destroyed etc @@ -41,9 +41,9 @@ type BaseDirectory struct { routerChain router.Chain } -// NewBaseDirectory Create BaseDirectory with URL -func NewBaseDirectory(url *common.URL) BaseDirectory { - return BaseDirectory{ +// NewDirectory Create BaseDirectory with URL +func NewDirectory(url *common.URL) Directory { + return Directory{ url: url, destroyed: atomic.NewBool(false), routerChain: &chain.RouterChain{}, @@ -51,28 +51,28 @@ func NewBaseDirectory(url *common.URL) BaseDirectory { } // RouterChain Return router chain in directory -func (dir *BaseDirectory) RouterChain() router.Chain { +func (dir *Directory) RouterChain() router.Chain { return dir.routerChain } // SetRouterChain Set router chain in directory -func (dir *BaseDirectory) SetRouterChain(routerChain router.Chain) { +func (dir *Directory) SetRouterChain(routerChain router.Chain) { dir.mutex.Lock() defer dir.mutex.Unlock() dir.routerChain = routerChain } // GetURL Get URL -func (dir *BaseDirectory) GetURL() *common.URL { +func (dir *Directory) GetURL() *common.URL { return dir.url } // GetDirectoryUrl Get URL instance -func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { +func (dir *Directory) GetDirectoryUrl() *common.URL { return dir.url } -func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { +func (dir *Directory) isProperRouter(url *common.URL) bool { app := url.GetParam(constant.APPLICATION_KEY, "") dirApp := dir.GetURL().GetParam(constant.APPLICATION_KEY, "") if len(dirApp) == 0 && dir.GetURL().SubURL != nil { @@ -92,7 +92,7 @@ func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { } // Destroy Destroy -func (dir *BaseDirectory) Destroy(doDestroy func()) { +func (dir *Directory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { dir.mutex.Lock() doDestroy() @@ -101,6 +101,6 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) { } // IsAvailable Once directory init finish, it will change to true -func (dir *BaseDirectory) IsAvailable() bool { +func (dir *Directory) IsAvailable() bool { return !dir.destroyed.Load() } diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base/directory_test.go similarity index 92% rename from cluster/directory/base_directory_test.go rename to cluster/directory/base/directory_test.go index e465cea6f8..79cca2cbcd 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base/directory_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package directory +package base import ( "encoding/base64" @@ -40,7 +40,7 @@ var ( ) func TestNewBaseDirectory(t *testing.T) { - dir := NewBaseDirectory(url) + dir := NewDirectory(url) assert.Equal(t, url, dir.GetURL()) assert.Equal(t, url, dir.GetDirectoryUrl()) } @@ -48,7 +48,7 @@ func TestNewBaseDirectory(t *testing.T) { func TestBuildRouterChain(t *testing.T) { regURL := url regURL.AddParam(constant.INTERFACE_KEY, "mock-app") - directory := NewBaseDirectory(regURL) + directory := NewDirectory(regURL) var err error directory.routerChain, err = chain.NewRouterChain(regURL) assert.Error(t, err) @@ -65,7 +65,7 @@ func getRouteURL(rule string, u *common.URL) *common.URL { func TestIsProperRouter(t *testing.T) { regURL := url regURL.AddParam(constant.APPLICATION_KEY, "mock-app") - d := NewBaseDirectory(regURL) + d := NewDirectory(regURL) localIP := common.GetLocalIp() rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) routeURL := getRouteURL(rule, anyURL) @@ -75,7 +75,7 @@ func TestIsProperRouter(t *testing.T) { regURL.AddParam(constant.APPLICATION_KEY, "") regURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService") - d = NewBaseDirectory(regURL) + d = NewDirectory(regURL) routeURL = getRouteURL(rule, anyURL) routeURL.AddParam(constant.INTERFACE_KEY, "com.foo.BarService") rst = d.isProperRouter(routeURL) @@ -83,14 +83,14 @@ func TestIsProperRouter(t *testing.T) { regURL.AddParam(constant.APPLICATION_KEY, "") regURL.AddParam(constant.INTERFACE_KEY, "") - d = NewBaseDirectory(regURL) + d = NewDirectory(regURL) routeURL = getRouteURL(rule, anyURL) rst = d.isProperRouter(routeURL) assert.True(t, rst) regURL.SetParam(constant.APPLICATION_KEY, "") regURL.SetParam(constant.INTERFACE_KEY, "") - d = NewBaseDirectory(regURL) + d = NewDirectory(regURL) routeURL = getRouteURL(rule, anyURL) routeURL.AddParam(constant.APPLICATION_KEY, "mock-service") rst = d.isProperRouter(routeURL) @@ -98,7 +98,7 @@ func TestIsProperRouter(t *testing.T) { regURL.SetParam(constant.APPLICATION_KEY, "") regURL.SetParam(constant.INTERFACE_KEY, "") - d = NewBaseDirectory(regURL) + d = NewDirectory(regURL) routeURL = getRouteURL(rule, anyURL) routeURL.AddParam(constant.INTERFACE_KEY, "mock-service") rst = d.isProperRouter(routeURL) diff --git a/cluster/directory.go b/cluster/directory/directory.go similarity index 98% rename from cluster/directory.go rename to cluster/directory/directory.go index d89f0cfb46..257eb4a703 100644 --- a/cluster/directory.go +++ b/cluster/directory/directory.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster +package directory import ( "dubbo.apache.org/dubbo-go/v3/common" diff --git a/cluster/directory/static_directory.go b/cluster/directory/static/directory.go similarity index 76% rename from cluster/directory/static_directory.go rename to cluster/directory/static/directory.go index ac7690af8b..7e877e42ad 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static/directory.go @@ -15,41 +15,42 @@ * limitations under the License. */ -package directory +package static import ( perrors "github.com/pkg/errors" ) import ( + "dubbo.apache.org/dubbo-go/v3/cluster/directory/base" "dubbo.apache.org/dubbo-go/v3/cluster/router/chain" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/protocol" ) -type staticDirectory struct { - BaseDirectory +type directory struct { + base.Directory invokers []protocol.Invoker } -// NewStaticDirectory Create a new staticDirectory with invokers -func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory { +// NewDirectory Create a new staticDirectory with invokers +func NewDirectory(invokers []protocol.Invoker) *directory { var url *common.URL if len(invokers) > 0 { url = invokers[0].GetURL() } - dir := &staticDirectory{ - BaseDirectory: NewBaseDirectory(url), - invokers: invokers, + dir := &directory{ + Directory: base.NewDirectory(url), + invokers: invokers, } - dir.routerChain.SetInvokers(invokers) + dir.RouterChain().SetInvokers(invokers) return dir } // for-loop invokers ,if all invokers is available ,then it means directory is available -func (dir *staticDirectory) IsAvailable() bool { +func (dir *directory) IsAvailable() bool { if len(dir.invokers) == 0 { return false } @@ -62,7 +63,7 @@ func (dir *staticDirectory) IsAvailable() bool { } // List List invokers -func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { +func (dir *directory) List(invocation protocol.Invocation) []protocol.Invoker { l := len(dir.invokers) invokers := make([]protocol.Invoker, l) copy(invokers, dir.invokers) @@ -76,8 +77,8 @@ func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invo } // Destroy Destroy -func (dir *staticDirectory) Destroy() { - dir.BaseDirectory.Destroy(func() { +func (dir *directory) Destroy() { + dir.Directory.Destroy(func() { for _, ivk := range dir.invokers { ivk.Destroy() } @@ -86,7 +87,7 @@ func (dir *staticDirectory) Destroy() { } // BuildRouterChain build router chain by invokers -func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error { +func (dir *directory) BuildRouterChain(invokers []protocol.Invoker) error { if len(invokers) == 0 { return perrors.Errorf("invokers == null") } diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static/directory_test.go similarity index 94% rename from cluster/directory/static_directory_test.go rename to cluster/directory/static/directory_test.go index b959f6d688..205d9364e7 100644 --- a/cluster/directory/static_directory_test.go +++ b/cluster/directory/static/directory_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package directory +package static import ( "fmt" @@ -39,7 +39,7 @@ func TestStaticDirList(t *testing.T) { invokers = append(invokers, protocol.NewBaseInvoker(url)) } - staticDir := NewStaticDirectory(invokers) + staticDir := NewDirectory(invokers) list := staticDir.List(&invocation.RPCInvocation{}) assert.Len(t, list, 10) @@ -52,7 +52,7 @@ func TestStaticDirDestroy(t *testing.T) { invokers = append(invokers, protocol.NewBaseInvoker(url)) } - staticDir := NewStaticDirectory(invokers) + staticDir := NewDirectory(invokers) assert.Equal(t, true, staticDir.IsAvailable()) staticDir.Destroy() assert.Equal(t, false, staticDir.IsAvailable()) diff --git a/cluster/loadbalance/consistenthashing/loadbalance.go b/cluster/loadbalance/consistenthashing/loadbalance.go new file mode 100644 index 0000000000..965f55d735 --- /dev/null +++ b/cluster/loadbalance/consistenthashing/loadbalance.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consistenthashing + +import ( + "encoding/json" + "hash/crc32" + "regexp" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +const ( + // HashNodes hash nodes + HashNodes = "hash.nodes" + // HashArguments key of hash arguments in url + HashArguments = "hash.arguments" +) + +var ( + selectors = make(map[string]*selector) + re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN) +) + +func init() { + extension.SetLoadbalance(constant.LoadBalanceKeyConsistentHashing, newLoadBalance) +} + +// loadBalance implementation of load balancing: using consistent hashing +type loadBalance struct{} + +// newLoadBalance creates NewConsistentHashLoadBalance +// +// The same parameters of the request is always sent to the same provider. +func newLoadBalance() loadbalance.LoadBalance { + return &loadBalance{} +} + +// Select gets invoker based on load balancing strategy +func (lb *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { + methodName := invocation.MethodName() + key := invokers[0].GetURL().ServiceKey() + "." + methodName + + // hash the invokers + var bs []byte + for _, invoker := range invokers { + b, err := json.Marshal(invoker) + if err != nil { + return nil + } + bs = append(bs, b...) + } + hashCode := crc32.ChecksumIEEE(bs) + selector, ok := selectors[key] + if !ok || selector.hashCode != hashCode { + selectors[key] = newSelector(invokers, methodName, hashCode) + selector = selectors[key] + } + return selector.Select(invocation) +} diff --git a/cluster/loadbalance/consistent_hash_test.go b/cluster/loadbalance/consistenthashing/loadbalance_test.go similarity index 94% rename from cluster/loadbalance/consistent_hash_test.go rename to cluster/loadbalance/consistenthashing/loadbalance_test.go index 72848b55e5..80b9b017ed 100644 --- a/cluster/loadbalance/consistent_hash_test.go +++ b/cluster/loadbalance/consistenthashing/loadbalance_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package loadbalance +package consistenthashing import ( "fmt" @@ -27,7 +27,7 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" @@ -52,14 +52,14 @@ func TestConsistentHashSelectorSuite(t *testing.T) { type consistentHashSelectorSuite struct { suite.Suite - selector *consistentHashSelector + selector *selector } func (s *consistentHashSelectorSuite) SetupTest() { var invokers []protocol.Invoker url, _ := common.NewURL(url20000) invokers = append(invokers, protocol.NewBaseInvoker(url)) - s.selector = newConsistentHashSelector(invokers, "echo", 999944) + s.selector = newSelector(invokers, "echo", 999944) } func (s *consistentHashSelectorSuite) TestToKey() { @@ -91,7 +91,7 @@ type consistentHashLoadBalanceSuite struct { invoker1 protocol.Invoker invoker2 protocol.Invoker invoker3 protocol.Invoker - lb cluster.LoadBalance + lb loadbalance.LoadBalance } func (s *consistentHashLoadBalanceSuite) SetupTest() { @@ -108,7 +108,7 @@ func (s *consistentHashLoadBalanceSuite) SetupTest() { s.invoker3 = protocol.NewBaseInvoker(s.url3) s.invokers = append(s.invokers, s.invoker1, s.invoker2, s.invoker3) - s.lb = NewConsistentHashLoadBalance() + s.lb = newLoadBalance() } func (s *consistentHashLoadBalanceSuite) TestSelect() { diff --git a/cluster/loadbalance/consistent_hash.go b/cluster/loadbalance/consistenthashing/selector.go similarity index 52% rename from cluster/loadbalance/consistent_hash.go rename to cluster/loadbalance/consistenthashing/selector.go index 7f1414c8a4..55d37507f7 100644 --- a/cluster/loadbalance/consistent_hash.go +++ b/cluster/loadbalance/consistenthashing/selector.go @@ -15,14 +15,11 @@ * limitations under the License. */ -package loadbalance +package consistenthashing import ( "crypto/md5" - "encoding/json" "fmt" - "hash/crc32" - "regexp" "sort" "strconv" "strings" @@ -33,65 +30,11 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -const ( - // ConsistentHash consistent hash - ConsistentHash = "consistenthash" - // HashNodes hash nodes - HashNodes = "hash.nodes" - // HashArguments key of hash arguments in url - HashArguments = "hash.arguments" -) - -var ( - selectors = make(map[string]*consistentHashSelector) - re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN) -) - -func init() { - extension.SetLoadbalance(ConsistentHash, NewConsistentHashLoadBalance) -} - -// consistentHashLoadBalance implementation of load balancing: using consistent hashing -type consistentHashLoadBalance struct{} - -// NewConsistentHashLoadBalance creates NewConsistentHashLoadBalance -// -// The same parameters of the request is always sent to the same provider. -func NewConsistentHashLoadBalance() cluster.LoadBalance { - return &consistentHashLoadBalance{} -} - -// Select gets invoker based on load balancing strategy -func (lb *consistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { - methodName := invocation.MethodName() - key := invokers[0].GetURL().ServiceKey() + "." + methodName - - // hash the invokers - bs := make([]byte, 0) - for _, invoker := range invokers { - b, err := json.Marshal(invoker) - if err != nil { - return nil - } - bs = append(bs, b...) - } - hashCode := crc32.ChecksumIEEE(bs) - selector, ok := selectors[key] - if !ok || selector.hashCode != hashCode { - selectors[key] = newConsistentHashSelector(invokers, methodName, hashCode) - selector = selectors[key] - } - return selector.Select(invocation) -} - -// consistentHashSelector implementation of Selector:get invoker based on load balancing strategy -type consistentHashSelector struct { +// selector implementation of Selector:get invoker based on load balancing strategy +type selector struct { hashCode uint32 replicaNum int virtualInvokers map[uint32]protocol.Invoker @@ -99,10 +42,10 @@ type consistentHashSelector struct { argumentIndex []int } -func newConsistentHashSelector(invokers []protocol.Invoker, methodName string, - hashCode uint32) *consistentHashSelector { +func newSelector(invokers []protocol.Invoker, methodName string, + hashCode uint32) *selector { - selector := &consistentHashSelector{} + selector := &selector{} selector.virtualInvokers = make(map[uint32]protocol.Invoker) selector.hashCode = hashCode url := invokers[0].GetURL() @@ -132,23 +75,23 @@ func newConsistentHashSelector(invokers []protocol.Invoker, methodName string, } // Select gets invoker based on load balancing strategy -func (c *consistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker { +func (c *selector) Select(invocation protocol.Invocation) protocol.Invoker { key := c.toKey(invocation.Arguments()) digest := md5.Sum([]byte(key)) return c.selectForKey(c.hash(digest, 0)) } -func (c *consistentHashSelector) toKey(args []interface{}) string { +func (c *selector) toKey(args []interface{}) string { var sb strings.Builder for i := range c.argumentIndex { if i >= 0 && i < len(args) { - fmt.Fprint(&sb, args[i].(string)) + _, _ = fmt.Fprint(&sb, args[i].(string)) } } return sb.String() } -func (c *consistentHashSelector) selectForKey(hash uint32) protocol.Invoker { +func (c *selector) selectForKey(hash uint32) protocol.Invoker { idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash }) @@ -158,8 +101,7 @@ func (c *consistentHashSelector) selectForKey(hash uint32) protocol.Invoker { return c.virtualInvokers[c.keys[idx]] } -// nolint -func (c *consistentHashSelector) hash(digest [16]byte, i int) uint32 { +func (c *selector) hash(digest [16]byte, i int) uint32 { return (uint32(digest[3+i*4]&0xFF) << 24) | (uint32(digest[2+i*4]&0xFF) << 16) | (uint32(digest[1+i*4]&0xFF) << 8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF } diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/leastactive/loadbalance.go similarity index 81% rename from cluster/loadbalance/least_active.go rename to cluster/loadbalance/leastactive/loadbalance.go index 59767408fd..317982bc81 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/leastactive/loadbalance.go @@ -15,38 +15,39 @@ * limitations under the License. */ -package loadbalance +package leastactive import ( "math/rand" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) const ( - // LeastActive is used to set the load balance extension - LeastActive = "leastactive" + // Key is used to set the load balance extension + Key = "leastactive" ) func init() { - extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance) + extension.SetLoadbalance(constant.LoadBalanceKeyLeastActive, newLoadBalance) } -type leastActiveLoadBalance struct{} +type loadBalance struct{} -// NewLeastActiveLoadBalance returns a least active load balance. +// newLoadBalance returns a least active load balance. // // A random mechanism based on actives, actives means the number of a consumer's requests have been sent to provider but not yet got response. -func NewLeastActiveLoadBalance() cluster.LoadBalance { - return &leastActiveLoadBalance{} +func newLoadBalance() loadbalance.LoadBalance { + return &loadBalance{} } // Select gets invoker based on least active load balancing strategy -func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { +func (lb *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { count := len(invokers) if count == 0 { return nil @@ -69,7 +70,7 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation // Active number active := protocol.GetMethodStatus(invoker.GetURL(), invocation.MethodName()).GetActive() // current weight (maybe in warmUp) - weight := GetWeight(invoker, invocation) + weight := loadbalance.GetWeight(invoker, invocation) // There are smaller active services if leastActive == -1 || active < leastActive { leastActive = active @@ -97,7 +98,7 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation offsetWeight := rand.Int63n(totalWeight) + 1 for i := 0; i < leastCount; i++ { leastIndex := leastIndexes[i] - offsetWeight -= GetWeight(invokers[i], invocation) + offsetWeight -= loadbalance.GetWeight(invokers[i], invocation) if offsetWeight <= 0 { return invokers[leastIndex] } diff --git a/cluster/loadbalance/least_active_test.go b/cluster/loadbalance/leastactive/loadbalance_test.go similarity index 95% rename from cluster/loadbalance/least_active_test.go rename to cluster/loadbalance/leastactive/loadbalance_test.go index 5d6a50fa52..223692bdd3 100644 --- a/cluster/loadbalance/least_active_test.go +++ b/cluster/loadbalance/leastactive/loadbalance_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package loadbalance +package leastactive import ( "fmt" @@ -34,7 +34,7 @@ import ( ) func TestLeastActiveSelect(t *testing.T) { - loadBalance := NewLeastActiveLoadBalance() + loadBalance := newLoadBalance() var invokers []protocol.Invoker @@ -51,7 +51,7 @@ func TestLeastActiveSelect(t *testing.T) { } func TestLeastActiveByWeight(t *testing.T) { - loadBalance := NewLeastActiveLoadBalance() + loadBalance := newLoadBalance() var invokers []protocol.Invoker loop := 3 diff --git a/cluster/loadbalance.go b/cluster/loadbalance/loadbalance.go similarity index 98% rename from cluster/loadbalance.go rename to cluster/loadbalance/loadbalance.go index b66c18c6d2..d8a5dcd6e7 100644 --- a/cluster/loadbalance.go +++ b/cluster/loadbalance/loadbalance.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package cluster +package loadbalance import ( "dubbo.apache.org/dubbo-go/v3/protocol" diff --git a/cluster/loadbalance/random.go b/cluster/loadbalance/random/loadbalance.go similarity index 75% rename from cluster/loadbalance/random.go rename to cluster/loadbalance/random/loadbalance.go index 86c34448b7..c3a7fb5d68 100644 --- a/cluster/loadbalance/random.go +++ b/cluster/loadbalance/random/loadbalance.go @@ -15,36 +15,33 @@ * limitations under the License. */ -package loadbalance +package random import ( "math/rand" ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) -const ( - name = "random" -) - func init() { - extension.SetLoadbalance(name, NewRandomLoadBalance) + extension.SetLoadbalance(constant.LoadBalanceKeyRandom, NewLoadBalance) } -type randomLoadBalance struct{} +type loadBalance struct{} -// NewRandomLoadBalance returns a random load balance instance. +// NewLoadBalance returns a random load balance instance. // // Set random probabilities by weight, and the request will be sent to provider randomly. -func NewRandomLoadBalance() cluster.LoadBalance { - return &randomLoadBalance{} +func NewLoadBalance() loadbalance.LoadBalance { + return &loadBalance{} } -func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { +func (lb *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { var length int if length = len(invokers); length == 1 { return invokers[0] @@ -52,12 +49,12 @@ func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation prot sameWeight := true weights := make([]int64, length) - firstWeight := GetWeight(invokers[0], invocation) + firstWeight := loadbalance.GetWeight(invokers[0], invocation) totalWeight := firstWeight weights[0] = firstWeight for i := 1; i < length; i++ { - weight := GetWeight(invokers[i], invocation) + weight := loadbalance.GetWeight(invokers[i], invocation) weights[i] = weight totalWeight += weight diff --git a/cluster/loadbalance/random_test.go b/cluster/loadbalance/random/loadbalance_test.go similarity index 82% rename from cluster/loadbalance/random_test.go rename to cluster/loadbalance/random/loadbalance_test.go index de8ce1ead1..37ad2642d1 100644 --- a/cluster/loadbalance/random_test.go +++ b/cluster/loadbalance/random/loadbalance_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package loadbalance +package random import ( "fmt" @@ -43,29 +43,29 @@ const ( ) func TestRandomlbSelect(t *testing.T) { - randomlb := NewRandomLoadBalance() + randomlb := NewLoadBalance() - invokers := []protocol.Invoker{} + var invokers []protocol.Invoker - url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, 0)) - invokers = append(invokers, protocol.NewBaseInvoker(url)) + u, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, 0)) + invokers = append(invokers, protocol.NewBaseInvoker(u)) i := randomlb.Select(invokers, &invocation.RPCInvocation{}) - assert.True(t, i.GetURL().URLEqual(url)) + assert.True(t, i.GetURL().URLEqual(u)) for i := 1; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) - invokers = append(invokers, protocol.NewBaseInvoker(url)) + u, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) + invokers = append(invokers, protocol.NewBaseInvoker(u)) } randomlb.Select(invokers, &invocation.RPCInvocation{}) } func TestRandomlbSelectWeight(t *testing.T) { - randomlb := NewRandomLoadBalance() + randomlb := NewLoadBalance() invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) - invokers = append(invokers, protocol.NewBaseInvoker(url)) + u, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) + invokers = append(invokers, protocol.NewBaseInvoker(u)) } urlParams := url.Values{} @@ -92,12 +92,12 @@ func TestRandomlbSelectWeight(t *testing.T) { } func TestRandomlbSelectWarmup(t *testing.T) { - randomlb := NewRandomLoadBalance() + randomlb := NewLoadBalance() invokers := []protocol.Invoker{} for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) - invokers = append(invokers, protocol.NewBaseInvoker(url)) + u, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i)) + invokers = append(invokers, protocol.NewBaseInvoker(u)) } urlParams := url.Values{} diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/roundrobin/loadbalance.go similarity index 81% rename from cluster/loadbalance/round_robin.go rename to cluster/loadbalance/roundrobin/loadbalance.go index e52e031350..80665d1645 100644 --- a/cluster/loadbalance/round_robin.go +++ b/cluster/loadbalance/roundrobin/loadbalance.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package loadbalance +package roundrobin import ( "math" @@ -25,42 +25,38 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) const ( - // RoundRobin load balancing way - RoundRobin = "roundrobin" - - // nolint - COMPLETE = 0 - // nolint - UPDATING = 1 + Complete = 0 + Updating = 1 ) var ( methodWeightMap sync.Map // [string]invokers - state = int32(COMPLETE) // update lock acquired ? + state = int32(Complete) // update lock acquired ? recyclePeriod = 60 * time.Second.Nanoseconds() ) func init() { - extension.SetLoadbalance(RoundRobin, NewRoundRobinLoadBalance) + extension.SetLoadbalance(constant.LoadBalanceKeyRoundRobin, NewLoadBalance) } -type roundRobinLoadBalance struct{} +type loadBalance struct{} -// NewRoundRobinLoadBalance returns a round robin load balance +// NewLoadBalance returns a round robin load balance // // Use the weight's common advisory to determine round robin ratio -func NewRoundRobinLoadBalance() cluster.LoadBalance { - return &roundRobinLoadBalance{} +func NewLoadBalance() loadbalance.LoadBalance { + return &loadBalance{} } // Select gets invoker based on round robin load balancing strategy -func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { +func (lb *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { count := len(invokers) if count == 0 { return nil @@ -83,7 +79,7 @@ func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation ) for _, invoker := range invokers { - weight := GetWeight(invoker, invocation) + weight := loadbalance.GetWeight(invoker, invocation) if weight < 0 { weight = 0 } @@ -122,8 +118,8 @@ func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation } func cleanIfRequired(clean bool, invokers *cachedInvokers, now *time.Time) { - if clean && atomic.CompareAndSwapInt32(&state, COMPLETE, UPDATING) { - defer atomic.CompareAndSwapInt32(&state, UPDATING, COMPLETE) + if clean && atomic.CompareAndSwapInt32(&state, Complete, Updating) { + defer atomic.CompareAndSwapInt32(&state, Updating, Complete) invokers.Range(func(identify, robin interface{}) bool { weightedRoundRobin := robin.(*weightedRoundRobin) elapsed := now.Sub(*weightedRoundRobin.lastUpdate).Nanoseconds() diff --git a/cluster/loadbalance/round_robin_test.go b/cluster/loadbalance/roundrobin/loadbalance_test.go similarity index 95% rename from cluster/loadbalance/round_robin_test.go rename to cluster/loadbalance/roundrobin/loadbalance_test.go index 015c49aeca..fea54858e3 100644 --- a/cluster/loadbalance/round_robin_test.go +++ b/cluster/loadbalance/roundrobin/loadbalance_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package loadbalance +package roundrobin import ( "fmt" @@ -35,7 +35,7 @@ import ( ) func TestRoundRobinSelect(t *testing.T) { - loadBalance := NewRoundRobinLoadBalance() + loadBalance := NewLoadBalance() var invokers []protocol.Invoker @@ -53,7 +53,7 @@ func TestRoundRobinSelect(t *testing.T) { } func TestRoundRobinByWeight(t *testing.T) { - loadBalance := NewRoundRobinLoadBalance() + loadBalance := NewLoadBalance() var invokers []protocol.Invoker loop := 10 diff --git a/common/constant/cluster.go b/common/constant/cluster.go index 6894f3595e..14f3375317 100644 --- a/common/constant/cluster.go +++ b/common/constant/cluster.go @@ -17,8 +17,13 @@ package constant -// nolint const ( - FAILOVER_CLUSTER_NAME = "failover" - ZONEAWARE_CLUSTER_NAME = "zoneAware" + ClusterKeyAvailable = "available" + ClusterKeyBroadcast = "broadcast" + ClusterKeyFailback = "failback" + ClusterKeyFailfast = "failfast" + ClusterKeyFailover = "failover" + ClusterKeyFailsafe = "failsafe" + ClusterKeyForking = "forking" + ClusterKeyZoneAware = "zoneAware" ) diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go new file mode 100644 index 0000000000..7b1d8ea195 --- /dev/null +++ b/common/constant/loadbalance.go @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +const ( + LoadBalanceKeyConsistentHashing = "consistenthashing" + LoadBalanceKeyLeastActive = "leastactive" + LoadBalanceKeyRandom = "random" + LoadBalanceKeyRoundRobin = "roundrobin" +) diff --git a/common/extension/cluster.go b/common/extension/cluster.go index 7c91c5e452..d3614ba1b1 100644 --- a/common/extension/cluster.go +++ b/common/extension/cluster.go @@ -18,7 +18,7 @@ package extension import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster" ) var clusters = make(map[string]func() cluster.Cluster) diff --git a/common/extension/cluster_interceptor.go b/common/extension/cluster_interceptor.go deleted file mode 100644 index e4372597e4..0000000000 --- a/common/extension/cluster_interceptor.go +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package extension - -import ( - "sync" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/cluster" -) - -var ( - lock sync.RWMutex - interceptors = make(map[string]func() cluster.Interceptor) -) - -// SetClusterInterceptor sets cluster interceptor so that user has chance to inject extra logics before and after -// cluster invoker -func SetClusterInterceptor(name string, fun func() cluster.Interceptor) { - lock.Lock() - defer lock.Unlock() - interceptors[name] = fun -} - -// GetClusterInterceptor returns the cluster interceptor instance with the given name -func GetClusterInterceptor(name string) cluster.Interceptor { - lock.RLock() - defer lock.RUnlock() - if interceptors[name] == nil { - panic("cluster_interceptor for " + name + " doesn't exist, make sure the corresponding package is imported") - } - return interceptors[name]() -} - -// GetClusterInterceptors returns all instances of registered cluster interceptors -func GetClusterInterceptors() []cluster.Interceptor { - lock.RLock() - defer lock.RUnlock() - ret := make([]cluster.Interceptor, 0, len(interceptors)) - for _, f := range interceptors { - ret = append(ret, f()) - } - return ret -} diff --git a/common/extension/loadbalance.go b/common/extension/loadbalance.go index 2a8fcf4e10..3308b405d8 100644 --- a/common/extension/loadbalance.go +++ b/common/extension/loadbalance.go @@ -18,19 +18,19 @@ package extension import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" ) -var loadbalances = make(map[string]func() cluster.LoadBalance) +var loadbalances = make(map[string]func() loadbalance.LoadBalance) // SetLoadbalance sets the loadbalance extension with @name // For example: random/round_robin/consistent_hash/least_active/... -func SetLoadbalance(name string, fcn func() cluster.LoadBalance) { +func SetLoadbalance(name string, fcn func() loadbalance.LoadBalance) { loadbalances[name] = fcn } // GetLoadbalance finds the loadbalance extension with @name -func GetLoadbalance(name string) cluster.LoadBalance { +func GetLoadbalance(name string) loadbalance.LoadBalance { if loadbalances[name] == nil { panic("loadbalance for " + name + " is not existing, make sure you have import the package.") } diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go index e810659cb6..ae2fef3af3 100644 --- a/common/extension/registry_directory.go +++ b/common/extension/registry_directory.go @@ -18,12 +18,12 @@ package extension import ( - "dubbo.apache.org/dubbo-go/v3/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/registry" ) -type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error) +type registryDirectory func(url *common.URL, registry registry.Registry) (directory.Directory, error) var defaultRegistry registryDirectory @@ -33,7 +33,7 @@ func SetDefaultRegistryDirectory(v registryDirectory) { } // GetDefaultRegistryDirectory finds the registryDirectory with url and registry -func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) { +func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (directory.Directory, error) { if defaultRegistry == nil { panic("registry directory is not existing, make sure you have import the package.") } diff --git a/config/reference_config.go b/config/reference_config.go index 4dc871ab92..d67f641297 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -31,7 +31,7 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -176,25 +176,25 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { if len(invokers) == 1 { rc.invoker = invokers[0] if rc.URL != "" { - hitClu := constant.FAILOVER_CLUSTER_NAME + hitClu := constant.ClusterKeyFailover if u := rc.invoker.GetURL(); u != nil { - hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME) + hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ClusterKeyZoneAware) } - rc.invoker = extension.GetCluster(hitClu).Join(directory.NewStaticDirectory(invokers)) + rc.invoker = extension.GetCluster(hitClu).Join(static.NewDirectory(invokers)) } } else { var hitClu string if regURL != nil { // for multi-subscription scenario, use 'zone-aware' policy by default - hitClu = constant.ZONEAWARE_CLUSTER_NAME + hitClu = constant.ClusterKeyZoneAware } else { // not a registry url, must be direct invoke. - hitClu = constant.FAILOVER_CLUSTER_NAME + hitClu = constant.ClusterKeyFailover if u := invokers[0].GetURL(); u != nil { - hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME) + hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ClusterKeyZoneAware) } } - rc.invoker = extension.GetCluster(hitClu).Join(directory.NewStaticDirectory(invokers)) + rc.invoker = extension.GetCluster(hitClu).Join(static.NewDirectory(invokers)) } // publish consumer's metadata diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go index 8782cd4dfc..457e3cdde6 100644 --- a/filter/filter_impl/import.go +++ b/filter/filter_impl/import.go @@ -17,8 +17,7 @@ package filter_impl -// This package is for being compatible with older dubbo-go, please refer to https://github.com/dubbogo/imports to see -// the recommended import ways. +// This package is for being compatible with older dubbo-go, please use `imports` package. // This package may be DEPRECATED OR REMOVED in the future. import ( diff --git a/imports/imports.go b/imports/imports.go index c5f757c083..078be19ae9 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -18,13 +18,36 @@ package imports import ( - _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster_impl" - _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failfast" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failover" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failsafe" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/forking" + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/zoneaware" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" _ "dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory" _ "dubbo.apache.org/dubbo-go/v3/config_center/apollo" _ "dubbo.apache.org/dubbo-go/v3/config_center/nacos" _ "dubbo.apache.org/dubbo-go/v3/config_center/zookeeper" - _ "dubbo.apache.org/dubbo-go/v3/filter/filter_impl" + _ "dubbo.apache.org/dubbo-go/v3/filter/accesslog" + _ "dubbo.apache.org/dubbo-go/v3/filter/active" + _ "dubbo.apache.org/dubbo-go/v3/filter/auth" + _ "dubbo.apache.org/dubbo-go/v3/filter/echo" + _ "dubbo.apache.org/dubbo-go/v3/filter/execlmt" + _ "dubbo.apache.org/dubbo-go/v3/filter/generic" + _ "dubbo.apache.org/dubbo-go/v3/filter/gshutdown" + _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix" + _ "dubbo.apache.org/dubbo-go/v3/filter/metrics" + _ "dubbo.apache.org/dubbo-go/v3/filter/seata" + _ "dubbo.apache.org/dubbo-go/v3/filter/sentinel" + _ "dubbo.apache.org/dubbo-go/v3/filter/token" + _ "dubbo.apache.org/dubbo-go/v3/filter/tps" + _ "dubbo.apache.org/dubbo-go/v3/filter/tracing" _ "dubbo.apache.org/dubbo-go/v3/metadata/mapping/metadata" _ "dubbo.apache.org/dubbo-go/v3/metadata/report/etcd" _ "dubbo.apache.org/dubbo-go/v3/metadata/report/nacos" diff --git a/registry/directory/directory.go b/registry/directory/directory.go index eb22597b68..b8a4b674b0 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -29,8 +29,9 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/cluster" "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory/static" "dubbo.apache.org/dubbo-go/v3/cluster/router/chain" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" @@ -52,7 +53,7 @@ func init() { // RegistryDirectory implementation of Directory: // Invoker list returned from this Directory's list method have been filtered by Routers type RegistryDirectory struct { - directory.BaseDirectory + base.Directory cacheInvokers []protocol.Invoker invokersLock sync.RWMutex serviceType string @@ -69,13 +70,13 @@ type RegistryDirectory struct { } // NewRegistryDirectory will create a new RegistryDirectory -func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { +func NewRegistryDirectory(url *common.URL, registry registry.Registry) (directory.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } logger.Debugf("new RegistryDirectory for service :%s.", url.Key()) dir := &RegistryDirectory{ - BaseDirectory: directory.NewBaseDirectory(url), + Directory: base.NewDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), @@ -85,7 +86,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. dir.consumerURL = dir.getConsumerUrl(url.SubURL) if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil { - dir.BaseDirectory.SetRouterChain(routerChain) + dir.Directory.SetRouterChain(routerChain) } else { logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err) } @@ -302,7 +303,7 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { } } else { for _, invokers := range groupInvokersMap { - staticDir := directory.NewStaticDirectory(invokers) + staticDir := static.NewDirectory(invokers) cst := extension.GetCluster(dir.GetURL().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) err = staticDir.BuildRouterChain(invokers) if err != nil { @@ -400,8 +401,8 @@ func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.In // IsAvailable whether the directory is available func (dir *RegistryDirectory) IsAvailable() bool { - if !dir.BaseDirectory.IsAvailable() { - return dir.BaseDirectory.IsAvailable() + if !dir.Directory.IsAvailable() { + return dir.Directory.IsAvailable() } for _, ivk := range dir.cacheInvokers { @@ -416,7 +417,7 @@ func (dir *RegistryDirectory) IsAvailable() bool { // Destroy method func (dir *RegistryDirectory) Destroy() { // TODO:unregister & unsubscribe - dir.BaseDirectory.Destroy(func() { + dir.Directory.Destroy(func() { invokers := dir.cacheInvokers dir.cacheInvokers = []protocol.Invoker{} for _, ivk := range invokers { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 686a77db23..8c57db09c7 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -29,7 +29,7 @@ import ( ) import ( - cluster "dubbo.apache.org/dubbo-go/v3/cluster/cluster_impl" + "dubbo.apache.org/dubbo-go/v3/cluster/cluster" "dubbo.apache.org/dubbo-go/v3/common" common_cfg "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant"