diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index 85e0eb2fe9..373871dccb 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -56,6 +56,12 @@ protocol_conf: session_timeout: "20s" pool_size: 64 pool_ttl: 600 + # gr_pool_size is recommended to be set to [cpu core number] * 100 + gr_pool_size: 1200 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 60 getty_session_param: compress_encoding: false tcp_no_delay: true @@ -63,7 +69,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/config/testdata/consumer_config_with_configcenter.yml b/config/testdata/consumer_config_with_configcenter.yml index 1a33ec3971..c6505e4492 100644 --- a/config/testdata/consumer_config_with_configcenter.yml +++ b/config/testdata/consumer_config_with_configcenter.yml @@ -30,7 +30,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/config/testdata/provider_config.yml b/config/testdata/provider_config.yml index 3b6e43a51a..e135860c20 100644 --- a/config/testdata/provider_config.yml +++ b/config/testdata/provider_config.yml @@ -56,6 +56,12 @@ protocol_conf: dubbo: session_number: 700 session_timeout: "20s" + # gr_pool_size is recommended to be set to [cpu core number] * 10 + gr_pool_size: 120 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 6 getty_session_param: compress_encoding: false tcp_no_delay: true @@ -63,7 +69,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index e1131df77e..ff69668841 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -75,7 +75,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index dabde06c75..b4d897fda2 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -75,7 +75,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index e0b742eff4..c8b5c58691 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -75,7 +75,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-server/profiles/dev/server.yml b/examples/dubbo/go-server/profiles/dev/server.yml index 2f0decaec6..79c2cb2cc2 100644 --- a/examples/dubbo/go-server/profiles/dev/server.yml +++ b/examples/dubbo/go-server/profiles/dev/server.yml @@ -84,7 +84,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-server/profiles/release/server.yml b/examples/dubbo/go-server/profiles/release/server.yml index 92306b7067..6890ed3bdb 100644 --- a/examples/dubbo/go-server/profiles/release/server.yml +++ b/examples/dubbo/go-server/profiles/release/server.yml @@ -83,7 +83,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/go-server/profiles/test/server.yml b/examples/dubbo/go-server/profiles/test/server.yml index 03140d6bde..b6dd41da44 100644 --- a/examples/dubbo/go-server/profiles/test/server.yml +++ b/examples/dubbo/go-server/profiles/test/server.yml @@ -83,7 +83,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml index 1ebdf5bb5c..c8e7bd0b05 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/dev/client.yml @@ -30,7 +30,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml index 1ebdf5bb5c..c8e7bd0b05 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/release/client.yml @@ -30,7 +30,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml b/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml index 1ebdf5bb5c..c8e7bd0b05 100644 --- a/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml +++ b/examples/dubbo/with-configcenter-go-client/profiles/test/client.yml @@ -30,7 +30,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml index a2ae2fe5d8..cdaaca4c38 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/dev/server.yml @@ -31,7 +31,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml index a2ae2fe5d8..cdaaca4c38 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/release/server.yml @@ -31,7 +31,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml b/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml index a2ae2fe5d8..cdaaca4c38 100644 --- a/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml +++ b/examples/dubbo/with-configcenter-go-server/profiles/test/server.yml @@ -31,7 +31,6 @@ protocol_conf: keep_alive_period: "120s" tcp_r_buf_size: 262144 tcp_w_buf_size: 65536 - pkg_rq_size: 1024 pkg_wq_size: 512 tcp_read_timeout: "1s" tcp_write_timeout: "5s" diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index c60b265cbc..d9098b6c19 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -25,6 +25,7 @@ import ( import ( "github.com/dubbogo/getty" + "github.com/dubbogo/gost/sync" "github.com/dubbogo/hessian2" perrors "github.com/pkg/errors" "go.uber.org/atomic" @@ -45,7 +46,8 @@ var ( errClientClosed = perrors.New("client closed") errClientReadTimeout = perrors.New("client read timeout") - clientConf *ClientConfig + clientConf *ClientConfig + clientGrpool *gxsync.TaskPool ) func init() { @@ -78,6 +80,7 @@ func init() { } clientConf = conf + setClientGrpool() } func SetClientConf(c ClientConfig) { @@ -87,12 +90,20 @@ func SetClientConf(c ClientConfig) { logger.Warnf("[ClientConfig CheckValidity] error: %v", err) return } + setClientGrpool() } func GetClientConf() ClientConfig { return *clientConf } +func setClientGrpool() { + if clientConf.GrPoolSize > 1 { + clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), + gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) + } +} + type Options struct { // connect timeout ConnectTimeout time.Duration diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 8c24098b48..55cb6816a0 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -173,7 +173,6 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { KeepAlivePeriod: "120s", TcpRBufSize: 262144, TcpWBufSize: 65536, - PkgRQSize: 1024, PkgWQSize: 512, TcpReadTimeout: "4s", TcpWriteTimeout: "5s", @@ -193,7 +192,6 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) { KeepAlivePeriod: "120s", TcpRBufSize: 262144, TcpWBufSize: 65536, - PkgRQSize: 1024, PkgWQSize: 512, TcpReadTimeout: "1s", TcpWriteTimeout: "5s", diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index e59b7f2fd8..1ac3c9ad97 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -34,7 +34,6 @@ type ( keepAlivePeriod time.Duration TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` - PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"` PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` tcpReadTimeout time.Duration @@ -53,6 +52,11 @@ type ( sessionTimeout time.Duration SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"` + // grpool + GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"` + QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"` + QueueNumber int `default:"0" yaml:"queue_number" json:"queue_number,omitempty"` + // session tcp parameters GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` } @@ -76,6 +80,11 @@ type ( PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"` PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"` + // grpool + GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"` + QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"` + QueueNumber int `default:"0" yaml:"queue_number" json:"queue_number,omitempty"` + // session tcp parameters GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` } diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 23e860c19c..546a5b335a 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -111,7 +111,6 @@ func (c *gettyRPCClient) newSession(session getty.Session) error { session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient)) session.SetEventListener(NewRpcClientHandler(c)) - session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) @@ -119,6 +118,8 @@ func (c *gettyRPCClient) newSession(session getty.Session) error { session.SetWaitTime(conf.GettySessionParam.waitTimeout) logger.Debugf("client new session:%s\n", session.Stat()) + session.SetTaskPool(clientGrpool) + return nil } diff --git a/protocol/dubbo/server.go b/protocol/dubbo/server.go index 25c8f1bf4d..8daeee05e2 100644 --- a/protocol/dubbo/server.go +++ b/protocol/dubbo/server.go @@ -24,6 +24,7 @@ import ( import ( "github.com/dubbogo/getty" + "github.com/dubbogo/gost/sync" "gopkg.in/yaml.v2" ) @@ -33,7 +34,10 @@ import ( "github.com/apache/dubbo-go/config" ) -var srvConf *ServerConfig +var ( + srvConf *ServerConfig + srvGrpool *gxsync.TaskPool +) func init() { @@ -64,6 +68,7 @@ func init() { } srvConf = conf + SetServerGrpool() } func SetServerConfig(s ServerConfig) { @@ -73,12 +78,20 @@ func SetServerConfig(s ServerConfig) { logger.Warnf("[ServerConfig CheckValidity] error: %v", err) return } + SetServerGrpool() } func GetServerConfig() ServerConfig { return *srvConf } +func SetServerGrpool() { + if srvConf.GrPoolSize > 1 { + srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen), + gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber)) + } +} + type Server struct { conf ServerConfig tcpServer getty.Server @@ -123,7 +136,6 @@ func (s *Server) newSession(session getty.Session) error { session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetPkgHandler(rpcServerPkgHandler) session.SetEventListener(s.rpcHandler) - session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) @@ -131,6 +143,8 @@ func (s *Server) newSession(session getty.Session) error { session.SetWaitTime(conf.GettySessionParam.waitTimeout) logger.Debugf("app accepts new session:%s\n", session.Stat()) + session.SetTaskPool(srvGrpool) + return nil }