Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/app/piped/imagewatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/app/piped/imageprovider:go_default_library",
"//pkg/config:go_default_library",
"//pkg/git:go_default_library",
"//pkg/yamlprocessor:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
136 changes: 75 additions & 61 deletions pkg/app/piped/imagewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package imagewatcher

import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"sync"
"time"

Expand All @@ -27,6 +30,7 @@ import (
"github.com/pipe-cd/pipe/pkg/app/piped/imageprovider"
"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/git"
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

type Watcher interface {
Expand Down Expand Up @@ -56,7 +60,8 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger)
}
}

// Run spawns goroutines for each image provider.
// Run spawns goroutines for each image provider. They periodically pull the image
// from the container registry to compare the image with one in the git repository.
func (w *watcher) Run(ctx context.Context) error {
// Pre-clone to cache the registered git repositories.
for _, r := range w.config.Repositories {
Expand Down Expand Up @@ -96,90 +101,99 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte
case <-ctx.Done():
return
case <-ticker.C:
targets := w.collectTargets(ctx, provider)
outdated, err := determineUpdates(ctx, targets, provider)
if err != nil {
w.logger.Error("failed to determine which one should be updated", zap.Error(err))
continue
updates := make([]config.ImageWatcherTarget, 0)
for id, repo := range w.gitRepos {
u, err := w.determineUpdates(ctx, id, repo, provider)
if err != nil {
w.logger.Error("failed to determine images to be updated",
zap.String("repo-id", id),
zap.Error(err),
)
continue
}
updates = append(updates, u...)
}
if len(outdated) == 0 {
if len(updates) == 0 {
w.logger.Info("no image to be updated")
continue
}
if err := update(outdated); err != nil {
if err := update(updates); err != nil {
w.logger.Error("failed to update image", zap.Error(err))
continue
}
}
}
}

// collectTarget collects target images for each git repository.
func (w *watcher) collectTargets(ctx context.Context, provider imageprovider.Provider) (targets []config.ImageWatcherTarget) {
for id, repo := range w.gitRepos {
branch := repo.GetClonedBranch()
w.mu.Lock()
err := repo.Pull(ctx, branch)
w.mu.Unlock()
if err != nil {
w.logger.Error("failed to update repository branch",
zap.String("repo-id", id),
zap.Error(err),
)
// determineUpdates gives back target images to be updated for a given repo.
func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.Repo, provider imageprovider.Provider) ([]config.ImageWatcherTarget, error) {
branch := repo.GetClonedBranch()
w.mu.Lock()
err := repo.Pull(ctx, branch)
w.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to fetch from and integrate with a local branch: %w", err)
}

// Load Image Watcher Config for the given repo.
includes := make([]string, 0)
excludes := make([]string, 0)
for _, target := range w.config.ImageWatcher.Repos {
if target.RepoID != repoID {
continue
}
includes = append(includes, target.Includes...)
excludes = append(excludes, target.Excludes...)
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
if err != nil {
return nil, fmt.Errorf("failed to load configuration file for Image Watcher: %w", err)
}
if !ok {
return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err)
}

includes := make([]string, 0)
excludes := make([]string, 0)
for _, target := range w.config.ImageWatcher.Repos {
if target.RepoID != id {
continue
}
includes = append(includes, target.Includes...)
excludes = append(excludes, target.Excludes...)
updates := make([]config.ImageWatcherTarget, 0)
for _, target := range cfg.Targets {
if provider.Name() != target.Provider {
continue
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
outdated, err := checkOutdated(ctx, target, repo, provider)
if err != nil {
w.logger.Error("failed to load configuration file for Image Watcher", zap.Error(err))
continue
return nil, fmt.Errorf("failed to check the image is outdated: %w", err)
}
if !ok {
w.logger.Error("configuration file for Image Watcher not found", zap.Error(err))
continue
if outdated {
updates = append(updates, target)
}
t := filterTargets(provider.Name(), cfg.Targets)
targets = append(targets, t...)
}
return
return updates, nil
}

// filterTargets gives back the targets corresponding to the given provider.
func filterTargets(provider string, targets []config.ImageWatcherTarget) (filtered []config.ImageWatcherTarget) {
for _, t := range targets {
if t.Provider == provider {
filtered = append(filtered, t)
}
// checkOutdated checks if the image defined in the given target is identical to the one in image provider.
func checkOutdated(ctx context.Context, target config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) (bool, error) {
i, err := provider.ParseImage(target.Image)
if err != nil {
return false, err
}
return
}

// determineUpdates gives back target images to be updated.
func determineUpdates(ctx context.Context, targets []config.ImageWatcherTarget, provider imageprovider.Provider) (outdated []config.ImageWatcherTarget, err error) {
for _, target := range targets {
i, err := provider.ParseImage(target.Image)
if err != nil {
return nil, err
}
// TODO: Control not to reach the rate limit
_, err = provider.GetLatestImage(ctx, i)
if err != nil {
return nil, err
}
// TODO: Compares between image repos in the image registry and image repos in git
// And then gives back image repos to be updated.
// TODO: Control not to reach the rate limit
imageRef, err := provider.GetLatestImage(ctx, i)
if err != nil {
return false, err
}

return
yml, err := ioutil.ReadFile(filepath.Join(repo.GetPath(), target.FilePath))
if err != nil {
return false, err
}
value, err := yamlprocessor.GetValue(yml, target.Field)
if err != nil {
return false, err
}
v, ok := value.(string)
if !ok {
return false, fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field)
}
return imageRef.String() != v, nil
}

func update(targets []config.ImageWatcherTarget) error {
Expand Down