Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: issue #951 etcd exit panic #1013

Merged
merged 2 commits into from
Jan 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package etcdv3

import (
"strings"
"sync"
)

import (
Expand Down Expand Up @@ -79,8 +80,9 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
}

type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
Expand Down Expand Up @@ -120,5 +122,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {

// Close etcd registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
l.closeOnce.Do(func() {
l.registry.WaitGroup().Done()
})
}
19 changes: 5 additions & 14 deletions registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type etcdV3Registry struct {
registry.BaseRegistry
cltLock sync.Mutex
client *etcdv3.Client
listenerLock sync.Mutex
listenerLock sync.RWMutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
Expand Down Expand Up @@ -150,27 +150,18 @@ func (r *etcdV3Registry) CreatePath(k string) error {

// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

var (
configListener *configurationListener
)

r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
r.listenerLock.RLock()
configListener := r.configListener
r.listenerLock.RUnlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("etcd client broken")
}

// new client & listener
listener := etcdv3.NewEventListener(r.client)

r.listenerLock.Lock()
r.listener = listener
r.listener = etcdv3.NewEventListener(r.client) // new client & listener
r.listenerLock.Unlock()
}

Expand Down
6 changes: 3 additions & 3 deletions remoting/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// nolint
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
Expand Down Expand Up @@ -181,9 +181,9 @@ func timeSecondDuration(sec int) time.Duration {
// --------> listenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {

l.keyMapLock.Lock()
l.keyMapLock.RLock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
l.keyMapLock.RUnlock()
if ok {
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
Expand Down