Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/direct notify #995

Closed
wants to merge 13 commits into from
2 changes: 1 addition & 1 deletion cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
// rule doesn't change), and the address list doesn't change, then the existing data will be re-used.
func poolRouter(p router.Poolable, origin *InvokerCache, invokers []protocol.Invoker) (router.AddrPool, router.AddrMetadata) {
name := p.Name()
if isCacheMiss(origin, name) || p.ShouldPool() || &(origin.invokers) != &invokers {
if isCacheMiss(origin, name) || &(origin.invokers) != &invokers {
logger.Debugf("build address cache for router %q", name)
return p.Pool(invokers)
}
Expand Down
5 changes: 0 additions & 5 deletions cluster/router/healthcheck/health_check_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ func (r *HealthCheckRouter) Pool(invokers []protocol.Invoker) (router.AddrPool,
return rb, nil
}

// ShouldPool will always return true to make sure healthy check constantly.
func (r *HealthCheckRouter) ShouldPool() bool {
return r.enabled
}

func (r *HealthCheckRouter) Name() string {
return name
}
Expand Down
4 changes: 0 additions & 4 deletions cluster/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ type Poolable interface {
// Pool created address pool and address metadata from the invokers.
Pool([]protocol.Invoker) (AddrPool, AddrMetadata)

// ShouldPool returns if it should pool. One typical scenario is a router rule changes, in this case, a pooling
// is necessary, even if the addresses not changed at all.
ShouldPool() bool

// Name return the Poolable's name.
Name() string
}
Expand Down
10 changes: 0 additions & 10 deletions cluster/router/tag/tag_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type tagRouter struct {
enabled bool
priority int64
application string
ruleChanged bool
mutex sync.RWMutex
}

Expand Down Expand Up @@ -190,7 +189,6 @@ func (c *tagRouter) Process(event *config_center.ConfigChangeEvent) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.tagRouterRule = routerRule
c.ruleChanged = true
}

// URL gets the url of tagRouter
Expand All @@ -213,7 +211,6 @@ func (c *tagRouter) Pool(invokers []protocol.Invoker) (router.AddrPool, router.A
c.mutex.Lock()
defer c.mutex.Unlock()
poolWithDynamicTag(invokers, c.tagRouterRule, rb)
c.ruleChanged = false
// create metadata in order to avoid lock in route()
meta := addrMetadata{application: c.application}
if c.tagRouterRule != nil {
Expand Down Expand Up @@ -268,13 +265,6 @@ func (c *tagRouter) fetchRuleIfNecessary(invokers []protocol.Invoker) {
}
}

// ShouldPool returns false, to make sure address cache for tag router happens once and only once.
func (c *tagRouter) ShouldPool() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.ruleChanged
}

// Name returns pool's name
func (c *tagRouter) Name() string {
return name
Expand Down
26 changes: 26 additions & 0 deletions common/dubbo_dir_refresh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

import "sync"

// DirMap is used by dubbo-getty, it can notify delete event to dubbo directory directly by getty session
// look up at:
// remoting/getty/listener.go
// registry/directory/directory.go
var DirMap sync.Map
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use public variable in this module? Although you have comment in this variable,but It can be used in all modules in dubbo-go.Even the module out of dubbo-go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's the best way to achieve the direct notify from remoting.session to directory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find another way to link this two pkg. As there may be unexpected loop refer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请允许我使用中文,关于这个问题,我先说一下我的看法。
先看一下现在的结构,我没理解错应该是:
invoker-》client-》getty pool-》session(connection)
现在是直接加在 session 的 close/error 上。

针对这个 PR,我有以下几个疑问:

  • 这个功能是否应该加在 invoker / client ?为什么我这么说?请看第二个问题。
  • 因为有可能有 10 个 connection,其中 n ( n < 10 )个因为网络抖动连不上,然后就去删除 DIr 中的 cache?这也可能和第三个问题有关联。
  • 我只看到在 session(connection)上删除,但是我没看到如何加回去?
  • 如果按照现在的套路,如果 n ( n > 1) 个 registry 要加回去,是不是会造成:频繁增加。

我的建议是:
调整到 invoekr 或者 client,通过判断窗口期中多少个请求失败,再触发删除 cache,最后记得加上恢复 registry 的代码。
但是如果按照我的建议做的话,是否能基于:ActiveFilter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我仔细想了想,这里的确有问题。
我写这个pr的是以:“这个session一旦被关闭,证明server端就一定关闭了”为前提的。一旦session因为网络抖动出现了err,会直接删除掉dir的cache,这样设计不太合理。
我先表述一下这个pr的初衷:是为了提高客户端对于server端下线的感知速度:不只通过regisgtry感知,而是通过session感知。如果判断窗口期请求,我觉得是不太现实,因为这个时间可能registry都通知过来了。
感觉这个需求可能有点问题。
另一方面,我的另一个pr:#976
有提供invoke前的健康检查机制,感觉这两个pr在功能上是有重复的,我和展图讨论下。

19 changes: 17 additions & 2 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type RegistryDirectory struct {
}

// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
func NewRegistryDirectory(url *common.URL, reg registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
Expand All @@ -79,7 +79,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
registry: reg,
}

dir.consumerURL = dir.getConsumerUrl(url.SubURL)
Expand Down Expand Up @@ -233,6 +233,20 @@ func (dir *RegistryDirectory) setNewInvokers() {
dir.RouterChain().SetInvokers(newInvokers)
}

// AddToDirMap is called after receive Add event
// this function add a way to delete service in directory by getty-session, not only by registry.
// the way of using getty-session to delete is faster than registry to delete
// this function register the delete function to common sync.Map, and it read by getty-session, to push url
func (dir *RegistryDirectory) AddToDirMap(url *common.URL) {
common.DirMap.Store(url.Key(), func(url *common.URL) {
eve := &registry.ServiceEvent{
Service: url,
Action: remoting.EventTypeDel,
}
dir.Notify(eve)
})
}

// cacheInvokerByEvent caches invokers from the service event
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
Expand All @@ -243,6 +257,7 @@ func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent)
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
dir.AddToDirMap(event.Service)
return dir.cacheInvoker(u), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
Expand Down
2 changes: 1 addition & 1 deletion remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *Client) Connect(url *common.URL) error {
initClient(url.Protocol)
c.conf = *clientConf
// new client
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL), url)
c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false)

