diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index ed30559ed3..0d39bff13e 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -121,41 +121,49 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker { if len(invokers) == 0 { - logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey()) return nil } + go protocol.TryRefreshBlackList() if len(invokers) == 1 { - return invokers[0] + if invokers[0].IsAvailable() { + return invokers[0] + } + protocol.SetInvokerUnhealthyStatus(invokers[0]) + logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey()) + return nil } selectedInvoker := lb.Select(invokers, invocation) - //judge to if the selectedInvoker is invoked - + //judge if the selected Invoker is invoked and available if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) { + protocol.SetInvokerUnhealthyStatus(selectedInvoker) + otherInvokers := getOtherInvokers(invokers, selectedInvoker) // do reselect - var reslectInvokers []protocol.Invoker - - for _, invoker := range invokers { - if !invoker.IsAvailable() { + for i := 0; i < 3; i++ { + if len(otherInvokers) == 0 { + // no other ivk to reselect, return to fallback + break + } + reselectedInvoker := lb.Select(otherInvokers, invocation) + if isInvoked(reselectedInvoker, invoked) { + otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker) + continue + } + if !reselectedInvoker.IsAvailable() { logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.", invoker.GetUrl().Ip) + protocol.SetInvokerUnhealthyStatus(reselectedInvoker) + otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker) continue } - - if !isInvoked(invoker, invoked) { - reslectInvokers = append(reslectInvokers, invoker) - } - } - - if len(reslectInvokers) > 0 { - selectedInvoker = lb.Select(reslectInvokers, invocation) - } else { - logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String()) - return nil + return reselectedInvoker } + } else { + return selectedInvoker } - return selectedInvoker + logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String()) + return nil } func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { @@ -194,3 +202,13 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl } return extension.GetLoadbalance(lb) } + +func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker { + otherInvokers := make([]protocol.Invoker, 0) + for _, i := range invokers { + if i != invoker { + otherInvokers = append(otherInvokers, i) + } + } + return otherInvokers +} diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go index 0edb81d428..d36e16e269 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -72,6 +72,8 @@ func TestFailbackSuceess(t *testing.T) { invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + invoker.EXPECT().IsAvailable().Return(true) + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) @@ -88,6 +90,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) { clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + invoker.EXPECT().IsAvailable().Return(true) // failed at first mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} @@ -98,6 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) { wg.Add(1) now := time.Now() mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().IsAvailable().Return(true) invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result { delta := time.Since(now).Nanoseconds() / int64(time.Second) assert.True(t, delta >= 5) @@ -131,6 +135,7 @@ func TestFailbackRetryFailed(t *testing.T) { clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) @@ -177,6 +182,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) { clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker) clusterInvoker.maxRetries = 10 + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() // 10 task should failed firstly. @@ -220,6 +226,7 @@ func TestFailbackOutOfLimit(t *testing.T) { clusterInvoker.failbackTasks = 1 invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11) diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go index 77e8e9c5da..9ac06b8d4a 100644 --- a/cluster/cluster_impl/failfast_cluster_test.go +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -53,6 +53,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker { invokers := []protocol.Invoker{} invokers = append(invokers, invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl) staticDir := directory.NewStaticDirectory(invokers) @@ -67,6 +68,7 @@ func TestFailfastInvokeSuccess(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerFailfast(invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} @@ -87,6 +89,7 @@ func TestFailfastInvokeFail(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerFailfast(invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes() mockResult := &protocol.RPCResult{Err: perrors.New("error")} diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index d9a716e1ae..5e208bddde 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -52,6 +52,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker { invokers := []protocol.Invoker{} invokers = append(invokers, invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failbackUrl) @@ -67,6 +68,8 @@ func TestFailSafeInvokeSuccess(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerFailsafe(invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} @@ -85,6 +88,7 @@ func TestFailSafeInvokeFail(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerFailsafe(invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes() diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index d1025a152b..309cd4429c 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { routers := make([]router.PriorityRouter, 0, len(urls)) + rc := dir.routerChain + for _, url := range urls { routerKey := url.GetParam(constant.ROUTER_KEY, "") @@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { } } factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url) + r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan()) if err != nil { logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err) return @@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) { logger.Infof("Init file condition router success, size: %v", len(routers)) dir.mutex.Lock() - rc := dir.routerChain - dir.mutex.Unlock() - rc.AddRouters(routers) + dir.mutex.Unlock() } func (dir *BaseDirectory) isProperRouter(url *common.URL) bool { diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go index 16e3c5a960..443f07de2c 100644 --- a/cluster/directory/base_directory_test.go +++ b/cluster/directory/base_directory_test.go @@ -28,6 +28,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster/router/chain" _ "github.com/apache/dubbo-go/cluster/router/condition" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -50,7 +51,9 @@ func TestBuildRouterChain(t *testing.T) { regURL := url regURL.AddParam(constant.INTERFACE_KEY, "mock-app") directory := NewBaseDirectory(regURL) - + var err error + directory.routerChain, err = chain.NewRouterChain(regURL) + assert.Nil(t, err) localIP := common.GetLocalIp() rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) routeURL := getRouteURL(rule, anyURL) diff --git a/cluster/router/.gitkeep b/cluster/router/.gitkeep deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/cluster/router/chain.go b/cluster/router/chain.go index 3614d0a5a3..cb33cf927f 100644 --- a/cluster/router/chain.go +++ b/cluster/router/chain.go @@ -29,4 +29,6 @@ type Chain interface { SetInvokers([]protocol.Invoker) // AddRouters Add routers AddRouters([]PriorityRouter) + // GetNotifyChan get notify channel of this chain + GetNotifyChan() chan struct{} } diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index fccce838bb..13cb1ff352 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -20,12 +20,12 @@ package chain import ( "sort" "sync" - "sync/atomic" "time" ) import ( perrors "github.com/pkg/errors" + "go.uber.org/atomic" ) import ( @@ -38,9 +38,7 @@ import ( ) const ( - timeInterval = 5 * time.Second - timeThreshold = 2 * time.Second - countThreshold = 5 + timeInterval = 5 * time.Second ) // RouterChain Router chain @@ -65,8 +63,10 @@ type RouterChain struct { notify chan struct{} // Address cache cache atomic.Value - // init - init sync.Once +} + +func (c *RouterChain) GetNotifyChan() chan struct{} { + return c.notify } // Route Loop routers in RouterChain and call Route method to determine the target invokers list. @@ -104,6 +104,9 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) { c.mutex.Lock() defer c.mutex.Unlock() c.routers = newRouters + go func() { + c.notify <- struct{}{} + }() } // SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and @@ -113,32 +116,21 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) { c.invokers = invokers c.mutex.Unlock() - // it should trigger init router for first call - c.init.Do(func() { - go func() { - c.notify <- struct{}{} - }() - }) - - c.count++ - now := time.Now() - if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold { - c.last = now - c.count = 0 - go func() { - c.notify <- struct{}{} - }() - } + go func() { + c.notify <- struct{}{} + }() } -// loop listens on events to update the address cache when it's necessary, either when it receives notification -// from address update, or when timeInterval exceeds. +// loop listens on events to update the address cache when it receives notification +// from address update, func (c *RouterChain) loop() { ticker := time.NewTicker(timeInterval) for { select { case <-ticker.C: - c.buildCache() + if protocol.GetAndRefreshState() { + c.buildCache() + } case <-c.notify: c.buildCache() } @@ -235,9 +227,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { if len(routerFactories) == 0 { return nil, perrors.Errorf("No routerFactory exits , create one please") } + + chain := &RouterChain{ + last: time.Now(), + notify: make(chan struct{}), + } + routers := make([]router.PriorityRouter, 0, len(routerFactories)) for key, routerFactory := range routerFactories { - r, err := routerFactory().NewPriorityRouter(url) + r, err := routerFactory().NewPriorityRouter(url, chain.notify) if r == nil || err != nil { logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error()) continue @@ -250,12 +248,10 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { sortRouter(newRouters) - chain := &RouterChain{ - builtinRouters: routers, - routers: newRouters, - last: time.Now(), - notify: make(chan struct{}), - } + routerNeedsUpdateInit := atomic.Bool{} + routerNeedsUpdateInit.Store(false) + chain.routers = newRouters + chain.builtinRouters = routers if url != nil { chain.url = url } diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go index 1bb7124a4f..779f4c7c00 100644 --- a/cluster/router/chain/chain_test.go +++ b/cluster/router/chain/chain_test.go @@ -148,7 +148,12 @@ conditions: url := getConditionRouteUrl(applicationKey) assert.NotNil(t, url) factory := extension.GetRouterFactory(url.Protocol) - r, err := factory.NewPriorityRouter(url) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + r, err := factory.NewPriorityRouter(url, notify) assert.Nil(t, err) assert.NotNil(t, r) diff --git a/cluster/router/condition/app_router.go b/cluster/router/condition/app_router.go index 056e32851c..64b3914b87 100644 --- a/cluster/router/condition/app_router.go +++ b/cluster/router/condition/app_router.go @@ -34,14 +34,15 @@ const ( // AppRouter For listen application router with config center type AppRouter struct { listenableRouter + notify interface{} } // NewAppRouter Init AppRouter by url -func NewAppRouter(url *common.URL) (*AppRouter, error) { +func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) { if url == nil { return nil, perrors.Errorf("No route URL for create app router!") } - appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, "")) + appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify) if err != nil { return nil, err } diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go index c8bc43eb33..5fb776a44b 100644 --- a/cluster/router/condition/app_router_test.go +++ b/cluster/router/condition/app_router_test.go @@ -81,7 +81,12 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -132,7 +137,12 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) @@ -174,7 +184,12 @@ conditions: assert.NotNil(t, configuration) appRouteURL := getAppRouteURL(routerKey) - appRouter, err := NewAppRouter(appRouteURL) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + appRouter, err := NewAppRouter(appRouteURL, notify) assert.Nil(t, err) assert.NotNil(t, appRouter) diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index f8d3e13010..2c1f03516a 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -37,8 +37,8 @@ func newConditionRouterFactory() router.PriorityRouterFactory { } // NewPriorityRouter creates ConditionRouterFactory by URL -func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewConditionRouter(url) +func (c *ConditionRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewConditionRouter(url, notify) } // NewRouter Create FileRouterFactory by Content @@ -54,6 +54,6 @@ func newAppRouterFactory() router.PriorityRouterFactory { } // NewPriorityRouter creates AppRouterFactory by URL -func (c *AppRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewAppRouter(url) +func (c *AppRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewAppRouter(url, notify) } diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index c916588eee..e08016d13e 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -132,37 +132,47 @@ func (bi *MockInvoker) Destroy() { func TestRoute_matchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) - router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + router, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) cUrl, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, factory1111Ip)) matchWhen := router.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1)) + router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify) matchWhen1 := router1.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen1) rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) matchWhen2 := router2.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen2) rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) matchWhen3 := router3.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen3) rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4)) + router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify) matchWhen4 := router4.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen4) rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) - router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5)) + router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify) matchWhen5 := router5.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, false, matchWhen5) rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) - router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6)) + router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify) matchWhen6 := router6.(*ConditionRouter).MatchWhen(cUrl, inv) assert.Equal(t, true, matchWhen6) } func TestRoute_matchFilter(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() t.Logf("The local ip is %s", localIP) url1, _ := common.NewURL("dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson") @@ -175,12 +185,12 @@ func TestRoute_matchFilter(t *testing.T) { rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) - router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1)) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) - router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4)) - router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5)) - router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6)) + router1, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule1), notify) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) + router4, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule4), notify) + router5, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule5), notify) + router6, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule6), notify) cUrl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) ret1 := router1.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{}) ret2 := router2.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), cUrl, &invocation.RPCInvocation{}) @@ -198,9 +208,14 @@ func TestRoute_matchFilter(t *testing.T) { } func TestRoute_methodRoute(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") matchWhen := r.(*ConditionRouter).MatchWhen(url, inv) assert.Equal(t, true, matchWhen) @@ -209,42 +224,57 @@ func TestRoute_methodRoute(t *testing.T) { assert.Equal(t, true, matchWhen) url2, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) - router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2)) + router2, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule2), notify) matchWhen = router2.(*ConditionRouter).MatchWhen(url2, inv) assert.Equal(t, false, matchWhen) url3, _ := common.NewURL(fmt.Sprintf(factoryConsumerMethodFormat, factory1111Ip)) rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) - router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3)) + router3, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule3), notify) matchWhen = router3.(*ConditionRouter).MatchWhen(url3, inv) assert.Equal(t, true, matchWhen) } func TestRoute_ReturnFalse(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() url, _ := common.NewURL("") localIP := common.GetLocalIp() invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(ret.ToArray())) } func TestRoute_ReturnEmpty(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url, _ := common.NewURL("") invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(ret.ToArray())) } func TestRoute_ReturnAll(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() urlString := "dubbo://" + localIP + "/com.foo.BarService" dubboURL, _ := common.NewURL(urlString) @@ -255,7 +285,7 @@ func TestRoute_ReturnAll(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, len(invokers), len(ret.ToArray())) } @@ -265,6 +295,11 @@ func TestRoute_HostFilter(t *testing.T) { url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) url3, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() invoker1 := NewMockInvoker(url1, 1) invoker2 := NewMockInvoker(url2, 2) invoker3 := NewMockInvoker(url3, 3) @@ -272,7 +307,7 @@ func TestRoute_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -280,6 +315,11 @@ func TestRoute_HostFilter(t *testing.T) { } func TestRoute_Empty_HostFilter(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -291,7 +331,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -299,6 +339,11 @@ func TestRoute_Empty_HostFilter(t *testing.T) { } func TestRoute_False_HostFilter(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -310,7 +355,7 @@ func TestRoute_False_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -318,6 +363,11 @@ func TestRoute_False_HostFilter(t *testing.T) { } func TestRoute_Placeholder(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -329,7 +379,7 @@ func TestRoute_Placeholder(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrl(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 2, len(ret.ToArray())) assert.Equal(t, invoker2, invokers[ret.ToArray()[0]]) @@ -337,6 +387,11 @@ func TestRoute_Placeholder(t *testing.T) { } func TestRoute_NoForce(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -348,12 +403,17 @@ func TestRoute_NoForce(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule)) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithNoForce(rule), notify) ret := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, len(invokers), len(ret.ToArray())) } func TestRoute_Force(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() localIP := common.GetLocalIp() url1, _ := common.NewURL(factory333URL) url2, _ := common.NewURL(fmt.Sprintf(factoryDubboFormat, localIP)) @@ -365,7 +425,7 @@ func TestRoute_Force(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf(factoryHostIp1234Format, localIP))) curl, _ := common.NewURL(fmt.Sprintf(factoryConsumerFormat, localIP)) - r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true")) + r, _ := newConditionRouterFactory().NewPriorityRouter(getRouteUrlWithForce(rule, "true"), notify) fileredInvokers := r.Route(utils.ToBitmap(invokers), setUpAddrCache(invokers), curl, inv) assert.Equal(t, 0, len(fileredInvokers.ToArray())) } diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go index 102d766a8a..2e55b2075d 100644 --- a/cluster/router/condition/listenable_router.go +++ b/cluster/router/condition/listenable_router.go @@ -47,8 +47,9 @@ type listenableRouter struct { conditionRouters []*ConditionRouter routerRule *RouterRule url *common.URL - //force bool - priority int64 + force bool + priority int64 + notify chan struct{} } // RouterRule Get RouterRule instance from listenableRouter @@ -56,7 +57,7 @@ func (l *listenableRouter) RouterRule() *RouterRule { return l.routerRule } -func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { +func newListenableRouter(url *common.URL, ruleKey string, notify chan struct{}) (*AppRouter, error) { if ruleKey == "" { return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router") } @@ -64,6 +65,7 @@ func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { l.url = url l.priority = listenableRouterDefaultPriority + l.notify = notify routerKey := ruleKey + constant.ConditionRouterRuleSuffix // add listener @@ -110,6 +112,9 @@ func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { return } l.generateConditions(routerRule) + go func() { + l.notify <- struct{}{} + }() } func (l *listenableRouter) generateConditions(rule *RouterRule) { diff --git a/cluster/router/condition/router.go b/cluster/router/condition/router.go index 0817b32843..d543ca3f94 100644 --- a/cluster/router/condition/router.go +++ b/cluster/router/condition/router.go @@ -62,6 +62,7 @@ type ConditionRouter struct { enabled bool WhenCondition map[string]MatchPair ThenCondition map[string]MatchPair + notify chan struct{} } // NewConditionRouterWithRule Init condition router by raw rule @@ -111,7 +112,7 @@ func NewConditionRouterWithRule(rule string) (*ConditionRouter, error) { } // NewConditionRouter Init condition router by URL -func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { +func NewConditionRouter(url *common.URL, notify chan struct{}) (*ConditionRouter, error) { if url == nil { return nil, perrors.Errorf("Illegal route URL!") } @@ -135,6 +136,7 @@ func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { router.priority = url.GetParamInt(constant.RouterPriority, defaultPriority) router.Force = url.GetParamBool(constant.RouterForce, false) router.enabled = url.GetParamBool(constant.RouterEnabled, true) + router.notify = notify return router, nil } diff --git a/cluster/router/condition/router_test.go b/cluster/router/condition/router_test.go index a344b64efb..2895703dbc 100644 --- a/cluster/router/condition/router_test.go +++ b/cluster/router/condition/router_test.go @@ -58,8 +58,13 @@ func TestParseRule(t *testing.T) { } func TestNewConditionRouter(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() url, _ := common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err := NewConditionRouter(url) + router, err := NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, true, router.Enabled()) assert.Equal(t, true, router.Force) @@ -69,22 +74,22 @@ func TestNewConditionRouter(t *testing.T) { assert.EqualValues(t, router.WhenCondition, whenRule) assert.EqualValues(t, router.ThenCondition, thenRule) - router, err = NewConditionRouter(nil) + router, err = NewConditionRouter(nil, notify) assert.Nil(t, router) assert.Error(t, err) url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&priority=1&router=condition&rule=YSAmT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, router) assert.Error(t, err) url, _ = common.NewURL(`condition://0.0.0.0:?application=mock-app&category=routers&force=true&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, int64(150), router.Priority()) url, _ = common.NewURL(`condition://0.0.0.0:?category=routers&force=true&interface=mock-service&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D`) - router, err = NewConditionRouter(url) + router, err = NewConditionRouter(url, notify) assert.Nil(t, err) assert.Equal(t, int64(140), router.Priority()) } diff --git a/cluster/router/conn_checker.go b/cluster/router/conn_checker.go new file mode 100644 index 0000000000..fda0ef868b --- /dev/null +++ b/cluster/router/conn_checker.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "github.com/apache/dubbo-go/protocol" +) + +// ConnChecker is used to determine whether the invoker is healthy or not +type ConnChecker interface { + // IsConnHealthy evaluates the healthy state on the given Invoker + IsConnHealthy(invoker protocol.Invoker) bool +} diff --git a/cluster/router/conncheck/conn_check_route.go b/cluster/router/conncheck/conn_check_route.go new file mode 100644 index 0000000000..97f049d85b --- /dev/null +++ b/cluster/router/conncheck/conn_check_route.go @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "github.com/RoaringBitmap/roaring" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + connHealthy = "conn-healthy" + name = "conn-check-router" +) + +// ConnCheckRouter provides a health-first routing mechanism through ConnChecker +type ConnCheckRouter struct { + url *common.URL + checker router.ConnChecker + notify chan struct{} +} + +// NewConnCheckRouter construct an NewConnCheckRouter via url +func NewConnCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + r := &ConnCheckRouter{ + url: url, + notify: notify, + } + checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) + r.checker = extension.GetConnChecker(checkerName, url) + return r, nil +} + +// Route gets a list of healthy invoker +func (r *ConnCheckRouter) Route(invokers *roaring.Bitmap, cache router.Cache, url *common.URL, invocation protocol.Invocation) *roaring.Bitmap { + addrPool := cache.FindAddrPool(r) + // Add healthy invoker to the list + healthyInvokers := utils.JoinIfNotEqual(addrPool[connHealthy], invokers) + // If all invokers are considered unhealthy, downgrade to all invoker + if healthyInvokers.IsEmpty() { + logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } + return healthyInvokers +} + +// Pool separates healthy invokers from others. +func (r *ConnCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { + rb := make(router.AddrPool, 8) + rb[connHealthy] = roaring.NewBitmap() + for i, invoker := range invokers { + if r.checker.IsConnHealthy(invoker) { + rb[connHealthy].Add(uint32(i)) + } + } + return rb, nil +} + +// ShouldPool will always return true to make sure healthy check constantly. +func (r *ConnCheckRouter) ShouldPool() bool { + return true +} + +// Name get name of ConnCheckerRouter +func (r *ConnCheckRouter) Name() string { + return name +} + +// Priority get Router priority level +func (r *ConnCheckRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *ConnCheckRouter) URL() *common.URL { + return r.url +} + +// ConnChecker returns the HealthChecker bound to this HealthCheckRouter +func (r *ConnCheckRouter) ConnChecker() router.ConnChecker { + return r.checker +} diff --git a/cluster/router/conncheck/conn_check_route_test.go b/cluster/router/conncheck/conn_check_route_test.go new file mode 100644 index 0000000000..fec733167f --- /dev/null +++ b/cluster/router/conncheck/conn_check_route_test.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "fmt" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" + "github.com/apache/dubbo-go/cluster/router/utils" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +const ( + connCheckRoute1010IP = "192.168.10.10" + connCheckRoute1011IP = "192.168.10.11" + connCheckRoute1012IP = "192.168.10.12" + connCheckRouteMethodNameTest = "test" + connCheck1001URL = "dubbo://192.168.10.1/com.ikurento.user.UserProvider" + connCheckRouteUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestConnCheckRouterRoute(t *testing.T) { + defer protocol.CleanAllStatus() + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + consumerURL, _ := common.NewURL(connCheck1001URL) + url1, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1010IP)) + url2, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1011IP)) + url3, _ := common.NewURL(fmt.Sprintf(connCheckRouteUrlFormat, connCheckRoute1012IP)) + hcr, _ := NewConnCheckRouter(consumerURL, notify) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1) + invoker2 := NewMockInvoker(url2) + invoker3 := NewMockInvoker(url3) + protocol.SetInvokerUnhealthyStatus(invoker1) + protocol.SetInvokerUnhealthyStatus(invoker2) + + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation(connCheckRouteMethodNameTest, nil, nil) + res := hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv) + + // now invoker3 is healthy + assert.True(t, len(res.ToArray()) == 1) + + // check blacklist remove + protocol.RemoveInvokerUnhealthyStatus(invoker1) + res = hcr.Route(utils.ToBitmap(invokers), setUpAddrCache(hcr.(*ConnCheckRouter), invokers), consumerURL, inv) + // now invoker3 invoker1 is healthy + assert.True(t, len(res.ToArray()) == 2) + +} + +func TestRecovery(t *testing.T) { + // check recover + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker1 := mock.NewMockInvoker(ctrl) + invoker2 := mock.NewMockInvoker(ctrl) + + invoker1.EXPECT().GetUrl().Return(&common.URL{Path: "path1"}).AnyTimes() + invoker2.EXPECT().GetUrl().Return(&common.URL{Path: "path2"}).AnyTimes() + invoker1.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker2.EXPECT().IsAvailable().Return(true).AnyTimes() + + protocol.SetInvokerUnhealthyStatus(invoker1) + protocol.SetInvokerUnhealthyStatus(invoker2) + assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 2) + protocol.TryRefreshBlackList() + assert.Equal(t, len(protocol.GetBlackListInvokers(16)), 0) +} + +func setUpAddrCache(r router.Poolable, addrs []protocol.Invoker) router.Cache { + pool, info := r.Pool(addrs) + cache := chain.BuildCache(addrs) + cache.SetAddrMeta(r.Name(), info) + cache.SetAddrPool(r.Name(), pool) + return cache +} diff --git a/cluster/router/conncheck/conn_health_check.go b/cluster/router/conncheck/conn_health_check.go new file mode 100644 index 0000000000..9f05b0695f --- /dev/null +++ b/cluster/router/conncheck/conn_health_check.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +func init() { + extension.SetConnChecker(constant.DEFAULT_CONN_CHECKER, NewDefaultConnChecker) +} + +// DefaultConnChecker is the default implementation of ConnChecker, which determines the health status of invoker conn +type DefaultConnChecker struct { +} + +// IsConnHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request +// and the current active request +func (c *DefaultConnChecker) IsConnHealthy(invoker protocol.Invoker) bool { + return protocol.GetInvokerHealthyStatus(invoker) +} + +// NewDefaultConnChecker constructs a new DefaultConnChecker based on the url +func NewDefaultConnChecker(url *common.URL) router.ConnChecker { + return &DefaultConnChecker{} +} diff --git a/cluster/router/conncheck/conn_health_check_test.go b/cluster/router/conncheck/conn_health_check_test.go new file mode 100644 index 0000000000..44d148e1b8 --- /dev/null +++ b/cluster/router/conncheck/conn_health_check_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +const ( + connCheckDubbo1010IP = "192.168.10.10" + connCheckDubboUrlFormat = "dubbo://%s:20000/com.ikurento.user.UserProvider" +) + +func TestDefaultConnCheckerIsHealthy(t *testing.T) { + defer protocol.CleanAllStatus() + url, _ := common.NewURL(fmt.Sprintf(connCheckDubboUrlFormat, connCheckDubbo1010IP)) + cc := NewDefaultConnChecker(url).(*DefaultConnChecker) + invoker := NewMockInvoker(url) + healthy := cc.IsConnHealthy(invoker) + assert.True(t, healthy) + + invoker = NewMockInvoker(url) + cc = NewDefaultConnChecker(url).(*DefaultConnChecker) + // add to black list + protocol.SetInvokerUnhealthyStatus(invoker) + assert.False(t, cc.IsConnHealthy(invoker)) +} diff --git a/cluster/router/conncheck/factory.go b/cluster/router/conncheck/factory.go new file mode 100644 index 0000000000..a7b19aaea6 --- /dev/null +++ b/cluster/router/conncheck/factory.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.ConnCheckRouterName, newConnCheckRouteFactory) +} + +// ConnCheckRouteFactory is the factory to create conn check router, it aims at filter ip with unhealthy status +// the unhealthy status is storied in protocol/rpc_status.go with sync.Map +type ConnCheckRouteFactory struct { +} + +// newConnCheckRouteFactory construct a new ConnCheckRouteFactory +func newConnCheckRouteFactory() router.PriorityRouterFactory { + return &ConnCheckRouteFactory{} +} + +// NewPriorityRouter construct a new NewConnCheckRouter via url +func (f *ConnCheckRouteFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewConnCheckRouter(url, notify) +} diff --git a/cluster/router/conncheck/factory_test.go b/cluster/router/conncheck/factory_test.go new file mode 100644 index 0000000000..02f8fb472e --- /dev/null +++ b/cluster/router/conncheck/factory_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conncheck + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// nolint +type MockInvoker struct { + url *common.URL +} + +// nolint +func NewMockInvoker(url *common.URL) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +// nolint +func (bi *MockInvoker) GetUrl() *common.URL { + return bi.url +} + +// nolint +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +// nolint +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + +// nolint +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +// nolint +func (bi *MockInvoker) Destroy() { +} + +// nolint +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newConnCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 378463be56..eb15e6f642 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -48,10 +48,6 @@ type DefaultHealthChecker struct { // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { - if !invoker.IsAvailable() { - return false - } - urlStatus := protocol.GetURLStatus(invoker.GetUrl()) if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestCountLimit() { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go index 40c9dd7ab9..a9054c7714 100644 --- a/cluster/router/healthcheck/factory.go +++ b/cluster/router/healthcheck/factory.go @@ -38,6 +38,6 @@ func newHealthCheckRouteFactory() router.PriorityRouterFactory { } // NewPriorityRouter construct a new NewHealthCheckRouter via url -func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewHealthCheckRouter(url) +func (f *HealthCheckRouteFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewHealthCheckRouter(url, notify) } diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 1a878af212..bd9543e1b2 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -32,9 +32,8 @@ import ( ) const ( - HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" - healthy = "healthy" - name = "health-check-router" + healthy = "healthy" + name = "health-check-router" ) // HealthCheckRouter provides a health-first routing mechanism through HealthChecker @@ -42,13 +41,15 @@ type HealthCheckRouter struct { url *common.URL enabled bool checker router.HealthChecker + notify chan struct{} } // NewHealthCheckRouter construct an HealthCheckRouter via url -func NewHealthCheckRouter(url *common.URL) (router.PriorityRouter, error) { +func NewHealthCheckRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { r := &HealthCheckRouter{ url: url, - enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false), + enabled: url.GetParamBool(constant.HEALTH_ROUTE_ENABLED_KEY, false), + notify: notify, } if r.enabled { checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) @@ -87,7 +88,6 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, rb[healthy].Add(uint32(i)) } } - return rb, nil } diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 0730f105b7..f499c85c09 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -49,12 +49,17 @@ const ( func TestHealthCheckRouterRoute(t *testing.T) { defer protocol.CleanAllStatus() + notify := make(chan struct{}) + go func() { + for range notify { + } + }() consumerURL, _ := common.NewURL(healthCheck1001URL) - consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + consumerURL.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") url1, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1010IP)) url2, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1011IP)) url3, _ := common.NewURL(fmt.Sprintf(healthCheckRouteUrlFormat, healthCheckRoute1012IP)) - hcr, _ := NewHealthCheckRouter(consumerURL) + hcr, _ := NewHealthCheckRouter(consumerURL, notify) var invokers []protocol.Invoker invoker1 := NewMockInvoker(url1) @@ -112,13 +117,18 @@ func TestHealthCheckRouterRoute(t *testing.T) { func TestNewHealthCheckRouter(t *testing.T) { defer protocol.CleanAllStatus() + notify := make(chan struct{}) + go func() { + for range notify { + } + }() url, _ := common.NewURL(fmt.Sprintf(healthCheckDubboUrlFormat, healthCheckDubbo1010IP)) - hcr, _ := NewHealthCheckRouter(url) + hcr, _ := NewHealthCheckRouter(url, notify) h := hcr.(*HealthCheckRouter) assert.Nil(t, h.checker) - url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") - hcr, _ = NewHealthCheckRouter(url) + url.SetParam(constant.HEALTH_ROUTE_ENABLED_KEY, "true") + hcr, _ = NewHealthCheckRouter(url, notify) h = hcr.(*HealthCheckRouter) assert.NotNil(t, h.checker) @@ -130,7 +140,7 @@ func TestNewHealthCheckRouter(t *testing.T) { url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") - hcr, _ = NewHealthCheckRouter(url) + hcr, _ = NewHealthCheckRouter(url, notify) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) diff --git a/cluster/router/router.go b/cluster/router/router.go index 8a19dcf8cc..1d71554f6b 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -30,7 +30,7 @@ import ( // PriorityRouterFactory creates creates priority router with url type PriorityRouterFactory interface { // NewPriorityRouter creates router instance with URL - NewPriorityRouter(*common.URL) (PriorityRouter, error) + NewPriorityRouter(*common.URL, chan struct{}) (PriorityRouter, error) } // FilePriorityRouterFactory creates priority router with parse config file diff --git a/cluster/router/tag/factory.go b/cluster/router/tag/factory.go index a5d989cd31..fd2c15cddf 100644 --- a/cluster/router/tag/factory.go +++ b/cluster/router/tag/factory.go @@ -37,8 +37,8 @@ func NewTagRouterFactory() router.PriorityRouterFactory { // NewPriorityRouter create a tagRouter by tagRouterFactory with a url // The url contains router configuration information -func (c *tagRouterFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) { - return NewTagRouter(url) +func (c *tagRouterFactory) NewPriorityRouter(url *common.URL, notify chan struct{}) (router.PriorityRouter, error) { + return NewTagRouter(url, notify) } // NewFileRouter create a tagRouter by profile content diff --git a/cluster/router/tag/factory_test.go b/cluster/router/tag/factory_test.go index b350bb2a91..2a8eac20ba 100644 --- a/cluster/router/tag/factory_test.go +++ b/cluster/router/tag/factory_test.go @@ -39,7 +39,12 @@ func TestTagRouterFactoryNewRouter(t *testing.T) { u1, err := common.NewURL(fmt.Sprintf(factoryFormat, factoryLocalIP)) assert.Nil(t, err) factory := NewTagRouterFactory() - tagRouter, e := factory.NewPriorityRouter(u1) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + tagRouter, e := factory.NewPriorityRouter(u1, notify) assert.Nil(t, e) assert.NotNil(t, tagRouter) } diff --git a/cluster/router/tag/file.go b/cluster/router/tag/file.go index a51ae69cf2..81c25109dd 100644 --- a/cluster/router/tag/file.go +++ b/cluster/router/tag/file.go @@ -45,6 +45,7 @@ type FileTagRouter struct { } // NewFileTagRouter Create file tag router instance with content (from config file) +// todo fix this router, now it is useless, tag router is nil func NewFileTagRouter(content []byte) (*FileTagRouter, error) { fileRouter := &FileTagRouter{} rule, err := getRule(string(content)) @@ -52,7 +53,8 @@ func NewFileTagRouter(content []byte) (*FileTagRouter, error) { return nil, perrors.Errorf("yaml.Unmarshal() failed , error:%v", perrors.WithStack(err)) } fileRouter.routerRule = rule - fileRouter.router, err = NewTagRouter(fileRouter.URL()) + notify := make(chan struct{}) + fileRouter.router, err = NewTagRouter(fileRouter.URL(), notify) return fileRouter, err } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 984ecb4eef..3d0393a487 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -76,10 +76,11 @@ type tagRouter struct { application string ruleChanged bool mutex sync.RWMutex + notify chan struct{} } // NewTagRouter returns a tagRouter instance if url is not nil -func NewTagRouter(url *common.URL) (*tagRouter, error) { +func NewTagRouter(url *common.URL, notify chan struct{}) (*tagRouter, error) { if url == nil { return nil, perrors.Errorf("Illegal route URL!") } @@ -87,6 +88,7 @@ func NewTagRouter(url *common.URL) (*tagRouter, error) { url: url, enabled: url.GetParamBool(constant.RouterEnabled, true), priority: url.GetParamInt(constant.RouterPriority, 0), + notify: notify, }, nil } @@ -191,6 +193,7 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { defer c.mutex.Unlock() c.tagRouterRule = routerRule c.ruleChanged = true + c.notify <- struct{}{} } // URL gets the url of tagRouter diff --git a/cluster/router/tag/tag_router_test.go b/cluster/router/tag/tag_router_test.go index 26c4b8352b..efee5bd684 100644 --- a/cluster/router/tag/tag_router_test.go +++ b/cluster/router/tag/tag_router_test.go @@ -119,17 +119,27 @@ func (bi *MockInvoker) Destroy() { func TestTagRouterPriority(t *testing.T) { u1, err := common.NewURL(tagRouterTestUserConsumerTag) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() assert.Nil(t, err) - tagRouter, e := NewTagRouter(u1) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) p := tagRouter.Priority() assert.Equal(t, int64(0), p) } func TestTagRouterRouteForce(t *testing.T) { + notify := make(chan struct{}) + go func() { + for range notify { + } + }() u1, e1 := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, e1) - tagRouter, e := NewTagRouter(u1) + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) @@ -161,7 +171,12 @@ func TestTagRouterRouteForce(t *testing.T) { func TestTagRouterRouteNoForce(t *testing.T) { u1, e1 := common.NewURL(tagRouterTestUserConsumer) assert.Nil(t, e1) - tagRouter, e := NewTagRouter(u1) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) u2, e2 := common.NewURL(tagRouterTestHangZhouUrl) @@ -224,7 +239,12 @@ func TestRouteBeijingInvoker(t *testing.T) { invokers = append(invokers, inv2, inv3, inv4, inv5) url, _ := common.NewURL(tagRouterTestBeijingUrl) - tagRouter, _ := NewTagRouter(url) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + tagRouter, _ := NewTagRouter(url, notify) rb := roaring.NewBitmap() rb.AddRange(0, uint64(len(invokers))) @@ -300,7 +320,12 @@ tags: url, e1 := common.NewURL(tagRouterTestUserConsumerTag) suite.Nil(e1) - tagRouter, err := NewTagRouter(url) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + tagRouter, err := NewTagRouter(url, notify) suite.Nil(err) suite.NotNil(tagRouter) suite.route = tagRouter @@ -365,7 +390,12 @@ func (suite *DynamicTagRouter) TestDynamicTagRouterByNoTagAndAddressMatch() { func TestProcess(t *testing.T) { u1, err := common.NewURL(tagRouterTestUserConsumerTag) assert.Nil(t, err) - tagRouter, e := NewTagRouter(u1) + notify := make(chan struct{}) + go func() { + for range notify { + } + }() + tagRouter, e := NewTagRouter(u1, notify) assert.Nil(t, e) assert.NotNil(t, tagRouter) @@ -383,12 +413,17 @@ tags: - name: hangzhou addresses: [192.168.1.3, 192.168.1.4] ` + go func() { + for range notify { + } + }() tagRouter.Process(&config_center.ConfigChangeEvent{Value: testYML, ConfigType: remoting.EventTypeAdd}) assert.NotNil(t, tagRouter.tagRouterRule) assert.Equal(t, []string{"beijing", "hangzhou"}, tagRouter.tagRouterRule.getTagNames()) assert.Equal(t, []string{"192.168.1.1", "192.168.1.2", "192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getAddresses()) assert.Equal(t, []string{"192.168.1.3", "192.168.1.4"}, tagRouter.tagRouterRule.getTagNameToAddresses()["hangzhou"]) assert.Equal(t, []string{"beijing"}, tagRouter.tagRouterRule.getAddressToTagNames()["192.168.1.1"]) + tagRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) assert.Nil(t, tagRouter.tagRouterRule) } diff --git a/common/constant/key.go b/common/constant/key.go index 0515094f28..3793f2e290 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -91,6 +91,7 @@ const ( RETRY_PERIOD_KEY = "retry.period" RETRY_TIMES_KEY = "retry.times" CYCLE_REPORT_KEY = "cycle.report" + DEFAULT_BLACK_LIST_RECOVER_BLOCK = 16 ) const ( @@ -214,6 +215,8 @@ const ( ListenableRouterName = "listenable" // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" + // ConnCheckRouterName Specify the name of ConnCheckRouter + ConnCheckRouterName = "conn_check" // TagRouterName Specify the name of TagRouter TagRouterName = "tag" // TagRouterRuleSuffix Specify tag router suffix @@ -239,8 +242,9 @@ const ( // ForceUseTag is the tag in attachment ForceUseTag = "dubbo.force.tag" Tagkey = "dubbo.tag" - - // Attachment key in context in invoker + // HEALTH_ROUTE_ENABLED_KEY defines if use health router + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" + // AttachmentKey in context in invoker AttachmentKey = DubboCtxKey("attachment") ) @@ -296,7 +300,9 @@ const ( HEALTH_CHECKER = "health.checker" // The name of the default implementation of HealthChecker DEFAULT_HEALTH_CHECKER = "default" - // The key of outstanding-request-limit + // The name of the default implementation of C + DEFAULT_CONN_CHECKER = "default" + // The key of outstanding-request-limit\ OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" // The key of successive-failed-request's threshold SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" diff --git a/common/extension/conn_checker.go b/common/extension/conn_checker.go new file mode 100644 index 0000000000..fbd9e34b23 --- /dev/null +++ b/common/extension/conn_checker.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" +) + +var ( + connCheckers = make(map[string]func(url *common.URL) router.ConnChecker) +) + +// SetHealthChecker sets the HealthChecker with @name +func SetConnChecker(name string, fcn func(_ *common.URL) router.ConnChecker) { + connCheckers[name] = fcn +} + +// GetHealthChecker gets the HealthChecker with @name +func GetConnChecker(name string, url *common.URL) router.ConnChecker { + f, ok := connCheckers[name] + if !ok || f == nil { + panic("connCheckers for " + name + " is not existing, make sure you have import the package.") + } + return connCheckers[name](url) +} diff --git a/config/reference_config.go b/config/reference_config.go index 895ab9df26..8a830ddba2 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -100,9 +100,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) { if c.ForceTag { cfgURL.AddParam(constant.ForceUseTag, "true") } - c.postProcessConfig(cfgURL) - if c.Url != "" { // 1. user specified URL, could be peer-to-peer address, or register center's address. urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 8d443e84f0..8fc5ddd76e 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -23,15 +23,28 @@ import ( "time" ) +import ( + uberAtomic "go.uber.org/atomic" +) + import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" ) var ( - methodStatistics sync.Map // url -> { methodName : RPCStatus} - serviceStatistic sync.Map // url -> RPCStatus + methodStatistics sync.Map // url -> { methodName : RPCStatus} + serviceStatistic sync.Map // url -> RPCStatus + invokerBlackList sync.Map // store unhealthy url blackList + blackListCacheDirty uberAtomic.Bool // store if the cache in chain is not refreshed by blacklist + blackListRefreshing int32 // store if the refresing method is processing ) +func init() { + blackListCacheDirty.Store(false) +} + // RPCStatus is URL statistics. type RPCStatus struct { active int32 @@ -181,4 +194,82 @@ func CleanAllStatus() { return true } serviceStatistic.Range(delete2) + delete3 := func(key, _ interface{}) bool { + invokerBlackList.Delete(key) + return true + } + invokerBlackList.Range(delete3) +} + +// GetInvokerHealthyStatus get invoker's conn healthy status +func GetInvokerHealthyStatus(invoker Invoker) bool { + _, found := invokerBlackList.Load(invoker.GetUrl().Key()) + return !found +} + +// SetInvokerUnhealthyStatus add target invoker to black list +func SetInvokerUnhealthyStatus(invoker Invoker) { + invokerBlackList.Store(invoker.GetUrl().Key(), invoker) + logger.Info("Add invoker ip = ", invoker.GetUrl().Location, " to black list") + blackListCacheDirty.Store(true) +} + +// RemoveInvokerUnhealthyStatus remove unhealthy status of target invoker from blacklist +func RemoveInvokerUnhealthyStatus(invoker Invoker) { + invokerBlackList.Delete(invoker.GetUrl().Key()) + logger.Info("Remove invoker ip = ", invoker.GetUrl().Location, " from black list") + blackListCacheDirty.Store(true) +} + +// GetBlackListInvokers get at most size of blockSize invokers from black list +func GetBlackListInvokers(blockSize int) []Invoker { + resultIvks := make([]Invoker, 0, 16) + invokerBlackList.Range(func(k, v interface{}) bool { + resultIvks = append(resultIvks, v.(Invoker)) + return true + }) + if blockSize > len(resultIvks) { + return resultIvks + } + return resultIvks[:blockSize] +} + +// RemoveUrlKeyUnhealthyStatus called when event of provider unregister, delete from black list +func RemoveUrlKeyUnhealthyStatus(key string) { + invokerBlackList.Delete(key) + logger.Info("Remove invoker key = ", key, " from black list") + blackListCacheDirty.Store(true) +} + +func GetAndRefreshState() bool { + state := blackListCacheDirty.Load() + blackListCacheDirty.Store(false) + return state +} + +// TryRefreshBlackList start 3 gr to check at most block=16 invokers in black list +// if target invoker is available, then remove it from black list +func TryRefreshBlackList() { + if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) { + wg := sync.WaitGroup{} + defer func() { + atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0) + }() + + ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK) + logger.Debug("blackList len = ", len(ivks)) + + for i := 0; i < 3; i++ { + wg.Add(1) + go func(ivks []Invoker, i int) { + defer wg.Done() + for j, _ := range ivks { + if j%3-i == 0 && ivks[j].(Invoker).IsAvailable() { + RemoveInvokerUnhealthyStatus(ivks[i]) + } + } + }(ivks, i) + } + wg.Wait() + } } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 1b607351af..a392b3a042 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -326,6 +326,7 @@ func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key) + protocol.RemoveUrlKeyUnhealthyStatus(key) if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok { dir.cacheInvokersMap.Delete(key) return cacheInvoker.(protocol.Invoker)