diff --git a/pkg/app/piped/cloudprovider/lambda/client.go b/pkg/app/piped/cloudprovider/lambda/client.go index e5a9bff4d3..201f5b57bd 100644 --- a/pkg/app/piped/cloudprovider/lambda/client.go +++ b/pkg/app/piped/cloudprovider/lambda/client.go @@ -16,6 +16,7 @@ package lambda import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -30,12 +31,19 @@ import ( "go.uber.org/zap" ) +type TrafficConfigKeyName string + const ( defaultAliasName = "Service" // RequestRetryTime represents the number of times calling to AWS resource control. RequestRetryTime = 3 // RetryIntervalDuration represents duration time between retry. RetryIntervalDuration = 1 * time.Minute + + // TrafficPrimaryVersionKeyName represents the key that points to the primary version config in the traffic routing map. + TrafficPrimaryVersionKeyName TrafficConfigKeyName = "primary" + // TrafficSecondaryVersionKeyName represents the key that points to the secondary version config in the traffic routing map. + TrafficSecondaryVersionKeyName TrafficConfigKeyName = "secondary" ) // ErrNotFound lambda resource occurred. @@ -195,12 +203,27 @@ func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (vers } // RoutingTrafficConfig presents a map of primary and secondary version traffic for lambda function alias. -type RoutingTrafficConfig map[string]VersionTraffic +type RoutingTrafficConfig map[TrafficConfigKeyName]VersionTraffic + +func (c *RoutingTrafficConfig) Encode() (string, error) { + out, err := json.Marshal(c) + if err != nil { + return "", err + } + return string(out), nil +} + +func (c *RoutingTrafficConfig) Decode(data []byte) error { + if err := json.Unmarshal(data, c); err != nil { + return err + } + return nil +} // VersionTraffic presents the version, and the percent of traffic that's routed to it. 
type VersionTraffic struct { - Version string - Percent float64 + Version string `json:"version"` + Percent float64 `json:"percent"` } func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error) { @@ -229,7 +252,7 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou return } - routingTrafficCfg = make(map[string]VersionTraffic) + routingTrafficCfg = make(map[TrafficConfigKeyName]VersionTraffic) /* The current return value from GetAlias as below { "AliasArn": "arn:aws:lambda:ap-northeast-1:769161735124:function:SimpleCanaryFunction:Service", @@ -253,7 +276,7 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou */ // In case RoutingConfig is nil, 100 percent of current traffic is handled by FunctionVersion version. if cfg.RoutingConfig == nil { - routingTrafficCfg["primary"] = VersionTraffic{ + routingTrafficCfg[TrafficPrimaryVersionKeyName] = VersionTraffic{ Version: aws.StringValue(cfg.FunctionVersion), Percent: 100, } @@ -264,12 +287,12 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou var secondaryVersionTraffic float64 for version, weight := range cfg.RoutingConfig.AdditionalVersionWeights { secondaryVersionTraffic = percentageToPercent(aws.Float64Value(weight)) - routingTrafficCfg["secondary"] = VersionTraffic{ + routingTrafficCfg[TrafficSecondaryVersionKeyName] = VersionTraffic{ Version: version, Percent: secondaryVersionTraffic, } } - routingTrafficCfg["primary"] = VersionTraffic{ + routingTrafficCfg[TrafficPrimaryVersionKeyName] = VersionTraffic{ Version: aws.StringValue(cfg.FunctionVersion), Percent: 100 - secondaryVersionTraffic, } @@ -305,7 +328,7 @@ func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, v } func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error { - primary, ok := 
routingTraffic["primary"] + primary, ok := routingTraffic[TrafficPrimaryVersionKeyName] if !ok { return fmt.Errorf("invalid routing traffic configuration given: primary version not found") } @@ -316,7 +339,7 @@ func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, r FunctionVersion: aws.String(primary.Version), } - if secondary, ok := routingTraffic["secondary"]; ok { + if secondary, ok := routingTraffic[TrafficSecondaryVersionKeyName]; ok { routingTrafficMap := make(map[string]*float64) routingTrafficMap[secondary.Version] = aws.Float64(precentToPercentage(secondary.Percent)) input.RoutingConfig = &lambda.AliasRoutingConfiguration{ diff --git a/pkg/app/piped/executor/lambda/BUILD.bazel b/pkg/app/piped/executor/lambda/BUILD.bazel index b8ef7daa35..ecb1037024 100644 --- a/pkg/app/piped/executor/lambda/BUILD.bazel +++ b/pkg/app/piped/executor/lambda/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "deploy.go", "lambda.go", + "rollback.go", ], importpath = "github.com/pipe-cd/pipe/pkg/app/piped/executor/lambda", visibility = ["//visibility:public"], diff --git a/pkg/app/piped/executor/lambda/lambda.go b/pkg/app/piped/executor/lambda/lambda.go index e50c220705..0eb80e8ae9 100644 --- a/pkg/app/piped/executor/lambda/lambda.go +++ b/pkg/app/piped/executor/lambda/lambda.go @@ -42,6 +42,12 @@ func Register(r registerer) { r.Register(model.StageLambdaSync, f) r.Register(model.StageLambdaPromote, f) r.Register(model.StageLambdaCanaryRollout, f) + + r.RegisterRollback(model.ApplicationKind_LAMBDA, func(in executor.Input) executor.Executor { + return &rollbackExecutor{ + Input: in, + } + }) } func findCloudProvider(in *executor.Input) (name string, cfg *config.CloudProviderLambdaConfig, found bool) { @@ -104,6 +110,19 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err) return false } + // Store the current traffic 
config for rollback if necessary. + if trafficCfg != nil { + originalTrafficCfg, err := trafficCfg.Encode() + if err != nil { + in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err) + return false + } + originalTrafficKeyName := fmt.Sprintf("original-traffic-%s", in.Deployment.RunningCommitHash) + if e := in.MetadataStore.Set(ctx, originalTrafficKeyName, originalTrafficCfg); e != nil { + in.LogPersister.Errorf("Unable to store current traffic config for rollback: %v", e) + return false + } + } // Update 100% traffic to the new lambda version. if !configureTrafficRouting(trafficCfg, version, 100) { @@ -142,6 +161,21 @@ func rollout(ctx context.Context, in *executor.Input, cloudProviderName string, return false } + // Store current traffic config for rollback if necessary. + if trafficCfg, err := client.GetTrafficConfig(ctx, fm); err == nil { + // Store the current traffic config. + originalTrafficCfg, err := trafficCfg.Encode() + if err != nil { + in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err) + return false + } + originalTrafficKeyName := fmt.Sprintf("original-traffic-%s", in.Deployment.RunningCommitHash) + if e := in.MetadataStore.Set(ctx, originalTrafficKeyName, originalTrafficCfg); e != nil { + in.LogPersister.Errorf("Unable to store current traffic config for rollback: %v", e) + return false + } + } + return true } @@ -191,6 +225,18 @@ func promote(ctx context.Context, in *executor.Input, cloudProviderName string, return false } + // Store promote traffic config for rollback if necessary. 
+ promoteTrafficCfgData, err := trafficCfg.Encode() + if err != nil { + in.LogPersister.Errorf("Unable to store current traffic config for rollback: encode failed: %v", err) + return false + } + promoteTrafficKeyName := fmt.Sprintf("latest-promote-traffic-%s", in.Deployment.RunningCommitHash) + if err := in.MetadataStore.Set(ctx, promoteTrafficKeyName, promoteTrafficCfgData); err != nil { + in.LogPersister.Errorf("Unable to store promote traffic config for rollback: %v", err) + return false + } + if err = client.UpdateTrafficConfig(ctx, fm, trafficCfg); err != nil { in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) return false @@ -202,25 +248,25 @@ func promote(ctx context.Context, in *executor.Input, cloudProviderName string, func configureTrafficRouting(trafficCfg provider.RoutingTrafficConfig, version string, percent int) bool { // The primary version has to be set on trafficCfg. - primary, ok := trafficCfg["primary"] + primary, ok := trafficCfg[provider.TrafficPrimaryVersionKeyName] if !ok { return false } // Set built version by rollout stage as new primary. - trafficCfg["primary"] = provider.VersionTraffic{ + trafficCfg[provider.TrafficPrimaryVersionKeyName] = provider.VersionTraffic{ Version: version, Percent: float64(percent), } // Make the current primary version as new secondary version in case it's not the latest built version by rollout stage. if primary.Version != version { - trafficCfg["secondary"] = provider.VersionTraffic{ + trafficCfg[provider.TrafficSecondaryVersionKeyName] = provider.VersionTraffic{ Version: primary.Version, Percent: float64(100 - percent), } } else { // Update traffic to the secondary and keep it as new secondary. 
- if secondary, ok := trafficCfg["secondary"]; ok { - trafficCfg["secondary"] = provider.VersionTraffic{ + if secondary, ok := trafficCfg[provider.TrafficSecondaryVersionKeyName]; ok { + trafficCfg[provider.TrafficSecondaryVersionKeyName] = provider.VersionTraffic{ Version: secondary.Version, Percent: float64(100 - percent), } diff --git a/pkg/app/piped/executor/lambda/lambda_test.go b/pkg/app/piped/executor/lambda/lambda_test.go index 1f8b4d53a6..a4341adc35 100644 --- a/pkg/app/piped/executor/lambda/lambda_test.go +++ b/pkg/app/piped/executor/lambda/lambda_test.go @@ -120,19 +120,19 @@ func TestConfigureTrafficRouting(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - trafficCfg := make(map[string]provider.VersionTraffic) + trafficCfg := make(map[provider.TrafficConfigKeyName]provider.VersionTraffic) if tc.primary != nil { - trafficCfg["primary"] = *tc.primary + trafficCfg[provider.TrafficPrimaryVersionKeyName] = *tc.primary } if tc.secondary != nil { - trafficCfg["secondary"] = *tc.secondary + trafficCfg[provider.TrafficSecondaryVersionKeyName] = *tc.secondary } ok := configureTrafficRouting(trafficCfg, tc.version, tc.percent) assert.Equal(t, tc.out, ok) - if primary, ok := trafficCfg["primary"]; ok { + if primary, ok := trafficCfg[provider.TrafficPrimaryVersionKeyName]; ok { assert.Equal(t, tc.version, primary.Version) assert.Equal(t, float64(tc.percent), primary.Percent) - if secondary, ok := trafficCfg["secondary"]; ok { + if secondary, ok := trafficCfg[provider.TrafficSecondaryVersionKeyName]; ok { assert.Equal(t, float64(100-tc.percent), secondary.Percent) } } diff --git a/pkg/app/piped/executor/lambda/rollback.go b/pkg/app/piped/executor/lambda/rollback.go new file mode 100644 index 0000000000..59efb2e4f2 --- /dev/null +++ b/pkg/app/piped/executor/lambda/rollback.go @@ -0,0 +1,155 @@ +// Copyright 2021 The PipeCD Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambda + +import ( + "context" + "fmt" + + provider "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/lambda" + "github.com/pipe-cd/pipe/pkg/app/piped/executor" + "github.com/pipe-cd/pipe/pkg/config" + "github.com/pipe-cd/pipe/pkg/model" +) + +type rollbackExecutor struct { + executor.Input +} + +func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus { + var ( + ctx = sig.Context() + originalStatus = e.Stage.Status + status model.StageStatus + ) + + switch model.Stage(e.Stage.Name) { + case model.StageRollback: + status = e.ensureRollback(ctx) + default: + e.LogPersister.Errorf("Unsupported stage %s for lambda application", e.Stage.Name) + return model.StageStatus_STAGE_FAILURE + } + + return executor.DetermineStageStatus(sig.Signal(), originalStatus, status) +} + +func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus { + // Fail early when there is no running commit: nothing can be rolled back on the first deployment. + if e.Deployment.RunningCommitHash == "" { + e.LogPersister.Errorf("Unable to determine the last deployed commit to rollback. 
It seems this is the first deployment.") + return model.StageStatus_STAGE_FAILURE + } + + runningDS, err := e.RunningDSP.GetReadOnly(ctx, e.LogPersister) + if err != nil { + e.LogPersister.Errorf("Failed to prepare running deploy source data (%v)", err) + return model.StageStatus_STAGE_FAILURE + } + + deployCfg := runningDS.DeploymentConfig.LambdaDeploymentSpec + if deployCfg == nil { + e.LogPersister.Errorf("Malformed deployment configuration: missing LambdaDeploymentSpec") + return model.StageStatus_STAGE_FAILURE + } + + cloudProviderName, cloudProviderCfg, found := findCloudProvider(&e.Input) + if !found { + return model.StageStatus_STAGE_FAILURE + } + + fm, ok := loadFunctionManifest(&e.Input, deployCfg.Input.FunctionManifestFile, runningDS) + if !ok { + return model.StageStatus_STAGE_FAILURE + } + + if !rollback(ctx, &e.Input, cloudProviderName, cloudProviderCfg, fm) { + return model.StageStatus_STAGE_FAILURE + } + + return model.StageStatus_STAGE_SUCCESS +} + +func rollback(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { + in.LogPersister.Infof("Start rollback the lambda function: %s to original stage", fm.Spec.Name) + client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger) + if err != nil { + in.LogPersister.Errorf("Unable to create Lambda client for the provider %s: %v", cloudProviderName, err) + return false + } + + // Load the original traffic config from the metadata store; without it the rollback cannot proceed. + originalTrafficKeyName := fmt.Sprintf("original-traffic-%s", in.Deployment.RunningCommitHash) + originalTrafficCfgData, ok := in.MetadataStore.Get(originalTrafficKeyName) + if !ok { + in.LogPersister.Errorf("Unable to prepare original traffic config to rollback Lambda function %s. 
It seems this is the first deployment.", fm.Spec.Name) + return false + } + + originalTrafficCfg := provider.RoutingTrafficConfig{} + if err := originalTrafficCfg.Decode([]byte(originalTrafficCfgData)); err != nil { + in.LogPersister.Errorf("Unable to prepare original traffic config to rollback Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + // Load the promoted traffic config from the metadata store. + promotedTrafficKeyName := fmt.Sprintf("latest-promote-traffic-%s", in.Deployment.RunningCommitHash) + promotedTrafficCfgData, ok := in.MetadataStore.Get(promotedTrafficKeyName) + // If no promoted traffic config was stored, no promote stage has run, so there is nothing to roll back. + if !ok { + in.LogPersister.Info("It seems the traffic has not been changed during the deployment process. No need to rollback the traffic config.") + return true + } + + promotedTrafficCfg := provider.RoutingTrafficConfig{} + if err := promotedTrafficCfg.Decode([]byte(promotedTrafficCfgData)); err != nil { + in.LogPersister.Errorf("Unable to prepare promoted traffic config to rollback Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + switch len(originalTrafficCfg) { + // The original traffic config has both PRIMARY and SECONDARY entries: re-apply it as stored. + case 2: + if err = client.UpdateTrafficConfig(ctx, fm, originalTrafficCfg); err != nil { + in.LogPersister.Errorf("Failed to rollback original traffic config for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + return true + // The original traffic config is a PRIMARY ONLY config, + // so we need to reset any SECONDARY created by the PROMOTE stages that ran before the failure. 
+ case 1: + // Validate the stored original traffic config: since it is PRIMARY ONLY, its percent must be float64(100). + primary, ok := originalTrafficCfg[provider.TrafficPrimaryVersionKeyName] + if !ok || primary.Percent != float64(100) { + in.LogPersister.Errorf("Unable to prepare original traffic config: invalid original traffic config stored") + return false + } + + // Rewrite the promoted traffic config so the original primary takes 100% and the SECONDARY drops to 0%, resetting the remote promoted version config. + if !configureTrafficRouting(promotedTrafficCfg, primary.Version, 100) { + in.LogPersister.Errorf("Unable to prepare traffic config to rollback Lambda function %s: can not reset promoted version", fm.Spec.Name) + return false + } + + if err = client.UpdateTrafficConfig(ctx, fm, promotedTrafficCfg); err != nil { + in.LogPersister.Errorf("Failed to rollback original traffic config for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + return true + default: + in.LogPersister.Errorf("Unable to prepare original traffic config: invalid original traffic config stored") + return false + } +}