// codec
Expand Down
2 changes: 1 addition & 1 deletion remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func getClient(url *common.URL) *Client {
}

func testClient_Call(t *testing.T, svr *Server, url *common.URL, c *Client) {
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL), &common.URL{})

testGetBigPkg(t, c)
testGetUser(t, c)
Expand Down
19 changes: 17 additions & 2 deletions remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol/invocation"
Expand Down Expand Up @@ -69,11 +70,15 @@ func (s *rpcSession) GetReqNum() int32 {
type RpcClientHandler struct {
conn *gettyRPCClient
timeoutTimes int
url *common.URL
}

// nolint
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
func NewRpcClientHandler(client *gettyRPCClient, url *common.URL) *RpcClientHandler {
return &RpcClientHandler{
conn: client,
url: url,
}
}

// OnOpen call the getty client session opened, add the session to getty client session list
Expand All @@ -85,15 +90,25 @@ func (h *RpcClientHandler) OnOpen(session getty.Session) error {
// OnError the getty client session has errored, so remove the session from the getty client session list
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
logger.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.tryDirectDelete()
h.conn.removeSession(session)
}

// OnClose close the session, remove it from the getty session list
func (h *RpcClientHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.tryDirectDelete()
h.conn.removeSession(session)
}

func (h *RpcClientHandler) tryDirectDelete() {
f, ok := common.DirMap.Load(h.url.Key())
if !ok {
return
}
f.(func(url *common.URL))(h.url)
}

// OnMessage get response from getty server, and update the session to the getty client session list
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
result, ok := pkg.(remoting.DecodeResult)
Expand Down
13 changes: 8 additions & 5 deletions remoting/getty/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
)
Expand All @@ -54,7 +55,7 @@ var (
errClientPoolClosed = perrors.New("client pool closed")
)

func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string) (*gettyRPCClient, error) {
func newGettyRPCClientConn(pool *gettyRPCClientPool, addr string, url *common.URL) (*gettyRPCClient, error) {
var (
gettyClient getty.Client
sslEnabled bool
Expand Down Expand Up @@ -134,7 +135,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient))
session.SetEventListener(NewRpcClientHandler(c))
session.SetEventListener(NewRpcClientHandler(c, c.pool.url))
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
Expand Down Expand Up @@ -167,7 +168,7 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient))
session.SetEventListener(NewRpcClientHandler(c))
session.SetEventListener(NewRpcClientHandler(c, c.pool.url))
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
Expand Down Expand Up @@ -333,15 +334,17 @@ type gettyRPCClientPool struct {

sync.Mutex
conns []*gettyRPCClient
url *common.URL
}

func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration, url *common.URL) *gettyRPCClientPool {
return &gettyRPCClientPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
// init capacity : 2
conns: make([]*gettyRPCClient, 0, 2),
url: url,
}
}

Expand All @@ -359,7 +362,7 @@ func (p *gettyRPCClientPool) getGettyRpcClient(addr string) (*gettyRPCClient, er
conn, connErr := p.get()
if connErr == nil && conn == nil {
// create new conn
rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr)
rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr, p.url)
if rpcErr == nil {
p.put(rpcClientConn)
}
Expand Down
6 changes: 5 additions & 1 deletion remoting/getty/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
)

func TestGetConnFromPool(t *testing.T) {
var rpcClient Client

clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second))
clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second), &common.URL{})

var conn1 gettyRPCClient
conn1.active = time.Now().Unix()
Expand Down