diff --git a/pkg/app/piped/imagewatcher/BUILD.bazel b/pkg/app/piped/imagewatcher/BUILD.bazel index 11fa942c48..3a068049a5 100644 --- a/pkg/app/piped/imagewatcher/BUILD.bazel +++ b/pkg/app/piped/imagewatcher/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/app/piped/imageprovider:go_default_library", "//pkg/config:go_default_library", "//pkg/git:go_default_library", + "//pkg/yamlprocessor:go_default_library", "@org_uber_go_zap//:go_default_library", ], ) diff --git a/pkg/app/piped/imagewatcher/watcher.go b/pkg/app/piped/imagewatcher/watcher.go index d860271e86..38d4345ddc 100644 --- a/pkg/app/piped/imagewatcher/watcher.go +++ b/pkg/app/piped/imagewatcher/watcher.go @@ -19,6 +19,9 @@ package imagewatcher import ( "context" + "fmt" + "io/ioutil" + "path/filepath" "sync" "time" @@ -27,6 +30,7 @@ import ( "github.com/pipe-cd/pipe/pkg/app/piped/imageprovider" "github.com/pipe-cd/pipe/pkg/config" "github.com/pipe-cd/pipe/pkg/git" + "github.com/pipe-cd/pipe/pkg/yamlprocessor" ) type Watcher interface { @@ -56,7 +60,8 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger) } } -// Run spawns goroutines for each image provider. +// Run spawns goroutines for each image provider. They periodically pull the image +// from the container registry to compare the image with one in the git repository. func (w *watcher) Run(ctx context.Context) error { // Pre-clone to cache the registered git repositories. for _, r := range w.config.Repositories { @@ -96,17 +101,23 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte case <-ctx.Done(): return case <-ticker.C: - targets := w.collectTargets(ctx, provider) - outdated, err := determineUpdates(ctx, targets, provider) - if err != nil { - w.logger.Error("failed to determine which one should be updated", zap.Error(err)) - continue + updates := make([]config.ImageWatcherTarget, 0) + for id, repo := range w.gitRepos { + u, err := w.determineUpdates(ctx, id, repo, provider) + if err != nil { + w.logger.Error("failed to determine images to be updated", + zap.String("repo-id", id), + zap.Error(err), + ) + continue + } + updates = append(updates, u...) } - if len(outdated) == 0 { + if len(updates) == 0 { w.logger.Info("no image to be updated") continue } - if err := update(outdated); err != nil { + if err := update(updates); err != nil { w.logger.Error("failed to update image", zap.Error(err)) continue } @@ -114,72 +125,75 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte } } -// collectTarget collects target images for each git repository. -func (w *watcher) collectTargets(ctx context.Context, provider imageprovider.Provider) (targets []config.ImageWatcherTarget) { - for id, repo := range w.gitRepos { - branch := repo.GetClonedBranch() - w.mu.Lock() - err := repo.Pull(ctx, branch) - w.mu.Unlock() - if err != nil { - w.logger.Error("failed to update repository branch", - zap.String("repo-id", id), - zap.Error(err), - ) +// determineUpdates gives back target images to be updated for a given repo. +func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.Repo, provider imageprovider.Provider) ([]config.ImageWatcherTarget, error) { + branch := repo.GetClonedBranch() + w.mu.Lock() + err := repo.Pull(ctx, branch) + w.mu.Unlock() + if err != nil { + return nil, fmt.Errorf("failed to fetch from and integrate with a local branch: %w", err) + } + + // Load Image Watcher Config for the given repo. + includes := make([]string, 0) + excludes := make([]string, 0) + for _, target := range w.config.ImageWatcher.Repos { + if target.RepoID != repoID { continue } + includes = append(includes, target.Includes...) + excludes = append(excludes, target.Excludes...) + } + cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes) + if err != nil { + return nil, fmt.Errorf("failed to load configuration file for Image Watcher: %w", err) + } + if !ok { + return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err) + } - includes := make([]string, 0) - excludes := make([]string, 0) - for _, target := range w.config.ImageWatcher.Repos { - if target.RepoID != id { - continue - } - includes = append(includes, target.Includes...) - excludes = append(excludes, target.Excludes...) + updates := make([]config.ImageWatcherTarget, 0) + for _, target := range cfg.Targets { + if provider.Name() != target.Provider { + continue } - cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes) + outdated, err := checkOutdated(ctx, target, repo, provider) if err != nil { - w.logger.Error("failed to load configuration file for Image Watcher", zap.Error(err)) - continue + return nil, fmt.Errorf("failed to check the image is outdated: %w", err) } - if !ok { - w.logger.Error("configuration file for Image Watcher not found", zap.Error(err)) - continue + if outdated { + updates = append(updates, target) } - t := filterTargets(provider.Name(), cfg.Targets) - targets = append(targets, t...) } - return + return updates, nil } -// filterTargets gives back the targets corresponding to the given provider. -func filterTargets(provider string, targets []config.ImageWatcherTarget) (filtered []config.ImageWatcherTarget) { - for _, t := range targets { - if t.Provider == provider { - filtered = append(filtered, t) - } +// checkOutdated checks if the image defined in the given target is identical to the one in image provider. +func checkOutdated(ctx context.Context, target config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) (bool, error) { + i, err := provider.ParseImage(target.Image) + if err != nil { + return false, err } - return -} - -// determineUpdates gives back target images to be updated. -func determineUpdates(ctx context.Context, targets []config.ImageWatcherTarget, provider imageprovider.Provider) (outdated []config.ImageWatcherTarget, err error) { - for _, target := range targets { - i, err := provider.ParseImage(target.Image) - if err != nil { - return nil, err - } - // TODO: Control not to reach the rate limit - _, err = provider.GetLatestImage(ctx, i) - if err != nil { - return nil, err - } - // TODO: Compares between image repos in the image registry and image repos in git - // And then gives back image repos to be updated. + // TODO: Control not to reach the rate limit + imageRef, err := provider.GetLatestImage(ctx, i) + if err != nil { + return false, err } - return + yml, err := ioutil.ReadFile(filepath.Join(repo.GetPath(), target.FilePath)) + if err != nil { + return false, err + } + value, err := yamlprocessor.GetValue(yml, target.Field) + if err != nil { + return false, err + } + v, ok := value.(string) + if !ok { + return false, fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field) + } + return imageRef.String() != v, nil } func update(targets []config.ImageWatcherTarget) error {