Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions pkg/app/piped/cloudprovider/lambda/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package lambda

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand All @@ -30,12 +31,19 @@ import (
"go.uber.org/zap"
)

type TrafficConfigKeyName string

const (
defaultAliasName = "Service"
// RequestRetryTime represents the number of times calling to AWS resource control.
RequestRetryTime = 3
// RetryIntervalDuration represents duration time between retry.
RetryIntervalDuration = 1 * time.Minute

// TrafficPrimaryVersionKeyName represents the key points to primary version config on traffic routing map.
TrafficPrimaryVersionKeyName TrafficConfigKeyName = "primary"
// TrafficSecondaryVersionKeyName represents the key points to primary version config on traffic routing map.
TrafficSecondaryVersionKeyName TrafficConfigKeyName = "secondary"
)

// ErrNotFound lambda resource occurred.
Expand Down Expand Up @@ -195,12 +203,27 @@ func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (vers
}

// RoutingTrafficConfig presents a map of primary and secondary version traffic for lambda function alias.
type RoutingTrafficConfig map[string]VersionTraffic
type RoutingTrafficConfig map[TrafficConfigKeyName]VersionTraffic

func (c *RoutingTrafficConfig) Encode() (string, error) {
out, err := json.Marshal(c)
if err != nil {
return "", err
}
return string(out), nil
}

func (c *RoutingTrafficConfig) Decode(data []byte) error {
if err := json.Unmarshal(data, c); err != nil {
return err
}
return nil
}

// VersionTraffic presents the version, and the percent of traffic that's routed to it.
type VersionTraffic struct {
Version string
Percent float64
Version string `json:"version"`
Percent float64 `json:"percent"`
}

func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error) {
Expand Down Expand Up @@ -229,7 +252,7 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou
return
}

