Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4066174
Support multiple manual approvals
Nov 19, 2021
7d7c088
Fix unused variables
Nov 19, 2021
3a1981f
Format codes
Nov 19, 2021
fbeeb96
Fix test
Nov 19, 2021
3643ad2
Fix formats
Nov 19, 2021
3bd9b03
Fix formats
Nov 19, 2021
2c8ff73
Format unfmt-ed files
Nov 19, 2021
b6d5e99
Update log messages
Nov 19, 2021
ab87d54
Fix tests
Nov 19, 2021
e73e886
Update pkg/app/piped/executor/waitapproval/waitapproval.go
Nov 19, 2021
e511ec0
Update pkg/app/piped/executor/waitapproval/waitapproval.go
Nov 19, 2021
3c1d901
Update pkg/app/piped/executor/waitapproval/waitapproval.go
Nov 19, 2021
29e97bf
Update pkg/app/piped/executor/waitapproval/waitapproval.go
Nov 19, 2021
3f10ecc
Use num
Nov 19, 2021
ad2fd57
Rename key name
Nov 24, 2021
19c2c80
Move log part to into validateApproverNum function
Nov 24, 2021
b391a82
Fix test
Nov 24, 2021
9ee377d
fix test
Nov 24, 2021
ee6792c
fix
Nov 24, 2021
f0408d2
fix
Nov 24, 2021
a123648
Fix formats
Nov 24, 2021
c7d1cdc
Rename num
Nov 24, 2021
64c2361
Use fmt.Sprintf instead of addition
Nov 24, 2021
0074085
Use StageConfig instead of metadata
Nov 24, 2021
650f3bb
Fix test
Nov 24, 2021
8f9a544
Fix
Nov 24, 2021
c0e6d91
Fix typo
Nov 25, 2021
b2ab617
Refactor the logic
Nov 25, 2021
411f7fa
Format code
Nov 25, 2021
d4a7810
fix test
Nov 25, 2021
ddf3196
fix
Nov 25, 2021
8eeed0a
fix
Nov 25, 2021
64f0cea
fix
Nov 25, 2021
bcd2ea0
fix
Nov 25, 2021
d332a42
fix
Nov 25, 2021
c592269
fix
Nov 25, 2021
41d336b
fix
Nov 25, 2021
1101e89
format
Nov 25, 2021
19fbc7a
Fix
Nov 25, 2021
7d60a88
fix test
Nov 25, 2021
9449372
fix
Nov 25, 2021
82c2d8e
fix test
Nov 25, 2021
0c3dce1
fix
Nov 25, 2021
ed37174
Call validation for WaitApprovalStageOptions
Nov 26, 2021
741101e
Update pkg/config/deployment.go
Nov 26, 2021
2df64f6
Update pkg/app/piped/executor/waitapproval/waitapproval.go
Nov 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion pkg/app/piped/executor/waitapproval/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -12,3 +12,18 @@ go_library(
"@org_uber_go_zap//:go_default_library",
],
)

