diff --git a/pkg/app/piped/cloudprovider/lambda/client.go b/pkg/app/piped/cloudprovider/lambda/client.go index 9facbe954f..63cae176ff 100644 --- a/pkg/app/piped/cloudprovider/lambda/client.go +++ b/pkg/app/piped/cloudprovider/lambda/client.go @@ -16,6 +16,7 @@ package lambda import ( "context" + "errors" "fmt" "github.com/aws/aws-sdk-go/aws" @@ -28,6 +29,11 @@ import ( "go.uber.org/zap" ) +const defaultAliasName = "Service" + +// ErrNotFound lambda resource occurred. +var ErrNotFound = errors.New("lambda resource not found") + type client struct { region string client *lambda.Lambda @@ -66,7 +72,31 @@ func newClient(region, profile, credentialsFile string, logger *zap.Logger) (*cl return c, nil } -func (c *client) Apply(ctx context.Context, fm FunctionManifest) error { +func (c *client) IsFunctionExist(ctx context.Context, name string) (bool, error) { + input := &lambda.GetFunctionInput{ + FunctionName: aws.String(name), + } + _, err := c.client.GetFunctionWithContext(ctx, input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + return false, fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + return false, fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeTooManyRequestsException: + return false, fmt.Errorf("request throughput limit was exceeded: %w", err) + // Only in case ResourceNotFound error occurred, the FunctionName is available for create so do not raise error. + case lambda.ErrCodeResourceNotFoundException: + return false, nil + } + } + return false, fmt.Errorf("unknown error given: %w", err) + } + return true, nil +} + +func (c *client) CreateFunction(ctx context.Context, fm FunctionManifest) error { input := &lambda.CreateFunctionInput{ Code: &lambda.FunctionCode{ ImageUri: aws.String(fm.Spec.ImageURI), @@ -87,9 +117,190 @@ func (c *client) Apply(ctx context.Context, fm FunctionManifest) error { return fmt.Errorf("total code size per account exceeded: %w", err) case lambda.ErrCodeResourceNotFoundException, lambda.ErrCodeResourceNotReadyException: return fmt.Errorf("resource error occurred: %w", err) + case lambda.ErrCodeTooManyRequestsException: + return fmt.Errorf("request throughput limit was exceeded: %w", err) + } + } + return fmt.Errorf("unknown error given: %w", err) + } + return nil +} + +func (c *client) UpdateFunction(ctx context.Context, fm FunctionManifest) error { + codeInput := &lambda.UpdateFunctionCodeInput{ + FunctionName: aws.String(fm.Spec.Name), + ImageUri: aws.String(fm.Spec.ImageURI), + } + _, err := c.client.UpdateFunctionCodeWithContext(ctx, codeInput) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + return fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + return fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeCodeStorageExceededException: + return fmt.Errorf("total code size per account exceeded: %w", err) + case lambda.ErrCodeTooManyRequestsException: + return fmt.Errorf("request throughput limit was exceeded: %w", err) + case lambda.ErrCodeResourceConflictException: + return fmt.Errorf("resource already existed or in progress: %w", err) + } + } + return fmt.Errorf("unknown error given: %w", err) + } + + // TODO: Support more configurable fields using Lambda.UpdateFunctionConfiguration. + // https://docs.aws.amazon.com/sdk-for-go/api/service/lambda/#UpdateFunctionConfiguration + + return nil +} + +func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error) { + input := &lambda.PublishVersionInput{ + FunctionName: aws.String(fm.Spec.Name), + } + cfg, err := c.client.PublishVersionWithContext(ctx, input) + if err != nil { + aerr, ok := err.(awserr.Error) + if !ok { + err = fmt.Errorf("unknown error given: %w", err) + return + } + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + err = fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + err = fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeTooManyRequestsException: + err = fmt.Errorf("request throughput limit was exceeded: %w", err) + case lambda.ErrCodeCodeStorageExceededException: + err = fmt.Errorf("total code size per account exceeded: %w", err) + case lambda.ErrCodeResourceNotFoundException: + err = fmt.Errorf("resource not found: %w", err) + case lambda.ErrCodeResourceConflictException: + err = fmt.Errorf("resource already existed or in progress: %w", err) + } + return + } + version = *cfg.Version + return +} + +// VersionTraffic presents the version, and the percent of traffic that's routed to it. +type VersionTraffic struct { + Version string + Percent float64 +} + +func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) { + input := &lambda.GetAliasInput{ + FunctionName: aws.String(fm.Spec.Name), + Name: aws.String(defaultAliasName), + } + + cfg, err := c.client.GetAliasWithContext(ctx, input) + if err != nil { + aerr, ok := err.(awserr.Error) + if !ok { + err = fmt.Errorf("unknown error given: %w", err) + return + } + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + err = fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + err = fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeTooManyRequestsException: + err = fmt.Errorf("request throughput limit was exceeded: %w", err) + case lambda.ErrCodeResourceNotFoundException: + err = ErrNotFound + } + return + } + + // TODO: Fix Lambda.AliasConfiguration.RoutingConfig nil value. + if cfg.RoutingConfig == nil { + return + } + routingTrafficCfg = make([]VersionTraffic, 0, len(cfg.RoutingConfig.AdditionalVersionWeights)) + for version := range cfg.RoutingConfig.AdditionalVersionWeights { + routingTrafficCfg = append(routingTrafficCfg, VersionTraffic{ + Version: version, + Percent: *cfg.RoutingConfig.AdditionalVersionWeights[version], + }) + } + return +} + +func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error { + input := &lambda.CreateAliasInput{ + FunctionName: aws.String(fm.Spec.Name), + FunctionVersion: aws.String(version), + Name: aws.String(defaultAliasName), + } + _, err := c.client.CreateAliasWithContext(ctx, input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + return fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + return fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeTooManyRequestsException: + return fmt.Errorf("request throughput limit was exceeded: %w", err) + case lambda.ErrCodeResourceNotFoundException: + return fmt.Errorf("resource not found: %w", err) + case lambda.ErrCodeResourceConflictException: + return fmt.Errorf("resource already existed or in progress: %w", err) } } return fmt.Errorf("unknown error given: %w", err) } return nil } + +func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error { + routingTrafficMap := make(map[string]*float64) + switch len(routingTraffic) { + case 2: + routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent)) + routingTrafficMap[routingTraffic[1].Version] = aws.Float64(precentToPercentage(routingTraffic[1].Percent)) + case 1: + routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent)) + default: + return fmt.Errorf("invalid routing traffic configuration given") + } + + input := &lambda.UpdateAliasInput{ + FunctionName: aws.String(fm.Spec.Name), + Name: aws.String(defaultAliasName), + RoutingConfig: &lambda.AliasRoutingConfiguration{ + AdditionalVersionWeights: routingTrafficMap, + }, + } + + _, err := c.client.UpdateAliasWithContext(ctx, input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case lambda.ErrCodeInvalidParameterValueException: + return fmt.Errorf("invalid parameter given: %w", err) + case lambda.ErrCodeServiceException: + return fmt.Errorf("aws lambda service encountered an internal error: %w", err) + case lambda.ErrCodeTooManyRequestsException: + return fmt.Errorf("request throughput limit was exceeded: %w", err) + case lambda.ErrCodeResourceNotFoundException: + return fmt.Errorf("resource not found: %w", err) + case lambda.ErrCodeResourceConflictException: + return fmt.Errorf("resource already existed or in progress: %w", err) + } + } + return fmt.Errorf("unknown error given: %w", err) + } + return nil +} + +func precentToPercentage(in float64) float64 { + return in / 100.0 +} diff --git a/pkg/app/piped/cloudprovider/lambda/lambda.go b/pkg/app/piped/cloudprovider/lambda/lambda.go index a46494da90..632e071783 100644 --- a/pkg/app/piped/cloudprovider/lambda/lambda.go +++ b/pkg/app/piped/cloudprovider/lambda/lambda.go @@ -25,19 +25,28 @@ import ( "github.com/pipe-cd/pipe/pkg/config" ) -const DefaultFunctionManifestFilename = "function.yaml" +const defaultFunctionManifestFilename = "function.yaml" +// Client is wrapper of AWS client. type Client interface { - Apply(ctx context.Context, fm FunctionManifest) error + IsFunctionExist(ctx context.Context, name string) (bool, error) + CreateFunction(ctx context.Context, fm FunctionManifest) error + UpdateFunction(ctx context.Context, fm FunctionManifest) error + PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error) + GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) + CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error + UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error } +// Registry holds a pool of aws client wrappers. type Registry interface { Client(name string, cfg *config.CloudProviderLambdaConfig, logger *zap.Logger) (Client, error) } +// LoadFunctionManifest returns FunctionManifest object from a given Function config manifest file. func LoadFunctionManifest(appDir, functionManifestFilename string) (FunctionManifest, error) { if functionManifestFilename == "" { - functionManifestFilename = DefaultFunctionManifestFilename + functionManifestFilename = defaultFunctionManifestFilename } path := filepath.Join(appDir, functionManifestFilename) return loadFunctionManifest(path) @@ -77,6 +86,7 @@ var defaultRegistry = ®istry{ newGroup: &singleflight.Group{}, } +// DefaultRegistry returns a pool of aws clients and a mutex associated with it. func DefaultRegistry() Registry { return defaultRegistry } diff --git a/pkg/app/piped/executor/lambda/deploy.go b/pkg/app/piped/executor/lambda/deploy.go index 809beca203..1a8460387d 100644 --- a/pkg/app/piped/executor/lambda/deploy.go +++ b/pkg/app/piped/executor/lambda/deploy.go @@ -77,7 +77,7 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus { return model.StageStatus_STAGE_FAILURE } - if !apply(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) { + if !sync(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) { return model.StageStatus_STAGE_FAILURE } diff --git a/pkg/app/piped/executor/lambda/lambda.go b/pkg/app/piped/executor/lambda/lambda.go index b16baaa2fa..bb2edbedff 100644 --- a/pkg/app/piped/executor/lambda/lambda.go +++ b/pkg/app/piped/executor/lambda/lambda.go @@ -16,6 +16,8 @@ package lambda import ( "context" + "errors" + "time" provider "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/lambda" "github.com/pipe-cd/pipe/pkg/app/piped/deploysource" @@ -82,7 +84,7 @@ func decideRevisionName(in *executor.Input, fm provider.FunctionManifest, commit return } -func apply(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { +func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool { in.LogPersister.Infof("Start applying the lambda function manifest") client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger) if err != nil { @@ -90,8 +92,62 @@ func apply(ctx context.Context, in *executor.Input, cloudProviderName string, cl return false } - if err := client.Apply(ctx, fm); err != nil { - in.LogPersister.Errorf("Failed to apply the lambda function manifest (%v)", err) + found, err := client.IsFunctionExist(ctx, fm.Spec.Name) + if err != nil { + in.LogPersister.Errorf("Unable to validate function name %s: %v", fm.Spec.Name, err) + return false + } + if found { + if err := client.UpdateFunction(ctx, fm); err != nil { + in.LogPersister.Errorf("Failed to update lambda function %s: %v", fm.Spec.Name, err) + return false + } + } else { + if err := client.CreateFunction(ctx, fm); err != nil { + in.LogPersister.Errorf("Failed to create lambda function %s: %v", fm.Spec.Name, err) + return false + } + } + + // TODO: Using backoff instead of time sleep waiting for a specific duration of time. + // Wait before ready to commit change. + in.LogPersister.Info("Waiting to update lambda function in progress...") + time.Sleep(3 * time.Minute) + + // Commit version for applied Lambda function. + // Note: via the current docs of [Lambda.PublishVersion](https://docs.aws.amazon.com/sdk-for-go/api/service/lambda/#Lambda.PublishVersion) + // AWS Lambda doesn't publish a version if the function's configuration and code haven't changed since the last version. + // But currently, unchanged revision is able to make publish (versionId++) as usual. + version, err := client.PublishFunction(ctx, fm) + if err != nil { + in.LogPersister.Errorf("Failed to commit new version for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + _, err = client.GetTrafficConfig(ctx, fm) + // Create Alias on not yet existed. + if errors.Is(err, provider.ErrNotFound) { + if err := client.CreateTrafficConfig(ctx, fm, version); err != nil { + in.LogPersister.Errorf("Failed to create traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) + return false + } + in.LogPersister.Infof("Successfully applied the lambda function manifest") + return true + } + if err != nil { + in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err) + return false + } + + // Update 100% traffic to the new lambda version. + routingCfg := []provider.VersionTraffic{ + { + Version: version, + Percent: 100, + }, + } + if err = client.UpdateTrafficConfig(ctx, fm, routingCfg); err != nil { + in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err) return false }