Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: etcdv3 for registry #148

Merged
merged 39 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7a34fb2
ETCD tmp complete
Jul 19, 2019
549f4ae
merge && add etcdv3 remote pkg
Jul 23, 2019
c06c8a2
etcdv3 basic complete
Jul 24, 2019
14b198d
ADD etcdv3 basic complete
Jul 24, 2019
07c1d31
reset default config
Jul 24, 2019
61e5ec5
Fix pkg name
Jul 24, 2019
27809f3
Basic function comleted, wait for refactor
Jul 28, 2019
7defc21
Refactor remote registry
Jul 30, 2019
13acf67
ADD etcdv3 registry for dubbo
Jul 31, 2019
898fce9
ADD RACE for etcd test
Jul 31, 2019
0cdbd1d
ADD star&&stop etcd for ut
Jul 31, 2019
b40d56d
ADD etcdctl for ut
Jul 31, 2019
8fa6f5b
Fix etcd work-dir
Jul 31, 2019
d071e25
Fix etcd start && stop
Jul 31, 2019
09a4d07
FMT registry/etcdv3 remoting/v3
Jul 31, 2019
cb529f6
Fix etcd start cmd
Jul 31, 2019
8178ee4
wait for etcd start
Jul 31, 2019
60a490b
wait for etcd start
Jul 31, 2019
13e218d
ADD TestMain for ut
Jul 31, 2019
0e48540
Fix UT framework
Aug 1, 2019
bb4c0d1
Finish etcd registry
Aug 1, 2019
cabd9fc
Fix etcd remoting etcd endpionts
Aug 1, 2019
4576e92
ADD remoting/etcdv3 && registry/etcdv3
Aug 1, 2019
f6e6268
fmt project
Aug 3, 2019
4d3233c
Update go.sum to Fix github.com/coreos/[email protected]+incompatible: che…
Aug 3, 2019
97d0977
Fix etcd dep
Aug 3, 2019
a836ecf
Fix etcd dependency
Aug 3, 2019
f69675e
Fix etcd dependency
Aug 3, 2019
ca5a700
Fix depend with go.etcd.io
Aug 3, 2019
8444dc4
Fix GOPROXY cause dependency mistake
Aug 3, 2019
e574713
Fix export GOPROXY= && retidy
invalid-email-address Aug 3, 2019
21f7c5f
DEL go1.11 support
Aug 3, 2019
dff5a0a
Fix fmt and remove use juju/errors
Aug 10, 2019
86a197a
Fix comflict in go.mod && fmt project
Aug 10, 2019
b0f3e19
Fix deadlock && len(string) ==0 && rename errors-> perrors
Aug 10, 2019
e9a168b
Fix etcd event log format
invalid-email-address Aug 12, 2019
069d5fe
Fix rename facede_test.go facade_test.go
invalid-email-address Aug 12, 2019
1e91d2a
Fix go.sum conflict with develop branch
Aug 12, 2019
48e3abf
Fix merge from local branch
Aug 12, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,35 @@ module github.com/apache/dubbo-go

require (
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/gogo/protobuf v1.2.1 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/juju/errors v1.0.0-20190207033735-e65537c515d7
github.com/magiconair/properties v1.8.1
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
golang.org/x/time v0.0.0-20190513212739-9d24e82272b4 // indirect
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)
142 changes: 140 additions & 2 deletions go.sum

Large diffs are not rendered by default.

88 changes: 88 additions & 0 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package etcdv3

import (
"context"
"strings"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)

import (
"github.com/juju/errors"
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
)

type dataListener struct {
interestedURL []*common.URL
listener remoting.ConfigurationListener
}

func NewRegistryDataListener(listener remoting.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}

func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

func (l *dataListener) DataChange(eventType remoting.Event) bool {

url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.TODO(), url)
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action})
return true
}
}

return false
}

type configurationListener struct {
registry *etcdV3Registry
events chan *remoting.ConfigChangeEvent
}

func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.wg.Add(1)
return &configurationListener{registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}

func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, errors.New("listener stopped")

case e := <-l.events:
logger.Warnf("got etcd event %#s", e)
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
68 changes: 68 additions & 0 deletions registry/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package etcdv3

import (
"context"
"testing"
"time"

"github.com/apache/dubbo-go/common"
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
"github.com/apache/dubbo-go/remoting"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
)

type RegistryTestSuite struct {
suite.Suite
etcd *embed.Etcd
}

// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {

t := suite.T()

cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}

suite.etcd = e
return
}

// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
}

func (suite *RegistryTestSuite) TestDataChange() {

t := suite.T()

listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
}
}

func TestRegistrySuite(t *testing.T) {
suite.Run(t, &RegistryTestSuite{})
}

type MockDataListener struct {
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
}

func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {

sxllwx marked this conversation as resolved.
Show resolved Hide resolved
}
Loading