Skip to content

Commit

Permalink
Merge branch '3.0' into feature/reduce-etcd-registry-conn
Browse files Browse the repository at this point in the history
  • Loading branch information
WilliamLeaves authored Jul 23, 2021
2 parents 64fbfbd + 3ab0ab6 commit e5941ad
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/apache/dubbo-go-hessian2 v1.9.2
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.13
github.com/dubbogo/gost v1.11.14
github.com/dubbogo/triple v1.0.1
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.13 h1:sWvK1QbHpPBMmRQJV9qIH3syLegQBQa4xAPof3/Kv5c=
github.com/dubbogo/gost v1.11.13/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.14 h1:9lfcdILOmqTOVAW1fPHa5uf1NrD6jlIOBe4vf8576yQ=
github.com/dubbogo/gost v1.11.14/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.3 h1:2k53mh+1U8h1gFjJ8ykzyP4wNdAdgjc5moD+xVHI/AE=
github.com/dubbogo/net v0.0.3/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU=
Expand Down
10 changes: 6 additions & 4 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,20 +81,20 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -104,7 +105,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
Expand Down
10 changes: 6 additions & 4 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,19 +81,19 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
}

// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -103,7 +104,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
Expand Down
15 changes: 9 additions & 6 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
Expand All @@ -44,7 +45,7 @@ import (
type nacosListener struct {
namingClient *nacosClient.NacosNamingClient
listenUrl *common.URL
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
instanceMap map[string]model.Instance
cacheLock sync.Mutex
done chan struct{}
Expand All @@ -55,9 +56,10 @@ type nacosListener struct {
func NewNacosListener(url *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *config_center.ConfigChangeEvent, 32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
listenUrl: url,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
Expand Down Expand Up @@ -198,7 +200,7 @@ func (nl *nacosListener) stopListen() error {
}

func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType
nl.events.In() <- configType
}

// Next returns the service event from nacos.
Expand All @@ -209,7 +211,8 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl)
return nil, perrors.New("listener stopped")

case e := <-nl.events:
case val := <-nl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got nacos event %s", e)
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (l *RegistryDataListener) Close() {
type RegistryConfigurationListener struct {
client *gxzookeeper.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
isClosed bool
close chan struct{}
closeOnce sync.Once
Expand All @@ -129,7 +130,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *
return &RegistryConfigurationListener{
client: client,
registry: reg,
events: make(chan *config_center.ConfigChangeEvent, 32),
events: gxchan.NewUnboundedChan(32),
isClosed: false,
close: make(chan struct{}, 1),
subscribeURL: conf,
Expand All @@ -138,7 +139,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *

// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next will observe the registry state and events chan
Expand All @@ -150,7 +151,8 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
Expand Down

0 comments on commit e5941ad

Please sign in to comment.