Skip to content

Commit

Permalink
fix: one nacos connection registers a unique service instance. (#570)
Browse files Browse the repository at this point in the history
one nacos connection registers a unique service instance.

Signed-off-by: Cybwan <[email protected]>
  • Loading branch information
cybwan authored Dec 23, 2024
1 parent 70f5ea9 commit 1b17d5d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkg/connector/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (cdr *CatalogDeregistration) ToEureka() *eureka.Instance {

func (cdr *CatalogDeregistration) ToNacos() *vo.DeregisterInstanceParam {
svcInfoSegs := strings.Split(cdr.ServiceID, constant.SERVICE_INFO_SPLITER)
if len(svcInfoSegs) < 4 {
if len(svcInfoSegs) < 2 {
return nil
}
r := new(vo.DeregisterInstanceParam)
Expand Down
124 changes: 74 additions & 50 deletions pkg/connector/provider/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,96 @@ import (
"github.com/flomesh-io/fsm/pkg/connector"
)

const (
aloneConnect = "*"
)

type nacosConnect struct {
namingClient naming_client.INamingClient
serverCfg constant.ServerConfig
clientCfg constant.ClientConfig
}

type NacosDiscoveryClient struct {
connectController connector.ConnectController
namingClient naming_client.INamingClient
serverConfig constant.ServerConfig
clientConfig constant.ClientConfig
nacosConnects map[string]*nacosConnect
lock sync.Mutex
}

func (dc *NacosDiscoveryClient) nacosClient() naming_client.INamingClient {
func (dc *NacosDiscoveryClient) nacosClient(connectKey string) naming_client.INamingClient {
dc.lock.Lock()
defer dc.lock.Unlock()

namespaceId := dc.connectController.GetAuthNacosNamespaceId()
if len(connectKey) == 0 {
connectKey = aloneConnect
}

conn, exists := dc.nacosConnects[connectKey]
if !exists {
conn = new(nacosConnect)
level := env.GetString("LOG_LEVEL", "warn")
conn.clientCfg = constant.ClientConfig{
TimeoutMs: 60000,
NotLoadCacheAtStart: true,
UpdateCacheWhenEmpty: true,
DisableUseSnapShot: false,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
LogLevel: level,
}
dc.nacosConnects[connectKey] = conn
}

connectController := dc.connectController
namespaceId := connectController.GetAuthNacosNamespaceId()
if len(namespaceId) == 0 {
namespaceId = constant.DEFAULT_NAMESPACE_ID
}
if !strings.EqualFold(dc.clientConfig.NamespaceId, namespaceId) {
dc.clientConfig.NamespaceId = namespaceId
dc.namingClient = nil
if !strings.EqualFold(conn.clientCfg.NamespaceId, namespaceId) {
conn.clientCfg.NamespaceId = namespaceId
conn.namingClient = nil
}
if username := dc.connectController.GetAuthNacosUsername(); !strings.EqualFold(dc.clientConfig.Username, username) {
dc.clientConfig.Username = username
dc.namingClient = nil
if username := connectController.GetAuthNacosUsername(); !strings.EqualFold(conn.clientCfg.Username, username) {
conn.clientCfg.Username = username
conn.namingClient = nil
}
if password := dc.connectController.GetAuthNacosPassword(); !strings.EqualFold(dc.clientConfig.Password, password) {
dc.clientConfig.Password = password
dc.namingClient = nil
if password := connectController.GetAuthNacosPassword(); !strings.EqualFold(conn.clientCfg.Password, password) {
conn.clientCfg.Password = password
conn.namingClient = nil
}
if accessKey := dc.connectController.GetAuthNacosAccessKey(); !strings.EqualFold(dc.clientConfig.AccessKey, accessKey) {
dc.clientConfig.AccessKey = accessKey
dc.namingClient = nil
if accessKey := connectController.GetAuthNacosAccessKey(); !strings.EqualFold(conn.clientCfg.AccessKey, accessKey) {
conn.clientCfg.AccessKey = accessKey
conn.namingClient = nil
}
if secretKey := dc.connectController.GetAuthNacosSecretKey(); !strings.EqualFold(dc.clientConfig.SecretKey, secretKey) {
dc.clientConfig.SecretKey = secretKey
dc.namingClient = nil
if secretKey := connectController.GetAuthNacosSecretKey(); !strings.EqualFold(conn.clientCfg.SecretKey, secretKey) {
conn.clientCfg.SecretKey = secretKey
conn.namingClient = nil
}

address := dc.connectController.GetHTTPAddr()
address := connectController.GetHTTPAddr()
segs := strings.Split(address, ":")
ipAddr := segs[0]
port, _ := strconv.ParseUint(segs[1], 10, 64)

if !strings.EqualFold(dc.serverConfig.IpAddr, ipAddr) {
dc.serverConfig.IpAddr = ipAddr
dc.namingClient = nil
if !strings.EqualFold(conn.serverCfg.IpAddr, ipAddr) {
conn.serverCfg.IpAddr = ipAddr
conn.namingClient = nil
}

if dc.serverConfig.Port != port {
dc.serverConfig.Port = port
dc.namingClient = nil
if conn.serverCfg.Port != port {
conn.serverCfg.Port = port
conn.namingClient = nil
}

if dc.namingClient == nil {
dc.namingClient, _ = clients.CreateNamingClient(map[string]interface{}{
"serverConfigs": []constant.ServerConfig{dc.serverConfig},
"clientConfig": dc.clientConfig,
if conn.namingClient == nil {
conn.namingClient, _ = clients.CreateNamingClient(map[string]interface{}{
"serverConfigs": []constant.ServerConfig{conn.serverCfg},
"clientConfig": conn.clientCfg,
})
}

dc.connectController.WaitLimiter()
return dc.namingClient
connectController.WaitLimiter()
return conn.namingClient
}

func (dc *NacosDiscoveryClient) selectServices() ([]string, error) {
Expand All @@ -90,7 +119,7 @@ func (dc *NacosDiscoveryClient) selectServices() ([]string, error) {
if len(namespaceId) == 0 {
namespaceId = constant.DEFAULT_NAMESPACE_ID
}
if serviceList, err := dc.nacosClient().GetAllServicesInfo(vo.GetAllServiceInfoParam{
if serviceList, err := dc.nacosClient(aloneConnect).GetAllServicesInfo(vo.GetAllServiceInfoParam{
NameSpace: namespaceId,
GroupName: group,
PageNo: 1,
Expand All @@ -115,7 +144,7 @@ func (dc *NacosDiscoveryClient) selectInstances(svc string) ([]model.Instance, e
result, err := dc.connectController.CacheCatalogInstances(svc, func() (interface{}, error) {
var instances []model.Instance
for _, group := range dc.connectController.GetNacos2KGroupSet() {
if groupInstances, err := dc.nacosClient().SelectInstances(vo.SelectInstancesParam{
if groupInstances, err := dc.nacosClient(aloneConnect).SelectInstances(vo.SelectInstancesParam{
ServiceName: svc,
GroupName: group,
Clusters: dc.connectController.GetNacos2KClusterSet(),
Expand Down Expand Up @@ -362,8 +391,12 @@ func (dc *NacosDiscoveryClient) Deregister(dereg *connector.CatalogDeregistratio
return nil
}
port, _ := strconv.Atoi(fmt.Sprintf("%d", ins.Port))
return dc.connectController.CacheDeregisterInstance(dc.getServiceInstanceID(ins.ServiceName, ins.Ip, port, 0), func() error {
_, err := dc.nacosClient().DeregisterInstance(*ins)
instanceId := dc.getServiceInstanceID(ins.ServiceName, ins.Ip, port, 0)
return dc.connectController.CacheDeregisterInstance(instanceId, func() error {
conn := dc.nacosClient(instanceId)
_, err := conn.DeregisterInstance(*ins)
conn.CloseClient()
delete(dc.nacosConnects, instanceId)
return err
})
}
Expand All @@ -388,8 +421,9 @@ func (dc *NacosDiscoveryClient) Register(reg *connector.CatalogRegistration) err
}
}
port, _ := strconv.Atoi(fmt.Sprintf("%d", ins.Port))
return dc.connectController.CacheRegisterInstance(dc.getServiceInstanceID(ins.ServiceName, ins.Ip, port, 0), ins, func() error {
_, err := dc.nacosClient().RegisterInstance(*ins)
instanceId := dc.getServiceInstanceID(ins.ServiceName, ins.Ip, port, 0)
return dc.connectController.CacheRegisterInstance(instanceId, ins, func() error {
_, err := dc.nacosClient(instanceId).RegisterInstance(*ins)
return err
})
}
Expand Down Expand Up @@ -433,19 +467,9 @@ func (dc *NacosDiscoveryClient) Close() {
}

func GetNacosDiscoveryClient(connectController connector.ConnectController) (*NacosDiscoveryClient, error) {
level := env.GetString("LOG_LEVEL", "warn")
nacosDiscoveryClient := new(NacosDiscoveryClient)
nacosDiscoveryClient.connectController = connectController
nacosDiscoveryClient.clientConfig = constant.ClientConfig{
TimeoutMs: 60000,
NotLoadCacheAtStart: true,
UpdateCacheWhenEmpty: true,
DisableUseSnapShot: false,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
LogLevel: level,
}

nacosDiscoveryClient.nacosConnects = make(map[string]*nacosConnect)
nacosDiscoveryClient.connectController.SetServiceInstanceIDFunc(nacosDiscoveryClient.getServiceInstanceID)
return nacosDiscoveryClient, nil
}

0 comments on commit 1b17d5d

Please sign in to comment.