Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: Develop req timeout config #284

Merged
merged 13 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL {
return true
})
//loadBalance,cluster,retries strategy config
methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY})
methodConfigMergeFcn := mergeNormalParam(mergedUrl, referenceUrl, []string{constant.LOADBALANCE_KEY, constant.CLUSTER_KEY, constant.RETRIES_KEY, constant.TIMEOUT_KEY})

//remote timestamp
if v := serviceUrl.GetParam(constant.TIMESTAMP_KEY, ""); len(v) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions common/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestMergeUrl(t *testing.T) {
serviceUrlParams.Set("test2", "1")
serviceUrlParams.Set(constant.CLUSTER_KEY, "roundrobin")
serviceUrlParams.Set(constant.RETRIES_KEY, "2")
serviceUrlParams.Set("methods.testMethod."+constant.RETRIES_KEY, "2")
serviceUrlParams.Set(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, "2")
referenceUrl, _ := NewURL(context.TODO(), "mock1://127.0.0.1:1111", WithParams(referenceUrlParams), WithMethods([]string{"testMethod"}))
serviceUrl, _ := NewURL(context.TODO(), "mock2://127.0.0.1:20000", WithParams(serviceUrlParams))

Expand All @@ -248,7 +248,7 @@ func TestMergeUrl(t *testing.T) {
assert.Equal(t, "1", mergedUrl.GetParam("test2", ""))
assert.Equal(t, "1", mergedUrl.GetParam("test3", ""))
assert.Equal(t, "1", mergedUrl.GetParam(constant.RETRIES_KEY, ""))
assert.Equal(t, "1", mergedUrl.GetParam("methods.testMethod."+constant.RETRIES_KEY, ""))
assert.Equal(t, "2", mergedUrl.GetParam(constant.METHOD_KEYS+".testMethod."+constant.RETRIES_KEY, ""))
}

func TestURL_SetParams(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions config/method_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type MethodConfig struct {
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}

func (c *MethodConfig) Prefix() string {
Expand Down
49 changes: 28 additions & 21 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,28 @@ import (
)

type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}

func (c *ReferenceConfig) Prefix() string {
Expand Down Expand Up @@ -174,6 +175,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
if len(refconfig.RequestTimeout) != 0 {
urlMap.Set(constant.TIMEOUT_KEY, refconfig.RequestTimeout)
pantianying marked this conversation as resolved.
Show resolved Hide resolved
}
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(refconfig.Sticky))
Expand All @@ -198,6 +202,9 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
if len(v.RequestTimeout) != 0 {
urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
}
}

return urlMap
Expand Down
23 changes: 19 additions & 4 deletions config/testdata/consumer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ references:
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
timeout: "3s"
methods :
- name: "GetUser"
retries: "3"
timeout: "5s"
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"
Expand All @@ -54,19 +56,30 @@ shutdown_conf:
step_timeout: 10s

protocol_conf:
# when you choose the Dubbo protocol, the following configuration takes effect
dubbo:
reconnect_interval: 0
# reconnect_interval is the actual number of connections a session can use
connection_number: 2
heartbeat_period: "5s"
session_timeout: "20s"
pool_size: 64
# heartbeat_period is heartbeat interval between server and client connection.
# Effective by client configuration
heartbeat_period: "30s"
# when the session is inactive for more than session_timeout, the session may be closed
session_timeout: "30s"
# a reference has the size of the session connection pool
# that is the maximum number of sessions it may have
pool_size: 4
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200
# queue_len is recommended to be set to 64 or 128
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60
# dubbo-go uses getty as the network connection library.
# The following is the relevant configuration of getty
getty_session_param:
compress_encoding: false
tcp_no_delay: true
Expand All @@ -78,5 +91,7 @@ protocol_conf:
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
max_msg_len: 1024
# maximum len of data per request
# this refers to the total amount of data requested or returned
max_msg_len: 102400
session_name: "client"
8 changes: 8 additions & 0 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,15 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method

p.Service.Timeout = c.opts.RequestTimeout
var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
p.Service.Timeout = t
}
}

p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta)

Expand Down
12 changes: 11 additions & 1 deletion protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package dubbo

import (
"sync"
"time"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
Expand Down Expand Up @@ -67,9 +69,17 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}

func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

pantianying marked this conversation as resolved.
Show resolved Hide resolved
requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}

invoker := NewDubboInvoker(url, NewClient(Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: config.GetConsumerConfig().RequestTimeout,
RequestTimeout: requestTimeout,
}))
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
Expand Down
12 changes: 11 additions & 1 deletion protocol/jsonrpc/jsonrpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package jsonrpc
import (
"strings"
"sync"
"time"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
Expand Down Expand Up @@ -66,9 +68,17 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}

func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}

invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{
HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout,
HTTPTimeout: config.GetConsumerConfig().RequestTimeout,
HTTPTimeout: requestTimeout,
}))
jp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
Expand Down