From e499d1c81b3aaadb0a1871caa3ace0900313d3c0 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 15:16:41 +0800 Subject: [PATCH 1/8] lock optimize --- registry/etcdv3/registry.go | 8 +++++++- registry/kubernetes/registry.go | 8 +++++++- registry/zookeeper/registry.go | 35 +++++++++++++++++++++++---------- remoting/zookeeper/client.go | 6 ------ 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index a65d090349..a94352c665 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -119,8 +119,14 @@ func (r *etcdV3Registry) DoUnregister(root string, node string) error { } func (r *etcdV3Registry) CloseAndNilClient() { - r.client.Close() + r.cltLock.Lock() + client := r.client r.client = nil + r.cltLock.Unlock() + if client == nil { + return + } + client.Close() } func (r *etcdV3Registry) CloseListener() { diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 7ee0f6b0ee..4c59fc080b 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -81,8 +81,14 @@ func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { } func (r *kubernetesRegistry) CloseAndNilClient() { - r.client.Close() + r.cltLock.Lock() + client := r.client r.client = nil + r.cltLock.Unlock() + if client == nil { + return + } + client.Close() } func (r *kubernetesRegistry) CloseListener() { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 5d5f9e0526..fd6ef86e77 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -156,12 +156,15 @@ func (r *zkRegistry) DoRegister(root string, node string) error { } func (r *zkRegistry) DoUnregister(root string, node string) error { - r.cltLock.Lock() - defer r.cltLock.Unlock() - if !r.ZkClient().ZkConnValid() { + client := r.client + if client == nil { + return perrors.New("zk Client is null, can not process registerTempZookeeperNode ") + } + + if !client.ZkConnValid() { return perrors.Errorf("zk client is not valid.") } - return r.ZkClient().Delete(path.Join(root, node)) + return client.Delete(path.Join(root, node)) } func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { @@ -173,8 +176,15 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) } func (r *zkRegistry) CloseAndNilClient() { - r.client.Close() + r.cltLock.Lock() + client := r.client r.client = nil + r.cltLock.Unlock() + + if client == nil { + return + } + client.Close() } func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { @@ -202,22 +212,27 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { ) r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.client.Create(root) + client := r.client + r.cltLock.Unlock() + if client == nil { + return perrors.New("zk Client is null, can not process registerTempZookeeperNode ") + } + + err = client.Create(root) if err != nil { logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err)) return perrors.WithStack(err) } // try to register the node - zkPath, err = r.client.RegisterTemp(root, node) + zkPath, err = client.RegisterTemp(root, node) if err != nil { logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) if perrors.Cause(err) == zk.ErrNodeExists { // should delete the old node logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node) - if err = r.client.Delete(zkPath); err == nil { - _, err = r.client.RegisterTemp(root, node) + if err = client.Delete(zkPath); err == nil { + _, err = client.RegisterTemp(root, node) } if err != nil { logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index bd1da54776..59d976f5d8 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -419,9 +419,7 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { for _, str := range strings.Split(basePath, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { _, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll)) } @@ -446,9 +444,7 @@ func (z *ZookeeperClient) Delete(basePath string) error { ) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { err = conn.Delete(basePath, -1) } @@ -468,9 +464,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er err = errNilZkClientConn data = []byte("") zkPath = path.Join(basePath) + "/" + node - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } From 3ae2a82ec79f77273cfeff963702bd80fb1d2520 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 15:21:28 +0800 Subject: [PATCH 2/8] revert changes for CloseAndNilClient --- registry/etcdv3/registry.go | 8 +------- registry/kubernetes/registry.go | 8 +------- registry/zookeeper/registry.go | 9 +-------- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index a94352c665..a65d090349 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -119,14 +119,8 @@ func (r *etcdV3Registry) DoUnregister(root string, node string) error { } func (r *etcdV3Registry) CloseAndNilClient() { - r.cltLock.Lock() - client := r.client + r.client.Close() r.client = nil - r.cltLock.Unlock() - if client == nil { - return - } - client.Close() } func (r *etcdV3Registry) CloseListener() { diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 4c59fc080b..7ee0f6b0ee 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -81,14 +81,8 @@ func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { } func (r *kubernetesRegistry) CloseAndNilClient() { - r.cltLock.Lock() - client := r.client + r.client.Close() r.client = nil - r.cltLock.Unlock() - if client == nil { - return - } - client.Close() } func (r *kubernetesRegistry) CloseListener() { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index fd6ef86e77..32f9fe7a29 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -176,15 +176,8 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) } func (r *zkRegistry) CloseAndNilClient() { - r.cltLock.Lock() - client := r.client + r.client.Close() r.client = nil - r.cltLock.Unlock() - - if client == nil { - return - } - client.Close() } func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { From 977fffa8500d65720ab23e85a4e263c78b9a02e1 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 15:24:01 +0800 Subject: [PATCH 3/8] lock optimize --- registry/zookeeper/registry.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 32f9fe7a29..912f8c2669 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -204,9 +204,7 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { zkPath string ) - r.cltLock.Lock() client := r.client - r.cltLock.Unlock() if client == nil { return perrors.New("zk Client is null, can not process registerTempZookeeperNode ") } @@ -260,9 +258,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen zkListener = NewRegistryConfigurationListener(r.client, r, conf) if r.listener == nil { - r.cltLock.Lock() client := r.client - r.cltLock.Unlock() if client == nil { return nil, perrors.New("zk connection broken") } From 9c623ec6b16432f9b6ae773193f9cc3092d93a22 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 15:34:23 +0800 Subject: [PATCH 4/8] revert changes for registerTempZookeeperNode --- registry/zookeeper/registry.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 912f8c2669..b8f0cfb9f4 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -204,26 +204,23 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { zkPath string ) - client := r.client - if client == nil { - return perrors.New("zk Client is null, can not process registerTempZookeeperNode ") - } - - err = client.Create(root) + r.cltLock.Lock() + defer r.cltLock.Unlock() + err = r.client.Create(root) if err != nil { logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err)) return perrors.WithStack(err) } // try to register the node - zkPath, err = client.RegisterTemp(root, node) + zkPath, err = r.client.RegisterTemp(root, node) if err != nil { logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) if perrors.Cause(err) == zk.ErrNodeExists { // should delete the old node logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node) - if err = client.Delete(zkPath); err == nil { - _, err = client.RegisterTemp(root, node) + if err = r.client.Delete(zkPath); err == nil { + _, err = r.client.RegisterTemp(root, node) } if err != nil { logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) From f7ea5f2b3b38627bac995cec94fc4d5e8ec31c67 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 15:53:03 +0800 Subject: [PATCH 5/8] lock optimize --- remoting/zookeeper/client.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 59d976f5d8..fcf07d4666 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -487,9 +487,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, ) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { tmpPath, err = conn.Create( path.Join(basePath)+"/", @@ -520,9 +518,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, ) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { children, stat, watcher, err = conn.ChildrenW(path) } @@ -556,9 +552,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { ) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { children, stat, err = conn.Children(path) } @@ -589,9 +583,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { ) err = errNilZkClientConn - z.Lock() conn := z.Conn - z.Unlock() if conn != nil { exist, _, watcher, err = conn.ExistsW(zkPath) } From db059d3db1eb88ca967ef78b1ca73082afa23501 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 23:13:09 +0800 Subject: [PATCH 6/8] lock optimize : change to rwlock --- registry/zookeeper/registry.go | 12 ++++++------ remoting/zookeeper/client.go | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index b8f0cfb9f4..cf442a1969 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -156,15 +156,13 @@ func (r *zkRegistry) DoRegister(root string, node string) error { } func (r *zkRegistry) DoUnregister(root string, node string) error { - client := r.client - if client == nil { - return perrors.New("zk Client is null, can not process registerTempZookeeperNode ") - } + r.cltLock.Lock() + defer r.cltLock.Unlock() - if !client.ZkConnValid() { + if !r.ZkClient().ZkConnValid() { return perrors.Errorf("zk client is not valid.") } - return client.Delete(path.Join(root, node)) + return r.ZkClient().Delete(path.Join(root, node)) } func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { @@ -255,7 +253,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen zkListener = NewRegistryConfigurationListener(r.client, r, conf) if r.listener == nil { + r.cltLock.Lock() client := r.client + r.cltLock.Unlock() if client == nil { return nil, perrors.New("zk connection broken") } diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index fcf07d4666..cc34c76b42 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -51,7 +51,7 @@ var ( type ZookeeperClient struct { name string ZkAddrs []string - sync.Mutex // for conn + sync.RWMutex // for conn Conn *zk.Conn Timeout time.Duration exit chan struct{} @@ -419,7 +419,9 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { for _, str := range strings.Split(basePath, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { _, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll)) } @@ -444,7 +446,9 @@ func (z *ZookeeperClient) Delete(basePath string) error { ) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { err = conn.Delete(basePath, -1) } @@ -464,7 +468,9 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er err = errNilZkClientConn data = []byte("") zkPath = path.Join(basePath) + "/" + node + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } @@ -487,7 +493,9 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, ) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { tmpPath, err = conn.Create( path.Join(basePath)+"/", @@ -518,7 +526,9 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, ) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { children, stat, watcher, err = conn.ChildrenW(path) } @@ -552,7 +562,9 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { ) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { children, stat, err = conn.Children(path) } @@ -583,7 +595,9 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { ) err = errNilZkClientConn + z.RLock() conn := z.Conn + z.RUnlock() if conn != nil { exist, _, watcher, err = conn.ExistsW(zkPath) } From 1d1bbe08088bbe704f79ffdbf3e19f0ef1fbda0d Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 1 Jun 2020 23:17:33 +0800 Subject: [PATCH 7/8] lock optimize : change to rwlock --- registry/zookeeper/registry.go | 1 - remoting/zookeeper/client.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index cf442a1969..5d5f9e0526 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -158,7 +158,6 @@ func (r *zkRegistry) DoRegister(root string, node string) error { func (r *zkRegistry) DoUnregister(root string, node string) error { r.cltLock.Lock() defer r.cltLock.Unlock() - if !r.ZkClient().ZkConnValid() { return perrors.Errorf("zk client is not valid.") } diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index cc34c76b42..c66bf4b673 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -278,7 +278,7 @@ LOOP: break LOOP case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path) - z.Lock() + z.RLock() for p, a := range z.eventRegistry { if strings.HasPrefix(p, event.Path) { logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener", @@ -288,7 +288,7 @@ LOOP: } } } - z.Unlock() + z.RUnlock() case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession): if state == (int)(zk.StateHasSession) { continue @@ -371,11 +371,11 @@ func (z *ZookeeperClient) ZkConnValid() bool { } valid := true - z.Lock() + z.RLock() if z.Conn == nil { valid = false } - z.Unlock() + z.RUnlock() return valid } From 9ae184fe7b0bf2531088b6a4cbe02f05ed50d074 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 2 Jun 2020 21:43:33 +0800 Subject: [PATCH 8/8] extract method for get zookeeper connection --- remoting/zookeeper/client.go | 45 ++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index c66bf4b673..a165d8a77c 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -416,15 +416,15 @@ func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { ) logger.Debugf("zookeeperClient.Create(basePath{%s})", basePath) + conn := z.getConn() + err = errNilZkClientConn + if conn == nil { + return perrors.WithMessagef(err, "zk.Create(path:%s)", basePath) + } + for _, str := range strings.Split(basePath, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) - err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() - if conn != nil { - _, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll)) - } + _, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll)) if err != nil { if err == zk.ErrNodeExists { @@ -446,9 +446,7 @@ func (z *ZookeeperClient) Delete(basePath string) error { ) err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { err = conn.Delete(basePath, -1) } @@ -468,9 +466,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er err = errNilZkClientConn data = []byte("") zkPath = path.Join(basePath) + "/" + node - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) } @@ -493,9 +489,7 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, ) err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { tmpPath, err = conn.Create( path.Join(basePath)+"/", @@ -526,9 +520,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, ) err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { children, stat, watcher, err = conn.ChildrenW(path) } @@ -562,9 +554,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { ) err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { children, stat, err = conn.Children(path) } @@ -595,9 +585,7 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { ) err = errNilZkClientConn - z.RLock() - conn := z.Conn - z.RUnlock() + conn := z.getConn() if conn != nil { exist, _, watcher, err = conn.ExistsW(zkPath) } @@ -618,3 +606,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) { return z.Conn.Get(zkPath) } + +// getConn gets zookeeper connection safely +func (z *ZookeeperClient) getConn() *zk.Conn { + z.RLock() + defer z.RUnlock() + return z.Conn +}