Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 212 additions & 1 deletion pkg/app/piped/cloudprovider/lambda/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package lambda

import (
"context"
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -28,6 +29,11 @@ import (
"go.uber.org/zap"
)

const defaultAliasName = "Service"

// ErrNotFound lambda resource occurred.
var ErrNotFound = errors.New("lambda resource not found")

type client struct {
region string
client *lambda.Lambda
Expand Down Expand Up @@ -66,7 +72,31 @@ func newClient(region, profile, credentialsFile string, logger *zap.Logger) (*cl
return c, nil
}

func (c *client) Apply(ctx context.Context, fm FunctionManifest) error {
func (c *client) IsFunctionExist(ctx context.Context, name string) (bool, error) {
input := &lambda.GetFunctionInput{
FunctionName: aws.String(name),
}
_, err := c.client.GetFunctionWithContext(ctx, input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
return false, fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
return false, fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeTooManyRequestsException:
return false, fmt.Errorf("request throughput limit was exceeded: %w", err)
// Only in case ResourceNotFound error occurred, the FunctionName is available for create so do not raise error.
case lambda.ErrCodeResourceNotFoundException:
return false, nil
}
}
return false, fmt.Errorf("unknown error given: %w", err)
}
return true, nil
}

func (c *client) CreateFunction(ctx context.Context, fm FunctionManifest) error {
input := &lambda.CreateFunctionInput{
Code: &lambda.FunctionCode{
ImageUri: aws.String(fm.Spec.ImageURI),
Expand All @@ -87,9 +117,190 @@ func (c *client) Apply(ctx context.Context, fm FunctionManifest) error {
return fmt.Errorf("total code size per account exceeded: %w", err)
case lambda.ErrCodeResourceNotFoundException, lambda.ErrCodeResourceNotReadyException:
return fmt.Errorf("resource error occurred: %w", err)
case lambda.ErrCodeTooManyRequestsException:
return fmt.Errorf("request throughput limit was exceeded: %w", err)
}
}
return fmt.Errorf("unknown error given: %w", err)
}
return nil
}

func (c *client) UpdateFunction(ctx context.Context, fm FunctionManifest) error {
codeInput := &lambda.UpdateFunctionCodeInput{
FunctionName: aws.String(fm.Spec.Name),
ImageUri: aws.String(fm.Spec.ImageURI),
}
_, err := c.client.UpdateFunctionCodeWithContext(ctx, codeInput)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
return fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
return fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeCodeStorageExceededException:
return fmt.Errorf("total code size per account exceeded: %w", err)
case lambda.ErrCodeTooManyRequestsException:
return fmt.Errorf("request throughput limit was exceeded: %w", err)
case lambda.ErrCodeResourceConflictException:
return fmt.Errorf("resource already existed or in progress: %w", err)
}
}
return fmt.Errorf("unknown error given: %w", err)
}

// TODO: Support more configurable fields using Lambda.UpdateFunctionConfiguration.
// https://docs.aws.amazon.com/sdk-for-go/api/service/lambda/#UpdateFunctionConfiguration

return nil
}

func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error) {
input := &lambda.PublishVersionInput{
FunctionName: aws.String(fm.Spec.Name),
}
cfg, err := c.client.PublishVersionWithContext(ctx, input)
if err != nil {
aerr, ok := err.(awserr.Error)
if !ok {
err = fmt.Errorf("unknown error given: %w", err)
return
}
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
err = fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
err = fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeTooManyRequestsException:
err = fmt.Errorf("request throughput limit was exceeded: %w", err)
case lambda.ErrCodeCodeStorageExceededException:
err = fmt.Errorf("total code size per account exceeded: %w", err)
case lambda.ErrCodeResourceNotFoundException:
err = fmt.Errorf("resource not found: %w", err)
case lambda.ErrCodeResourceConflictException:
err = fmt.Errorf("resource already existed or in progress: %w", err)
}
return
}
version = *cfg.Version
return
}

// VersionTraffic presents the version, and the percent of traffic that's routed to it.
type VersionTraffic struct {
Version string
Percent float64
}

func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) {
input := &lambda.GetAliasInput{
FunctionName: aws.String(fm.Spec.Name),
Name: aws.String(defaultAliasName),
}

cfg, err := c.client.GetAliasWithContext(ctx, input)
if err != nil {
aerr, ok := err.(awserr.Error)
if !ok {
err = fmt.Errorf("unknown error given: %w", err)
return
}
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
err = fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
err = fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeTooManyRequestsException:
err = fmt.Errorf("request throughput limit was exceeded: %w", err)
case lambda.ErrCodeResourceNotFoundException:
err = ErrNotFound
}
return
}

