Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:#1143 Feature/reduce etcd registry conn; wait group modify #1297

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDH
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/dubbo-getty v1.4.3 h1:PCKpryDasKOxwT5MBC6MIMO+0NLOaHF6Xco9YXQw7HI=
github.com/apache/dubbo-getty v1.4.3/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-getty v1.4.4 h1:pthYQaCXyjHJ6/SjVwKkX5NhdAqSpUrRL1Z9GowrLdE=
github.com/apache/dubbo-getty v1.4.4/go.mod h1:mcDyiu7M/TVrYDyL8TxDemQkOdvEqqHSQ4jOuYejY1w=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.9.2 h1:XuI8KvENSfKiAhiCBS4RNihmQDoPNmGWKT3gTui0p9A=
github.com/apache/dubbo-go-hessian2 v1.9.2/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
Expand Down Expand Up @@ -130,7 +128,6 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.13 h1:sWvK1QbHpPBMmRQJV9qIH3syLegQBQa4xAPof3/Kv5c=
github.com/dubbogo/gost v1.11.13/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
Expand Down Expand Up @@ -430,7 +427,6 @@ github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
Expand Down
6 changes: 3 additions & 3 deletions registry/base_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func (r *BaseRegistry) Destroy() {
// first step close registry's all listeners
r.facadeBasedRegistry.CloseListener()
// then close r.done to notify everyone who is listening on it
close(r.done)
close(r.Done())
// wait for the waitgroup to finish (i.e. wait until the outside listeners have finished closing)
r.wg.Wait()
r.WaitGroup().Wait()

// close registry client
r.closeRegisters()
Expand Down Expand Up @@ -474,7 +474,7 @@ func (r *BaseRegistry) closeRegisters() {
// IsAvailable reports whether the registry is still open, i.e. whether chan r.done has not been closed yet
func (r *BaseRegistry) IsAvailable() bool {
select {
case <-r.done:
case <-r.Done():
return false
default:
return true
Expand Down
1 change: 0 additions & 1 deletion registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
); err != nil {
return nil, err
}
r.WaitGroup().Add(1) // etcdv3 client start successful, then wg +1

go etcdv3.HandleClientRestart(r)

Expand Down
5 changes: 2 additions & 3 deletions registry/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
return nil, perrors.WithStack(err)
}

r.WaitGroup().Add(1)
go r.HandleClientRestart()
r.InitListeners()

Expand Down Expand Up @@ -191,12 +190,12 @@ func newMockKubernetesRegistry(

// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
Expand Down
2 changes: 0 additions & 2 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
return nil, err
}

r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)

r.listener = zookeeper.NewZkEventListener(r.client)
Expand Down Expand Up @@ -108,7 +107,6 @@ func newMockZkRegistry(url *common.URL, opts ...gxzookeeper.Option) (*zk.TestClu
if err != nil {
return nil, nil, err
}
r.WaitGroup().Add(1) // zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.InitListeners()
return c, r, nil
Expand Down
53 changes: 6 additions & 47 deletions remoting/etcdv3/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
package etcdv3

import (
getty "github.com/apache/dubbo-getty"
"sync"
"time"
)

import (
gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

Expand All @@ -46,55 +43,17 @@ type clientFacade interface {

// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)

r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.WaitGroup()的done,wait和add使用散落在各处,remoting作为基础组件,是否可能有同学直接调用 HandleClientRestart ,然后函数结束出现 negative 的问题?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有可能,用once限制一下HandleClientRestart的调用次数是否可以呢;该方法在init包被引入的时候调用,只调用一次。

LOOP:
for {
select {
case <-r.Client().GetCtx().Done():
r.RestartCallBack()
// re-register all services
time.Sleep(10 * time.Microsecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么这里要Sleep?
我看RestartCallBack的逻辑是重新注册,下面的逻辑是重建client,并且有重试逻辑。使用RestartCallBack的目的是什么呢?

Copy link
Contributor Author

@WilliamLeaves WilliamLeaves Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码实现参照zk对于连接过多的优化,做了类似的实现。
<-GetCtx().Done()意味着已经断连,需要调用RestartCallBack重新注册

case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := gxetcd.RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetURL().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoints := r.Client().GetEndPoints()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()

// try to connect to etcd,
failTimes = 0
for {
after := getty.GetTimeWheel().After(timeSecondDuration(failTimes * gxetcd.ConnDelay))
select {
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-after: // avoid connect frequent
}
err = ValidateClient(
r,
gxetcd.WithName(clientName),
gxetcd.WithEndpoints(endpoints...),
gxetcd.WithTimeout(timeout),
gxetcd.WithHeartbeat(1),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
if err == nil && r.RestartCallBack() {
break
}
failTimes++
if gxetcd.MaxFailTimes <= failTimes {
failTimes = gxetcd.MaxFailTimes
}
}
return
}
}
}
1 change: 1 addition & 0 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ZkClientFacade interface {

// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r ZkClientFacade) {
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
for {
select {
Expand Down