diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index c25028d58f..d81c7523c4 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -74,11 +74,12 @@ type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry events chan *config_center.ConfigChangeEvent + isClosed bool } func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.wg.Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} + return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType @@ -114,6 +115,7 @@ func (l *RegistryConfigurationListener) Close() { * if the registry is not available, it means that the registry has been destroy * so we don't need to call Done(), or it will cause the negative count panic for registry.wg */ + l.isClosed = true l.registry.wg.Done() } } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 29ae51d44f..e41991556a 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -46,6 +46,7 @@ import ( const ( RegistryZkClient = "zk registry" RegistryConnDelay = 3 + MaxWaitInterval = time.Duration(3e9) ) var ( @@ -200,6 +201,10 @@ func (r *zkRegistry) RestartCallBack() bool { } logger.Infof("success to re-register service :%v", confIf.Key()) } + r.listener = zookeeper.NewZkEventListener(r.client) + r.configListener = NewRegistryConfigurationListener(r.client, r) + r.dataListener = NewRegistryDataListener(r.configListener) + return flag } @@ -399,10 +404,19 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +func sleepWait(n int) { + wait := time.Duration((n + 1) * 2e8) + if wait > MaxWaitInterval { + wait = MaxWaitInterval + } + time.Sleep(wait) +} //subscribe from registry func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { + n := 0 for { + n++ if !r.IsAvailable() { logger.Warnf("event listener game over.") return @@ -423,14 +437,14 @@ func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyLi if serviceEvent, err := listener.Next(); err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() - return + break } else { logger.Infof("update begin, service event: %v", serviceEvent.String()) notifyListener.Notify(serviceEvent) } } - + sleepWait(n) } } @@ -440,6 +454,10 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen ) r.listenerLock.Lock() + if r.configListener.isClosed { + r.listenerLock.Unlock() + return nil, perrors.New("configListener already been closed") + } zkListener = r.configListener r.listenerLock.Unlock() if r.listener == nil {