Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:zk too many tcp conn #1010

Merged
merged 41 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2aa489a
Merge pull request #981 from lzp0412/develop
AlexStocks Jan 6, 2021
3120a22
Merge remote-tracking branch 'upstream/develop' into fix_zk_too_many_…
wenxuwan Jan 23, 2021
33dc7d4
try to fix zk too many connections
wenxuwan Jan 23, 2021
984a1ae
remove close function in service_discovery
wenxuwan Jan 23, 2021
06b1cd1
remove unused code
wenxuwan Jan 23, 2021
16a43ac
try to fix lint
wenxuwan Jan 23, 2021
186a8e7
fix imports format and ut
wenxuwan Jan 23, 2021
a53ed2b
fix import fmt
wenxuwan Jan 23, 2021
bd59cfc
fix ut error
wenxuwan Jan 24, 2021
ead5951
fix ut error
wenxuwan Jan 24, 2021
55f40f7
Merge remote-tracking branch 'upstream/develop' into fix_zk_too_many_…
wenxuwan Jan 24, 2021
bb84695
fix ut
wenxuwan Jan 24, 2021
394cbb2
fix lint
wenxuwan Jan 24, 2021
930a15b
try to fix ut
wenxuwan Jan 25, 2021
7df6426
fix ut
wenxuwan Jan 25, 2021
93ae337
fix ut
wenxuwan Jan 25, 2021
2cb436b
so tired to fix the ut
wenxuwan Jan 25, 2021
543721c
fix lint
wenxuwan Jan 25, 2021
2bc815a
remove client close from registry
wenxuwan Jan 26, 2021
0a215ac
fix provider not started
wenxuwan Jan 26, 2021
f1c5ee4
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 23, 2021
43280d7
change some comments
wenxuwan Feb 25, 2021
6fa3fea
fix facade ut
wenxuwan Feb 25, 2021
132ca25
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 25, 2021
93374a2
fix glint
wenxuwan Feb 25, 2021
991ef61
Merge remote-tracking branch 'upstream/1.5' into fix_zk_too_many_tcp_…
wenxuwan Feb 25, 2021
cb8b665
fix restart can't find provider error
wenxuwan Feb 26, 2021
3fdbb01
try to fix provider lost
wenxuwan Feb 26, 2021
ef8966c
try to fix provider lost
wenxuwan Feb 27, 2021
ca647ab
try to fix provider can't find error
wenxuwan Feb 27, 2021
3a05f71
fix compile error
wenxuwan Feb 27, 2021
ae6a138
change type error
wenxuwan Feb 27, 2021
a8bc4e1
fix comments
wenxuwan Feb 28, 2021
972a91a
move zk to gost
wenxuwan Mar 2, 2021
0ebe79c
fix lint
wenxuwan Mar 2, 2021
46801c3
replace grpc version
wenxuwan Mar 2, 2021
364be20
update gost version
wenxuwan Mar 3, 2021
786e843
fix compile error
wenxuwan Mar 3, 2021
9d8c910
fix comments
wenxuwan Mar 4, 2021
98ba75e
fix comments
wenxuwan Mar 6, 2021
a231789
upgrade hession version
wenxuwan Mar 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions cluster/router/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

import (
zk "github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -53,16 +54,19 @@ const (
consumerFormat = "consumer://%s/com.foo.BarService"
dubboForamt = "dubbo://%s:%d/com.foo.BarService"
anyUrlFormat = "condition://%s/com.foo.BarService"
zk = "zookeeper"
zkName = "zookeeper"
applicationKey = "test-condition"
applicationField = "application"
forceField = "force"
forceValue = "true"
)

var zkCluster *zk.TestCluster

