Skip to content

Commit

Permalink
fix configuration API ut error (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
seeflood authored Jul 13, 2021
1 parent eb3b92b commit 34e8ff0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
2 changes: 2 additions & 0 deletions components/configstores/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type Store interface {
//StopSubscribe stop subs
StopSubscribe()

// GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field
GetDefaultGroup() string

// GetDefaultLabel returns default label
GetDefaultLabel() string
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/dapr/components-contrib v1.2.0
github.com/dapr/kit v0.0.1
github.com/gammazero/workerpool v1.1.2
github.com/golang/mock v1.4.4
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/json-iterator/go v1.1.11
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v0.0.0-20181025225059-d3de96c4c28e/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -1557,6 +1559,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
25 changes: 21 additions & 4 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,43 +270,58 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat
respCh := make(chan *configstores.SubscribeResp)
recvExitCh := make(chan struct{})
subscribedStore := make([]configstores.Store, 0, 1)
// TODO currently this goroutine model is error-prone,and it should be refactored after new version of configuration API being accepted
// 1. start a reader goroutine
go func() {
defer wg.Done()
for {
// 1.1. read stream
req, err := sub.Recv()
// 1.2. if an error happens,stop all the subscribers
if err != nil {
log.DefaultLogger.Errorf("occur error in subscribe, err: %+v", err)
// stop all the subscribers
for _, store := range subscribedStore {
// TODO this method will stop subscribers created by other connections.Should be refactored
store.StopSubscribe()
}
subErr = err
if len(subscribedStore) == 0 {
close(recvExitCh)
}
// stop writer goroutine
close(recvExitCh)
return
}
// 1.3. else find the component and delegate to it
store, ok := a.configStores[req.StoreName]
// 1.3.1. stop if StoreName is not supported
if !ok {
log.DefaultLogger.Errorf("configure store [%+v] don't support now", req.StoreName)
// stop all the subscribers
for _, store := range subscribedStore {
store.StopSubscribe()
}
subErr = errors.New(fmt.Sprintf("configure store [%+v] don't support now", req.StoreName))
// stop writer goroutine
close(recvExitCh)
return
}
// 1.3.2. use default settings if blank
if strings.ReplaceAll(req.Group, " ", "") == "" {
req.Group = store.GetDefaultGroup()
}
if strings.ReplaceAll(req.Label, " ", "") == "" {
req.Label = store.GetDefaultLabel()
}
// 1.3.3. delegate to the component
store.Subscribe(&configstores.SubscribeReq{AppId: req.AppId, Group: req.Group, Label: req.Label, Keys: req.Keys, Metadata: req.Metadata}, respCh)
subscribedStore = append(subscribedStore, store)
}
}()

// 2. start a writer goroutine
go func() {
defer wg.Done()
for {
select {
// read response from components
case resp, ok := <-respCh:
if !ok {
return
Expand All @@ -315,7 +330,9 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat
for _, item := range resp.Items {
items = append(items, &runtimev1pb.ConfigurationItem{Group: item.Group, Label: item.Label, Key: item.Key, Content: item.Content, Tags: item.Tags, Metadata: item.Metadata})
}
// write to response stream
sub.Send(&runtimev1pb.SubscribeConfigurationResponse{StoreName: resp.StoreName, AppId: resp.StoreName, Items: items})
// read exit signal
case <-recvExitCh:
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,18 @@ func TestDeleteConfiguration(t *testing.T) {

func TestSubscribeConfiguration(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockConfigStore := mock.NewMockStore(ctrl)
api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil)

//test not support store type
grpcServer := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: nil}
api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil)
err := api.SubscribeConfiguration(grpcServer)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "configure store [] don't support now")

//test
mockConfigStore.EXPECT().StopSubscribe().Return().Times(1)
grpcServer2 := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: errors.New("exit")}
err = api.SubscribeConfiguration(grpcServer2)
assert.NotNil(t, err)
Expand Down

0 comments on commit 34e8ff0

Please sign in to comment.