diff --git a/pkg/app/piped/eventwatcher/eventwatcher.go b/pkg/app/piped/eventwatcher/eventwatcher.go index 03111d9459..8d741050fb 100644 --- a/pkg/app/piped/eventwatcher/eventwatcher.go +++ b/pkg/app/piped/eventwatcher/eventwatcher.go @@ -54,17 +54,15 @@ type gitClient interface { Clone(ctx context.Context, repoID, remote, branch, destination string) (git.Repo, error) } -type commit struct { - changes map[string][]byte - message string -} - type watcher struct { config *config.PipedSpec eventGetter eventGetter gitClient gitClient logger *zap.Logger wg sync.WaitGroup + + // All cloned repository will be placed under this. + workingDir string } func NewWatcher(cfg *config.PipedSpec, eventGetter eventGetter, gitClient gitClient, logger *zap.Logger) Watcher { @@ -81,16 +79,19 @@ func NewWatcher(cfg *config.PipedSpec, eventGetter eventGetter, gitClient gitCli func (w *watcher) Run(ctx context.Context) error { w.logger.Info("start running event watcher") + workingDir, err := ioutil.TempDir("", "event-watcher") + if err != nil { + return fmt.Errorf("failed to create the working directory: %w", err) + } + defer os.RemoveAll(workingDir) + w.workingDir = workingDir + for _, repoCfg := range w.config.Repositories { - repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "") + repo, err := w.cloneRepo(ctx, repoCfg) if err != nil { - w.logger.Error("failed to clone repository", - zap.String("repo-id", repoCfg.RepoID), - zap.Error(err), - ) - return fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err) + return err } - defer os.RemoveAll(repo.GetPath()) + defer repo.Clean() w.wg.Add(1) go w.run(ctx, repo, repoCfg) @@ -138,6 +139,22 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe zap.String("branch", repo.GetClonedBranch()), zap.Error(err), ) + if err := repo.Clean(); err != nil { + w.logger.Error("failed to remove repo directory", + zap.String("path", repo.GetPath()), + zap.Error(err), + ) + } + w.logger.Info("Try to re-clone because it's more likely to be unable to pull the next time too", + zap.String("repo-id", repoCfg.RepoID), + ) + repo, err = w.cloneRepo(ctx, repoCfg) + if err != nil { + w.logger.Error("failed to re-clone repository", + zap.String("repo-id", repoCfg.RepoID), + zap.Error(err), + ) + } continue } cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs) @@ -165,52 +182,49 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe } } +// cloneRepo clones the git repository under the working directory. +func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository) (git.Repo, error) { + dst, err := ioutil.TempDir(w.workingDir, repoCfg.RepoID) + if err != nil { + return nil, fmt.Errorf("failed to create a new temporary directory: %w", err) + } + repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, dst) + if err != nil { + return nil, fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err) + } + return repo, nil +} + // updateValues inspects all Event-definition and pushes the changes to git repo if there is. func (w *watcher) updateValues(ctx context.Context, repo git.Repo, events []config.EventWatcherEvent, commitMsg string) error { - // Copy the repo to another directory to avoid pull failure in the future. - tmpDir, err := ioutil.TempDir("", "event-watcher") + // Copy the repo to another directory to modify local file to avoid reverting previous changes. + tmpDir, err := ioutil.TempDir(w.workingDir, "repo") if err != nil { return fmt.Errorf("failed to create a new temporary directory: %w", err) } - defer os.RemoveAll(tmpDir) tmpRepo, err := repo.Copy(filepath.Join(tmpDir, "tmp-repo")) if err != nil { return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err) } + defer tmpRepo.Clean() - commits := make([]*commit, 0) for _, e := range events { - latestEvent, ok := w.eventGetter.GetLatest(ctx, e.Name, e.Labels) - if !ok { - continue - } - c, err := w.modifyFiles(latestEvent, &e, tmpRepo, commitMsg) - if err != nil { - w.logger.Error("failed to modify outdated files", zap.Error(err)) + if err := w.commitFiles(ctx, e, tmpRepo, commitMsg); err != nil { + w.logger.Error("failed to commit outdated files", zap.Error(err)) continue } - if c != nil { - commits = append(commits, c) - } - } - if len(commits) == 0 { - return nil - } - - w.logger.Info(fmt.Sprintf("event watcher will update %d outdated values", len(commits))) - for _, c := range commits { - if err := tmpRepo.CommitChanges(ctx, tmpRepo.GetClonedBranch(), c.message, false, c.changes); err != nil { - return fmt.Errorf("failed to perform git commit: %w", err) - } } return tmpRepo.Push(ctx, tmpRepo.GetClonedBranch()) } -// modifyFiles modifies files defined in a given Event if any deviation exists between the value in -// the git repository and one in the control-plane. And gives back a change contents. -func (w *watcher) modifyFiles(latestEvent *model.Event, eventCfg *config.EventWatcherEvent, repo git.Repo, commitMsg string) (*commit, error) { - // Determine files to be changed. - changes := make(map[string][]byte, 0) +// commitFiles commits changes if the data in Git is different from the latest event. +func (w *watcher) commitFiles(ctx context.Context, eventCfg config.EventWatcherEvent, repo git.Repo, commitMsg string) error { + latestEvent, ok := w.eventGetter.GetLatest(ctx, eventCfg.Name, eventCfg.Labels) + if !ok { + return nil + } + // Determine files to be changed by comparing with the latest event. + changes := make(map[string][]byte, len(eventCfg.Replacements)) for _, r := range eventCfg.Replacements { var ( path = filepath.Join(repo.GetPath(), r.File) @@ -227,29 +241,29 @@ func (w *watcher) modifyFiles(latestEvent *model.Event, eventCfg *config.EventWa // TODO: Empower Event watcher to parse HCL format } if err != nil { - return nil, err + return err } if upToDate { continue } - // To avoid being conflict, we have to update the local file. + if err := ioutil.WriteFile(path, newContent, os.ModePerm); err != nil { - return nil, fmt.Errorf("failed to write file: %w", err) + return fmt.Errorf("failed to write file: %w", err) } changes[r.File] = newContent } - if len(changes) == 0 { - return nil, nil + return nil } if commitMsg == "" { commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestEvent.Data, eventCfg.Name) } - return &commit{ - changes: changes, - message: commitMsg, - }, nil + if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil { + return fmt.Errorf("failed to perform git commit: %w", err) + } + w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventCfg.Name)) + return nil } // modifyYAML returns a new YAML content as a first returned value if the value of given