Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Triple client&server api #2502

Merged
merged 22 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 71 additions & 61 deletions client/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func getEnv(key, fallback string) string {
return fallback
}

func updateOrCreateMeshURL(opts *ClientOptions) {
func updateOrCreateMeshURL(opts *ReferenceOptions) {
ref := opts.Reference
con := opts.Consumer
con := opts.cliOpts.Consumer

if ref.URL != "" {
logger.Infof("URL specified explicitly %v", ref.URL)
Expand Down Expand Up @@ -84,30 +84,31 @@ func updateOrCreateMeshURL(opts *ClientOptions) {
}

// ReferWithService retrieves invokers from urls.
func (opts *ClientOptions) ReferWithService(srv common.RPCService) {
opts.refer(srv, nil)
func (refOpts *ReferenceOptions) ReferWithService(srv common.RPCService) {
refOpts.refer(srv, nil)
}

func (opts *ClientOptions) ReferWithInfo(info *ClientInfo) {
opts.refer(nil, info)
func (refOpts *ReferenceOptions) ReferWithInfo(info *ClientInfo) {
refOpts.refer(nil, info)
}

func (opts *ClientOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) {
opts.refer(srv, info)
func (refOpts *ReferenceOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) {
refOpts.refer(srv, info)
}

func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
ref := opts.Reference
con := opts.Consumer
func (refOpts *ReferenceOptions) refer(srv common.RPCService, info *ClientInfo) {
ref := refOpts.Reference
clientOpts := refOpts.cliOpts
con := clientOpts.Consumer

var methods []string
if info != nil {
ref.InterfaceName = info.InterfaceName
methods = info.MethodNames
opts.id = info.InterfaceName
opts.info = info
refOpts.id = info.InterfaceName
refOpts.info = info
} else {
opts.id = common.GetReference(srv)
refOpts.id = common.GetReference(srv)
}
// If adaptive service is enabled,
// the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively.
Expand All @@ -121,9 +122,9 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
common.WithPath(ref.InterfaceName),
common.WithProtocol(ref.Protocol),
common.WithMethods(methods),
common.WithParams(opts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, opts.id),
common.WithParamsValue(constant.MetadataTypeKey, opts.metaDataType),
common.WithParams(refOpts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, refOpts.id),
common.WithParamsValue(constant.MetadataTypeKey, refOpts.metaDataType),
)
if info != nil {
cfgURL.SetAttribute(constant.ClientInfoKey, info)
Expand All @@ -132,23 +133,23 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if ref.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
opts.postProcessConfig(cfgURL)
refOpts.postProcessConfig(cfgURL)

// if mesh-enabled is set
updateOrCreateMeshURL(opts)
updateOrCreateMeshURL(refOpts)

// retrieving urls from config, and appending the urls to opts.urls
if err := opts.processURL(cfgURL); err != nil {
// retrieving urls from config, and appending the urls to refOpts.urls
if err := refOpts.processURL(cfgURL); err != nil {
panic(err)
}

// Get invokers according to opts.urls
// Get invokers according to refOpts.urls
var (
invoker protocol.Invoker
regURL *common.URL
)
invokers := make([]protocol.Invoker, len(opts.urls))
for i, u := range opts.urls {
invokers := make([]protocol.Invoker, len(refOpts.urls))
for i, u := range refOpts.urls {
if u.Protocol == constant.ServiceRegistryProtocol {
invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u)
} else {
Expand All @@ -167,17 +168,17 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {

// TODO(hxmhlt): decouple from directory, config should not depend on directory module
if len(invokers) == 1 {
opts.invoker = invokers[0]
refOpts.invoker = invokers[0]
if ref.URL != "" {
hitClu := constant.ClusterKeyFailover
if u := opts.invoker.GetURL(); u != nil {
if u := refOpts.invoker.GetURL(); u != nil {
hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
}
cluster, err := extension.GetCluster(hitClu)
if err != nil {
panic(err)
} else {
opts.invoker = cluster.Join(static.NewDirectory(invokers))
refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
}
}
} else {
Expand All @@ -196,7 +197,7 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if err != nil {
panic(err)
} else {
opts.invoker = cluster.Join(static.NewDirectory(invokers))
refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
}
}

Expand All @@ -214,29 +215,29 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if asyncSrv, ok := srv.(common.AsyncCallbackService); ok {
callback = asyncSrv.CallBack
}
opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(opts.invoker, callback, cfgURL)
refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(refOpts.invoker, callback, cfgURL)
} else {
opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(opts.invoker, cfgURL)
refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(refOpts.invoker, cfgURL)
}
opts.pxy.Implement(srv)
refOpts.pxy.Implement(srv)
}
// this protocol would be destroyed in graceful_shutdown
// please refer to (https://github.com/apache/dubbo-go/issues/2429)
graceful_shutdown.RegisterProtocol(ref.Protocol)
}

func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
ref := opts.Reference
func (refOpts *ReferenceOptions) processURL(cfgURL *common.URL) error {
ref := refOpts.Reference
if ref.URL != "" { // use user-specific urls
/*
Two types of URL are allowed for opts.URL:
Two types of URL are allowed for refOpts.URL:
1. direct url: server IP, that is, no need for a registry anymore
2. registry url
They will be handled in different ways:
For example, we have a direct url and a registry url:
1. "tri://localhost:10000" is a direct url
2. "registry://localhost:2181" is a registry url.
Then, opts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181".
Then, refOpts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181".
The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}.
*/
urlStrings := gxstrings.RegSplit(ref.URL, "\\s*[;]+\\s*")
Expand All @@ -247,7 +248,7 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
}
if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol
serviceURL.SubURL = cfgURL
opts.urls = append(opts.urls, serviceURL)
refOpts.urls = append(refOpts.urls, serviceURL)
} else { // serviceURL in this branch is the target endpoint IP address
if serviceURL.Path == "" {
serviceURL.Path = "/" + ref.InterfaceName
Expand All @@ -256,57 +257,60 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
// other stuff, e.g. IP, port, etc., are same as serviceURL
newURL := common.MergeURL(serviceURL, cfgURL)
newURL.AddParam("peer", "true")
opts.urls = append(opts.urls, newURL)
refOpts.urls = append(refOpts.urls, newURL)
}
}
} else { // use registry configs
opts.urls = config.LoadRegistries(ref.RegistryIDs, opts.registriesCompat, common.CONSUMER)
refOpts.urls = config.LoadRegistries(ref.RegistryIDs, refOpts.registriesCompat, common.CONSUMER)
// set url to regURLs
for _, regURL := range opts.urls {
for _, regURL := range refOpts.urls {
regURL.SubURL = cfgURL
}
}
return nil
}

func (opts *ClientOptions) CheckAvailable() bool {
ref := opts.Reference
if opts.invoker == nil {
func (refOpts *ReferenceOptions) CheckAvailable() bool {
ref := refOpts.Reference
if refOpts.invoker == nil {
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", ref.InterfaceName)
return false
}
if !opts.invoker.IsAvailable() {
if !refOpts.invoker.IsAvailable() {
return false
}
return true
}

// Implement
// @v is service provider implemented RPCService
func (opts *ClientOptions) Implement(v common.RPCService) {
if opts.pxy != nil {
opts.pxy.Implement(v)
} else if opts.info != nil {
opts.info.ClientInjectFunc(v, &Client{
invoker: opts.invoker,
info: opts.info,
func (refOpts *ReferenceOptions) Implement(v common.RPCService) {
if refOpts.pxy != nil {
refOpts.pxy.Implement(v)
} else if refOpts.info != nil {
refOpts.info.ClientInjectFunc(v, &Client{
cliOpts: refOpts.cliOpts,
info: refOpts.info,
refOpts: map[string]*ReferenceOptions{},
})
}
}

// GetRPCService gets RPCService from proxy
func (opts *ClientOptions) GetRPCService() common.RPCService {
return opts.pxy.Get()
func (refOpts *ReferenceOptions) GetRPCService() common.RPCService {
return refOpts.pxy.Get()
}

// GetProxy gets proxy
func (opts *ClientOptions) GetProxy() *proxy.Proxy {
return opts.pxy
func (refOpts *ReferenceOptions) GetProxy() *proxy.Proxy {
return refOpts.pxy
}

func (opts *ClientOptions) getURLMap() url.Values {
ref := opts.Reference
app := opts.applicationCompat
func (refOpts *ReferenceOptions) getURLMap() url.Values {
ref := refOpts.Reference
app := refOpts.applicationCompat
metrics := refOpts.cliOpts.Metrics
tracing := refOpts.cliOpts.Otel.TracingConfig

urlMap := url.Values{}
// first set user params
Expand Down Expand Up @@ -352,6 +356,12 @@ func (opts *ClientOptions) getURLMap() url.Values {
if ref.Generic != "" {
defaultReferenceFilter = constant.GenericFilterKey + "," + defaultReferenceFilter
}
if metrics.Enable != nil && *metrics.Enable {
defaultReferenceFilter += fmt.Sprintf(",%s", constant.MetricsFilterKey)
}
if tracing.Enable != nil && *tracing.Enable {
defaultReferenceFilter += fmt.Sprintf(",%s", constant.OTELClientTraceKey)
}
urlMap.Set(constant.ReferenceFilterKey, commonCfg.MergeValue(ref.Filter, "", defaultReferenceFilter))

for _, v := range ref.Methods {
Expand All @@ -368,7 +378,7 @@ func (opts *ClientOptions) getURLMap() url.Values {

// todo: figure this out
//// GenericLoad ...
//func (opts *ClientOptions) GenericLoad(id string) {
//func (opts *ReferenceOptions) GenericLoad(id string) {
// genericService := generic.NewGenericService(opts.id)
// config.SetConsumerService(genericService)
// opts.id = id
Expand All @@ -377,12 +387,12 @@ func (opts *ClientOptions) getURLMap() url.Values {
//}

// GetInvoker get invoker from ReferenceConfigs
func (opts *ClientOptions) GetInvoker() protocol.Invoker {
return opts.invoker
func (refOpts *ReferenceOptions) GetInvoker() protocol.Invoker {
return refOpts.invoker
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfigs.
func (opts *ClientOptions) postProcessConfig(url *common.URL) {
func (refOpts *ReferenceOptions) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessReferenceConfig(url)
}
Expand Down
34 changes: 25 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ package client

import (
"context"
"fmt"
)

import (
"github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

type Client struct {
invoker protocol.Invoker
info *ClientInfo
info *ClientInfo

cliOpts *ClientOptions
refOpts map[string]*ReferenceOptions
}

type ClientInfo struct {
Expand All @@ -58,8 +60,13 @@ func (cli *Client) call(ctx context.Context, paramsRawVals []interface{}, interf
if err != nil {
return nil, err
}
// todo: move timeout into context or invocation
return cli.invoker.Invoke(ctx, inv), nil

refOption := cli.refOpts[common.ServiceKey(interfaceName, options.Group, options.Version)]
if refOption == nil {
return nil, fmt.Errorf("no service found for %s/%s:%s, please check if the service has been registered", options.Group, interfaceName, options.Version)
}

return refOption.invoker.Invoke(ctx, inv), nil

}

Expand Down Expand Up @@ -95,15 +102,23 @@ func (cli *Client) CallBidiStream(ctx context.Context, interfaceName, methodName
return res.Result(), res.Error()
}

func (cli *Client) Init(info *ClientInfo) error {
func (cli *Client) Init(info *ClientInfo, opts ...ReferenceOption) (string, string, error) {
if info == nil {
return errors.New("ClientInfo is nil")
return "", "", errors.New("ClientInfo is nil")
}

cli.cliOpts.ReferWithInfo(info)
cli.invoker = cli.cliOpts.invoker
newRefOptions := defaultReferenceOptions()
err := newRefOptions.init(cli, opts...)
if err != nil {
return "", "", err
}

ref := newRefOptions.Reference
cli.refOpts[common.ServiceKey(info.InterfaceName, ref.Group, ref.Version)] = newRefOptions

newRefOptions.ReferWithInfo(info)

return nil
return ref.Group, ref.Version, nil
}

func generateInvocation(methodName string, paramsRawVals []interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) {
Expand All @@ -125,5 +140,6 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}
return &Client{
cliOpts: newCliOpts,
refOpts: make(map[string]*ReferenceOptions),
}, nil
}
Loading