// TODO: Fix Lambda.AliasConfiguration.RoutingConfig nil value.
if cfg.RoutingConfig == nil {
return
}
routingTrafficCfg = make([]VersionTraffic, 0, len(cfg.RoutingConfig.AdditionalVersionWeights))
for version := range cfg.RoutingConfig.AdditionalVersionWeights {
routingTrafficCfg = append(routingTrafficCfg, VersionTraffic{
Version: version,
Percent: *cfg.RoutingConfig.AdditionalVersionWeights[version],
})
}
return
}

func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error {
input := &lambda.CreateAliasInput{
FunctionName: aws.String(fm.Spec.Name),
FunctionVersion: aws.String(version),
Name: aws.String(defaultAliasName),
}
_, err := c.client.CreateAliasWithContext(ctx, input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
return fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
return fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeTooManyRequestsException:
return fmt.Errorf("request throughput limit was exceeded: %w", err)
case lambda.ErrCodeResourceNotFoundException:
return fmt.Errorf("resource not found: %w", err)
case lambda.ErrCodeResourceConflictException:
return fmt.Errorf("resource already existed or in progress: %w", err)
}
}
return fmt.Errorf("unknown error given: %w", err)
}
return nil
}

func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error {
routingTrafficMap := make(map[string]*float64)
switch len(routingTraffic) {
case 2:
routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent))
routingTrafficMap[routingTraffic[1].Version] = aws.Float64(precentToPercentage(routingTraffic[1].Percent))
case 1:
routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent))
default:
return fmt.Errorf("invalid routing traffic configuration given")
}

input := &lambda.UpdateAliasInput{
FunctionName: aws.String(fm.Spec.Name),
Name: aws.String(defaultAliasName),
RoutingConfig: &lambda.AliasRoutingConfiguration{
AdditionalVersionWeights: routingTrafficMap,
},
}

_, err := c.client.UpdateAliasWithContext(ctx, input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case lambda.ErrCodeInvalidParameterValueException:
return fmt.Errorf("invalid parameter given: %w", err)
case lambda.ErrCodeServiceException:
return fmt.Errorf("aws lambda service encountered an internal error: %w", err)
case lambda.ErrCodeTooManyRequestsException:
return fmt.Errorf("request throughput limit was exceeded: %w", err)
case lambda.ErrCodeResourceNotFoundException:
return fmt.Errorf("resource not found: %w", err)
case lambda.ErrCodeResourceConflictException:
return fmt.Errorf("resource already existed or in progress: %w", err)
}
}
return fmt.Errorf("unknown error given: %w", err)
}
return nil
}

func precentToPercentage(in float64) float64 {
return in / 100.0
}
16 changes: 13 additions & 3 deletions pkg/app/piped/cloudprovider/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,28 @@ import (
"github.com/pipe-cd/pipe/pkg/config"
)

const DefaultFunctionManifestFilename = "function.yaml"
const defaultFunctionManifestFilename = "function.yaml"

// Client is wrapper of AWS client.
type Client interface {
Apply(ctx context.Context, fm FunctionManifest) error
IsFunctionExist(ctx context.Context, name string) (bool, error)
CreateFunction(ctx context.Context, fm FunctionManifest) error
UpdateFunction(ctx context.Context, fm FunctionManifest) error
PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error)
GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error)
CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error
UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error
}

// Registry holds a pool of aws client wrappers.
type Registry interface {
Client(name string, cfg *config.CloudProviderLambdaConfig, logger *zap.Logger) (Client, error)
}

// LoadFunctionManifest returns FunctionManifest object from a given Function config manifest file.
func LoadFunctionManifest(appDir, functionManifestFilename string) (FunctionManifest, error) {
if functionManifestFilename == "" {
functionManifestFilename = DefaultFunctionManifestFilename
functionManifestFilename = defaultFunctionManifestFilename
}
path := filepath.Join(appDir, functionManifestFilename)
return loadFunctionManifest(path)
Expand Down Expand Up @@ -77,6 +86,7 @@ var defaultRegistry = &registry{
newGroup: &singleflight.Group{},
}

// DefaultRegistry returns a pool of aws clients and a mutex associated with it.
func DefaultRegistry() Registry {
return defaultRegistry
}
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/lambda/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
return model.StageStatus_STAGE_FAILURE
}

