diff --git a/components/configstores/configstore.go b/components/configstores/configstore.go index 741346f3b9..aad351bff5 100644 --- a/components/configstores/configstore.go +++ b/components/configstores/configstore.go @@ -53,7 +53,9 @@ type Store interface { //StopSubscribe stop subs StopSubscribe() + // GetDefaultGroup returns default group.This method will be invoked if a request doesn't specify the group field GetDefaultGroup() string + // GetDefaultLabel returns default label GetDefaultLabel() string } diff --git a/pkg/grpc/api.go b/pkg/grpc/api.go index 5713dd688e..a27e943c73 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/api.go @@ -270,43 +270,58 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat respCh := make(chan *configstores.SubscribeResp) recvExitCh := make(chan struct{}) subscribedStore := make([]configstores.Store, 0, 1) + // TODO currently this goroutine model is error-prone,and it should be refactored after new version of configuration API being accepted + // 1. start a reader goroutine go func() { defer wg.Done() for { + // 1.1. read stream req, err := sub.Recv() + // 1.2. if an error happens,stop all the subscribers if err != nil { log.DefaultLogger.Errorf("occur error in subscribe, err: %+v", err) + // stop all the subscribers for _, store := range subscribedStore { + // TODO this method will stop subscribers created by other connections.Should be refactored store.StopSubscribe() } subErr = err - if len(subscribedStore) == 0 { - close(recvExitCh) - } + // stop writer goroutine + close(recvExitCh) return } + // 1.3. else find the component and delegate to it store, ok := a.configStores[req.StoreName] + // 1.3.1. stop if StoreName is not supported if !ok { log.DefaultLogger.Errorf("configure store [%+v] don't support now", req.StoreName) + // stop all the subscribers + for _, store := range subscribedStore { + store.StopSubscribe() + } subErr = errors.New(fmt.Sprintf("configure store [%+v] don't support now", req.StoreName)) + // stop writer goroutine close(recvExitCh) return } + // 1.3.2. use default settings if blank if strings.ReplaceAll(req.Group, " ", "") == "" { req.Group = store.GetDefaultGroup() } if strings.ReplaceAll(req.Label, " ", "") == "" { req.Label = store.GetDefaultLabel() } + // 1.3.3. delegate to the component store.Subscribe(&configstores.SubscribeReq{AppId: req.AppId, Group: req.Group, Label: req.Label, Keys: req.Keys, Metadata: req.Metadata}, respCh) subscribedStore = append(subscribedStore, store) } }() - + // 2. start a writer goroutine go func() { defer wg.Done() for { select { + // read response from components case resp, ok := <-respCh: if !ok { return @@ -315,7 +330,9 @@ func (a *api) SubscribeConfiguration(sub runtimev1pb.Runtime_SubscribeConfigurat for _, item := range resp.Items { items = append(items, &runtimev1pb.ConfigurationItem{Group: item.Group, Label: item.Label, Key: item.Key, Content: item.Content, Tags: item.Tags, Metadata: item.Metadata}) } + // write to response stream sub.Send(&runtimev1pb.SubscribeConfigurationResponse{StoreName: resp.StoreName, AppId: resp.StoreName, Items: items}) + // read exit signal case <-recvExitCh: return } diff --git a/pkg/grpc/api_test.go b/pkg/grpc/api_test.go index 8bd5f3134b..6af67cf932 100644 --- a/pkg/grpc/api_test.go +++ b/pkg/grpc/api_test.go @@ -162,16 +162,18 @@ func TestDeleteConfiguration(t *testing.T) { func TestSubscribeConfiguration(t *testing.T) { ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockConfigStore := mock.NewMockStore(ctrl) + api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil) + //test not support store type grpcServer := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: nil} - api := NewAPI("", nil, map[string]configstores.Store{"mock": mockConfigStore}, nil, nil, nil, nil) err := api.SubscribeConfiguration(grpcServer) assert.NotNil(t, err) assert.Equal(t, err.Error(), "configure store [] don't support now") //test - mockConfigStore.EXPECT().StopSubscribe().Return().Times(1) grpcServer2 := &MockGrpcServer{req: &runtimev1pb.SubscribeConfigurationRequest{}, err: errors.New("exit")} err = api.SubscribeConfiguration(grpcServer2) assert.NotNil(t, err)