Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 64 additions & 50 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,15 @@ type gitClient interface {
Clone(ctx context.Context, repoID, remote, branch, destination string) (git.Repo, error)
}

type commit struct {
changes map[string][]byte
message string
}

type watcher struct {
config *config.PipedSpec
eventGetter eventGetter
gitClient gitClient
logger *zap.Logger
wg sync.WaitGroup

// All cloned repository will be placed under this.
workingDir string
}

func NewWatcher(cfg *config.PipedSpec, eventGetter eventGetter, gitClient gitClient, logger *zap.Logger) Watcher {
Expand All @@ -81,16 +79,19 @@ func NewWatcher(cfg *config.PipedSpec, eventGetter eventGetter, gitClient gitCli
func (w *watcher) Run(ctx context.Context) error {
w.logger.Info("start running event watcher")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making a temporary working directory for watcher and storing all cloned source code into that directory?
By that way, we can add a defer to clean the whole directory to ensure that everything will be removed when the watcher stopped.
And don't have to worry about this ignore:

repo, _ = w.cloneRepo(ctx, repoCfg)

workingDir, err := ioutil.TempDir("", "event-watcher")
if err != nil {
return fmt.Errorf("failed to create the working directory: %w", err)
}
defer os.RemoveAll(workingDir)
w.workingDir = workingDir

for _, repoCfg := range w.config.Repositories {
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "")
repo, err := w.cloneRepo(ctx, repoCfg)
if err != nil {
w.logger.Error("failed to clone repository",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
return fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err)
return err
}
defer os.RemoveAll(repo.GetPath())
defer repo.Clean()

w.wg.Add(1)
go w.run(ctx, repo, repoCfg)
Expand Down Expand Up @@ -138,6 +139,22 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
zap.String("branch", repo.GetClonedBranch()),
zap.Error(err),
)
if err := repo.Clean(); err != nil {
w.logger.Error("failed to remove repo directory",
zap.String("path", repo.GetPath()),
zap.Error(err),
)
}
w.logger.Info("Try to re-clone because it's more likely to be unable to pull the next time too",
zap.String("repo-id", repoCfg.RepoID),
)
repo, err = w.cloneRepo(ctx, repoCfg)
if err != nil {
w.logger.Error("failed to re-clone repository",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
}
continue
}
cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs)
Expand Down Expand Up @@ -165,52 +182,49 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
}
}

// cloneRepo clones the git repository under the working directory.
func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository) (git.Repo, error) {
dst, err := ioutil.TempDir(w.workingDir, repoCfg.RepoID)
if err != nil {
return nil, fmt.Errorf("failed to create a new temporary directory: %w", err)
}
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, dst)
if err != nil {
return nil, fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err)
}
return repo, nil
}

// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
func (w *watcher) updateValues(ctx context.Context, repo git.Repo, events []config.EventWatcherEvent, commitMsg string) error {
// Copy the repo to another directory to avoid pull failure in the future.
tmpDir, err := ioutil.TempDir("", "event-watcher")
// Copy the repo to another directory to modify local file to avoid reverting previous changes.
tmpDir, err := ioutil.TempDir(w.workingDir, "repo")
if err != nil {
return fmt.Errorf("failed to create a new temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)
tmpRepo, err := repo.Copy(filepath.Join(tmpDir, "tmp-repo"))
if err != nil {
return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err)
}
defer tmpRepo.Clean()

commits := make([]*commit, 0)
for _, e := range events {
latestEvent, ok := w.eventGetter.GetLatest(ctx, e.Name, e.Labels)
if !ok {
continue
}
c, err := w.modifyFiles(latestEvent, &e, tmpRepo, commitMsg)
if err != nil {
w.logger.Error("failed to modify outdated files", zap.Error(err))
if err := w.commitFiles(ctx, e, tmpRepo, commitMsg); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
continue
}
if c != nil {
commits = append(commits, c)
}
}
if len(commits) == 0 {
return nil
}

w.logger.Info(fmt.Sprintf("event watcher will update %d outdated values", len(commits)))
for _, c := range commits {
if err := tmpRepo.CommitChanges(ctx, tmpRepo.GetClonedBranch(), c.message, false, c.changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
}
return tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
}

// modifyFiles modifies files defined in a given Event if any deviation exists between the value in
// the git repository and one in the control-plane. And gives back a change contents.
func (w *watcher) modifyFiles(latestEvent *model.Event, eventCfg *config.EventWatcherEvent, repo git.Repo, commitMsg string) (*commit, error) {
// Determine files to be changed.
changes := make(map[string][]byte, 0)
// commitFiles commits changes if the data in Git is different from the latest event.
func (w *watcher) commitFiles(ctx context.Context, eventCfg config.EventWatcherEvent, repo git.Repo, commitMsg string) error {
latestEvent, ok := w.eventGetter.GetLatest(ctx, eventCfg.Name, eventCfg.Labels)
if !ok {
return nil
}
// Determine files to be changed by comparing with the latest event.
changes := make(map[string][]byte, len(eventCfg.Replacements))
for _, r := range eventCfg.Replacements {
var (
path = filepath.Join(repo.GetPath(), r.File)
Expand All @@ -227,29 +241,29 @@ func (w *watcher) modifyFiles(latestEvent *model.Event, eventCfg *config.EventWa
// TODO: Empower Event watcher to parse HCL format
}
if err != nil {
return nil, err
return err
}
if upToDate {
continue
}
// To avoid being conflict, we have to update the local file.

if err := ioutil.WriteFile(path, newContent, os.ModePerm); err != nil {
return nil, fmt.Errorf("failed to write file: %w", err)
return fmt.Errorf("failed to write file: %w", err)
}
changes[r.File] = newContent
}

if len(changes) == 0 {
return nil, nil
return nil
}

if commitMsg == "" {
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestEvent.Data, eventCfg.Name)
}
return &commit{
changes: changes,
message: commitMsg,
}, nil
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventCfg.Name))
return nil
}

// modifyYAML returns a new YAML content as a first returned value if the value of given
Expand Down