Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/app/piped/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) {
}

// Start running deployment trigger.
var lastTriggeredCommitGetter trigger.LastTriggeredCommitGetter
{
t := trigger.NewTrigger(
tr, err := trigger.NewTrigger(
apiClient,
gitClient,
applicationLister,
Expand All @@ -353,30 +354,38 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) {
p.gracePeriod,
t.Logger,
)
if err != nil {
t.Logger.Error("failed to initialize trigger", zap.Error(err))
return err
}
lastTriggeredCommitGetter = tr.GetLastTriggeredCommitGetter()

group.Go(func() error {
return t.Run(ctx)
return tr.Run(ctx)
})
}

// Start running event watcher.
{
// Start running event watcher.
t := eventwatcher.NewWatcher(
w := eventwatcher.NewWatcher(
cfg,
eventGetter,
gitClient,
t.Logger,
)
group.Go(func() error {
return t.Run(ctx)
return w.Run(ctx)
})
}

// Start running planpreview handler.
if p.enablePlanPreview {
// Start running planpreview handler.
h := planpreview.NewHandler(
gitClient,
commandLister,
applicationLister,
environmentStore,
lastTriggeredCommitGetter,
cfg,
planpreview.WithLogger(t.Logger),
)
Expand Down
1 change: 1 addition & 0 deletions pkg/app/piped/planpreview/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pipe-cd/pipe/pkg/app/piped/planpreview",
visibility = ["//visibility:public"],
deps = [
"//pkg/app/piped/trigger:go_default_library",
"//pkg/config:go_default_library",
"//pkg/git:go_default_library",
"//pkg/model:go_default_library",
Expand Down
94 changes: 82 additions & 12 deletions pkg/app/piped/planpreview/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,35 @@ import (

"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/app/piped/trigger"
"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/git"
"github.com/pipe-cd/pipe/pkg/model"
)

type lastTriggeredCommitGetter interface {
Get(ctx context.Context, applicationID string) (string, error)
}

type Builder interface {
Build(ctx context.Context, id string, cmd model.Command_BuildPlanPreview) ([]*model.ApplicationPlanPreviewResult, error)
}

type builder struct {
gitClient gitClient
applicationLister applicationLister
environmentGetter environmentGetter
commitGetter lastTriggeredCommitGetter
config *config.PipedSpec
logger *zap.Logger
}

func newBuilder(gc gitClient, al applicationLister, cfg *config.PipedSpec, logger *zap.Logger) *builder {
func newBuilder(gc gitClient, al applicationLister, eg environmentGetter, cg lastTriggeredCommitGetter, cfg *config.PipedSpec, logger *zap.Logger) *builder {
return &builder{
gitClient: gc,
applicationLister: al,
environmentGetter: eg,
commitGetter: cg,
config: cfg,
logger: logger.Named("planpreview-builder"),
}
Expand All @@ -47,35 +57,95 @@ func newBuilder(gc gitClient, al applicationLister, cfg *config.PipedSpec, logge
func (b *builder) Build(ctx context.Context, id string, cmd model.Command_BuildPlanPreview) ([]*model.ApplicationPlanPreviewResult, error) {
b.logger.Info(fmt.Sprintf("start building planpreview result for command %s", id))

// Find the registered repository in Piped config and validate the command's payload against it.
repoCfg, ok := b.config.GetRepository(cmd.RepositoryId)
if !ok {
return nil, fmt.Errorf("repository %s was not found in Piped config", cmd.RepositoryId)
}
if repoCfg.Branch != cmd.BaseBranch {
return nil, fmt.Errorf("base branch repository %s was not correct, requested %s, expected %s", cmd.RepositoryId, cmd.BaseBranch, repoCfg.Branch)
return nil, fmt.Errorf("base branch repository %s was not matched, requested %s, expected %s", cmd.RepositoryId, cmd.BaseBranch, repoCfg.Branch)
}

// List all applications that belong to this Piped
// and are placed in the given repository.
apps := b.listApplications(repoCfg)
if len(apps) == 0 {
return nil, nil
}

// Clone the source code and checkout to the given branch, commit.
repo, err := b.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "")
if err != nil {
return nil, fmt.Errorf("failed to clone git repository %s", cmd.RepositoryId)
}
defer repo.Clean()

// TODO: Implement planpreview builder.
// 1. Fetch the source code at the head commit.
// 2. Determine the list of applications that will be triggered.
// - Based on the changed files between 2 commits: head commit and mostRecentlyTriggeredCommit
// 3. For each application:
// 3.1. Start a builder to check what/why strategy will be used
// 3.2. Check what resources should be added, deleted and modified
// - Terraform app: used terraform plan command
// - Kubernetes app: calculate the diff of resources at head commit and mostRecentlySuccessfulCommit
return nil, fmt.Errorf("Not Implemented")
if err := repo.Checkout(ctx, cmd.HeadCommit); err != nil {
return nil, fmt.Errorf("failed to checkout the head commit %s: %w", cmd.HeadCommit, err)
}

// Compared to the total number of applications,
// the number of applications that should be triggered will be very smaller
// therefore we do not explicitly specify the capacity for these slices.
triggerApps := make([]*model.Application, 0)
results := make([]*model.ApplicationPlanPreviewResult, 0)

d := trigger.NewDeterminer(repo, cmd.HeadCommit, b.commitGetter, b.logger)

for _, app := range apps {
shouldTrigger, err := d.ShouldTrigger(ctx, app)
if err != nil {
// We only need the environment name
// so the returned error can be ignorable.
var envName string
if env, err := b.environmentGetter.Get(ctx, app.EnvId); err == nil {
envName = env.Name
}

r := model.MakeApplicationPlanPreviewResult(*app, envName)
r.Error = fmt.Sprintf("Failed while determining the application should be triggered or not, %v", err)
results = append(results, r)
continue
}

if shouldTrigger {
triggerApps = append(triggerApps, app)
}
}

// All triggered applications will be passed to plan.
for _, app := range triggerApps {
// We only need the environment name
// so the returned error can be ignorable.
var envName string
if env, err := b.environmentGetter.Get(ctx, app.EnvId); err == nil {
envName = env.Name
}

r := model.MakeApplicationPlanPreviewResult(*app, envName)
results = append(results, r)

strategy, changes, err := b.plan(repo, app, cmd)
if err != nil {
r.Error = fmt.Sprintf("Failed while planning, %v", err)
continue
}

r.SyncStrategy = strategy
r.Changes = changes
}

return results, nil
}

func (b *builder) plan(repo git.Repo, app *model.Application, cmd model.Command_BuildPlanPreview) (model.SyncStrategy, []byte, error) {
// TODO: Implement planpreview plan.
// 1. Start a planner to check what/why strategy will be used
// 2. Check what resources should be added, deleted and modified
// - Terraform app: used terraform plan command
// - Kubernetes app: calculate the diff of resources at head commit and mostRecentlySuccessfulCommit

return model.SyncStrategy_QUICK_SYNC, []byte("NOT IMPLEMENTED"), nil
}

func (b *builder) listApplications(repo config.PipedRepository) []*model.Application {
Expand Down
9 changes: 7 additions & 2 deletions pkg/app/piped/planpreview/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type applicationLister interface {
List() []*model.Application
}

type environmentGetter interface {
Get(ctx context.Context, id string) (*model.Environment, error)
}

type commandLister interface {
ListBuildPlanPreviewCommands() []model.ReportableCommand
}
Expand All @@ -98,7 +102,7 @@ type Handler struct {
logger *zap.Logger
}

func NewHandler(gc gitClient, cl commandLister, al applicationLister, cfg *config.PipedSpec, opts ...Option) *Handler {
func NewHandler(gc gitClient, cl commandLister, al applicationLister, eg environmentGetter, cg lastTriggeredCommitGetter, cfg *config.PipedSpec, opts ...Option) *Handler {
opt := &options{
workerNum: defaultWorkerNum,
commandQueueBufferSize: defaultCommandQueueBufferSize,
Expand All @@ -118,7 +122,7 @@ func NewHandler(gc gitClient, cl commandLister, al applicationLister, cfg *confi
logger: opt.logger.Named("planpreview-handler"),
}
h.builderFactory = func() Builder {
return newBuilder(gc, al, cfg, h.logger)
return newBuilder(gc, al, eg, cg, cfg, h.logger)
}

return h
Expand Down Expand Up @@ -197,6 +201,7 @@ func (h *Handler) enqueueNewCommands(ctx context.Context) {
func (h *Handler) handleCommand(ctx context.Context, cmd model.ReportableCommand) {
result := &model.PlanPreviewCommandResult{
CommandId: cmd.Id,
PipedId: cmd.PipedId,
}

reportError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/planpreview/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestHandler(t *testing.T) {
var mu sync.Mutex
var wg sync.WaitGroup

handler := NewHandler(nil, cl, nil, nil,
handler := NewHandler(nil, cl, nil, nil, nil, nil,
WithWorkerNum(2),
// Use a long interval because we will directly call enqueueNewCommands function in this test.
WithCommandCheckInterval(time.Hour),
Expand Down
6 changes: 5 additions & 1 deletion pkg/app/piped/trigger/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cache.go",
"deployment.go",
"determiner.go",
"trigger.go",
],
importpath = "github.com/pipe-cd/pipe/pkg/app/piped/trigger",
visibility = ["//visibility:public"],
deps = [
"//pkg/app/api/service/pipedservice:go_default_library",
"//pkg/cache:go_default_library",
"//pkg/cache/memorycache:go_default_library",
"//pkg/config:go_default_library",
"//pkg/filematcher:go_default_library",
"//pkg/git:go_default_library",
Expand All @@ -25,7 +29,7 @@ go_library(
go_test(
name = "go_default_test",
size = "small",
srcs = ["trigger_test.go"],
srcs = ["determiner_test.go"],
embed = [":go_default_library"],
deps = ["@com_github_stretchr_testify//assert:go_default_library"],
)
79 changes: 79 additions & 0 deletions pkg/app/piped/trigger/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trigger

import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice"
"github.com/pipe-cd/pipe/pkg/cache"
"github.com/pipe-cd/pipe/pkg/model"
)

type lastTriggeredCommitStore struct {
apiClient apiClient
cache cache.Cache
}

func (s *lastTriggeredCommitStore) Get(ctx context.Context, applicationID string) (string, error) {
// Firstly, find from memory cache.
commit, err := s.cache.Get(applicationID)
if err == nil {
return commit.(string), nil
}

// No data in memorycache so we have to cost a RPC call to get from control-plane.
deploy, err := s.getLastTriggeredDeployment(ctx, applicationID)
switch {
case err == nil:
return deploy.Trigger.Commit.Hash, nil

case status.Code(err) == codes.NotFound:
// It seems this application has not been deployed anytime.
return "", nil

default:
return "", err
}
}

func (s *lastTriggeredCommitStore) Put(applicationID, commit string) error {
return s.cache.Put(applicationID, commit)
}

func (s *lastTriggeredCommitStore) getLastTriggeredDeployment(ctx context.Context, applicationID string) (*model.ApplicationDeploymentReference, error) {
var (
err error
resp *pipedservice.GetApplicationMostRecentDeploymentResponse
retry = pipedservice.NewRetry(3)
req = &pipedservice.GetApplicationMostRecentDeploymentRequest{
ApplicationId: applicationID,
Status: model.DeploymentStatus_DEPLOYMENT_PENDING,
}
)

for retry.WaitNext(ctx) {
if resp, err = s.apiClient.GetApplicationMostRecentDeployment(ctx, req); err == nil {
return resp.Deployment, nil
}
if !pipedservice.Retriable(err) {
return nil, err
}
}
return nil, err
}
Loading