diff --git a/pkg/app/piped/imagewatcher/watcher.go b/pkg/app/piped/imagewatcher/watcher.go index c03ff57fed..6c8ea32640 100644 --- a/pkg/app/piped/imagewatcher/watcher.go +++ b/pkg/app/piped/imagewatcher/watcher.go @@ -33,6 +33,8 @@ import ( "github.com/pipe-cd/pipe/pkg/yamlprocessor" ) +const defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s" + type Watcher interface { Run(context.Context) error } @@ -46,7 +48,8 @@ type watcher struct { gitClient gitClient logger *zap.Logger wg sync.WaitGroup - mu sync.Mutex + // For file locking. + mu sync.Mutex // Indexed by repo id. gitRepos map[string]git.Repo @@ -63,15 +66,17 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger) // Run spawns goroutines for each image provider. They periodically pull the image // from the container registry to compare the image with one in the git repository. func (w *watcher) Run(ctx context.Context) error { + // TODO: Spawn goroutines for each repository // Pre-clone to cache the registered git repositories. for _, r := range w.config.Repositories { + // TODO: Clone repository another temporary destination repo, err := w.gitClient.Clone(ctx, r.RepoID, r.Remote, r.Branch, "") if err != nil { w.logger.Error("failed to clone repository", zap.String("repo-id", r.RepoID), zap.Error(err), ) - return err + return fmt.Errorf("failed to clone repository %s: %w", r.RepoID, err) } w.gitRepos[r.RepoID] = repo } @@ -79,7 +84,7 @@ func (w *watcher) Run(ctx context.Context) error { for _, cfg := range w.config.ImageProviders { p, err := imageprovider.NewProvider(&cfg, w.logger) if err != nil { - return err + return fmt.Errorf("failed to yield image provider %s: %w", cfg.Name, err) } w.wg.Add(1) @@ -89,7 +94,7 @@ func (w *watcher) Run(ctx context.Context) error { return nil } -// run periodically compares the image stored in the given provider and one stored in git. +// run periodically compares the image in the given provider and one in git repository. // And then pushes those with differences. func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, interval time.Duration) { defer w.wg.Done() @@ -101,51 +106,48 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte case <-ctx.Done(): return case <-ticker.C: - updates := make([]config.ImageWatcherTarget, 0) + // Inspect all targets in all repos, and compare only images managed by the given provider. for id, repo := range w.gitRepos { - u, err := w.determineUpdates(ctx, id, repo, provider) + cfg, err := w.loadImageWatcherConfig(ctx, id, repo) if err != nil { - w.logger.Error("failed to determine images to be updated", + w.logger.Error("failed to load image watcher config", zap.String("repo-id", id), zap.Error(err), ) continue } - updates = append(updates, u...) - } - if len(updates) == 0 { - w.logger.Info("no image to be updated", - zap.String("image-provider", provider.Name()), - ) - continue - } - if err := update(updates); err != nil { - w.logger.Error("failed to update image", - zap.String("image-provider", provider.Name()), - zap.Error(err), - ) - continue + for _, target := range cfg.Targets { + if target.Provider != provider.Name() { + continue + } + if err := w.updateOutdatedImage(ctx, &target, repo, provider); err != nil { + w.logger.Error("failed to update image", + zap.String("repo-id", id), + zap.String("image-provider", provider.Name()), + zap.Error(err), + ) + continue + } + } } } } } -// determineUpdates gives back target images to be updated for a given repo. -func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.Repo, provider imageprovider.Provider) ([]config.ImageWatcherTarget, error) { - branch := repo.GetClonedBranch() +// loadImageWatcherConfig gives back an Image Watcher Config for the given repo. +func (w *watcher) loadImageWatcherConfig(ctx context.Context, repoID string, repo git.Repo) (*config.ImageWatcherSpec, error) { w.mu.Lock() - err := repo.Pull(ctx, branch) + err := repo.Pull(ctx, repo.GetClonedBranch()) w.mu.Unlock() if err != nil { - return nil, fmt.Errorf("failed to fetch from and integrate with a local branch: %w", err) + return nil, fmt.Errorf("failed to perform git pull: %w", err) } - // Load Image Watcher Config for the given repo. var includes, excludes []string - for _, target := range w.config.ImageWatcher.Repos { - if target.RepoID == repoID { - includes = target.Includes - excludes = target.Excludes + for _, repos := range w.config.ImageWatcher.Repos { + if repos.RepoID == repoID { + includes = repos.Includes + excludes = repos.Excludes break } } @@ -156,51 +158,61 @@ func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git. if !ok { return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err) } - - updates := make([]config.ImageWatcherTarget, 0) - for _, target := range cfg.Targets { - if provider.Name() != target.Provider { - continue - } - outdated, err := checkOutdated(ctx, target, repo, provider) - if err != nil { - return nil, fmt.Errorf("failed to check the image is outdated: %w", err) - } - if outdated { - updates = append(updates, target) - } - } - return updates, nil + return cfg, nil } -// checkOutdated checks if the image defined in the given target is identical to the one in image provider. -func checkOutdated(ctx context.Context, target config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) (bool, error) { +// updateOutdatedImage first compares the image in git repository and one in image provider. +// Then pushes rewritten one to the git repository if any deviation exists. +func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) error { + // Fetch from the image provider. i, err := provider.ParseImage(target.Image) if err != nil { - return false, err + return fmt.Errorf("failed to parse image string \"%s\": %w", target.Image, err) } // TODO: Control not to reach the rate limit - imageRef, err := provider.GetLatestImage(ctx, i) + imageInRegistry, err := provider.GetLatestImage(ctx, i) if err != nil { - return false, err + return fmt.Errorf("failed to get latest image from %s: %w", provider.Name(), err) } - yml, err := ioutil.ReadFile(filepath.Join(repo.GetPath(), target.FilePath)) + // Fetch from the git repository. + path := filepath.Join(repo.GetPath(), target.FilePath) + yml, err := ioutil.ReadFile(path) if err != nil { - return false, err + return fmt.Errorf("failed to read file: %w", err) } value, err := yamlprocessor.GetValue(yml, target.Field) if err != nil { - return false, err + return fmt.Errorf("failed to get value at %s in %s: %w", target.Field, target.FilePath, err) } - v, ok := value.(string) + imageInGit, ok := value.(string) if !ok { - return false, fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field) + return fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field) } - return imageRef.String() != v, nil -} -func update(targets []config.ImageWatcherTarget) error { - // TODO: Make it possible to push outdated images to Git + outdated := imageInRegistry.String() != imageInGit + if !outdated { + return nil + } + + // Update the outdated image. + newYml, err := yamlprocessor.ReplaceValue(yml, target.Field, imageInRegistry.String()) + if err != nil { + return fmt.Errorf("failed to replace value at %s with %s: %w", target.Field, imageInRegistry, err) + } + changes := map[string][]byte{ + target.FilePath: newYml, + } + // TODO: Make it changeable the commit message + msg := fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath) + w.mu.Lock() + if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), msg, false, changes); err != nil { + return fmt.Errorf("failed to perform git commit: %w", err) + } + err = repo.Push(ctx, repo.GetClonedBranch()) + w.mu.Unlock() + if err != nil { + return fmt.Errorf("failed to perform git push: %w", err) + } return nil }