diff --git a/pkg/app/piped/executor/waitapproval/BUILD.bazel b/pkg/app/piped/executor/waitapproval/BUILD.bazel index 62096e90a6..d25e1ab4b4 100644 --- a/pkg/app/piped/executor/waitapproval/BUILD.bazel +++ b/pkg/app/piped/executor/waitapproval/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -12,3 +12,18 @@ go_library( "@org_uber_go_zap//:go_default_library", ], ) + +go_test( + name = "go_default_test", + size = "small", + srcs = ["waitapproval_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/app/api/service/pipedservice:go_default_library", + "//pkg/app/piped/executor:go_default_library", + "//pkg/app/piped/metadatastore:go_default_library", + "//pkg/model:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + "@org_golang_google_grpc//:go_default_library", + ], +) diff --git a/pkg/app/piped/executor/waitapproval/waitapproval.go b/pkg/app/piped/executor/waitapproval/waitapproval.go index add7d62827..2a2f16a444 100644 --- a/pkg/app/piped/executor/waitapproval/waitapproval.go +++ b/pkg/app/piped/executor/waitapproval/waitapproval.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "go.uber.org/zap" @@ -61,13 +62,13 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus { timer := time.NewTimer(timeout) e.reportRequiringApproval() - e.LogPersister.Info("Waiting for an approval...") + + num := e.StageConfig.WaitApprovalStageOptions.MinApproverNum + e.LogPersister.Infof("Waiting for approval from at least %d user(s)...", num) for { select { case <-ticker.C: - if commander, ok := e.checkApproval(ctx); ok { - e.reportApproved(commander) - e.LogPersister.Infof("Got an approval from %s", commander) + if e.checkApproval(ctx, num) { return model.StageStatus_STAGE_SUCCESS } @@ -87,7 +88,7 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus { } } -func (e *Executor) checkApproval(ctx context.Context) (string, bool) { +func (e *Executor) checkApproval(ctx context.Context, num int) bool { var approveCmd *model.ReportableCommand commands := e.CommandLister.ListCommands() @@ -98,21 +99,14 @@ func (e *Executor) checkApproval(ctx context.Context) (string, bool) { } } if approveCmd == nil { - return "", false - } - - metadata := map[string]string{ - approvedByKey: approveCmd.Commander, - } - if err := e.MetadataStore.Stage(e.Stage.Id).PutMulti(ctx, metadata); err != nil { - e.LogPersister.Errorf("Unabled to save approver information to deployment, %v", err) - return "", false + return false } + reached := e.validateApproverNum(ctx, approveCmd.Commander, num) if err := approveCmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, nil); err != nil { e.Logger.Error("failed to report handled command", zap.Error(err)) } - return approveCmd.Commander, true + return reached } func (e *Executor) reportApproved(approver string) { @@ -161,3 +155,45 @@ func (e *Executor) getMentionedAccounts(event model.NotificationEventType) ([]st return notification.FindSlackAccounts(event), nil } + +// validateApproverNum checks if number of approves is valid. +func (e *Executor) validateApproverNum(ctx context.Context, approver string, minApproverNum int) bool { + if minApproverNum == 1 { + if err := e.MetadataStore.Stage(e.Stage.Id).Put(ctx, approvedByKey, approver); err != nil { + e.LogPersister.Errorf("Unable to save approver information to deployment, %v", err) + } + e.LogPersister.Infof("Got approval from %q", approver) + e.reportApproved(approver) + e.LogPersister.Infof("This stage has been approved by %d user (%s)", minApproverNum, approver) + return true + } + + const delimiter = ", " + as, _ := e.MetadataStore.Stage(e.Stage.Id).Get(approvedByKey) + var approvedUsers []string + if as != "" { + approvedUsers = strings.Split(as, delimiter) + } + + for _, u := range approvedUsers { + if u == approver { + e.LogPersister.Infof("Approval from the same user (%s) will not be counted", approver) + return false + } + } + e.LogPersister.Infof("Got approval from %q", approver) + approvedUsers = append(approvedUsers, approver) + aus := strings.Join(approvedUsers, delimiter) + + if err := e.MetadataStore.Stage(e.Stage.Id).Put(ctx, approvedByKey, aus); err != nil { + e.LogPersister.Errorf("Unable to save approver information to deployment, %v", err) + } + if remain := minApproverNum - len(approvedUsers); remain > 0 { + e.LogPersister.Infof("Waiting for %d other approvers...", remain) + return false + } + e.reportApproved(aus) + e.LogPersister.Info("Received all needed approvals") + e.LogPersister.Infof("This stage has been approved by %d users (%s)", minApproverNum, aus) + return true +} diff --git a/pkg/app/piped/executor/waitapproval/waitapproval_test.go b/pkg/app/piped/executor/waitapproval/waitapproval_test.go new file mode 100644 index 0000000000..4c2c212e22 --- /dev/null +++ b/pkg/app/piped/executor/waitapproval/waitapproval_test.go @@ -0,0 +1,217 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package waitapproval + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" + "github.com/pipe-cd/pipe/pkg/app/piped/executor" + "github.com/pipe-cd/pipe/pkg/app/piped/metadatastore" + "github.com/pipe-cd/pipe/pkg/model" +) + +type fakeLogPersister struct{} + +func (l *fakeLogPersister) Write(_ []byte) (int, error) { return 0, nil } +func (l *fakeLogPersister) Info(_ string) {} +func (l *fakeLogPersister) Infof(_ string, _ ...interface{}) {} +func (l *fakeLogPersister) Success(_ string) {} +func (l *fakeLogPersister) Successf(_ string, _ ...interface{}) {} +func (l *fakeLogPersister) Error(_ string) {} +func (l *fakeLogPersister) Errorf(_ string, _ ...interface{}) {} + +type metadata map[string]string + +type fakeAPIClient struct { + shared metadata + stages map[string]metadata +} + +func (c *fakeAPIClient) SaveDeploymentMetadata(_ context.Context, req *pipedservice.SaveDeploymentMetadataRequest, _ ...grpc.CallOption) (*pipedservice.SaveDeploymentMetadataResponse, error) { + md := make(map[string]string, len(c.shared)+len(req.Metadata)) + for k, v := range c.shared { + md[k] = v + } + for k, v := range req.Metadata { + md[k] = v + } + c.shared = md + return &pipedservice.SaveDeploymentMetadataResponse{}, nil +} + +func (c *fakeAPIClient) SaveStageMetadata(_ context.Context, req *pipedservice.SaveStageMetadataRequest, _ ...grpc.CallOption) (*pipedservice.SaveStageMetadataResponse, error) { + ori := c.stages[req.StageId] + md := make(map[string]string, len(ori)+len(req.Metadata)) + for k, v := range ori { + md[k] = v + } + for k, v := range req.Metadata { + md[k] = v + } + c.stages[req.StageId] = md + return &pipedservice.SaveStageMetadataResponse{}, nil +} + +type fakeNotifier struct{} + +func (n *fakeNotifier) Notify(_ model.NotificationEvent) {} + +func TestValidateApproverNum(t *testing.T) { + ctx := context.Background() + + ac := &fakeAPIClient{ + shared: make(map[string]string, 0), + stages: make(map[string]metadata, 0), + } + testcases := []struct { + name string + approver string + minApproverNum int + executor *Executor + want bool + }{ + { + name: "return the person who just approved", + approver: "user-1", + minApproverNum: 0, + executor: &Executor{ + Input: executor.Input{ + Stage: &model.PipelineStage{ + Id: "stage-1", + }, + LogPersister: &fakeLogPersister{}, + MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{ + Stages: []*model.PipelineStage{ + { + Id: "stage-1", + Metadata: map[string]string{}, + }, + }, + }), + Notifier: &fakeNotifier{}, + }, + }, + want: true, + }, + { + name: "return an empty string because number of current approver is not enough", + approver: "user-1", + minApproverNum: 2, + executor: &Executor{ + Input: executor.Input{ + Stage: &model.PipelineStage{ + Id: "stage-1", + }, + LogPersister: &fakeLogPersister{}, + MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{ + Stages: []*model.PipelineStage{ + { + Id: "stage-1", + Metadata: map[string]string{}, + }, + }, + }), + }, + }, + want: false, + }, + { + name: "return an empty string because current approver is same as an approver in metadata", + approver: "user-1", + minApproverNum: 2, + executor: &Executor{ + Input: executor.Input{ + Stage: &model.PipelineStage{ + Id: "stage-1", + }, + LogPersister: &fakeLogPersister{}, + MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{ + Stages: []*model.PipelineStage{ + { + Id: "stage-1", + Metadata: map[string]string{ + approvedByKey: "user-1", + }, + }, + }, + }), + Notifier: &fakeNotifier{}, + }, + }, + want: false, + }, + { + name: "return an empty string because number of current approver and approvers in metadata is not enough", + approver: "user-2", + minApproverNum: 3, + executor: &Executor{ + Input: executor.Input{ + Stage: &model.PipelineStage{ + Id: "stage-1", + }, + LogPersister: &fakeLogPersister{}, + MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{ + Stages: []*model.PipelineStage{ + { + Id: "stage-1", + Metadata: map[string]string{ + approvedByKey: "user-1", + }, + }, + }, + }), + Notifier: &fakeNotifier{}, + }, + }, + want: false, + }, + { + name: "return all approvers", + approver: "user-2", + minApproverNum: 2, + executor: &Executor{ + Input: executor.Input{ + Stage: &model.PipelineStage{ + Id: "stage-1", + }, + LogPersister: &fakeLogPersister{}, + MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{ + Stages: []*model.PipelineStage{ + { + Id: "stage-1", + Metadata: map[string]string{ + approvedByKey: "user-1", + }, + }, + }, + }), + Notifier: &fakeNotifier{}, + }, + }, + want: true, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + got := tc.executor.validateApproverNum(ctx, tc.approver, tc.minApproverNum) + assert.Equal(t, tc.want, got) + }) + } +} diff --git a/pkg/config/deployment.go b/pkg/config/deployment.go index accc977e9c..1340afec86 100644 --- a/pkg/config/deployment.go +++ b/pkg/config/deployment.go @@ -105,6 +105,11 @@ func (s *GenericDeploymentSpec) Validate() error { return err } } + if stage.WaitApprovalStageOptions != nil { + if err := stage.WaitApprovalStageOptions.Validate(); err != nil { + return err + } + } } } @@ -365,8 +370,16 @@ type WaitStageOptions struct { type WaitApprovalStageOptions struct { // The maximum length of time to wait before giving up. // Defaults to 6h. - Timeout Duration `json:"timeout"` - Approvers []string `json:"approvers"` + Timeout Duration `json:"timeout"` + Approvers []string `json:"approvers"` + MinApproverNum int `json:"minApproverNum" default:"1"` +} + +func (w *WaitApprovalStageOptions) Validate() error { + if w.MinApproverNum < 1 { + return fmt.Errorf("minApproverNum %d should be greater than 0", w.MinApproverNum) + } + return nil } // AnalysisStageOptions contains all configurable values for a K8S_ANALYSIS stage. diff --git a/pkg/config/deployment_terraform_test.go b/pkg/config/deployment_terraform_test.go index dae30249b9..f17f5c4a2f 100644 --- a/pkg/config/deployment_terraform_test.go +++ b/pkg/config/deployment_terraform_test.go @@ -135,7 +135,8 @@ func TestTerraformDeploymentConfig(t *testing.T) { WaitApprovalStageOptions: &WaitApprovalStageOptions{ Approvers: []string{"foo", "bar"}, // Use defaultWaitApprovalTimeout on unset timeout value for WaitApprovalStage. - Timeout: defaultWaitApprovalTimeout, + Timeout: defaultWaitApprovalTimeout, + MinApproverNum: 1, }, }, { diff --git a/pkg/config/deployment_test.go b/pkg/config/deployment_test.go index 5c961a4d99..8226ede056 100644 --- a/pkg/config/deployment_test.go +++ b/pkg/config/deployment_test.go @@ -74,6 +74,34 @@ func TestHasStage(t *testing.T) { } } +func TestValidateWaitApprovalStageOptions(t *testing.T) { + testcases := []struct { + name string + minApproverNum int + wantErr bool + }{ + { + name: "valid", + minApproverNum: 1, + wantErr: false, + }, + { + name: "invalid", + minApproverNum: -1, + wantErr: true, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + w := &WaitApprovalStageOptions{ + MinApproverNum: tc.minApproverNum, + } + err := w.Validate() + assert.Equal(t, tc.wantErr, err != nil) + }) + } +} + func TestFindSlackAccounts(t *testing.T) { testcases := []struct { name string