if !apply(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) {
if !sync(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) {
return model.StageStatus_STAGE_FAILURE
}

Expand Down
62 changes: 59 additions & 3 deletions pkg/app/piped/executor/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package lambda

import (
"context"
"errors"
"time"

provider "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/lambda"
"github.com/pipe-cd/pipe/pkg/app/piped/deploysource"
Expand Down Expand Up @@ -82,16 +84,70 @@ func decideRevisionName(in *executor.Input, fm provider.FunctionManifest, commit
return
}

func apply(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool {
func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderLambdaConfig, fm provider.FunctionManifest) bool {
in.LogPersister.Infof("Start applying the lambda function manifest")
client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger)
if err != nil {
in.LogPersister.Errorf("Unable to create Lambda client for the provider %s: %v", cloudProviderName, err)
return false
}

if err := client.Apply(ctx, fm); err != nil {
in.LogPersister.Errorf("Failed to apply the lambda function manifest (%v)", err)
found, err := client.IsFunctionExist(ctx, fm.Spec.Name)
if err != nil {
in.LogPersister.Errorf("Unable to validate function name %s: %v", fm.Spec.Name, err)
return false
}
if found {
if err := client.UpdateFunction(ctx, fm); err != nil {
in.LogPersister.Errorf("Failed to update lambda function %s: %v", fm.Spec.Name, err)
return false
}
} else {
if err := client.CreateFunction(ctx, fm); err != nil {
in.LogPersister.Errorf("Failed to create lambda function %s: %v", fm.Spec.Name, err)
return false
}
}

// TODO: Using backoff instead of time sleep waiting for a specific duration of time.
// Wait before ready to commit change.
in.LogPersister.Info("Waiting to update lambda function in progress...")
time.Sleep(3 * time.Minute)
Comment on lines +113 to +115
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, the Lambda's CreateFunction/UpdateFunction API call returns right after we make a request to update the Lambda function code/config, but the update process still running internally in Lambda servers. If we make PublishFunction request immediately, the error ResourceConflictException: The operation cannot be performed at this time. will be raised.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really? 😢 I think fixed 3 minutes is not safe for all cases, but we don't know how much is good.
So I think in addition to waiting for a fixed amount of time (e.g. 1m) we should also do PublishFunction with a constant backoff.
https://github.com/pipe-cd/pipe/tree/master/pkg/backoff

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/ping

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on it, but shall we make another PR to fix this separately from this PR 👀

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is ok. Let's add a TODO and do it in another PR.


// Commit version for applied Lambda function.
// Note: via the current docs of [Lambda.PublishVersion](https://docs.aws.amazon.com/sdk-for-go/api/service/lambda/#Lambda.PublishVersion)
// AWS Lambda doesn't publish a version if the function's configuration and code haven't changed since the last version.
// But currently, unchanged revision is able to make publish (versionId++) as usual.
version, err := client.PublishFunction(ctx, fm)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that when users clicked the SYNC button to sync the application without any changes.
In that case, no changes were added on AWS Lambda side and it is ok to publish the function?
I think we may receive a "NoChange" error or something like that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS Lambda doesn't publish a version if the function's configuration and code haven't changed since the last version. Use UpdateFunctionCode or UpdateFunctionConfiguration to update the function before publishing a version.

Yes, you guess right, due to the docs of PublishVersion, we have to make some changes on source/configuration of the function to be able to publish a new version of it. So the error returned here for that case is predictable. 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note about this lack on the AWS documentation.

if err != nil {
in.LogPersister.Errorf("Failed to commit new version for Lambda function %s: %v", fm.Spec.Name, err)
return false
}

_, err = client.GetTrafficConfig(ctx, fm)
// Create Alias on not yet existed.
if errors.Is(err, provider.ErrNotFound) {
if err := client.CreateTrafficConfig(ctx, fm, version); err != nil {
in.LogPersister.Errorf("Failed to create traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err)
return false
}
in.LogPersister.Infof("Successfully applied the lambda function manifest")
return true
}
if err != nil {
in.LogPersister.Errorf("Failed to prepare traffic routing for Lambda function %s: %v", fm.Spec.Name, err)
return false
}

// Update 100% traffic to the new lambda version.
routingCfg := []provider.VersionTraffic{
{
Version: version,
Percent: 100,
},
}
if err = client.UpdateTrafficConfig(ctx, fm, routingCfg); err != nil {
in.LogPersister.Errorf("Failed to update traffic routing for Lambda function %s (version: %s): %v", fm.Spec.Name, version, err)
return false
}

Expand Down