Skip to content

Commit

Permalink
Merge pull request #948 from cvictory/fix/metadata_fail
Browse files Browse the repository at this point in the history
Fix: make metadata report work without serviceDiscovery
  • Loading branch information
AlexStocks authored Jan 23, 2021
2 parents 9a666eb + 9a980ff commit 6c989b9
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 13 deletions.
17 changes: 17 additions & 0 deletions common/extension/metadata_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import (
)

import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service"
)

var (
// there will be two types: local or remote
metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2)
// remoteMetadataService
remoteMetadataService service.MetadataService
)

// SetMetadataService will store the msType => creator pair
Expand All @@ -48,3 +51,17 @@ func GetMetadataService(msType string) (service.MetadataService, error) {
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}

// GetRemoteMetadataService will get a RemoteMetadataService instance
func GetRemoteMetadataService() (service.MetadataService, error) {
if remoteMetadataService != nil {
return remoteMetadataService, nil
}
if creator, ok := metadataServiceInsMap["remote"]; ok {
var err error
remoteMetadataService, err = creator()
return remoteMetadataService, err
}
logger.Warn("could not find the metadata service creator for metadataType: remote")
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: remote"))
}
9 changes: 8 additions & 1 deletion config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}

// publish consumer metadata
publishConsumerDefinition(cfgURL)
// create proxy
if c.Async {
callback := GetCallback(c.id)
Expand Down Expand Up @@ -257,6 +258,12 @@ func (c *ReferenceConfig) GenericLoad(id string) {
c.Implement(genericService)
}

func publishConsumerDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
}
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
Expand Down
7 changes: 7 additions & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (c *ServiceConfig) Export() error {
}
c.exporters = append(c.exporters, exporter)
}
publishServiceDefinition(ivkURL)
}
c.exported.Store(true)
return nil
Expand Down Expand Up @@ -347,6 +348,12 @@ func (c *ServiceConfig) GetExportedUrls() []*common.URL {
return nil
}

func publishServiceDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
}
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
func (c *ServiceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
Expand Down
4 changes: 4 additions & 0 deletions metadata/report/delegate/delegate_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ type MetadataReport struct {
// NewMetadataReport will create a MetadataReport with initiation
func NewMetadataReport() (*MetadataReport, error) {
url := instance.GetMetadataReportUrl()
if url == nil {
logger.Warn("the metadataReport URL is not configured, you should configure it.")
return nil, perrors.New("the metadataReport URL is not configured, you should configure it.")
}
bmr := &MetadataReport{
reportUrl: url,
syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false),
Expand Down
34 changes: 26 additions & 8 deletions metadata/service/remote/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,40 @@ func (mts *MetadataService) UnsubscribeURL(url *common.URL) error {
func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := definition.BuildServiceDefinition(*sv, url)
if common.RoleType(common.PROVIDER).Role() == url.GetParam(constant.SIDE_KEY, "") {
if len(interfaceName) > 0 && !isGeneric {
sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := definition.BuildServiceDefinition(*sv, url)
id := &identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
Side: url.GetParam(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
return nil
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
} else {
params := make(map[string]string, len(url.GetParams()))
url.RangeParams(func(key, value string) bool {
params[key] = value
return true
})
id := &identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
// Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP),
Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
Side: url.GetParam(constant.SIDE_KEY, "provider"),
Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
Side: url.GetParam(constant.SIDE_KEY, "consumer"),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
mts.delegateReport.StoreConsumerMetadata(id, params)
return nil
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)

return nil
}

Expand Down
4 changes: 0 additions & 4 deletions registry/servicediscovery/service_discovery_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,6 @@ func (s *serviceDiscoveryRegistry) Register(url *common.URL) error {
logger.Warnf("The URL[%s] has been registry!", url.String())
}

err = s.metaDataService.PublishServiceDefinition(url)
if err != nil {
return perrors.WithMessage(err, "publish the service definition failed. ")
}
return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.Version, ""),
Expand Down

0 comments on commit 6c989b9

Please sign in to comment.