Skip to content

Commit

Permalink
Nacos client (#1255)
Browse files Browse the repository at this point in the history
* build(deps): bump actions/cache from v2.1.4 to v2.1.5

Bumps [actions/cache](https://github.com/actions/cache) from v2.1.4 to v2.1.5.
- [Release notes](https://github.com/actions/cache/releases)
- [Commits](actions/cache@v2.1.4...1a9e213)

Signed-off-by: dependabot[bot] <[email protected]>

* improve etcd version and change create to put (#1203)

* up:remoting nacos

* add:nacos service discovery

* up:设置默认值

* up:nacos registroy client

* up:nacon config client

* up:go fmt

* up:nacos config client

* up:test

* up:修改初æ测试方法

* up:fmt

* up:triple version

* up:修改配置操作

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xin.Zh <[email protected]>
Co-authored-by: AlexStocks <[email protected]>
Co-authored-by: randy <[email protected]>
  • Loading branch information
5 people authored Jun 14, 2021
1 parent 22bc867 commit 75fc696
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 418 deletions.
112 changes: 24 additions & 88 deletions config_center/nacos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,51 @@
package nacos

import (
"strconv"
"strings"
"sync"
"time"
)

import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)

// NacosClient Nacos client
// NacosClient Nacos configClient
type NacosClient struct {
name string
NacosAddrs []string
sync.Mutex // for Client
client *config_client.IConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
name string
NacosAddrs []string
sync.Mutex // for Client
configClient *nacosClient.NacosConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
}

// Client Get Client
func (n *NacosClient) Client() *config_client.IConfigClient {
return n.client
func (n *NacosClient) Client() *nacosClient.NacosConfigClient {
return n.configClient
}

// SetClient Set client
func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
// SetClient Set configClient
func (n *NacosClient) SetClient(configClient *nacosClient.NacosConfigClient) {
n.Lock()
n.client = client
n.configClient = configClient
n.Unlock()
}

type option func(*options)

type options struct {
nacosName string
// client *NacosClient
// configClient *NacosClient
}

// WithNacosName Set nacos name
Expand All @@ -75,7 +72,7 @@ func WithNacosName(name string) option {
}
}

// ValidateNacosClient Validate nacos client , if null then create it
// ValidateNacosClient Validate nacos configClient , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
if container == nil {
return perrors.Errorf("container can not be null")
Expand All @@ -95,7 +92,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
// in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
newClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
Expand All @@ -105,79 +102,18 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
}

if container.NacosClient().Client() == nil {
configClient, err := initNacosConfigClient(nacosAddresses, timeout, url)
configClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddresses, timeout.String(), url, err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.NacosClient().SetClient(&configClient)

container.NacosClient().SetClient(configClient.Client())
}

return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
}

func newNacosClient(name string, nacosAddrs []string, timeout time.Duration, url *common.URL) (*NacosClient, error) {
var (
err error
n *NacosClient
)

n = &NacosClient{
name: name,
NacosAddrs: nacosAddrs,
Timeout: timeout,
exit: make(chan struct{}),
onceClose: func() {
close(n.exit)
},
}

configClient, err := initNacosConfigClient(nacosAddrs, timeout, url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddrs, timeout.String(), url, err)
return n, perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
n.SetClient(&configClient)

return n, nil
}

func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url *common.URL) (config_client.IConfigClient, error) {
var svrConfList []nacosconst.ServerConfig
for _, nacosAddr := range nacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
logger.Errorf("strconv.ParseUint(nacos addr port:%+v) = error %+v", split[1], err)
continue
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}

return clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)),
ListenInterval: uint64(int32(timeout / time.Millisecond)),
NotLoadCacheAtStart: true,
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
Username: url.GetParam(constant.NACOS_USERNAME, ""),
Password: url.GetParam(constant.NACOS_PASSWORD, ""),
NamespaceId: url.GetParam(constant.NACOS_NAMESPACE_ID, ""),
},
})
}

