Skip to content

Commit

Permalink
Merge branch 'main' into zookeeper_lock
Browse files Browse the repository at this point in the history
  • Loading branch information
seeflood authored Jul 13, 2021
2 parents c717e80 + 34e8ff0 commit bbb32be
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
2 changes: 2 additions & 0 deletions components/configstores/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type Store interface {
//StopSubscribe stop subs
StopSubscribe()

// GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field
GetDefaultGroup() string

// GetDefaultLabel returns default label
GetDefaultLabel() string
}
25 changes: 21 additions & 4 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,43 +270,58 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat
respCh := make(chan *configstores.SubscribeResp)
recvExitCh := make(chan struct{})
subscribedStore := make([]configstores.Store, 0, 1)
// TODO currently this goroutine model is error-prone,and it should be refactored after new version of configuration API being accepted
// 1. start a reader goroutine
go func() {
defer wg.Done()
for {
// 1.1. read stream
req, err := sub.Recv()
// 1.2. if an error happens,stop all the subscribers
if err != nil {
log.DefaultLogger.Errorf("occur error in subscribe, err: %+v", err)
// stop all the subscribers
for _, store := range subscribedStore {
// TODO this method will stop subscribers created by other connections.Should be refactored
store.StopSubscribe()
}
subErr = err
if len(subscribedStore) == 0 {
close(recvExitCh)
}
// stop writer goroutine
close(recvExitCh)
return
}
// 1.3. else find the component and delegate to it
store, ok := a.configStores[req.StoreName]
// 1.3.1. stop if StoreName is not supported
if !ok {
log.DefaultLogger.Errorf("configure store [%+v] don't support now", req.StoreName)
// stop all the subscribers
for _, store := range subscribedStore {
store.StopSubscribe()
}
subErr = errors.New(fmt.Sprintf("configure store [%+v] don't support now", req.StoreName))
// stop writer goroutine
close(recvExitCh)
return
}
// 1.3.2. use default settings if blank
if strings.ReplaceAll(req.Group, " ", "") == "" {
req.Group = store.GetDefaultGroup()
}
if strings.ReplaceAll(req.Label, " ", "") == "" {
req.Label = store.GetDefaultLabel()
}
// 1.3.3. delegate to the component
store.Subscribe(&configstores.SubscribeReq{AppId: req.AppId, Group: req.Group, Label: req.Label, Keys: req.Keys, Metadata: req.Metadata}, respCh)
subscribedStore = append(subscribedStore, store)
}
}()

// 2. start a writer goroutine
go func() {
defer wg.Done()
for {
select {
// read response from components
case resp, ok := <-respCh:
if !ok {
return
Expand All @@ -315,7 +330,9 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat
for _, item := range resp.Items {
items = append(items, &runtimev1pb.ConfigurationItem{Group: item.Group, Label: item.Label, Key: item.Key, Content: item.Content, Tags: item.Tags, Metadata: item.Metadata})
}
// write to response stream
sub.Send(&runtimev1pb.SubscribeConfigurationResponse{StoreName: resp.StoreName, AppId: resp.StoreName, Items: items})
// read exit signal
case <-recvExitCh:
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,18 @@ func TestDeleteConfiguration(t *testing.T) {

func TestSubscribeConfiguration(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockConfigStore := mock.NewMockStore(ctrl)
api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil)

//test not support store type
grpcServer := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: nil}
api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil)
err := api.SubscribeConfiguration(grpcServer)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "configure store [] don't support now")

//test
mockConfigStore.EXPECT().StopSubscribe().Return().Times(1)
grpcServer2 := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: errors.New("exit")}
err = api.SubscribeConfiguration(grpcServer2)
assert.NotNil(t, err)
Expand Down

0 comments on commit bbb32be

Please sign in to comment.