func TestNewRouterChain(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
zkCluster = ts
err = z.Create(path)
assert.NoError(t, err)
testyml := `scope: application
Expand All @@ -77,12 +81,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

assert.Nil(t, err)
Expand Down Expand Up @@ -114,7 +118,7 @@ func TestNewRouterChainURLNil(t *testing.T) {
}

func TestRouterChainAddRouters(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
_, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second, zookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -131,13 +135,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, zkCluster.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -164,15 +167,10 @@ conditions:
}

func TestRouterChainRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()

ts, _, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second, zookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -197,7 +195,7 @@ func TestRouterChainRoute(t *testing.T) {
}

func TestRouterChainRouteAppRouter(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second, zookeeper.WithTestCluster(zkCluster))
assert.NoError(t, err)
err = z.Create(path)
assert.NoError(t, err)
Expand All @@ -214,13 +212,12 @@ conditions:
_, err = z.Conn.Set(path, []byte(testyml), 0)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.NoError(t, err)
z.Delete(path)
z.Close()
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
assert.NoError(t, err)
config.GetEnvInstance().SetDynamicConfiguration(configuration)

Expand All @@ -242,7 +239,7 @@ conditions:
}

func TestRouterChainRouteNoRoute(t *testing.T) {
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second)
ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second, zookeeper.WithTestCluster(zkCluster))
assert.Nil(t, err)
defer func() {
_ = ts.Stop()
Expand All @@ -251,7 +248,7 @@ func TestRouterChainRouteNoRoute(t *testing.T) {
}()

zkUrl, _ := common.NewURL(fmt.Sprintf(zkFormat, localIP, ts.Servers[0].Port))
configuration, err := extension.GetConfigCenterFactory(zk).GetDynamicConfiguration(zkUrl)
configuration, err := extension.GetConfigCenterFactory(zkName).GetDynamicConfiguration(zkUrl)
config.GetEnvInstance().SetDynamicConfiguration(configuration)
assert.Nil(t, err)

Expand Down
2 changes: 0 additions & 2 deletions cluster/router/condition/app_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ conditions:
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down Expand Up @@ -124,7 +123,6 @@ conditions:
_, err = z.Conn.Set(routerPath, []byte(testYML), 0)
assert.NoError(t, err)
defer func() {
err = ts.Stop()
assert.NoError(t, err)
z.Close()
}()
Expand Down
2 changes: 1 addition & 1 deletion cluster/router/local/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func newLocalPriorityRouteFactory() router.PriorityRouterFactory {
}

// NewPriorityRouter construct a new NewLocalDiscRouter via url
func (f *LocalPriorityRouteFactory) NewPriorityRouter(url *common.URL) (router.PriorityRouter, error) {
func (f *LocalPriorityRouteFactory) NewPriorityRouter(url *common.URL, ch chan struct{}) (router.PriorityRouter, error) {
return NewLocalPriorityRouter(url)
}
2 changes: 0 additions & 2 deletions cluster/router/tag/tag_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ tags:

func (suite *DynamicTagRouter) TearDownTest() {
suite.zkClient.Close()
err := suite.testCluster.Stop()
suite.Nil(err)
}

func (suite *DynamicTagRouter) TestDynamicTagRouterSetByIPv4() {
Expand Down
3 changes: 1 addition & 2 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,9 @@ func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
}

func (c *zookeeperDynamicConfiguration) closeConfigs() {
logger.Infof("begin to close provider zk client")
c.cltLock.Lock()
defer c.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// Close the old client first to close the tmp node
c.client.Close()
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
c.client = nil
}
Expand Down
17 changes: 11 additions & 6 deletions config_center/zookeeper/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
if group != "" {
err = zreg.client.Create(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName))
err = zreg.client.Create(path.Join(zreg.rootPath, group, dubboPropertyFileName))
assert.NoError(t, err)

_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, "dubbo", dubboPropertyFileName), []byte(data), 0)
_, err = zreg.client.Conn.Set(path.Join(zreg.rootPath, group, dubboPropertyFileName), []byte(data), 0)
assert.NoError(t, err)
} else {
err = zreg.client.Create(path.Join(zreg.rootPath, dubboPropertyFileName))
Expand All @@ -100,6 +100,7 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
func TestGetConfig(t *testing.T) {
ts, reg := initZkData("dubbo", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand All @@ -122,11 +123,13 @@ func TestGetConfig(t *testing.T) {
func TestAddListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
listener := &mockDataListener{}
reg.AddListener(dubboPropertyFileName, listener)

listener.wg.Add(1)
data := `
dubbo.consumer.request_timeout=3s
Expand Down Expand Up @@ -158,6 +161,7 @@ func TestAddListener(t *testing.T) {
func TestRemoveListener(t *testing.T) {
ts, reg := initZkData("", t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
Expand Down Expand Up @@ -197,19 +201,20 @@ func TestZookeeperDynamicConfigurationPublishConfig(t *testing.T) {
value := "Test Data"
customGroup := "Custom Group"
key := "myKey"
ts, zk := initZkData(config_center.DEFAULT_GROUP, t)
ts, reg := initZkData(config_center.DEFAULT_GROUP, t)
defer func() {
reg.client.Close()
err := ts.Stop()
assert.NoError(t, err)
}()
err := zk.PublishConfig(key, customGroup, value)
err := reg.PublishConfig(key, customGroup, value)
assert.Nil(t, err)
result, err := zk.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
result, err := reg.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
assert.Nil(t, err)
assert.Equal(t, value, result)

var keys *gxset.HashSet
keys, err = zk.GetConfigKeysByGroup(customGroup)
keys, err = reg.GetConfigKeysByGroup(customGroup)
assert.Nil(t, err)
assert.Equal(t, 1, keys.Size())
assert.True(t, keys.Contains(key))
Expand Down
4 changes: 2 additions & 2 deletions registry/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

import (
"github.com/apache/dubbo-getty"
getty "github.com/apache/dubbo-getty"
perrors "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (r *kubernetesRegistry) HandleClientRestart() {
failTimes int
)

defer r.WaitGroup()
defer r.WaitGroup().Done()
LOOP:
for {
select {
Expand Down
9 changes: 2 additions & 7 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen
l.subscribed[url.ServiceKey()] = listener
}

// UnSubscribeURL is used to set a watch listener for url
// UnSubscribeURL is used to unset a watch listener for url
func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
if l.closed {
return nil
}
listener := l.subscribed[url.ServiceKey()]
listener.(*RegistryConfigurationListener).Close()
delete(l.subscribed, url.ServiceKey())
return listener
}
Expand Down Expand Up @@ -142,9 +143,6 @@ func (l *RegistryConfigurationListener) Process(configType *config_center.Config
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.Done():
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
logger.Warnf("listener's zk client connection (address {%s}) is broken, so zk event listener exit now.", l.client.ZkAddrs)
return nil, perrors.New("zookeeper client stopped")
case <-l.close:
return nil, perrors.New("listener have been closed")
case <-l.registry.Done():
Expand All @@ -156,9 +154,6 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
continue
}
//r.update(e.res)
//write to invoker
//r.outerEventCh <- e.res
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
}
}
Expand Down
14 changes: 5 additions & 9 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (r *zkRegistry) InitListeners() {
defer oldDataListener.mutex.Unlock()
r.dataListener.closed = true
recovered := r.dataListener.subscribed
if len(recovered) > 0 {
if recovered != nil && len(recovered) > 0 {
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
// recover all subscribed url
for _, oldListener := range recovered {
var (
Expand Down Expand Up @@ -282,11 +282,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
//Interested register to dataconfig.
r.dataListener.SubscribeURL(conf, zkListener)

go r.listener.ListenServiceEvent(
conf,
fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())),
r.dataListener,
)
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)

return zkListener, nil
}
Expand All @@ -297,9 +293,9 @@ func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationL
r.dataListener.mutex.Lock()
configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
if configurationListener != nil {
rcListener, _ := configurationListener.(*RegistryConfigurationListener)
if rcListener != nil {
if rcListener.isClosed {
zkListener, _ = configurationListener.(*RegistryConfigurationListener)
if zkListener != nil {
if zkListener.isClosed {
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
r.dataListener.mutex.Unlock()
return nil, perrors.New("configListener already been closed")
}
Expand Down
7 changes: 5 additions & 2 deletions registry/zookeeper/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ func Test_Register(t *testing.T) {
regURL, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"}))

ts, reg, _ := newMockZkRegistry(regURL)
ts, reg, err := newMockZkRegistry(regURL)
if err != nil {
assert.NoError(t, err)
wenxuwan marked this conversation as resolved.
Show resolved Hide resolved
}
defer func() {
_ = ts.Stop()
}()
err := reg.Register(url)
err = reg.Register(url)
children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"strings"
"sync"
)

import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/hash/page"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/hash/page"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -111,6 +110,7 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error
if err != nil {
return nil, err
}
zksd.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(zksd)
zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
return zksd, nil
Expand Down
Loading