// Done Get nacos client exit signal
// Done Get nacos configClient exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}
Expand All @@ -193,7 +129,7 @@ func (n *NacosClient) stop() bool {
return false
}

// NacosClientValid Get nacos client valid status
// NacosClientValid Get nacos configClient valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
Expand All @@ -211,7 +147,7 @@ func (n *NacosClient) NacosClientValid() bool {
return valid
}

// Close Close nacos client , then set null
// Close Close nacos configClient , then set null
func (n *NacosClient) Close() {
if n == nil {
return
Expand Down
49 changes: 19 additions & 30 deletions config_center/nacos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package nacos

import (
"strings"
"testing"
"time"
)
Expand All @@ -32,9 +31,10 @@ import (
)

func TestNewNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := strings.ReplaceAll(server.URL, "http", "registry")

nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)

c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
Expand All @@ -44,52 +44,41 @@ func TestNewNacosClient(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
//<-c.client.Done()
c.Destroy()
}

func TestSetNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := "registry://" + server.Listener.Addr().String()
nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)

c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
}
var client *NacosClient
client = &NacosClient{
name: nacosClientName,
NacosAddrs: []string{nacosURL},
Timeout: 15 * time.Second,
exit: make(chan struct{}),
onceClose: func() {
close(client.exit)
},
}
c.SetNacosClient(client)

err := ValidateNacosClient(c, WithNacosName(nacosClientName))
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
c.Destroy()
}

func TestNewNacosClient_connectError(t *testing.T) {
nacosURL := "registry://127.0.0.1:8888"
nacosURL := "registry://127.0.0.1:8848"
registryUrl, err := common.NewURL(nacosURL)
assert.NoError(t, err)
c := &nacosDynamicConfiguration{
Expand All @@ -101,14 +90,14 @@ func TestNewNacosClient_connectError(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
// c.client.Close() and <-c.client.Done() have order requirements.
// If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.client to nil before calling c.client.Done().
// c.configClient.Close() and <-c.configClient.Done() have order requirements.
// If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
// sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
<-c.client.Done()
// let client do retry
// <-c.client.Done()
// let configClient do retry
time.Sleep(5 * time.Second)
c.Destroy()
}
54 changes: 7 additions & 47 deletions config_center/nacos/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,75 +19,35 @@ package nacos

import (
"sync"
"time"
)

import (
"github.com/apache/dubbo-getty"
perrors "github.com/pkg/errors"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/logger"
)

const (
connDelay = 3
maxFailTimes = 15
)

type nacosClientFacade interface {
NacosClient() *NacosClient
SetNacosClient(*NacosClient)
// WaitGroup for wait group control, zk client listener & zk client container
NacosClient() *nacosClient.NacosConfigClient
SetNacosClient(*nacosClient.NacosConfigClient)
// WaitGroup for wait group control, zk configClient listener & zk configClient container
WaitGroup() *sync.WaitGroup
// GetDone For nacos client control RestartCallBack() bool
// GetDone For nacos configClient control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}

// HandleClientRestart Restart client handler
// HandleClientRestart Restart configClient handler
func HandleClientRestart(r nacosClientFacade) {
var (
err error
failTimes int
)

defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.NacosClient().Done():
r.NacosClient().Close()
nacosName := r.NacosClient().name
nacosAddress := r.NacosClient().NacosAddrs
r.SetNacosClient(nil)

// Connect nacos until success.
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(time.Duration(failTimes*connDelay) * time.Second): // Prevent crazy reconnection nacos.
}
err = ValidateNacosClient(r, WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
nacosAddress, perrors.WithStack(err))
if err == nil {
break
}
failTimes++
if maxFailTimes <= failTimes {
failTimes = maxFailTimes
}
}
return
}
}
}
Loading

0 comments on commit 75fc696

Please sign in to comment.