routingTrafficCfg = make(map[string]VersionTraffic)
routingTrafficCfg = make(map[TrafficConfigKeyName]VersionTraffic)
/* The current return value from GetAlias as below
{
"AliasArn": "arn:aws:lambda:ap-northeast-1:769161735124:function:SimpleCanaryFunction:Service",
Expand All @@ -253,7 +276,7 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou
*/
// In case RoutingConfig is nil, 100 percent of current traffic is handled by FunctionVersion version.
if cfg.RoutingConfig == nil {
routingTrafficCfg["primary"] = VersionTraffic{
routingTrafficCfg[TrafficPrimaryVersionKeyName] = VersionTraffic{
Version: aws.StringValue(cfg.FunctionVersion),
Percent: 100,
}
Expand All @@ -264,12 +287,12 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou
var secondaryVersionTraffic float64
for version, weight := range cfg.RoutingConfig.AdditionalVersionWeights {
secondaryVersionTraffic = percentageToPercent(aws.Float64Value(weight))
routingTrafficCfg["secondary"] = VersionTraffic{
routingTrafficCfg[TrafficSecondaryVersionKeyName] = VersionTraffic{
Version: version,
Percent: secondaryVersionTraffic,
}
}
routingTrafficCfg["primary"] = VersionTraffic{
routingTrafficCfg[TrafficPrimaryVersionKeyName] = VersionTraffic{
Version: aws.StringValue(cfg.FunctionVersion),
Percent: 100 - secondaryVersionTraffic,
}
Expand Down Expand Up @@ -305,7 +328,7 @@ func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, v
}

func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error {
primary, ok := routingTraffic["primary"]
primary, ok := routingTraffic[TrafficPrimaryVersionKeyName]
if !ok {
return fmt.Errorf("invalid routing traffic configuration given: primary version not found")
}
Expand All @@ -316,7 +339,7 @@ func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, r
FunctionVersion: aws.String(primary.Version),
}

if secondary, ok := routingTraffic["secondary"]; ok {
if secondary, ok := routingTraffic[TrafficSecondaryVersionKeyName]; ok {
routingTrafficMap := make(map[string]*float64)
routingTrafficMap[secondary.Version] = aws.Float64(precentToPercentage(secondary.Percent))
input.RoutingConfig = &lambda.AliasRoutingConfiguration{
Expand Down
1 change: 1 addition & 0 deletions pkg/app/piped/executor/lambda/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"deploy.go",
"lambda.go",
"rollback.go",
],
importpath = "github.com/pipe-cd/pipe/pkg/app/piped/executor/lambda",
visibility = ["//visibility:public"],
Expand Down
56 changes: 51 additions & 5 deletions pkg/app/piped/executor/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func Register(r registerer) {
r.Register(model.StageLambdaSync, f)
r.Register(model.StageLambdaPromote, f)
r.Register(model.StageLambdaCanaryRollout, f)

r.RegisterRollback(model.ApplicationKind_LAMBDA, func(in executor.Input) executor.Executor {
return &rollbackExecutor{
Input: in,
}
})
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.CloudProviderLambdaConfig, found bool) {
Expand Down Expand Up @@ -104,6 +110,19 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo
in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err)
return false
}
// Store the current traffic config for rollback if necessary.
if trafficCfg != nil {
originalTrafficCfg, err := trafficCfg.Encode()
if err != nil {
in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err)
return false
}
originalTrafficKeyName := fmt.Sprintf("original-traffic-%s", in.Deployment.RunningCommitHash)
if e := in.MetadataStore.Set(ctx, originalTrafficKeyName, originalTrafficCfg); e != nil {
in.LogPersister.Errorf("Unable to store current traffic config for rollback: %v", e)
return false
}
}

// Update 100% traffic to the new lambda version.
if !configureTrafficRouting(trafficCfg, version, 100) {
Expand Down Expand Up @@ -142,6 +161,21 @@ func rollout(ctx context.Context, in *executor.Input, cloudProviderName string,
return false
}

// Store current traffic config for rollback if necessary.
if trafficCfg, err := client.GetTrafficConfig(ctx, fm); err == nil {
// Store the current traffic config.
originalTrafficCfg, err := trafficCfg.Encode()
if err != nil {
in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err)
return false
}
originalTrafficKeyName := fmt.Sprintf("original-traffic-%s", in.Deployment.RunningCommitHash)
if e := in.MetadataStore.Set(ctx, originalTrafficKeyName, originalTrafficCfg); e != nil {
in.LogPersister.Errorf("Unable to store current traffic config for rollback: %v", e)
return false
}
}

return true
}

Expand Down Expand Up @@ -191,6 +225,18 @@ func promote(ctx context.Context, in *executor.Input, cloudProviderName string,
return false
}

// Store promote traffic config for rollback if necessary.
promoteTrafficCfgData, err := trafficCfg.Encode()
if err != nil {
in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err)
return false
}
promoteTrafficKeyName := fmt.Sprintf("latest-promote-traffic-%s", in.Deployment.RunningCommitHash)
if err := in.MetadataStore.Set(ctx, promoteTrafficKeyName, promoteTrafficCfgData); err != nil {
in.LogPersister.Errorf("Unable to store promote traffic config for rollback: %v", err)
return false
}

if err = client.UpdateTrafficConfig(ctx, fm, trafficCfg); err != nil {
in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err)
return false
Expand All @@ -202,25 +248,25 @@ func promote(ctx context.Context, in *executor.Input, cloudProviderName string,

func configureTrafficRouting(trafficCfg provider.RoutingTrafficConfig, version string, percent int) bool {
// The primary version has to be set on trafficCfg.
primary, ok := trafficCfg["primary"]
primary, ok := trafficCfg[provider.TrafficPrimaryVersionKeyName]
if !ok {
return false
}
// Set built version by rollout stage as new primary.
trafficCfg["primary"] = provider.VersionTraffic{
trafficCfg[provider.TrafficPrimaryVersionKeyName] = provider.VersionTraffic{
Version: version,
Percent: float64(percent),
}
// Make the current primary version as new secondary version in case it's not the latest built version by rollout stage.
if primary.Version != version {
trafficCfg["secondary"] = provider.VersionTraffic{
trafficCfg[provider.TrafficSecondaryVersionKeyName] = provider.VersionTraffic{
Version: primary.Version,
Percent: float64(100 - percent),
}
} else {
// Update traffic to the secondary and keep it as new secondary.
if secondary, ok := trafficCfg["secondary"]; ok {
trafficCfg["secondary"] = provider.VersionTraffic{
if secondary, ok := trafficCfg[provider.TrafficSecondaryVersionKeyName]; ok {
trafficCfg[provider.TrafficSecondaryVersionKeyName] = provider.VersionTraffic{
Version: secondary.Version,
Percent: float64(100 - percent),
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/lambda/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ func TestConfigureTrafficRouting(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
trafficCfg := make(map[string]provider.VersionTraffic)
trafficCfg := make(map[provider.TrafficConfigKeyName]provider.VersionTraffic)
if tc.primary != nil {
trafficCfg["primary"] = *tc.primary
trafficCfg[provider.TrafficPrimaryVersionKeyName] = *tc.primary
}
if tc.secondary != nil {
trafficCfg["secondary"] = *tc.secondary
trafficCfg[provider.TrafficSecondaryVersionKeyName] = *tc.secondary
}
ok := configureTrafficRouting(trafficCfg, tc.version, tc.percent)
assert.Equal(t, tc.out, ok)
if primary, ok := trafficCfg["primary"]; ok {
if primary, ok := trafficCfg[provider.TrafficPrimaryVersionKeyName]; ok {
assert.Equal(t, tc.version, primary.Version)
assert.Equal(t, float64(tc.percent), primary.Percent)
if secondary, ok := trafficCfg["secondary"]; ok {
if secondary, ok := trafficCfg[provider.TrafficSecondaryVersionKeyName]; ok {
assert.Equal(t, float64(100-tc.percent), secondary.Percent)
}
}
Expand Down
Loading