go_test(
name = "go_default_test",
size = "small",
srcs = ["waitapproval_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/app/api/service/pipedservice:go_default_library",
"//pkg/app/piped/executor:go_default_library",
"//pkg/app/piped/metadatastore:go_default_library",
"//pkg/model:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
66 changes: 51 additions & 15 deletions pkg/app/piped/executor/waitapproval/waitapproval.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -61,13 +62,13 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus {
timer := time.NewTimer(timeout)

e.reportRequiringApproval()
e.LogPersister.Info("Waiting for an approval...")

num := e.StageConfig.WaitApprovalStageOptions.MinApproverNum
e.LogPersister.Infof("Waiting for approval from at least %d user(s)...", num)
for {
select {
case <-ticker.C:
if commander, ok := e.checkApproval(ctx); ok {
e.reportApproved(commander)
e.LogPersister.Infof("Got an approval from %s", commander)
if e.checkApproval(ctx, num) {
return model.StageStatus_STAGE_SUCCESS
}

Expand All @@ -87,7 +88,7 @@ func (e *Executor) Execute(sig executor.StopSignal) model.StageStatus {
}
}

func (e *Executor) checkApproval(ctx context.Context) (string, bool) {
func (e *Executor) checkApproval(ctx context.Context, num int) bool {
var approveCmd *model.ReportableCommand
commands := e.CommandLister.ListCommands()

Expand All @@ -98,21 +99,14 @@ func (e *Executor) checkApproval(ctx context.Context) (string, bool) {
}
}
if approveCmd == nil {
return "", false
}

metadata := map[string]string{
approvedByKey: approveCmd.Commander,
}
if err := e.MetadataStore.Stage(e.Stage.Id).PutMulti(ctx, metadata); err != nil {
e.LogPersister.Errorf("Unabled to save approver information to deployment, %v", err)
return "", false
return false
}

reached := e.validateApproverNum(ctx, approveCmd.Commander, num)
if err := approveCmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, nil); err != nil {
e.Logger.Error("failed to report handled command", zap.Error(err))
}
return approveCmd.Commander, true
return reached
}

func (e *Executor) reportApproved(approver string) {
Expand Down Expand Up @@ -161,3 +155,45 @@ func (e *Executor) getMentionedAccounts(event model.NotificationEventType) ([]st

return notification.FindSlackAccounts(event), nil
}

// validateApproverNum checks if number of approves is valid.
func (e *Executor) validateApproverNum(ctx context.Context, approver string, minApproverNum int) bool {
if minApproverNum == 1 {
if err := e.MetadataStore.Stage(e.Stage.Id).Put(ctx, approvedByKey, approver); err != nil {
e.LogPersister.Errorf("Unable to save approver information to deployment, %v", err)
}
e.LogPersister.Infof("Got approval from %q", approver)
e.reportApproved(approver)
e.LogPersister.Infof("This stage has been approved by %d user (%s)", minApproverNum, approver)
return true
}

const delimiter = ", "
as, _ := e.MetadataStore.Stage(e.Stage.Id).Get(approvedByKey)
var approvedUsers []string
if as != "" {
approvedUsers = strings.Split(as, delimiter)
}

for _, u := range approvedUsers {
if u == approver {
e.LogPersister.Infof("Approval from the same user (%s) will not be counted", approver)
return false
}
}
e.LogPersister.Infof("Got approval from %q", approver)
approvedUsers = append(approvedUsers, approver)
aus := strings.Join(approvedUsers, delimiter)

if err := e.MetadataStore.Stage(e.Stage.Id).Put(ctx, approvedByKey, aus); err != nil {
e.LogPersister.Errorf("Unable to save approver information to deployment, %v", err)
}
if remain := minApproverNum - len(approvedUsers); remain > 0 {
e.LogPersister.Infof("Waiting for %d other approvers...", remain)
return false
}
e.reportApproved(aus)
e.LogPersister.Info("Received all needed approvals")
e.LogPersister.Infof("This stage has been approved by %d users (%s)", minApproverNum, aus)
return true
}
217 changes: 217 additions & 0 deletions pkg/app/piped/executor/waitapproval/waitapproval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2021 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package waitapproval

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"

"github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice"
"github.com/pipe-cd/pipe/pkg/app/piped/executor"
"github.com/pipe-cd/pipe/pkg/app/piped/metadatastore"
"github.com/pipe-cd/pipe/pkg/model"
)

type fakeLogPersister struct{}

func (l *fakeLogPersister) Write(_ []byte) (int, error) { return 0, nil }
func (l *fakeLogPersister) Info(_ string) {}
func (l *fakeLogPersister) Infof(_ string, _ ...interface{}) {}
func (l *fakeLogPersister) Success(_ string) {}
func (l *fakeLogPersister) Successf(_ string, _ ...interface{}) {}
func (l *fakeLogPersister) Error(_ string) {}
func (l *fakeLogPersister) Errorf(_ string, _ ...interface{}) {}

type metadata map[string]string

type fakeAPIClient struct {
shared metadata
stages map[string]metadata
}

func (c *fakeAPIClient) SaveDeploymentMetadata(_ context.Context, req *pipedservice.SaveDeploymentMetadataRequest, _ ...grpc.CallOption) (*pipedservice.SaveDeploymentMetadataResponse, error) {
md := make(map[string]string, len(c.shared)+len(req.Metadata))
for k, v := range c.shared {
md[k] = v
}
for k, v := range req.Metadata {
md[k] = v
}
c.shared = md
return &pipedservice.SaveDeploymentMetadataResponse{}, nil
}

func (c *fakeAPIClient) SaveStageMetadata(_ context.Context, req *pipedservice.SaveStageMetadataRequest, _ ...grpc.CallOption) (*pipedservice.SaveStageMetadataResponse, error) {
ori := c.stages[req.StageId]
md := make(map[string]string, len(ori)+len(req.Metadata))
for k, v := range ori {
md[k] = v
}
for k, v := range req.Metadata {
md[k] = v
}
c.stages[req.StageId] = md
return &pipedservice.SaveStageMetadataResponse{}, nil
}

type fakeNotifier struct{}

func (n *fakeNotifier) Notify(_ model.NotificationEvent) {}

func TestValidateApproverNum(t *testing.T) {
ctx := context.Background()

ac := &fakeAPIClient{
shared: make(map[string]string, 0),
stages: make(map[string]metadata, 0),
}
testcases := []struct {
name string
approver string
minApproverNum int
executor *Executor
want bool
}{
{
name: "return the person who just approved",
approver: "user-1",
minApproverNum: 0,
executor: &Executor{
Input: executor.Input{
Stage: &model.PipelineStage{
Id: "stage-1",
},
LogPersister: &fakeLogPersister{},
MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{
Stages: []*model.PipelineStage{
{
Id: "stage-1",
Metadata: map[string]string{},
},
},
}),
Notifier: &fakeNotifier{},
},
},
want: true,
},
{
name: "return an empty string because number of current approver is not enough",
approver: "user-1",
minApproverNum: 2,
executor: &Executor{
Input: executor.Input{
Stage: &model.PipelineStage{
Id: "stage-1",
},
LogPersister: &fakeLogPersister{},
MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{
Stages: []*model.PipelineStage{
{
Id: "stage-1",
Metadata: map[string]string{},
},
},
}),
},
},
want: false,
},
{
name: "return an empty string because current approver is same as an approver in metadata",
approver: "user-1",
minApproverNum: 2,
executor: &Executor{
Input: executor.Input{
Stage: &model.PipelineStage{
Id: "stage-1",
},
LogPersister: &fakeLogPersister{},
MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{
Stages: []*model.PipelineStage{
{
Id: "stage-1",
Metadata: map[string]string{
approvedByKey: "user-1",
},
},
},
}),
Notifier: &fakeNotifier{},
},
},
want: false,
},
{
name: "return an empty string because number of current approver and approvers in metadata is not enough",
approver: "user-2",
minApproverNum: 3,
executor: &Executor{
Input: executor.Input{
Stage: &model.PipelineStage{
Id: "stage-1",
},
LogPersister: &fakeLogPersister{},
MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{
Stages: []*model.PipelineStage{
{
Id: "stage-1",
Metadata: map[string]string{
approvedByKey: "user-1",
},
},
},
}),
Notifier: &fakeNotifier{},
},
},
want: false,
},
{
name: "return all approvers",
approver: "user-2",
minApproverNum: 2,
executor: &Executor{
Input: executor.Input{
Stage: &model.PipelineStage{
Id: "stage-1",
},
LogPersister: &fakeLogPersister{},
MetadataStore: metadatastore.NewMetadataStore(ac, &model.Deployment{
Stages: []*model.PipelineStage{
{
Id: "stage-1",
Metadata: map[string]string{
approvedByKey: "user-1",
},
},
},
}),
Notifier: &fakeNotifier{},
},
},
want: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
got := tc.executor.validateApproverNum(ctx, tc.approver, tc.minApproverNum)
assert.Equal(t, tc.want, got)
})
}
}
17 changes: 15 additions & 2 deletions pkg/config/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (s *GenericDeploymentSpec) Validate() error {
return err
}
}
if stage.WaitApprovalStageOptions != nil {
if err := stage.WaitApprovalStageOptions.Validate(); err != nil {
return err
}
}
}
}

Expand Down Expand Up @@ -365,8 +370,16 @@ type WaitStageOptions struct {
type WaitApprovalStageOptions struct {
// The maximum length of time to wait before giving up.
// Defaults to 6h.
Timeout Duration `json:"timeout"`
Approvers []string `json:"approvers"`
Timeout Duration `json:"timeout"`
Approvers []string `json:"approvers"`
MinApproverNum int `json:"minApproverNum" default:"1"`
}

func (w *WaitApprovalStageOptions) Validate() error {
if w.MinApproverNum < 1 {
return fmt.Errorf("minApproverNum %d should be greater than 0", w.MinApproverNum)
}
return nil
}

// AnalysisStageOptions contains all configurable values for a K8S_ANALYSIS stage.
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/deployment_terraform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func TestTerraformDeploymentConfig(t *testing.T) {
WaitApprovalStageOptions: &WaitApprovalStageOptions{
Approvers: []string{"foo", "bar"},
// Use defaultWaitApprovalTimeout on unset timeout value for WaitApprovalStage.
Timeout: defaultWaitApprovalTimeout,
Timeout: defaultWaitApprovalTimeout,
MinApproverNum: 1,
},
},
{
Expand Down
Loading