Skip to content

Commit

Permalink
fix: when entering idle mode, the old resolve-related resources have …
Browse files Browse the repository at this point in the history
…not been completely processed, and an exception occurs when exiting idle mode immediately. (#3402)

fix(test): fix gotest DATA RACE

fix(test): gotest errchecked

chore(gotest): gofmt

chore(gotest): consul address
  • Loading branch information
harbourlga authored Oct 8, 2024
1 parent 186ab88 commit 3395079
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 28 deletions.
10 changes: 2 additions & 8 deletions contrib/registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
services: &atomic.Value{},
serviceName: name,
}
set.ctx, set.cancel = context.WithCancel(context.Background())
r.registry[name] = set
}

Expand All @@ -209,10 +208,8 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
// otherwise the initial data may be blocked forever during the watch.
w.event <- struct{}{}
}
if !ok {
if err := r.resolve(set.ctx, set); err != nil {
return nil, err
}
if err := r.resolve(ctx, set); err != nil {
return nil, err
}
return w, nil
}
Expand Down Expand Up @@ -248,9 +245,6 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error {
}
idx = tmpIdx
case <-ctx.Done():
r.lock.Lock()
delete(r.registry, ss.serviceName)
r.lock.Unlock()
return
}
}
Expand Down
156 changes: 144 additions & 12 deletions contrib/registry/consul/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,17 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
instance2 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.2",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}

type args struct {
ctx context.Context
instance *registry.ServiceInstance
ctx context.Context
instance *registry.ServiceInstance
changeInstance *registry.ServiceInstance
}

tests := []struct {
Expand All @@ -445,8 +452,9 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
{
name: "many client, one idle",
args: args{
ctx: context.Background(),
instance: instance1,
ctx: context.Background(),
instance: instance1,
changeInstance: instance2,
},
want: []*registry.ServiceInstance{instance1},
wantErr: false,
Expand Down Expand Up @@ -504,9 +512,7 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
}
}
time.Sleep(2 * time.Second)
change := tt.args.instance
change.Version = "v0.0.2"
err = r.Register(tt.args.ctx, change)
err = r.Register(tt.args.ctx, tt.args.changeInstance)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -574,7 +580,7 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
stopCtx, stopCancel := context.WithCancel(ctx)
watch, err1 := r.Watch(context.Background(), tt.args.instance.Name)
watch, err1 := r.Watch(stopCtx, tt.args.instance.Name)
if err1 != nil {
t.Error(err1)
}
Expand All @@ -586,10 +592,6 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
t.Errorf("GetService() got = %v", service)
return
}
_, err2 = watch.Next()
if err2 == nil {
t.Errorf("watch exit exception:%d ", i)
}
}(i)
go func() {
select {
Expand Down Expand Up @@ -631,6 +633,136 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
}
}

func TestRegistry_ExitOldResolverAndReWatch(t *testing.T) {
addr := fmt.Sprintf("%s:9091", getIntranetIP())

time.Sleep(time.Millisecond * 100)
cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
if err != nil {
t.Fatalf("create consul client failed: %v", err)
}

instance1 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
instance2 := &registry.ServiceInstance{
ID: "2",
Name: "server-1",
Version: "v0.0.2",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
type args struct {
ctx context.Context
opts []Option
instance *registry.ServiceInstance
initialInstance *registry.ServiceInstance
}

tests := []struct {
name string
args args
want []*registry.ServiceInstance
wantErr bool
}{
{
name: "When it has entered idle mode, but the old resolver has not completely exited, the watch will be re-established due to new requests coming in.",
args: args{
ctx: context.Background(),
initialInstance: instance1,
instance: instance2,
opts: []Option{
WithHealthCheck(false),
WithTimeout(time.Second * 2),
},
},
want: []*registry.ServiceInstance{instance2},
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := New(cli, tt.args.opts...)

err = r.Register(tt.args.ctx, tt.args.initialInstance)
if err != nil {
t.Error(err)
}
// first watch
ctx, cancel := context.WithCancel(context.Background())
watch, err := r.Watch(ctx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
service, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
}

time.Sleep(time.Second * 3)
// The simulation entered idle mode first, but the old resolver was not closed yet, and new requests triggered a new Watch.
watchCtx := context.Background()
// old resolver cancel
err = watch.Stop()
if err != nil {
t.Errorf("watch stop err:%v", err)
}
cancel()
// If it sleeps for a period of time, the old resolve goroutine will exit before the new Watch is processed, and there will be no problems at this time.
// time.Sleep(time.Second * 8)
newWatch, err := r.Watch(watchCtx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
service, err = newWatch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
}
// change register info
time.Sleep(time.Second * 1)
err = r.Deregister(tt.args.ctx, tt.args.initialInstance)
if err != nil {
t.Error(err)
}
time.Sleep(time.Second * 5)
err = r.Register(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}

time.Sleep(time.Second * 2)

newWatchCtx, newWatchCancel := context.WithCancel(context.Background())
c := make(chan struct{}, 1)

go func() {
service, err = newWatch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
if !reflect.DeepEqual(service, tt.want) {
t.Errorf("GetService() got = %v, want %v", service, tt.want)
}
c <- struct{}{}
}()
time.AfterFunc(time.Second*10, newWatchCancel)
select {
case <-newWatchCtx.Done():
t.Errorf("Timeout getservice. May be no new resolve goroutine to obtain the latest service information")
case <-c:
return
}
})
}
}

func getIntranetIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions contrib/registry/consul/service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package consul

import (
"context"
"sync"
"sync/atomic"

Expand All @@ -13,9 +12,6 @@ type serviceSet struct {
watcher map[*watcher]struct{}
services *atomic.Value
lock sync.RWMutex

ctx context.Context
cancel context.CancelFunc
}

func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {
Expand Down
4 changes: 0 additions & 4 deletions contrib/registry/consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,5 @@ func (w *watcher) Stop() error {
w.set.lock.Lock()
defer w.set.lock.Unlock()
delete(w.set.watcher, w)
// close resolve
if len(w.set.watcher) == 0 {
w.set.cancel()
}
return nil
}

0 comments on commit 3395079

Please sign in to comment.