diff --git a/common/constant/key.go b/common/constant/key.go index 7c45a1397d..7a9eb683a8 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -169,6 +169,10 @@ const ( NACOS_USERNAME = "username" ) +const ( + FILE_KEY = "file" +) + const ( ZOOKEEPER_KEY = "zookeeper" ) diff --git a/common/url.go b/common/url.go index e5fa895adb..0ff3e8f034 100644 --- a/common/url.go +++ b/common/url.go @@ -396,6 +396,17 @@ func (c *URL) AddParam(key string, value string) { c.params.Add(key, value) } +// AddParamAvoidNil will add key-value pair +// Not thread-safe +// think twice before using it. +func (c *URL) AddParamAvoidNil(key string, value string) { + if c.params == nil { + c.params = url.Values{} + } + + c.params.Add(key, value) +} + // SetParam will put the key-value pair into url // it's not thread safe. // think twice before you want to use this method diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index 8030a2c800..ac5328c27a 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -45,6 +45,7 @@ const ( ) type apolloConfiguration struct { + cc.BaseDynamicConfiguration url *common.URL listeners sync.Map diff --git a/config_center/base_dynamic_configuration.go b/config_center/base_dynamic_configuration.go new file mode 100644 index 0000000000..3d6757852a --- /dev/null +++ b/config_center/base_dynamic_configuration.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config_center + +// BaseDynamicConfiguration will default implementation DynamicConfiguration some method +type BaseDynamicConfiguration struct { +} + +// RemoveConfig +func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error { + return nil +} diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 540febc9d3..cbf8e8cf03 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -58,6 +58,9 @@ type DynamicConfiguration interface { // PublishConfig will publish the config with the (key, group, value) pair PublishConfig(string, string, string) error + // RemoveConfig will remove the config white the (key, group) pair + RemoveConfig(string, string) error + // GetConfigKeysByGroup will return all keys with the group GetConfigKeysByGroup(group string) (*gxset.HashSet, error) } diff --git a/config_center/file/factory.go b/config_center/file/factory.go new file mode 100644 index 0000000000..2dda900e20 --- /dev/null +++ b/config_center/file/factory.go @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func init() { + extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory { + return &fileDynamicConfigurationFactory{} + }) +} + +type fileDynamicConfigurationFactory struct { +} + +// GetDynamicConfiguration Get Configuration with URL +func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, + error) { + dynamicConfiguration, err := newFileSystemDynamicConfiguration(url) + if err != nil { + return nil, perrors.WithStack(err) + } + + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + return dynamicConfiguration, err +} diff --git a/config_center/file/impl.go b/config_center/file/impl.go new file mode 100644 index 0000000000..9d8254026b --- /dev/null +++ b/config_center/file/impl.go @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "bytes" + "errors" + "io/ioutil" + "os" + "os/exec" + "os/user" + "path" + "path/filepath" + "runtime" + "strings" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +const ( + PARAM_NAME_PREFIX = "dubbo.config-center." + CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir" + CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding" + DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8" +) + +// FileSystemDynamicConfiguration +type FileSystemDynamicConfiguration struct { + config_center.BaseDynamicConfiguration + url *common.URL + rootPath string + encoding string + cacheListener *CacheListener + parser parser.ConfigurationParser +} + +func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) { + encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING) + + root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "") + var c *FileSystemDynamicConfiguration + if _, err := os.Stat(root); err != nil { + // not exist, use default, /XXX/xx/.dubbo/config-center + if rp, err := Home(); err != nil { + return nil, perrors.WithStack(err) + } else { + root = path.Join(rp, ".dubbo", "config-center") + } + } + + if _, err := os.Stat(root); err != nil { + // it must be dir, if not exist, will create + if err = createDir(root); err != nil { + return nil, perrors.WithStack(err) + } + } + + c = &FileSystemDynamicConfiguration{ + url: url, + rootPath: root, + encoding: encode, + } + + c.cacheListener = NewCacheListener(c.rootPath) + + return c, nil +} + +// RootPath get root path +func (fsdc *FileSystemDynamicConfiguration) RootPath() string { + return fsdc.rootPath +} + +// Parser Get Parser +func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser { + return fsdc.parser +} + +// SetParser Set Parser +func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) { + fsdc.parser = p +} + +// AddListener Add listener +func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, + opts ...config_center.Option) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + + fsdc.cacheListener.AddListener(path, listener) +} + +// RemoveListener Remove listener +func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, + opts ...config_center.Option) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + + fsdc.cacheListener.RemoveListener(path, listener) +} + +// GetProperties get properties file +func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + + path := fsdc.GetPath(key, tmpOpts.Group) + file, err := ioutil.ReadFile(path) + if err != nil { + return "", perrors.WithStack(err) + } + + return string(file), nil +} + +// GetRule get Router rule properties file +func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + return fsdc.GetProperties(key, opts...) +} + +// GetInternalProperty get value by key in Default properties file(dubbo.properties) +func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, + error) { + return fsdc.GetProperties(key, opts...) +} + +// PublishConfig will publish the config with the (key, group, value) pair +func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error { + path := fsdc.GetPath(key, group) + return fsdc.write2File(path, value) +} + +// GetConfigKeysByGroup will return all keys with the group +func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + path := fsdc.GetPath("", group) + r := gxset.NewSet() + + fileInfo, _ := ioutil.ReadDir(path) + + for _, file := range fileInfo { + // list file + if file.IsDir() { + continue + } + + r.Add(file.Name()) + } + + return r, nil +} + +// RemoveConfig will remove the config whit hte (key, group) +func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error { + path := fsdc.GetPath(key, group) + _, err := fsdc.deleteDelay(path) + return err +} + +// Close close file watcher +func (fsdc *FileSystemDynamicConfiguration) Close() error { + return fsdc.cacheListener.Close() +} + +// GetPath get path +func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string { + if len(key) == 0 { + return path.Join(fsdc.rootPath, group) + } + + if len(group) == 0 { + group = config_center.DEFAULT_GROUP + } + + return path.Join(fsdc.rootPath, group, key) +} + +func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) { + if path == "" { + return false, nil + } + + if err := os.RemoveAll(path); err != nil { + return false, err + } + + return true, nil +} + +func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error { + if err := forceMkdirParent(fp); err != nil { + return perrors.WithStack(err) + } + + return ioutil.WriteFile(fp, []byte(value), os.ModePerm) +} + +func forceMkdirParent(fp string) error { + pd := getParentDirectory(fp) + + return createDir(pd) +} + +func createDir(path string) error { + // create dir, chmod is drwxrwxrwx(0777) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return err + } + + return nil +} + +func getParentDirectory(fp string) string { + return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator))) +} + +func substr(s string, pos, length int) string { + runes := []rune(s) + l := pos + length + if l > len(runes) { + l = len(runes) + } + return string(runes[pos:l]) +} + +// Home returns the home directory for the executing user. +// +// This uses an OS-specific method for discovering the home directory. +// An error is returned if a home directory cannot be detected. +func Home() (string, error) { + user, err := user.Current() + if nil == err { + return user.HomeDir, nil + } + + // cross compile support + if "windows" == runtime.GOOS { + return homeWindows() + } + + // Unix-like system, so just assume Unix + return homeUnix() +} + +func homeUnix() (string, error) { + // First prefer the HOME environmental variable + if home := os.Getenv("HOME"); home != "" { + return home, nil + } + + // If that fails, try the shell + var stdout bytes.Buffer + cmd := exec.Command("sh", "-c", "eval echo ~$USER") + cmd.Stdout = &stdout + if err := cmd.Run(); err != nil { + return "", err + } + + result := strings.TrimSpace(stdout.String()) + if result == "" { + return "", errors.New("blank output when reading home directory") + } + + return result, nil +} + +func homeWindows() (string, error) { + drive := os.Getenv("HOMEDRIVE") + path := os.Getenv("HOMEPATH") + home := drive + path + if drive == "" || path == "" { + home = os.Getenv("USERPROFILE") + } + if home == "" { + return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank") + } + + return home, nil +} diff --git a/config_center/file/impl_test.go b/config_center/file/impl_test.go new file mode 100644 index 0000000000..58892953d5 --- /dev/null +++ b/config_center/file/impl_test.go @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "fmt" + "os" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" +) + +const ( + key = "com.dubbo.go" +) + +func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) { + urlString := "registry://127.0.0.1:2181" + regurl, err := common.NewURL(urlString) + assert.NoError(t, err) + dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(®url) + assert.NoError(t, err) + + return dc.(*FileSystemDynamicConfiguration), err +} + +func TestPublishAndGetConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + err = file.PublishConfig(key, "", "A") + assert.NoError(t, err) + + prop, err := file.GetProperties(key) + assert.NoError(t, err) + assert.Equal(t, "A", prop) + + defer destroy(file.rootPath, file) +} + +func TestAddListener(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + listener := &mockDataListener{} + file.AddListener(key, listener, config_center.WithGroup(group)) + + value = "Test Value 2" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + // remove need wait a moment + time.Sleep(time.Second) + defer destroy(file.rootPath, file) +} + +func TestRemoveListener(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + listener := &mockDataListener{} + file.AddListener(key, listener, config_center.WithGroup(group)) + + value = "Test Value 2" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + + // make sure callback before RemoveListener + time.Sleep(time.Second) + file.RemoveListener(key, listener, config_center.WithGroup(group)) + value = "Test Value 3" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + // remove need wait a moment + time.Sleep(time.Second) + defer destroy(file.rootPath, file) +} + +func TestGetConfigKeysByGroup(t *testing.T) { + file, err := initFileData(t) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + gs, err := file.GetConfigKeysByGroup(group) + assert.NoError(t, err) + assert.Equal(t, 1, gs.Size()) + assert.Equal(t, key, gs.Values()[0]) + // remove need wait a moment + time.Sleep(time.Second) + defer destroy(file.rootPath, file) +} + +func TestGetConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + prop, err := file.GetProperties(key, config_center.WithGroup(group)) + assert.NoError(t, err) + assert.Equal(t, value, prop) + defer destroy(file.rootPath, file) +} + +func TestPublishConfig(t *testing.T) { + file, err := initFileData(t) + assert.NoError(t, err) + group := "dubbogo" + value := "Test Value" + err = file.PublishConfig(key, group, value) + assert.NoError(t, err) + prop, err := file.GetInternalProperty(key, config_center.WithGroup(group)) + assert.NoError(t, err) + assert.Equal(t, value, prop) + defer destroy(file.rootPath, file) +} + +func destroy(path string, fdc *FileSystemDynamicConfiguration) { + fdc.Close() + os.RemoveAll(path) +} + +type mockDataListener struct{} + +func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) { + fmt.Printf("process!!!!! %v", configType) +} diff --git a/config_center/file/listener.go b/config_center/file/listener.go new file mode 100644 index 0000000000..d569030e5a --- /dev/null +++ b/config_center/file/listener.go @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "io/ioutil" + "sync" +) + +import ( + "github.com/fsnotify/fsnotify" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +// CacheListener is file watcher +type CacheListener struct { + watch *fsnotify.Watcher + keyListeners sync.Map + rootPath string +} + +// NewCacheListener creates a new CacheListener +func NewCacheListener(rootPath string) *CacheListener { + cl := &CacheListener{rootPath: rootPath} + // start watcher + watch, err := fsnotify.NewWatcher() + if err != nil { + logger.Errorf("file : listen config fail, error:%v ", err) + } + go func() { + for { + select { + case event := <-watch.Events: + key := event.Name + logger.Debugf("watcher %s, event %v", cl.rootPath, event) + if event.Op&fsnotify.Write == fsnotify.Write { + if l, ok := cl.keyListeners.Load(key); ok { + dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, + remoting.EventTypeUpdate) + } + } + if event.Op&fsnotify.Create == fsnotify.Create { + if l, ok := cl.keyListeners.Load(key); ok { + dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, + remoting.EventTypeAdd) + } + } + if event.Op&fsnotify.Remove == fsnotify.Remove { + if l, ok := cl.keyListeners.Load(key); ok { + removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel) + } + } + case err := <-watch.Errors: + // err may be nil, ignore + if err != nil { + logger.Warnf("file : listen watch fail:%+v", err) + } + } + } + }() + cl.watch = watch + + extension.AddCustomShutdownCallback(func() { + cl.watch.Close() + }) + + return cl +} + +func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) { + if len(lmap) == 0 { + logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event) + return + } + for l := range lmap { + callback(l, key, "", event) + } +} + +func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) { + if len(lmap) == 0 { + logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event) + return + } + c := getFileContent(key) + for l := range lmap { + callback(l, key, c, event) + } +} + +func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) { + listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event}) +} + +// Close will remove key listener and close watcher +func (cl *CacheListener) Close() error { + cl.keyListeners.Range(func(key, value interface{}) bool { + cl.keyListeners.Delete(key) + return true + }) + return cl.watch.Close() +} + +// AddListener will add a listener if loaded +// if you watcher a file or directory not exist, will error with no such file or directory +func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) { + // reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure + // make a map[your type]struct{} like set in java + listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{ + listener: {}}) + if loaded { + listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{} + cl.keyListeners.Store(key, listeners) + return + } + if err := cl.watch.Add(key); err != nil { + logger.Errorf("watcher add path:%s err:%v", key, err) + } +} + +// RemoveListener will delete a listener if loaded +func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) { + listeners, loaded := cl.keyListeners.Load(key) + if !loaded { + return + } + delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener) + if err := cl.watch.Remove(key); err != nil { + logger.Errorf("watcher remove path:%s err:%v", key, err) + } +} + +func getFileContent(path string) string { + c, err := ioutil.ReadFile(path) + if err != nil { + logger.Errorf("read file path:%s err:%v", path, err) + return "" + } + + return string(c) +} diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 8fe0a25123..9bebd600c6 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha // MockDynamicConfiguration uses to parse content and defines listener type MockDynamicConfiguration struct { + BaseDynamicConfiguration parser parser.ConfigurationParser content string listener map[string]ConfigurationListener diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index bbf707b938..be94b9a2e3 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -47,6 +47,7 @@ const ( // nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos type nacosDynamicConfiguration struct { + config_center.BaseDynamicConfiguration url *common.URL rootPath string wg sync.WaitGroup diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index ef579eb2d1..485abcb5f0 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -44,6 +44,7 @@ const ( ) type zookeeperDynamicConfiguration struct { + config_center.BaseDynamicConfiguration url *common.URL rootPath string wg sync.WaitGroup diff --git a/go.mod b/go.mod index 9e18cc276a..2d0a66d0a6 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.0.0 github.com/frankban/quicktest v1.4.1 // indirect + github.com/fsnotify/fsnotify v1.4.7 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect github.com/go-co-op/gocron v0.1.1 github.com/go-resty/resty/v2 v2.1.0 diff --git a/registry/file/listener.go b/registry/file/listener.go new file mode 100644 index 0000000000..3fe7400226 --- /dev/null +++ b/registry/file/listener.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import "github.com/apache/dubbo-go/config_center" + +// RegistryConfigurationListener represent the processor of flie watcher +type RegistryConfigurationListener struct { +} + +// Process submit the ConfigChangeEvent to the event chan to notify all observer +func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { + +} diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go new file mode 100644 index 0000000000..59c5cf9a89 --- /dev/null +++ b/registry/file/service_discovery.go @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path" + "strconv" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/file" + "github.com/apache/dubbo-go/registry" +) + +// init will put the service discovery into extension +func init() { + extension.SetServiceDiscovery(constant.FILE_KEY, newFileSystemServiceDiscovery) +} + +// fileServiceDiscovery is the implementation of service discovery based on file. +type fileSystemServiceDiscovery struct { + dynamicConfiguration file.FileSystemDynamicConfiguration + rootPath string + fileMap map[string]string +} + +func newFileSystemServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || sdc.Protocol != constant.FILE_KEY { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + rp, err := file.Home() + if err != nil { + return nil, perrors.WithStack(err) + } + + fdcf := extension.GetConfigCenterFactory(constant.FILE_KEY) + p := path.Join(rp, ".dubbo", constant.REGISTRY_KEY) + url, _ := common.NewURL("") + url.AddParamAvoidNil(file.CONFIG_CENTER_DIR_PARAM_NAME, p) + c, err := fdcf.GetDynamicConfiguration(&url) + if err != nil { + return nil, perrors.WithStack(err) + } + + sd := &fileSystemServiceDiscovery{ + dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration), + rootPath: p, + fileMap: make(map[string]string), + } + + extension.AddCustomShutdownCallback(func() { + sd.Destroy() + }) + + for _, v := range sd.GetServices().Values() { + for _, i := range sd.GetInstances(v.(string)) { + // like java do nothing + l := &RegistryConfigurationListener{} + sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i))) + } + } + + return sd, nil +} + +// nolint +func (fssd *fileSystemServiceDiscovery) String() string { + return fmt.Sprintf("file-system-service-discovery") +} + +// Destroy will destroy the service discovery. +// If the discovery cannot be destroy, it will return an error. +func (fssd *fileSystemServiceDiscovery) Destroy() error { + fssd.dynamicConfiguration.Close() + + for _, f := range fssd.fileMap { + fssd.releaseAndRemoveRegistrationFiles(f) + } + + return nil +} + +// nolint +func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) { + os.RemoveAll(file) +} + +// ----------------- registration ---------------- + +// Register will register an instance of ServiceInstance to registry +func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error { + id := getServiceInstanceId(instance) + sn := getServiceName(instance) + + c, err := toJsonString(instance) + if err != nil { + return perrors.WithStack(err) + } + + err = fssd.dynamicConfiguration.PublishConfig(id, sn, c) + if err != nil { + return perrors.WithStack(err) + } + + fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn) + + return nil +} + +// nolint +func getServiceInstanceId(si registry.ServiceInstance) string { + if si.GetId() == "" { + return si.GetHost() + "." + strconv.Itoa(si.GetPort()) + } + + return si.GetId() +} + +// nolint +func getServiceName(si registry.ServiceInstance) string { + return si.GetServiceName() +} + +// toJsonString to json string +func toJsonString(si registry.ServiceInstance) (string, error) { + bytes, err := json.Marshal(si) + if err != nil { + return "", perrors.WithStack(err) + } + + return string(bytes), nil +} + +// Update will update the data of the instance in registry +func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error { + return fssd.Register(instance) +} + +// Unregister will unregister this instance from registry +func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + id := getServiceInstanceId(instance) + sn := getServiceName(instance) + + err := fssd.dynamicConfiguration.RemoveConfig(id, sn) + if err != nil { + return perrors.WithStack(err) + } + + delete(fssd.fileMap, instance.GetId()) + return nil +} + +// ----------------- discovery ------------------- +// GetDefaultPageSize will return the default page size +func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int { + return 100 +} + +// GetServices will return the all service names. +func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet { + r := gxset.NewSet() + // dynamicConfiguration root path is the actual root path + fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath()) + + for _, file := range fileInfo { + if file.IsDir() { + r.Add(file.Name()) + } + } + + return r +} + +// GetInstances will return all service instances with serviceName +func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName) + if err != nil { + logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ", + serviceName, err) + return make([]registry.ServiceInstance, 0, 0) + } + + res := make([]registry.ServiceInstance, 0, set.Size()) + for _, v := range set.Values() { + id := v.(string) + p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)) + if err != nil { + logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + return make([]registry.ServiceInstance, 0, 0) + } + + dsi := ®istry.DefaultServiceInstance{} + err = json.Unmarshal([]byte(p), dsi) + if err != nil { + logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+ + "error = err{%v} ", + id, serviceName, err) + return make([]registry.ServiceInstance, 0, 0) + } + + res = append(res, dsi) + } + + return res +} + +// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName +// the page will start at offset +func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return nil +} + +// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. +// The param healthy indices that the instance should be healthy or not. +// The page will start at offset +func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, + healthy bool) gxpage.Pager { + return nil +} + +// Batch get all instances by the specified service names +func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, + requestedSize int) map[string]gxpage.Pager { + return nil +} + +// ----------------- event ---------------------- +// AddListener adds a new ServiceInstancesChangedListener +// client +func (fssd *fileSystemServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + //fssd.dynamicConfiguration.AddListener(listener.ServiceName) + return nil +} + +// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName +func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, fssd.GetInstances(serviceName))) +} + +// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances +func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName string, + instances []registry.ServiceInstance) error { + return fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) +} + +// DispatchEvent dispatches the event +func (fssd *fileSystemServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + extension.GetGlobalDispatcher().Dispatch(event) + return nil +} diff --git a/registry/file/service_discovery_test.go b/registry/file/service_discovery_test.go new file mode 100644 index 0000000000..0bffcae31d --- /dev/null +++ b/registry/file/service_discovery_test.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package file + +import ( + "math/rand" + "strconv" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" +) + +var ( + testName = "test" +) + +func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) { + prepareData() + serviceDiscovery, err := newFileSystemServiceDiscovery(testName) + assert.NoError(t, err) + assert.NotNil(t, serviceDiscovery) + defer serviceDiscovery.Destroy() +} + +func TestCURDFileSystemServiceDiscovery(t *testing.T) { + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY, testName) + assert.NoError(t, err) + md := make(map[string]string) + + rand.Seed(time.Now().Unix()) + serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) + md["t1"] = "test1" + r1 := ®istry.DefaultServiceInstance{ + Id: "123456789", + ServiceName: serviceName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: md, + } + err = serviceDiscovery.Register(r1) + assert.NoError(t, err) + + instances := serviceDiscovery.GetInstances(r1.ServiceName) + assert.Equal(t, 1, len(instances)) + assert.Equal(t, r1.Id, instances[0].GetId()) + assert.Equal(t, r1.ServiceName, instances[0].GetServiceName()) + assert.Equal(t, r1.Port, instances[0].GetPort()) + + err = serviceDiscovery.Unregister(r1) + assert.NoError(t, err) + + err = serviceDiscovery.Register(r1) + + defer serviceDiscovery.Destroy() +} + +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "file", + } +}