diff --git a/pkg/app/piped/cmd/piped/piped.go b/pkg/app/piped/cmd/piped/piped.go index 9c320c37d5..fbcf0f4e7d 100644 --- a/pkg/app/piped/cmd/piped/piped.go +++ b/pkg/app/piped/cmd/piped/piped.go @@ -341,8 +341,9 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) { } // Start running deployment trigger. + var lastTriggeredCommitGetter trigger.LastTriggeredCommitGetter { - t := trigger.NewTrigger( + tr, err := trigger.NewTrigger( apiClient, gitClient, applicationLister, @@ -353,30 +354,38 @@ func (p *piped) run(ctx context.Context, t cli.Telemetry) (runErr error) { p.gracePeriod, t.Logger, ) + if err != nil { + t.Logger.Error("failed to initialize trigger", zap.Error(err)) + return err + } + lastTriggeredCommitGetter = tr.GetLastTriggeredCommitGetter() + group.Go(func() error { - return t.Run(ctx) + return tr.Run(ctx) }) } + // Start running event watcher. { - // Start running event watcher. - t := eventwatcher.NewWatcher( + w := eventwatcher.NewWatcher( cfg, eventGetter, gitClient, t.Logger, ) group.Go(func() error { - return t.Run(ctx) + return w.Run(ctx) }) } + // Start running planpreview handler. if p.enablePlanPreview { - // Start running planpreview handler. h := planpreview.NewHandler( gitClient, commandLister, applicationLister, + environmentStore, + lastTriggeredCommitGetter, cfg, planpreview.WithLogger(t.Logger), ) diff --git a/pkg/app/piped/planpreview/BUILD.bazel b/pkg/app/piped/planpreview/BUILD.bazel index 5827d1d77c..65c1938b5c 100644 --- a/pkg/app/piped/planpreview/BUILD.bazel +++ b/pkg/app/piped/planpreview/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/pipe-cd/pipe/pkg/app/piped/planpreview", visibility = ["//visibility:public"], deps = [ + "//pkg/app/piped/trigger:go_default_library", "//pkg/config:go_default_library", "//pkg/git:go_default_library", "//pkg/model:go_default_library", diff --git a/pkg/app/piped/planpreview/builder.go b/pkg/app/piped/planpreview/builder.go index 7ccca7c1cc..eb47e4b8c0 100644 --- a/pkg/app/piped/planpreview/builder.go +++ b/pkg/app/piped/planpreview/builder.go @@ -20,10 +20,16 @@ import ( "go.uber.org/zap" + "github.com/pipe-cd/pipe/pkg/app/piped/trigger" "github.com/pipe-cd/pipe/pkg/config" + "github.com/pipe-cd/pipe/pkg/git" "github.com/pipe-cd/pipe/pkg/model" ) +type lastTriggeredCommitGetter interface { + Get(ctx context.Context, applicationID string) (string, error) +} + type Builder interface { Build(ctx context.Context, id string, cmd model.Command_BuildPlanPreview) ([]*model.ApplicationPlanPreviewResult, error) } @@ -31,14 +37,18 @@ type Builder interface { type builder struct { gitClient gitClient applicationLister applicationLister + environmentGetter environmentGetter + commitGetter lastTriggeredCommitGetter config *config.PipedSpec logger *zap.Logger } -func newBuilder(gc gitClient, al applicationLister, cfg *config.PipedSpec, logger *zap.Logger) *builder { +func newBuilder(gc gitClient, al applicationLister, eg environmentGetter, cg lastTriggeredCommitGetter, cfg *config.PipedSpec, logger *zap.Logger) *builder { return &builder{ gitClient: gc, applicationLister: al, + environmentGetter: eg, + commitGetter: cg, config: cfg, logger: logger.Named("planpreview-builder"), } @@ -47,35 +57,95 @@ func newBuilder(gc gitClient, al applicationLister, cfg *config.PipedSpec, logge func (b *builder) Build(ctx context.Context, id string, cmd model.Command_BuildPlanPreview) ([]*model.ApplicationPlanPreviewResult, error) { b.logger.Info(fmt.Sprintf("start building planpreview result for command %s", id)) + // Find the registered repository in Piped config and validate the command's payload against it. repoCfg, ok := b.config.GetRepository(cmd.RepositoryId) if !ok { return nil, fmt.Errorf("repository %s was not found in Piped config", cmd.RepositoryId) } if repoCfg.Branch != cmd.BaseBranch { - return nil, fmt.Errorf("base branch repository %s was not correct, requested %s, expected %s", cmd.RepositoryId, cmd.BaseBranch, repoCfg.Branch) + return nil, fmt.Errorf("base branch repository %s was not matched, requested %s, expected %s", cmd.RepositoryId, cmd.BaseBranch, repoCfg.Branch) } + // List all applications that belong to this Piped + // and are placed in the given repository. apps := b.listApplications(repoCfg) if len(apps) == 0 { return nil, nil } + // Clone the source code and checkout to the given branch, commit. repo, err := b.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "") if err != nil { return nil, fmt.Errorf("failed to clone git repository %s", cmd.RepositoryId) } defer repo.Clean() - // TODO: Implement planpreview builder. - // 1. Fetch the source code at the head commit. - // 2. Determine the list of applications that will be triggered. - // - Based on the changed files between 2 commits: head commit and mostRecentlyTriggeredCommit - // 3. For each application: - // 3.1. Start a builder to check what/why strategy will be used - // 3.2. Check what resources should be added, deleted and modified - // - Terraform app: used terraform plan command - // - Kubernetes app: calculate the diff of resources at head commit and mostRecentlySuccessfulCommit - return nil, fmt.Errorf("Not Implemented") + if err := repo.Checkout(ctx, cmd.HeadCommit); err != nil { + return nil, fmt.Errorf("failed to checkout the head commit %s: %w", cmd.HeadCommit, err) + } + + // Compared to the total number of applications, + // the number of applications that should be triggered will be very smaller + // therefore we do not explicitly specify the capacity for these slices. + triggerApps := make([]*model.Application, 0) + results := make([]*model.ApplicationPlanPreviewResult, 0) + + d := trigger.NewDeterminer(repo, cmd.HeadCommit, b.commitGetter, b.logger) + + for _, app := range apps { + shouldTrigger, err := d.ShouldTrigger(ctx, app) + if err != nil { + // We only need the environment name + // so the returned error can be ignorable. + var envName string + if env, err := b.environmentGetter.Get(ctx, app.EnvId); err == nil { + envName = env.Name + } + + r := model.MakeApplicationPlanPreviewResult(*app, envName) + r.Error = fmt.Sprintf("Failed while determining the application should be triggered or not, %v", err) + results = append(results, r) + continue + } + + if shouldTrigger { + triggerApps = append(triggerApps, app) + } + } + + // All triggered applications will be passed to plan. + for _, app := range triggerApps { + // We only need the environment name + // so the returned error can be ignorable. + var envName string + if env, err := b.environmentGetter.Get(ctx, app.EnvId); err == nil { + envName = env.Name + } + + r := model.MakeApplicationPlanPreviewResult(*app, envName) + results = append(results, r) + + strategy, changes, err := b.plan(repo, app, cmd) + if err != nil { + r.Error = fmt.Sprintf("Failed while planning, %v", err) + continue + } + + r.SyncStrategy = strategy + r.Changes = changes + } + + return results, nil +} + +func (b *builder) plan(repo git.Repo, app *model.Application, cmd model.Command_BuildPlanPreview) (model.SyncStrategy, []byte, error) { + // TODO: Implement planpreview plan. + // 1. Start a planner to check what/why strategy will be used + // 2. Check what resources should be added, deleted and modified + // - Terraform app: used terraform plan command + // - Kubernetes app: calculate the diff of resources at head commit and mostRecentlySuccessfulCommit + + return model.SyncStrategy_QUICK_SYNC, []byte("NOT IMPLEMENTED"), nil } func (b *builder) listApplications(repo config.PipedRepository) []*model.Application { diff --git a/pkg/app/piped/planpreview/handler.go b/pkg/app/piped/planpreview/handler.go index 78d102c3a3..03933ccb0e 100644 --- a/pkg/app/piped/planpreview/handler.go +++ b/pkg/app/piped/planpreview/handler.go @@ -82,6 +82,10 @@ type applicationLister interface { List() []*model.Application } +type environmentGetter interface { + Get(ctx context.Context, id string) (*model.Environment, error) +} + type commandLister interface { ListBuildPlanPreviewCommands() []model.ReportableCommand } @@ -98,7 +102,7 @@ type Handler struct { logger *zap.Logger } -func NewHandler(gc gitClient, cl commandLister, al applicationLister, cfg *config.PipedSpec, opts ...Option) *Handler { +func NewHandler(gc gitClient, cl commandLister, al applicationLister, eg environmentGetter, cg lastTriggeredCommitGetter, cfg *config.PipedSpec, opts ...Option) *Handler { opt := &options{ workerNum: defaultWorkerNum, commandQueueBufferSize: defaultCommandQueueBufferSize, @@ -118,7 +122,7 @@ func NewHandler(gc gitClient, cl commandLister, al applicationLister, cfg *confi logger: opt.logger.Named("planpreview-handler"), } h.builderFactory = func() Builder { - return newBuilder(gc, al, cfg, h.logger) + return newBuilder(gc, al, eg, cg, cfg, h.logger) } return h @@ -197,6 +201,7 @@ func (h *Handler) enqueueNewCommands(ctx context.Context) { func (h *Handler) handleCommand(ctx context.Context, cmd model.ReportableCommand) { result := &model.PlanPreviewCommandResult{ CommandId: cmd.Id, + PipedId: cmd.PipedId, } reportError := func(err error) { diff --git a/pkg/app/piped/planpreview/handler_test.go b/pkg/app/piped/planpreview/handler_test.go index 86417ccd0e..99d818aa00 100644 --- a/pkg/app/piped/planpreview/handler_test.go +++ b/pkg/app/piped/planpreview/handler_test.go @@ -61,7 +61,7 @@ func TestHandler(t *testing.T) { var mu sync.Mutex var wg sync.WaitGroup - handler := NewHandler(nil, cl, nil, nil, + handler := NewHandler(nil, cl, nil, nil, nil, nil, WithWorkerNum(2), // Use a long interval because we will directly call enqueueNewCommands function in this test. WithCommandCheckInterval(time.Hour), diff --git a/pkg/app/piped/trigger/BUILD.bazel b/pkg/app/piped/trigger/BUILD.bazel index 0af9a3739c..28126f1dca 100644 --- a/pkg/app/piped/trigger/BUILD.bazel +++ b/pkg/app/piped/trigger/BUILD.bazel @@ -3,13 +3,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "cache.go", "deployment.go", + "determiner.go", "trigger.go", ], importpath = "github.com/pipe-cd/pipe/pkg/app/piped/trigger", visibility = ["//visibility:public"], deps = [ "//pkg/app/api/service/pipedservice:go_default_library", + "//pkg/cache:go_default_library", + "//pkg/cache/memorycache:go_default_library", "//pkg/config:go_default_library", "//pkg/filematcher:go_default_library", "//pkg/git:go_default_library", @@ -25,7 +29,7 @@ go_library( go_test( name = "go_default_test", size = "small", - srcs = ["trigger_test.go"], + srcs = ["determiner_test.go"], embed = [":go_default_library"], deps = ["@com_github_stretchr_testify//assert:go_default_library"], ) diff --git a/pkg/app/piped/trigger/cache.go b/pkg/app/piped/trigger/cache.go new file mode 100644 index 0000000000..b290a92694 --- /dev/null +++ b/pkg/app/piped/trigger/cache.go @@ -0,0 +1,79 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trigger + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" + "github.com/pipe-cd/pipe/pkg/cache" + "github.com/pipe-cd/pipe/pkg/model" +) + +type lastTriggeredCommitStore struct { + apiClient apiClient + cache cache.Cache +} + +func (s *lastTriggeredCommitStore) Get(ctx context.Context, applicationID string) (string, error) { + // Firstly, find from memory cache. + commit, err := s.cache.Get(applicationID) + if err == nil { + return commit.(string), nil + } + + // No data in memorycache so we have to cost a RPC call to get from control-plane. + deploy, err := s.getLastTriggeredDeployment(ctx, applicationID) + switch { + case err == nil: + return deploy.Trigger.Commit.Hash, nil + + case status.Code(err) == codes.NotFound: + // It seems this application has not been deployed anytime. + return "", nil + + default: + return "", err + } +} + +func (s *lastTriggeredCommitStore) Put(applicationID, commit string) error { + return s.cache.Put(applicationID, commit) +} + +func (s *lastTriggeredCommitStore) getLastTriggeredDeployment(ctx context.Context, applicationID string) (*model.ApplicationDeploymentReference, error) { + var ( + err error + resp *pipedservice.GetApplicationMostRecentDeploymentResponse + retry = pipedservice.NewRetry(3) + req = &pipedservice.GetApplicationMostRecentDeploymentRequest{ + ApplicationId: applicationID, + Status: model.DeploymentStatus_DEPLOYMENT_PENDING, + } + ) + + for retry.WaitNext(ctx) { + if resp, err = s.apiClient.GetApplicationMostRecentDeployment(ctx, req); err == nil { + return resp.Deployment, nil + } + if !pipedservice.Retriable(err) { + return nil, err + } + } + return nil, err +} diff --git a/pkg/app/piped/trigger/determiner.go b/pkg/app/piped/trigger/determiner.go new file mode 100644 index 0000000000..9399ceb88f --- /dev/null +++ b/pkg/app/piped/trigger/determiner.go @@ -0,0 +1,148 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trigger + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + "go.uber.org/zap" + + "github.com/pipe-cd/pipe/pkg/config" + "github.com/pipe-cd/pipe/pkg/filematcher" + "github.com/pipe-cd/pipe/pkg/git" + "github.com/pipe-cd/pipe/pkg/model" +) + +type LastTriggeredCommitGetter interface { + Get(ctx context.Context, applicationID string) (string, error) +} + +type Determiner struct { + repo git.Repo + targetCommit string + commitGetter LastTriggeredCommitGetter + logger *zap.Logger +} + +func NewDeterminer(repo git.Repo, targetCommit string, cg LastTriggeredCommitGetter, logger *zap.Logger) *Determiner { + return &Determiner{ + repo: repo, + targetCommit: targetCommit, + commitGetter: cg, + logger: logger.Named("determiner"), + } +} + +// ShouldTrigger decides whether a given application should be triggered or not. +func (d *Determiner) ShouldTrigger(ctx context.Context, app *model.Application) (bool, error) { + logger := d.logger.With( + zap.String("app", app.Name), + zap.String("app-id", app.Id), + zap.String("target-commit", d.targetCommit), + ) + + preCommit, err := d.commitGetter.Get(ctx, app.Id) + if err != nil { + logger.Error("failed to get last triggered commit", zap.Error(err)) + return false, err + } + + // There is no previous deployment so we don't need to check anymore. + // Just do it. + if preCommit == "" { + logger.Info("no previously triggered deployment was found") + return true, nil + } + + // Check whether the most recently applied one is the target commit or not. + // If so, nothing to do for this time. + if preCommit == d.targetCommit { + logger.Info(fmt.Sprintf("no update to sync for application, hash: %s", d.targetCommit)) + return false, nil + } + + // List the changed files between those two commits and + // determine whether this application was touch by those changed files. + changedFiles, err := d.repo.ChangedFiles(ctx, preCommit, d.targetCommit) + if err != nil { + return false, err + } + + deployConfig, err := loadDeploymentConfiguration(d.repo.GetPath(), app) + if err != nil { + return false, err + } + + touched, err := isTouchedByChangedFiles(app.GitPath.Path, deployConfig.TriggerPaths, changedFiles) + if err != nil { + return false, err + } + + if !touched { + logger.Info("application was not touched by any new commits", zap.String("last-triggered-commit", preCommit)) + return false, nil + } + + return true, nil +} + +func loadDeploymentConfiguration(repoPath string, app *model.Application) (*config.GenericDeploymentSpec, error) { + path := filepath.Join(repoPath, app.GitPath.GetDeploymentConfigFilePath()) + cfg, err := config.LoadFromYAML(path) + if err != nil { + return nil, err + } + if appKind, ok := config.ToApplicationKind(cfg.Kind); !ok || appKind != app.Kind { + return nil, fmt.Errorf("invalid application kind in the deployment config file, got: %s, expected: %s", appKind, app.Kind) + } + + spec, ok := cfg.GetGenericDeployment() + if !ok { + return nil, fmt.Errorf("unsupported application kind: %s", app.Kind) + } + + return &spec, nil +} + +func isTouchedByChangedFiles(appDir string, changes []string, changedFiles []string) (bool, error) { + if !strings.HasSuffix(appDir, "/") { + appDir += "/" + } + + // If any files inside the application directory was changed + // this application is considered as touched. + for _, cf := range changedFiles { + if ok := strings.HasPrefix(cf, appDir); ok { + return true, nil + } + } + + // If any changed files matches the specified "changes" + // this application is consided as touched too. + for _, change := range changes { + matcher, err := filematcher.NewPatternMatcher([]string{change}) + if err != nil { + return false, err + } + if matcher.MatchesAny(changedFiles) { + return true, nil + } + } + + return false, nil +} diff --git a/pkg/app/piped/trigger/trigger_test.go b/pkg/app/piped/trigger/determiner_test.go similarity index 97% rename from pkg/app/piped/trigger/trigger_test.go rename to pkg/app/piped/trigger/determiner_test.go index c9eef55151..8ad5d19e9c 100644 --- a/pkg/app/piped/trigger/trigger_test.go +++ b/pkg/app/piped/trigger/determiner_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The PipeCD Authors. +// Copyright 2021 The PipeCD Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/app/piped/trigger/trigger.go b/pkg/app/piped/trigger/trigger.go index ce33a68426..7b3831a75c 100644 --- a/pkg/app/piped/trigger/trigger.go +++ b/pkg/app/piped/trigger/trigger.go @@ -20,24 +20,21 @@ package trigger import ( "context" "fmt" - "path/filepath" - "strings" "time" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" + "github.com/pipe-cd/pipe/pkg/cache/memorycache" "github.com/pipe-cd/pipe/pkg/config" - "github.com/pipe-cd/pipe/pkg/filematcher" "github.com/pipe-cd/pipe/pkg/git" "github.com/pipe-cd/pipe/pkg/model" ) -var ( - commandCheckInterval = 10 * time.Second +const ( + commandCheckInterval = 10 * time.Second + defaultLastTriggeredCommitCacheSize = 500 ) const ( @@ -72,17 +69,17 @@ type notifier interface { } type Trigger struct { - apiClient apiClient - gitClient gitClient - applicationLister applicationLister - commandLister commandLister - environmentLister environmentLister - notifier notifier - config *config.PipedSpec - mostRecentlyTriggeredCommits map[string]string - gitRepos map[string]git.Repo - gracePeriod time.Duration - logger *zap.Logger + apiClient apiClient + gitClient gitClient + applicationLister applicationLister + commandLister commandLister + environmentLister environmentLister + notifier notifier + config *config.PipedSpec + commitStore *lastTriggeredCommitStore + gitRepos map[string]git.Repo + gracePeriod time.Duration + logger *zap.Logger } // NewTrigger creates a new instance for Trigger. @@ -96,21 +93,32 @@ func NewTrigger( cfg *config.PipedSpec, gracePeriod time.Duration, logger *zap.Logger, -) *Trigger { - - return &Trigger{ - apiClient: apiClient, - gitClient: gitClient, - applicationLister: appLister, - commandLister: commandLister, - environmentLister: environmentLister, - notifier: notifier, - config: cfg, - mostRecentlyTriggeredCommits: make(map[string]string), - gitRepos: make(map[string]git.Repo, len(cfg.Repositories)), - gracePeriod: gracePeriod, - logger: logger.Named("trigger"), +) (*Trigger, error) { + + cache, err := memorycache.NewLRUCache(defaultLastTriggeredCommitCacheSize) + if err != nil { + return nil, err + } + commitStore := &lastTriggeredCommitStore{ + apiClient: apiClient, + cache: cache, + } + + t := &Trigger{ + apiClient: apiClient, + gitClient: gitClient, + applicationLister: appLister, + commandLister: commandLister, + environmentLister: environmentLister, + notifier: notifier, + config: cfg, + commitStore: commitStore, + gitRepos: make(map[string]git.Repo, len(cfg.Repositories)), + gracePeriod: gracePeriod, + logger: logger.Named("trigger"), } + + return t, nil } // Run starts running Trigger until the specified context has done. @@ -143,10 +151,10 @@ L: select { case <-commandTicker.C: - t.checkCommand(ctx) + t.checkNewCommands(ctx) case <-commitTicker.C: - t.checkCommit(ctx) + t.checkNewCommits(ctx) case <-ctx.Done(): break L @@ -157,13 +165,19 @@ L: return nil } -func (t *Trigger) checkCommand(ctx context.Context) error { +func (t *Trigger) GetLastTriggeredCommitGetter() LastTriggeredCommitGetter { + return t.commitStore +} + +func (t *Trigger) checkNewCommands(ctx context.Context) error { commands := t.commandLister.ListApplicationCommands() + for _, cmd := range commands { syncCmd := cmd.GetSyncApplication() if syncCmd == nil { continue } + app, ok := t.applicationLister.Get(syncCmd.ApplicationId) if !ok { t.logger.Warn("detected an AppSync command for an unregistered application", @@ -173,6 +187,7 @@ func (t *Trigger) checkCommand(ctx context.Context) error { ) continue } + d, err := t.syncApplication(ctx, app, cmd.Commander, syncCmd.SyncStrategy) if err != nil { t.logger.Error("failed to sync application", @@ -192,29 +207,11 @@ func (t *Trigger) checkCommand(ctx context.Context) error { t.logger.Error("failed to report command status", zap.Error(err)) } } - return nil -} - -func (t *Trigger) syncApplication(ctx context.Context, app *model.Application, commander string, syncStrategy model.SyncStrategy) (*model.Deployment, error) { - _, branch, headCommit, err := t.updateRepoToLatest(ctx, app.GitPath.Repo.Id) - if err != nil { - return nil, err - } - - // Build deployment model and send a request to API to create a new deployment. - t.logger.Info(fmt.Sprintf("application %s will be synced because of a sync command", app.Id), - zap.String("head-commit", headCommit.Hash), - ) - d, err := t.triggerDeployment(ctx, app, branch, headCommit, commander, syncStrategy) - if err != nil { - return nil, err - } - t.mostRecentlyTriggeredCommits[app.Id] = headCommit.Hash - return d, nil + return nil } -func (t *Trigger) checkCommit(ctx context.Context) error { +func (t *Trigger) checkNewCommits(ctx context.Context) error { if len(t.gitRepos) == 0 { t.logger.Info("no repositories were configured for this piped") return nil @@ -230,93 +227,49 @@ func (t *Trigger) checkCommit(ctx context.Context) error { if err != nil { continue } + d := NewDeterminer(gitRepo, headCommit.Hash, t.commitStore, t.logger) + for _, app := range apps { - if err := t.checkApplication(ctx, app, gitRepo, branch, headCommit); err != nil { + shouldTrigger, err := d.ShouldTrigger(ctx, app) + if err != nil { t.logger.Error(fmt.Sprintf("failed to check application: %s", app.Id), zap.Error(err)) + continue } - } - } - return nil -} -func (t *Trigger) checkApplication(ctx context.Context, app *model.Application, repo git.Repo, branch string, headCommit git.Commit) error { - logger := t.logger.With( - zap.String("app", app.Name), - zap.String("app-id", app.Id), - zap.String("head-commit", headCommit.Hash), - ) - - // Get the most recently triggered commit of this application. - // Most of the cases that data can be loaded from in-memory cache but - // when the piped is restared that data will be cleared too. - // So in that case, we have to make an API call. - preCommitHash := t.mostRecentlyTriggeredCommits[app.Id] - if preCommitHash == "" { - mostRecent, err := t.getMostRecentlyTriggeredDeployment(ctx, app.Id) - switch { - case err == nil: - preCommitHash = mostRecent.Trigger.Commit.Hash - t.mostRecentlyTriggeredCommits[app.Id] = preCommitHash - - case status.Code(err) == codes.NotFound: - logger.Info("there is no previously triggered commit for this application") - - default: - logger.Error("unable to get the most recently triggered deployment", zap.Error(err)) - return err - } - } - - // Check whether the most recently applied one is the head commit or not. - // If so, nothing to do for this time. - if headCommit.Hash == preCommitHash { - logger.Info(fmt.Sprintf("no update to sync for application, hash: %s", headCommit.Hash)) - return nil - } + if !shouldTrigger { + t.commitStore.Put(app.Id, headCommit.Hash) + continue + } - trigger := func() error { - // Build deployment model and send a request to API to create a new deployment. - logger.Info("application should be synced because of the new commit", - zap.String("most-recently-triggered-commit", preCommitHash), - ) - if _, err := t.triggerDeployment(ctx, app, branch, headCommit, "", model.SyncStrategy_AUTO); err != nil { - return err + // Build deployment model and send a request to API to create a new deployment. + t.logger.Info("application should be synced because of the new commit") + if _, err := t.triggerDeployment(ctx, app, branch, headCommit, "", model.SyncStrategy_AUTO); err != nil { + t.logger.Error(fmt.Sprintf("failed to trigger application: %s", app.Id), zap.Error(err)) + } + t.commitStore.Put(app.Id, headCommit.Hash) } - t.mostRecentlyTriggeredCommits[app.Id] = headCommit.Hash - return nil } - // There is no previous deployment so we don't need to check anymore. - // Just do it. - if preCommitHash == "" { - return trigger() - } - - // List the changed files between those two commits and - // determine whether this application was touch by those changed files. - changedFiles, err := repo.ChangedFiles(ctx, preCommitHash, headCommit.Hash) - if err != nil { - return err - } + return nil +} - deployConfig, err := loadDeploymentConfiguration(repo.GetPath(), app) +func (t *Trigger) syncApplication(ctx context.Context, app *model.Application, commander string, syncStrategy model.SyncStrategy) (*model.Deployment, error) { + _, branch, headCommit, err := t.updateRepoToLatest(ctx, app.GitPath.Repo.Id) if err != nil { - return err + return nil, err } - touched, err := isTouchedByChangedFiles(app.GitPath.Path, deployConfig.TriggerPaths, changedFiles) + // Build deployment model and send a request to API to create a new deployment. + t.logger.Info(fmt.Sprintf("application %s will be synced because of a sync command", app.Id), + zap.String("head-commit", headCommit.Hash), + ) + d, err := t.triggerDeployment(ctx, app, branch, headCommit, commander, syncStrategy) if err != nil { - return err - } - if !touched { - logger.Info("application was not touched by the new commit", - zap.String("most-recently-triggered-commit", preCommitHash), - ) - t.mostRecentlyTriggeredCommits[app.Id] = headCommit.Hash - return nil + return nil, err } + t.commitStore.Put(app.Id, headCommit.Hash) - return trigger() + return d, nil } func (t *Trigger) updateRepoToLatest(ctx context.Context, repoID string) (repo git.Repo, branch string, headCommit git.Commit, err error) { @@ -375,71 +328,3 @@ func (t *Trigger) listApplications() map[string][]*model.Application { } return m } - -func (t *Trigger) getMostRecentlyTriggeredDeployment(ctx context.Context, applicationID string) (*model.ApplicationDeploymentReference, error) { - var ( - err error - resp *pipedservice.GetApplicationMostRecentDeploymentResponse - retry = pipedservice.NewRetry(3) - req = &pipedservice.GetApplicationMostRecentDeploymentRequest{ - ApplicationId: applicationID, - Status: model.DeploymentStatus_DEPLOYMENT_PENDING, - } - ) - - for retry.WaitNext(ctx) { - if resp, err = t.apiClient.GetApplicationMostRecentDeployment(ctx, req); err == nil { - return resp.Deployment, nil - } - if !pipedservice.Retriable(err) { - return nil, err - } - } - return nil, err -} - -func loadDeploymentConfiguration(repoPath string, app *model.Application) (*config.GenericDeploymentSpec, error) { - path := filepath.Join(repoPath, app.GitPath.GetDeploymentConfigFilePath()) - cfg, err := config.LoadFromYAML(path) - if err != nil { - return nil, err - } - if appKind, ok := config.ToApplicationKind(cfg.Kind); !ok || appKind != app.Kind { - return nil, fmt.Errorf("invalid application kind in the deployment config file, got: %s, expected: %s", appKind, app.Kind) - } - - spec, ok := cfg.GetGenericDeployment() - if !ok { - return nil, fmt.Errorf("unsupported application kind: %s", app.Kind) - } - - return &spec, nil -} - -func isTouchedByChangedFiles(appDir string, changes []string, changedFiles []string) (bool, error) { - if !strings.HasSuffix(appDir, "/") { - appDir += "/" - } - - // If any files inside the application directory was changed - // this application is considered as touched. - for _, cf := range changedFiles { - if ok := strings.HasPrefix(cf, appDir); ok { - return true, nil - } - } - - // If any changed files matches the specified "changes" - // this application is consided as touched too. - for _, change := range changes { - matcher, err := filematcher.NewPatternMatcher([]string{change}) - if err != nil { - return false, err - } - if matcher.MatchesAny(changedFiles) { - return true, nil - } - } - - return false, nil -} diff --git a/pkg/model/planpreview.go b/pkg/model/planpreview.go index eee7daad81..765ddc0124 100644 --- a/pkg/model/planpreview.go +++ b/pkg/model/planpreview.go @@ -14,6 +14,8 @@ package model +import "time" + func (r *PlanPreviewCommandResult) FillURLs(baseURL string) { r.PipedUrl = MakePipedURL(baseURL, r.PipedId) for _, ar := range r.Results { @@ -21,3 +23,18 @@ func (r *PlanPreviewCommandResult) FillURLs(baseURL string) { ar.EnvUrl = MakeEnvironmentURL(baseURL, ar.EnvId) } } + +func MakeApplicationPlanPreviewResult(app Application, envName string) *ApplicationPlanPreviewResult { + r := &ApplicationPlanPreviewResult{ + ApplicationId: app.Id, + ApplicationName: app.Name, + ApplicationKind: app.Kind, + ApplicationDirectory: app.GitPath.Path, + EnvId: app.EnvId, + EnvName: envName, + PipedId: app.PipedId, + ProjectId: app.ProjectId, + CreatedAt: time.Now().Unix(), + } + return r +} diff --git a/pkg/model/planpreview.proto b/pkg/model/planpreview.proto index f48ef6ed30..b025bc3641 100644 --- a/pkg/model/planpreview.proto +++ b/pkg/model/planpreview.proto @@ -45,7 +45,7 @@ message ApplicationPlanPreviewResult { string application_directory = 5 [(validate.rules).string.min_len = 1]; string env_id = 6 [(validate.rules).string.min_len = 1]; - string env_name = 7 [(validate.rules).string.min_len = 1]; + string env_name = 7; // Web URL to the environment page. // This is only filled before returning to the client. string env_url = 8; @@ -54,8 +54,8 @@ message ApplicationPlanPreviewResult { string project_id = 10 [(validate.rules).string.min_len = 1]; // Target commit information. - string target_branch = 20 [(validate.rules).string.min_len = 1]; - string target_head_commit = 21 [(validate.rules).string.min_len = 1]; + string head_branch = 20 [(validate.rules).string.min_len = 1]; + string head_commit = 21 [(validate.rules).string.min_len = 1]; // Planpreview result. SyncStrategy sync_strategy = 30;