Skip to content

Commit

Permalink
modify the logic when add() or done() of wait group in zk,etcd and k8…
Browse files Browse the repository at this point in the history
…s registry
  • Loading branch information
yexiaobo committed Jul 19, 2021
1 parent fd54619 commit 64fbfbd
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 27 deletions.
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDH
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/dubbo-getty v1.4.3 h1:PCKpryDasKOxwT5MBC6MIMO+0NLOaHF6Xco9YXQw7HI=
github.com/apache/dubbo-getty v1.4.3/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-getty v1.4.4 h1:pthYQaCXyjHJ6/SjVwKkX5NhdAqSpUrRL1Z9GowrLdE=
github.com/apache/dubbo-getty v1.4.4/go.mod h1:mcDyiu7M/TVrYDyL8TxDemQkOdvEqqHSQ4jOuYejY1w=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.9.2 h1:XuI8KvENSfKiAhiCBS4RNihmQDoPNmGWKT3gTui0p9A=
github.com/apache/dubbo-go-hessian2 v1.9.2/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
Expand Down Expand Up @@ -130,7 +128,6 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.13 h1:sWvK1QbHpPBMmRQJV9qIH3syLegQBQa4xAPof3/Kv5c=
github.com/dubbogo/gost v1.11.13/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
Expand Down Expand Up @@ -430,7 +427,6 @@ github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
Expand Down
6 changes: 3 additions & 3 deletions registry/base_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func (r *BaseRegistry) Destroy() {
// first step close registry's all listeners
r.facadeBasedRegistry.CloseListener()
// then close r.done to notify other program who listen to it
close(r.done)
close(r.Done())
// wait waitgroup done (wait listeners outside close over)
r.wg.Wait()
r.WaitGroup().Wait()

// close registry client
r.closeRegisters()
Expand Down Expand Up @@ -474,7 +474,7 @@ func (r *BaseRegistry) closeRegisters() {
// IsAvailable judge to is registry not closed by chan r.done
func (r *BaseRegistry) IsAvailable() bool {
select {
case <-r.done:
case <-r.Done():
return false
default:
return true
Expand Down
5 changes: 2 additions & 3 deletions registry/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
return nil, perrors.WithStack(err)
}

r.WaitGroup().Add(1)
go r.HandleClientRestart()
r.InitListeners()

Expand Down Expand Up @@ -191,12 +190,12 @@ func newMockKubernetesRegistry(

// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
Expand Down
2 changes: 0 additions & 2 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
return nil, err
}

r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)

r.listener = zookeeper.NewZkEventListener(r.client)
Expand Down Expand Up @@ -108,7 +107,6 @@ func newMockZkRegistry(url *common.URL, opts ...gxzookeeper.Option) (*zk.TestClu
if err != nil {
return nil, nil, err
}
r.WaitGroup().Add(1) // zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.InitListeners()
return c, r, nil
Expand Down
27 changes: 12 additions & 15 deletions remoting/etcdv3/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,19 @@ type clientFacade interface {
common.Node
}

var restartOnce sync.Once

// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r clientFacade) {
restartOnce.Do(func() {
defer r.WaitGroup().Done()
for {
select {
case <-r.Client().GetCtx().Done():
r.RestartCallBack()
// re-register all services
time.Sleep(10 * time.Microsecond)
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
return
}
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
for {
select {
case <-r.Client().GetCtx().Done():
r.RestartCallBack()
// re-register all services
time.Sleep(10 * time.Microsecond)
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
return
}
})
}
}
1 change: 1 addition & 0 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ZkClientFacade interface {

// HandleClientRestart keeps the connection between client and server
func HandleClientRestart(r ZkClientFacade) {
r.WaitGroup().Add(1)
defer r.WaitGroup().Done()
for {
select {
Expand Down

0 comments on commit 64fbfbd

Please sign in to comment.