diff --git a/pkg/app/piped/cloudprovider/lambda/client.go b/pkg/app/piped/cloudprovider/lambda/client.go index e895e1ac69..81941126f4 100644 --- a/pkg/app/piped/cloudprovider/lambda/client.go +++ b/pkg/app/piped/cloudprovider/lambda/client.go @@ -194,13 +194,16 @@ func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (vers return } +// RoutingTrafficConfig presents a map of primary and secondary version traffic for lambda function alias. +type RoutingTrafficConfig map[string]VersionTraffic + // VersionTraffic presents the version, and the percent of traffic that's routed to it. type VersionTraffic struct { Version string Percent float64 } -func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) { +func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error) { input := &lambda.GetAliasInput{ FunctionName: aws.String(fm.Spec.Name), Name: aws.String(defaultAliasName), @@ -226,17 +229,51 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou return } - // TODO: Fix Lambda.AliasConfiguration.RoutingConfig nil value. + routingTrafficCfg = make(map[string]VersionTraffic) + /* The current return value from GetAlias as below + { + "AliasArn": "arn:aws:lambda:ap-northeast-1:769161735124:function:SimpleCanaryFunction:Service", + "Name": "Service", + "FunctionVersion": "1", + "Description": "", + "RoutingConfig": { + "AdditionalVersionWeights": { + "3": 0.9 + } + }, + "RevisionId": "fe08805f-9851-44fc-9a79-6e086aefc290" + } + Note: + - In case RoutingConfig is nil, this mean 100% of traffic is handled by version represented by FunctionVersion value (PRIMARY version). + - In case RoutingConfig is not nil, RoutingConfig.AdditionalVersionWeights is expected to have ONLY ONE key/value pair + which presents the SECONDARY version handling traffic (represented by the value of the pair). + in short + _ version: 1 - FunctionVersion (the PRIMARY) handles (1 - 0.9) percentage of current traffic. + _ version: 3 - AdditionalVersionWeights key (the SECONDARY) handles 0.9 percentage of current traffic. + */ + // In case RoutingConfig is nil, 100 percent of current traffic is handled by FunctionVersion version. if cfg.RoutingConfig == nil { + routingTrafficCfg["primary"] = VersionTraffic{ + Version: *cfg.FunctionVersion, + Percent: 100, + } return } - routingTrafficCfg = make([]VersionTraffic, 0, len(cfg.RoutingConfig.AdditionalVersionWeights)) + // In case RoutingConfig is provided, FunctionVersion value represents the primary version while + // RoutingConfig.AdditionalVersionWeights key represents the secondary version. + var newerVersionTraffic float64 for version := range cfg.RoutingConfig.AdditionalVersionWeights { - routingTrafficCfg = append(routingTrafficCfg, VersionTraffic{ + newerVersionTraffic = percentageToPercent(*cfg.RoutingConfig.AdditionalVersionWeights[version]) + routingTrafficCfg["secondary"] = VersionTraffic{ Version: version, - Percent: *cfg.RoutingConfig.AdditionalVersionWeights[version], - }) + Percent: newerVersionTraffic, + } + } + routingTrafficCfg["primary"] = VersionTraffic{ + Version: *cfg.FunctionVersion, + Percent: 100 - newerVersionTraffic, } + return } @@ -267,24 +304,24 @@ func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, v return nil } -func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error { - routingTrafficMap := make(map[string]*float64) - switch len(routingTraffic) { - case 2: - routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent)) - routingTrafficMap[routingTraffic[1].Version] = aws.Float64(precentToPercentage(routingTraffic[1].Percent)) - case 1: - routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent)) - default: - return fmt.Errorf("invalid routing traffic configuration given") +func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error { + primary, ok := routingTraffic["primary"] + if !ok { + return fmt.Errorf("invalid routing traffic configuration given: primary version not found") } input := &lambda.UpdateAliasInput{ - FunctionName: aws.String(fm.Spec.Name), - Name: aws.String(defaultAliasName), - RoutingConfig: &lambda.AliasRoutingConfiguration{ + FunctionName: aws.String(fm.Spec.Name), + Name: aws.String(defaultAliasName), + FunctionVersion: aws.String(primary.Version), + } + + if secondary, ok := routingTraffic["secondary"]; ok { + routingTrafficMap := make(map[string]*float64) + routingTrafficMap[secondary.Version] = aws.Float64(precentToPercentage(secondary.Percent)) + input.RoutingConfig = &lambda.AliasRoutingConfiguration{ AdditionalVersionWeights: routingTrafficMap, - }, + } } _, err := c.client.UpdateAliasWithContext(ctx, input) @@ -311,3 +348,7 @@ func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, r func precentToPercentage(in float64) float64 { return in / 100.0 } + +func percentageToPercent(in float64) float64 { + return in * 100 +} diff --git a/pkg/app/piped/cloudprovider/lambda/lambda.go b/pkg/app/piped/cloudprovider/lambda/lambda.go index 632e071783..e05ecf9ced 100644 --- a/pkg/app/piped/cloudprovider/lambda/lambda.go +++ b/pkg/app/piped/cloudprovider/lambda/lambda.go @@ -33,9 +33,9 @@ type Client interface { CreateFunction(ctx context.Context, fm FunctionManifest) error UpdateFunction(ctx context.Context, fm FunctionManifest) error PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error) - GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) + GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error - UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error + UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error } // Registry holds a pool of aws client wrappers. diff --git a/pkg/app/piped/executor/lambda/BUILD.bazel b/pkg/app/piped/executor/lambda/BUILD.bazel index b65d3003fc..b8ef7daa35 100644 --- a/pkg/app/piped/executor/lambda/BUILD.bazel +++ b/pkg/app/piped/executor/lambda/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -17,3 +17,14 @@ go_library( "//pkg/model:go_default_library", ], ) + +go_test( + name = "go_default_test", + size = "small", + srcs = ["lambda_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/app/piped/cloudprovider/lambda:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + ], +) diff --git a/pkg/app/piped/executor/lambda/deploy.go b/pkg/app/piped/executor/lambda/deploy.go index 1a8460387d..179cb8bc72 100644 --- a/pkg/app/piped/executor/lambda/deploy.go +++ b/pkg/app/piped/executor/lambda/deploy.go @@ -63,6 +63,8 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus { status = e.ensureSync(ctx) case model.StageLambdaPromote: status = e.ensurePromote(ctx) + case model.StageLambdaCanaryRollout: + status = e.ensureRollout(ctx) default: e.LogPersister.Errorf("Unsupported stage %s for lambda application", e.Stage.Name) return model.StageStatus_STAGE_FAILURE @@ -84,7 +86,28 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus { return model.StageStatus_STAGE_SUCCESS } -func (e *deployExecutor) ensurePromote(_ context.Context) model.StageStatus { - e.LogPersister.Error("This stage is not implemented yet") - return model.StageStatus_STAGE_FAILURE +func (e *deployExecutor) ensurePromote(ctx context.Context) model.StageStatus { + fm, ok := loadFunctionManifest(&e.Input, e.deployCfg.Input.FunctionManifestFile, e.deploySource) + if !ok { + return model.StageStatus_STAGE_FAILURE + } + + if !promote(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) { + return model.StageStatus_STAGE_FAILURE + } + + return model.StageStatus_STAGE_SUCCESS +} + +func (e *deployExecutor) ensureRollout(ctx context.Context) model.StageStatus { + fm, ok := loadFunctionManifest(&e.Input, e.deployCfg.Input.FunctionManifestFile, e.deploySource) + if !ok { + return model.StageStatus_STAGE_FAILURE + } + + if !rollout(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) { + return model.StageStatus_STAGE_FAILURE + } + + return model.StageStatus_STAGE_SUCCESS } diff --git a/pkg/app/piped/executor/lambda/lambda.go b/pkg/app/piped/executor/lambda/lambda.go index 1301efafd8..e50c220705 100644 --- a/pkg/app/piped/executor/lambda/lambda.go +++ b/pkg/app/piped/executor/lambda/lambda.go @@ -17,6 +17,7 @@ package lambda import ( "context" "errors" + "fmt" "time" provider "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/lambda" @@ -40,6 +41,7 @@ func Register(r registerer) { } r.Register(model.StageLambdaSync, f) r.Register(model.StageLambdaPromote, f) + r.Register(model.StageLambdaCanaryRollout, f) } func findCloudProvider(in *executor.Input) (name string, cfg *config.CloudProviderLambdaConfig, found bool) { @@ -73,40 +75,175 @@ func loadFunctionManifest(in *executor.Input, functionManifestFile string, ds *d return fm, true } -func decideRevisionName(in *executor.Input, fm provider.FunctionManifest, commit string) (revision string, ok bool) { - var err error - revision, err = provider.DecideRevisionName(fm, commit) +func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { + in.LogPersister.Infof("Start applying the lambda function manifest") + client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger) if err != nil { - in.LogPersister.Errorf("Unable to decide revision name for the commit %s (%v)", commit, err) - return + in.LogPersister.Errorf("Unable to create Lambda client for the provider %s: %v", cloudProviderName, err) + return false } - ok = true - return + // Build and publish new version of Lambda function. + version, ok := build(ctx, in, client, fm) + if !ok { + in.LogPersister.Errorf("Failed to build new version for Lambda function %s", fm.Spec.Name) + return false + } + + trafficCfg, err := client.GetTrafficConfig(ctx, fm) + // Create Alias on not yet existed. + if errors.Is(err, provider.ErrNotFound) { + if err := client.CreateTrafficConfig(ctx, fm, version); err != nil { + in.LogPersister.Errorf("Failed to create traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) + return false + } + in.LogPersister.Infof("Successfully applied the lambda function manifest") + return true + } + if err != nil { + in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + // Update 100% traffic to the new lambda version. + if !configureTrafficRouting(trafficCfg, version, 100) { + in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s", fm.Spec.Name) + return false + } + + if err = client.UpdateTrafficConfig(ctx, fm, trafficCfg); err != nil { + in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) + return false + } + + in.LogPersister.Infof("Successfully applied the manifest for Lambda function %s version (v%s)", fm.Spec.Name, version) + return true } -func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { - in.LogPersister.Infof("Start applying the lambda function manifest") +func rollout(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { + in.LogPersister.Infof("Start rolling out the lambda function: %s", fm.Spec.Name) client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger) if err != nil { in.LogPersister.Errorf("Unable to create Lambda client for the provider %s: %v", cloudProviderName, err) return false } + // Build and publish new version of Lambda function. + version, ok := build(ctx, in, client, fm) + if !ok { + in.LogPersister.Errorf("Failed to build new version for Lambda function %s", fm.Spec.Name) + return false + } + + // Update rolled out version name to metadata store + rolloutVersionKeyName := fmt.Sprintf("%s-rollout", fm.Spec.Name) + if err := in.MetadataStore.Set(ctx, rolloutVersionKeyName, version); err != nil { + in.LogPersister.Errorf("Failed to update latest version name to metadata store for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + return true +} + +func promote(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { + in.LogPersister.Infof("Start promote new version of the lambda function: %s", fm.Spec.Name) + client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger) + if err != nil { + in.LogPersister.Errorf("Unable to create Lambda client for the provider %s: %v", cloudProviderName, err) + return false + } + + rolloutVersionKeyName := fmt.Sprintf("%s-rollout", fm.Spec.Name) + version, ok := in.MetadataStore.Get(rolloutVersionKeyName) + if !ok { + in.LogPersister.Errorf("Unable to prepare version to promote for Lambda function %s: Not found", fm.Spec.Name) + return false + } + + options := in.StageConfig.LambdaPromoteStageOptions + if options == nil { + in.LogPersister.Errorf("Malformed configuration for stage %s", in.Stage.Name) + return false + } + + trafficCfg, err := client.GetTrafficConfig(ctx, fm) + // Create Alias on not yet existed. + if errors.Is(err, provider.ErrNotFound) { + if options.Percent != 100 { + in.LogPersister.Errorf("Not previous version available to handle traffic, new version has to get 100 percent of traffic") + return false + } + if err := client.CreateTrafficConfig(ctx, fm, version); err != nil { + in.LogPersister.Errorf("Failed to create traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) + return false + } + in.LogPersister.Infof("Successfully route all traffic to the lambda function %s (version %s)", fm.Spec.Name, version) + return true + } + if err != nil { + in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + // Update traffic to the new lambda version. + if !configureTrafficRouting(trafficCfg, version, options.Percent) { + in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s", fm.Spec.Name) + return false + } + + if err = client.UpdateTrafficConfig(ctx, fm, trafficCfg); err != nil { + in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) + return false + } + + in.LogPersister.Infof("Successfully promote new version (v%s) of Lambda function %s, it will handle %v percent of traffic", version, fm.Spec.Name, options.Percent) + return true +} + +func configureTrafficRouting(trafficCfg provider.RoutingTrafficConfig, version string, percent int) bool { + // The primary version has to be set on trafficCfg. + primary, ok := trafficCfg["primary"] + if !ok { + return false + } + // Set built version by rollout stage as new primary. + trafficCfg["primary"] = provider.VersionTraffic{ + Version: version, + Percent: float64(percent), + } + // Make the current primary version as new secondary version in case it's not the latest built version by rollout stage. + if primary.Version != version { + trafficCfg["secondary"] = provider.VersionTraffic{ + Version: primary.Version, + Percent: float64(100 - percent), + } + } else { + // Update traffic to the secondary and keep it as new secondary. + if secondary, ok := trafficCfg["secondary"]; ok { + trafficCfg["secondary"] = provider.VersionTraffic{ + Version: secondary.Version, + Percent: float64(100 - percent), + } + } + } + return true +} + +func build(ctx context.Context, in *executor.Input, client provider.Client, fm provider.FunctionManifest) (version string, ok bool) { found, err := client.IsFunctionExist(ctx, fm.Spec.Name) if err != nil { in.LogPersister.Errorf("Unable to validate function name %s: %v", fm.Spec.Name, err) - return false + return } if found { if err := client.UpdateFunction(ctx, fm); err != nil { in.LogPersister.Errorf("Failed to update lambda function %s: %v", fm.Spec.Name, err) - return false + return } } else { if err := client.CreateFunction(ctx, fm); err != nil { in.LogPersister.Errorf("Failed to create lambda function %s: %v", fm.Spec.Name, err) - return false + return } } @@ -114,7 +251,6 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo retry := backoff.NewRetry(provider.RequestRetryTime, backoff.NewConstant(provider.RetryIntervalDuration)) publishFunctionSucceed := false startWaitingStamp := time.Now() - var version string for retry.WaitNext(ctx) { // Commit version for applied Lambda function. // Note: via the current docs of [Lambda.PublishVersion](https://docs.aws.amazon.com/sdk-for-go/api/service/lambda/#Lambda.PublishVersion) @@ -124,43 +260,16 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo if err != nil { in.Logger.Error("Failed publish new version for Lambda function") } else { - in.LogPersister.Infof("Successfully committed new version for Lambda function %s after duration %v", fm.Spec.Name, time.Since(startWaitingStamp)) publishFunctionSucceed = true break } } if !publishFunctionSucceed { in.LogPersister.Errorf("Failed to commit new version for Lambda function %s: %v", fm.Spec.Name, err) - return false - } - - _, err = client.GetTrafficConfig(ctx, fm) - // Create Alias on not yet existed. - if errors.Is(err, provider.ErrNotFound) { - if err := client.CreateTrafficConfig(ctx, fm, version); err != nil { - in.LogPersister.Errorf("Failed to create traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) - return false - } - in.LogPersister.Infof("Successfully applied the lambda function manifest") - return true - } - if err != nil { - in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err) - return false - } - - // Update 100% traffic to the new lambda version. - routingCfg := []provider.VersionTraffic{ - { - Version: version, - Percent: 100, - }, - } - if err = client.UpdateTrafficConfig(ctx, fm, routingCfg); err != nil { - in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) - return false + return } - in.LogPersister.Infof("Successfully applied the lambda function manifest") - return true + in.LogPersister.Infof("Successfully committed new version (v%s) for Lambda function %s after duration %v", version, fm.Spec.Name, time.Since(startWaitingStamp)) + ok = true + return } diff --git a/pkg/app/piped/executor/lambda/lambda_test.go b/pkg/app/piped/executor/lambda/lambda_test.go new file mode 100644 index 0000000000..1f8b4d53a6 --- /dev/null +++ b/pkg/app/piped/executor/lambda/lambda_test.go @@ -0,0 +1,141 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambda + +import ( + "testing" + + provider "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/lambda" + "github.com/stretchr/testify/assert" +) + +func TestConfigureTrafficRouting(t *testing.T) { + testcases := []struct { + name string + version string + percent int + primary *provider.VersionTraffic + secondary *provider.VersionTraffic + out bool + }{ + { + name: "failed on invalid routing config: primary is missing", + version: "2", + percent: 100, + primary: nil, + secondary: nil, + out: false, + }, + { + name: "configure successfully in case only primary provided", + version: "2", + percent: 100, + primary: &provider.VersionTraffic{ + Version: "1", + Percent: 100, + }, + secondary: nil, + out: true, + }, + { + name: "configure successfully in case set new primary lower than 100 percent", + version: "2", + percent: 70, + primary: &provider.VersionTraffic{ + Version: "1", + Percent: 100, + }, + secondary: nil, + out: true, + }, + { + name: "configure successfully in case set new primary lower than 100 percent and currently 2 versions is set", + version: "3", + percent: 70, + primary: &provider.VersionTraffic{ + Version: "2", + Percent: 50, + }, + secondary: &provider.VersionTraffic{ + Version: "1", + Percent: 50, + }, + out: true, + }, + { + name: "configure successfully in case set new primary to 100 percent and currently 2 versions is set", + version: "3", + percent: 100, + primary: &provider.VersionTraffic{ + Version: "2", + Percent: 50, + }, + secondary: &provider.VersionTraffic{ + Version: "1", + Percent: 50, + }, + out: true, + }, + { + name: "configure successfully in case new primary is the same as current primary", + version: "2", + percent: 100, + primary: &provider.VersionTraffic{ + Version: "2", + Percent: 50, + }, + secondary: &provider.VersionTraffic{ + Version: "1", + Percent: 50, + }, + out: true, + }, + { + name: "configure successfully in case new primary is the same as current secondary", + version: "2", + percent: 100, + primary: &provider.VersionTraffic{ + Version: "1", + Percent: 50, + }, + secondary: &provider.VersionTraffic{ + Version: "2", + Percent: 50, + }, + out: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + trafficCfg := make(map[string]provider.VersionTraffic) + if tc.primary != nil { + trafficCfg["primary"] = *tc.primary + } + if tc.secondary != nil { + trafficCfg["secondary"] = *tc.secondary + } + ok := configureTrafficRouting(trafficCfg, tc.version, tc.percent) + assert.Equal(t, tc.out, ok) + if primary, ok := trafficCfg["primary"]; ok { + assert.Equal(t, tc.version, primary.Version) + assert.Equal(t, float64(tc.percent), primary.Percent) + if secondary, ok := trafficCfg["secondary"]; ok { + assert.Equal(t, float64(100-tc.percent), secondary.Percent) + } + } + }) + } +} diff --git a/pkg/app/piped/planner/lambda/lambda.go b/pkg/app/piped/planner/lambda/lambda.go index 3677d10aeb..7e8ddb0be8 100644 --- a/pkg/app/piped/planner/lambda/lambda.go +++ b/pkg/app/piped/planner/lambda/lambda.go @@ -70,7 +70,12 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu out.Summary = fmt.Sprintf("Quick sync to deploy image %s and configure all traffic to it (forced via web)", out.Version) return case model.SyncStrategy_PIPELINE: - err = fmt.Errorf("pipeline sync for lambda application is not yet implemented") + if cfg.Pipeline == nil { + err = fmt.Errorf("unable to force sync with pipeline because no pipeline was specified") + return + } + out.Stages = buildProgressivePipeline(cfg.Pipeline, cfg.Input.AutoRollback, time.Now()) + out.Summary = fmt.Sprintf("Sync with pipeline to deploy image %s (forced via web)", out.Version) return } @@ -89,7 +94,18 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu return } - err = fmt.Errorf("currently only QUICK_SYNC strategy deployement is supported") + // Load service manifest at the last deployed commit to decide running version. + ds, err = in.RunningDSP.Get(ctx, ioutil.Discard) + if err == nil { + if lastVersion, e := determineVersion(ds.AppDir, cfg.Input.FunctionManifestFile); e == nil { + out.Stages = buildProgressivePipeline(cfg.Pipeline, cfg.Input.AutoRollback, time.Now()) + out.Summary = fmt.Sprintf("Sync with pipeline to update image from %s to %s", lastVersion, out.Version) + return + } + } + + out.Stages = buildProgressivePipeline(cfg.Pipeline, cfg.Input.AutoRollback, time.Now()) + out.Summary = "Sync with the specified pipeline" return } diff --git a/pkg/app/piped/planner/lambda/pipeline.go b/pkg/app/piped/planner/lambda/pipeline.go index 88057a0f61..3a8acdf526 100644 --- a/pkg/app/piped/planner/lambda/pipeline.go +++ b/pkg/app/piped/planner/lambda/pipeline.go @@ -71,3 +71,49 @@ func buildQuickSyncPipeline(autoRollback bool, now time.Time) []*model.PipelineS return out } + +func buildProgressivePipeline(pp *config.DeploymentPipeline, autoRollback bool, now time.Time) []*model.PipelineStage { + var ( + preStageID = "" + out = make([]*model.PipelineStage, 0, len(pp.Stages)) + ) + + for i, s := range pp.Stages { + id := s.Id + if id == "" { + id = fmt.Sprintf("stage-%d", i) + } + stage := &model.PipelineStage{ + Id: id, + Name: s.Name.String(), + Desc: s.Desc, + Index: int32(i), + Predefined: false, + Visible: true, + Status: model.StageStatus_STAGE_NOT_STARTED_YET, + CreatedAt: now.Unix(), + UpdatedAt: now.Unix(), + } + if preStageID != "" { + stage.Requires = []string{preStageID} + } + preStageID = id + out = append(out, stage) + } + + if autoRollback { + s, _ := planner.GetPredefinedStage(planner.PredefinedStageRollback) + out = append(out, &model.PipelineStage{ + Id: s.Id, + Name: s.Name.String(), + Desc: s.Desc, + Predefined: true, + Visible: false, + Status: model.StageStatus_STAGE_NOT_STARTED_YET, + CreatedAt: now.Unix(), + UpdatedAt: now.Unix(), + }) + } + + return out +} diff --git a/pkg/config/deployment.go b/pkg/config/deployment.go index 7e7f5ffe60..21375e86e0 100644 --- a/pkg/config/deployment.go +++ b/pkg/config/deployment.go @@ -222,6 +222,11 @@ func (s *PipelineStage) UnmarshalJSON(data []byte) error { if len(gs.With) > 0 { err = json.Unmarshal(gs.With, s.LambdaPromoteStageOptions) } + case model.StageLambdaCanaryRollout: + s.LambdaCanaryRolloutStageOptions = &LambdaCanaryRolloutStageOptions{} + if len(gs.With) > 0 { + err = json.Unmarshal(gs.With, s.LambdaCanaryRolloutStageOptions) + } default: err = fmt.Errorf("unsupported stage name: %s", s.Name) diff --git a/pkg/model/stage.go b/pkg/model/stage.go index 4e60989d2a..12d1410f72 100644 --- a/pkg/model/stage.go +++ b/pkg/model/stage.go @@ -68,6 +68,9 @@ const ( // StageLambdaSync does quick sync by rolling out the new version // and switching all traffic to it. StageLambdaSync Stage = "LAMBDA_SYNC" + // StageLambdaCanaryRollout represents the state where + // the CANARY variant resources has been rolled out with the new version/configuration. + StageLambdaCanaryRollout Stage = "LAMBDA_CANARY_ROLLOUT" // StageLambdaPromote prmotes the new version to receive amount of traffic. StageLambdaPromote Stage = "LAMBDA_PROMOTE"