Fix: event driven chain cache #976

Merged
merged 51 commits into from
Feb 7, 2021
Changes from 48 commits
49b71c0
fix: add health check blace list
Dec 9, 2020
a707429
fix: delete unused comment
Dec 9, 2020
353391f
fmt file
Dec 9, 2020
b3eff92
fix: add remove from blacklist procedure when unregistry
Dec 11, 2020
72e3dff
fix: change gost version to 1.10
Dec 11, 2020
e958d69
fix: add connCheck and fix refresh logic
Dec 18, 2020
f84b8fc
fix: use logger
Dec 23, 2020
f7073cf
fix: back to 1.9.5 gost
Dec 23, 2020
1518966
fix: add test and fix fmt
Dec 23, 2020
e7a0e1c
Merge branch 'develop' into fix/healthCheck
Dec 23, 2020
7d0f479
fix: solve merge conflict
Dec 23, 2020
2831e35
fix: go fmt project
Dec 23, 2020
ae8b2ae
fix: go mod tidy
Dec 27, 2020
e2d57e7
fix: fix merge conflict
Dec 27, 2020
0b7206b
fix: go mod tidy
Dec 27, 2020
7438599
fix: change test file
Dec 28, 2020
daa66ea
Merge branch 'develop' into fix/healthCheck
Dec 28, 2020
7dfa6be
fix: add health check test file
Dec 28, 2020
ac7acc7
fix: fix merge conflict
Dec 28, 2020
8e92b2d
fix: change from trigger to event driven refresh cache
Dec 28, 2020
f466966
ticker refresh chain cache only when needed
Dec 29, 2020
c822cfa
fix: go fmt file
Dec 29, 2020
c1cc113
fix: add router change refresh cache logic
Dec 30, 2020
482f9dc
feat: router change event driven chain cache refresh
Dec 31, 2020
c634ab2
fix: new base directory func add default chain
Dec 31, 2020
4bce431
fix: base directory_test good
Dec 31, 2020
f5030fe
fix: fix test and add another gr to push notify
Dec 31, 2020
c5532cb
fix test notify bug
Dec 31, 2020
b812794
fix test notify bug
Dec 31, 2020
5fe6029
fix: add notify test
Dec 31, 2020
b6022bb
fix ctx linter error
AlexStocks Jan 4, 2021
a856678
fix: conflict
LaurenceLiZhixin Jan 4, 2021
b5784d2
fix: fix err
LaurenceLiZhixin Jan 4, 2021
e5bee95
fix: lint
LaurenceLiZhixin Jan 7, 2021
84bfb9d
fix: lint
LaurenceLiZhixin Jan 7, 2021
a15546a
fix: lint
LaurenceLiZhixin Jan 7, 2021
d305b67
Merge remote-tracking branch 'upstream/develop' into develop
LaurenceLiZhixin Jan 10, 2021
5919b11
Merge remote-tracking branch 'upstream/develop' into develop
LaurenceLiZhixin Jan 13, 2021
ed0b174
fix
LaurenceLiZhixin Jan 17, 2021
be89b70
fix: chinglish
LaurenceLiZhixin Jan 17, 2021
3dc2f17
Merge remote-tracking branch 'upstream/develop' into develop
LaurenceLiZhixin Jan 17, 2021
833c829
fix
LaurenceLiZhixin Jan 17, 2021
add1618
fix: conflict
LaurenceLiZhixin Jan 17, 2021
4f3d161
fix: delete logic of invocation get invoker
LaurenceLiZhixin Jan 22, 2021
30e36d0
Merge branch 'develop' into fix/eventDrivenChainCache
AlexStocks Jan 23, 2021
2c2c35f
fix: split import block
LaurenceLiZhixin Jan 24, 2021
569bacd
Merge branch 'fix/eventDrivenChainCache' of https://github.com/Lauren…
LaurenceLiZhixin Jan 24, 2021
25c1e44
fix: add log to show blacklist changes
LaurenceLiZhixin Jan 26, 2021
e5d53ca
fix: change notify chain
LaurenceLiZhixin Feb 3, 2021
edd99f5
fix: range
LaurenceLiZhixin Feb 3, 2021
93094ed
fix: cr
LaurenceLiZhixin Feb 5, 2021
55 changes: 36 additions & 19 deletions cluster/cluster_impl/base_cluster_invoker.go
@@ -121,38 +121,45 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
Contributor

Is a warn log needed here?

Contributor Author

This spot is rather interesting: the change fixes an existing pitfall.
invocation.Invoker() is nil, so execution would panic as soon as it reached this line.
I later found that this line is never executed, because the invokers list is never empty, so I decided to delete this log line.

logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
return nil
}
go protocol.TryRefreshBlackList()
if len(invokers) == 1 {
return invokers[0]
if invokers[0].IsAvailable() {
return invokers[0]
}
protocol.SetInvokerUnhealthyStatus(invokers[0])
logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())
Member

This error log looks unrelated to the surrounding context. Alternatively, would it be more appropriate to place it inside SetInvokerUnhealthyStatus?

Contributor Author

It is placed here mainly for the case where the invokers list becomes empty because of health checks, so that the key of the failed invoker is printed.

return nil
}

selectedInvoker := lb.Select(invokers, invocation)

//judge to if the selectedInvoker is invoked

//judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
var reslectInvokers []protocol.Invoker

for _, invoker := range invokers {
if !invoker.IsAvailable() {
for i := 0; i < 3; i++ {
Member

If the loop runs all 3 iterations without finding a usable invoker, the selectedInvoker that was unavailable in the first place is still returned at the end. Is that the intended behavior?

Contributor Author

Yes, you're right. I've fixed it.

if len(otherInvokers) == 0 {
// no other ivk to reselect, return to fallback
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
}
reselectedInvoker := lb.Select(otherInvokers, invocation)
if isInvoked(reselectedInvoker, invoked) {
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}
if !reselectedInvoker.IsAvailable() {
logger.Infof("the invoker of %s is not available, maybe some network error happened or the server is shutdown.",
invoker.GetUrl().Ip)
protocol.SetInvokerUnhealthyStatus(reselectedInvoker)
otherInvokers = getOtherInvokers(otherInvokers, reselectedInvoker)
continue
}

if !isInvoked(invoker, invoked) {
reslectInvokers = append(reslectInvokers, invoker)
}
}

if len(reslectInvokers) > 0 {
selectedInvoker = lb.Select(reslectInvokers, invocation)
} else {
logger.Errorf("all %d invokers is unavailable for %s.", len(invokers), selectedInvoker.GetUrl().String())
return nil
selectedInvoker = reselectedInvoker
break
}
}
return selectedInvoker
@@ -194,3 +201,13 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
}
return extension.GetLoadbalance(lb)
}

func getOtherInvokers(invokers []protocol.Invoker, invoker protocol.Invoker) []protocol.Invoker {
otherInvokers := make([]protocol.Invoker, 0)
for _, i := range invokers {
if i != invoker {
otherInvokers = append(otherInvokers, i)
}
}
return otherInvokers
}
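
The review thread on the `for i := 0; i < 3; i++` loop asks what should happen once every retry is used up. As a minimal sketch of a bounded reselect that returns nil in that case, the following may help; it is not the PR's code. `Invoker`, `fakeInvoker`, `without`, `reselect`, and `first` are hypothetical names introduced for illustration, and `first` stands in for a real load balancer.

```go
package main

import "fmt"

// Invoker is a hypothetical stand-in for protocol.Invoker,
// reduced to what this sketch needs.
type Invoker interface {
	Name() string
	IsAvailable() bool
}

// fakeInvoker is a test double with a fixed availability.
type fakeInvoker struct {
	name      string
	available bool
}

func (f *fakeInvoker) Name() string      { return f.name }
func (f *fakeInvoker) IsAvailable() bool { return f.available }

// without returns a copy of invokers with target removed,
// in the spirit of getOtherInvokers from the diff.
func without(invokers []Invoker, target Invoker) []Invoker {
	others := make([]Invoker, 0, len(invokers))
	for _, i := range invokers {
		if i != target {
			others = append(others, i)
		}
	}
	return others
}

// reselect asks pick for a candidate up to maxTries times.
// It returns nil when the candidates run out or the tries are used up,
// so an unavailable invoker is never handed back to the caller.
func reselect(candidates []Invoker, pick func([]Invoker) Invoker, maxTries int) Invoker {
	for i := 0; i < maxTries; i++ {
		if len(candidates) == 0 {
			return nil
		}
		c := pick(candidates)
		if c.IsAvailable() {
			return c
		}
		candidates = without(candidates, c)
	}
	return nil
}

// first is a trivial picker that always takes the head of the list.
func first(invokers []Invoker) Invoker { return invokers[0] }

func main() {
	a := &fakeInvoker{name: "a"}
	b := &fakeInvoker{name: "b"}
	c := &fakeInvoker{name: "c", available: true}

	if got := reselect([]Invoker{a, b, c}, first, 3); got != nil {
		fmt.Println("selected:", got.Name())
	}
	if got := reselect([]Invoker{a, b, c}, first, 2); got == nil {
		fmt.Println("no invoker within 2 tries")
	}
}
```

Returning nil on exhaustion pushes the decision to the caller, which can then fail the call instead of invoking a node already known to be unavailable.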
7 changes: 7 additions & 0 deletions cluster/cluster_impl/failback_cluster_test.go
@@ -72,6 +72,8 @@ func TestFailbackSuceess(t *testing.T) {

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

invoker.EXPECT().IsAvailable().Return(true)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

@@ -88,6 +90,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true)

// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
@@ -98,6 +101,7 @@ func TestFailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(protocol.Invocation) protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
@@ -131,6 +135,7 @@ func TestFailbackRetryFailed(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
@@ -177,6 +182,7 @@ func TestFailbackRetryFailed10Times(t *testing.T) {
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

// 10 task should failed firstly.
@@ -220,6 +226,7 @@ func TestFailbackOutOfLimit(t *testing.T) {
clusterInvoker.failbackTasks = 1

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
3 changes: 3 additions & 0 deletions cluster/cluster_impl/failfast_cluster_test.go
@@ -53,6 +53,7 @@ func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl)

staticDir := directory.NewStaticDirectory(invokers)
@@ -67,6 +68,7 @@ func TestFailfastInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
@@ -87,6 +89,7 @@ func TestFailfastInvokeFail(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailfast(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()
invoker.EXPECT().GetUrl().Return(failfastUrl).AnyTimes()

mockResult := &protocol.RPCResult{Err: perrors.New("error")}
4 changes: 4 additions & 0 deletions cluster/cluster_impl/failsafe_cluster_test.go
@@ -52,6 +52,7 @@ func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failbackUrl)

@@ -67,6 +68,8 @@ func TestFailSafeInvokeSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)

invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
@@ -85,6 +88,7 @@ func TestFailSafeInvokeFail(t *testing.T) {

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailsafe(invoker)
invoker.EXPECT().IsAvailable().Return(true).AnyTimes()

invoker.EXPECT().GetUrl().Return(failsafeUrl).AnyTimes()

8 changes: 4 additions & 4 deletions cluster/directory/base_directory.go
@@ -82,6 +82,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {

routers := make([]router.PriorityRouter, 0, len(urls))

rc := dir.routerChain

for _, url := range urls {
routerKey := url.GetParam(constant.ROUTER_KEY, "")

@@ -94,7 +96,7 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
}
}
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
r, err := factory.NewPriorityRouter(url, rc.GetNotifyChan())
if err != nil {
logger.Errorf("Create router fail. router key: %s, url:%s, error: %+v", routerKey, url.Service(), err)
return
@@ -104,10 +106,8 @@ func (dir *BaseDirectory) SetRouters(urls []*common.URL) {

logger.Infof("Init file condition router success, size: %v", len(routers))
dir.mutex.Lock()
rc := dir.routerChain
dir.mutex.Unlock()

rc.AddRouters(routers)
dir.mutex.Unlock()
}

func (dir *BaseDirectory) isProperRouter(url *common.URL) bool {
5 changes: 4 additions & 1 deletion cluster/directory/base_directory_test.go
@@ -28,6 +28,7 @@ import (
)

import (
"github.com/apache/dubbo-go/cluster/router/chain"
_ "github.com/apache/dubbo-go/cluster/router/condition"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
@@ -50,7 +51,9 @@ func TestBuildRouterChain(t *testing.T) {
regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(regURL)

var err error
directory.routerChain, err = chain.NewRouterChain(regURL)
assert.Nil(t, err)
localIP := common.GetLocalIp()
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
routeURL := getRouteURL(rule, anyURL)
Empty file removed cluster/router/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions cluster/router/chain.go
@@ -29,4 +29,6 @@ type Chain interface {
SetInvokers([]protocol.Invoker)
// AddRouters Add routers
AddRouters([]PriorityRouter)
// GetNotifyChan get notify channel of this chain
GetNotifyChan() chan struct{}
}
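
The `GetNotifyChan` addition lets a router hold the chain's notify channel and signal it when its own rules change. A minimal sketch of that idea follows, assuming a hypothetical `ruleRouter` type with an `UpdateRule` method; neither name comes from the PR. The send runs in its own goroutine, in the same way the diff sends on `c.notify`, so the caller does not block on the unbuffered channel.

```go
package main

import (
	"fmt"
	"time"
)

// ruleRouter is a hypothetical router that keeps the chain's notify channel.
type ruleRouter struct {
	notify chan struct{}
	rule   string
}

// newRuleRouter mirrors the idea of passing the channel into the constructor.
func newRuleRouter(notify chan struct{}) *ruleRouter {
	return &ruleRouter{notify: notify}
}

// UpdateRule stores the new rule and signals the chain.
// The send happens in a goroutine so an unbuffered channel cannot block the caller.
func (r *ruleRouter) UpdateRule(rule string) {
	r.rule = rule
	go func() {
		r.notify <- struct{}{}
	}()
}

// waitNotify reports whether a signal arrives within timeoutMillis milliseconds.
func waitNotify(notify chan struct{}, timeoutMillis int) bool {
	select {
	case <-notify:
		return true
	case <-time.After(time.Duration(timeoutMillis) * time.Millisecond):
		return false
	}
}

func main() {
	notify := make(chan struct{})
	r := newRuleRouter(notify)

	r.UpdateRule("host = 10.0.0.1")
	fmt.Println("notified:", waitNotify(notify, 1000))
}
```

One trade-off of this pattern is that every signal parks a goroutine until the receiver reads it, so the receiving loop has to stay alive for as long as routers can send.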
60 changes: 28 additions & 32 deletions cluster/router/chain/chain.go
@@ -20,12 +20,12 @@ package chain
import (
"sort"
"sync"
"sync/atomic"
"time"
)

import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)

import (
@@ -38,9 +38,7 @@ import (
)

const (
timeInterval = 5 * time.Second
timeThreshold = 2 * time.Second
countThreshold = 5
timeInterval = 5 * time.Second
)

// RouterChain Router chain
@@ -65,8 +63,10 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}

func (c *RouterChain) GetNotifyChan() chan struct{} {
return c.notify
}

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
@@ -104,6 +104,9 @@ func (c *RouterChain) AddRouters(routers []router.PriorityRouter) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.routers = newRouters
go func() {
c.notify <- struct{}{}
}()
}

// SetInvokers receives updated invokers from registry center. If the times of notification exceeds countThreshold and
@@ -113,32 +116,21 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()

// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})

c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
c.last = now
c.count = 0
go func() {
c.notify <- struct{}{}
}()
}
go func() {
c.notify <- struct{}{}
}()
}

// loop listens on events to update the address cache when it's necessary, either when it receives notification
// from address update, or when timeInterval exceeds.
// loop listens on events to update the address cache when it receives notification
// from address update,
func (c *RouterChain) loop() {
ticker := time.NewTicker(timeInterval)
for {
select {
case <-ticker.C:
c.buildCache()
if protocol.GetAndRefreshState() {
c.buildCache()
}
case <-c.notify:
c.buildCache()
}
@@ -235,9 +227,15 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
}

chain := &RouterChain{
last: time.Now(),
notify: make(chan struct{}),
}

routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
r, err := routerFactory().NewPriorityRouter(url)
r, err := routerFactory().NewPriorityRouter(url, chain.notify)
if r == nil || err != nil {
logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error())
continue
@@ -250,12 +248,10 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {

sortRouter(newRouters)

chain := &RouterChain{
builtinRouters: routers,
routers: newRouters,
last: time.Now(),
notify: make(chan struct{}),
}
routerNeedsUpdateInit := atomic.Bool{}
routerNeedsUpdateInit.Store(false)
chain.routers = newRouters
chain.builtinRouters = routers
if url != nil {
chain.url = url
}
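
The `loop` change in this file rebuilds the cache immediately on a notify signal, and on a ticker tick only when state has changed. A self-contained sketch of that shape follows. `cacheChain`, its `dirty` flag, `MarkDirty`, `Start`, `Stop`, and `waitForBuilds` are all hypothetical; the `dirty` flag is only an assumed stand-in for whatever `protocol.GetAndRefreshState()` reports, and the `done` channel is an addition so the sketch can shut down cleanly.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cacheChain is a hypothetical, reduced stand-in for RouterChain.
type cacheChain struct {
	notify chan struct{} // event signal, as in the diff
	done   chan struct{} // closed by Stop; not part of the diff
	dirty  int32         // 1 when state changed since the last tick (assumed flag)
	builds int32         // number of cache rebuilds, for observation only
}

func newCacheChain() *cacheChain {
	return &cacheChain{notify: make(chan struct{}), done: make(chan struct{})}
}

// buildCache stands in for the real cache rebuild and only counts calls.
func (c *cacheChain) buildCache() { atomic.AddInt32(&c.builds, 1) }

// Builds returns how many rebuilds have run so far.
func (c *cacheChain) Builds() int { return int(atomic.LoadInt32(&c.builds)) }

// MarkDirty records a state change that the next tick should pick up.
func (c *cacheChain) MarkDirty() { atomic.StoreInt32(&c.dirty, 1) }

// Notify requests an immediate rebuild without blocking the caller.
func (c *cacheChain) Notify() {
	go func() {
		select {
		case c.notify <- struct{}{}:
		case <-c.done: // loop stopped; drop the signal
		}
	}()
}

// Start runs the event loop in the background with the given tick interval.
func (c *cacheChain) Start(intervalMillis int) {
	go c.loop(time.Duration(intervalMillis) * time.Millisecond)
}

// Stop ends the loop.
func (c *cacheChain) Stop() { close(c.done) }

func (c *cacheChain) loop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Rebuild on a tick only if something changed since the last one.
			if atomic.CompareAndSwapInt32(&c.dirty, 1, 0) {
				c.buildCache()
			}
		case <-c.notify:
			c.buildCache()
		case <-c.done:
			return
		}
	}
}

// waitForBuilds polls until at least want rebuilds ran or timeoutMillis passed.
func (c *cacheChain) waitForBuilds(want, timeoutMillis int) bool {
	deadline := time.Now().Add(time.Duration(timeoutMillis) * time.Millisecond)
	for time.Now().Before(deadline) {
		if c.Builds() >= want {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return c.Builds() >= want
}

func main() {
	c := newCacheChain()
	c.Start(5)
	defer c.Stop()

	c.Notify()
	fmt.Println("rebuilt after notify:", c.waitForBuilds(1, 1000))
	c.MarkDirty()
	fmt.Println("rebuilt after dirty tick:", c.waitForBuilds(2, 1000))
}
```

With this shape an idle chain does no rebuild work on ticks, which appears to be the motivation for guarding the ticker branch in the diff.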
8 changes: 7 additions & 1 deletion cluster/router/chain/chain_test.go
@@ -148,7 +148,13 @@ conditions:
url := getConditionRouteUrl(applicationKey)
assert.NotNil(t, url)
factory := extension.GetRouterFactory(url.Protocol)
r, err := factory.NewPriorityRouter(url)
notify := make(chan struct{})
go func() {
Member

for range ch {
}

Contributor Author

Fixed.

for {
<-notify
}
}()
r, err := factory.NewPriorityRouter(url, notify)
assert.Nil(t, err)
assert.NotNil(t, r)
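
The reviewer's `for range ch` suggestion differs from `for { <-notify }` in one observable way: a range loop over a channel ends when the channel is closed, while the bare receive loop keeps spinning on zero values after close. A small sketch of the range form follows, with hypothetical helper names `drain` and `sendAndDrain`:

```go
package main

import "fmt"

// drain consumes signals until ch is closed and returns how many it received.
func drain(ch <-chan struct{}) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

// sendAndDrain sends n signals to a background drainer, closes the channel,
// and returns the number of signals the drainer saw.
func sendAndDrain(n int) int {
	ch := make(chan struct{})
	result := make(chan int)
	go func() {
		result <- drain(ch)
	}()
	for i := 0; i < n; i++ {
		ch <- struct{}{}
	}
	close(ch) // ends the range loop inside drain
	return <-result
}

func main() {
	fmt.Println(sendAndDrain(3)) // prints 3
}
```

In a test this means the draining goroutine can be released by closing the channel at the end, instead of being left running.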

5 changes: 3 additions & 2 deletions cluster/router/condition/app_router.go
@@ -34,14 +34,15 @@ const (
// AppRouter For listen application router with config center
type AppRouter struct {
listenableRouter
notify interface{}
}

// NewAppRouter Init AppRouter by url
func NewAppRouter(url *common.URL) (*AppRouter, error) {
func NewAppRouter(url *common.URL, notify chan struct{}) (*AppRouter, error) {
if url == nil {
return nil, perrors.Errorf("No route URL for create app router!")
}
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""))
appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, ""), notify)
if err != nil {
return nil, err
}