diff --git a/.asf.yaml b/.asf.yaml index db175df5d6..10e852b4ca 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -3,3 +3,29 @@ notifications: issues: notifications@dubbo.apache.org pullrequests: notifications@dubbo.apache.org jira_options: link label link label +github: + homepage: https://dubbo.apache.org/ + description: "Go Implementation For Apache Dubbo ." + labels: + - go + - rpc + - microservices + - http2 + - service-mesh + features: + # Enable wiki for documentation + wiki: true + # Enable issue management + issues: true + # Enable projects for project management boards + projects: true + protected_branches: + master: + # only disable force push + foo: bar + 3.0: + # only disable force push + foo: bar + 3.1: + # only disable force push + foo: bar \ No newline at end of file diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 58e7625ced..a00607e672 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -50,8 +50,8 @@ jobs: - name: Merge base if: ${{ github.event_name == 'pull_request' }} run: | - git fetch origin 3.0 - git checkout -b 3.0 origin/3.0 + git fetch origin 3.1 + git checkout -b 3.1 origin/3.1 git remote add devrepo https://github.com/${{github.event.pull_request.head.repo.full_name}}.git git fetch devrepo ${{github.event.pull_request.head.sha}} git config --global user.email "dubbo-go@github-ci.com" diff --git a/common/constant/default.go b/common/constant/default.go index 747f409e0a..93ab319d08 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -89,4 +89,5 @@ const ( const ( ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP" + NotAvailable = "N/A" ) diff --git a/common/constant/env.go b/common/constant/env.go index cebf7dec74..89bb95ce72 100644 --- a/common/constant/env.go +++ b/common/constant/env.go @@ -19,8 +19,13 @@ package constant // nolint const ( - ConfigFileEnvKey = "DUBBO_GO_CONFIG_PATH" // key of environment variable dubbogo configure file path - AppLogConfFile = "AppLogConfFile" - PodNameEnvKey = "POD_NAME" - PodNamespaceEnvKey = "POD_NAMESPACE" + ConfigFileEnvKey = "DUBBO_GO_CONFIG_PATH" // key of environment variable dubbogo configure file path + AppLogConfFile = "AppLogConfFile" + PodNameEnvKey = "POD_NAME" + PodNamespaceEnvKey = "POD_NAMESPACE" + ClusterDomainKey = "CLUSTER_DOMAIN" + DefaultClusterDomain = "cluster.local" + DefaultNamespace = "default" + SVC = "svc" + DefaultMeshPort = 80 ) diff --git a/common/constant/key.go b/common/constant/key.go index 4fd53cdb71..ddef397f29 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -161,6 +161,9 @@ const ( RegistrySimplifiedKey = "simplified" RegistryNamespaceKey = "registry.namespace" RegistryGroupKey = "registry.group" + RegistryTypeInterface = "interface" + RegistryTypeService = "service" + RegistryTypeAll = "all" ) const ( diff --git a/common/extension/metadata_report_factory.go b/common/extension/metadata_report_factory.go index 776cd2cbf4..f80b2502d3 100644 --- a/common/extension/metadata_report_factory.go +++ b/common/extension/metadata_report_factory.go @@ -31,7 +31,7 @@ func SetMetadataReportFactory(name string, v func() factory.MetadataReportFactor // GetMetadataReportFactory finds the MetadataReportFactory with @name func GetMetadataReportFactory(name string) factory.MetadataReportFactory { if metaDataReportFactories[name] == nil { - panic("metadata report for " + name + " is not existing, make sure you have import the package.") + return nil } return metaDataReportFactories[name]() } diff --git a/common/metadata_info.go b/common/metadata_info.go index b365a581b5..aad5926b70 100644 --- a/common/metadata_info.go +++ b/common/metadata_info.go @@ -205,7 +205,7 @@ func (si *ServiceInfo) GetMethods() []string { s := si.Params[constant.MethodsKey] return strings.Split(s, ",") } - methods := make([]string, 8) + methods := make([]string, 0, 8) for k, _ := range si.Params { ms := strings.Index(k, ".") if ms > 0 { diff --git a/common/url.go b/common/url.go index 75e207010a..e40b8356dd 100644 --- a/common/url.go +++ b/common/url.go @@ -148,10 +148,10 @@ func WithMethods(methods []string) Option { } } -// WithParams sets params for URL +// WithParams deep copy the params in the argument into params of the target URL func WithParams(params url.Values) Option { return func(url *URL) { - url.params = params + url.SetParams(params) } } @@ -190,6 +190,13 @@ func WithPath(path string) Option { } } +// WithInterface sets interface param for URL +func WithInterface(v string) Option { + return func(url *URL) { + url.SetParam(constant.InterfaceKey, v) + } +} + // WithLocation sets location for URL func WithLocation(location string) Option { return func(url *URL) { @@ -294,6 +301,14 @@ func (c *URL) Version() string { return c.GetParam(constant.VersionKey, "") } +// Address with format "ip:port" +func (c *URL) Address() string { + if c.Port == "" { + return c.Ip + } + return c.Ip + ":" + c.Port +} + // URLEqual judge @URL and @c is equal or not. func (c *URL) URLEqual(url *URL) bool { tmpC := c.Clone() @@ -515,6 +530,23 @@ func (c *URL) GetParam(s string, d string) string { return r } +// GetParamNoDefault gets value by key, return nil,false if no value found mapping to the key +func (c *URL) GetParamNoDefault(s string) (string, bool) { + c.paramsLock.RLock() + defer c.paramsLock.RUnlock() + + var r string + ok := true + if len(c.params) > 0 { + r = c.params.Get(s) + } + if len(r) == 0 { + ok = false + } + + return r, ok +} + // GetParams gets values func (c *URL) GetParams() url.Values { return c.params @@ -693,7 +725,7 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // iterator the referenceURL if serviceURL not have the key ,merge in // referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group for key, value := range referenceURL.GetParams() { - if v := mergedURL.GetParam(key, ""); len(v) == 0 { + if _, ok := mergedURL.GetParamNoDefault(key); !ok { if len(value) > 0 { params[key] = value } @@ -704,7 +736,7 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey}) // remote timestamp - if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 { + if v, ok := serviceURL.GetParamNoDefault(constant.TimestampKey); !ok { params[constant.RemoteTimestampKey] = []string{v} params[constant.TimestampKey] = []string{referenceURL.GetParam(constant.TimestampKey, "")} } diff --git a/common/url_test.go b/common/url_test.go index 66f000364c..dc9bccfc1b 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -57,7 +57,7 @@ func TestNewURLWithOptions(t *testing.T) { assert.Equal(t, loopbackAddress, u.Ip) assert.Equal(t, "8080", u.Port) assert.Equal(t, methods, u.Methods) - assert.Equal(t, params, u.params) + assert.Equal(t, 2, len(u.params)) } func TestURL(t *testing.T) { diff --git a/config/config_loader.go b/config/config_loader.go index 99c35b22f1..20cb4e1f2e 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -24,6 +24,8 @@ import ( ) import ( + "github.com/dubbogo/gost/log/logger" + "github.com/knadh/koanf" perrors "github.com/pkg/errors" @@ -90,6 +92,7 @@ func registerServiceInstance() { continue } // publish app level data to registry + logger.Infof("Starting register instance address %v", instance) err := sdr.GetServiceDiscovery().Register(instance) if err != nil { panic(err) diff --git a/config/config_utils.go b/config/config_utils.go index f08d1b901a..fa6b51b2d5 100644 --- a/config/config_utils.go +++ b/config/config_utils.go @@ -116,3 +116,7 @@ func verify(s interface{}) error { func clientNameID(config extension.Config, protocol, address string) string { return strings.Join([]string{config.Prefix(), protocol, address}, "-") } + +func isValid(addr string) bool { + return addr != "" && addr != constant.NotAvailable +} diff --git a/config/consumer_config.go b/config/consumer_config.go index 2fcbb69774..69336d0188 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -52,6 +52,7 @@ type ConsumerConfig struct { TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"` FilterConf interface{} `yaml:"filter-conf" json:"filter-conf,omitempty" property:"filter-conf"` MaxWaitTimeForServiceDiscovery string `default:"3s" yaml:"max-wait-time-for-service-discovery" json:"max-wait-time-for-service-discovery,omitempty" property:"max-wait-time-for-service-discovery"` + MeshEnabled bool `yaml:"mesh-enabled" json:"mesh-enabled,omitempty" property:"mesh-enabled"` rootConfig *RootConfig } @@ -239,6 +240,11 @@ func (ccb *ConsumerConfigBuilder) SetFilterConf(filterConf interface{}) *Consume return ccb } +func (ccb *ConsumerConfigBuilder) SetMeshEnabled(meshEnabled bool) *ConsumerConfigBuilder { + ccb.consumerConfig.MeshEnabled = meshEnabled + return ccb +} + func (ccb *ConsumerConfigBuilder) SetRootConfig(rootConfig *RootConfig) *ConsumerConfigBuilder { ccb.consumerConfig.rootConfig = rootConfig return ccb diff --git a/config/instance/metadata_report.go b/config/instance/metadata_report.go index beac76b7b2..1cb5f639cb 100644 --- a/config/instance/metadata_report.go +++ b/config/instance/metadata_report.go @@ -33,18 +33,27 @@ var ( once sync.Once ) -// GetMetadataReportInstance will return the instance in lazy mode. Be careful the instance create will only -// execute once. -func GetMetadataReportInstance(selectiveUrl ...*common.URL) report.MetadataReport { +func GetMetadataReportInstance() report.MetadataReport { + if instance != nil { + return instance + } + + return GetMetadataReportByRegistryProtocol("") +} + +// SetMetadataReportInstance, init metadat report instance +func SetMetadataReportInstance(selectiveUrl ...*common.URL) { once.Do(func() { var url *common.URL if len(selectiveUrl) > 0 { url = selectiveUrl[0] - instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + fac := extension.GetMetadataReportFactory(url.Protocol) + if fac != nil { + instance = fac.CreateMetadataReport(url) + } reportUrl = url } }) - return instance } // GetMetadataReportUrl will return the report instance url diff --git a/config/instance/metadata_report_test.go b/config/instance/metadata_report_test.go index 4a56883b4f..e395d93efa 100644 --- a/config/instance/metadata_report_test.go +++ b/config/instance/metadata_report_test.go @@ -40,7 +40,8 @@ func TestGetMetadataReportInstance(t *testing.T) { return &mockMetadataReportFactory{} }) u, _ := common.NewURL("mock://127.0.0.1") - rpt := GetMetadataReportInstance(u) + SetMetadataReportInstance(u) + rpt := GetMetadataReportInstance() assert.NotNil(t, rpt) } diff --git a/config/instance/registry_metadata_report.go b/config/instance/registry_metadata_report.go index 2b8a874602..cd89a69ed3 100644 --- a/config/instance/registry_metadata_report.go +++ b/config/instance/registry_metadata_report.go @@ -21,6 +21,10 @@ import ( "sync" ) +import ( + "github.com/dubbogo/gost/log/logger" +) + import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/extension" @@ -36,6 +40,13 @@ var ( func GetMetadataReportByRegistryProtocol(protocol string) report.MetadataReport { mux.RLock() defer mux.RUnlock() + if protocol == "" { + // return the first instance + for _, regInstance := range regInstances { + return regInstance + } + } + // find the accurate instance regInstance, ok := regInstances[protocol] if !ok { return nil @@ -50,5 +61,11 @@ func SetMetadataReportInstanceByReg(url *common.URL) { if _, ok := regInstances[url.Protocol]; ok { return } - regInstances[url.Protocol] = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + + fac := extension.GetMetadataReportFactory(url.Protocol) + if fac != nil { + regInstances[url.Protocol] = fac.CreateMetadataReport(url) + } else { + logger.Infof("Metadata of type %v not registered.", url.Protocol) + } } diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 837f7af8e0..c5ae1fd1a3 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -85,7 +85,7 @@ func (mc *MetadataReportConfig) StartMetadataReport() error { return nil } if tmpUrl, err := mc.ToUrl(); err == nil { - instance.GetMetadataReportInstance(tmpUrl) + instance.SetMetadataReportInstance(tmpUrl) return nil } else { return perrors.Wrap(err, "Start MetadataReport failed.") diff --git a/config/reference_config.go b/config/reference_config.go index d4dfe876ec..95de9641ac 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -20,6 +20,7 @@ package config import ( "fmt" "net/url" + "os" "strconv" "time" ) @@ -27,7 +28,10 @@ import ( import ( "github.com/creasty/defaults" + "github.com/dubbogo/gost/log/logger" gxstrings "github.com/dubbogo/gost/strings" + + constant2 "github.com/dubbogo/triple/pkg/common/constant" ) import ( @@ -43,34 +47,34 @@ import ( // ReferenceConfig is the configuration of service consumer type ReferenceConfig struct { - pxy *proxy.Proxy - id string - InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"` - Check *bool `yaml:"check" json:"check,omitempty" property:"check"` - URL string `yaml:"url" json:"url,omitempty" property:"url"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"` - Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version"` - Serialization string `yaml:"serialization" json:"serialization" property:"serialization"` - ProvidedBy string `yaml:"provided_by" json:"provided_by,omitempty" property:"provided_by"` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Async bool `yaml:"async" json:"async,omitempty" property:"async"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - invoker protocol.Invoker - urls []*common.URL - Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"` - Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` - RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` - ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` - TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` - - rootConfig *RootConfig - metaDataType string + pxy *proxy.Proxy + id string + InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"` + Check *bool `yaml:"check" json:"check,omitempty" property:"check"` + URL string `yaml:"url" json:"url,omitempty" property:"url"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"` + Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version"` + Serialization string `yaml:"serialization" json:"serialization" property:"serialization"` + ProvidedBy string `yaml:"provided_by" json:"provided_by,omitempty" property:"provided_by"` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + invoker protocol.Invoker + urls []*common.URL + Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"` + Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` + ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` + TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` + rootConfig *RootConfig + metaDataType string + MeshProviderPort int `yaml:"mesh-provider-port" json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"` } func (rc *ReferenceConfig) Prefix() string { @@ -120,6 +124,41 @@ func (rc *ReferenceConfig) Init(root *RootConfig) error { return verify(rc) } +func getEnv(key, fallback string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return fallback +} + +func updateOrCreateMeshURL(rc *ReferenceConfig) { + if rc.URL != "" { + logger.Infof("URL specified explicitly %v", rc.URL) + } + + if !rc.rootConfig.Consumer.MeshEnabled { + return + } + if rc.Protocol != constant2.TRIPLE { + panic(fmt.Sprintf("Mesh mode enabled, Triple protocol expected but %v protocol found!", rc.Protocol)) + } + if rc.ProvidedBy == "" { + panic(fmt.Sprintf("Mesh mode enabled, provided-by should not be empty!")) + } + + podNamespace := getEnv(constant.PodNamespaceEnvKey, constant.DefaultNamespace) + clusterDomain := getEnv(constant.ClusterDomainKey, constant.DefaultClusterDomain) + + var meshPort int + if rc.MeshProviderPort > 0 { + meshPort = rc.MeshProviderPort + } else { + meshPort = constant.DefaultMeshPort + } + + rc.URL = "tri://" + rc.ProvidedBy + "." + podNamespace + constant.SVC + clusterDomain + ":" + strconv.Itoa(meshPort) +} + // Refer retrieves invokers from urls. func (rc *ReferenceConfig) Refer(srv interface{}) { // If adaptive service is enabled, @@ -144,6 +183,9 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { } rc.postProcessConfig(cfgURL) + // if mesh-enabled is set + updateOrCreateMeshURL(rc) + // retrieving urls from config, and appending the urls to rc.urls if rc.URL != "" { // use user-specific urls /* @@ -173,6 +215,7 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { // replace params of serviceURL with params of cfgUrl // other stuff, e.g. IP, port, etc., are same as serviceURL newURL := common.MergeURL(serviceURL, cfgURL) + newURL.AddParam("peer", "true") rc.urls = append(rc.urls, newURL) } } diff --git a/config/registry_config.go b/config/registry_config.go index c8cf359ea5..55f61642d9 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -41,20 +41,22 @@ import ( // RegistryConfig is the configuration of the registry center type RegistryConfig struct { - Protocol string `validate:"required" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Timeout string `default:"5s" validate:"required" yaml:"timeout" json:"timeout,omitempty" property:"timeout"` // unit: second - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Namespace string `yaml:"namespace" json:"namespace,omitempty" property:"namespace"` - TTL string `default:"10s" yaml:"ttl" json:"ttl,omitempty" property:"ttl"` // unit: minute - Address string `validate:"required" yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` - Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"` // Always use this registry first if set to true, useful when subscribe to multiple registriesConfig - Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"` // The region where the registry belongs, usually used to isolate traffics - Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` // Affects traffic distribution among registriesConfig, useful when subscribe to multiple registriesConfig Take effect only when no preferred registry is specified. - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - RegistryType string `yaml:"registry-type"` + Protocol string `validate:"required" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + Timeout string `default:"5s" validate:"required" yaml:"timeout" json:"timeout,omitempty" property:"timeout"` // unit: second + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Namespace string `yaml:"namespace" json:"namespace,omitempty" property:"namespace"` + TTL string `default:"10s" yaml:"ttl" json:"ttl,omitempty" property:"ttl"` // unit: minute + Address string `validate:"required" yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` + Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"` // Always use this registry first if set to true, useful when subscribe to multiple registriesConfig + Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"` // The region where the registry belongs, usually used to isolate traffics + Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"` // Affects traffic distribution among registriesConfig, useful when subscribe to multiple registriesConfig Take effect only when no preferred registry is specified. + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + RegistryType string `yaml:"registry-type"` + UseAsMetaReport bool `default:"true" yaml:"use-as-meta-report" json:"use-as-meta-report,omitempty" property:"use-as-meta-report"` + UseAsConfigCenter bool `default:"true" yaml:"use-as-config-center" json:"use-as-config-center,omitempty" property:"use-as-config-center"` } // Prefix dubbo.registries @@ -91,7 +93,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { func (c *RegistryConfig) startRegistryConfig() error { c.translateRegistryAddress() - if GetApplicationConfig().MetadataType == constant.DefaultMetadataStorageType && c.RegistryType == constant.ServiceKey { + if c.UseAsMetaReport && isValid(c.Address) { if tmpUrl, err := c.toMetadataReportUrl(); err == nil { instance.SetMetadataReportInstanceByReg(tmpUrl) } else { @@ -149,11 +151,13 @@ func (c *RegistryConfig) GetInstance(roleType common.RoleType) (registry.Registr func (c *RegistryConfig) toURL(roleType common.RoleType) (*common.URL, error) { address := c.translateRegistryAddress() var registryURLProtocol string - if c.RegistryType == "service" { + if c.RegistryType == constant.RegistryTypeService { // service discovery protocol registryURLProtocol = constant.ServiceRegistryProtocol - } else { + } else if c.RegistryType == constant.RegistryTypeInterface { registryURLProtocol = constant.RegistryProtocol + } else { + registryURLProtocol = constant.ServiceRegistryProtocol } return common.NewURL(registryURLProtocol+"://"+address, common.WithParams(c.getUrlMap(roleType)), @@ -167,6 +171,92 @@ func (c *RegistryConfig) toURL(roleType common.RoleType) (*common.URL, error) { ) } +func (c *RegistryConfig) toURLs(roleType common.RoleType) ([]*common.URL, error) { + address := c.translateRegistryAddress() + var urls []*common.URL + var err error + var registryURL *common.URL + + if !isValid(c.Address) { + logger.Infof("Empty or N/A registry address found, the process will work with no registry enabled " + + "which means that the address of this instance will not be registered and not able to be found by other consumer instances.") + return urls, nil + } + + if c.RegistryType == constant.RegistryTypeService { + // service discovery protocol + if registryURL, err = c.createNewURL(constant.ServiceRegistryProtocol, address, roleType); err == nil { + urls = append(urls, registryURL) + } + } else if c.RegistryType == constant.RegistryTypeInterface { + if registryURL, err = c.createNewURL(constant.RegistryProtocol, address, roleType); err == nil { + urls = append(urls, registryURL) + } + } else if c.RegistryType == constant.RegistryTypeAll { + if registryURL, err = c.createNewURL(constant.ServiceRegistryProtocol, address, roleType); err == nil { + urls = append(urls, registryURL) + } + if registryURL, err = c.createNewURL(constant.RegistryProtocol, address, roleType); err == nil { + urls = append(urls, registryURL) + } + } else { + if registryURL, err = c.createNewURL(constant.ServiceRegistryProtocol, address, roleType); err == nil { + urls = append(urls, registryURL) + } + } + return urls, err +} + +func loadRegistries(registryIds []string, registries map[string]*RegistryConfig, roleType common.RoleType) []*common.URL { + var registryURLs []*common.URL + //trSlice := strings.Split(targetRegistries, ",") + + for k, registryConf := range registries { + target := false + + // if user not config targetRegistries, default load all + // Notice: in func "func Split(s, sep string) []string" comment: + // if s does not contain sep and sep is not empty, SplitAfter returns + // a slice of length 1 whose only element is s. So we have to add the + // condition when targetRegistries string is not set (it will be "" when not set) + if len(registryIds) == 0 || (len(registryIds) == 1 && registryIds[0] == "") { + target = true + } else { + // else if user config targetRegistries + for _, tr := range registryIds { + if tr == k { + target = true + break + } + } + } + + if target { + if urls, err := registryConf.toURLs(roleType); err != nil { + logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err) + panic(err) + } else { + registryURLs = append(registryURLs, urls...) + } + } + } + + return registryURLs +} + +func (c *RegistryConfig) createNewURL(protocol string, address string, roleType common.RoleType) (*common.URL, error) { + return common.NewURL(protocol+"://"+address, + common.WithParams(c.getUrlMap(roleType)), + common.WithParamsValue(constant.RegistrySimplifiedKey, strconv.FormatBool(c.Simplified)), + common.WithParamsValue(constant.RegistryKey, c.Protocol), + common.WithParamsValue(constant.RegistryNamespaceKey, c.Namespace), + common.WithParamsValue(constant.RegistryTimeoutKey, c.Timeout), + common.WithUsername(c.Username), + common.WithPassword(c.Password), + common.WithLocation(c.Address), + ) +} + const ( defaultZKAddr = "127.0.0.1:2181" // default registry address of zookeeper defaultNacosAddr = "127.0.0.1:8848" // the default registry address of nacos diff --git a/config/service_config.go b/config/service_config.go index fe8ba4b97a..96ce5a0491 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -350,43 +350,6 @@ func loadProtocol(protocolIds []string, protocols map[string]*ProtocolConfig) [] return returnProtocols } -func loadRegistries(registryIds []string, registries map[string]*RegistryConfig, roleType common.RoleType) []*common.URL { - var registryURLs []*common.URL - //trSlice := strings.Split(targetRegistries, ",") - - for k, registryConf := range registries { - target := false - - // if user not config targetRegistries, default load all - // Notice: in func "func Split(s, sep string) []string" comment: - // if s does not contain sep and sep is not empty, SplitAfter returns - // a slice of length 1 whose only element is s. So we have to add the - // condition when targetRegistries string is not set (it will be "" when not set) - if len(registryIds) == 0 || (len(registryIds) == 1 && registryIds[0] == "") { - target = true - } else { - // else if user config targetRegistries - for _, tr := range registryIds { - if tr == k { - target = true - break - } - } - } - - if target { - if registryURL, err := registryConf.toURL(roleType); err != nil { - logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err) - panic(err) - } else { - registryURLs = append(registryURLs, registryURL) - } - } - } - - return registryURLs -} - // Unexport will call unexport of all exporters service config exported func (s *ServiceConfig) Unexport() { if !s.exported.Load() { diff --git a/config/service_config_test.go b/config/service_config_test.go index 5a0639f420..da6b5af0dd 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -96,7 +96,7 @@ func TestNewServiceConfigBuilder(t *testing.T) { t.Run("loadRegistries&loadProtocol&getRandomPort", func(t *testing.T) { registries := loadRegistries(serviceConfig.RegistryIDs, serviceConfig.RCRegistriesMap, common.PROVIDER) assert.Equal(t, len(registries), 1) - assert.Equal(t, registries[0].Protocol, "registry") + assert.Equal(t, "service-discovery-registry", registries[0].Protocol) assert.Equal(t, registries[0].Port, "8848") assert.Equal(t, registries[0].GetParam("registry.role", "1"), "3") assert.Equal(t, registries[0].GetParam("registry", "zk"), "nacos") diff --git a/metadata/mapping/metadata/service_name_mapping.go b/metadata/mapping/metadata/service_name_mapping.go index ae934ae481..7899cb6cd8 100644 --- a/metadata/mapping/metadata/service_name_mapping.go +++ b/metadata/mapping/metadata/service_name_mapping.go @@ -65,11 +65,12 @@ func (d *MetadataServiceNameMapping) Map(url *common.URL) error { metadataReport := getMetaDataReport(url.GetParam(constant.RegistryKey, "")) if metadataReport == nil { - return perrors.New("get metadata report instance is nil") - } - err := metadataReport.RegisterServiceAppMapping(serviceInterface, defaultGroup, appName) - if err != nil { - return perrors.WithStack(err) + logger.Info("get metadata report instance is nil, metadata service will be enabled!") + } else { + err := metadataReport.RegisterServiceAppMapping(serviceInterface, defaultGroup, appName) + if err != nil { + return perrors.WithStack(err) + } } return nil } diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index 655b577040..e55793ea18 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -211,17 +211,14 @@ func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) (string, error) { // RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name func (n *nacosMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error { - oldVal, err := n.getConfig(vo.ConfigParam{ + oldVal, _ := n.getConfig(vo.ConfigParam{ DataId: key, Group: group, }) - if err != nil { - return err - } - if strings.Contains(oldVal, value) { - return nil - } if oldVal != "" { + if strings.Contains(oldVal, value) { + return nil + } value = oldVal + constant.CommaSeparator + value } return n.storeMetadata(vo.ConfigParam{ diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go index fa4a17e984..49b296a0df 100644 --- a/metadata/service/remote/service_test.go +++ b/metadata/service/remote/service_test.go @@ -113,7 +113,7 @@ func TestMetadataService(t *testing.T) { extension.SetMetadataReportFactory("mock", getMetadataReportFactory) u, err := common.NewURL("mock://127.0.0.1:20000/?sync.report=true") assert.NoError(t, err) - instance.GetMetadataReportInstance(u) + instance.SetMetadataReportInstance(u) mts, err := GetRemoteMetadataService() assert.NoError(t, err) assert.NotNil(t, mts) diff --git a/registry/directory/directory.go b/registry/directory/directory.go index bef438b63c..ecac8bac27 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -92,12 +92,11 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) - go dir.subscribe(url.SubURL) return dir, nil } // subscribe from registry -func (dir *RegistryDirectory) subscribe(url *common.URL) { +func (dir *RegistryDirectory) Subscribe(url *common.URL) { logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key()) dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 33814430ad..493162b674 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -170,7 +170,7 @@ func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockR mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) dir, _ := NewRegistryDirectory(url, mockRegistry) - go dir.(*RegistryDirectory).subscribe(suburl) + go dir.(*RegistryDirectory).Subscribe(suburl) if len(noMockEvent) == 0 { for i := 0; i < 3; i++ { mockRegistry.(*registry.MockRegistry).MockEvent( diff --git a/registry/event/service_instances_changed_listener_impl.go b/registry/event/service_instances_changed_listener_impl.go index a9ce70a52f..b9277e1be9 100644 --- a/registry/event/service_instances_changed_listener_impl.go +++ b/registry/event/service_instances_changed_listener_impl.go @@ -68,6 +68,8 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error protocolRevisionsToUrls := make(map[string]map[*gxset.HashSet][]*common.URL) newServiceURLs := make(map[string][]*common.URL) + logger.Infof("Received instance notification event of service %s, instance list size %s", ce.ServiceName, len(ce.Instances)) + for _, instances := range lstn.allInstances { for _, instance := range instances { if instance.GetMetadata() == nil { @@ -103,27 +105,27 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error } lstn.revisionToMetadata = newRevisionToMetadata - for serviceInstance, revisions := range localServiceToRevisions { - revisionsToUrls := protocolRevisionsToUrls[serviceInstance.Protocol] + for serviceInfo, revisions := range localServiceToRevisions { + revisionsToUrls := protocolRevisionsToUrls[serviceInfo.Protocol] if revisionsToUrls == nil { - protocolRevisionsToUrls[serviceInstance.Protocol] = make(map[*gxset.HashSet][]*common.URL) - revisionsToUrls = protocolRevisionsToUrls[serviceInstance.Protocol] + protocolRevisionsToUrls[serviceInfo.Protocol] = make(map[*gxset.HashSet][]*common.URL) + revisionsToUrls = protocolRevisionsToUrls[serviceInfo.Protocol] } urls := revisionsToUrls[revisions] if urls != nil { - newServiceURLs[serviceInstance.GetMatchKey()] = urls + newServiceURLs[serviceInfo.GetMatchKey()] = urls } else { urls = make([]*common.URL, 0, 8) for _, v := range revisions.Values() { r := v.(string) for _, i := range revisionToInstances[r] { if i != nil { - urls = append(urls, i.ToURLs()...) + urls = append(urls, i.ToURLs(serviceInfo)...) } } } revisionsToUrls[revisions] = urls - newServiceURLs[serviceInstance.GetMatchKey()] = urls + newServiceURLs[serviceInfo.GetMatchKey()] = urls } } lstn.serviceUrls = newServiceURLs diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 1b0d390d09..ae0b2d0c2a 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -42,7 +42,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3/health" "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper" "dubbo.apache.org/dubbo-go/v3/registry" - _ "dubbo.apache.org/dubbo-go/v3/registry/directory" + "dubbo.apache.org/dubbo-go/v3/registry/directory" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -149,12 +149,19 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { reg := proto.getRegistry(url) // new registry directory for store service url from registry - directory, err := extension.GetDefaultRegistryDirectory(registryUrl, reg) + dic, err := extension.GetDefaultRegistryDirectory(registryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) return nil } + // TODO, refactor to avoid type conversion + regDic, ok := dic.(*directory.RegistryDirectory) + if !ok { + logger.Errorf("Directory %v is expected to implement Directory, and will return nil invoker!", dic) + return nil + } + regDic.Subscribe(registryUrl.SubURL) err = reg.Register(serviceUrl) if err != nil { @@ -168,7 +175,7 @@ func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker { if err != nil { panic(err) } - invoker := cluster.Join(directory) + invoker := cluster.Join(dic) return invoker } diff --git a/registry/service_instance.go b/registry/service_instance.go index 6dc1061c19..ca1a8ff129 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -57,7 +57,7 @@ type ServiceInstance interface { GetMetadata() map[string]string // ToURLs will return a list of url - ToURLs() []*common.URL + ToURLs(service *common.ServiceInfo) []*common.URL // GetEndPoints will get end points from metadata GetEndPoints() []*Endpoint @@ -91,6 +91,7 @@ type DefaultServiceInstance struct { ServiceMetadata *common.MetadataInfo Address string GroupName string + endpoints []*Endpoint `json:"-"` } // GetID will return this instance's id. It should be unique. @@ -142,11 +143,29 @@ func (d *DefaultServiceInstance) SetServiceMetadata(m *common.MetadataInfo) { } // ToURLs return a list of url. -func (d *DefaultServiceInstance) ToURLs() []*common.URL { +func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.URL { urls := make([]*common.URL, 0, 8) - for _, service := range d.ServiceMetadata.Services { + if d.endpoints == nil { + err := json.Unmarshal([]byte(d.Metadata[constant.ServiceInstanceEndpoints]), &d.endpoints) + if err != nil { + logger.Errorf("Error parsing endpoints of service instance v%, multiple protocol services might not be able to work properly, err is v%.", d, err) + } + } + + if len(d.endpoints) > 0 { + for _, endpoint := range d.endpoints { + if endpoint.Protocol == service.Protocol { + url := common.NewURLWithOptions(common.WithProtocol(service.Protocol), + common.WithIp(d.Host), common.WithPort(strconv.Itoa(endpoint.Port)), + common.WithPath(service.Name), common.WithInterface(service.Name), + common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams())) + urls = append(urls, url) + } + } + } else { url := common.NewURLWithOptions(common.WithProtocol(service.Protocol), common.WithIp(d.Host), common.WithPort(strconv.Itoa(d.Port)), + common.WithPath(service.Name), common.WithInterface(service.Name), common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams())) urls = append(urls, url) } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 6d1362b749..d24e86cf6b 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -197,18 +197,23 @@ func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No } services := s.getServices(url) if services.Empty() { - return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+ - "subscription url:%s", url.String()) + return perrors.Errorf("Should has at least one way to know which services this interface belongs to,"+ + " either specify 'provided-by' for reference or enable metadata-report center subscription url:%s", url.String()) } // FIXME ServiceNames.String() is not good serviceNamesKey := services.String() - protocolServiceKey := url.ServiceKey() + ":" + url.Protocol + protocol := "tri" // consume "tri" protocol by default, other protocols need to be specified on reference/consumer explicitly + if url.Protocol != "" { + protocol = url.Protocol + } + protocolServiceKey := url.ServiceKey() + ":" + protocol listener := s.serviceListeners[serviceNamesKey] if listener == nil { listener = event.NewServiceInstancesChangedListener(services) for _, serviceNameTmp := range services.Values() { serviceName := serviceNameTmp.(string) instances := s.serviceDiscovery.GetInstances(serviceName) + logger.Infof("Synchronized instance notification on subscription, instance list size %s", len(instances)) err = listener.OnEvent(®istry.ServiceInstancesChangedEvent{ ServiceName: serviceName, Instances: instances, diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go index d6bb25f902..f57516f2f2 100644 --- a/remoting/zookeeper/curator_discovery/service_discovery.go +++ b/remoting/zookeeper/curator_discovery/service_discovery.go @@ -74,6 +74,14 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { if err != nil { return err } + + err = sd.client.Delete(path) + if err != nil { + logger.Infof("Failed when trying to delete node %s, will continue with the registration process. "+ + "This is designed to avoid previous ephemeral node hold the position,"+ + " so it's normal for this action to fail because the node might not exist or has been deleted, error msg is %s.", path, err.Error()) + } + err = sd.client.CreateTempWithValue(path, data) if err == zk.ErrNodeExists { _, state, _ := sd.client.GetContent(path) @@ -274,6 +282,11 @@ func (sd *ServiceDiscovery) pathForInstance(name, id string) string { return path.Join(sd.basePath, name, id) } +// nolint +func (sd *ServiceDiscovery) prefixPathForInstance(name string) string { + return path.Join(sd.basePath, name) +} + // nolint func (sd *ServiceDiscovery) pathForName(name string) string { return path.Join(sd.basePath, name)