diff --git a/common/constant/default.go b/common/constant/default.go index 8ed645e84a..3c889158e4 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -41,6 +41,8 @@ const ( DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 DEFAULT_FAILBACK_TASKS = 100 + DEFAULT_REST_CLIENT = "resty" + DEFAULT_REST_SERVER = "go-restful" ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index c8a03b3be9..07335bed59 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -174,21 +174,36 @@ const ( ) const ( - CONSUMER_SIGN_FILTER = "sign" - PROVIDER_AUTH_FILTER = "auth" - SERVICE_AUTH_KEY = "auth" - AUTHENTICATOR_KEY = "authenticator" - DEFAULT_AUTHENTICATOR = "accesskeys" - DEFAULT_ACCESS_KEY_STORAGE = "urlstorage" - ACCESS_KEY_STORAGE_KEY = "accessKey.storage" - REQUEST_TIMESTAMP_KEY = "timestamp" - REQUEST_SIGNATURE_KEY = "signature" - AK_KEY = "ak" - SIGNATURE_STRING_FORMAT = "%s#%s#%s#%s" + // name of consumer sign filter + CONSUMER_SIGN_FILTER = "sign" + // name of consumer sign filter + PROVIDER_AUTH_FILTER = "auth" + // name of service filter + SERVICE_AUTH_KEY = "auth" + // key of authenticator + AUTHENTICATOR_KEY = "authenticator" + // name of default authenticator + DEFAULT_AUTHENTICATOR = "accesskeys" + // name of default url storage + DEFAULT_ACCESS_KEY_STORAGE = "urlstorage" + // key of storage + ACCESS_KEY_STORAGE_KEY = "accessKey.storage" + // key of request timestamp + REQUEST_TIMESTAMP_KEY = "timestamp" + // key of request signature + REQUEST_SIGNATURE_KEY = "signature" + // AK key + AK_KEY = "ak" + // signature format + SIGNATURE_STRING_FORMAT = "%s#%s#%s#%s" + // key whether enable signature PARAMTER_SIGNATURE_ENABLE_KEY = "param.sign" - CONSUMER = "consumer" - ACCESS_KEY_ID_KEY = "accessKeyId" - SECRET_ACCESS_KEY_KEY = "secretAccessKey" + // consumer + CONSUMER = "consumer" + // key of access key id + ACCESS_KEY_ID_KEY = "accessKeyId" + // key of secret access key + SECRET_ACCESS_KEY_KEY = "secretAccessKey" ) // HealthCheck Router diff --git a/common/constant/time.go b/common/constant/time.go index be1baaca67..3bb339229b 100644 --- a/common/constant/time.go +++ b/common/constant/time.go @@ -22,5 +22,7 @@ import ( ) var ( + // The value will be 10^6 + // 1ms = 10^6ns MsToNanoRate = int64(time.Millisecond / time.Nanosecond) ) diff --git a/common/extension/auth.go b/common/extension/auth.go index e57e22f660..a35fc509da 100644 --- a/common/extension/auth.go +++ b/common/extension/auth.go @@ -9,10 +9,13 @@ var ( accesskeyStorages = make(map[string]func() filter.AccessKeyStorage) ) +// SetAuthenticator put the fcn into map with name func SetAuthenticator(name string, fcn func() filter.Authenticator) { authenticators[name] = fcn } +// GetAuthenticator find the Authenticator with name +// if not found, it will panic func GetAuthenticator(name string) filter.Authenticator { if authenticators[name] == nil { panic("authenticator for " + name + " is not existing, make sure you have import the package.") @@ -20,10 +23,13 @@ func GetAuthenticator(name string) filter.Authenticator { return authenticators[name]() } +// SetAccesskeyStorages will set the fcn into map with this name func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) { accesskeyStorages[name] = fcn } +// GetAccesskeyStorages find the storage with the name. +// If not found, it will panic. func GetAccesskeyStorages(name string) filter.AccessKeyStorage { if accesskeyStorages[name] == nil { panic("accesskeyStorages for " + name + " is not existing, make sure you have import the package.") diff --git a/common/extension/config_reader.go b/common/extension/config_reader.go new file mode 100644 index 0000000000..aced5b0281 --- /dev/null +++ b/common/extension/config_reader.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/config/interfaces" +) + +var ( + configReaders = make(map[string]func() interfaces.ConfigReader) + defaults = make(map[string]string) +) + +// SetConfigReaders set a creator of config reader. +func SetConfigReaders(name string, v func() interfaces.ConfigReader) { + configReaders[name] = v +} + +// GetConfigReaders get a config reader by name. +func GetConfigReaders(name string) interfaces.ConfigReader { + if configReaders[name] == nil { + panic("config reader for " + name + " is not existing, make sure you have imported the package.") + } + return configReaders[name]() +} + +// SetDefaultConfigReader set {name} to default config reader for {module} +func SetDefaultConfigReader(module, name string) { + defaults[module] = name +} + +// GetDefaultConfigReader +func GetDefaultConfigReader() map[string]string { + return defaults +} diff --git a/common/extension/rest_client.go b/common/extension/rest_client.go new file mode 100644 index 0000000000..514d1fdfd2 --- /dev/null +++ b/common/extension/rest_client.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/protocol/rest/client" +) + +var ( + restClients = make(map[string]func(restOptions *client.RestOptions) client.RestClient, 8) +) + +func SetRestClient(name string, fun func(restOptions *client.RestOptions) client.RestClient) { + restClients[name] = fun +} + +func GetNewRestClient(name string, restOptions *client.RestOptions) client.RestClient { + if restClients[name] == nil { + panic("restClient for " + name + " is not existing, make sure you have import the package.") + } + return restClients[name](restOptions) +} diff --git a/common/extension/rest_server.go b/common/extension/rest_server.go new file mode 100644 index 0000000000..fa8d435a5c --- /dev/null +++ b/common/extension/rest_server.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/protocol/rest/server" +) + +var ( + restServers = make(map[string]func() server.RestServer, 8) +) + +func SetRestServer(name string, fun func() server.RestServer) { + restServers[name] = fun +} + +func GetNewRestServer(name string) server.RestServer { + if restServers[name] == nil { + panic("restServer for " + name + " is not existing, make sure you have import the package.") + } + return restServers[name]() +} diff --git a/common/yaml/testdata/config.yml b/common/yaml/testdata/config.yml new file mode 100644 index 0000000000..b5c2ca8ad1 --- /dev/null +++ b/common/yaml/testdata/config.yml @@ -0,0 +1,7 @@ + +intTest: 11 +booleanTest: false +strTest: "strTest" + +child: + strTest: "childStrTest" \ No newline at end of file diff --git a/common/yaml/yaml.go b/common/yaml/yaml.go new file mode 100644 index 0000000000..7c31d71c35 --- /dev/null +++ b/common/yaml/yaml.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaml + +import ( + "io/ioutil" + "path" +) + +import ( + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// loadYMLConfig Load yml config byte from file +func LoadYMLConfig(confProFile string) ([]byte, error) { + if len(confProFile) == 0 { + return nil, perrors.Errorf("application configure(provider) file name is nil") + } + + if path.Ext(confProFile) != ".yml" { + return nil, perrors.Errorf("application configure file name{%v} suffix must be .yml", confProFile) + } + + return ioutil.ReadFile(confProFile) +} + +// unmarshalYMLConfig Load yml config byte from file , then unmarshal to object +func UnmarshalYMLConfig(confProFile string, out interface{}) ([]byte, error) { + confFileStream, err := LoadYMLConfig(confProFile) + if err != nil { + return confFileStream, perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confProFile, perrors.WithStack(err)) + } + return confFileStream, yaml.Unmarshal(confFileStream, out) +} diff --git a/common/yaml/yaml_test.go b/common/yaml/yaml_test.go new file mode 100644 index 0000000000..45eee59048 --- /dev/null +++ b/common/yaml/yaml_test.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaml + +import ( + "path/filepath" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestUnmarshalYMLConfig(t *testing.T) { + conPath, err := filepath.Abs("./testdata/config.yml") + assert.NoError(t, err) + c := &Config{} + _, err = UnmarshalYMLConfig(conPath, c) + assert.NoError(t, err) + assert.Equal(t, "strTest", c.StrTest) + assert.Equal(t, 11, c.IntTest) + assert.Equal(t, false, c.BooleanTest) + assert.Equal(t, "childStrTest", c.ChildConfig.StrTest) +} + +func TestUnmarshalYMLConfig_Error(t *testing.T) { + c := &Config{} + _, err := UnmarshalYMLConfig("./testdata/config", c) + assert.Error(t, err) + _, err = UnmarshalYMLConfig("", c) + assert.Error(t, err) +} + +type Config struct { + StrTest string `yaml:"strTest" default:"default" json:"strTest,omitempty" property:"strTest"` + IntTest int `default:"109" yaml:"intTest" json:"intTest,omitempty" property:"intTest"` + BooleanTest bool `yaml:"booleanTest" default:"true" json:"booleanTest,omitempty"` + ChildConfig ChildConfig `yaml:"child" json:"child,omitempty"` +} + +type ChildConfig struct { + StrTest string `default:"strTest" default:"default" yaml:"strTest" json:"strTest,omitempty"` +} diff --git a/config/base_config.go b/config/base_config.go index 6d5ec7e249..93c0ce6a66 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -18,8 +18,7 @@ package config import ( - "io/ioutil" - "path" + "bytes" "reflect" "strconv" "strings" @@ -27,7 +26,6 @@ import ( import ( perrors "github.com/pkg/errors" - "gopkg.in/yaml.v2" ) import ( @@ -50,6 +48,8 @@ type BaseConfig struct { fatherConfig interface{} MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` + + fileStream *bytes.Buffer } // startConfigCenter will start the config center. @@ -364,27 +364,4 @@ func initializeStruct(t reflect.Type, v reflect.Value) { } } - -} - -// loadYMLConfig Load yml config byte from file -func loadYMLConfig(confProFile string) ([]byte, error) { - if len(confProFile) == 0 { - return nil, perrors.Errorf("application configure(provider) file name is nil") - } - - if path.Ext(confProFile) != ".yml" { - return nil, perrors.Errorf("application configure file name{%v} suffix must be .yml", confProFile) - } - - return ioutil.ReadFile(confProFile) -} - -// unmarshalYMLConfig Load yml config byte from file , then unmarshal to object -func unmarshalYMLConfig(confProFile string, out interface{}) error { - confFileStream, err := loadYMLConfig(confProFile) - if err != nil { - return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confProFile, perrors.WithStack(err)) - } - return yaml.Unmarshal(confFileStream, out) } diff --git a/config/base_config_test.go b/config/base_config_test.go index 6973a4a18b..d16b242092 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -18,7 +18,6 @@ package config import ( "fmt" - "path/filepath" "reflect" "testing" ) @@ -518,13 +517,3 @@ func Test_initializeStruct(t *testing.T) { return consumerConfig.References != nil }) } - -func TestUnmarshalYMLConfig(t *testing.T) { - conPath, err := filepath.Abs("./testdata/consumer_config_with_configcenter.yml") - assert.NoError(t, err) - c := &ConsumerConfig{} - assert.NoError(t, unmarshalYMLConfig(conPath, c)) - assert.Equal(t, "default", c.ProxyFactory) - assert.Equal(t, "dubbo.properties", c.ConfigCenterConfig.ConfigFile) - assert.Equal(t, "100ms", c.Connect_Timeout) -} diff --git a/config/condition_router_config.go b/config/condition_router_config.go index a95b2d2b12..87e835108e 100644 --- a/config/condition_router_config.go +++ b/config/condition_router_config.go @@ -25,12 +25,13 @@ import ( "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/yaml" ) //RouterInit Load config file to init router config func RouterInit(confRouterFile string) error { fileRouterFactories := extension.GetFileRouterFactories() - bytes, err := loadYMLConfig(confRouterFile) + bytes, err := yaml.LoadYMLConfig(confRouterFile) if err != nil { return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confRouterFile, perrors.WithStack(err)) } diff --git a/config/config_loader.go b/config/config_loader.go index 437f4d7323..c0687d8fc1 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -24,9 +24,14 @@ import ( "time" ) +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" ) @@ -78,6 +83,7 @@ func checkApplicationName(config *ApplicationConfig) { // Load Dubbo Init func Load() { + // init router if confRouterFile != "" { if errPro := RouterInit(confRouterFile); errPro != nil { @@ -89,6 +95,18 @@ func Load() { if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") } else { + // init other consumer config + conConfigType := consumerConfig.ConfigType + for key, value := range extension.GetDefaultConfigReader() { + if conConfigType == nil { + if v, ok := conConfigType[key]; ok { + value = v + } + } + if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil { + logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value) + } + } metricConfig = consumerConfig.MetricConfig applicationConfig = consumerConfig.ApplicationConfig @@ -150,6 +168,18 @@ func Load() { if providerConfig == nil { logger.Warnf("providerConfig is nil!") } else { + // init other provider config + proConfigType := providerConfig.ConfigType + for key, value := range extension.GetDefaultConfigReader() { + if proConfigType != nil { + if v, ok := proConfigType[key]; ok { + value = v + } + } + if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil { + logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value) + } + } // so, you should know that the consumer's config will be override metricConfig = providerConfig.MetricConfig diff --git a/config/consumer_config.go b/config/consumer_config.go index 94da301ce4..1fa68415bf 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -18,6 +18,7 @@ package config import ( + "bytes" "time" ) @@ -30,6 +31,7 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/yaml" ) ///////////////////////// @@ -57,6 +59,7 @@ type ConsumerConfig struct { ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` } // UnmarshalYAML ... @@ -86,13 +89,12 @@ func ConsumerInit(confConFile string) error { if confConFile == "" { return perrors.Errorf("application configure(consumer) file name is nil") } - consumerConfig = &ConsumerConfig{} - err := unmarshalYMLConfig(confConFile, consumerConfig) + fileStream, err := yaml.UnmarshalYMLConfig(confConFile, consumerConfig) if err != nil { - return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err)) + return perrors.Errorf("unmarshalYmlConfig error %v", perrors.WithStack(err)) } - + consumerConfig.fileStream = bytes.NewBuffer(fileStream) //set method interfaceId & interfaceName for k, v := range consumerConfig.References { //set id for reference @@ -116,6 +118,7 @@ func ConsumerInit(confConFile string) error { } } logger.Debugf("consumer config{%#v}\n", consumerConfig) + return nil } @@ -139,5 +142,6 @@ func configCenterRefreshConsumer() error { return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout) } } - return err + + return nil } diff --git a/config/interfaces/config_reader.go b/config/interfaces/config_reader.go new file mode 100644 index 0000000000..23f2225e1b --- /dev/null +++ b/config/interfaces/config_reader.go @@ -0,0 +1,9 @@ +package interfaces + +import "bytes" + +// ConfigReader +type ConfigReader interface { + ReadConsumerConfig(reader *bytes.Buffer) error + ReadProviderConfig(reader *bytes.Buffer) error +} diff --git a/config/provider_config.go b/config/provider_config.go index a36fd4d0a0..14b77cafb3 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -17,6 +17,10 @@ package config +import ( + "bytes" +) + import ( "github.com/creasty/defaults" perrors "github.com/pkg/errors" @@ -25,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/yaml" ) ///////////////////////// @@ -45,6 +50,7 @@ type ProviderConfig struct { ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` } // UnmarshalYAML ... @@ -74,13 +80,13 @@ func ProviderInit(confProFile string) error { if len(confProFile) == 0 { return perrors.Errorf("application configure(provider) file name is nil") } - providerConfig = &ProviderConfig{} - err := unmarshalYMLConfig(confProFile, providerConfig) + fileStream, err := yaml.UnmarshalYMLConfig(confProFile, providerConfig) if err != nil { - return perrors.Errorf("yaml.Unmarshal() = error:%v", perrors.WithStack(err)) + return perrors.Errorf("unmarshalYmlConfig error %v", perrors.WithStack(err)) } + providerConfig.fileStream = bytes.NewBuffer(fileStream) //set method interfaceId & interfaceName for k, v := range providerConfig.Services { //set id for reference diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 1bf61a942b..d3373e249b 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -157,7 +157,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N }, } - svrConfList := []nacosconst.ServerConfig{} + svrConfList := make([]nacosconst.ServerConfig, 0, len(n.NacosAddrs)) for _, nacosAddr := range n.NacosAddrs { split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go index fc83e14eac..77a79ed091 100644 --- a/config_center/nacos/facade.go +++ b/config_center/nacos/facade.go @@ -46,15 +46,10 @@ type nacosClientFacade interface { common.Node } -func timeSecondDuration(sec int) time.Duration { - return time.Duration(sec) * time.Second -} - // HandleClientRestart Restart client handler func HandleClientRestart(r nacosClientFacade) { var ( - err error - + err error failTimes int ) @@ -79,7 +74,7 @@ LOOP: case <-r.GetDone(): logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos. + case <-getty.GetTimeWheel().After(time.Duration(failTimes*connDelay) * time.Second): // Prevent crazy reconnection nacos. } err = ValidateNacosClient(r, WithNacosName(nacosName)) logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", diff --git a/go.mod b/go.mod index f986ba6adb..27bcc2b931 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,10 @@ require ( github.com/dubbogo/getty v1.3.3 github.com/dubbogo/go-zookeeper v1.0.0 github.com/dubbogo/gost v1.5.2 + github.com/emicklei/go-restful/v3 v3.0.0 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect + github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 3bbae5009b..cfde0ef1bd 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIh github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/emicklei/go-restful/v3 v3.0.0 h1:Duxxa4x0WIHW3bYEDmoAPNjmy8Rbqn+utcF74dlF/G8= +github.com/emicklei/go-restful/v3 v3.0.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.8.0 h1:uE6Fp4fOcAJdc1wTQXLJ+SYistkbG1dNoi6Zs1+Ybvk= github.com/envoyproxy/go-control-plane v0.8.0/go.mod h1:GSSbY9P1neVhdY7G4wu+IK1rk/dqhiCC/4ExuWJZVuk= github.com/envoyproxy/protoc-gen-validate v0.0.14 h1:YBW6/cKy9prEGRYLnaGa4IDhzxZhRCtKsax8srGKDnM= @@ -138,6 +140,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= +github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -497,6 +501,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= diff --git a/protocol/rest/client/client_impl/resty_client.go b/protocol/rest/client/client_impl/resty_client.go new file mode 100644 index 0000000000..aa6c23137d --- /dev/null +++ b/protocol/rest/client/client_impl/resty_client.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client_impl + +import ( + "context" + "net" + "net/http" + "path" + "time" +) + +import ( + "github.com/go-resty/resty/v2" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol/rest/client" +) + +func init() { + extension.SetRestClient(constant.DEFAULT_REST_CLIENT, NewRestyClient) +} + +type RestyClient struct { + client *resty.Client +} + +func NewRestyClient(restOption *client.RestOptions) client.RestClient { + client := resty.New() + client.SetTransport( + &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + c, err := net.DialTimeout(network, addr, restOption.ConnectTimeout) + if err != nil { + return nil, err + } + err = c.SetDeadline(time.Now().Add(restOption.RequestTimeout)) + if err != nil { + return nil, err + } + return c, nil + }, + }) + return &RestyClient{ + client: client, + } +} + +func (rc *RestyClient) Do(restRequest *client.RestRequest, res interface{}) error { + r, err := rc.client.R(). + SetHeader("Content-Type", restRequest.Consumes). + SetHeader("Accept", restRequest.Produces). + SetPathParams(restRequest.PathParams). + SetQueryParams(restRequest.QueryParams). + SetHeaders(restRequest.Headers). + SetBody(restRequest.Body). + SetResult(res). + Execute(restRequest.Method, "http://"+path.Join(restRequest.Location, restRequest.Path)) + if err != nil { + return perrors.WithStack(err) + } + if r.IsError() { + return perrors.New(r.String()) + } + return nil +} diff --git a/protocol/rest/client/rest_client.go b/protocol/rest/client/rest_client.go new file mode 100644 index 0000000000..7d020abc81 --- /dev/null +++ b/protocol/rest/client/rest_client.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "time" +) + +type RestOptions struct { + RequestTimeout time.Duration + ConnectTimeout time.Duration +} + +type RestRequest struct { + Location string + Path string + Produces string + Consumes string + Method string + PathParams map[string]string + QueryParams map[string]string + Body interface{} + Headers map[string]string +} + +type RestClient interface { + Do(request *RestRequest, res interface{}) error +} diff --git a/protocol/rest/config/reader/rest_config_reader.go b/protocol/rest/config/reader/rest_config_reader.go new file mode 100644 index 0000000000..873af9924b --- /dev/null +++ b/protocol/rest/config/reader/rest_config_reader.go @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reader + +import ( + "bytes" + "strconv" + "strings" +) + +import ( + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/interfaces" + "github.com/apache/dubbo-go/protocol/rest/config" +) + +const REST = "rest" + +func init() { + extension.SetConfigReaders(REST, NewRestConfigReader) + extension.SetDefaultConfigReader(REST, REST) +} + +type RestConfigReader struct { +} + +func NewRestConfigReader() interfaces.ConfigReader { + return &RestConfigReader{} +} + +// ReadConsumerConfig read consumer config for rest protocol +func (cr *RestConfigReader) ReadConsumerConfig(reader *bytes.Buffer) error { + restConsumerConfig := &config.RestConsumerConfig{} + err := yaml.Unmarshal(reader.Bytes(), restConsumerConfig) + if err != nil { + return perrors.Errorf("[Rest Config] unmarshal Consumer error %#v", perrors.WithStack(err)) + } + + restConsumerServiceConfigMap := make(map[string]*config.RestServiceConfig, len(restConsumerConfig.RestServiceConfigsMap)) + for key, rc := range restConsumerConfig.RestServiceConfigsMap { + rc.Client = getNotEmptyStr(rc.Client, restConsumerConfig.Client, constant.DEFAULT_REST_CLIENT) + rc.RestMethodConfigsMap = initMethodConfigMap(rc, restConsumerConfig.Consumes, restConsumerConfig.Produces) + restConsumerServiceConfigMap[strings.TrimPrefix(key, "/")] = rc + } + config.SetRestConsumerServiceConfigMap(restConsumerServiceConfigMap) + return nil +} + +// ReadProviderConfig read provider config for rest protocol +func (cr *RestConfigReader) ReadProviderConfig(reader *bytes.Buffer) error { + restProviderConfig := &config.RestProviderConfig{} + err := yaml.Unmarshal(reader.Bytes(), restProviderConfig) + if err != nil { + return perrors.Errorf("[Rest Config] unmarshal Provider error %#v", perrors.WithStack(err)) + } + restProviderServiceConfigMap := make(map[string]*config.RestServiceConfig, len(restProviderConfig.RestServiceConfigsMap)) + for key, rc := range restProviderConfig.RestServiceConfigsMap { + rc.Server = getNotEmptyStr(rc.Server, restProviderConfig.Server, constant.DEFAULT_REST_SERVER) + rc.RestMethodConfigsMap = initMethodConfigMap(rc, restProviderConfig.Consumes, restProviderConfig.Produces) + restProviderServiceConfigMap[strings.TrimPrefix(key, "/")] = rc + } + config.SetRestProviderServiceConfigMap(restProviderServiceConfigMap) + return nil +} + +// initProviderRestConfig ... +func initMethodConfigMap(rc *config.RestServiceConfig, consumes string, produces string) map[string]*config.RestMethodConfig { + mcm := make(map[string]*config.RestMethodConfig, len(rc.RestMethodConfigs)) + for _, mc := range rc.RestMethodConfigs { + mc.InterfaceName = rc.InterfaceName + mc.Path = rc.Path + mc.Path + mc.Consumes = getNotEmptyStr(mc.Consumes, rc.Consumes, consumes) + mc.Produces = getNotEmptyStr(mc.Produces, rc.Produces, produces) + mc.MethodType = getNotEmptyStr(mc.MethodType, rc.MethodType) + mc = transformMethodConfig(mc) + mcm[mc.MethodName] = mc + } + return mcm +} + +// function will return first not empty string .. +func getNotEmptyStr(args ...string) string { + var r string + for _, t := range args { + if len(t) > 0 { + r = t + break + } + } + return r +} + +// transformMethodConfig +func transformMethodConfig(methodConfig *config.RestMethodConfig) *config.RestMethodConfig { + if len(methodConfig.PathParamsMap) == 0 && len(methodConfig.PathParams) > 0 { + paramsMap, err := parseParamsString2Map(methodConfig.PathParams) + if err != nil { + logger.Warnf("[Rest Config] Path Param parse error:%v", err) + } else { + methodConfig.PathParamsMap = paramsMap + } + } + if len(methodConfig.QueryParamsMap) == 0 && len(methodConfig.QueryParams) > 0 { + paramsMap, err := parseParamsString2Map(methodConfig.QueryParams) + if err != nil { + logger.Warnf("[Rest Config] Argument Param parse error:%v", err) + } else { + methodConfig.QueryParamsMap = paramsMap + } + } + if len(methodConfig.HeadersMap) == 0 && len(methodConfig.Headers) > 0 { + headersMap, err := parseParamsString2Map(methodConfig.Headers) + if err != nil { + logger.Warnf("[Rest Config] Argument Param parse error:%v", err) + } else { + methodConfig.HeadersMap = headersMap + } + } + return methodConfig +} + +// transform a string to a map +// for example: +// string "0:id,1:name" => map [0:id,1:name] +func parseParamsString2Map(params string) (map[int]string, error) { + m := make(map[int]string, 8) + for _, p := range strings.Split(params, ",") { + pa := strings.Split(p, ":") + key, err := strconv.Atoi(pa[0]) + if err != nil { + return nil, err + } + m[key] = pa[1] + } + return m, nil +} diff --git a/protocol/rest/config/reader/rest_config_reader_test.go b/protocol/rest/config/reader/rest_config_reader_test.go new file mode 100644 index 0000000000..d2dba40b9b --- /dev/null +++ b/protocol/rest/config/reader/rest_config_reader_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reader + +import ( + "bytes" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/yaml" + "github.com/apache/dubbo-go/protocol/rest/config" +) + +func TestRestConfigReader_ReadConsumerConfig(t *testing.T) { + bs, err := yaml.LoadYMLConfig("./testdata/consumer_config.yml") + assert.NoError(t, err) + configReader := NewRestConfigReader() + err = configReader.ReadConsumerConfig(bytes.NewBuffer(bs)) + assert.NoError(t, err) + assert.NotEmpty(t, config.GetRestConsumerServiceConfigMap()) +} + +func TestRestConfigReader_ReadProviderConfig(t *testing.T) { + bs, err := yaml.LoadYMLConfig("./testdata/provider_config.yml") + assert.NoError(t, err) + configReader := NewRestConfigReader() + err = configReader.ReadProviderConfig(bytes.NewBuffer(bs)) + assert.NoError(t, err) + assert.NotEmpty(t, config.GetRestProviderServiceConfigMap()) +} diff --git a/protocol/rest/config/reader/testdata/consumer_config.yml b/protocol/rest/config/reader/testdata/consumer_config.yml new file mode 100644 index 0000000000..27d7fdafef --- /dev/null +++ b/protocol/rest/config/reader/testdata/consumer_config.yml @@ -0,0 +1,74 @@ +# dubbo client yaml configure file + +filter: "" + +config_type: + rest: "rest" + +# client +request_timeout : "100ms" +# connect timeout +connect_timeout : "100ms" +check: true +rest_server: "resty" +rest_produces: "*/*" +rest_consumes: "*/*" + +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + +references: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + protocol : "rest" + version: "1.0" + group: "as" + interface : "com.ikurento.user.UserProvider" + url: "dubbo://127.0.0.1:20000/UserProvider" + cluster: "failover" + timeout: "3s" + rest_client: "resty1" + rest_produces: "application/xml" + rest_consumes: "application/xml" + methods : + - name: "GetUser" + retries: "3" + timeout: "5s" + rest_query_params: "1:userid,2:username" + rest_headers: "3:age" + rest_path_params: "4:time,2:name" + rest_body: 0 + rest_produces: "application/xml" + rest_consumes: "application/xml" + + params: + "serviceid": + "soa.com.ikurento.user.UserProvider" + "forks": 5 + +shutdown_conf: + timeout: 60s + step_timeout: 10s + diff --git a/protocol/rest/config/reader/testdata/provider_config.yml b/protocol/rest/config/reader/testdata/provider_config.yml new file mode 100644 index 0000000000..71d056e727 --- /dev/null +++ b/protocol/rest/config/reader/testdata/provider_config.yml @@ -0,0 +1,88 @@ +# dubbo server yaml configure file + +filter: "" + +config_type: + rest: "rest" + +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + +rest_server: "go-restful" +rest_produces: "*/*" +rest_consumes: "*/*" + +services: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + # the name of limiter + tps.limiter: "default" + # the time unit of interval is ms + tps.limit.interval: 60000 + tps.limit.rate: 200 + # the name of strategy + tps.limit.strategy: "slidingWindow" + # the name of RejectedExecutionHandler + tps.limit.rejected.handler: "default" + # the concurrent request limitation of this service + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" + protocol : "rest" + # equivalent to interface of dubbo.xml + interface : "com.ikurento.user.UserProvider" + loadbalance: "random" + version: "1.0" + group: "as" + warmup: "100" + cluster: "failover" + rest_server: "go-restful1" + rest_produces: "*/*" + rest_consumes: "*/*" + methods: + - name: "GetUser" + retries: 1 + loadbalance: "random" + # the concurrent request limitation of this method + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" + rest_query_params: "1:userid,2:username" + rest_headers: "3:age" + rest_path_params: "4:time,2:name" + rest_body: 0 + rest_produces: "application/xml" + rest_consumes: "application/xml" + +protocols: + "rest": + name: "rest" + ip : "127.0.0.1" + port : 20000 + + + + diff --git a/protocol/rest/config/rest_config.go b/protocol/rest/config/rest_config.go new file mode 100644 index 0000000000..168ec8ce52 --- /dev/null +++ b/protocol/rest/config/rest_config.go @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "github.com/creasty/defaults" +) + +var ( + restConsumerServiceConfigMap map[string]*RestServiceConfig + restProviderServiceConfigMap map[string]*RestServiceConfig +) + +// RestConsumerConfig ... +type RestConsumerConfig struct { + Client string `default:"resty" yaml:"rest_client" json:"rest_client,omitempty" property:"rest_client"` + Produces string `default:"application/json" yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `default:"application/json" yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + RestServiceConfigsMap map[string]*RestServiceConfig `yaml:"references" json:"references,omitempty" property:"references"` +} + +// UnmarshalYAML ... +func (c *RestConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestConsumerConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// RestProviderConfig ... +type RestProviderConfig struct { + Server string `default:"go-restful" yaml:"rest_server" json:"rest_server,omitempty" property:"rest_server"` + Produces string `default:"*/*" yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `default:"*/*" yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + RestServiceConfigsMap map[string]*RestServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` +} + +// UnmarshalYAML ... +func (c *RestProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestProviderConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// RestServiceConfig ... +type RestServiceConfig struct { + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Path string `yaml:"rest_path" json:"rest_path,omitempty" property:"rest_path"` + Produces string `yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + MethodType string `yaml:"rest_method" json:"rest_method,omitempty" property:"rest_method"` + Client string `yaml:"rest_client" json:"rest_client,omitempty" property:"rest_client"` + Server string `yaml:"rest_server" json:"rest_server,omitempty" property:"rest_server"` + RestMethodConfigs []*RestMethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + RestMethodConfigsMap map[string]*RestMethodConfig +} + +// UnmarshalYAML ... +func (c *RestServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestServiceConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// RestMethodConfig ... +type RestMethodConfig struct { + InterfaceName string + MethodName string `required:"true" yaml:"name" json:"name,omitempty" property:"name"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Path string `yaml:"rest_path" json:"rest_path,omitempty" property:"rest_path"` + Produces string `yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + MethodType string `yaml:"rest_method" json:"rest_method,omitempty" property:"rest_method"` + PathParams string `yaml:"rest_path_params" json:"rest_path_params,omitempty" property:"rest_path_params"` + PathParamsMap map[int]string + QueryParams string `yaml:"rest_query_params" json:"rest_query_params,omitempty" property:"rest_query_params"` + QueryParamsMap map[int]string + Body int `default:"-1" yaml:"rest_body" json:"rest_body,omitempty" property:"rest_body"` + Headers string `yaml:"rest_headers" json:"rest_headers,omitempty" property:"rest_headers"` + HeadersMap map[int]string +} + +// UnmarshalYAML ... +func (c *RestMethodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestMethodConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// GetRestConsumerServiceConfig ... +func GetRestConsumerServiceConfig(path string) *RestServiceConfig { + return restConsumerServiceConfigMap[path] +} + +// GetRestProviderServiceConfig ... +func GetRestProviderServiceConfig(path string) *RestServiceConfig { + return restProviderServiceConfigMap[path] +} + +// SetRestConsumerServiceConfigMap ... +func SetRestConsumerServiceConfigMap(configMap map[string]*RestServiceConfig) { + restConsumerServiceConfigMap = configMap +} + +// SetRestProviderServiceConfigMap ... +func SetRestProviderServiceConfigMap(configMap map[string]*RestServiceConfig) { + restProviderServiceConfigMap = configMap +} + +// GetRestConsumerServiceConfigMap ... +func GetRestConsumerServiceConfigMap() map[string]*RestServiceConfig { + return restConsumerServiceConfigMap +} + +// GetRestProviderServiceConfigMap ... +func GetRestProviderServiceConfigMap() map[string]*RestServiceConfig { + return restProviderServiceConfigMap +} diff --git a/protocol/rest/rest_exporter.go b/protocol/rest/rest_exporter.go new file mode 100644 index 0000000000..470d525ad8 --- /dev/null +++ b/protocol/rest/rest_exporter.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type RestExporter struct { + protocol.BaseExporter +} + +func NewRestExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *RestExporter { + return &RestExporter{ + BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), + } +} + +func (re *RestExporter) Unexport() { + serviceId := re.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") + re.BaseExporter.Unexport() + err := common.ServiceMap.UnRegister(REST, serviceId) + if err != nil { + logger.Errorf("[RestExporter.Unexport] error: %v", err) + } + return +} diff --git a/protocol/rest/rest_invoker.go b/protocol/rest/rest_invoker.go new file mode 100644 index 0000000000..0c82035ac5 --- /dev/null +++ b/protocol/rest/rest_invoker.go @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "fmt" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/client" + "github.com/apache/dubbo-go/protocol/rest/config" +) + +type RestInvoker struct { + protocol.BaseInvoker + client client.RestClient + restMethodConfigMap map[string]*config.RestMethodConfig +} + +func NewRestInvoker(url common.URL, client *client.RestClient, restMethodConfig map[string]*config.RestMethodConfig) *RestInvoker { + return &RestInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + client: *client, + restMethodConfigMap: restMethodConfig, + } +} + +func (ri *RestInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + inv := invocation.(*invocation_impl.RPCInvocation) + methodConfig := ri.restMethodConfigMap[inv.MethodName()] + var ( + result protocol.RPCResult + body interface{} + pathParams map[string]string + queryParams map[string]string + headers map[string]string + err error + ) + if methodConfig == nil { + result.Err = perrors.Errorf("[RestInvoker] Rest methodConfig:%s is nil", inv.MethodName()) + return &result + } + if pathParams, err = restStringMapTransform(methodConfig.PathParamsMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if queryParams, err = restStringMapTransform(methodConfig.QueryParamsMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if headers, err = restStringMapTransform(methodConfig.HeadersMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if len(inv.Arguments()) > methodConfig.Body && methodConfig.Body >= 0 { + body = inv.Arguments()[methodConfig.Body] + } + + req := &client.RestRequest{ + Location: ri.GetUrl().Location, + Produces: methodConfig.Produces, + Consumes: methodConfig.Consumes, + Method: methodConfig.MethodType, + Path: methodConfig.Path, + PathParams: pathParams, + QueryParams: queryParams, + Body: body, + Headers: headers, + } + result.Err = ri.client.Do(req, inv.Reply()) + if result.Err == nil { + result.Rest = inv.Reply() + } + return &result +} + +func restStringMapTransform(paramsMap map[int]string, args []interface{}) (map[string]string, error) { + resMap := make(map[string]string, len(paramsMap)) + for k, v := range paramsMap { + if k >= len(args) || k < 0 { + return nil, perrors.Errorf("[Rest Invoke] Index %v is out of bundle", k) + } + resMap[v] = fmt.Sprint(args[k]) + } + return resMap, nil +} diff --git a/protocol/rest/rest_invoker_test.go b/protocol/rest/rest_invoker_test.go new file mode 100644 index 0000000000..42a4fbd358 --- /dev/null +++ b/protocol/rest/rest_invoker_test.go @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/client" + "github.com/apache/dubbo-go/protocol/rest/client/client_impl" + rest_config "github.com/apache/dubbo-go/protocol/rest/config" +) + +func TestRestInvoker_Invoke(t *testing.T) { + // Refer + proto := GetRestProtocol() + defer proto.Destroy() + url, err := common.NewURL("rest://127.0.0.1:8877/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + _, err = common.ServiceMap.Register(url.Protocol, &UserProvider{}) + assert.NoError(t, err) + con := config.ProviderConfig{} + config.SetProviderConfig(con) + configMap := make(map[string]*rest_config.RestServiceConfig) + methodConfigMap := make(map[string]*rest_config.RestMethodConfig) + queryParamsMap := make(map[int]string) + queryParamsMap[1] = "age" + queryParamsMap[2] = "name" + pathParamsMap := make(map[int]string) + pathParamsMap[0] = "userid" + headersMap := make(map[int]string) + headersMap[3] = "Content-Type" + methodConfigMap["GetUserOne"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserOne", + Path: "/GetUserOne", + Produces: "application/json", + Consumes: "application/json", + MethodType: "POST", + PathParams: "", + PathParamsMap: nil, + QueryParams: "", + QueryParamsMap: nil, + Body: 0, + } + methodConfigMap["GetUserTwo"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserTwo", + Path: "/GetUserTwo", + Produces: "application/json", + Consumes: "application/json", + MethodType: "POST", + PathParams: "", + PathParamsMap: nil, + QueryParams: "", + QueryParamsMap: nil, + Body: 0, + } + methodConfigMap["GetUserThree"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserThree", + Path: "/GetUserThree", + Produces: "application/json", + Consumes: "application/json", + MethodType: "POST", + PathParams: "", + PathParamsMap: nil, + QueryParams: "", + QueryParamsMap: nil, + Body: 0, + } + methodConfigMap["GetUserFour"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserFour", + Path: "/GetUserFour", + Produces: "application/json", + Consumes: "application/json", + MethodType: "POST", + PathParams: "", + PathParamsMap: nil, + QueryParams: "", + QueryParamsMap: nil, + Body: 0, + } + methodConfigMap["GetUserFive"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserFive", + Path: "/GetUserFive", + Produces: "*/*", + Consumes: "*/*", + MethodType: "GET", + } + methodConfigMap["GetUser"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUser", + Path: "/GetUser/{userid}", + Produces: "application/json", + Consumes: "application/json", + MethodType: "GET", + PathParams: "", + PathParamsMap: pathParamsMap, + QueryParams: "", + QueryParamsMap: queryParamsMap, + Body: -1, + HeadersMap: headersMap, + } + + configMap["com.ikurento.user.UserProvider"] = &rest_config.RestServiceConfig{ + Server: "go-restful", + RestMethodConfigsMap: methodConfigMap, + } + rest_config.SetRestProviderServiceConfigMap(configMap) + proxyFactory := extension.GetProxyFactory("default") + proto.Export(proxyFactory.GetInvoker(url)) + time.Sleep(5 * time.Second) + configMap = make(map[string]*rest_config.RestServiceConfig) + configMap["com.ikurento.user.UserProvider"] = &rest_config.RestServiceConfig{ + RestMethodConfigsMap: methodConfigMap, + } + restClient := client_impl.NewRestyClient(&client.RestOptions{ConnectTimeout: 3 * time.Second, RequestTimeout: 3 * time.Second}) + invoker := NewRestInvoker(url, &restClient, methodConfigMap) + user := &User{} + inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), + invocation.WithArguments([]interface{}{1, int32(23), "username", "application/json"}), invocation.WithReply(user)) + res := invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: 1, Age: int32(23), Name: "username"}, *res.Result().(*User)) + now := time.Now() + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserOne"), + invocation.WithArguments([]interface{}{&User{1, &now, int32(23), "username"}}), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.NotNil(t, res.Result()) + assert.Equal(t, 1, res.Result().(*User).Id) + assert.Equal(t, now.Unix(), res.Result().(*User).Time.Unix()) + assert.Equal(t, int32(23), res.Result().(*User).Age) + assert.Equal(t, "username", res.Result().(*User).Name) + // test 1 + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserTwo"), + invocation.WithArguments([]interface{}{&User{1, &now, int32(23), "username"}}), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.NotNil(t, res.Result()) + assert.Equal(t, "username", res.Result().(*User).Name) + // test 2 + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserThree"), + invocation.WithArguments([]interface{}{&User{1, &now, int32(23), "username"}}), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.NotNil(t, res.Result()) + assert.Equal(t, "username", res.Result().(*User).Name) + // test 3 + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserFour"), + invocation.WithArguments([]interface{}{[]User{User{1, nil, int32(23), "username"}}}), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.NotNil(t, res.Result()) + assert.Equal(t, "username", res.Result().(*User).Name) + // test 4 + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserFive"), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.Error(t, res.Error(), "test error") + + err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider") + assert.NoError(t, err) +} diff --git a/protocol/rest/rest_protocol.go b/protocol/rest/rest_protocol.go new file mode 100644 index 0000000000..47ecb6093b --- /dev/null +++ b/protocol/rest/rest_protocol.go @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "strings" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/rest/client" + _ "github.com/apache/dubbo-go/protocol/rest/client/client_impl" + rest_config "github.com/apache/dubbo-go/protocol/rest/config" + _ "github.com/apache/dubbo-go/protocol/rest/config/reader" + "github.com/apache/dubbo-go/protocol/rest/server" + _ "github.com/apache/dubbo-go/protocol/rest/server/server_impl" +) + +var ( + restProtocol *RestProtocol +) + +const REST = "rest" + +func init() { + extension.SetProtocol(REST, GetRestProtocol) +} + +type RestProtocol struct { + protocol.BaseProtocol + serverLock sync.Mutex + serverMap map[string]server.RestServer + clientLock sync.Mutex + clientMap map[client.RestOptions]client.RestClient +} + +func NewRestProtocol() *RestProtocol { + return &RestProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]server.RestServer, 8), + clientMap: make(map[client.RestOptions]client.RestClient, 8), + } +} + +func (rp *RestProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetUrl() + serviceKey := url.ServiceKey() + exporter := NewRestExporter(serviceKey, invoker, rp.ExporterMap()) + restServiceConfig := rest_config.GetRestProviderServiceConfig(strings.TrimPrefix(url.Path, "/")) + if restServiceConfig == nil { + logger.Errorf("%s service doesn't has provider config", url.Path) + return nil + } + rp.SetExporterMap(serviceKey, exporter) + restServer := rp.getServer(url, restServiceConfig.Server) + restServer.Deploy(invoker, restServiceConfig.RestMethodConfigsMap) + return exporter +} + +func (rp *RestProtocol) Refer(url common.URL) protocol.Invoker { + // create rest_invoker + var requestTimeout = config.GetConsumerConfig().RequestTimeout + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + connectTimeout := config.GetConsumerConfig().ConnectTimeout + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + restServiceConfig := rest_config.GetRestConsumerServiceConfig(strings.TrimPrefix(url.Path, "/")) + if restServiceConfig == nil { + logger.Errorf("%s service doesn't has consumer config", url.Path) + return nil + } + restOptions := client.RestOptions{RequestTimeout: requestTimeout, ConnectTimeout: connectTimeout} + restClient := rp.getClient(restOptions, restServiceConfig.Client) + invoker := NewRestInvoker(url, &restClient, restServiceConfig.RestMethodConfigsMap) + rp.SetInvokers(invoker) + return invoker +} + +func (rp *RestProtocol) getServer(url common.URL, serverType string) server.RestServer { + restServer, ok := rp.serverMap[url.Location] + if ok { + return restServer + } + _, ok = rp.ExporterMap().Load(url.ServiceKey()) + if !ok { + panic("[RestProtocol]" + url.ServiceKey() + "is not existing") + } + rp.serverLock.Lock() + defer rp.serverLock.Unlock() + restServer, ok = rp.serverMap[url.Location] + if ok { + return restServer + } + restServer = extension.GetNewRestServer(serverType) + restServer.Start(url) + rp.serverMap[url.Location] = restServer + return restServer +} + +func (rp *RestProtocol) getClient(restOptions client.RestOptions, clientType string) client.RestClient { + restClient, ok := rp.clientMap[restOptions] + if ok { + return restClient + } + rp.clientLock.Lock() + defer rp.clientLock.Unlock() + restClient, ok = rp.clientMap[restOptions] + if ok { + return restClient + } + restClient = extension.GetNewRestClient(clientType, &restOptions) + rp.clientMap[restOptions] = restClient + return restClient +} + +func (rp *RestProtocol) Destroy() { + // destroy rest_server + rp.BaseProtocol.Destroy() + for key, server := range rp.serverMap { + server.Destroy() + delete(rp.serverMap, key) + } + for key := range rp.clientMap { + delete(rp.clientMap, key) + } +} + +func GetRestProtocol() protocol.Protocol { + if restProtocol == nil { + restProtocol = NewRestProtocol() + } + return restProtocol +} diff --git a/protocol/rest/rest_protocol_test.go b/protocol/rest/rest_protocol_test.go new file mode 100644 index 0000000000..8af73a1839 --- /dev/null +++ b/protocol/rest/rest_protocol_test.go @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/config" + rest_config "github.com/apache/dubbo-go/protocol/rest/config" +) + +func TestRestProtocol_Refer(t *testing.T) { + // Refer + proto := GetRestProtocol() + url, err := common.NewURL("rest://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + con := config.ConsumerConfig{ + ConnectTimeout: 5 * time.Second, + RequestTimeout: 5 * time.Second, + } + config.SetConsumerConfig(con) + configMap := make(map[string]*rest_config.RestServiceConfig) + configMap["com.ikurento.user.UserProvider"] = &rest_config.RestServiceConfig{ + Client: "resty", + } + rest_config.SetRestConsumerServiceConfigMap(configMap) + invoker := proto.Refer(url) + + // make sure url + eq := invoker.GetUrl().URLEqual(url) + assert.True(t, eq) + + // make sure invokers after 'Destroy' + invokersLen := len(proto.(*RestProtocol).Invokers()) + assert.Equal(t, 1, invokersLen) + proto.Destroy() + invokersLen = len(proto.(*RestProtocol).Invokers()) + assert.Equal(t, 0, invokersLen) +} + +func TestRestProtocol_Export(t *testing.T) { + // Export + proto := GetRestProtocol() + url, err := common.NewURL("rest://127.0.0.1:8888/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + _, err = common.ServiceMap.Register(url.Protocol, &UserProvider{}) + assert.NoError(t, err) + con := config.ProviderConfig{} + config.SetProviderConfig(con) + configMap := make(map[string]*rest_config.RestServiceConfig) + methodConfigMap := make(map[string]*rest_config.RestMethodConfig) + queryParamsMap := make(map[int]string) + queryParamsMap[1] = "age" + queryParamsMap[2] = "name" + pathParamsMap := make(map[int]string) + pathParamsMap[0] = "userid" + methodConfigMap["GetUser"] = &rest_config.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUser", + Path: "/GetUser/{userid}", + Produces: "application/json", + Consumes: "application/json", + MethodType: "GET", + PathParams: "", + PathParamsMap: pathParamsMap, + QueryParams: "", + QueryParamsMap: queryParamsMap, + Body: -1, + } + configMap["com.ikurento.user.UserProvider"] = &rest_config.RestServiceConfig{ + Server: "go-restful", + RestMethodConfigsMap: methodConfigMap, + } + rest_config.SetRestProviderServiceConfigMap(configMap) + proxyFactory := extension.GetProxyFactory("default") + exporter := proto.Export(proxyFactory.GetInvoker(url)) + // make sure url + eq := exporter.GetInvoker().GetUrl().URLEqual(url) + assert.True(t, eq) + // make sure exporterMap after 'Unexport' + fmt.Println(url.Path) + _, ok := proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + assert.True(t, ok) + exporter.Unexport() + _, ok = proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + assert.False(t, ok) + + // make sure serverMap after 'Destroy' + _, ok = proto.(*RestProtocol).serverMap[url.Location] + assert.True(t, ok) + proto.Destroy() + _, ok = proto.(*RestProtocol).serverMap[url.Location] + assert.False(t, ok) + err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider") + assert.NoError(t, err) +} + +type UserProvider struct { +} + +func (p *UserProvider) Reference() string { + return "com.ikurento.user.UserProvider" +} + +func (p *UserProvider) GetUser(ctx context.Context, id int, age int32, name string, contentType string) (*User, error) { + return &User{ + Id: id, + Time: nil, + Age: age, + Name: name, + }, nil +} + +func (p *UserProvider) GetUserOne(ctx context.Context, user *User) (*User, error) { + return user, nil +} + +func (p *UserProvider) GetUserTwo(ctx context.Context, req []interface{}, rsp *User) error { + m := req[0].(map[string]interface{}) + rsp.Name = m["Name"].(string) + return nil +} + +func (p *UserProvider) GetUserThree(ctx context.Context, user interface{}) (*User, error) { + m := user.(map[string]interface{}) + + u := &User{} + u.Name = m["Name"].(string) + return u, nil +} + +func (p *UserProvider) GetUserFour(ctx context.Context, user []interface{}, id string) (*User, error) { + m := user[0].(map[string]interface{}) + + u := &User{} + u.Name = m["Name"].(string) + return u, nil +} + +func (p *UserProvider) GetUserFive(ctx context.Context, user []interface{}) (*User, error) { + return nil, errors.New("test error") +} + +type User struct { + Id int + Time *time.Time + Age int32 + Name string +} diff --git a/protocol/rest/server/rest_server.go b/protocol/rest/server/rest_server.go new file mode 100644 index 0000000000..c10c98a7b6 --- /dev/null +++ b/protocol/rest/server/rest_server.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/rest/config" +) + +type RestServer interface { + Start(url common.URL) + Deploy(invoker protocol.Invoker, restMethodConfig map[string]*config.RestMethodConfig) + UnDeploy(restMethodConfig map[string]*config.RestMethodConfig) + Destroy() +} diff --git a/protocol/rest/server/server_impl/go_restful_server.go b/protocol/rest/server/server_impl/go_restful_server.go new file mode 100644 index 0000000000..3ea25531d6 --- /dev/null +++ b/protocol/rest/server/server_impl/go_restful_server.go @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server_impl + +import ( + "context" + "fmt" + "net" + "net/http" + "reflect" + "strconv" + "strings" + "time" +) + +import ( + "github.com/emicklei/go-restful/v3" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/config" + "github.com/apache/dubbo-go/protocol/rest/server" +) + +func init() { + extension.SetRestServer(constant.DEFAULT_REST_SERVER, GetNewGoRestfulServer) +} + +type GoRestfulServer struct { + srv *http.Server + container *restful.Container +} + +func NewGoRestfulServer() *GoRestfulServer { + return &GoRestfulServer{} +} + +func (grs *GoRestfulServer) Start(url common.URL) { + grs.container = restful.NewContainer() + grs.srv = &http.Server{ + Handler: grs.container, + } + ln, err := net.Listen("tcp", url.Location) + if err != nil { + panic(perrors.New(fmt.Sprintf("Restful Server start error:%v", err))) + } + + go func() { + err := grs.srv.Serve(ln) + if err != nil && err != http.ErrServerClosed { + logger.Errorf("[Go Restful] http.server.Serve(addr{%s}) = err{%+v}", url.Location, err) + } + }() +} + +func (grs *GoRestfulServer) Deploy(invoker protocol.Invoker, restMethodConfig map[string]*config.RestMethodConfig) { + svc := common.ServiceMap.GetService(invoker.GetUrl().Protocol, strings.TrimPrefix(invoker.GetUrl().Path, "/")) + for methodName, config := range restMethodConfig { + // get method + method := svc.Method()[methodName] + argsTypes := method.ArgsType() + replyType := method.ReplyType() + ws := new(restful.WebService) + ws.Path(config.Path). + Produces(strings.Split(config.Produces, ",")...). + Consumes(strings.Split(config.Consumes, ",")...). + Route(ws.Method(config.MethodType).To(getFunc(methodName, invoker, argsTypes, replyType, config))) + grs.container.Add(ws) + } + +} + +func getFunc(methodName string, invoker protocol.Invoker, argsTypes []reflect.Type, + replyType reflect.Type, config *config.RestMethodConfig) func(req *restful.Request, resp *restful.Response) { + return func(req *restful.Request, resp *restful.Response) { + var ( + err error + args []interface{} + ) + if (len(argsTypes) == 1 || len(argsTypes) == 2 && replyType == nil) && + argsTypes[0].String() == "[]interface {}" { + args = getArgsInterfaceFromRequest(req, config) + } else { + args = getArgsFromRequest(req, argsTypes, config) + } + result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodName, args, make(map[string]string))) + if result.Error() != nil { + err = resp.WriteError(http.StatusInternalServerError, result.Error()) + if err != nil { + logger.Errorf("[Go Restful] WriteError error:%v", err) + } + return + } + err = resp.WriteEntity(result.Result()) + if err != nil { + logger.Error("[Go Restful] WriteEntity error:%v", err) + } + } +} +func (grs *GoRestfulServer) UnDeploy(restMethodConfig map[string]*config.RestMethodConfig) { + for _, config := range restMethodConfig { + ws := new(restful.WebService) + ws.Path(config.Path) + err := grs.container.Remove(ws) + if err != nil { + logger.Warnf("[Go restful] Remove web service error:%v", err) + } + } +} + +func (grs *GoRestfulServer) Destroy() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := grs.srv.Shutdown(ctx); err != nil { + logger.Errorf("[Go Restful] Server Shutdown:", err) + } + logger.Infof("[Go Restful] Server exiting") +} + +func getArgsInterfaceFromRequest(req *restful.Request, config *config.RestMethodConfig) []interface{} { + argsMap := make(map[int]interface{}, 8) + maxKey := 0 + for k, v := range config.PathParamsMap { + if maxKey < k { + maxKey = k + } + argsMap[k] = req.PathParameter(v) + } + for k, v := range config.QueryParamsMap { + if maxKey < k { + maxKey = k + } + params := req.QueryParameters(v) + if len(params) == 1 { + argsMap[k] = params[0] + } else { + argsMap[k] = params + } + } + for k, v := range config.HeadersMap { + if maxKey < k { + maxKey = k + } + argsMap[k] = req.HeaderParameter(v) + } + if config.Body >= 0 { + if maxKey < config.Body { + maxKey = config.Body + } + m := make(map[string]interface{}) + // TODO read as a slice + if err := req.ReadEntity(&m); err != nil { + logger.Warnf("[Go restful] Read body entity as map[string]interface{} error:%v", perrors.WithStack(err)) + } else { + argsMap[config.Body] = m + } + } + args := make([]interface{}, maxKey+1) + for k, v := range argsMap { + if k >= 0 { + args[k] = v + } + } + return args +} + +func getArgsFromRequest(req *restful.Request, argsTypes []reflect.Type, config *config.RestMethodConfig) []interface{} { + argsLength := len(argsTypes) + args := make([]interface{}, argsLength) + for i, t := range argsTypes { + args[i] = reflect.Zero(t).Interface() + } + var ( + err error + param interface{} + i64 int64 + ) + for k, v := range config.PathParamsMap { + if k < 0 || k >= argsLength { + logger.Errorf("[Go restful] Path param parse error, the args:%v doesn't exist", k) + continue + } + t := argsTypes[k] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + if kind == reflect.Int { + param, err = strconv.Atoi(req.PathParameter(v)) + } else if kind == reflect.Int32 { + i64, err = strconv.ParseInt(req.PathParameter(v), 10, 32) + if err == nil { + param = int32(i64) + } + } else if kind == reflect.Int64 { + param, err = strconv.ParseInt(req.PathParameter(v), 10, 64) + } else if kind == reflect.String { + param = req.PathParameter(v) + } else { + logger.Warnf("[Go restful] Path param parse error, the args:%v of type isn't int or string", k) + continue + } + if err != nil { + logger.Errorf("[Go restful] Path param parse error, error is %v", err) + continue + } + args[k] = param + } + for k, v := range config.QueryParamsMap { + if k < 0 || k >= argsLength { + logger.Errorf("[Go restful] Query param parse error, the args:%v doesn't exist", k) + continue + } + t := argsTypes[k] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + if kind == reflect.Slice { + param = req.QueryParameters(v) + } else if kind == reflect.String { + param = req.QueryParameter(v) + } else if kind == reflect.Int { + param, err = strconv.Atoi(req.QueryParameter(v)) + } else if kind == reflect.Int32 { + i64, err = strconv.ParseInt(req.QueryParameter(v), 10, 32) + if err == nil { + param = int32(i64) + } + } else if kind == reflect.Int64 { + param, err = strconv.ParseInt(req.QueryParameter(v), 10, 64) + } else { + logger.Errorf("[Go restful] Query param parse error, the args:%v of type isn't int or string or slice", k) + continue + } + if err != nil { + logger.Errorf("[Go restful] Query param parse error, error is %v", err) + continue + } + args[k] = param + } + + if config.Body >= 0 && config.Body < len(argsTypes) { + t := argsTypes[config.Body] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + var ni interface{} + if t.String() == "[]interface {}" { + ni = make([]map[string]interface{}, 0) + } else if t.String() == "interface {}" { + ni = make(map[string]interface{}) + } else { + n := reflect.New(t) + if n.CanInterface() { + ni = n.Interface() + } + } + if err := req.ReadEntity(&ni); err != nil { + logger.Errorf("[Go restful] Read body entity error:%v", err) + } else { + args[config.Body] = ni + } + } + + for k, v := range config.HeadersMap { + param := req.HeaderParameter(v) + if k < 0 || k >= argsLength { + logger.Errorf("[Go restful] Header param parse error, the args:%v doesn't exist", k) + continue + } + t := argsTypes[k] + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + if t.Kind() == reflect.String { + args[k] = param + } else { + logger.Errorf("[Go restful] Header param parse error, the args:%v of type isn't string", k) + } + } + + return args +} + +func GetNewGoRestfulServer() server.RestServer { + return NewGoRestfulServer() +} diff --git a/registry/base_registry.go b/registry/base_registry.go index 5b9aef8292..3b64e93e2f 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -69,11 +69,20 @@ func init() { */ type FacadeBasedRegistry interface { Registry + + // CreatePath create the path in the registry CreatePath(string) error + // DoRegister actually do the register job DoRegister(string, string) error + // DoSubscribe actually subscribe the URL DoSubscribe(conf *common.URL) (Listener, error) + // CloseAndNilClient close the client and then reset the client in registry to nil + // you should notice that this method will be invoked inside a lock. + // So you should implement this method as light weighted as you can. CloseAndNilClient() + // CloseListener close listeners CloseListener() + // InitListeners init listeners InitListeners() } @@ -153,7 +162,7 @@ func (r *BaseRegistry) service(c common.URL) string { func (r *BaseRegistry) RestartCallBack() bool { // copy r.services - services := []common.URL{} + services := make([]common.URL, 0, len(r.services)) for _, confIf := range r.services { services = append(services, confIf) } @@ -227,9 +236,11 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - r.cltLock.Unlock() + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + }() if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) @@ -251,10 +262,11 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string logger.Debugf("provider url params:%#v", params) var host string if c.Ip == "" { - host = localIP + ":" + c.Port + host = localIP } else { - host = c.Ip + ":" + c.Port + host = c.Ip } + host += ":" + c.Port rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode()) // Print your own registration service providers. @@ -271,17 +283,25 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string err error ) dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - r.cltLock.Lock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - r.cltLock.Unlock() + + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + + }() if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithStack(err) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - r.cltLock.Unlock() + + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + }() + if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithStack(err) @@ -345,9 +365,9 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) // closeRegisters close and remove registry client and reset services map func (r *BaseRegistry) closeRegisters() { + logger.Infof("begin to close provider client") r.cltLock.Lock() defer r.cltLock.Unlock() - logger.Infof("begin to close provider client") // Close and remove(set to nil) the registry client r.facadeBasedRegistry.CloseAndNilClient() // reset the services map