Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/app/piped/imagewatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/app/piped/imageprovider:go_default_library",
"//pkg/config:go_default_library",
"//pkg/git:go_default_library",
"//pkg/yamlprocessor:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
135 changes: 74 additions & 61 deletions pkg/app/piped/imagewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package imagewatcher

import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"sync"
"time"

Expand All @@ -27,6 +30,7 @@ import (
"github.com/pipe-cd/pipe/pkg/app/piped/imageprovider"
"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/git"
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

type Watcher interface {
Expand Down Expand Up @@ -96,90 +100,99 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte
case <-ctx.Done():
return
case <-ticker.C:
targets := w.collectTargets(ctx, provider)
outdated, err := determineUpdates(ctx, targets, provider)
if err != nil {
w.logger.Error("failed to determine which one should be updated", zap.Error(err))
continue
updates := make([]config.ImageWatcherTarget, 0)
for id, repo := range w.gitRepos {
u, err := w.determineUpdates(ctx, id, repo, provider)
if err != nil {
w.logger.Error("failed to determine images to be updated",
zap.String("repo-id", id),
zap.Error(err),
)
continue
}
updates = append(updates, u...)
}
if len(outdated) == 0 {
if len(updates) == 0 {
w.logger.Info("no image to be updated")
continue
}
if err := update(outdated); err != nil {
if err := update(updates); err != nil {
w.logger.Error("failed to update image", zap.Error(err))
continue
}
}
}
}

// collectTarget collects target images for each git repository.
func (w *watcher) collectTargets(ctx context.Context, provider imageprovider.Provider) (targets []config.ImageWatcherTarget) {
for id, repo := range w.gitRepos {
branch := repo.GetClonedBranch()
w.mu.Lock()
err := repo.Pull(ctx, branch)
w.mu.Unlock()
if err != nil {
w.logger.Error("failed to update repository branch",
zap.String("repo-id", id),
zap.Error(err),
)
// determineUpdates gives back target images to be updated for a given repo.
func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.Repo, provider imageprovider.Provider) ([]config.ImageWatcherTarget, error) {
branch := repo.GetClonedBranch()
w.mu.Lock()
err := repo.Pull(ctx, branch)
w.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to update repository branch: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "failed to get update from branch" or "failed to pull branch"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, you're right, thank you!

}

// Load Image Watcher Config
includes := make([]string, 0)
excludes := make([]string, 0)
for _, target := range w.config.ImageWatcher.Repos {
if target.RepoID != repoID {
continue
}
includes = append(includes, target.Includes...)
excludes = append(excludes, target.Excludes...)
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
if err != nil {
return nil, fmt.Errorf("failed to load configuration file for Image Watcher: %w", err)
}
if !ok {
return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err)
}

includes := make([]string, 0)
excludes := make([]string, 0)
for _, target := range w.config.ImageWatcher.Repos {
if target.RepoID != id {
continue
}
includes = append(includes, target.Includes...)
excludes = append(excludes, target.Excludes...)
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
updates := make([]config.ImageWatcherTarget, 0)
for _, target := range cfg.Targets {
outdated, err := checkOutdated(ctx, target, repo, provider)
if err != nil {
w.logger.Error("failed to load configuration file for Image Watcher", zap.Error(err))
continue
return nil, fmt.Errorf("failed to check the image is outdated: %w", err)
}
if !ok {
w.logger.Error("configuration file for Image Watcher not found", zap.Error(err))
continue
if outdated {
updates = append(updates, target)
}
t := filterTargets(provider.Name(), cfg.Targets)
targets = append(targets, t...)
}
return
return updates, nil
}

// filterTargets gives back the targets corresponding to the given provider.
func filterTargets(provider string, targets []config.ImageWatcherTarget) (filtered []config.ImageWatcherTarget) {
for _, t := range targets {
if t.Provider == provider {
filtered = append(filtered, t)
}
// checkOutdated checks if the image defined in the given target is identical to the one in image provider.
func checkOutdated(ctx context.Context, target config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) (bool, error) {
if provider.Name() != target.Provider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you add notes to clarify why do we treat this case as a not outdated image 🙏

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you feel, it's unclear. gonna update them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated them.
The Run() spawns runners for each image provider, so the checkOutdated function is called in the context for image provider. We should ignore the targets for other providers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙆

return false, nil
}
return
}

// determineUpdates gives back target images to be updated.
func determineUpdates(ctx context.Context, targets []config.ImageWatcherTarget, provider imageprovider.Provider) (outdated []config.ImageWatcherTarget, err error) {
for _, target := range targets {
i, err := provider.ParseImage(target.Image)
if err != nil {
return nil, err
}
// TODO: Control not to reach the rate limit
_, err = provider.GetLatestImage(ctx, i)
if err != nil {
return nil, err
}
// TODO: Compares between image repos in the image registry and image repos in git
// And then gives back image repos to be updated.
i, err := provider.ParseImage(target.Image)
if err != nil {
return false, err
}
// TODO: Control not to reach the rate limit
imageRef, err := provider.GetLatestImage(ctx, i)
if err != nil {
return false, err
}

return
yml, err := ioutil.ReadFile(filepath.Join(repo.GetPath(), target.FilePath))
if err != nil {
return false, err
}
value, err := yamlprocessor.GetValue(yml, target.Field)
if err != nil {
return false, err
}
v, ok := value.(string)
if !ok {
return false, fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field)
}
return imageRef.String() != v, nil
}

func update(targets []config.ImageWatcherTarget) error {
Expand Down