From 34e8ff031959ca99e662ea0e632db0edd6fd8514 Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Tue, 13 Jul 2021 21:22:35 +0800 Subject: [PATCH] fix configuration API ut error (#134) --- components/configstores/configstore.go | 2 ++ go.mod | 2 +- go.sum | 3 +++ pkg/grpc/api.go | 25 +++++++++++++++++++++---- pkg/grpc/api_test.go | 6 ++++-- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/components/configstores/configstore.go b/components/configstores/configstore.go index 741346f3b9..aad351bff5 100644 --- a/components/configstores/configstore.go +++ b/components/configstores/configstore.go @@ -53,7 +53,9 @@ type Store interface { //StopSubscribe stop subs StopSubscribe() + // GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field GetDefaultGroup() string + // GetDefaultLabel returns default label GetDefaultLabel() string } diff --git a/go.mod b/go.mod index f9dcb73ba6..fdf26b8a10 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/dapr/components-contrib v1.2.0 github.com/dapr/kit v0.0.1 github.com/gammazero/workerpool v1.1.2 - github.com/golang/mock v1.4.4 + github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.2.0 github.com/json-iterator/go v1.1.11 diff --git a/go.sum b/go.sum index 9d72318519..2eda6641da 100644 --- a/go.sum +++ b/go.sum @@ -478,6 +478,8 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v0.0.0-20181025225059-d3de96c4c28e/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1557,6 +1559,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/grpc/api.go b/pkg/grpc/api.go index 5713dd688e..a27e943c73 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/api.go @@ -270,43 +270,58 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat respCh := make(chan *configstores.SubscribeResp) recvExitCh := make(chan struct{}) subscribedStore := make([]configstores.Store, 0, 1) + // TODO currently this goroutine model is error-prone,and it should be refactored after new version of configuration API being accepted + // 1. start a reader goroutine go func() { defer wg.Done() for { + // 1.1. read stream req, err := sub.Recv() + // 1.2. if an error happens,stop all the subscribers if err != nil { log.DefaultLogger.Errorf("occur error in subscribe, err: %+v", err) + // stop all the subscribers for _, store := range subscribedStore { + // TODO this method will stop subscribers created by other connections.Should be refactored store.StopSubscribe() } subErr = err - if len(subscribedStore) == 0 { - close(recvExitCh) - } + // stop writer goroutine + close(recvExitCh) return } + // 1.3. else find the component and delegate to it store, ok := a.configStores[req.StoreName] + // 1.3.1. stop if StoreName is not supported if !ok { log.DefaultLogger.Errorf("configure store [%+v] don't support now", req.StoreName) + // stop all the subscribers + for _, store := range subscribedStore { + store.StopSubscribe() + } subErr = errors.New(fmt.Sprintf("configure store [%+v] don't support now", req.StoreName)) + // stop writer goroutine close(recvExitCh) return } + // 1.3.2. use default settings if blank if strings.ReplaceAll(req.Group, " ", "") == "" { req.Group = store.GetDefaultGroup() } if strings.ReplaceAll(req.Label, " ", "") == "" { req.Label = store.GetDefaultLabel() } + // 1.3.3. delegate to the component store.Subscribe(&configstores.SubscribeReq{AppId: req.AppId, Group: req.Group, Label: req.Label, Keys: req.Keys, Metadata: req.Metadata}, respCh) subscribedStore = append(subscribedStore, store) } }() - + // 2. start a writer goroutine go func() { defer wg.Done() for { select { + // read response from components case resp, ok := <-respCh: if !ok { return @@ -315,7 +330,9 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat for _, item := range resp.Items { items = append(items, &runtimev1pb.ConfigurationItem{Group: item.Group, Label: item.Label, Key: item.Key, Content: item.Content, Tags: item.Tags, Metadata: item.Metadata}) } + // write to response stream sub.Send(&runtimev1pb.SubscribeConfigurationResponse{StoreName: resp.StoreName, AppId: resp.StoreName, Items: items}) + // read exit signal case <-recvExitCh: return } diff --git a/pkg/grpc/api_test.go b/pkg/grpc/api_test.go index 8bd5f3134b..6af67cf932 100644 --- a/pkg/grpc/api_test.go +++ b/pkg/grpc/api_test.go @@ -162,16 +162,18 @@ func TestDeleteConfiguration(t *testing.T) { func TestSubscribeConfiguration(t *testing.T) { ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockConfigStore := mock.NewMockStore(ctrl) + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil) + //test not support store type grpcServer := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: nil} - api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil) err := api.SubscribeConfiguration(grpcServer) assert.NotNil(t, err) assert.Equal(t, err.Error(), "configure store [] don't support now") //test - mockConfigStore.EXPECT().StopSubscribe().Return().Times(1) grpcServer2 := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: errors.New("exit")} err = api.SubscribeConfiguration(grpcServer2) assert.NotNil(t, err)