Skip to content
Merged
81 changes: 61 additions & 20 deletions pkg/app/piped/cloudprovider/lambda/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,16 @@ func (c *client) PublishFunction(ctx context.Context, fm FunctionManifest) (vers
return
}

// RoutingTrafficConfig presents a map of primary and secondary version traffic for lambda function alias.
type RoutingTrafficConfig map[string]VersionTraffic

// VersionTraffic presents the version, and the percent of traffic that's routed to it.
type VersionTraffic struct {
Version string
Percent float64
}

func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error) {
func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error) {
input := &lambda.GetAliasInput{
FunctionName: aws.String(fm.Spec.Name),
Name: aws.String(defaultAliasName),
Expand All @@ -226,17 +229,51 @@ func (c *client) GetTrafficConfig(ctx context.Context, fm FunctionManifest) (rou
return
}

// TODO: Fix Lambda.AliasConfiguration.RoutingConfig nil value.
routingTrafficCfg = make(map[string]VersionTraffic)
/* The current return value from GetAlias as below
{
"AliasArn": "arn:aws:lambda:ap-northeast-1:769161735124:function:SimpleCanaryFunction:Service",
"Name": "Service",
"FunctionVersion": "1",
"Description": "",
"RoutingConfig": {
"AdditionalVersionWeights": {
"3": 0.9
}
},
"RevisionId": "fe08805f-9851-44fc-9a79-6e086aefc290"
}
Note:
- In case RoutingConfig is nil, this mean 100% of traffic is handled by version represented by FunctionVersion value (PRIMARY version).
- In case RoutingConfig is not nil, RoutingConfig.AdditionalVersionWeights is expected to have ONLY ONE key/value pair
which presents the SECONDARY version handling traffic (represented by the value of the pair).
in short
_ version: 1 - FunctionVersion (the PRIMARY) handles (1 - 0.9) percentage of current traffic.
_ version: 3 - AdditionalVersionWeights key (the SECONDARY) handles 0.9 percentage of current traffic.
*/
// In case RoutingConfig is nil, 100 percent of current traffic is handled by FunctionVersion version.
if cfg.RoutingConfig == nil {
routingTrafficCfg["primary"] = VersionTraffic{
Version: *cfg.FunctionVersion,
Percent: 100,
}
return
}
routingTrafficCfg = make([]VersionTraffic, 0, len(cfg.RoutingConfig.AdditionalVersionWeights))
// In case RoutingConfig is provided, FunctionVersion value represents the primary version while
// RoutingConfig.AdditionalVersionWeights key represents the secondary version.
var newerVersionTraffic float64
for version := range cfg.RoutingConfig.AdditionalVersionWeights {
routingTrafficCfg = append(routingTrafficCfg, VersionTraffic{
newerVersionTraffic = percentageToPercent(*cfg.RoutingConfig.AdditionalVersionWeights[version])
routingTrafficCfg["secondary"] = VersionTraffic{
Version: version,
Percent: *cfg.RoutingConfig.AdditionalVersionWeights[version],
})
Percent: newerVersionTraffic,
}
}
routingTrafficCfg["primary"] = VersionTraffic{
Version: *cfg.FunctionVersion,
Percent: 100 - newerVersionTraffic,
}

return
}

Expand Down Expand Up @@ -267,24 +304,24 @@ func (c *client) CreateTrafficConfig(ctx context.Context, fm FunctionManifest, v
return nil
}

func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error {
routingTrafficMap := make(map[string]*float64)
switch len(routingTraffic) {
case 2:
routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent))
routingTrafficMap[routingTraffic[1].Version] = aws.Float64(precentToPercentage(routingTraffic[1].Percent))
case 1:
routingTrafficMap[routingTraffic[0].Version] = aws.Float64(precentToPercentage(routingTraffic[0].Percent))
default:
return fmt.Errorf("invalid routing traffic configuration given")
func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error {
primary, ok := routingTraffic["primary"]
if !ok {
return fmt.Errorf("invalid routing traffic configuration given: primary version not found")
}

input := &lambda.UpdateAliasInput{
FunctionName: aws.String(fm.Spec.Name),
Name: aws.String(defaultAliasName),
RoutingConfig: &lambda.AliasRoutingConfiguration{
FunctionName: aws.String(fm.Spec.Name),
Name: aws.String(defaultAliasName),
FunctionVersion: aws.String(primary.Version),
}

if secondary, ok := routingTraffic["secondary"]; ok {
routingTrafficMap := make(map[string]*float64)
routingTrafficMap[secondary.Version] = aws.Float64(precentToPercentage(secondary.Percent))
input.RoutingConfig = &lambda.AliasRoutingConfiguration{
AdditionalVersionWeights: routingTrafficMap,
},
}
}

_, err := c.client.UpdateAliasWithContext(ctx, input)
Expand All @@ -311,3 +348,7 @@ func (c *client) UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, r
func precentToPercentage(in float64) float64 {
return in / 100.0
}

func percentageToPercent(in float64) float64 {
return in * 100
}
4 changes: 2 additions & 2 deletions pkg/app/piped/cloudprovider/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type Client interface {
CreateFunction(ctx context.Context, fm FunctionManifest) error
UpdateFunction(ctx context.Context, fm FunctionManifest) error
PublishFunction(ctx context.Context, fm FunctionManifest) (version string, err error)
GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg []VersionTraffic, err error)
GetTrafficConfig(ctx context.Context, fm FunctionManifest) (routingTrafficCfg RoutingTrafficConfig, err error)
CreateTrafficConfig(ctx context.Context, fm FunctionManifest, version string) error
UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic []VersionTraffic) error
UpdateTrafficConfig(ctx context.Context, fm FunctionManifest, routingTraffic RoutingTrafficConfig) error
}

// Registry holds a pool of aws client wrappers.
Expand Down
13 changes: 12 additions & 1 deletion pkg/app/piped/executor/lambda/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -17,3 +17,14 @@ go_library(
"//pkg/model:go_default_library",
],
)

go_test(
name = "go_default_test",
size = "small",
srcs = ["lambda_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/app/piped/cloudprovider/lambda:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
29 changes: 26 additions & 3 deletions pkg/app/piped/executor/lambda/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
status = e.ensureSync(ctx)
case model.StageLambdaPromote:
status = e.ensurePromote(ctx)
case model.StageLambdaCanaryRollout:
status = e.ensureRollout(ctx)
default:
e.LogPersister.Errorf("Unsupported stage %s for lambda application", e.Stage.Name)
return model.StageStatus_STAGE_FAILURE
Expand All @@ -84,7 +86,28 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
return model.StageStatus_STAGE_SUCCESS
}

func (e *deployExecutor) ensurePromote(_ context.Context) model.StageStatus {
e.LogPersister.Error("This stage is not implemented yet")
return model.StageStatus_STAGE_FAILURE
func (e *deployExecutor) ensurePromote(ctx context.Context) model.StageStatus {
fm, ok := loadFunctionManifest(&e.Input, e.deployCfg.Input.FunctionManifestFile, e.deploySource)
if !ok {
return model.StageStatus_STAGE_FAILURE
}

if !promote(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) {
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}

func (e *deployExecutor) ensureRollout(ctx context.Context) model.StageStatus {
fm, ok := loadFunctionManifest(&e.Input, e.deployCfg.Input.FunctionManifestFile, e.deploySource)
if !ok {
return model.StageStatus_STAGE_FAILURE
}

if !rollout(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, fm) {
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}
Loading