From b6022bbd868305854f51e07312d8d5c3eacdea03 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Mon, 4 Jan 2021 15:06:32 +0800 Subject: [PATCH 1/8] fix ctx linter error --- common/constant/key.go | 2 +- common/url.go | 8 ++-- common/url_test.go | 42 +++++++++++++++---- config_center/apollo/listener.go | 2 +- .../tps/tps_limiter_method_service_test.go | 2 +- remoting/getty/listener_test.go | 6 +-- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/common/constant/key.go b/common/constant/key.go index 12e3096e5a..50aea81371 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -200,7 +200,7 @@ const ( ) const ( - TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx" + TRACING_REMOTE_SPAN_CTX = DubboCtxKey("tracing.remote.span.ctx") ) // Use for router module diff --git a/common/url.go b/common/url.go index 87cacfd7fb..ac49465346 100644 --- a/common/url.go +++ b/common/url.go @@ -90,9 +90,7 @@ type baseUrl struct { Location string // ip+port Ip string Port string - //url.Values is not safe map, add to avoid concurrent map read and map write error - paramsLock sync.RWMutex - params url.Values + PrimitiveURL string } @@ -116,6 +114,10 @@ type URL struct { noCopy noCopy baseUrl + //url.Values is not safe map, add to avoid concurrent map read and map write error + paramsLock sync.RWMutex + params url.Values + Path string // like /com.ikurento.dubbo.UserProvider Username string Password string diff --git a/common/url_test.go b/common/url_test.go index 4008f6a3d3..c645f1a046 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -161,7 +161,10 @@ func TestURLEqual(t *testing.T) { func TestURLGetParam(t *testing.T) { params := url.Values{} params.Set("key", "value") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetParam("key", "default") assert.Equal(t, "value", v) @@ -172,8 +175,11 @@ func TestURLGetParam(t *testing.T) { func TestURLGetParamInt(t *testing.T) { params := url.Values{} - params.Set("key", "") - u := URL{baseUrl: baseUrl{params: params}} + params.Set("key", "value") + + u := URL{} + u.SetParams(params) + v := u.GetParamInt("key", 1) assert.Equal(t, int64(1), v) @@ -185,7 +191,10 @@ func TestURLGetParamInt(t *testing.T) { func TestURLGetParamIntValue(t *testing.T) { params := url.Values{} params.Set("key", "0") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetParamInt("key", 1) assert.Equal(t, int64(0), v) @@ -197,7 +206,10 @@ func TestURLGetParamIntValue(t *testing.T) { func TestURLGetParamBool(t *testing.T) { params := url.Values{} params.Set("force", "true") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetParamBool("force", false) assert.Equal(t, true, v) @@ -210,7 +222,10 @@ func TestURLGetParamAndDecoded(t *testing.T) { rule := "host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4" params := url.Values{} params.Set("rule", base64.URLEncoding.EncodeToString([]byte(rule))) - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v, _ := u.GetParamAndDecoded("rule") assert.Equal(t, rule, v) } @@ -247,7 +262,10 @@ func TestURLToMap(t *testing.T) { func TestURLGetMethodParamInt(t *testing.T) { params := url.Values{} params.Set("methods.GetValue.timeout", "3") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetMethodParamInt("GetValue", "timeout", 1) assert.Equal(t, int64(3), v) @@ -259,7 +277,10 @@ func TestURLGetMethodParamInt(t *testing.T) { func TestURLGetMethodParam(t *testing.T) { params := url.Values{} params.Set("methods.GetValue.timeout", "3s") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetMethodParam("GetValue", "timeout", "1s") assert.Equal(t, "3s", v) @@ -271,7 +292,10 @@ func TestURLGetMethodParam(t *testing.T) { func TestURLGetMethodParamBool(t *testing.T) { params := url.Values{} params.Set("methods.GetValue.async", "true") - u := URL{baseUrl: baseUrl{params: params}} + + u := URL{} + u.SetParams(params) + v := u.GetMethodParamBool("GetValue", "async", false) assert.Equal(t, true, v) diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go index ace5ed0268..44d325582f 100644 --- a/config_center/apollo/listener.go +++ b/config_center/apollo/listener.go @@ -36,7 +36,7 @@ type apolloListener struct { // nolint func newApolloListener() *apolloListener { return &apolloListener{ - listeners: make(map[config_center.ConfigurationListener]struct{}, 0), + listeners: make(map[config_center.ConfigurationListener]struct{}), } } diff --git a/filter/filter_impl/tps/tps_limiter_method_service_test.go b/filter/filter_impl/tps/tps_limiter_method_service_test.go index a70287eabd..4ff0a232e4 100644 --- a/filter/filter_impl/tps/tps_limiter_method_service_test.go +++ b/filter/filter_impl/tps/tps_limiter_method_service_test.go @@ -113,7 +113,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t *testing.T) func TestMethodServiceTpsLimiterImplIsAllowableBothMethodAndService(t *testing.T) { methodName := "hello3" methodConfigPrefix := "methods." + methodName + "." - invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0)) + invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{})) ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go index 956ecf9849..2700ed8cd8 100644 --- a/remoting/getty/listener_test.go +++ b/remoting/getty/listener_test.go @@ -23,14 +23,14 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" ) import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol/invocation" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/mocktracer" ) // test rebuild the ctx @@ -63,7 +63,7 @@ func TestRebuildCtx(t *testing.T) { // Once we decided to transfer more context's key-value, we should change this. // now we only support rebuild the tracing context func rebuildCtx(inv *invocation.RPCInvocation) context.Context { - ctx := context.WithValue(context.Background(), "attachment", inv.Attachments()) + ctx := context.WithValue(context.Background(), constant.DubboCtxKey("attachment"), inv.Attachments()) // actually, if user do not use any opentracing framework, the err will not be nil. spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, From 5d970c664ad1f498808feedbcf0e5672447cbf1a Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 13 Jan 2021 14:28:21 +0800 Subject: [PATCH 2/8] fix: add provider -> consumer direct notify when shut down --- common/dubbo_dir_refresh.go | 9 +++++++++ registry/directory/directory.go | 19 +++++++++++++++++-- remoting/getty/getty_client.go | 2 +- remoting/getty/listener.go | 15 +++++++++++++-- remoting/getty/pool.go | 13 ++++++++----- 5 files changed, 48 insertions(+), 10 deletions(-) create mode 100644 common/dubbo_dir_refresh.go diff --git a/common/dubbo_dir_refresh.go b/common/dubbo_dir_refresh.go new file mode 100644 index 0000000000..0d942ed802 --- /dev/null +++ b/common/dubbo_dir_refresh.go @@ -0,0 +1,9 @@ +package common + +import "sync" + +// DirMap is used by dubbo-getty, it can notify delete event to dubbo directory directly by getty session +// look up at: +// remoting/getty/listener.go +// registry/directory/directory.go +var DirMap sync.Map diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 1b607351af..8d611a4274 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -69,7 +69,7 @@ type RegistryDirectory struct { } // NewRegistryDirectory will create a new RegistryDirectory -func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { +func NewRegistryDirectory(url *common.URL, reg registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } @@ -79,7 +79,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), - registry: registry, + registry: reg, } dir.consumerURL = dir.getConsumerUrl(url.SubURL) @@ -233,6 +233,20 @@ func (dir *RegistryDirectory) setNewInvokers() { dir.RouterChain().SetInvokers(newInvokers) } +// AddToDirMap is called after receive Add event +// this function add a way to delete service in directory directory by getty-session, not only by registry +// getty-session way to delete is faster than registry to delete +// this function register the delete function to common sync.Map, and it read by getty-session, to push url +func (dir *RegistryDirectory) AddToDirMap(url *common.URL) { + common.DirMap.Store(url.Key(), func(url *common.URL) { + eve := ®istry.ServiceEvent{ + Service: url, + Action: remoting.EventTypeDel, + } + dir.Notify(eve) + }) +} + // cacheInvokerByEvent caches invokers from the service event func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) { // judge is override or others @@ -243,6 +257,7 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) logger.Infof("selector add service url{%s}", event.Service) // FIXME: routers are built in every address notification? dir.configRouters() + dir.AddToDirMap(event.Service) return dir.cacheInvoker(u), nil case remoting.EventTypeDel: logger.Infof("selector delete service url{%s}", event.Service) diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index 57221cc6d7..aeb2912e0f 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -146,7 +146,7 @@ func (c *Client) Connect(url *common.URL) error { initClient(url.Protocol) c.conf = *clientConf // new client - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL), url) c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false) // codec diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index fd4c4898f2..06de261b58 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -19,6 +19,7 @@ package getty import ( "fmt" + "github.com/apache/dubbo-go/common" "sync" "sync/atomic" "time" @@ -69,11 +70,15 @@ func (s *rpcSession) GetReqNum() int32 { type RpcClientHandler struct { conn *gettyRPCClient timeoutTimes int + url *common.URL } // nolint -func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler { - return &RpcClientHandler{conn: client} +func NewRpcClientHandler(client *gettyRPCClient, url *common.URL) *RpcClientHandler { + return &RpcClientHandler{ + conn: client, + url: url, + } } // OnOpen call the getty client session opened, add the session to getty client session list @@ -91,6 +96,12 @@ func (h *RpcClientHandler) OnError(session getty.Session, err error) { // OnClose close the session, remove it from the getty session list func (h *RpcClientHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) + f, ok := common.DirMap.Load(h.url.Key()) + if !ok { + h.conn.removeSession(session) + return + } + f.(func(url *common.URL))(h.url) h.conn.removeSession(session) } diff --git a/remoting/getty/pool.go b/remoting/getty/pool.go index 2b1cdfe2f4..3775ec47e1 100644 --- a/remoting/getty/pool.go +++ b/remoting/getty/pool.go @@ -20,6 +20,7 @@ package getty import ( "crypto/tls" "fmt" + "github.com/apache/dubbo-go/common" "math/rand" "net" "sync" @@ -54,7 +55,7 @@ var ( errClientPoolClosed = perrors.New("client pool closed") ) -func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClient, error) { +func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string, url *common.URL) (*gettyRPCClient, error) { var ( gettyClient getty.Client sslEnabled bool @@ -134,7 +135,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error { session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient)) - session.SetEventListener(NewRpcClientHandler(c)) + session.SetEventListener(NewRpcClientHandler(c, c.pool.url)) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6)) @@ -167,7 +168,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error { session.SetName(conf.GettySessionParam.SessionName) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient)) - session.SetEventListener(NewRpcClientHandler(c)) + session.SetEventListener(NewRpcClientHandler(c, c.pool.url)) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6)) @@ -333,15 +334,17 @@ type gettyRPCClientPool struct { sync.Mutex conns []*gettyRPCClient + url *common.URL } -func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool { +func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration, url *common.URL) *gettyRPCClientPool { return &gettyRPCClientPool{ rpcClient: rpcClient, size: size, ttl: int64(ttl.Seconds()), // init capacity : 2 conns: make([]*gettyRPCClient, 0, 2), + url: url, } } @@ -359,7 +362,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(addr string) (*gettyRPCClient, er conn, connErr := p.get() if connErr == nil && conn == nil { // create new conn - rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr) + rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr, p.url) if rpcErr == nil { p.put(rpcClientConn) } From 4f84299c02875fc2b5df5d86678b342ce699560a Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 13 Jan 2021 14:30:00 +0800 Subject: [PATCH 3/8] fix: add provider -> consumer direct notify when shut down --- remoting/getty/listener.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 06de261b58..238c211cf5 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -90,19 +90,23 @@ func (h *RpcClientHandler) OnOpen(session getty.Session) error { // OnError the getty client session has errored, so remove the session from the getty client session list func (h *RpcClientHandler) OnError(session getty.Session, err error) { logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) + h.tryDirectlyDelete() h.conn.removeSession(session) } // OnClose close the session, remove it from the getty session list func (h *RpcClientHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) + h.tryDirectlyDelete() + h.conn.removeSession(session) +} + +func (h *RpcClientHandler) tryDirectlyDelete() { f, ok := common.DirMap.Load(h.url.Key()) if !ok { - h.conn.removeSession(session) return } f.(func(url *common.URL))(h.url) - h.conn.removeSession(session) } // OnMessage get response from getty server, and update the session to the getty client session list From fb72628fbbce53ee7807f8c8dd497c86ed7b0c44 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 13 Jan 2021 14:47:15 +0800 Subject: [PATCH 4/8] fix: test linter --- remoting/getty/getty_client_test.go | 2 +- remoting/getty/pool_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go index c32e0c23f4..80f55a8c2a 100644 --- a/remoting/getty/getty_client_test.go +++ b/remoting/getty/getty_client_test.go @@ -94,7 +94,7 @@ func getClient(url *common.URL) *Client { } func testClient_Call(t *testing.T, svr *Server, url *common.URL, c *Client) { - c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL), &common.URL{}) testGetBigPkg(t, c) testGetUser(t, c) diff --git a/remoting/getty/pool_test.go b/remoting/getty/pool_test.go index 1115a49042..6b888de8d1 100644 --- a/remoting/getty/pool_test.go +++ b/remoting/getty/pool_test.go @@ -18,6 +18,7 @@ package getty import ( + "github.com/apache/dubbo-go/common" "testing" "time" ) @@ -29,7 +30,7 @@ import ( func TestGetConnFromPool(t *testing.T) { var rpcClient Client - clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second)) + clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second), &common.URL{}) var conn1 gettyRPCClient conn1.active = time.Now().Unix() From 887ef911758eaa3c6a339a329905f8acfc3c9121 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Wed, 13 Jan 2021 15:10:50 +0800 Subject: [PATCH 5/8] fix: add liscense --- common/dubbo_dir_refresh.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/common/dubbo_dir_refresh.go b/common/dubbo_dir_refresh.go index 0d942ed802..294a53a5ae 100644 --- a/common/dubbo_dir_refresh.go +++ b/common/dubbo_dir_refresh.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package common import "sync" From e9f044e4144f01028e2716b72f1e1ed8ca2a08c4 Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Thu, 14 Jan 2021 14:48:51 +0800 Subject: [PATCH 6/8] fix: remove should pool --- cluster/router/chain/chain.go | 2 +- cluster/router/healthcheck/health_check_route.go | 5 ----- cluster/router/router.go | 4 ---- cluster/router/tag/tag_router.go | 10 ---------- 4 files changed, 1 insertion(+), 20 deletions(-) diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index fccce838bb..8a79cd8156 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -269,7 +269,7 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { // rule doesn't change), and the address list doesn't change, then the existing data will be re-used. func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) { name := p.Name() - if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers { + if isCacheMiss(origin, name) || &(origin.invokers) != &invokers { logger.Debugf("build address cache for router %q", name) return p.Pool(invokers) } diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 1a878af212..f8bde7106c 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -91,11 +91,6 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, return rb, nil } -// ShouldPool will always return true to make sure healthy check constantly. -func (r *HealthCheckRouter) ShouldPool() bool { - return r.enabled -} - func (r *HealthCheckRouter) Name() string { return name } diff --git a/cluster/router/router.go b/cluster/router/router.go index 8a19dcf8cc..8319b3092a 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -61,10 +61,6 @@ type Poolable interface { // Pool created address pool and address metadata from the invokers. Pool([]protocol.Invoker) (AddrPool, AddrMetadata) - // ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling - // is necessary, even if the addresses not changed at all. - ShouldPool() bool - // Name return the Poolable's name. Name() string } diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go index 984ecb4eef..42315d2a16 100644 --- a/cluster/router/tag/tag_router.go +++ b/cluster/router/tag/tag_router.go @@ -74,7 +74,6 @@ type tagRouter struct { enabled bool priority int64 application string - ruleChanged bool mutex sync.RWMutex } @@ -190,7 +189,6 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) { c.mutex.Lock() defer c.mutex.Unlock() c.tagRouterRule = routerRule - c.ruleChanged = true } // URL gets the url of tagRouter @@ -213,7 +211,6 @@ func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.A c.mutex.Lock() defer c.mutex.Unlock() poolWithDynamicTag(invokers, c.tagRouterRule, rb) - c.ruleChanged = false // create metadata in order to avoid lock in route() meta := addrMetadata{application: c.application} if c.tagRouterRule != nil { @@ -268,13 +265,6 @@ func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) { } } -// ShouldPool returns false, to make sure address cache for tag router happens once and only once. -func (c *tagRouter) ShouldPool() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.ruleChanged -} - // Name returns pool's name func (c *tagRouter) Name() string { return name From 87c1321872be21fe87173e07a6beb8e82b9c0c2d Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 17 Jan 2021 15:53:55 +0800 Subject: [PATCH 7/8] Fix: --- remoting/getty/listener.go | 2 +- remoting/getty/pool.go | 2 +- remoting/getty/pool_test.go | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 238c211cf5..477b372f3e 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -19,7 +19,6 @@ package getty import ( "fmt" - "github.com/apache/dubbo-go/common" "sync" "sync/atomic" "time" @@ -32,6 +31,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol/invocation" diff --git a/remoting/getty/pool.go b/remoting/getty/pool.go index 3775ec47e1..87a49e5020 100644 --- a/remoting/getty/pool.go +++ b/remoting/getty/pool.go @@ -20,7 +20,6 @@ package getty import ( "crypto/tls" "fmt" - "github.com/apache/dubbo-go/common" "math/rand" "net" "sync" @@ -34,6 +33,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" ) diff --git a/remoting/getty/pool_test.go b/remoting/getty/pool_test.go index 6b888de8d1..95ff9bfdee 100644 --- a/remoting/getty/pool_test.go +++ b/remoting/getty/pool_test.go @@ -18,7 +18,6 @@ package getty import ( - "github.com/apache/dubbo-go/common" "testing" "time" ) @@ -27,6 +26,10 @@ import ( "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common" +) + func TestGetConnFromPool(t *testing.T) { var rpcClient Client From dad63a52a8c47996cb09ae7a95490f9933a323ff Mon Sep 17 00:00:00 2001 From: LaurenceLiZhixin <382673304@qq.com> Date: Sun, 17 Jan 2021 17:33:42 +0800 Subject: [PATCH 8/8] fix --- go.mod | 2 ++ registry/directory/directory.go | 4 ++-- remoting/getty/listener.go | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 18d85a650f..ac83a94bcf 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/apache/dubbo-go +go 1.15 + require ( github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/RoaringBitmap/roaring v0.5.5 diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8d611a4274..b868cc93f9 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -234,8 +234,8 @@ func (dir *RegistryDirectory) setNewInvokers() { } // AddToDirMap is called after receive Add event -// this function add a way to delete service in directory directory by getty-session, not only by registry -// getty-session way to delete is faster than registry to delete +// this function add a way to delete service in directory by getty-session, not only by registry. +// the way of using getty-session to delete is faster than registry to delete // this function register the delete function to common sync.Map, and it read by getty-session, to push url func (dir *RegistryDirectory) AddToDirMap(url *common.URL) { common.DirMap.Store(url.Key(), func(url *common.URL) { diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 477b372f3e..e9c17d5499 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -90,18 +90,18 @@ func (h *RpcClientHandler) OnOpen(session getty.Session) error { // OnError the getty client session has errored, so remove the session from the getty client session list func (h *RpcClientHandler) OnError(session getty.Session, err error) { logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err) - h.tryDirectlyDelete() + h.tryDirectDelete() h.conn.removeSession(session) } // OnClose close the session, remove it from the getty session list func (h *RpcClientHandler) OnClose(session getty.Session) { logger.Infof("session{%s} is closing......", session.Stat()) - h.tryDirectlyDelete() + h.tryDirectDelete() h.conn.removeSession(session) } -func (h *RpcClientHandler) tryDirectlyDelete() { +func (h *RpcClientHandler) tryDirectDelete() { f, ok := common.DirMap.Load(h.url.Key()) if !ok { return